Build a custom TriggerMesh component with Python's FastAPI framework

Jeff Neff

Jeff Neff

Apr 4, 2023
Build a custom TriggerMesh component with Python's FastAPI framework
Build a custom TriggerMesh component with Python's FastAPI framework

FastAPI has become a very popular Python web framework to build REST APIs. Started in 2018, its performance is said to be on par with Node.js or Go-based applications. Compared to Flask it handles events asynchronously.

In this post we will build a simple FastAPI application to transform events within a TriggerMesh integration. In order to do this our application with handle CloudEvents by using the FastAPI CloudEvent plugin.

The diagram below shows what we will build. A TriggerMesh broker will handle the event routing, all events will go to a Google CloudRun service for display, events of type io.foo.com will get transformed by the FastAPI application that will add a key to the payload and change the event type to io.foo.com.transformed. We will send a sample event to Trigger the flow by using the tmctl CLI.

Install FastAPI

First, let's install the prerequisites. With Python and pip installed, get the FastAPI and CloudEvents modules like so:

pip install "fastapi[all]"
pip install fastapi-cloudevents

The command above will also install uvicorn, the ASGI Web server implementation for Python that is used by FastAPI.


Verify that everything is available:

import uvicorn
from fastapi_cloudevents import CloudEvent, install_fastapi_cloudevents
from fastapi import FastAPI

The TriggerMesh CLI can be installed via brew with:

brew install triggermesh/cli/tmctl

To verify that tmctl works you can follow our quickstart guide.

Create a FastAPI application using CloudEvents

To build a TriggerMesh component that can be used within an event-based integration, that component needs to accept and reply CloudEvents.

To get started quickly you can copy the sample from the GitHub repo into a local file called component.py.

import uvicorn
from fastapi import FastAPI
from fastapi_cloudevents import CloudEvent, install_fastapi_cloudevents

app = FastAPI()
app = install_fastapi_cloudevents(app)

@app.post("/")
async def on_event(event: CloudEvent) -> CloudEvent:
    return CloudEvent(
        type="my.response-type.v1",
        data=event.data,
        datacontenttype=event.datacontenttype,
    )

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)


To run that FastAPI application you can use Python locally:

python ./component.py
overriding custom non json response default response class
INFO:     Started server process [58162]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

With the application running you are now ready to use it as part of a TriggerMesh integration. 

Route events to the FastAPI service with TriggerMesh

To test this event transformation we are going to setup the event bridge depicted in the diagram shown above. First we create an event broker and then we add a target event display running in Google CloudRun that will receive all events for display. To route all events to this service we create a wildcard trigger, meaning all event types will be routed to it. That event display is a continuously running cloud service, feel free to use it for your own development purposes as well.

tmctl create broker fastapi
tmctl create target cloudevents --name cloudrun --endpoint https://triggermesh-console-tu4luqbmqq-uc.a.run.app/
tmctl create trigger --target cloudrun

Then we'll create a CloudEvents target to send events to the FastAPI service, and route events of type io.foo.com to it with a trigger:

tmctl create target cloudevents --endpoint http://host.docker.internal:8080
tmctl create trigger --eventTypes io.foo.com --target fastapi-cloudeventstarget

You are now ready to send events in order to test that the flow works. You'll observe that events of type io.foo.bar are received by the event display service without having undergone any transformation because they don't match the Trigger filter for the Fast API service. However, events of type io.foo.com are transformed by the FastAPI application, with a new key added and the type of the returned event set to io.foo.com.transformed.


Dockerize your FastAPI service

Keeping things super simple, below is a Dockerfile to package your application as a Docker container:

FROM python:slim-buster

RUN pip install fastapi-cloudevents
RUN pip install uvicorn

COPY . .

CMD ["python", "./component.py"]

Build and push the container image with the Docker CLI like so:

docker build -t runseb/fastapi .
docker push runseb/fastapi

Use the Dockerized FastAPI service in your integration

You are now ready to use your Dockerized service in your integration and deploy it both locally and in a Kubernetes cluster.

To deploy it locally you can create a TriggerMesh target component by using the --from-image option, rather than having to create a CloudEvents target and then routing the event to the containerized FastAPI service. You can then create a Trigger that routes events to this containerized version of the FastAPI service.

tmctl create target --name runsebfastapi --from-image runseb/fastapi
tmctl create trigger --target runsebfastapi --eventTypes io.foo.bar

To deploy this to Kubernetes you can simply generate the YAML manifest with tmctl and you will see that the target is defined as a Knative service which references your FastAPI container image:

tmctl dump
...
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  labels:
    triggermesh.io/context: fastapi
    triggermesh.io/role: target
  name: fastapi-target-service
spec:
  template:
    spec:
      containers:
      - env: []
        image: runseb/fastapi
        name: user-container

To deploy it in Kubernetes simply do:

tmctl dump | kubectl apply -f -

Have fun!

Create your first event flow in under 5 minutes