paint-brush
Como consumir mensagens Kafka com NestJSby@vdolzhenko
3,659
3,659

Como consumir mensagens Kafka com NestJS

Kafka é um corretor de mensagens no qual alguns serviços geram mensagens e outros as recebem. Neste artigo escreveremos um pequeno aplicativo para consumir mensagens do kafka. E, claro, testes e2e. Primeiro precisamos criar um controlador que irá processar mensagens. Em seguida, execute os microsserviços nos principais serviços do framework NestJs.
featured image - Como consumir mensagens Kafka com NestJS
Viktoria Dolzhenko HackerNoon profile picture
0-item

Muitos de nós usamos o Kafka para publicar mensagens, mas como as recebemos? Neste artigo, escreveremos um pequeno aplicativo para consumir mensagens do Kafka. E, claro, testes e2e.

Vamos primeiro entender como funciona o Kafka e o que é.

Kafka é um corretor de mensagens no qual alguns serviços geram mensagens e outros as recebem. Os corretores são usados principalmente em sistemas com arquitetura de microsserviços para passar mensagens entre serviços.


As mensagens são armazenadas em tópicos. Ao enviar uma mensagem, o produtor indica o nome do tópico, bem como a própria mensagem, que consiste em uma chave e um valor. E é isso; o trabalho do produtor está concluído.


Aí os consumidores entram em cena, se inscrevem no tema desejado e começam a ler as mensagens. Cada aplicação possui sua própria fila, a partir da qual o consumidor move o ponteiro de deslocamento.



As características distintivas do Kafka são:

  • Garantir que todas as mensagens serão ordenadas exatamente na sequência em que chegaram ao tópico


  • Kafka armazena mensagens lidas por um tempo


  • Alto rendimento


Agora, vamos trabalhar com Kafka usando a estrutura NestJs. Primeiro, precisamos criar um controlador que irá processar mensagens.


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


Preste atenção ao atributo @EventPattern , que indica que nossa função handleEvent() receberá mensagens do tópico especificado no arquivo de configuração config.get('kafka.topics.exampleTopic') . O atributo @Payload() ajuda a obter o valor da mensagem do tópico.


Para conectar seu aplicativo aos corretores Kafka, você precisa fazer duas coisas. Para começar, conecte o microsserviço no arquivo de inicialização:


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


E então execute os microsserviços no main.ts:


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


Para testar o aplicativo, uso o pacote @testcontainers/kafka. Com a ajuda disso, criei um contêiner zooKeeper e, em seguida, um contêiner Kafka:


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


Observe que, neste arquivo, substituí o endereço do corretor pelo contêiner recém-criado.

No próprio arquivo de teste, na função beforeAll , crio um cliente Kafka. Com o produtor também crio um tópico e lanço nosso aplicativo.


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


Claro, na função afterAll , você precisa parar os contêineres:


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


Escrevi um teste que verifica se quando uma mensagem chega em um tópico, nossa função manipuladora do controlador chama a função de serviço necessária. Para fazer isso, substituo a implementação da função handleExampleEvent e espero que ela seja chamada.


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


Isso é tudo. Trabalhar com Kafka é incrivelmente fácil se você usar a estrutura NestJs. Espero que minha experiência seja útil para você. Um exemplo de aplicativo pode ser visto em https://github.com/waksund/kafka