paint-brush
So konsumieren Sie Kafka-Nachrichten mit NestJSby@vdolzhenko
3,603
3,603

So konsumieren Sie Kafka-Nachrichten mit NestJS

Kafka ist ein Nachrichtenbroker, bei dem einige Dienste Nachrichten generieren und andere diese empfangen. In diesem Artikel werden wir eine kleine Anwendung zum Konsumieren von Nachrichten von Kafka schreiben. Und natürlich E2E-Tests. Zuerst müssen wir einen Controller erstellen, der Nachrichten verarbeitet. Führen Sie dann die Microservices in den Hauptdiensten im NestJs-Framework aus.
featured image - So konsumieren Sie Kafka-Nachrichten mit NestJS
Viktoria Dolzhenko HackerNoon profile picture
0-item

Viele von uns verwenden Kafka, um Nachrichten zu veröffentlichen, aber wie erhalten wir sie? In diesem Artikel werden wir eine kleine Anwendung zum Konsumieren von Nachrichten von Kafka schreiben. Und natürlich E2E-Tests.

Lassen Sie uns zunächst verstehen, wie Kafka funktioniert und was es ist.

Kafka ist ein Nachrichtenbroker, bei dem einige Dienste Nachrichten generieren und andere diese empfangen. Broker werden hauptsächlich in Systemen mit einer Microservice-Architektur verwendet, um Nachrichten zwischen Diensten weiterzuleiten.


Nachrichten werden in Themen gespeichert. Beim Senden einer Nachricht gibt der Produzent den Namen des Themas sowie die Nachricht selbst an, die aus einem Schlüssel und einem Wert besteht. Und das ist es; Die Arbeit des Produzenten ist beendet.


Dann kommen die Konsumenten ins Spiel, sie abonnieren das gewünschte Thema und beginnen, Nachrichten zu lesen. Jede Anwendung verfügt über eine eigene Warteschlange, aus der der Verbraucher den Offset-Zeiger verschiebt.



Die Besonderheiten von Kafka sind:

  • Stellen Sie sicher, dass alle Nachrichten genau in der Reihenfolge sortiert werden, in der sie im Thema angekommen sind


  • Kafka speichert gelesene Nachrichten eine Zeit lang


  • Hoher Durchsatz


Lassen Sie uns nun mit Kafka unter Verwendung des NestJs-Frameworks arbeiten. Zuerst müssen wir einen Controller erstellen, der Nachrichten verarbeitet.


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


Achten Sie auf das Attribut @EventPattern , das angibt, dass unsere Funktion handleEvent() Nachrichten von dem in der Konfigurationsdatei config.get('kafka.topics.exampleTopic') angegebenen Thema empfängt. Das Attribut @Payload() hilft dabei, den Wert aus der Themennachricht abzurufen.


Um Ihre Anwendung mit Kafka-Brokern zu verbinden, müssen Sie zwei Dinge tun. Verbinden Sie zunächst den Microservice in der Startdatei:


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


Und führen Sie dann die Microservices in main.ts aus:


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


Um die Anwendung zu testen, verwende ich das Paket @testcontainers/kafka. Mit dieser Hilfe habe ich einen zooKeeper-Container und dann einen Kafka-Container erstellt:


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


Bitte beachten Sie, dass ich in dieser Datei die Brokeradresse für den neu erstellten Container überschrieben habe.

In der Testdatei selbst, in der beforeAll Funktion, erstelle ich einen Kafka-Client. Mit dem Produzenten erstelle ich auch ein Thema und starte unsere Anwendung.


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


Natürlich müssen Sie in der Funktion afterAll die Container stoppen:


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


Ich habe einen Test geschrieben, der überprüft, ob unsere Handlerfunktion vom Controller die erforderliche Servicefunktion aufruft, wenn eine Nachricht in einem Thema eintrifft. Dazu überschreibe ich die Implementierung der Funktion handleExampleEvent und warte auf ihren Aufruf.


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


Das ist alles. Die Arbeit mit Kafka ist unglaublich einfach, wenn Sie das NestJs-Framework verwenden. Ich hoffe, dass meine Erfahrung für Sie von Nutzen sein wird. Eine Beispielanwendung finden Sie unter https://github.com/waksund/kafka