paint-brush
Node.js ट्यूटोरियल: Kafka के साथ एक सरल इवेंट-संचालित एप्लिकेशन कैसे बनाएंद्वारा@alvinslee
559 रीडिंग
559 रीडिंग

Node.js ट्यूटोरियल: Kafka के साथ एक सरल इवेंट-संचालित एप्लिकेशन कैसे बनाएं

द्वारा Alvin Lee14m2024/06/28
Read on Terminal Reader

बहुत लंबा; पढ़ने के लिए

Heroku पर Node.js और Apache Kafka का उपयोग करके एक सरल ईवेंट-संचालित एप्लिकेशन बनाने का तरीका जानें। यह गाइड Kafka क्लस्टर सेट अप करना, ईवेंट बनाने और उपभोग करने के लिए Node.js ऐप बनाना और Heroku पर एप्लिकेशन को तैनात करना शामिल करता है। अंत में, आपके पास रीयल-टाइम डेटा प्रोसेसिंग के साथ ईवेंट-संचालित आर्किटेक्चर का एक कार्यशील उदाहरण होगा।
featured image - Node.js ट्यूटोरियल: Kafka के साथ एक सरल इवेंट-संचालित एप्लिकेशन कैसे बनाएं
Alvin Lee HackerNoon profile picture

क्या आपने कभी सोचा है कि आपके कुछ पसंदीदा ऐप वास्तविक समय के अपडेट को कैसे संभालते हैं? लाइव स्पोर्ट्स स्कोर, स्टॉक मार्केट टिकर, या यहां तक कि सोशल मीडिया नोटिफिकेशन - वे सभी डेटा को तुरंत प्रोसेस करने के लिए इवेंट-ड्रिवन आर्किटेक्चर (EDA) पर निर्भर करते हैं। EDA एक वार्तालाप की तरह है जहां हर नई जानकारी तत्काल प्रतिक्रिया को ट्रिगर करती है। यह वही है जो किसी एप्लिकेशन को अधिक इंटरैक्टिव और प्रतिक्रियाशील बनाता है।


इस वॉकथ्रू में, हम आपको Heroku पर Apache Kafka का उपयोग करके एक सरल ईवेंट-संचालित एप्लिकेशन बनाने में मार्गदर्शन करेंगे। हम निम्नलिखित को कवर करेंगे:

  • Heroku पर काफ़्का क्लस्टर स्थापित करना


  • एक Node.js अनुप्रयोग का निर्माण करना जो घटनाओं का उत्पादन और उपभोग करता है


  • अपने एप्लिकेशन को Heroku पर तैनात करना


अपाचे काफ्का EDA सिस्टम बनाने के लिए एक शक्तिशाली उपकरण है। यह एक ओपन-सोर्स प्लेटफ़ॉर्म है जिसे वास्तविक समय के डेटा फ़ीड को संभालने के लिए डिज़ाइन किया गया है। अपाचे काफ्का ऑन हीरोकू एक हीरोकू ऐड-ऑन है जो काफ्का को एक सेवा के रूप में प्रदान करता है। हीरोकू अनुप्रयोगों को तैनात करना और प्रबंधित करना बहुत आसान बनाता है, और मैं हाल ही में अपने प्रोजेक्ट में इसका अधिक उपयोग कर रहा हूँ। जब आप इवेंट-संचालित एप्लिकेशन चलाना चाहते हैं तो काफ्का को हीरोकू के साथ संयोजित करना सेटअप प्रक्रिया को सरल बनाता है।


इस गाइड के अंत तक, आपके पास एक चालू एप्लीकेशन होगा जो Heroku पर Apache Kafka के साथ EDA की शक्ति को प्रदर्शित करता है। चलिए शुरू करते हैं!

शुरू करना

