Muitos de nós usamos o Kafka para publicar mensagens, mas como as recebemos? Neste artigo, escreveremos um pequeno aplicativo para consumir mensagens do Kafka. E, claro, testes e2e.
Kafka é um corretor de mensagens no qual alguns serviços geram mensagens e outros as recebem. Os corretores são usados principalmente em sistemas com arquitetura de microsserviços para passar mensagens entre serviços.
As mensagens são armazenadas em tópicos. Ao enviar uma mensagem, o produtor indica o nome do tópico, bem como a própria mensagem, que consiste em uma chave e um valor. E é isso; o trabalho do produtor está concluído.
Aí os consumidores entram em cena, se inscrevem no tema desejado e começam a ler as mensagens. Cada aplicação possui sua própria fila, a partir da qual o consumidor move o ponteiro de deslocamento.
As características distintivas do Kafka são:
Agora, vamos trabalhar com Kafka usando a estrutura NestJs. Primeiro, precisamos criar um controlador que irá processar mensagens.
@Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }
Preste atenção ao atributo @EventPattern
, que indica que nossa função handleEvent()
receberá mensagens do tópico especificado no arquivo de configuração config.get('kafka.topics.exampleTopic')
. O atributo @Payload()
ajuda a obter o valor da mensagem do tópico.
Para conectar seu aplicativo aos corretores Kafka, você precisa fazer duas coisas. Para começar, conecte o microsserviço no arquivo de inicialização:
app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });
E então execute os microsserviços no main.ts:
async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();
Para testar o aplicativo, uso o pacote @testcontainers/kafka. Com a ajuda disso, criei um contêiner zooKeeper e, em seguida, um contêiner Kafka:
export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }
Observe que, neste arquivo, substituí o endereço do corretor pelo contêiner recém-criado.
No próprio arquivo de teste, na função beforeAll
, crio um cliente Kafka. Com o produtor também crio um tópico e lanço nosso aplicativo.
beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);
Claro, na função afterAll
, você precisa parar os contêineres:
afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);
Escrevi um teste que verifica se quando uma mensagem chega em um tópico, nossa função manipuladora do controlador chama a função de serviço necessária. Para fazer isso, substituo a implementação da função handleExampleEvent
e espero que ela seja chamada.
describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });
Isso é tudo. Trabalhar com Kafka é incrivelmente fácil se você usar a estrutura NestJs. Espero que minha experiência seja útil para você. Um exemplo de aplicativo pode ser visto em https://github.com/waksund/kafka