paint-brush
Comment consommer des messages Kafka avec NestJSby@vdolzhenko
3,603
3,603

Comment consommer des messages Kafka avec NestJS

Kafka est un courtier de messages dans lequel certains services génèrent des messages et d'autres les reçoivent. Dans cet article, nous allons écrire une petite application pour consommer les messages de Kafka. Et bien sûr les tests e2e. Nous devons d’abord créer un contrôleur qui traitera les messages. Exécutez ensuite les microservices dans les services principaux du framework NestJs.
featured image - Comment consommer des messages Kafka avec NestJS
Viktoria Dolzhenko HackerNoon profile picture
0-item

Nous sommes nombreux à utiliser Kafka pour publier des messages, mais comment les recevons-nous ? Dans cet article, nous allons écrire une petite application pour consommer les messages de Kafka. Et bien sûr, les tests e2e.

Comprenons d'abord comment fonctionne Kafka et ce que c'est.

Kafka est un courtier de messages dans lequel certains services génèrent des messages et d'autres les reçoivent. Les courtiers sont principalement utilisés dans les systèmes dotés d'une architecture de microservices pour transmettre des messages entre les services.


Les messages sont stockés dans des sujets. Lors de l'envoi d'un message, le producteur indique le nom du sujet, ainsi que le message lui-même, qui est constitué d'une clé et d'une valeur. Et c'est tout; le travail du producteur est terminé.


Ensuite, les consommateurs entrent en jeu, s’abonnent au sujet souhaité et commencent à lire les messages. Chaque application possède sa propre file d'attente de lecture à partir de laquelle le consommateur déplace le pointeur de décalage.



Les caractéristiques distinctives de Kafka sont :

  • Garantir que tous les messages seront classés exactement dans l'ordre dans lequel ils sont arrivés dans le sujet


  • Kafka stocke les messages lus pendant un certain temps


  • Débit élevé


Travaillons maintenant avec Kafka en utilisant le framework NestJs. Tout d’abord, nous devons créer un contrôleur qui traitera les messages.


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


Faites attention à l'attribut @EventPattern , qui indique que notre fonction handleEvent() recevra des messages du sujet spécifié dans le fichier de configuration config.get('kafka.topics.exampleTopic') . L'attribut @Payload() permet d'obtenir la valeur du message du sujet.


Pour connecter votre application aux courtiers Kafka, vous devez faire deux choses. Pour commencer, connectez le microservice dans le fichier de démarrage :


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


Et puis exécutez les microservices dans le fichier main.ts :


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


Pour tester l'application, j'utilise le package @testcontainers/kafka. Avec l'aide de cela, j'ai créé un conteneur zooKeeper, puis un conteneur Kafka :


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


Veuillez noter que, dans ce fichier, j'ai remplacé l'adresse du courtier par le conteneur nouvellement créé.

Dans le fichier de test lui-même, dans la fonction beforeAll , je crée un client Kafka. Avec le producteur, je crée également un sujet et lance notre application.


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


Bien entendu, dans la fonction afterAll , vous devez arrêter les conteneurs :


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


J'ai écrit un test qui vérifie que lorsqu'un message arrive dans un sujet, notre fonction de gestionnaire depuis le contrôleur appelle la fonction de service nécessaire. Pour ce faire, je remplace l'implémentation de la fonction handleExampleEvent et j'attends qu'elle soit appelée.


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


C'est tout. Travailler avec Kafka est incroyablement simple si vous utilisez le framework NestJs. J'espère que mon expérience vous sera utile. Un exemple d'application peut être consulté sur https://github.com/waksund/kafka