paint-brush
NestJS を使用して Kafka メッセージを使用する方法by@vdolzhenko
3,659
3,659

NestJS を使用して Kafka メッセージを使用する方法

Kafka は、一部のサービスがメッセージを生成し、他のサービスがメッセージを受信するメッセージ ブローカーです。この記事では、kafka からのメッセージを使用するための小さなアプリケーションを作成します。そしてもちろん e2e テスト。まず、メッセージを処理するコントローラーを作成する必要があります。次に、NestJs フレームワークのメイン サービスでマイクロサービスを実行します。
featured image - NestJS を使用して Kafka メッセージを使用する方法
Viktoria Dolzhenko HackerNoon profile picture
0-item

私たちの多くは Kafka を使用してメッセージをパブリッシュしますが、メッセージを受信するにはどうすればよいでしょうか?この記事では、Kafka からのメッセージを消費するための小さなアプリケーションを作成します。そしてもちろん、e2e テスト。

まず、Kafka がどのように機能し、それが何であるかを理解しましょう。

Kafka は、一部のサービスがメッセージを生成し、他のサービスがメッセージを受信するメッセージ ブローカーです。ブローカーは主に、サービス間でメッセージを渡すためにマイクロサービス アーキテクチャを備えたシステムで使用されます。


メッセージはトピックに保存されます。メッセージを送信するとき、プロデューサーはトピックの名前と、キーと値で構成されるメッセージ自体を示します。以上です;プロデューサーの仕事は終わった。


次に、消費者が登場し、目的のトピックを購読し、メッセージを読み始めます。各アプリケーションには独自のキューがあり、コンシューマはそこからオフセット ポインタを移動します。



Kafka の特徴は次のとおりです。

  • すべてのメッセージがトピックに到着した順序で正確に並べられることを保証します。


  • Kafka は既読メッセージをしばらく保存します


  • 高スループット


次に、NestJs フレームワークを使用して Kafka を操作してみましょう。まず、メッセージを処理するコントローラーを作成する必要があります。


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


@EventPattern属性に注目してください。これは、 handleEvent()関数が、構成ファイルconfig.get('kafka.topics.exampleTopic')で指定されたトピックからメッセージを受信することを示します。 @Payload()属性は、トピック メッセージから値を取得するのに役立ちます。


アプリケーションを Kafka ブローカーに接続するには、2 つのことを行う必要があります。まず、スタートアップ ファイルでマイクロサービスを接続します。


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


次に、main.ts でマイクロサービスを実行します。


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


アプリケーションをテストするには、@testcontainers/kafka パッケージを使用します。これを利用して、zooKeeper コンテナを作成し、次に Kafka コンテナを作成しました。


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


このファイルでは、ブローカー アドレスを新しく作成したコンテナーにオーバーライドしていることに注意してください。

テスト ファイル自体のbeforeAll関数で、Kafka クライアントを作成します。私もプロデューサーと一緒にトピックを作成し、アプリケーションを起動します。


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


もちろん、 afterAll関数では、コンテナーを停止する必要があります。


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


メッセージがトピックに到着すると、コントローラーからのハンドラー関数が必要なサービス関数を呼び出すことを検証するテストを作成しました。これを行うには、 handleExampleEvent関数の実装をオーバーライドし、それが呼び出されるのを待ちます。


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


それだけです。 NestJs フレームワークを使用すると、Kafka の操作が非常に簡単になります。私の経験があなたのお役に立てば幸いです。アプリケーションの例はhttps://github.com/waksund/kafkaでご覧いただけます。