

The use of Amazon Kinesis in AWS has grown since its release in 2013. Designed to help process big data in real time, Kinesis streams help users capture, store, and transport data without rate limits or capacity constraints. As enterprises seek to make more and better use of data in real time to drive greater agility and customer-centricity, the ability to get the right data to the users and systems that need it becomes paramount.
This blog shows you how to use TriggerMesh open source integration to consume Kinesis events, filter them, and re-inject them into another Kinesis stream. TriggerMesh Filters decide which events to process based on their content, which makes them useful for dropping unwanted events or as the basis for rules engines for event processing. Because TriggerMesh is completely cloud-agnostic, you can use the same technique we demo here to connect Kinesis streams (or Kafka, AWS SQS, or IBM MQ) with practically any other SaaS or on-premises app, or any other cloud service.
By using TriggerMesh to read from and write to Kinesis streams, you can build a resilient, scalable event-driven architecture on the AWS resources you already have. In this article, we'll look at how to use TriggerMesh to manipulate Amazon Kinesis stream events and highlight how this can work in your environment.
In this blog we present four main steps:

1. Install TriggerMesh.
2. Create the Kinesis streams and the AWS credentials TriggerMesh needs to access them.
3. Define the event flow with declarative manifests for the source and the target.
4. Test the flow, then add a filter between the source and the target.
We will use the TriggerMesh open source API to consume Kinesis events, filter them, and re-inject them into another Kinesis stream. The result will be a set of declarative manifests that show you how to use integration as code across clouds. The event flow we are going to build reads from a source Kinesis stream, passes each event through a TriggerMesh filter, and writes the events that pass the filter to a target Kinesis stream.
TriggerMesh has multiple installation options, depending on what you're most comfortable with and how you plan to use it. In the interest of time and space, we refer you to the TriggerMesh documentation, which walks through the options, including installing the Helm chart or applying the release manifests directly with `kubectl`.
With TriggerMesh installed you now have access to a Kinesis event source and a Kinesis target API object that we will use below to describe our event flow. Let’s now move on to our second step and create our streams.
Since the TriggerMesh API is provided as a set of Kubernetes extensions (custom resource definitions), all of its objects can be managed with the Kubernetes client `kubectl`. The instructions below include some sample commands.
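For example, a quick way to confirm that the TriggerMesh APIs are registered in your cluster (the exact list varies with the TriggerMesh version):

```sh
# List the TriggerMesh API resources the cluster knows about
kubectl api-resources | grep triggermesh
```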
Our example will use two different Kinesis streams in order to show the ability of TriggerMesh to read from a stream as a source and write to a stream as a target. The simplest setup is to create Data Streams using the “on-demand” capacity mode. For each data stream you create, choose a name and the capacity mode, then click “Create data stream”.
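If you prefer the CLI to the console, here is a sketch of the equivalent commands; the stream names `tm-source` and `tm-target` are placeholders that we will reuse throughout the examples below:

```sh
# Create the source and target streams in on-demand capacity mode
aws kinesis create-stream --stream-name tm-source \
  --stream-mode-details StreamMode=ON_DEMAND
aws kinesis create-stream --stream-name tm-target \
  --stream-mode-details StreamMode=ON_DEMAND

# Note the ARNs: the TriggerMesh manifests below need them
aws kinesis describe-stream-summary --stream-name tm-source \
  --query 'StreamDescriptionSummary.StreamARN' --output text
aws kinesis describe-stream-summary --stream-name tm-target \
  --query 'StreamDescriptionSummary.StreamARN' --output text
```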
In order for TriggerMesh to have access to your AWS resources, you'll need to create an IAM user in your AWS account and attach an IAM policy that allows it to read from and write to Kinesis streams. For this simple setup, the AWS-provided policy named “AmazonKinesisFullAccess” will work, but you may want to use a custom policy for more restricted access.
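As a sketch, assuming a user named `triggermesh-kinesis` (the name is a placeholder), you can attach the managed policy and generate a key pair with the AWS CLI:

```sh
# Attach the AWS-managed Kinesis policy to the TriggerMesh user
aws iam attach-user-policy \
  --user-name triggermesh-kinesis \
  --policy-arn arn:aws:iam::aws:policy/AmazonKinesisFullAccess

# Create an access key for that user; TriggerMesh will use this key pair
aws iam create-access-key --user-name triggermesh-kinesis
```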
Once you have the IAM user created with the proper policy, you’ll need to provide the access key and secret to TriggerMesh using the following command:
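The secret name `awscreds` and its key names below are placeholders; whatever you choose must match what the manifests reference later:

```sh
# Store the IAM user's key pair as a Kubernetes secret
kubectl create secret generic awscreds \
  --from-literal=AWS_ACCESS_KEY_ID=<your-access-key-id> \
  --from-literal=AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
```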
With our streams created in AWS and our AWS API key stored as a Kubernetes secret, we are now ready to use the TriggerMesh declarative API to define our event flow. In order to do this we need to create so-called manifests representing our event sources, targets and transformations.
In TriggerMesh, resources are created as Kubernetes objects. You’ll want to create a file for the source Kinesis stream and a file for the target Kinesis stream. These files will be slightly different, but the general idea is similar. First we’ll set up the target, since that will be used in the transformation. Here’s what the target file will look like (notice the API Kind `AWSKinesisTarget`):
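(What follows is a sketch rather than a definitive manifest: it assumes the `targets.triggermesh.io/v1alpha1` API group and the `awscreds` secret created earlier. If the apply fails, check the API versions registered in your cluster with `kubectl api-resources`.)

```yaml
# target.yaml: writes incoming events to the target Kinesis stream.
# The names, the partition value, and the API version are assumptions;
# adjust them to your environment.
apiVersion: targets.triggermesh.io/v1alpha1
kind: AWSKinesisTarget
metadata:
  name: kinesis-target
spec:
  arn: <TARGET_ARN_STRING>
  partition: demo
  awsApiKey:
    secretKeyRef:
      name: awscreds
      key: AWS_ACCESS_KEY_ID
  awsApiSecret:
    secretKeyRef:
      name: awscreds
      key: AWS_SECRET_ACCESS_KEY
```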
In the above file, you must replace the <TARGET_ARN_STRING> with the ARN from your Kinesis stream. You can optionally change the partition entry and the “name” section under “metadata”, but you’ll want to remember what you chose for those.
Next, set up the Kinesis source:
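(Again a sketch, this time assuming the `sources.triggermesh.io/v1alpha1` group; the `auth` block points at the same `awscreds` secret, and the `sink` points at the target defined above.)

```yaml
# source.yaml: reads records from the source Kinesis stream and
# forwards them to the sink. API version and names are assumptions.
apiVersion: sources.triggermesh.io/v1alpha1
kind: AWSKinesisSource
metadata:
  name: kinesis-source
spec:
  arn: <SOURCE_ARN_STRING>
  auth:
    credentials:
      accessKeyID:
        valueFromSecret:
          name: awscreds
          key: AWS_ACCESS_KEY_ID
      secretAccessKey:
        valueFromSecret:
          name: awscreds
          key: AWS_SECRET_ACCESS_KEY
  sink:
    ref:
      apiVersion: targets.triggermesh.io/v1alpha1
      kind: AWSKinesisTarget
      name: kinesis-target
```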
Again, you’ll need to change the <SOURCE_ARN_STRING>. If you changed the target name, then use that same name in the “sink” section so TriggerMesh knows where you want the data to go.
Now that you have these files created, apply them to your TriggerMesh installation by using the following commands:
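Assuming the file names used above:

```sh
# Create the target first so the source has somewhere to send events
kubectl apply -f target.yaml
kubectl apply -f source.yaml
```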
Once the definitions are updated in TriggerMesh, you're ready to test the flow of data from one Kinesis stream to another. There are a few ways to do this: you can point an existing producer application at the source stream, or simply put a record on it with the AWS CLI, as sketched below.
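This example assumes the hypothetical `tm-source` stream from earlier; with AWS CLI v2, the `--cli-binary-format raw-in-base64-out` option lets you pass the payload as plain text:

```sh
# Put a test record on the source stream
aws kinesis put-record \
  --stream-name tm-source \
  --partition-key demo \
  --cli-binary-format raw-in-base64-out \
  --data '{"message": "this event is allowed"}'
```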
To see the data come into the target Kinesis stream, you can use the Data viewer tab in the Kinesis console, or read a shard directly with the AWS CLI, as sketched below.
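This sketch assumes the hypothetical `tm-target` stream; run `aws kinesis list-shards --stream-name tm-target` first to pick a shard ID, since on-demand streams start with several shards:

```sh
# Get an iterator at the start of one of the target stream's shards
ITERATOR=$(aws kinesis get-shard-iterator \
  --stream-name tm-target \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON \
  --query 'ShardIterator' --output text)

# Read the records; the Data field comes back base64-encoded
aws kinesis get-records --shard-iterator "$ITERATOR"
```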
Now that you’ve got an event flow, we can start adding TriggerMesh components into the mix to really show the manipulation options. In this example, let’s create a Filter to examine the content of the payload from the source stream in order to only pass along certain events.
Our filter will check the Data field of the record in the source Kinesis stream to see if the keyword “allowed” is included in the text. The sink should be set to the target Kinesis stream. Here’s what the filter file will look like:
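(A sketch, assuming the `flow.triggermesh.io/v1alpha1` Filter API and its CEL-style expression syntax; the exact path to the record data in the event payload may differ across TriggerMesh versions, so adjust the expression to match what your source emits.)

```yaml
# filter.yaml: only forward events whose Data field contains "allowed".
# The expression below is an assumption about the payload shape.
apiVersion: flow.triggermesh.io/v1alpha1
kind: Filter
metadata:
  name: kinesis-filter
spec:
  expression: $Data.(string).contains("allowed")
  sink:
    ref:
      apiVersion: targets.triggermesh.io/v1alpha1
      kind: AWSKinesisTarget
      name: kinesis-target
```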
To add this filter, we’ll need to modify the ‘sink’ section of the source to point to the filter:
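Only the `sink` block of source.yaml changes; the rest of the manifest stays as shown earlier:

```yaml
# source.yaml (excerpt): the sink now points at the filter,
# and the filter's own sink points at the Kinesis target
spec:
  sink:
    ref:
      apiVersion: flow.triggermesh.io/v1alpha1
      kind: Filter
      name: kinesis-filter
```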
Apply the filter and the new source using:
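Assuming the same file names:

```sh
kubectl apply -f filter.yaml
kubectl apply -f source.yaml
```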
Now that you’ve got a TriggerMesh installation that is reading events from an Amazon Kinesis data stream and putting those into a completely separate Kinesis data stream, you can start expanding your use of TriggerMesh. Some options for next steps include: