Using TriggerMesh Cloud Native Integration

Instantly Connect Apache Kafka Data Streams with Any Application Anywhere

Executive Summary

Many technology teams struggle with integrating data and applications across a growing mix of infrastructure. On one end, data platform teams are building faster and better data pipelines using Apache Kafka for real-time data handling. On the other end, application platform teams are increasing feature velocity with containers, microservices, and Kubernetes.

As these two technology teams accelerate in isolation, integrating real-time data into applications is a painful bottleneck.

Existing integration approaches suffer from a number of velocity-sapping weaknesses, such as low-level programming, custom app extensions, and expensive external consultants. In a recent poll, 67 percent of DevOps professionals said they wait a month or longer to integrate services across infrastructures.

This paper describes a declarative API for processing Kafka messages in Kubernetes-based workloads and for managing the production and consumption of Kafka messages to and from those workloads.

TriggerMesh is a new breed of open source integration platform built in the cloud on Kubernetes and Knative. With TriggerMesh, you can declaratively design, deploy, and manage integrations as code.

This paper shows how TriggerMesh removes the speed bumps introduced by current integration approaches, allowing enterprises to realize the full velocity and resiliency advantages of Kafka and Kubernetes.

Section 1: Consuming Kafka messages in Kubernetes

Enterprises now deal with staggering amounts of data. Whether that data sits in a data lake or is scattered across numerous silos, they face new challenges such as keeping data fresh, capturing it reliably, and building new event-driven workflows. They are on the hunt for new, better methods of real-time data handling.

To achieve real-time data handling, most enterprises have turned to Apache Kafka—arguably the leader in message brokering and data streaming. Enterprises who have gone this route are also showing a growing use of Kubernetes to manage containerized workloads.

Combining these two powerful technologies presented a challenge of its own, which is why TriggerMesh has developed a declarative API to facilitate the ingestion and processing of Kafka messages. A declarative API allows developers to declare a desired outcome instead of executing statements one after another, such as with an imperative API. This approach allows developers to declare a Kafka source, such as a Kafka topic, and set the target for those messages in a Kubernetes manifest. Now Kafka consumers can be managed with a Kubernetes API and a DevOps mindset.
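The hands-on walkthrough below shows the full manifest; as a rough preview, such a declaration has the following shape (fields abbreviated, not a complete resource):

# Sketch only: consume a Kafka topic and deliver each message to a Kubernetes Service
kind: KafkaSource
spec:
  topics:
    - hackathon
  sink:
    ref:
      kind: Service
      name: display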

A declarative Kafka source

Knative is an enterprise-level open source solution for creating serverless applications on Kubernetes. The serverless approach to application development relies on event management and event-driven processes. Knative provides a Kubernetes API extension—a CustomResourceDefinition or CRD—called KafkaSource that defines how messages from Kafka are consumed. This means that developers do not have to write custom clients, but instead just a configuration file.

Hands on: Deploying a KafkaSource CRD

Here are example commands showing how quickly a developer can deploy the KafkaSource CRD to a knative-eventing namespace.

kubectl create ns knative-eventing
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml

With one "apply", we have our CRD and our controller. You can examine the success of these commands on your Kubernetes deployment like this:

$ kubectl get pods -n knative-eventing

NAME                                        READY   STATUS    RESTARTS   AGE
kafka-controller-manager-78c5588947-wt5bj   1/1     Running   0          53s

$ kubectl get crd | grep kafka
kafkabindings.bindings.knative.dev   2021-06-18T08:44:51Z
kafkasources.sources.knative.dev     2021-06-18T08:44:52Z

The last step is to define which Kafka topic the application will use to receive and consume messages. This can be done with a Kubernetes manifest like this:

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: my-kafka
spec:
  bootstrapServers:
    - pkc-453q9.us-east4.gcp.confluent.cloud:9092
  net:
    sasl:
      enable: true
      password:
        secretKeyRef:
          key: password
          name: kafkahackathon
      type:
        secretKeyRef:
          key: sasl.mechanism
          name: kafkahackathon
      user:
        secretKeyRef:
          key: user
          name: kafkahackathon
    # This example doesn't include the configuration of TLS certificates.
  sink:
    ref:
      apiVersion: v1
      kind: Service
      name: display
  topics:
    - hackathon

Note that in this example, we are consuming messages from a Kafka cluster running in Confluent Cloud. You can find more details in the Kafka documentation, or contact us at TriggerMesh and we'd be happy to support you.
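The manifest above references a Kubernetes secret named kafkahackathon that holds the SASL credentials. Here is a minimal sketch of creating it, with placeholder values standing in for your Confluent Cloud API key and secret (PLAIN is the typical SASL mechanism for Confluent Cloud API keys):

# Create the secret referenced by the KafkaSource manifest
kubectl create secret generic kafkahackathon \
  --from-literal=user='<CONFLUENT_API_KEY>' \
  --from-literal=password='<CONFLUENT_API_SECRET>' \
  --from-literal=sasl.mechanism='PLAIN'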

To display incoming Kafka events, we can deploy a simple event sink like this:

kubectl run display --image=gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
kubectl expose pod display --port=80

The target for the Kafka messages is defined in the sink section of the KafkaSource manifest. Every message in the Kafka topic will get consumed and displayed as a CloudEvent, which will look something like this:

$ kubectl logs -f display
☁️ cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/default/kafkasources/my-kafka#hackathon
  subject: partition:0#106
  id: partition:0/offset:106
  time: 2021-06-18T09:21:30.489Z
Extensions,
  kafkaheaderkinesisapproximatearrivaltimestamp: 2021-06-18T09:21:30.489Z
  kafkaheaderkinesispartitionkey: static
  kafkaheaderkinesissequencenumber: 49618035385845691671998692582194447474413536493243990018
  kafkaheaderkinesisshardid: shardId-000000000000
  kafkaheaderkinesisstreamname: confluent
  key: static
Data,
  {"specversion":"1.0","id":"8dda1630-d016-11eb-8b1d-f6456a02a124","source":"https://github.com/sebgoa/transform","type":"dev.knative.source.github.issues","subject":"8","datacontenttype":"application/json","time":"2021-06-18T09:21:28.036834426Z","data":{"action":"reopened","issue":...

That's all there is to declaratively consuming Kafka messages and delivering them in CloudEvent format to a Kubernetes workload.
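To see this flow end to end, you can publish a test message to the hackathon topic with the standard Kafka console producer. This is only a sketch; it assumes you have the Kafka CLI tools installed (the script may be named kafka-console-producer.sh in your distribution) and a client.properties file containing your SASL/TLS settings for Confluent Cloud:

# Publish a test message; it should appear in the display logs a moment later
kafka-console-producer --bootstrap-server pkc-453q9.us-east4.gcp.confluent.cloud:9092 \
  --topic hackathon \
  --producer.config client.properties
> {"msg": "hello from the console producer"}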

Section 2: Sending messages to Kafka declaratively

In this section, we show the opposite operation: sending messages to a Kafka topic declaratively. We will again define Kafka sources and sinks with Kubernetes manifests, which avoids the need to compile and upload JAR files to configure Kafka connections.

Our goal is to write Kubernetes manifests to declare how we send and receive—or produce and consume—messages to a Kafka topic.

A KafkaSink produces an event to a Kafka topic and therefore is a Kafka source from the point of view of the Kafka cluster. The KafkaSource consumes an event from a Kafka topic and is a Kafka sink from the point of view of the Kafka cluster.

To accomplish this configuration, we install the Knative Eventing CRDs along with the Kafka eventing controller and the KafkaSink data plane, which bring in the new KafkaSink CRD:

kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-sink.yaml

Shortly after running these commands, the pods will be up and running. Note that the example below also shows the controller for KafkaSource.

$ kubectl get pods -n knative-eventing
NAME                                        READY   STATUS    RESTARTS   AGE
kafka-controller-86948cddc6-g5mnn           1/1     Running   0          2m18s
kafka-controller-manager-78c5588947-wt5bj   1/1     Running   0          5h29m
kafka-sink-receiver-7dcb7b9b44-5w6j7        1/1     Running   0          115s
kafka-webhook-eventing-fb6ddfd9d-jsmzm      1/1     Running   0          2m18s

And with this, our new CRD KafkaSink is in place, as you can see here.

$ kubectl get crd | grep kafka
kafkabindings.bindings.knative.dev   2021-06-18T08:44:51Z
kafkasinks.eventing.knative.dev      2021-06-18T14:12:12Z
kafkasources.sources.knative.dev     2021-06-18T08:44:52Z

Seeing the KafkaSink in action

To see the KafkaSink in action, we need to set up an HTTP endpoint in our Kubernetes cluster where we can send CloudEvents. These CloudEvents will then be produced to the Kafka cluster. Using the same information that allowed us to talk to the Kafka cluster in Section 1, we can set up a Kafka sink declaratively with this configuration:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-topic
spec:
  auth:
    secret:
      ref:
        name: kafkahackathon
  bootstrapServers:
    - pkc-456q9.us-east4.gcp.confluent.cloud:9092
  topic: hackathon

Now we have an HTTP endpoint that acts as an HTTP proxy to a Kafka topic. This example shows how the KafkaSink should look:

$ kubectl get kafkasink
NAME URL AGE READY REASON
my-kafka-topic http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic 12m True

We can now POST a CloudEvent to this endpoint and the Sink will produce the event to the topic. To do this, use curl at the command line to craft a CloudEvent by hand. The following is an example:

curl -v "http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic" \
	-X POST \
  -H "Ce-Id: 536808d3--88be-4077--9d7a-a3f162705f79" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: dev.knative.samples.helloworld" \
  -H "Ce-Source: dev.knative.samples/helloworldsource" \
  -H "Content-Type: application/json" \
  -d '{"msg":"Hello World from the curl pod."}'

We can now see the incoming CloudEvent being consumed from the topic by our KafkaSource and displayed by the display service. It should look like this:

☁️ cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.samples.helloworld
  source: dev.knative.samples/helloworldsource
  id: 536808d3-88be-4077-9d7a-a3f162705f79
  datacontenttype: application/json
Data,
  {
    "msg": "Hello World from the curl pod."
  }

The flow configured in this section allows us to produce events anywhere as CloudEvents, send them to a Kafka topic, and then consume the events by sending them to a Kubernetes workload. As an added bonus, all of this configuration has been defined declaratively.

Section 3: Configuring Kafka sources and sinks in Kubernetes

This section introduces a declarative method for configuring Kafka sources and sinks by incorporating third party event sources and sinks. Streaming data into and out of a Kafka cluster is usually done with a Kafka Connect cluster. However, this configuration process has proven to be a pain point for developers.

It involves a number of complicated steps, such as:

  • Deploying and running a Kafka Connect cluster
  • Updating and compiling connectors in Java
  • Uploading JARs to specific directories in the Kafka Connect cluster

Representation of the Kafka Connect data pipeline (from the Confluent Blog)
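For comparison, connectors on a Kafka Connect cluster are typically configured by POSTing JSON to the Connect REST API, after the JAR implementing the connector class has been placed on the workers' plugin path. Here is a sketch, with an illustrative connector class and settings:

# Register a connector on an existing Kafka Connect cluster (illustrative values)
curl -X POST http://<kafka-connect-host>:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "my-source-connector",
        "config": {
          "connector.class": "<fully.qualified.ConnectorClass>",
          "tasks.max": "1"
        }
      }'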

While there are prebuilt Kafka connectors available, whether self-managed or in Confluent Cloud, they still leave a lot of work for Kafka users who wish to manage their sources and sinks effectively. This section will demonstrate how to declaratively define and manage Kafka sources and sinks in Kubernetes clusters.

This approach allows users to utilize event sources from any third-party service, including GitHub, GitLab, AWS SQS, and Google Cloud Storage. These events can be consumed as Kafka messages and sent to any other third-party sink, such as AWS Lambda, Azure Storage, and Elasticsearch.

Instead of a complex workflow of compiling JARs and uploading them to a Kafka Connect cluster, this approach leverages your Kubernetes cluster and defines your Kafka sources and sinks as Kubernetes objects.

This brings a declarative mindset to your Kafka sources and sinks, establishes a single source of truth, and unifies the management of Kafka flows and microservices. It also makes it straightforward to target microservices running in Kubernetes with Kafka messages.

Configuring a Kafka source

Building on the examples in the previous sections, our next step is to create an addressable endpoint in our Kubernetes cluster. This endpoint will be the source of messages into our Kafka cluster. It is created using a configuration like the following.

Note that this configuration specifies both a bootstrap server on Confluent Cloud and a specific topic name.

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-topic
spec:
  auth:
    secret:
      ref:
        name: kafkahackathon
  bootstrapServers:
    - pkc-456q9.us-east4.gcp.confluent.cloud:9092
  topic: hackathon

Once the KafkaSink is defined, we can now create an AWS SQS source that will send messages directly into Kafka. This configuration requires the AWS SQS source's ARN and a Kubernetes secret that contains the AWS API keys. The manifest for this source will look similar to this:

apiVersion: sources.triggermesh.io/v1alpha1
kind: AWSSQSSource
metadata: 
  name: samplequeue
spec: 
  arn: arn:aws:sqs:us-west-2:1234567890:triggermeshqueue
  credentials:   
    accessKeyID:     
      valueFromSecret:       
        name: awscreds       
        key: aws_access_key_id   
    secretAccessKey:     
      valueFromSecret:       
        name: awscreds       
        key: aws_secret_access_key
  sink:   
    ref:     
      apiVersion: eventing.knative.dev/v1alpha1     
      kind: KafkaSink    
      name: my-kafka-topic
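The credentials section above points at a secret named awscreds. Here is a minimal sketch of creating it, with placeholder values for the AWS API keys:

# Create the secret referenced by the AWSSQSSource manifest
kubectl create secret generic awscreds \
  --from-literal=aws_access_key_id='<AWS_ACCESS_KEY_ID>' \
  --from-literal=aws_secret_access_key='<AWS_SECRET_ACCESS_KEY>'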

With this configuration, all SQS messages will be consumed and sent to Kafka using the CloudEvents specification. This example shows that with just two Kubernetes objects, it is possible to declaratively define a Kafka source.

Configuring a Kafka sink

When defining a Kafka sink, it is important to recall that a sink from the perspective of the Kafka cluster is a source from the perspective of the Kubernetes cluster. In other words, to define a Kafka sink, we instead define a KafkaSource object in our Kubernetes cluster.

This example manifest specifies a bootstrap server pointing at a Kafka cluster in Confluent Cloud. It also specifies a topic as the source of messages to consume, and a sink, the end target of the Kafka messages. Note that this manifest points to a Kubernetes service called display.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: my-kafka
spec:
  bootstrapServers:
    - pkc-453q9.us-east4.gcp.confluent.cloud:9092
  net:
    sasl:
      enable: true
    # TLS certificate configuration skipped in this example
  sink:
    ref:
      apiVersion: v1
      kind: Service
      name: display
  topics:
    - hackathon

As this example shows, all we need for a Kafka sink definition is a single Kubernetes manifest and a target microservice exposed as a traditional Kubernetes service.

Section 4: Simplifying Kafka connector configuration with an integration language

All of the previous examples have relied heavily on YAML, a data serialization format that is intended to be human readable but in practice can be bulky, verbose, and confusing. Everything in the Kubernetes ecosystem seems to be configured with YAML, and DevOps teams end up writing and maintaining a lot of it. In the Kubernetes community, the proliferation of these configuration files has been referred to as a "face full of YAML."

TriggerMesh recently open sourced the TriggerMesh Integration Language (TIL), an authoring tool for integrating applications with event-driven mechanisms at its core. This tool has two goals:

  • Declaratively represent the design of an integration
  • Abstract away the API objects being used to implement enterprise integration patterns

TIL models the major components of an integration, each supporting different patterns: source, channel, router, transformer, and target.

Let us examine how this new configuration language can represent a Kafka and Kubernetes flow similar to the examples used earlier.

TriggerMesh Integration Language

If we approach a Kafka flow as an integration, we can represent that flow with the TriggerMesh Integration Language. The example below shows how TIL vastly simplifies the equivalent configuration written in YAML.

In an earlier section, we saw a YAML configuration for a KafkaSink object that looked like this:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-topic
spec:
  auth:
    secret:
      ref:
        name: kafkahackathon
  bootstrapServers:
    - pkc-456q9.us-east4.gcp.confluent.cloud:9092
  topic: hackathon

The TIL syntax shortens this significantly. It eliminates a number of extraneous configuration elements, such as the API version: since all we need to know is that this target endpoint is a Kafka endpoint, the whole object reduces to a single kafka target block.
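Here is a sketch of that block, matching the kafka target that also appears in the full flow below:

target "kafka" "my_kafka_topic" {
  topic             = "hackathon"
  bootstrap_servers = ["pkc-456q9.us-east4.gcp.confluent.cloud:9092"]
  auth              = secret_name("kafkahackathon")
}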

GitHub Kafka source and microservice as a Kafka sink

Equipped with TIL, we can define a complete message flow and manage it in Kubernetes. For example, a GitHub event source sending events to a Kafka stream, plus a consumer of that stream targeting a serverless workload on Kubernetes, would be declared like this:

source "github" "git_source" {  
    owner_and_repository = "sebgoa/transform"  
    event_types = ["push", "issues"]  
    tokens = secret_name("github-secret")  
    to = target.my_kafka_topic
}
target "kafka" "my_kafka_topic" {  
    topic = "hackathon"  
    bootstrap_servers = ["pkc-4q3.us-east4.gcp.confluent.cloud:9092"]  
    auth = secret_name("kafkahackathon")
}
source "kafka" "my_kafka" {   
    bootstrap_servers = ["pkc-4q3.us-east4.gcp.confluent.cloud:9092"]   
    topics = ["hackathon"]   sasl_auth =  secret_name("kafkahackathon")   
    tls = secret_name("kafkahackathon")   
    to = target.mymicroservice
}
target "container" "mymicroservice" {   
    image = "docker.io/n3wscott/sockeye:v0.7.0"   public = true
}

This configuration file can then be used to generate a Kubernetes-ready configuration on the command line with the til command, like this:

til generate kafka.hcl | kubectl apply -f-

Conclusions

Marrying the two biggest trends in enterprise IT—Kafka and Kubernetes—allows teams to fluidly connect all their data with all their apps. However, smoothly connecting these two technologies requires a new approach, one facilitated by TriggerMesh.

In this paper, we showed how TriggerMesh simplifies writing enterprise integrations by adopting a cloud-native, declarative approach. We then demonstrated an innovative solution to the "face full of YAML" problem by describing enterprise integrations with the TriggerMesh Integration Language.

Create your first event flow in under 5 minutes