paint-brush
मैंने MongoDB-संचालित संदेश कतार क्यों बनाईद्वारा@allquiet
11,454 रीडिंग
11,454 रीडिंग

मैंने MongoDB-संचालित संदेश कतार क्यों बनाई

द्वारा All Quiet12m2023/08/27
Read on Terminal Reader

बहुत लंबा; पढ़ने के लिए

आप MongoDB के साथ HA और निष्पादक संदेश कतार बना सकते हैं क्योंकि यह समवर्ती परमाणु पढ़ने/अद्यतन संचालन के साथ-साथ परिवर्तन स्ट्रीम भी प्रदान करता है।
featured image - मैंने MongoDB-संचालित संदेश कतार क्यों बनाई
All Quiet HackerNoon profile picture
0-item
1-item
2-item


अरे👋


मैं मैड्स क्विस्ट, ऑल क्वाइट का संस्थापक हूं। हमने MongoDB पर आधारित एक घरेलू संदेश कतार लागू की है और मैं यहां इस बारे में बात करने के लिए हूं:

  1. हमने पहिये का पुनः आविष्कार क्यों किया?
  2. हमने पहिये का पुनः आविष्कार कैसे किया


1. हमने पहिये का पुनः आविष्कार क्यों किया?

हमें संदेश कतारबद्ध करने की आवश्यकता क्यों है?

ऑल क्वाइट पेजरड्यूटी के समान एक आधुनिक घटना प्रबंधन मंच है।


हमारे प्लेटफ़ॉर्म को निम्न सुविधाओं की आवश्यकता है:


  • उपयोगकर्ता के पंजीकरण के बाद एसिंक्रोनस रूप से डबल-ऑप्ट-इन ईमेल भेजना
  • पंजीकरण के 24 घंटे बाद एक अनुस्मारक ईमेल भेजना
  • फायरबेस क्लाउड मैसेजिंग (एफसीएम) के साथ पुश नोटिफिकेशन भेजना, जो नेटवर्क या लोड समस्याओं के कारण विफल हो सकता है। चूंकि पुश नोटिफिकेशन हमारे ऐप के लिए महत्वपूर्ण हैं, इसलिए यदि कोई समस्या है तो हमें उन्हें भेजने का पुनः प्रयास करना होगा।
  • हमारे एकीकरण के बाहर से ईमेल स्वीकार करना और उन्हें घटनाओं में संसाधित करना। यह प्रक्रिया विफल हो सकती है, इसलिए हम इसे अलग करना चाहते थे और प्रत्येक ईमेल पेलोड को एक कतार में संसाधित करना चाहते थे।




हमारा तकनीकी ढेर

हमारी विशिष्ट आवश्यकताओं को समझने के लिए, हमारे तकनीकी स्टैक में कुछ अंतर्दृष्टि प्राप्त करना महत्वपूर्ण है:


  • हम .NET Core 7 पर आधारित एक अखंड वेब एप्लिकेशन चलाते हैं।
  • .NET कोर एप्लिकेशन एक डॉकर कंटेनर में चलता है।
  • हम समानांतर में कई कंटेनर चलाते हैं।
  • एक HAProxy इंस्टेंस HTTP अनुरोधों को प्रत्येक कंटेनर में समान रूप से वितरित करता है, जिससे अत्यधिक उपलब्ध सेटअप सुनिश्चित होता है।
  • हम अपने अंतर्निहित डेटाबेस के रूप में MongoDB का उपयोग करते हैं, जिसे उपलब्धता क्षेत्रों में दोहराया जाता है।
  • उपरोक्त सभी घटकों को AWS द्वारा जेनेरिक EC2 VMs पर होस्ट किया गया है।

हमने पहिये का पुनः आविष्कार क्यों किया?

  • हम एक सरल कतार तंत्र चाहते थे जो एक साथ कई प्रक्रियाओं में चल सके और यह गारंटी दे कि प्रत्येक संदेश को केवल एक बार संसाधित किया गया था।
  • हमें पब/उप पैटर्न की आवश्यकता नहीं थी।
  • हमने CQRS/इवेंट सोर्सिंग पर आधारित एक जटिल वितरित प्रणाली का लक्ष्य नहीं रखा था क्योंकि, आप जानते हैं, वितरित प्रणालियों का पहला नियम वितरित नहीं करना है।
  • हम " उबाऊ तकनीक " चुनने के दर्शन का पालन करते हुए चीजों को यथासंभव सरल रखना चाहते थे।