इससे पहले कि हम कोड में उतरें, आइए कुछ मुख्य अवधारणाओं की समीक्षा करें। एक बार जब आप इन्हें समझ लेंगे, तो आगे बढ़ना आसान हो जाएगा।


  • घटनाएँ डेटा के वे टुकड़े होते हैं जो सिस्टम में किसी घटना को दर्शाते हैं, जैसे किसी सेंसर से तापमान का पाठ्यांक।


  • विषय वे श्रेणियाँ या चैनल हैं जहाँ घटनाएँ प्रकाशित की जाती हैं। उन्हें उन विषयों के रूप में सोचें जिन्हें आप न्यूज़लेटर में सब्सक्राइब करते हैं।


  • निर्माता वे इकाइयाँ हैं जो घटनाओं को बनाते हैं और विषयों को भेजते हैं। हमारे डेमो EDA एप्लिकेशन में, हमारे निर्माता मौसम सेंसर का एक सेट होंगे।


  • उपभोक्ता वे इकाइयाँ हैं जो विषयों से घटनाओं को पढ़ते हैं और संसाधित करते हैं। हमारे एप्लिकेशन में एक उपभोक्ता होगा जो मौसम डेटा घटनाओं को सुनता है और उन्हें लॉग करता है।

हमारे एप्लिकेशन का परिचय

हम KafkaJS लाइब्रेरी का उपयोग करके Node.js एप्लिकेशन बनाएंगे। हमारा एप्लिकेशन कैसे काम करेगा, इसका एक त्वरित अवलोकन यहां दिया गया है:


  1. हमारे मौसम सेंसर (उत्पादक) समय-समय पर डेटा उत्पन्न करेंगे - जैसे तापमान, आर्द्रता और बैरोमीटर का दबाव - और इन घटनाओं को अपाचे काफ्का को भेजेंगे। डेमो उद्देश्यों के लिए, डेटा यादृच्छिक रूप से उत्पन्न किया जाएगा।


  2. हम एक उपभोक्ता को विषयों को सुनने के लिए रखेंगे। जब कोई नई घटना प्राप्त होगी, तो वह डेटा को लॉग में लिखेगा।


  3. हम संपूर्ण सेटअप को Heroku पर तैनात करेंगे और घटनाओं के घटित होने पर उन पर निगरानी रखने के लिए Heroku लॉग का उपयोग करेंगे।

आवश्यक शर्तें

शुरू करने से पहले, सुनिश्चित करें कि आपके पास निम्नलिखित हैं:


  • हेरोकू खाता: यदि आपके पास खाता नहीं है, तो हेरोकू पर साइन अप करें
  • Heroku CLI: Heroku CLI डाउनलोड करें और इंस्टॉल करें
  • विकास के लिए अपनी स्थानीय मशीन पर Node.js इंस्टॉल करें। अपनी मशीन पर, मैं Node (v.20.9.0) और npm (10.4.0) का उपयोग कर रहा हूँ।


इस पूरे प्रोजेक्ट का कोडबेस इस GitHub रिपॉजिटरी में उपलब्ध है। कोड को क्लोन करने और इस पोस्ट में उसका अनुसरण करने के लिए स्वतंत्र महसूस करें।


अब जबकि हमने मूल बातें समझ ली हैं, तो आइए Heroku पर अपना काफ्का क्लस्टर स्थापित करें और निर्माण शुरू करें।

Heroku पर काफ़्का क्लस्टर स्थापित करना

चलिए Heroku पर सब कुछ सेट अप करते हैं। यह एक बहुत ही तेज़ और आसान प्रक्रिया है।

चरण 1: Heroku CLI के माध्यम से लॉग इन करें

 ~/project$ heroku login

चरण 2: Heroku ऐप बनाएं

 ~/project$ heroku create weather-eda


(मैंने अपने Heroku ऐप का नाम weather-eda रखा है, लेकिन आप अपने ऐप के लिए एक अद्वितीय नाम चुन सकते हैं।)

चरण 3: Heroku ऐड-ऑन पर Apache Kafka जोड़ें

 ~/project$ heroku addons:create heroku-kafka:basic-0 Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month) The cluster should be available in a few minutes. Run `heroku kafka:wait` to wait until the cluster is ready. You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka kafka-adjacent-07560 is being created in the background. The app will restart when complete... Use heroku addons:info kafka-adjacent-07560 to check creation progress Use heroku addons:docs heroku-kafka to view documentation


आप Heroku ऐड-ऑन पर Apache Kafka के बारे में अधिक जानकारी यहाँ पा सकते हैं। हमारे डेमो के लिए, मैं ऐड-ऑन का बेसिक 0 टियर जोड़ रहा हूँ। ऐड-ऑन की लागत $0.139/घंटा है। जब मैं इस डेमो एप्लिकेशन को बना रहा था, तो मैंने ऐड-ऑन का एक घंटे से भी कम समय तक इस्तेमाल किया और फिर मैंने इसे बंद कर दिया।


