Transform Events Between Kinesis Streams with TriggerMesh

Chris Parlette
Chris Parlette
Jan 26, 2022
Transform Events Between Kinesis Streams with TriggerMesh
Transform Events Between Kinesis Streams with TriggerMesh

The use of Amazon Kinesis in AWS has grown since it’s release in 2013. Designed to help process big data in real-time, Kinesis streams help users capture, store, and transport data without rate limits or capacity constraints. As enterprises seek to make more and better use of data in real-time to drive greater agility and customer-centricity, the ability to get the right data to the users and systems that need it becomes paramount. 

This blog shows you how to use TriggerMesh open source integration to consume Kinesis events, filter them, and re-inject them into another Kinesis stream. TriggerMesh Filters determine which events to process based on their content. These may be the basis for dropping unwanted events or for creating rules engines for event processing. Because TriggerMesh is completely cloud-agnostic, you can use the same technique we demo here to connect Kinesis streams (or Kafka, or AWS SQS, or IBM MQ) with practically any other SaaS or on-premises app, or other cloud service.

By using TriggerMesh to read from and write to Kinesis streams, you can utilize the existing AWS resources that you already have to build a resilient, scalable event-driven architecture. In this article, we’ll look at how to use TriggerMesh to manipulate Amazon Kinesis streams events and highlight how this can work in your environment.

In this blog we present four main steps:

  • Install TriggerMesh
  • Setup your Kinesis streams
  • Create API keys to access Kinesis
  • Use the TriggerMesh API

We will use the TriggerMesh open source API to consume Kinesis events, filter them and re-inject them into another Kinesis stream. The result will be a set of declarative manifests that show you how to use integration as code across Clouds. The picture below shows you the event flow that we are going to build.

Install TriggerMesh

TriggerMesh has multiple installation options, based on what you’re most comfortable with and how you plan to use it. In the interest of time and space we refer you to our nice documentation. Your options include:

  1. Use an AMI in AWS
  2. Install in an existing Kubernetes cluster using Helm
  3. Build a cluster (for example, in EKS) and install TriggerMesh there
  4. Build from source
  5. Use the hosted TriggerMesh Cloud

With TriggerMesh installed you now have access to a Kinesis event source and a Kinesis target API object that we will use below to describe our event flow. Let’s now move on to our second step and create our streams.

Since TriggerMesh API is provided as Kubernetes extensions, all APIs can be managed via the Kubernetes client `kubectl`. Instructions below will show you some sample commands.

Set Up Kinesis Streams

Our example will use two different Kinesis streams in order to show the ability of TriggerMesh to read from a stream as a source and write to a stream as a target. The simplest setup is to create Data Streams using the “on-demand” capacity mode. For each data stream you create, choose a name and the capacity mode, then click “Create data stream”.

Add AWS Secrets to TriggerMesh

In order for TriggerMesh to have access to your AWS resources, you’ll need to create an IAM user in your AWS account. This user should get an IAM policy so they can read from and write to Kinesis streams. For this simple setup, the AWS-provided policy named “AmazonKinesisFullAccess” will work, but you may want to use a custom policy for more restricted access.

Once you have the IAM user created with the proper policy, you’ll need to provide the access key and secret to TriggerMesh using the following command:

kubectl create secret generic awscreds \
  --from-literal=access_key_id=<ACCESS_KEY_ID> \
  --from-literal=secret_access_key=<SECRET_ACCESS_KEY>

With our streams created in AWS and our AWS API key stored as a Kubernetes secret, we are now ready to use the TriggerMesh declarative API to define our event flow. In order to do this we need to create so-called manifests representing our event sources, targets and transformations.

Create Source and Target

In TriggerMesh, resources are created as Kubernetes objects. You’ll want to create a file for the source Kinesis stream and a file for the target Kinesis stream. These files will be slightly different, but the general idea is similar. First we’ll set up the target, since that will be used in the transformation. Here’s what the target file will look like (notice the API Kind `AWSKinesisTarget`):

apiVersion: targets.triggermesh.io/v1alpha1
kind: AWSKinesisTarget
metadata:
  name: kinesis-target