अंततः, यह आपके बुनियादी ढांचे में चलने वाले हिस्सों की संख्या को कम करने के बारे में है। हमारा लक्ष्य अपने उत्कृष्ट ग्राहकों के लिए शानदार सुविधाएँ बनाना है, और हमारी सेवाओं को विश्वसनीय रूप से बनाए रखना अनिवार्य है। पांच नाइन से अधिक अपटाइम प्राप्त करने के लिए एकल डेटाबेस प्रणाली का प्रबंधन करना काफी चुनौतीपूर्ण है। तो एक अतिरिक्त HA RabbitMQ क्लस्टर के प्रबंधन का बोझ अपने ऊपर क्यों डालें?


सिर्फ AWS SQS का उपयोग क्यों न करें?

हाँ... AWS SQS, Google क्लाउड टास्क, या Azure क्यू स्टोरेज जैसे क्लाउड समाधान शानदार हैं! हालाँकि, उनके परिणामस्वरूप वेंडर लॉक-इन हो जाता। हम बस अपने ग्राहकों को स्केलेबल सेवा प्रदान करते हुए स्वतंत्र और लागत प्रभावी होने की आकांक्षा रखते हैं।



2. हमने पहिये का पुनः आविष्कार कैसे किया

संदेश कतार क्या है?

संदेश कतार एक ऐसी प्रणाली है जो संदेशों को संग्रहीत करती है। संदेशों के निर्माता इन्हें कतार में संग्रहीत करते हैं, जिन्हें बाद में प्रसंस्करण के लिए उपभोक्ताओं द्वारा हटा दिया जाता है। यह घटकों को अलग करने के लिए अविश्वसनीय रूप से फायदेमंद है, खासकर जब संदेशों को संसाधित करना एक संसाधन-गहन कार्य है।


हमारी कतार में क्या विशेषताएँ दिखनी चाहिए?

  • हमारे डेटा भंडारण के रूप में MongoDB का उपयोग करना
  • यह गारंटी देना कि प्रत्येक संदेश का उपयोग केवल एक बार किया जाता है
  • एकाधिक उपभोक्ताओं को एक साथ संदेशों को संसाधित करने की अनुमति देना
  • यह सुनिश्चित करना कि यदि संदेश प्रसंस्करण विफल हो जाता है, तो पुनः प्रयास संभव है
  • भविष्य के लिए संदेश उपभोग का शेड्यूल सक्षम करना
  • गारंटीकृत ऑर्डर की आवश्यकता नहीं है
  • उच्च उपलब्धता सुनिश्चित करना
  • यह सुनिश्चित करना कि संदेश और उनकी स्थितियाँ टिकाऊ हों और पुनरारंभ या विस्तारित डाउनटाइम का सामना कर सकें


MongoDB पिछले कुछ वर्षों में महत्वपूर्ण रूप से विकसित हुआ है और ऊपर सूचीबद्ध मानदंडों को पूरा कर सकता है।


कार्यान्वयन

आगे आने वाले अनुभागों में, मैं आपको हमारी संदेश कतार के MongoDB-विशिष्ट कार्यान्वयन के माध्यम से मार्गदर्शन करूंगा। हालाँकि आपको अपनी पसंदीदा प्रोग्रामिंग भाषा के लिए उपयुक्त क्लाइंट लाइब्रेरी की आवश्यकता होगी, जैसे कि ऑल क्वाइट के मामले में NodeJS, Go, या C#, जो अवधारणाएँ मैं साझा करूँगा वे प्लेटफ़ॉर्म अज्ञेयवादी हैं।


कतारों

प्रत्येक कतार जिसे आप उपयोग करना चाहते हैं उसे आपके MongoDB डेटाबेस में एक समर्पित संग्रह के रूप में दर्शाया गया है।

संदेश मॉडल

