dateo. Coding Blog

Coding, Tech and Developers Blog

masstransit
.net
kafka
messaging

Consuming Kafka messages with MassTransit

Dennis Frühauff on August 6th, 2024

Many articles on this blog deal with various aspects of MassTransit and asynchronous messaging techniques. Let's explore how you can incorporate an event broker alongside the existing messaging infrastructure in your project. For this, we consume Kafka messages via MassTransit.


The project that I am currently in is relying very much on MassTransit as an abstraction layer on top of AWS services to handle business processes asynchronously. Very soon, there will also be an integration of Kafka messages provided by a different application. Luckily for us, MassTransit offers the possibility to consume Kafka topics alongside the existing AWS integration.


I worked only very briefly with Kafka before, and since then, times have changed, so I thought it might be helpful to give you a quick introduction to how you can consume Kafka messages via MassTransit and, most importantly, make that work in your local environment.


You can find the sample code for this article on GitHub


What is Kafka?

In short, Kafka is an event streaming platform meant to deliver events in real-time to consumers. It is based on the communication between servers and clients via TCP that are set up to produce or consume on specific topics. Those topics are where the events are stored durably until they expire.


This durability aspect is one of the key differences to a classical message bus. Events from Kafka can be read as often as you'd like, i.e., consuming a message does not delete it from the bus. Instead, an expiration period can be configured for a topic, indicating when messages can actually be deleted from the system.


There is of course more to be known about Kafka, feel free to explore the docs on it; they are in fact pretty good.


Hosting Kafka on your local machine

While integrating event consumption from an existing Kafka infrastructure will be the main goal in my current project, developers will have to have a way to test the integration locally, isolated from any other machine. For that, one option is to host Kafka locally in a docker container and produce messages to it, so that your application can afterwards consume them. So we'll start with a docker compose file for Kafka:


networks:
  app-tier:
    driver: bridge

name: masstransit-kafka
services:
  kafka:
    image: 'bitnami/kafka:latest'
    container_name: 'kafka-demo'
    ports:
      - 9092:9092
      - 9094:9094
    networks:
      - app-tier
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_ADVERTISED_HOST_NAME=kafka

In this docker file, we are consuming the bitnami/kafka image to host our own Kafka instance locally. There is a mass of configuration options that are set via environment variables and it took me a while to actually make MassTransit communicate with this container so I will be very honest now: I am not 100% sure what each of those settings will actually do or if it is even necessary to make it work. Still, let's focus on a few settings here:


  • KAFKA_CFG_PROCESS_ROLES Configures what roles these instances will play. Kafka is multi-host, multi-producer, multi-consumer. So each node can play different roles based on your actual configuration.
  • KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: With this option, you don't have to create topics before sending events. We will still do that, just for the learning experience.KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 This is how we enable MassTransit to actually resolve the Kafka instance in your local network.

In case you checked out the sample repository on your machine, open a terminal inside the docker directory and run:


docker compose up -d

This will pull the latest image and start the above stack on your machine (I obviously assume you have docker installed, though).


Integrating Kafka into MassTransit

Let's assume we consuming events from Kafka that tell us that a payment was successfully processed.
An example (and still useless) message (or, rather, "event") could look like this:


public record PaymentSucceeded
{
    public string Name { get; init; }
}

Correspondingly, the consumer for this message could be:


public class PaymentSucceededConsumer : IConsumer<PaymentSucceeded>
{
    private readonly ILogger<PaymentSucceededConsumer> logger;

    public PaymentSucceededConsumer(ILogger<PaymentSucceededConsumer> logger)
    {
        this.logger = logger;
    }
    public Task Consume(ConsumeContext<PaymentSucceeded> context)
    {
        logger.LogInformation("Payment succeeded for customer {Name}", context.Message.Name);
        return Task.CompletedTask;
    }
}

This consumer does nothing but log the actual content of the message, but for our tutorial, that will be good enough.
Now, let's wire everything up.