हेरोकू को काफ़्का को आपके लिए तैयार करने में कुछ मिनट लगते हैं। बहुत जल्द, आप यह देखेंगे:


 ~/project$ heroku addons:info kafka-adjacent-07560 === kafka-adjacent-07560 Attachments: weather-eda::KAFKA Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time) Max Price: $100/month Owning app: weather-eda Plan: heroku-kafka:basic-0 Price: ~$0.139/hour State: created

चरण 4: काफ़्का क्रेडेंशियल और कॉन्फ़िगरेशन प्राप्त करें

हमारे काफ़्का क्लस्टर के चालू होने के बाद, हमें क्रेडेंशियल और अन्य कॉन्फ़िगरेशन प्राप्त करने की आवश्यकता होगी। Heroku हमारे एप्लिकेशन के लिए कई कॉन्फ़िगरेशन वैर बनाता है, उन्हें अभी बनाए गए काफ़्का क्लस्टर से जानकारी के साथ पॉप्युलेट करता है। हम निम्नलिखित चलाकर इन सभी कॉन्फ़िगरेशन वैर को देख सकते हैं:


 ~/project$ heroku config === weather-eda Config Vars KAFKA_CLIENT_CERT: -----BEGIN CERTIFICATE----- MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... -----END CERTIFICATE----- KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3 ... -----END RSA PRIVATE KEY----- KAFKA_PREFIX: columbia-68051. KAFKA_TRUSTED_CERT: -----BEGIN CERTIFICATE----- MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4= -----END CERTIFICATE----- KAFKA_URL: kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096


जैसा कि आप देख सकते हैं, हमारे पास कई कॉन्फ़िगरेशन वैरिएबल हैं। हमें अपने प्रोजेक्ट रूट फ़ोल्डर में .env नामक एक फ़ाइल चाहिए जिसमें ये सभी कॉन्फ़िगरेशन वैर वैल्यू हों। ऐसा करने के लिए, हम बस निम्नलिखित कमांड चलाते हैं:


 ~/project$ heroku config --shell > .env


हमारी .env फ़ाइल इस तरह दिखती है:


 KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----" KAFKA_PREFIX="columbia-68051." KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"


इसके अलावा, हम अपनी .gitignore फ़ाइल में .env जोड़ना सुनिश्चित करते हैं। हम इस संवेदनशील डेटा को अपनी रिपॉजिटरी में जमा नहीं करना चाहेंगे।

चरण 5: Heroku CLI में Kafka प्लगइन स्थापित करें

Heroku CLI में Kafka से संबंधित कमांड नहीं आते हैं। चूँकि हम Kafka का उपयोग कर रहे हैं, इसलिए हमें CLI प्लगइन इंस्टॉल करना होगा।


 ~/project$ heroku plugins:install heroku-kafka Installing plugin heroku-kafka... installed v2.12.0


अब, हम CLI से अपने काफ़्का क्लस्टर का प्रबंधन कर सकते हैं।


 ~/project$ heroku kafka:info === KAFKA_URL Plan: heroku-kafka:basic-0 Status: available Version: 2.8.2 Created: 2024-05-27T18:44:38.023+00:00 Topics: [··········] 0 / 40 topics, see heroku kafka:topics Prefix: columbia-68051. Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor) Messages: 0 messages/s Traffic: 0 bytes/s in / 0 bytes/s out Data Size: [··········] 0 bytes / 4.00 GB (0.00%) Add-on: kafka-adjacent-07560 ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40)

चरण 6: क्लस्टर के साथ बातचीत का परीक्षण करें

बस एक समझदारी भरे परीक्षण के तौर पर, आइए हम अपने काफ़्का क्लस्टर के साथ खेलें। हम एक विषय बनाकर शुरू करते हैं।


 ~/project$ heroku kafka:topics:create test-topic-01 Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done Use `heroku kafka:topics:info test-topic-01` to monitor your topic. Your topic is using the prefix columbia-68051.. ~/project$ heroku kafka:topics:info test-topic-01 ▸ topic test-topic-01 is not available yet