यहां संसाधित संदेश का एक उदाहरण दिया गया है:

 { "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }


आइए संदेश की प्रत्येक संपत्ति को देखें।


_पहचान

_id फ़ील्ड MongoDB की विहित विशिष्ट पहचानकर्ता संपत्ति है। यहां, इसमें NumberLong शामिल है, कोई objectId नहीं। हमें ObjectId के बजाय NumberLong आवश्यकता है क्योंकि:



जबकि ऑब्जेक्टआईडी मान समय के साथ बढ़ना चाहिए, वे आवश्यक रूप से मोनोटोनिक नहीं हैं। ऐसा इसलिए है क्योंकि वे:

  • केवल एक सेकंड का अस्थायी समाधान होता है, इसलिए एक ही सेकंड के भीतर बनाए गए ऑब्जेक्टआईडी मानों में कोई गारंटीकृत क्रम नहीं होता है, और
  • क्लाइंट द्वारा जेनरेट किए जाते हैं, जिनकी सिस्टम क्लॉक अलग-अलग हो सकती हैं।


हमारे सी# कार्यान्वयन में, हम प्रविष्टि समय के आधार पर मिलीसेकंड परिशुद्धता और गारंटीकृत ऑर्डर के साथ एक आईडी उत्पन्न करते हैं। हालाँकि हमें बहु-उपभोक्ता वातावरण (RabbitMQ के समान) में सख्त प्रसंस्करण आदेश की आवश्यकता नहीं है, केवल एक उपभोक्ता के साथ काम करते समय FIFO ऑर्डर बनाए रखना आवश्यक है। ऑब्जेक्टआईडी के साथ इसे हासिल करना संभव नहीं है। यदि यह आपके लिए महत्वपूर्ण नहीं है, तो भी आप ऑब्जेक्टआईडी का उपयोग कर सकते हैं।


वे स्थितियां

स्टेटस प्रॉपर्टी में एक सरणी होती है जिसमें संदेश प्रसंस्करण इतिहास होता है। इंडेक्स 0 पर, आपको वर्तमान स्थिति मिलेगी, जो इंडेक्सिंग के लिए महत्वपूर्ण है।


स्टेटस ऑब्जेक्ट में स्वयं तीन गुण होते हैं:

  • Status : "एनक्यूड", "प्रोसेसिंग", "प्रोसेस्ड", या "असफल" हो सकता है।
  • Timestamp : यह वर्तमान टाइमस्टैम्प को कैप्चर करता है।
  • NextReevaluation : यह रिकॉर्ड करता है कि अगला मूल्यांकन कब होना चाहिए, जो पुनर्प्रयास और भविष्य के निर्धारित निष्पादन दोनों के लिए आवश्यक है।


पेलोड

इस संपत्ति में आपके संदेश का विशिष्ट पेलोड शामिल है।


एक संदेश कतारबद्ध करना

एक संदेश जोड़ना संग्रह में एक सीधा सम्मिलन ऑपरेशन है, जिसकी स्थिति "एनक्यूड" पर सेट है।

  • तत्काल प्रसंस्करण के लिए, NextReevaluation को null पर सेट करें।
  • भविष्य में प्रसंस्करण के लिए, जब आप चाहते हैं कि आपका संदेश संसाधित हो, तो NextReevaluation को टाइमस्टैम्प पर सेट करें।
 db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });


किसी संदेश को कतारबद्ध करना

कतारबद्ध करना थोड़ा अधिक जटिल है लेकिन फिर भी अपेक्षाकृत सरल है। यह MongoDB की समवर्ती परमाणु पढ़ने और अद्यतन क्षमताओं पर बहुत अधिक निर्भर करता है।


MongoDB की यह आवश्यक सुविधा सुनिश्चित करती है:

  • प्रत्येक संदेश केवल एक बार संसाधित होता है।
  • एकाधिक उपभोक्ता एक साथ संदेशों को सुरक्षित रूप से संसाधित कर सकते हैं।


 db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


इसलिए हम एक संदेश पढ़ रहे हैं जो "एनक्यूड" स्थिति में है और साथ ही स्थिति "प्रसंस्करण" को स्थिति 0 पर सेट करके इसे संशोधित करते हैं। चूंकि यह ऑपरेशन परमाणु है, यह गारंटी देगा कि संदेश किसी अन्य उपभोक्ता द्वारा नहीं उठाया जाएगा। .


किसी संदेश को संसाधित के रूप में चिह्नित करना

एक बार जब संदेश का प्रसंस्करण पूरा हो जाता है, तो संदेश की आईडी का उपयोग करके संदेश की स्थिति को "संसाधित" में अपडेट करना एक सरल मामला है।

 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


किसी संदेश को विफल के रूप में चिह्नित करना

यदि प्रोसेसिंग विफल हो जाती है, तो हमें संदेश को तदनुसार चिह्नित करना होगा। अक्सर, आप संदेश को संसाधित करने का पुनः प्रयास करना चाह सकते हैं। इसे संदेश को पुन: पंक्तिबद्ध करके प्राप्त किया जा सकता है। कई परिदृश्यों में, प्रसंस्करण विफलता की प्रकृति के आधार पर, एक विशिष्ट देरी, जैसे 10 सेकंड के बाद संदेश को पुन: संसाधित करना समझ में आता है।


 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });


कतारबद्ध लूप

हमने स्थापित किया है कि कैसे हम अपनी "कतार" से वस्तुओं को आसानी से कतारबद्ध और हटा सकते हैं, जो वास्तव में, केवल एक MongoDB संग्रह है। हम NextReevaluation फ़ील्ड का लाभ उठाकर भविष्य के लिए संदेशों को "शेड्यूल" भी कर सकते हैं।