spec:
  arn: <TARGET_ARN_STRING>
  awsApiKey:
    secretKeyRef:
      key: access_key_id
      name: awscreds
  awsApiSecret:
    secretKeyRef:
      key: secret_access_key
      name: awscreds
  partition: "test"

In the above file, you must replace the <TARGET_ARN_STRING> with the ARN from your Kinesis stream. You can optionally change the partition entry and the “name” section under “metadata”, but you’ll want to remember what you chose for those.

Next, set up the Kinesis source:

apiVersion: sources.triggermesh.io/v1alpha1
kind: AWSKinesisSource
metadata:
  name: kinesis-source
spec:
  arn: <SOURCE_ARN_STRING>
  auth:
    credentials:
      accessKeyID:
        valueFromSecret:
          key: access_key_id
          name: awscreds
      secretAccessKey:
        valueFromSecret:
          key: secret_access_key
          name: awscreds
  sink:
    ref:
      apiVersion: targets.triggermesh.io/v1alpha1
      kind: AWSKinesisTarget
      name: kinesis-target

Again, you’ll need to change the <SOURCE_ARN_STRING>. If you changed the target name, then use that same name in the “sink” section so TriggerMesh knows where you want the data to go.

Now that you have these files created, apply them to your TriggerMesh installation by using the following commands:

kubectl apply -f kinesis_source.yaml
kubectl apply -f kinesis_target.yaml

Test Flow

Once the definitions are updated in TriggerMesh, you’re ready to test the flow of data from one Kinesis stream to another. There are a few ways to do this:

  1. Amazon created a Kinesis Data Generator, though it requires an Amazon Cognito user to use.
  2. Use a small script from Lambda or your local machine to send sample data.  Below is an example of a small Python3 script:
#!/usr/bin/python3
import json
import boto3

kinesis_client = boto3.client('kinesis')
response = kinesis_client.put_record(
	StreamName="triggermesh-source-test", 
	Data=json.dumps('stuff'),
	PartitionKey="partitionkey")
print(response)

To see the data come into the target Kinesis stream, you can:

  1. Use Twilio’s bash script to read from the stream
  2. On the Kinesis console, you can watch the Cloudwatch metrics to see the data come in

Adding a Filter

Now that you’ve got an event flow, we can start adding TriggerMesh components into the mix to really show the manipulation options.  In this example, let’s create a Filter to examine the content of the payload from the source stream in order to only pass along certain events.

Our filter will check the Data field of the record in the source Kinesis stream to see if the keyword “allowed” is included in the text.  The sink should be set to the target Kinesis stream.  Here’s what the filter file will look like:

apiVersion: routing.triggermesh.io/v1alpha1
kind: Filter
metadata:
  name: filter-kinesis
spec:
  expression: matches($Data, “allowed”)
  sink:
    ref:
      apiVersion: targets.triggermesh.io/v1alpha1
      kind: AWSKinesisTarget
      name: kinesis-target

To add this filter, we’ll need to modify the ‘sink’ section of the source to point to the filter:

apiVersion: sources.triggermesh.io/v1alpha1
kind: AWSKinesisSource
metadata:
  name: kinesis-source
spec:
  arn: <SOURCE_ARN_STRING>
  auth:
    credentials:
      accessKeyID:
        valueFromSecret:
          key: access_key_id
          name: awscreds
      secretAccessKey:
        valueFromSecret:
          key: secret_access_key
          name: awscreds
  sink:
    ref:
      apiVersion: routing.triggermesh.io/v1alpha1
      kind: Filter
      name: filter-kinesis

Apply the filter and the new source using:

kubectl apply -f kinesis_filter.yaml
kubectl apply -f kinesis_source.yaml

Extra Credit: Add a Transformation or Splitter

Now that you’ve got a TriggerMesh installation that is reading events from an Amazon Kinesis data stream and putting those into a completely separate Kinesis data stream, you can start expanding your use of TriggerMesh. Some options for next steps include:

  1. Create a Splitter to take the data from the source stream and split it into multiple events if it contains a list of separate items
  2. Create a Transformation to edit the data from the source stream before putting it into the target stream
  3. Send the event from the source stream to two separate targets, such as storing the event in S3 or posting to Slack

Related Posts

Join our newsletter

Sign up for our newsletter for the latest news, training, and offers to help you build event-driven systems with TriggerMesh.