एक या दो मिनट के भीतर हमारा विषय उपलब्ध हो जाता है।


 ~/project$ heroku kafka:topics:info test-topic-01 === kafka-adjacent-07560 :: test-topic-01 Topic Prefix: columbia-68051. Producers: 0 messages/second (0 bytes/second) total Consumers: 0 bytes/second total Partitions: 8 partitions Replication Factor: 3 Compaction: Compaction is disabled for test-topic-01 Retention: 24 hours


इसके बाद, इस टर्मिनल विंडो में, हम एक उपभोक्ता के रूप में कार्य करेंगे, इस विषय को सुनकर इसे सुनेंगे।


 ~/project$ heroku kafka:topics:tail test-topic-01


यहां से, टर्मिनल बस विषय पर प्रकाशित किसी भी घटना की प्रतीक्षा करता है।


एक अलग टर्मिनल विंडो में, हम निर्माता के रूप में कार्य करेंगे, और विषय पर कुछ संदेश प्रकाशित करेंगे।


 ~/project$ heroku kafka:topics:write test-topic-01 "hello world!"


अपने उपभोक्ता की टर्मिनल विंडो में वापस आकर हम यह देखते हैं:


 ~/project$ heroku kafka:topics:tail test-topic-01 test-topic-01 0 0 12 hello world!


बहुत बढ़िया! हमने अपने काफ़्का क्लस्टर में एक टॉपिक के लिए एक इवेंट को सफलतापूर्वक तैयार और उपयोग किया है। हम अपने Node.js एप्लिकेशन पर आगे बढ़ने के लिए तैयार हैं। आइए अपने प्लेग्राउंड को साफ-सुथरा रखने के लिए इस टेस्ट टॉपिक को नष्ट कर दें।


 ~/project$ heroku kafka:topics:destroy test-topic-01 ▸ This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda ▸ To proceed, type weather-eda or re-run this command with --confirm weather-eda > weather-eda Deleting topic test-topic-01... done Your topic has been marked for deletion, and will be removed from the cluster shortly ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40).

चरण 7: हमारे एप्लिकेशन के लिए काफ़्का तैयार करें

हमारे एप्लिकेशन को काफ़्का का उपयोग करने के लिए तैयार करने हेतु, हमें दो चीजें बनाने की आवश्यकता होगी: एक विषय और एक उपभोक्ता समूह।


आइए वह विषय बनाएं जिसका उपयोग हमारा एप्लिकेशन करेगा।


 ~/project$ heroku kafka:topics:create weather-data


इसके बाद, हम उपभोक्ता समूह बनाएंगे जिसका हिस्सा हमारे एप्लिकेशन का उपभोक्ता होगा:


 ~/project$ heroku kafka:consumer-groups:create weather-consumers


हम अपना Node.js एप्लिकेशन बनाने के लिए तैयार हैं!

एप्लिकेशन बनाएं

आइए एक नई परियोजना आरंभ करें और अपनी निर्भरताएं स्थापित करें।


 ~/project$ npm init -y ~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty


हमारी परियोजना में दो प्रक्रियाएँ चलेंगी:


  1. consumer.js , जो विषय के लिए सब्सक्राइब है और प्रकाशित होने वाली किसी भी घटना को लॉग करता है।


  2. producer.js , जो हर कुछ सेकंड में विषय पर कुछ यादृच्छिक मौसम डेटा प्रकाशित करेगा।


इन दोनों प्रक्रियाओं को हमारे काफ़्का क्लस्टर से कनेक्ट करने के लिए काफ़्काजेएस का उपयोग करने की आवश्यकता होगी, इसलिए हम अपने कोड को पुनः प्रयोज्य बनाने के लिए मॉड्यूलर बनाएंगे।

काफ़्का क्लाइंट के साथ काम करना

प्रोजेक्ट src फ़ोल्डर में, हम kafka.js नामक एक फ़ाइल बनाते हैं। यह इस तरह दिखता है:


 const { Kafka } = require('kafkajs'); const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://','' )) const TOPIC = `${process.env.KAFKA_PREFIX}weather-data` const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers` const kafka = new Kafka({ clientId: 'weather-eda-app-nodejs-client', brokers: BROKER_URLS, ssl: { rejectUnauthorized: false, ca: process.env.KAFKA_TRUSTED_CERT, key: process.env.KAFKA_CLIENT_CERT_KEY, cert: process.env.KAFKA_CLIENT_CERT, }, }) const producer = async () => { const p = kafka.producer() await p.connect() return p; } const consumer = async () => { const c = kafka.consumer({ groupId: CONSUMER_GROUP, sessionTimeout: 30000 }) await c.connect() await c.subscribe({ topics: [TOPIC] }); return c; } module.exports = { producer, consumer, topic: TOPIC, groupId: CONSUMER_GROUP };