जो कमी है वह यह है कि हम नियमित रूप से कतार में कैसे उतरेंगे। उपभोक्ताओं को किसी प्रकार के लूप में findAndModify कमांड निष्पादित करने की आवश्यकता होती है। एक सीधा तरीका यह होगा कि एक अंतहीन लूप बनाया जाए जिसमें हम एक संदेश को हटाएं और संसाधित करें। यह तरीका सीधा और असरदार है. हालाँकि, यह डेटाबेस और नेटवर्क पर काफी दबाव डालेगा।


एक विकल्प यह होगा कि लूप पुनरावृत्तियों के बीच विलंब शुरू किया जाए, उदाहरण के लिए, 100 एमएस। इससे लोड तो काफी कम हो जाएगा लेकिन कतार में लगने की गति भी कम हो जाएगी।


समस्या का समाधान वह है जिसे MongoDB परिवर्तन धारा के रूप में संदर्भित करता है।


MongoDB परिवर्तन धाराएँ

परिवर्तन धाराएँ क्या हैं? मैं इसे MongoDB के लोगों से बेहतर नहीं समझा सकता:


परिवर्तन धाराएँ अनुप्रयोगों को वास्तविक समय डेटा परिवर्तनों तक पहुँचने की अनुमति देती हैं […]। एप्लिकेशन एक ही संग्रह पर सभी डेटा परिवर्तनों की सदस्यता लेने के लिए परिवर्तन स्ट्रीम का उपयोग कर सकते हैं […] और उन पर तुरंत प्रतिक्रिया दे सकते हैं।


महान! हम अपने कतार संग्रह में नए बनाए गए दस्तावेज़ों को सुन सकते हैं, जिसका प्रभावी अर्थ नए कतारबद्ध संदेशों को सुनना है


यह बिल्कुल सरल है:

 const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });



अनुसूचित और अनाथ संदेश

हालाँकि, परिवर्तन धारा दृष्टिकोण, अनुसूचित और अनाथ दोनों संदेशों के लिए काम नहीं करता है क्योंकि स्पष्ट रूप से ऐसा कोई परिवर्तन नहीं है जिसे हम सुन सकें।


  • शेड्यूल किए गए संदेश बस संग्रह में "एनक्यूड" स्थिति और भविष्य के लिए सेट "नेक्स्टरीवैल्यूएशन" फ़ील्ड के साथ बैठते हैं।
  • अनाथ संदेश वे हैं जो "प्रसंस्करण" स्थिति में थे जब उनकी उपभोक्ता प्रक्रिया समाप्त हो गई थी। वे संग्रह में "प्रसंस्करण" स्थिति के साथ बने रहेंगे, लेकिन कोई भी उपभोक्ता कभी भी अपनी स्थिति को "संसाधित" या "असफल" में नहीं बदलेगा।


इन उपयोग के मामलों के लिए, हमें अपने सरल लूप पर वापस लौटना होगा। हालाँकि, हम पुनरावृत्तियों के बीच काफी उदार विलंब का उपयोग कर सकते हैं।


इसे लपेट रहा है

"पारंपरिक" डेटाबेस, जैसे MySQL , PostgreSQL , या MongoDB (जिसे मैं पारंपरिक भी मानता हूं), आज अविश्वसनीय रूप से शक्तिशाली हैं। यदि सही ढंग से उपयोग किया जाता है (सुनिश्चित करें कि आपके इंडेक्स अनुकूलित हैं!), तो वे तेज़ हैं, प्रभावशाली पैमाने पर हैं, और पारंपरिक होस्टिंग प्लेटफ़ॉर्म पर लागत प्रभावी हैं।


कई उपयोग मामलों को केवल एक डेटाबेस और आपकी पसंदीदा प्रोग्रामिंग भाषा का उपयोग करके संबोधित किया जा सकता है। हमेशा "सही काम के लिए सही उपकरण" का होना जरूरी नहीं है, जिसका मतलब है कि रेडिस, इलास्टिक्सर्च, रैबिटएमक्यू आदि जैसे उपकरणों के विविध सेट को बनाए रखना। अक्सर, रखरखाव ओवरहेड इसके लायक नहीं है।


हालाँकि प्रस्तावित समाधान, उदाहरण के लिए, RabbitMQ के प्रदर्शन से मेल नहीं खा सकता है, यह आमतौर पर पर्याप्त है और एक ऐसे बिंदु तक बढ़ सकता है जो आपके स्टार्टअप के लिए महत्वपूर्ण सफलता का प्रतीक होगा।


सॉफ्टवेयर इंजीनियरिंग ट्रेड-ऑफ को नेविगेट करने के बारे में है। अपना बुद्धिमानी से चुनें.