Birçoğumuz mesajları yayınlamak için Kafka'yı kullanıyoruz, ancak bunları nasıl alıyoruz? Bu yazımızda Kafka'dan mesaj tüketmek için küçük bir uygulama yazacağız. Ve elbette e2e testleri.
Kafka, bazı servislerin mesaj ürettiği, diğerlerinin ise aldığı bir mesaj aracısıdır. Aracılar öncelikle mikro hizmet mimarisine sahip sistemlerde hizmetler arasında mesaj iletmek için kullanılır.
Mesajlar konularda saklanır. Bir mesaj gönderirken yapımcı, konunun adının yanı sıra bir anahtar ve değerden oluşan mesajın kendisini de belirtir. Ve bu kadar; Yapımcının işi bitti.
Daha sonra devreye tüketiciler giriyor, istedikleri konuya abone oluyor ve mesajları okumaya başlıyorlar. Her uygulamanın, tüketicinin ofset işaretçisini hareket ettirdiği kendi okuma kuyruğu vardır.
Kafka'nın ayırt edici özellikleri şunlardır:
Şimdi NestJs çerçevesini kullanarak Kafka ile çalışalım. Öncelikle mesajları işleyecek bir kontrolör oluşturmamız gerekiyor.
@Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }
handleEvent()
fonksiyonumuzun config.get('kafka.topics.exampleTopic')
yapılandırma dosyasında belirtilen konudan mesajlar alacağını belirten @EventPattern
niteliğine dikkat edin. @Payload()
özelliği, konu mesajından değerin alınmasına yardımcı olur.
Uygulamanızı Kafka aracılarına bağlamak için iki şey yapmanız gerekir. Başlamak için mikro hizmeti başlangıç dosyasına bağlayın:
app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });
Daha sonra mikro hizmetleri main.ts dosyasında çalıştırın:
async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();
Uygulamayı test etmek için @testcontainers/kafka paketini kullanıyorum. Bunun yardımıyla bir zooKeeper konteyneri ve ardından bir Kafka konteyneri oluşturdum:
export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }
Lütfen bu dosyada aracı adresini yeni oluşturulan kapsayıcıya geçersiz kıldığımı unutmayın.
Test dosyasının kendisinde beforeAll
işlevinde bir Kafka istemcisi oluşturuyorum. Yapımcıyla birlikte bir konu da oluşturup uygulamamızı başlatıyorum.
beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);
Elbette afterAll
işlevinde kapları durdurmanız gerekir:
afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);
Bir konuya mesaj geldiğinde denetleyicideki işleyici fonksiyonumuzun gerekli servis fonksiyonunu çağırdığını doğrulayan bir test yazdım. Bunu yapmak için, handleExampleEvent
fonksiyonunun uygulamasını geçersiz kılıyorum ve çağrılmasını bekliyorum.
describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });
Bu kadar. NestJs çerçevesini kullanırsanız Kafka ile çalışmak inanılmaz derecede kolaydır. Umarım deneyimim sizin için yararlı olacaktır. Örnek bir uygulama https://github.com/waksund/kafka adresinde görülebilir.