इस फ़ाइल में, हम एक नया काफ़्का क्लाइंट बनाकर शुरू करते हैं। इसके लिए काफ़्का ब्रोकर्स के लिए URL की आवश्यकता होती है, जिसे हम अपनी .env फ़ाइल में KAFKA_URL वैरिएबल से पार्स कर सकते हैं (जो मूल रूप से Heroku कॉन्फ़िगरेशन को कॉल करने से आया था)। कनेक्शन प्रयास को प्रमाणित करने के लिए, हमें KAFKA_TRUSTED_CERT , KAFKA_CLIENT_CERT_KEY , और KAFKA_CLIENT_CERT प्रदान करने की आवश्यकता है।


फिर, अपने काफ्का क्लाइंट से, हम एक producer और एक consumer बनाते हैं, और यह सुनिश्चित करते हैं कि हमारे उपभोक्ता weather-data विषय के सदस्य हों।

काफ़्का उपसर्ग पर स्पष्टीकरण

kafka.js में ध्यान दें कि हमने अपने विषय और उपभोक्ता समूह के नाम के आगे KAFKA_PREFIX जोड़ दिया है। हम Heroku पर Apache Kafka के लिए बेसिक 0 प्लान का उपयोग कर रहे हैं, जो एक मल्टी-टेनेंट Kafka प्लान है। इसका मतलब है कि हम KAFKA_PREFIX के साथ काम करते हैं । भले ही हमने अपने विषय का नाम weather-data और अपने उपभोक्ता समूह का नाम weather-consumers , लेकिन हमारे मल्टी-टेनेंट Kafka क्लस्टर में उनके वास्तविक नामों में KAFKA_PREFIX जोड़ा जाना चाहिए (यह सुनिश्चित करने के लिए कि वे अद्वितीय हैं)।


इसलिए, तकनीकी रूप से, हमारे डेमो के लिए, वास्तविक विषय का नाम columbia-68051.weather-data है, न कि weather-data । (उपभोक्ता समूह के नाम के लिए भी ऐसा ही है।)

निर्माता प्रक्रिया

अब, आइए अपनी बैकग्राउंड प्रक्रिया बनाएं जो हमारे मौसम सेंसर उत्पादकों के रूप में कार्य करेगी। हमारे प्रोजेक्ट रूट फ़ोल्डर में, हमारे पास producer.js नामक एक फ़ाइल है। यह इस तरह दिखता है:


 require('dotenv').config(); const kafka = require('./src/kafka.js'); const { faker } = require('@faker-js/faker'); const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05']; const MAX_DELAY_MS = 20000; const READINGS = ['temperature','humidity','barometric_pressure']; const MAX_TEMP = 130; const MIN_PRESSURE = 2910; const PRESSURE_RANGE = 160; const getRandom = (arr) => arr[faker.number.int(arr.length - 1)]; const getRandomReading = { temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100), humidity: () => faker.number.int(100) / 100, barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100 }; const sleep = (ms) => { return new Promise((resolve) => { setTimeout(resolve, ms); }); }; (async () => { const producer = await kafka.producer() while(true) { const sensor = getRandom(SENSORS) const reading = getRandom(READINGS) const value = getRandomReading[reading]() const data = { reading, value } await producer.send({ topic: kafka.topic, messages: [{ key: sensor, value: JSON.stringify(data) }] }) await sleep(faker.number.int(MAX_DELAY_MS)) } })()


फ़ाइल में बहुत सारा कोड यादृच्छिक मान उत्पन्न करने से संबंधित है। मैं महत्वपूर्ण भागों को हाइलाइट करूँगा:


  • हम पांच अलग-अलग मौसम सेंसर का अनुकरण करेंगे। उनके नाम SENSORS में पाए जाते हैं।


  • एक सेंसर तीन संभावित रीडिंग में से एक के लिए एक मान उत्सर्जित (प्रकाशित) करेगा: temperature , humidity , या barometric_pressuregetRandomReading ऑब्जेक्ट में इनमें से प्रत्येक रीडिंग के लिए एक फ़ंक्शन होता है, जो एक उचित संगत मान उत्पन्न करता है।


  • संपूर्ण प्रक्रिया एक अनंत while लूप के साथ एक async फ़ंक्शन के रूप में चलती है।


