Consuming Kafka Messages in Kubernetes


Blog 1 in our four-part series: Using TriggerMesh to Connect Apache Kafka and Kubernetes

It is said that you can only move as fast as your slowest part. For many technology teams, the slowest part is integrating data and applications across a growing mix of infrastructure. As Apache Kafka emerges as the leading streaming solution for data teams and Kubernetes as the top orchestration platform for developers, integrating these two platforms is a growing bottleneck.

Existing integration approaches suffer from a number of velocity-sapping weaknesses, such as low-level programming, custom app extensions, and expensive external consultants. In a recent poll, 67% of DevOps professionals said they wait a month or longer to integrate services across infrastructures.

What’s needed is a declarative API to describe what to do with Kafka messages in Kubernetes-based workloads, and to declaratively manage the production and consumption of Kafka messages to and from Kubernetes workloads.

TriggerMesh is a new breed of integration platform built in the cloud on Kubernetes and Knative so you can declaratively design, deploy, and manage integrations as code™.

This is the first post in a four-part technical blog series, originally published on Medium, on consuming Kafka messages in Kubernetes.

There is an increasing need for real-time data handling within the enterprise, whether it is a data-freshness problem, a change data capture problem, or an event-driven workflow problem. A recent, very nice public example of this is Shopify's re-architecture of their data warehouse using Debezium.

To achieve real-time data handling, most enterprises run Apache Kafka—arguably the leader in message brokering and data streaming.

In those enterprises that use Kafka, we typically also see a growing use of Kubernetes, the de-facto platform for managing containerized workloads.

This means that enterprises now have both Kafka and Kubernetes. We believe that the best way to marry these two powerful technologies is with a declarative API to describe what to do with your Kafka messages. This way, instead of writing Kafka client consumers and packaging them into their applications, developers can simply define a Kafka source declaratively (i.e. a Kafka topic is the source of a message) and set the target for that message in a Kubernetes manifest.

A Declarative Kafka Source

The best technology to declaratively connect Kafka sources with a Kubernetes target is Knative. Many people know that Knative is about serverless, but what is less well-known is that serverless is mostly about handling events and building event-driven applications.

Knative provides a Kubernetes API extension (i.e. a CRD), KafkaSource, and a controller to declaratively define how to consume messages from Kafka. This means we can skip writing our own clients and instead just write a config. What is even more handy is that this particular KafkaSource codebase can run without Knative; you just need the knative-eventing namespace. Let's dive in to see how we do this:

kubectl create ns knative-eventing 
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml


With one apply, we have our CRDs and our controller. Let's check it out:

kubectl get pods -n knative-eventing
NAME                                        READY   STATUS    RESTARTS   AGE
kafka-controller-manager-78c5588947-wt5bj   1/1     Running   0          53s

kubectl get crd | grep kafka
kafkabindings.bindings.knative.dev   2021-06-18T08:44:51Z
kafkasources.sources.knative.dev     2021-06-18T08:44:52Z


And now you can declare which Kafka topic you want to consume messages from, and where to send them, with a beautiful Kubernetes manifest that looks like this:

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: my-kafka
spec:
  bootstrapServers:
  - pkc-453q9.us-east4.gcp.confluent.cloud:9092
  net:
    sasl:
      enable: true
      password:
        secretKeyRef:
          key: password
          name: kafkahackathon
      type:
        secretKeyRef:
          key: sasl.mechanism
          name: kafkahackathon
      user:
        secretKeyRef:
          key: user
          name: kafkahackathon
    ...
  sink:
    ref:
      apiVersion: v1
      kind: Service
      name: display
  topics:
  - hackathon


For clarity, we skipped the configuration of the TLS certificates. Note that in the example, we are consuming messages from a Kafka cluster running in Confluent Cloud. You can find more complete documentation here, or contact us here at TriggerMesh and we’d be happy to support you.
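The SASL settings in the manifest reference a Kubernetes Secret named kafkahackathon with the keys user, password, and sasl.mechanism. As a minimal sketch, assuming a Confluent Cloud API key and secret and the PLAIN mechanism (the only requirement is that the key names match the secretKeyRef entries above), the Secret could be created like this:

# hypothetical placeholder values; substitute your own Confluent Cloud API key and secret
kubectl create secret generic kafkahackathon \
  --from-literal=user=<CONFLUENT_API_KEY> \
  --from-literal=password=<CONFLUENT_API_SECRET> \
  --from-literal=sasl.mechanism=PLAIN

Once the Secret exists, apply the KafkaSource manifest with kubectl apply -f and check that it reports Ready with kubectl get kafkasources.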

To display the events, we can deploy a super simple event sink like this:

kubectl run display --image=gcr.io/knative-releases/knative.dev/eventing/cmd/event_display

kubectl expose pod display --port=80
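
If there is nothing in the topic yet, one way to produce a test message is with the kcat CLI (formerly kafkacat). This is just a sketch assuming the same Confluent Cloud bootstrap server and SASL credentials used by the KafkaSource:

# hypothetical credentials; substitute your own API key and secret
echo '{"hello":"kafka"}' | kcat -P \
  -b pkc-453q9.us-east4.gcp.confluent.cloud:9092 \
  -t hackathon \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username=<CONFLUENT_API_KEY> \
  -X sasl.password=<CONFLUENT_API_SECRET>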

The target for the Kafka messages is defined in the sink section of the KafkaSource manifest. Now, whatever you have in your Kafka topic will get consumed and sent over to the display as a CloudEvent and will look something like:

kubectl logs -f display
☁️ cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/default/kafkasources/my-kafka#hackathon
  subject: partition:0#106
  id: partition:0/offset:106
  time: 2021-06-18T09:21:30.489Z
Extensions,
  kafkaheaderkinesisapproximatearrivaltimestamp: 2021-06-18T09:21:30.489Z
  kafkaheaderkinesispartitionkey: static
  kafkaheaderkinesissequencenumber: 49618035385845691671998692582194447474413536493243990018
  kafkaheaderkinesisshardid: shardId-000000000000
  kafkaheaderkinesisstreamname: confluent
  key: static
Data,
  {"specversion":"1.0","id":"8dda1630-d016-11eb-8b1d-f6456a02a124",
  "source":"https://github.com/sebgoa/transform",
  "type":"dev.knative.source.github.issues","subject":"8",
  "datacontenttype":"application/json",
  "time":"2021-06-18T09:21:28.036834426Z","data":{"action":"reopened","issue":...
 


And that’s all there is to declaratively consuming your Kafka messages and sending them in the CloudEvent format to a Kubernetes workload.

Blog two will look at the opposite scenario, when we want to go from Kubernetes to Kafka.



