Viele von uns verwenden Kafka, um Nachrichten zu veröffentlichen, aber wie erhalten wir sie? In diesem Artikel werden wir eine kleine Anwendung zum Konsumieren von Nachrichten von Kafka schreiben. Und natürlich E2E-Tests.
Kafka ist ein Nachrichtenbroker, bei dem einige Dienste Nachrichten generieren und andere diese empfangen. Broker werden hauptsächlich in Systemen mit einer Microservice-Architektur verwendet, um Nachrichten zwischen Diensten weiterzuleiten.
Nachrichten werden in Themen gespeichert. Beim Senden einer Nachricht gibt der Produzent den Namen des Themas sowie die Nachricht selbst an, die aus einem Schlüssel und einem Wert besteht. Und das ist es; Die Arbeit des Produzenten ist beendet.
Dann kommen die Konsumenten ins Spiel, sie abonnieren das gewünschte Thema und beginnen, Nachrichten zu lesen. Jede Anwendung verfügt über eine eigene Warteschlange, aus der der Verbraucher den Offset-Zeiger verschiebt.
Die Besonderheiten von Kafka sind:
Lassen Sie uns nun mit Kafka unter Verwendung des NestJs-Frameworks arbeiten. Zuerst müssen wir einen Controller erstellen, der Nachrichten verarbeitet.
@Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }
Achten Sie auf das Attribut @EventPattern
, das angibt, dass unsere Funktion handleEvent()
Nachrichten von dem in der Konfigurationsdatei config.get('kafka.topics.exampleTopic')
angegebenen Thema empfängt. Das Attribut @Payload()
hilft dabei, den Wert aus der Themennachricht abzurufen.
Um Ihre Anwendung mit Kafka-Brokern zu verbinden, müssen Sie zwei Dinge tun. Verbinden Sie zunächst den Microservice in der Startdatei:
app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });
Und führen Sie dann die Microservices in main.ts aus:
async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();
Um die Anwendung zu testen, verwende ich das Paket @testcontainers/kafka. Mit dieser Hilfe habe ich einen zooKeeper-Container und dann einen Kafka-Container erstellt:
export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }
Bitte beachten Sie, dass ich in dieser Datei die Brokeradresse für den neu erstellten Container überschrieben habe.
In der Testdatei selbst, in der beforeAll
Funktion, erstelle ich einen Kafka-Client. Mit dem Produzenten erstelle ich auch ein Thema und starte unsere Anwendung.
beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);
Natürlich müssen Sie in der Funktion afterAll
die Container stoppen:
afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);
Ich habe einen Test geschrieben, der überprüft, ob unsere Handlerfunktion vom Controller die erforderliche Servicefunktion aufruft, wenn eine Nachricht in einem Thema eintrifft. Dazu überschreibe ich die Implementierung der Funktion handleExampleEvent
und warte auf ihren Aufruf.
describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });
Das ist alles. Die Arbeit mit Kafka ist unglaublich einfach, wenn Sie das NestJs-Framework verwenden. Ich hoffe, dass meine Erfahrung für Sie von Nutzen sein wird. Eine Beispielanwendung finden Sie unter https://github.com/waksund/kafka