while लूप के भीतर, हम:


  • यादृच्छिक रूप से एक sensor चुनें.
  • यादृच्छिक रूप से एक reading चुनें।
  • उस रीडिंग के लिए एक यादृच्छिक value उत्पन्न करें.
  • इस डेटा को टॉपिक पर प्रकाशित करने के लिए producer.send कॉल करें। sensor इवेंट के लिए key के रूप में कार्य करता है, जबकि reading और value इवेंट संदेश का निर्माण करेंगे।
  • फिर, हम लूप की अगली पुनरावृत्ति से पहले 20 सेकंड तक प्रतीक्षा करते हैं।

उपभोक्ता प्रक्रिया

consumer.js में पृष्ठभूमि प्रक्रिया काफी सरल है।


 require('dotenv').config(); const logger = require('./src/logger.js'); const kafka = require('./src/kafka.js'); (async () => { const consumer = await kafka.consumer() await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const sensorId = message.key.toString() const messageObj = JSON.parse(message.value.toString()) const logMessage = { sensorId } logMessage[messageObj.reading] = messageObj.value logger.info(logMessage) } }) })()


हमारा consumer पहले से ही weather-data विषय के लिए सब्सक्राइब है। हम consumer.run को कॉल करते हैं, और फिर हम eachMessage के लिए एक हैंडलर सेट करते हैं। जब भी Kafka किसी संदेश के consumer को सूचित करता है, तो वह संदेश को लॉग करता है। बस इतना ही है।

प्रक्रियाएँ और Procfile

package.json फ़ाइल में, हमें कुछ scripts जोड़ने की ज़रूरत है जो हमारे निर्माता और उपभोक्ता पृष्ठभूमि प्रक्रियाओं को शुरू करती हैं। फ़ाइल में अब निम्नलिखित शामिल होना चाहिए:


 ... "scripts": { "start": "echo 'do nothing'", "start:consumer": "node consumer.js", "start:producer": "node producer.js" }, ...


इनमें से महत्वपूर्ण हैं start:consumer और start:producer । लेकिन हम अपनी फ़ाइल में start रखते हैं (भले ही यह कुछ भी सार्थक न करे) क्योंकि Heroku बिल्डर को इसकी अपेक्षा होती है।


इसके बाद, हम एक Procfile बनाते हैं जो Heroku को बताएगा कि हमारे Heroku ऐप के लिए हमें किन विभिन्न वर्कर्स की आवश्यकता है। हमारे प्रोजेक्ट के रूट फ़ोल्डर में, Procfile इस तरह दिखना चाहिए:


 consumer_worker: npm run start:consumer producer_worker: npm run start:producer


बहुत आसान है, है न? हमारे पास एक बैकग्राउंड प्रोसेस वर्कर होगा जिसे consumer_worker कहा जाता है, और दूसरा जिसे producer_worker कहा जाता है। आप देखेंगे कि हमारे पास web वर्कर नहीं है, जो कि आप आमतौर पर वेब एप्लिकेशन के लिए Procfile में देखते हैं। हमारे Heroku ऐप के लिए, हमें बस दो बैकग्राउंड वर्कर की आवश्यकता है। हमें web आवश्यकता नहीं है।

एप्लिकेशन को तैनात और परीक्षण करें

इसके साथ ही, हमारा पूरा कोड सेट हो गया है। हमने अपना पूरा कोड रेपो में डाल दिया है, और हम तैनाती के लिए तैयार हैं।


 ~/project$ git push heroku main … remote: -----> Build succeeded! … remote: -----> Compressing... remote: Done: 48.6M remote: -----> Launching... … remote: Verifying deploy... done


तैनाती के बाद, हम यह सुनिश्चित करना चाहते हैं कि हम अपने डायनो को ठीक से स्केल करें। हमें वेब प्रक्रिया के लिए डायनो की आवश्यकता नहीं है, लेकिन हमें consumer_worker और producer_worker दोनों के लिए एक की आवश्यकता होगी। हम अपनी ज़रूरतों के आधार पर इन प्रक्रियाओं को सेट करने के लिए निम्न कमांड चलाते हैं।


 ~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1 Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco


