paint-brush
如何使用 NestJS 消费 Kafka 消息经过@vdolzhenko
6,868 讀數
6,868 讀數

如何使用 NestJS 消费 Kafka 消息

经过 Viktoria Dolzhenko7m2023/12/28
Read on Terminal Reader

太長; 讀書

Kafka 是一个消息代理,其中一些服务生成消息,另一些服务接收消息。在本文中,我们将编写一个小应用程序来消费来自 kafka 的消息。当然还有 e2e 测试。首先我们需要创建一个处理消息的控制器。然后运行NestJs框架中主服务中的微服务。
featured image - 如何使用 NestJS 消费 Kafka 消息
Viktoria Dolzhenko HackerNoon profile picture
0-item

我们很多人都使用Kafka来发布消息,但是我们如何接收消息呢?在本文中,我们将编写一个小型应用程序来消费来自 Kafka 的消息。当然,还有 e2e 测试。

让我们首先了解 Kafka 是如何工作的以及它是什么。

Kafka 是一个消息代理,其中一些服务生成消息,另一些服务接收消息。代理主要用于具有微服务架构的系统中,以在服务之间传递消息。


消息存储在主题中。发送消息时,生产者指示主题名称以及消息本身,其中由键和值组成。就是这样;制作人的工作完成了。


然后消费者开始发挥作用,他们订阅所需的主题,并开始阅读消息。每个应用程序都有自己的队列,消费者从中读取移动偏移指针。



卡夫卡的显着特点是:

  • 保证所有消息将按照它们到达主题的顺序精确排序


  • Kafka将读取的消息存储一段时间


  • 高通量


现在,让我们使用 NestJs 框架来使用 Kafka。首先,我们需要创建一个处理消息的控制器。


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


注意@EventPattern属性,它表示我们的handleEvent()函数将从配置文件config.get('kafka.topics.exampleTopic')中指定的主题接收消息。 @Payload()属性有助于从主题消息中获取值。


要将您的应用程序连接到 Kafka 代理,您需要做两件事。首先,连接启动文件中的微服务:


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


然后在main.ts中运行微服务:


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


为了测试应用程序,我使用 @testcontainers/kafka 包。在此帮助下,我创建了一个zooKeeper容器,然后创建了一个Kafka容器:


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


请注意,在此文件中,我已将代理地址覆盖为新创建的容器。

在测试文件本身的beforeAll函数中,我创建了一个 Kafka 客户端。我还与制作人一起创建一个主题并启动我们的应用程序。


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


当然,在afterAll函数中,您需要停止容器:


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


我编写了一个测试,验证当消息到达主题时,控制器中的处理程序函数会调用必要的服务函数。为此,我重写了handleExampleEvent函数的实现并等待它被调用。


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


就这样。如果您使用 NestJs 框架,使用 Kafka 会非常容易。我希望我的经验对你有用。示例应用程序可以在https://github.com/waksund/kafka查看