paint-brush
Как использовать сообщения Kafka с помощью NestJSby@vdolzhenko
3,603
3,603

Как использовать сообщения Kafka с помощью NestJS

Kafka — это брокер сообщений, в котором одни службы генерируют сообщения, а другие их получают. В этой статье мы напишем небольшое приложение для получения сообщений из Kafka. И, конечно же, e2e-тесты. Сначала нам нужно создать контроллер, который будет обрабатывать сообщения. Затем запустите микросервисы в основных сервисах платформы NestJs.
featured image - Как использовать сообщения Kafka с помощью NestJS
Viktoria Dolzhenko HackerNoon profile picture
0-item

Многие из нас используют Kafka для публикации сообщений, но как мы их получаем? В этой статье мы напишем небольшое приложение для получения сообщений от Kafka. И, конечно же, e2e-тесты.

Давайте сначала разберемся, как работает Kafka и что это такое.

Kafka — это брокер сообщений, в котором одни службы генерируют сообщения, а другие их получают. Брокеры в основном используются в системах с микросервисной архитектурой для передачи сообщений между сервисами.


Сообщения хранятся в темах. При отправке сообщения производитель указывает название темы, а также само сообщение, состоящее из ключа и значения. Вот и все; работа продюсера закончена.


Тогда в игру вступают потребители, они подписываются на нужную тему и начинают читать сообщения. У каждого приложения есть своя очередь, считывая из которой потребитель перемещает указатель смещения.



Отличительными чертами Кафки являются:

  • Гарантия, что все сообщения будут упорядочены именно в той последовательности, в которой они поступили в тему.


  • Kafka некоторое время сохраняет прочитанные сообщения


  • Высокая пропускная способность


Теперь давайте поработаем с Kafka, используя фреймворк NestJs. Во-первых, нам нужно создать контроллер, который будет обрабатывать сообщения.


 @Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }


Обратите внимание на атрибут @EventPattern , который указывает, что наша функция handleEvent() будет получать сообщения из темы, указанной в файле конфигурации config.get('kafka.topics.exampleTopic') . Атрибут @Payload() помогает получить значение из сообщения темы.


Чтобы подключить ваше приложение к брокерам Kafka, вам нужно сделать две вещи. Для начала подключите микросервис в файле запуска:


 app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });


А затем запустите микросервисы в main.ts:


 async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();


Для тестирования приложения я использую пакет @testcontainers/kafka. С помощью этого я создал контейнер ZooKeeper, а затем контейнер Kafka:


 export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }


Обратите внимание, что в этом файле я переопределил адрес брокера для вновь созданного контейнера.

В самом тестовом файле в функции beforeAll я создаю клиент Kafka. С продюсером я также создаю тему и запускаю наше приложение.


 beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);


Разумеется, в функции afterAll нужно остановить контейнеры:


 afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);


Я написал тест, который проверяет, что при поступлении сообщения в топик наша функция-обработчик из контроллера вызывает необходимую сервисную функцию. Для этого я переопределяю реализацию функции handleExampleEvent и жду ее вызова.


 describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });


Вот и все. Работать с Kafka невероятно легко, если вы используете фреймворк NestJs. Надеюсь, мой опыт будет вам полезен. Пример приложения можно увидеть по адресу https://github.com/waksund/kafka .