Send Messages to Kafka Declaratively

Sebastien Goasguen

Sebastien Goasguen

Aug 16, 2021
Send  Messages to Kafka Declaratively
Send  Messages to Kafka Declaratively

Blog 2 in our four-part Series: Using TriggerMesh to Connect Apache Kafka And Kubernetes

In blog 1 in this series, we looked at how TriggerMesh lets you declaratively define your Kafka messages and send them in the CloudEvent format to a Kubernetes workload. 

In this blog, let’s do the opposite and send messages to a Kafka topic declaratively. This also means that you are able to declaratively define your Kafka sources and sinks with Kubernetes manifests, and avoid the pesky jar compilation and jar uploading to configure Kafka connect.

As described in the blog 1, the goal is to write Kubernetes manifests to declare how we send and receive—or produce and consume—messages to a Kafka topic. The diagram above shows it all. Note that a KafkaSink produces an event to a Kafka topic and therefore is a Kafka source from the point of view of the Kafka cluster. Therefore, the KafkaSource consumes an event from a Kafka topic and is a Kafka sink from the point of view of the Kafka cluster.


Because it would be much easier than writing my own Kafka clients, compiling, packaging, deploying etc. If only I could write a bit of config and let a system like Kubernetes manage it for me that would be great.


 We install two controllers and a CRD for a new kind called KafkaSink. For good measure, we also install the knative eventing CRD.

kubectl apply -f
kubectl apply --filename
kubectl apply --filename

Shortly after the apply, the Pods are running. Note that the sample below also shows the controller for KafkaSource

kubectl get pods -n knative-eventing
kafka-controller-86948cddc6-g5mnn 1/1 Running 0 2m18s
kafka-controller-manager-78c5588947-wt5bj 1/1 Running 0 5h29m
kafka-sink-receiver-7dcb7b9b44–5w6j7 1/1 Running 0 115s
kafka-webhook-eventing-fb6ddfd9d-jsmzm 1/1 Running 0 2m18s

And our new fancy CRD KafkaSink is in place

kubectl get crd | grep kafka 2021–06–18T08:44:51Z 2021–06–18T14:12:12Z 2021–06–18T08:44:52Z

The KafkaSink in Action

Using the same secret that allows us to talk to our Kafka cluster in confluent from Section 1, we can now set up a Kafka sink declaratively like so:

kind: KafkaSink
 name: my-kafka-topic
       name: kafkahackathon
 topic: hackathon

What this will do is setup an HTTP endpoint in my Kubernetes cluster where I can POST CloudEvents. These CloudEvents will then be produced to my Kafka cluster. The KafkaSink looks like:

kubectl get kafkasink
my-kafka-topic http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic
12m True

Now that we have an HTTP endpoint acting as an HTTP proxy to a Kafka topic, we can simply POST a CloudEvent to this endpoint and the Sink will “produce” the event to the topic. To do this we can get a quick shell in our cluster and with curl craft a CloudEvent by hand like so:

curl -v “http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic" \
> -X POST \
> -H “Ce-Id: 536808d3–88be-4077–9d7a-a3f162705f79” \
> -H “Ce-Specversion: 1.0” \
> -H “Ce-Type: dev.knative.samples.helloworld” \
> -H “Ce-Source: dev.knative.samples/helloworldsource” \
> -H “Content-Type: application/json” \
> -d ‘{“msg”:”Hello World from the curl pod.”}’

Checking the logs in the display Pod we see our CloudEvent being received by that KafkaSource that we setup in our previous Blog.

☁️ cloudevents.Event
Context Attributes,
specversion: 1.0
type: dev.knative.samples.helloworld
source: dev.knative.samples/helloworldsource
id: 536808d3–88be-4077–9d7a-a3f162705f79
datacontenttype: application/json
“msg”: “Hello World from the curl pod.”

Head over to Blog 3 in this four-part series to learn about configuring Kafka sources and sinks in Kubernetes.

Create your first event flow in under 5 minutes