paint-brush
Cách sử dụng tin nhắn Kafka với NestJSby@vdolzhenko
3,603
3,603

Cách sử dụng tin nhắn Kafka với NestJS

Kafka là một nhà môi giới tin nhắn trong đó một số dịch vụ tạo ra tin nhắn và những dịch vụ khác nhận chúng. Trong bài viết này, chúng tôi sẽ viết một ứng dụng nhỏ để tiêu thụ tin nhắn từ kafka. Và tất nhiên là các bài kiểm tra e2e. Đầu tiên chúng ta cần tạo một bộ điều khiển để xử lý tin nhắn. Sau đó chạy các vi dịch vụ trong các dịch vụ chính trong khung NestJs.
featured image - Cách sử dụng tin nhắn Kafka với NestJS
Viktoria Dolzhenko HackerNoon profile picture
0-item

Nhiều người trong chúng ta sử dụng Kafka để xuất bản tin nhắn, nhưng làm cách nào để nhận được chúng? Trong bài viết này, chúng tôi sẽ viết một ứng dụng nhỏ để tiêu thụ tin nhắn từ Kafka. Và tất nhiên, các bài kiểm tra e2e.

Trước tiên hãy hiểu Kafka hoạt động như thế nào và nó là gì.

Kafka là nhà môi giới tin nhắn trong đó một số dịch vụ tạo tin nhắn và những dịch vụ khác nhận chúng. Trình môi giới chủ yếu được sử dụng trong các hệ thống có kiến trúc vi dịch vụ để truyền thông điệp giữa các dịch vụ.


Tin nhắn được lưu trữ trong các chủ đề. Khi gửi tin nhắn, nhà sản xuất cho biết tên của chủ đề cũng như chính tin nhắn đó, bao gồm khóa và giá trị. Và thế là xong; công việc của nhà sản xuất đã hoàn thành.


Sau đó, người tiêu dùng bắt đầu tham gia, họ đăng ký chủ đề mong muốn và bắt đầu đọc tin nhắn. Mỗi ứng dụng có hàng đợi riêng, việc đọc từ đó người tiêu dùng sẽ di chuyển con trỏ offset.



Đặc điểm nổi bật của Kafka là:

  • Đảm bảo rằng tất cả các tin nhắn sẽ được sắp xếp chính xác theo trình tự mà chúng đến trong chủ đề


  • Cửa hàng Kafka đọc tin nhắn một lúc


  • Thông lượng cao


Bây giờ, hãy làm việc với Kafka bằng khung NestJs. Đầu tiên, chúng ta cần tạo một bộ điều khiển để xử lý tin nhắn.


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


Hãy chú ý đến thuộc tính @EventPattern , nó cho biết rằng hàm handleEvent() của chúng ta sẽ nhận được thông báo từ chủ đề được chỉ định trong tệp cấu hình config.get('kafka.topics.exampleTopic') . Thuộc tính @Payload() giúp lấy giá trị từ thông báo chủ đề.


Để kết nối ứng dụng của bạn với nhà môi giới Kafka, bạn cần thực hiện hai việc. Để bắt đầu, hãy kết nối microservice trong tệp khởi động:


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


Và sau đó chạy microservice trong main.ts:


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


Để kiểm tra ứng dụng, tôi sử dụng gói @testcontainers/kafka. Với sự trợ giúp của việc này, tôi đã tạo một thùng chứa ZooKeeper và sau đó là thùng chứa Kafka:


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


Xin lưu ý rằng, trong tệp này, tôi đã ghi đè địa chỉ người môi giới vào vùng chứa mới được tạo.

Trong chính tệp thử nghiệm, trong hàm beforeAll , tôi tạo một ứng dụng khách Kafka. Với nhà sản xuất, tôi cũng tạo một chủ đề và khởi chạy ứng dụng của mình.


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


Tất nhiên, trong hàm afterAll , bạn cần dừng các vùng chứa:


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


Tôi đã viết một bài kiểm tra để xác minh rằng khi có tin nhắn đến một chủ đề, hàm xử lý của chúng tôi từ bộ điều khiển sẽ gọi hàm dịch vụ cần thiết. Để làm điều này, tôi ghi đè việc triển khai hàm handleExampleEvent và đợi nó được gọi.


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


Đó là tất cả. Làm việc với Kafka cực kỳ dễ dàng nếu bạn sử dụng khung NestJs. Tôi hy vọng kinh nghiệm của tôi sẽ hữu ích cho bạn. Bạn có thể xem một ứng dụng ví dụ tại https://github.com/waksund/kafka