Многие из нас используют Kafka для публикации сообщений, но как мы их получаем? В этой статье мы напишем небольшое приложение для получения сообщений от Kafka. И, конечно же, e2e-тесты.
Kafka — это брокер сообщений, в котором одни службы генерируют сообщения, а другие их получают. Брокеры в основном используются в системах с микросервисной архитектурой для передачи сообщений между сервисами.
Сообщения хранятся в темах. При отправке сообщения производитель указывает название темы, а также само сообщение, состоящее из ключа и значения. Вот и все; работа продюсера закончена.
Тогда в игру вступают потребители, они подписываются на нужную тему и начинают читать сообщения. У каждого приложения есть своя очередь, считывая из которой потребитель перемещает указатель смещения.
Отличительными чертами Кафки являются:
Теперь давайте поработаем с Kafka, используя фреймворк NestJs. Во-первых, нам нужно создать контроллер, который будет обрабатывать сообщения.
@Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }
Обратите внимание на атрибут @EventPattern
, который указывает, что наша функция handleEvent()
будет получать сообщения из темы, указанной в файле конфигурации config.get('kafka.topics.exampleTopic')
. Атрибут @Payload()
помогает получить значение из сообщения темы.
Чтобы подключить ваше приложение к брокерам Kafka, вам нужно сделать две вещи. Для начала подключите микросервис в файле запуска:
app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });
А затем запустите микросервисы в main.ts:
async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();
Для тестирования приложения я использую пакет @testcontainers/kafka. С помощью этого я создал контейнер ZooKeeper, а затем контейнер Kafka:
export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }
Обратите внимание, что в этом файле я переопределил адрес брокера для вновь созданного контейнера.
В самом тестовом файле в функции beforeAll
я создаю клиент Kafka. С продюсером я также создаю тему и запускаю наше приложение.
beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);
Разумеется, в функции afterAll
нужно остановить контейнеры:
afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);
Я написал тест, который проверяет, что при поступлении сообщения в топик наша функция-обработчик из контроллера вызывает необходимую сервисную функцию. Для этого я переопределяю реализацию функции handleExampleEvent
и жду ее вызова.
describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });
Вот и все. Работать с Kafka невероятно легко, если вы используете фреймворк NestJs. Надеюсь, мой опыт будет вам полезен. Пример приложения можно увидеть по адресу https://github.com/waksund/kafka .