How to Consume Kafka Messages With NestJS

Written by vdolzhenko | Published 2023/12/28

TL;DR: Kafka is a message broker in which some services generate messages and others receive them. In this article, we will write a small application for consuming messages from Kafka and, of course, e2e tests. First, we create a controller that processes messages. Then, we start the microservices in the main file of the NestJS application.

Many of us use Kafka to publish messages, but how do we receive them? In this article, we will write a small application for consuming messages from Kafka. And, of course, e2e tests.

Let's First Understand How Kafka Works and What It Is.

Kafka is a message broker in which some services generate messages and others receive them. Brokers are primarily used in systems with a microservice architecture to pass messages between services.

Messages are stored in topics. When sending a message, the producer indicates the name of the topic, as well as the message itself, which consists of a key and a value. And that’s it; the producer’s work is finished.

Then consumers come into play: they subscribe to the desired topic and start reading messages. Each consumer group tracks its own position in the topic, and as a consumer reads messages, it moves the group's offset pointer forward.
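
The offset mechanics described above can be modeled in a few lines. This is a toy in-memory sketch, not how Kafka is implemented: the InMemoryTopic class and the group names 'billing' and 'audit' are made up for illustration.

```typescript
// Toy in-memory model of a topic. It illustrates offsets only.
interface TopicMessage {
    key: string;
    value: string;
}

class InMemoryTopic {
    private readonly log: TopicMessage[] = [];
    // One offset per consumer group (hypothetical group ids).
    private readonly offsets = new Map<string, number>();

    // The producer only appends; it does not know who will read the message.
    send(message: TopicMessage): void {
        this.log.push(message);
    }

    // Returns the next unread message for the group and advances its offset.
    poll(groupId: string): TopicMessage | undefined {
        const offset = this.offsets.get(groupId) ?? 0;
        if (offset >= this.log.length) {
            return undefined; // nothing new for this group
        }
        this.offsets.set(groupId, offset + 1);
        return this.log[offset];
    }
}

const topic = new InMemoryTopic();
topic.send({ key: 'user-1', value: 'created' });
topic.send({ key: 'user-1', value: 'updated' });

// Two groups read the same messages independently of each other.
const billingFirst = topic.poll('billing');
const billingSecond = topic.poll('billing');
const auditFirst = topic.poll('audit');
```

Reading does not remove anything from the log: the 'audit' group still starts from the first message even though 'billing' has already read both.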


The distinctive features of Kafka are:

  • Messages are guaranteed to be ordered exactly in the sequence in which they arrived, within each partition of a topic

  • Kafka keeps messages after they have been read, for a configurable retention period

  • High throughput
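
The ordering guarantee applies within a single partition, and messages with the same key are sent to the same partition. The sketch below illustrates that idea with a toy partitioner. Kafka's default partitioner uses a different hash function (murmur2), so pickPartition and produce are hypothetical helpers written only for this example.

```typescript
// Toy partitioner: the simple hash below is an assumption chosen for readability.
function pickPartition(key: string, partitionCount: number): number {
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
        // Keep the running hash inside the unsigned 32-bit range.
        hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
    }
    return hash % partitionCount;
}

type Partition = string[];

// Appends the message to the partition chosen by its key and returns that partition's index.
function produce(partitions: Partition[], key: string, value: string): number {
    const index = pickPartition(key, partitions.length);
    partitions[index].push(`${key}:${value}`);
    return index;
}

const partitions: Partition[] = [[], [], []];
const events: Array<[string, string]> = [
    ['order-1', 'created'],
    ['order-2', 'created'],
    ['order-1', 'paid'],
    ['order-1', 'shipped'],
];
for (const [key, value] of events) {
    produce(partitions, key, value);
}

// Every 'order-1' event landed in one partition, in the order it was sent.
const orderOnePartition = partitions[pickPartition('order-1', partitions.length)];
const orderOneEvents = orderOnePartition.filter((entry) => entry.startsWith('order-1:'));
```

Events for different keys may end up in different partitions, so their relative order is not guaranteed.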

Now, let's work with Kafka using the NestJS framework. First, we need to create a controller that will process messages.

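import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Transport } from '@nestjs/microservices';
// AppService, ExamplePayloadDto, and config are project-local imports (paths omitted here).

@Controller()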
export class AppController{

    constructor(
        private readonly appService: AppService,
    ) {
    }

    @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA)
    handleEvent(
        @Payload() payload: ExamplePayloadDto,
    ): Promise<void> {
        return this.appService.handleExampleEvent(payload.message);
    }
}

Pay attention to the @EventPattern decorator, which indicates that our handleEvent() function will receive messages from the topic whose name is read from the configuration with config.get('kafka.topics.exampleTopic'). The @Payload() decorator extracts the value from the Kafka message.
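
The ExamplePayloadDto class is not shown in this article. A minimal sketch, assuming its shape matches the { message: 'Hello World!' } object sent in the test further down, could look like this. The parseExamplePayload helper is hypothetical and not part of NestJS: it only demonstrates how a raw JSON value maps to the DTO and how a malformed message could be rejected.

```typescript
// Assumed shape, inferred from the payload used in the e2e test.
class ExamplePayloadDto {
    constructor(public readonly message: string) {}
}

// Hypothetical helper: turns the raw message value into a validated DTO.
function parseExamplePayload(rawValue: string): ExamplePayloadDto {
    const parsed: unknown = JSON.parse(rawValue);
    if (
        typeof parsed !== 'object' ||
        parsed === null ||
        typeof (parsed as { message?: unknown }).message !== 'string'
    ) {
        throw new Error('Expected a JSON object with a string "message" field');
    }
    return new ExamplePayloadDto((parsed as { message: string }).message);
}

const payload = parseExamplePayload(JSON.stringify({ message: 'Hello World!' }));
```

A TypeScript type on a handler parameter is not checked at runtime, so an explicit check like this one is a way to catch messages that do not match the expected shape.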

To connect your application to Kafka brokers, you need to do two things. To begin, connect the microservice in the startup file:

app.connectMicroservice({
        transport: Transport.KAFKA,
        options: {
            client: {
                clientId: config.get('kafka.clientId'),
                brokers: config.get('kafka.brokers'),
                retry: {
                    retries: config.get('kafka.retryCount'),
                },
            },
            consumer: {
                groupId: config.get('kafka.consumer.groupId'),
            },
        },
    });
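
The snippets in this article read their settings through config.get(), but the configuration itself is not shown. Below is a hypothetical sketch of what it could contain. Only the key names come from the code in this article: every value is a placeholder, and createConfig is a made-up stand-in for whatever configuration library the project uses.

```typescript
type ConfigNode = { [key: string]: unknown };

// Minimal dotted-path accessor that mimics config.get() and config.set().
function createConfig(root: ConfigNode) {
    return {
        get<T>(path: string): T {
            let node: unknown = root;
            for (const part of path.split('.')) {
                if (typeof node !== 'object' || node === null || !(part in node)) {
                    throw new Error(`Missing configuration key: ${path}`);
                }
                node = (node as ConfigNode)[part];
            }
            return node as T;
        },
        set(path: string, value: unknown): void {
            const parts = path.split('.');
            const last = parts.pop() as string;
            let node: ConfigNode = root;
            for (const part of parts) {
                // Create intermediate objects when the path does not exist yet.
                if (typeof node[part] !== 'object' || node[part] === null) {
                    node[part] = {};
                }
                node = node[part] as ConfigNode;
            }
            node[last] = value;
        },
    };
}

// Placeholder values, assumed for illustration only.
const config = createConfig({
    app: { port: 3000 },
    kafka: {
        clientId: 'example-client',
        brokers: ['localhost:9092'],
        retryCount: 5,
        consumer: { groupId: 'example-group' },
        topics: { exampleTopic: 'example-topic' },
    },
});

const brokers = config.get<string[]>('kafka.brokers');
```

The set() method matters for the e2e tests, where the broker address has to be replaced at runtime.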

Then, start the microservices in main.ts:

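import { NestFactory } from '@nestjs/core';
// AppModule, appStartup, and config are project-local imports (paths omitted here).

async function bootstrap() {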
    const app = await NestFactory.create(AppModule, {
        bufferLogs: true,
    });
    appStartup(app);

    await app.startAllMicroservices();
    await app.listen(config.get('app.port'));
}

void bootstrap();

To test the application, I use the @testcontainers/kafka package. With its help, I create a ZooKeeper container and then a Kafka container:

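import { GenericContainer, Network, StartedTestContainer } from 'testcontainers';
import { KafkaContainer } from '@testcontainers/kafka';
// config is the project's configuration object (import path omitted here).

export async function kafkaSetup(): Promise<StartedTestContainer[]> {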
    const network = await new Network().start();

    const zooKeeperHost = "zookeeper";
    const zooKeeperPort = 2181;
    const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2")
        .withNetwork(network)
        .withNetworkAliases(zooKeeperHost)
        .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
        .withExposedPorts(zooKeeperPort)
        .start();

    const kafkaPort = 9093;
    const kafkaContainer = await new KafkaContainer()
        .withNetwork(network)
        .withZooKeeper(zooKeeperHost, zooKeeperPort)
        .withExposedPorts(kafkaPort)
        .start();

    const externalPort = kafkaContainer.getMappedPort(kafkaPort);

    config.set('kafka.brokers', [`localhost:${externalPort}`]);

    return [
        zookeeperContainer,
        kafkaContainer,
    ];
}

Please note that, in this file, I have overridden the broker address so that it points to the newly created container.

In the test file itself, in the beforeAll function, I create a Kafka client and a producer. I also create the topic with the admin client and launch our application.

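import { INestApplication } from '@nestjs/common';
import { Test, TestingModule } from '@nestjs/testing';
import { Kafka, Producer, logLevel } from 'kafkajs';
import { DeepMockProxy, mockDeep } from 'jest-mock-extended';
import { StartedTestContainer } from 'testcontainers';
// AppModule, AppService, appStartup, kafkaSetup, and config are project-local imports.

let kafkaContainers: StartedTestContainer[];
let kafka: Kafka;
let producer: Producer;
let appService: DeepMockProxy<AppService>;
let app: INestApplication;

beforeAll(async () => {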
        kafkaContainers = await kafkaSetup();

        kafka = new Kafka({
            clientId: 'mock',
            brokers: config.get('kafka.brokers'),
            logLevel: logLevel.NOTHING,
        });
        producer = kafka.producer();
        await producer.connect();

        const admin = kafka.admin();
        await admin.connect();
        await admin.createTopics({
            topics: [{ topic: config.get('kafka.topics.exampleTopic') }],
        });
        // The admin client is only needed to create the topic.
        await admin.disconnect();

        appService = mockDeep<AppService>();

        const module: TestingModule = await Test.createTestingModule({
            imports: [AppModule],
        })
            .overrideProvider(AppService)
            .useValue(appService)
            .compile();

        app = module.createNestApplication();
        appStartup(app);
        await app.startAllMicroservices();

        await app.init();

    }, 30 * 1000);

Of course, in the afterAll function, you need to stop the containers:

afterAll(async () => {
        await app.close();
        await Promise.all(kafkaContainers.map(c => c.stop()));
    }, 15 * 1000);

I wrote a test that verifies that when a message arrives in a topic, our handler function from the controller calls the necessary service function. To do this, I override the implementation of the handleExampleEvent function and wait for it to be called.

describe('handleEvent', () => {
        it('should call appService', async () => {
            let resolve: (value: unknown) => void;
            const promise = new Promise((res) => {
                resolve = res;
            });
            appService.handleExampleEvent.mockImplementation(async () => {
                resolve(0);
            });

            const event: ExamplePayloadDto = {
                message: 'Hello World!',
            };

            await producer.send({
                topic: config.get('kafka.topics.exampleTopic'),
                messages: [{
                    key: 'key',
                    value: JSON.stringify(event),
                }]
            });

            await promise;

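            // Disconnect the producer that was connected in beforeAll;
            // kafka.producer() would create a new, unconnected instance.
            await producer.disconnect();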
        });
    });
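
The "wait until the handler is called" pattern from this test can be extracted into small helpers. The sketch below is one possible way to do it. createDeferred and withTimeout are hypothetical names that do not come from Jest, NestJS, or the example repository. The timeout makes the test fail with a clear error when the message never reaches the handler, instead of waiting for the test runner's own timeout.

```typescript
interface Deferred<T> {
    promise: Promise<T>;
    resolve: (value: T) => void;
}

// Creates a promise together with the function that resolves it from the outside.
function createDeferred<T>(): Deferred<T> {
    let resolve!: (value: T) => void;
    const promise = new Promise<T>((res) => {
        resolve = res;
    });
    return { promise, resolve };
}

// Rejects if the given promise does not settle within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(
            () => reject(new Error(`${label} timed out after ${ms} ms`)),
            ms,
        );
        promise.then(
            (value) => {
                clearTimeout(timer);
                resolve(value);
            },
            (error) => {
                clearTimeout(timer);
                reject(error);
            },
        );
    });
}
```

In the test above, the mock implementation would call called.resolve(...) on a deferred created with createDeferred(), and the test would then await withTimeout(called.promise, 10000, 'handleExampleEvent').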

That's all. Working with Kafka is straightforward if you use the NestJS framework. I hope my experience will be useful to you. An example application can be seen at https://github.com/waksund/kafka


Written by vdolzhenko | I'm a Team Lead in blockchain and .Net industry where I have designed architectures and developed projects from scratch.
Published by HackerNoon on 2023/12/28