Send Messages to Kafka Declaratively

Sebastien Goasguen
Sebastien Goasguen
Aug 16, 2021
Send  Messages to Kafka Declaratively
Send  Messages to Kafka Declaratively

Blog 2 in our four-part Series: Using TriggerMesh to Connect Apache Kafka And Kubernetes

In blog 1 in this series, we looked at how TriggerMesh lets you declaratively define your Kafka messages and send them in the CloudEvent format to a Kubernetes workload. 

In this blog, let’s do the opposite and send messages to a Kafka topic declaratively. This also means that you are able to declaratively define your Kafka sources and sinks with Kubernetes manifests, and avoid the pesky jar compilation and jar uploading to configure Kafka connect.



As described in the blog 1, the goal is to write Kubernetes manifests to declare how we send and receive—or produce and consume—messages to a Kafka topic. The diagram above shows it all. Note that a KafkaSink produces an event to a Kafka topic and therefore is a Kafka source from the point of view of the Kafka cluster. Therefore, the KafkaSource consumes an event from a Kafka topic and is a Kafka sink from the point of view of the Kafka cluster.

 Why?

Because it would be much easier than writing my own Kafka clients, compiling, packaging, deploying etc. If only I could write a bit of config and let a system like Kubernetes manage it for me that would be great.

 How?

 We install two controllers and a CRD for a new kind called KafkaSink. For good measure, we also install the knative eventing CRD.

kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-sink.yaml

Shortly after the apply, the Pods are running. Note that the sample below also shows the controller for KafkaSource

kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
kafka-controller-86948cddc6-g5mnn 1/1 Running 0 2m18s
kafka-controller-manager-78c5588947-wt5bj 1/1 Running 0 5h29m
kafka-sink-receiver-7dcb7b9b44–5w6j7 1/1 Running 0 115s
kafka-webhook-eventing-fb6ddfd9d-jsmzm 1/1 Running 0 2m18s

And our new fancy CRD KafkaSink is in place

kubectl get crd | grep kafka
kafkabindings.bindings.knative.dev 2021–06–18T08:44:51Z
kafkasinks.eventing.knative.dev 2021–06–18T14:12:12Z
kafkasources.sources.knative.dev 2021–06–18T08:44:52Z

The KafkaSink in Action

Using the same secret that allows us to talk to our Kafka cluster in confluent from Section 1, we can now set up a Kafka sink declaratively like so:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
 name: my-kafka-topic
spec:
 auth:
   secret:
     ref:
       name: kafkahackathon
 bootstrapServers:
 - pkc-456q9.us-east4.gcp.confluent.cloud:9092
 topic: hackathon

What this will do is setup an HTTP endpoint in my Kubernetes cluster where I can POST CloudEvents. These CloudEvents will then be produced to my Kafka cluster. The KafkaSink looks like:

kubectl get kafkasink
NAME URL AGE READY REASON
my-kafka-topic http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic
12m True

Now that we have an HTTP endpoint acting as an HTTP proxy to a Kafka topic, we can simply POST a CloudEvent to this endpoint and the Sink will “produce” the event to the topic. To do this we can get a quick shell in our cluster and with curl craft a CloudEvent by hand like so:

curl -v “http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-topic" \
> -X POST \
> -H “Ce-Id: 536808d3–88be-4077–9d7a-a3f162705f79” \
> -H “Ce-Specversion: 1.0” \
> -H “Ce-Type: dev.knative.samples.helloworld” \
> -H “Ce-Source: dev.knative.samples/helloworldsource” \
> -H “Content-Type: application/json” \
> -d ‘{“msg”:”Hello World from the curl pod.”}’


Checking the logs in the display Pod we see our CloudEvent being received by that KafkaSource that we setup in our previous Blog.

☁️ cloudevents.Event
Context Attributes,
specversion: 1.0
type: dev.knative.samples.helloworld
source: dev.knative.samples/helloworldsource
id: 536808d3–88be-4077–9d7a-a3f162705f79
datacontenttype: application/json
Data,
{
“msg”: “Hello World from the curl pod.”
}

Related Posts

Join our newsletter

Subscribe to TriggerMesh newsletter. You'll get only the most important news about TriggerMesh