अरे👋
मैं मैड्स क्विस्ट, ऑल क्वाइट का संस्थापक हूं। हमने MongoDB पर आधारित एक घरेलू संदेश कतार लागू की है और मैं यहां इस बारे में बात करने के लिए हूं:
ऑल क्वाइट पेजरड्यूटी के समान एक आधुनिक घटना प्रबंधन मंच है।
हमारे प्लेटफ़ॉर्म को निम्न सुविधाओं की आवश्यकता है:
हमारी विशिष्ट आवश्यकताओं को समझने के लिए, हमारे तकनीकी स्टैक में कुछ अंतर्दृष्टि प्राप्त करना महत्वपूर्ण है:
अंततः, यह आपके बुनियादी ढांचे में चलने वाले हिस्सों की संख्या को कम करने के बारे में है। हमारा लक्ष्य अपने उत्कृष्ट ग्राहकों के लिए शानदार सुविधाएँ बनाना है, और हमारी सेवाओं को विश्वसनीय रूप से बनाए रखना अनिवार्य है। पांच नाइन से अधिक अपटाइम प्राप्त करने के लिए एकल डेटाबेस प्रणाली का प्रबंधन करना काफी चुनौतीपूर्ण है। तो एक अतिरिक्त HA RabbitMQ क्लस्टर के प्रबंधन का बोझ अपने ऊपर क्यों डालें?
हाँ... AWS SQS, Google क्लाउड टास्क, या Azure क्यू स्टोरेज जैसे क्लाउड समाधान शानदार हैं! हालाँकि, उनके परिणामस्वरूप वेंडर लॉक-इन हो जाता। हम बस अपने ग्राहकों को स्केलेबल सेवा प्रदान करते हुए स्वतंत्र और लागत प्रभावी होने की आकांक्षा रखते हैं।
संदेश कतार एक ऐसी प्रणाली है जो संदेशों को संग्रहीत करती है। संदेशों के निर्माता इन्हें कतार में संग्रहीत करते हैं, जिन्हें बाद में प्रसंस्करण के लिए उपभोक्ताओं द्वारा हटा दिया जाता है। यह घटकों को अलग करने के लिए अविश्वसनीय रूप से फायदेमंद है, खासकर जब संदेशों को संसाधित करना एक संसाधन-गहन कार्य है।
MongoDB पिछले कुछ वर्षों में महत्वपूर्ण रूप से विकसित हुआ है और ऊपर सूचीबद्ध मानदंडों को पूरा कर सकता है।
आगे आने वाले अनुभागों में, मैं आपको हमारी संदेश कतार के MongoDB-विशिष्ट कार्यान्वयन के माध्यम से मार्गदर्शन करूंगा। हालाँकि आपको अपनी पसंदीदा प्रोग्रामिंग भाषा के लिए उपयुक्त क्लाइंट लाइब्रेरी की आवश्यकता होगी, जैसे कि ऑल क्वाइट के मामले में NodeJS, Go, या C#, जो अवधारणाएँ मैं साझा करूँगा वे प्लेटफ़ॉर्म अज्ञेयवादी हैं।
प्रत्येक कतार जिसे आप उपयोग करना चाहते हैं उसे आपके MongoDB डेटाबेस में एक समर्पित संग्रह के रूप में दर्शाया गया है।
यहां संसाधित संदेश का एक उदाहरण दिया गया है:
{ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }
आइए संदेश की प्रत्येक संपत्ति को देखें।
_id
फ़ील्ड MongoDB की विहित विशिष्ट पहचानकर्ता संपत्ति है। यहां, इसमें NumberLong
शामिल है, कोई objectId नहीं। हमें ObjectId
के बजाय NumberLong
आवश्यकता है क्योंकि:
जबकि ऑब्जेक्टआईडी मान समय के साथ बढ़ना चाहिए, वे आवश्यक रूप से मोनोटोनिक नहीं हैं। ऐसा इसलिए है क्योंकि वे:
- केवल एक सेकंड का अस्थायी समाधान होता है, इसलिए एक ही सेकंड के भीतर बनाए गए ऑब्जेक्टआईडी मानों में कोई गारंटीकृत क्रम नहीं होता है, और
- क्लाइंट द्वारा जेनरेट किए जाते हैं, जिनकी सिस्टम क्लॉक अलग-अलग हो सकती हैं।
हमारे सी# कार्यान्वयन में, हम प्रविष्टि समय के आधार पर मिलीसेकंड परिशुद्धता और गारंटीकृत ऑर्डर के साथ एक आईडी उत्पन्न करते हैं। हालाँकि हमें बहु-उपभोक्ता वातावरण (RabbitMQ के समान) में सख्त प्रसंस्करण आदेश की आवश्यकता नहीं है, केवल एक उपभोक्ता के साथ काम करते समय FIFO ऑर्डर बनाए रखना आवश्यक है। ऑब्जेक्टआईडी के साथ इसे हासिल करना संभव नहीं है। यदि यह आपके लिए महत्वपूर्ण नहीं है, तो भी आप ऑब्जेक्टआईडी का उपयोग कर सकते हैं।
स्टेटस प्रॉपर्टी में एक सरणी होती है जिसमें संदेश प्रसंस्करण इतिहास होता है। इंडेक्स 0 पर, आपको वर्तमान स्थिति मिलेगी, जो इंडेक्सिंग के लिए महत्वपूर्ण है।
स्टेटस ऑब्जेक्ट में स्वयं तीन गुण होते हैं:
Status
: "एनक्यूड", "प्रोसेसिंग", "प्रोसेस्ड", या "असफल" हो सकता है।Timestamp
: यह वर्तमान टाइमस्टैम्प को कैप्चर करता है।NextReevaluation
: यह रिकॉर्ड करता है कि अगला मूल्यांकन कब होना चाहिए, जो पुनर्प्रयास और भविष्य के निर्धारित निष्पादन दोनों के लिए आवश्यक है।
इस संपत्ति में आपके संदेश का विशिष्ट पेलोड शामिल है।
एक संदेश जोड़ना संग्रह में एक सीधा सम्मिलन ऑपरेशन है, जिसकी स्थिति "एनक्यूड" पर सेट है।
NextReevaluation
को null
पर सेट करें।NextReevaluation
को टाइमस्टैम्प पर सेट करें। db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });
कतारबद्ध करना थोड़ा अधिक जटिल है लेकिन फिर भी अपेक्षाकृत सरल है। यह MongoDB की समवर्ती परमाणु पढ़ने और अद्यतन क्षमताओं पर बहुत अधिक निर्भर करता है।
MongoDB की यह आवश्यक सुविधा सुनिश्चित करती है:
db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
इसलिए हम एक संदेश पढ़ रहे हैं जो "एनक्यूड" स्थिति में है और साथ ही स्थिति "प्रसंस्करण" को स्थिति 0 पर सेट करके इसे संशोधित करते हैं। चूंकि यह ऑपरेशन परमाणु है, यह गारंटी देगा कि संदेश किसी अन्य उपभोक्ता द्वारा नहीं उठाया जाएगा। .
एक बार जब संदेश का प्रसंस्करण पूरा हो जाता है, तो संदेश की आईडी का उपयोग करके संदेश की स्थिति को "संसाधित" में अपडेट करना एक सरल मामला है।
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
यदि प्रोसेसिंग विफल हो जाती है, तो हमें संदेश को तदनुसार चिह्नित करना होगा। अक्सर, आप संदेश को संसाधित करने का पुनः प्रयास करना चाह सकते हैं। इसे संदेश को पुन: पंक्तिबद्ध करके प्राप्त किया जा सकता है। कई परिदृश्यों में, प्रसंस्करण विफलता की प्रकृति के आधार पर, एक विशिष्ट देरी, जैसे 10 सेकंड के बाद संदेश को पुन: संसाधित करना समझ में आता है।
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });
हमने स्थापित किया है कि कैसे हम अपनी "कतार" से वस्तुओं को आसानी से कतारबद्ध और हटा सकते हैं, जो वास्तव में, केवल एक MongoDB संग्रह है। हम NextReevaluation
फ़ील्ड का लाभ उठाकर भविष्य के लिए संदेशों को "शेड्यूल" भी कर सकते हैं।
जो कमी है वह यह है कि हम नियमित रूप से कतार में कैसे उतरेंगे। उपभोक्ताओं को किसी प्रकार के लूप में findAndModify
कमांड निष्पादित करने की आवश्यकता होती है। एक सीधा तरीका यह होगा कि एक अंतहीन लूप बनाया जाए जिसमें हम एक संदेश को हटाएं और संसाधित करें। यह तरीका सीधा और असरदार है. हालाँकि, यह डेटाबेस और नेटवर्क पर काफी दबाव डालेगा।
एक विकल्प यह होगा कि लूप पुनरावृत्तियों के बीच विलंब शुरू किया जाए, उदाहरण के लिए, 100 एमएस। इससे लोड तो काफी कम हो जाएगा लेकिन कतार में लगने की गति भी कम हो जाएगी।
समस्या का समाधान वह है जिसे MongoDB परिवर्तन धारा के रूप में संदर्भित करता है।
परिवर्तन धाराएँ क्या हैं? मैं इसे MongoDB के लोगों से बेहतर नहीं समझा सकता:
परिवर्तन धाराएँ अनुप्रयोगों को वास्तविक समय डेटा परिवर्तनों तक पहुँचने की अनुमति देती हैं […]। एप्लिकेशन एक ही संग्रह पर सभी डेटा परिवर्तनों की सदस्यता लेने के लिए परिवर्तन स्ट्रीम का उपयोग कर सकते हैं […] और उन पर तुरंत प्रतिक्रिया दे सकते हैं।
महान! हम अपने कतार संग्रह में नए बनाए गए दस्तावेज़ों को सुन सकते हैं, जिसका प्रभावी अर्थ नए कतारबद्ध संदेशों को सुनना है
यह बिल्कुल सरल है:
const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
हालाँकि, परिवर्तन धारा दृष्टिकोण, अनुसूचित और अनाथ दोनों संदेशों के लिए काम नहीं करता है क्योंकि स्पष्ट रूप से ऐसा कोई परिवर्तन नहीं है जिसे हम सुन सकें।
इन उपयोग के मामलों के लिए, हमें अपने सरल लूप पर वापस लौटना होगा। हालाँकि, हम पुनरावृत्तियों के बीच काफी उदार विलंब का उपयोग कर सकते हैं।
"पारंपरिक" डेटाबेस, जैसे MySQL , PostgreSQL , या MongoDB (जिसे मैं पारंपरिक भी मानता हूं), आज अविश्वसनीय रूप से शक्तिशाली हैं। यदि सही ढंग से उपयोग किया जाता है (सुनिश्चित करें कि आपके इंडेक्स अनुकूलित हैं!), तो वे तेज़ हैं, प्रभावशाली पैमाने पर हैं, और पारंपरिक होस्टिंग प्लेटफ़ॉर्म पर लागत प्रभावी हैं।
कई उपयोग मामलों को केवल एक डेटाबेस और आपकी पसंदीदा प्रोग्रामिंग भाषा का उपयोग करके संबोधित किया जा सकता है। हमेशा "सही काम के लिए सही उपकरण" का होना जरूरी नहीं है, जिसका मतलब है कि रेडिस, इलास्टिक्सर्च, रैबिटएमक्यू आदि जैसे उपकरणों के विविध सेट को बनाए रखना। अक्सर, रखरखाव ओवरहेड इसके लायक नहीं है।
हालाँकि प्रस्तावित समाधान, उदाहरण के लिए, RabbitMQ के प्रदर्शन से मेल नहीं खा सकता है, यह आमतौर पर पर्याप्त है और एक ऐसे बिंदु तक बढ़ सकता है जो आपके स्टार्टअप के लिए महत्वपूर्ण सफलता का प्रतीक होगा।
सॉफ्टवेयर इंजीनियरिंग ट्रेड-ऑफ को नेविगेट करने के बारे में है। अपना बुद्धिमानी से चुनें.