আমরা অনেকেই বার্তা প্রকাশের জন্য কাফকা ব্যবহার করি, কিন্তু আমরা কীভাবে সেগুলি গ্রহণ করব? এই নিবন্ধে, আমরা কাফকার থেকে বার্তা গ্রহণের জন্য একটি ছোট অ্যাপ্লিকেশন লিখব। এবং, অবশ্যই, e2e পরীক্ষা।
কাফকা একটি বার্তা ব্রোকার যেখানে কিছু পরিষেবা বার্তা তৈরি করে এবং অন্যরা সেগুলি গ্রহণ করে। ব্রোকাররা প্রাথমিকভাবে একটি মাইক্রোসার্ভিস আর্কিটেকচার সহ সিস্টেমে পরিষেবাগুলির মধ্যে বার্তাগুলি পাস করার জন্য ব্যবহৃত হয়।
বার্তা বিষয় সংরক্ষণ করা হয়. একটি বার্তা পাঠানোর সময়, প্রযোজক বিষয়ের নাম নির্দেশ করে, সেইসাথে বার্তাটি নিজেই, যা একটি কী এবং একটি মান নিয়ে গঠিত। এবং এটাই; প্রযোজকের কাজ শেষ।
তারপর ভোক্তারা খেলতে আসে, তারা পছন্দসই বিষয়ের সদস্যতা নেয় এবং বার্তা পড়তে শুরু করে। প্রতিটি অ্যাপ্লিকেশানের নিজস্ব সারি রয়েছে, যা পড়ার মাধ্যমে গ্রাহক অফসেট পয়েন্টার সরান।
কাফকার স্বতন্ত্র বৈশিষ্ট্য হল:
এখন, NestJs ফ্রেমওয়ার্ক ব্যবহার করে কাফকার সাথে কাজ করা যাক। প্রথমত, আমাদের একটি নিয়ামক তৈরি করতে হবে যা বার্তাগুলিকে প্রক্রিয়া করবে।
@Controller() export class AppController{ constructor( private readonly appService: AppService, ) { } @EventPattern(config.get('kafka.topics.exampleTopic'), Transport.KAFKA) handleEvent( @Payload() payload: ExamplePayloadDto, ): Promise<void> { return this.appService.handleExampleEvent(payload.message); } }
@EventPattern
অ্যাট্রিবিউটের দিকে মনোযোগ দিন, যা নির্দেশ করে যে আমাদের handleEvent()
ফাংশন কনফিগারেশন ফাইল config.get('kafka.topics.exampleTopic')
এ নির্দিষ্ট বিষয় থেকে বার্তা পাবে। @Payload()
অ্যাট্রিবিউট টপিক মেসেজ থেকে মান পেতে সাহায্য করে।
আপনার অ্যাপ্লিকেশান কাফকা ব্রোকারদের সাথে সংযুক্ত করতে, আপনাকে দুটি জিনিস করতে হবে৷ শুরু করতে, স্টার্টআপ ফাইলে মাইক্রোসার্ভিস সংযোগ করুন:
app.connectMicroservice({ transport: Transport.KAFKA, options: { client: { clientId: config.get('kafka.clientId'), brokers: config.get('kafka.brokers'), retry: { retries: config.get('kafka.retryCount'), }, }, consumer: { groupId: config.get('kafka.consumer.groupId'), }, }, });
এবং তারপর main.ts এ মাইক্রোসার্ভিস চালান:
async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); appStartup(app); await app.startAllMicroservices(); await app.listen(config.get('app.port')); }; void bootstrap();
অ্যাপ্লিকেশন পরীক্ষা করতে, আমি @testcontainers/kafka প্যাকেজ ব্যবহার করি। এটির সাহায্যে, আমি একটি চিড়িয়াখানার ধারক এবং তারপর একটি কাফকা ধারক তৈরি করেছি:
export async function kafkaSetup(): Promise<StartedTestContainer[]> { const network = await new Network().start(); const zooKeeperHost = "zookeeper"; const zooKeeperPort = 2181; const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:7.3.2") .withNetwork(network) .withNetworkAliases(zooKeeperHost) .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) .withExposedPorts(zooKeeperPort) .start(); const kafkaPort = 9093; const kafkaContainer = await new KafkaContainer() .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) .withExposedPorts(kafkaPort) .start(); const externalPort = kafkaContainer.getMappedPort(kafkaPort); config.set('kafka.brokers', [`localhost:${externalPort}`]); return [ zookeeperContainer, kafkaContainer, ]; }
অনুগ্রহ করে মনে রাখবেন, এই ফাইলে, আমি নতুন তৈরি কন্টেইনারে ব্রোকার ঠিকানাটি ওভাররাইড করেছি।
টেস্ট ফাইলেই, beforeAll
ফাংশনে, আমি একটি কাফকা ক্লায়েন্ট তৈরি করি। প্রযোজকের সাথে, আমি একটি বিষয় তৈরি করি এবং আমাদের অ্যাপ্লিকেশন চালু করি।
beforeAll(async () => { kafkaContainers = await kafkaSetup(); kafka = new Kafka({ clientId: 'mock', brokers: config.get('kafka.brokers'), logLevel: logLevel.NOTHING, }); producer = kafka.producer(); await producer.connect(); const admin = kafka.admin(); await admin.connect(); await admin.createTopics({ topics: [{ topic: config.get('kafka.topics.exampleTopic') }], }); appService = mockDeep<AppService>(); const module: TestingModule = await Test.createTestingModule({ imports: [AppModule], }) .overrideProvider(AppService) .useValue(appService) .compile(); app = module.createNestApplication(); appStartup(app); await app.startAllMicroservices(); await app.init(); }, 30 * 1000);
অবশ্যই, afterAll
ফাংশনে, আপনাকে পাত্রগুলি বন্ধ করতে হবে:
afterAll(async () => { await app.close(); await Promise.all(kafkaContainers.map(c => c.stop())); }, 15 * 1000);
আমি একটি পরীক্ষা লিখেছিলাম যা যাচাই করে যে যখন একটি বিষয়ে একটি বার্তা আসে, নিয়ামক থেকে আমাদের হ্যান্ডলার ফাংশন প্রয়োজনীয় পরিষেবা ফাংশন কল করে। এটি করার জন্য, আমি handleExampleEvent
ফাংশনের বাস্তবায়ন ওভাররাইড করি এবং এটি কল করার জন্য অপেক্ষা করি।
describe('handleEvent', () => { it('should call appService', async () => { let resolve: (value: unknown) => void; const promise = new Promise((res) => { resolve = res; }); appService.handleExampleEvent.mockImplementation(async () => { resolve(0); }); const event: ExamplePayloadDto = { message: 'Hello World!', }; await producer.send({ topic: config.get('kafka.topics.exampleTopic'), messages: [{ key: 'key', value: JSON.stringify(event), }] }); await promise; await kafka.producer().disconnect(); }); });
এখানেই শেষ. আপনি যদি NestJs ফ্রেমওয়ার্ক ব্যবহার করেন তাহলে কাফকার সাথে কাজ করা অবিশ্বাস্যভাবে সহজ। আমি আশা করি আমার অভিজ্ঞতা আপনার কাজে লাগবে। একটি উদাহরণ অ্যাপ্লিকেশন https://github.com/waksund/kafka এ দেখা যেতে পারে