In our ASP.NET application, we need to add a rider alongside the existing MassTransit registrations:


builder.Services.AddMassTransit(x =>
{
    x.UsingInMemory();

    x.AddRider(rider =>
    {
        rider.AddConsumer<PaymentSucceededConsumer>();
        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");

            k.TopicEndpoint<PaymentSucceeded>(
                topicName: "payment-succeeded", 
                groupId: "payment-group", 
                configure: e =>
            {
                e.ConfigureConsumer<PaymentSucceededConsumer>(context);
            });
        });
    });
});

Now, a rider is MassTransit's way to integrate event streaming solutions like Kafka or Azure Event Hub into bus-centered solutions which are actually MassTransit's ballpark. It serves as a bridge between those two messaging concepts and lets you both consume as well as produce events if that is needed.


Here, we are configuring this rider to connect to the locally running docker instance of Kafka, and we are also setting up one specific consumer that is supposed to listen to a specific topic.


Please also note the call to x.UsingInMemory. This is only a placeholder representing any existing bus configuration in your application. In any real-life example, this might hold your traditional message consumers and the connection to AWS SNS/SQS, Azure Service Bus, RabbitMQ, etc.


This is actually all of the setup that we need. However, if we now start the application, MassTransit will give us a ConsumeException because the topic that we configured to subscribe to does not exist. So let's head over to the Kafka instance and set it up.


To execute commands inside of the running Kafka container, open a (Windows) terminal and enter:


docker exec -it kafka-demo /bin/bash

The binary files to actually interact with Kafka are located in a subfolder that we need to cd into:


cd opt/bitnami/kafka/bin

From there, we can create a topic with the exact same name that MassTransit will expect to see:


kafka-topics.sh --create --topic payment-succeeded --bootstrap-server kafka:9092

Hopefully, you will be greeted with a Created topic payment-succeeded.


If so, we are ready to run the .NET application. It should give no exceptions, but instead just tell us that the bus was started.


info: MassTransit[0]
      Configured endpoint payment-group, Consumer: WebApi.PaymentSucceededConsumer
info: Microsoft.Hosting.Lifetime[14]
      Now listening on: http://localhost:5236
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
      Content root path: D:\repos\Sandbox.MassTransitKafka\WebApi
info: MassTransit[0]
      Bus started: loopback://localhost/

We are now ready to actually produce a message. Again, from the shell inside the docker container, type:


kafka-console-producer.sh --broker-list localhost:9092 --topic payment-succeeded

Not much will happen, you are now inside of a continuous chat-like mode with that topic. Each line you enter will be published to all listeners.


MassTransit will expect JSON-formatted events to be able to deserialize them into the contract that we defined for the consumer. So, a valid message could look like this:


{ "Name": "John Smith" }

If you try this out, the logger in the running .NET application should show you the following line:


info: WebApi.PaymentSucceededConsumer[0]
      Payment succeeded for customer John Smith

Any additional properties that you would enter into the JSON event, will not be serialized as long as you do not define them in the message contract. Conversely, if you omit the Name property from the JSON, the consumed message will just be populated with null.


Conclusion

We just dipped our toe into the possibilities of connecting an existing application with MassTransit support to a separate event streaming platform like Kafka.
MassTransit offers a variety of configuration options for your event consumer with which you can fine-tune the interaction between the two systems.


I hope you found some value in this article. Personally, I have a lot to learn about how we will reliably integrate Kafka into the business processes in my current project, but at least this gives me a starting point to be able to develop things locally, without affecting other environments.



Please share on social media, stay in touch via the contact form, and subscribe to our post newsletter!

Be the first to know when a new post was released

We don’t spam!
Read our Privacy Policy for more info.

We use cookies on our website to give you the most relevant experience by remembering your preferences and repeat visits. By clicking “Accept All”, you consent to the use of ALL the cookies. However, you may visit "Cookie Settings" to provide a controlled consent.