अब, सब कुछ चालू हो जाना चाहिए। पर्दे के पीछे, हमारे producer_worker काफ़्का क्लस्टर से जुड़ना चाहिए और फिर हर कुछ सेकंड में मौसम सेंसर डेटा प्रकाशित करना शुरू करना चाहिए। फिर, हमारे consumer_worker काफ़्का क्लस्टर से जुड़ना चाहिए और उस विषय से प्राप्त होने वाले किसी भी संदेश को लॉग करना चाहिए जिसकी वह सदस्यता लेता है।


यह देखने के लिए कि हमारा consumer_worker क्या कर रहा है, हम अपने Heroku लॉग्स में देख सकते हैं।


 ~/project$ heroku logs --tail … heroku[producer_worker.1]: Starting process with command `npm run start:producer` heroku[producer_worker.1]: State changed from starting to up app[producer_worker.1]: app[producer_worker.1]: > [email protected] start:producer app[producer_worker.1]: > node producer.js app[producer_worker.1]: … heroku[consumer_worker.1]: Starting process with command `npm run start:consumer` heroku[consumer_worker.1]: State changed from starting to up app[consumer_worker.1]: app[consumer_worker.1]: > [email protected] start:consumer app[consumer_worker.1]: > node consumer.js app[consumer_worker.1]: app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"} app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041} app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84} app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3} app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11} app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71} app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55} app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58} app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25} app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1} app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34} app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61} app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36} app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}


यह काम करता है! हम जानते हैं कि हमारा निर्माता समय-समय पर काफ़्का को संदेश प्रकाशित कर रहा है क्योंकि हमारा उपभोक्ता उन्हें प्राप्त कर रहा है और फिर उन्हें लॉग कर रहा है।


बेशक, एक बड़े EDA ऐप में, हर सेंसर एक निर्माता होता है। वे विभिन्न उद्देश्यों के लिए कई विषयों पर प्रकाशित कर सकते हैं, या वे सभी एक ही विषय पर प्रकाशित कर सकते हैं। और आपका उपभोक्ता कई विषयों के लिए सब्सक्राइब हो सकता है। साथ ही, हमारे डेमो ऐप में, हमारे उपभोक्ता बस eachMessage पर बहुत कुछ उत्सर्जित करते हैं; लेकिन एक EDA एप्लिकेशन में, एक उपभोक्ता किसी तृतीय-पक्ष API को कॉल करके, एक SMS अधिसूचना भेजकर या डेटाबेस से पूछताछ करके प्रतिक्रिया दे सकता है।


अब जब आपको घटनाओं, विषयों, उत्पादकों और उपभोक्ताओं की बुनियादी समझ हो गई है, और आप जानते हैं कि काफ़्का के साथ कैसे काम करना है, तो आप अधिक जटिल व्यावसायिक उपयोग मामलों को पूरा करने के लिए अपने स्वयं के EDA अनुप्रयोगों को डिज़ाइन और निर्माण करना शुरू कर सकते हैं।

निष्कर्ष

EDA बहुत शक्तिशाली है - आप आसान स्केलेबिलिटी और रीयल-टाइम डेटा प्रोसेसिंग जैसी प्रमुख सुविधाओं का आनंद लेते हुए अपने सिस्टम को अलग कर सकते हैं। EDA के लिए, Kafka एक महत्वपूर्ण उपकरण है, जो आपको आसानी से उच्च-थ्रूपुट डेटा स्ट्रीम को संभालने में मदद करता है। Heroku पर Apache Kafka का उपयोग करने से आपको जल्दी से शुरुआत करने में मदद मिलती है। चूंकि यह एक प्रबंधित सेवा है, इसलिए आपको Kafka क्लस्टर प्रबंधन के जटिल भागों के बारे में चिंता करने की आवश्यकता नहीं है। आप बस अपने ऐप्स बनाने पर ध्यान केंद्रित कर सकते हैं।


यहाँ से, आपके लिए प्रयोग और प्रोटोटाइप का समय आ गया है। पहचानें कि कौन से उपयोग के मामले EDA के साथ अच्छी तरह से फिट होते हैं। इसमें गोता लगाएँ, Heroku पर इसका परीक्षण करें, और कुछ अद्भुत बनाएँ। हैप्पी कोडिंग!