अपाचे हुडी एक स्ट्रीमिंग डेटा लेक प्लेटफ़ॉर्म है जो कोर वेयरहाउस और डेटाबेस कार्यक्षमता को सीधे डेटा लेक में लाता है। खुद को डेल्टा या अपाचे आइसबर्ग जैसा खुला फ़ाइल प्रारूप कहने से संतुष्ट नहीं, हुडी टेबल, लेनदेन, अपसर्ट/डिलीट, उन्नत इंडेक्स, स्ट्रीमिंग अंतर्ग्रहण सेवाएं, डेटा क्लस्टरिंग/संघनन अनुकूलन और समवर्ती प्रदान करता है।
2016 में पेश किया गया, हुडी Hadoop पारिस्थितिकी तंत्र में मजबूती से निहित है, नाम के पीछे के अर्थ को ध्यान में रखते हुए: Hadoop Upserts andD Incrementals। इसे एचडीएफएस पर बड़े विश्लेषणात्मक डेटासेट के भंडारण का प्रबंधन करने के लिए विकसित किया गया था। हुडी का प्राथमिक उद्देश्य स्ट्रीमिंग डेटा के अंतर्ग्रहण के दौरान विलंबता को कम करना है।
समय के साथ, हुडी मिनिओ सहित क्लाउड स्टोरेज और ऑब्जेक्ट स्टोरेज का उपयोग करने के लिए विकसित हुआ है। हुडी का एचडीएफएस से दूर जाना दुनिया के बड़े चलन के साथ-साथ चलता है और परफॉर्मेंट, स्केलेबल और क्लाउड-नेटिव ऑब्जेक्ट स्टोरेज के लिए विरासती एचडीएफएस को पीछे छोड़ देता है। हुडी का अनुकूलन प्रदान करने का वादा जो अपाचे स्पार्क, फ्लिंक, प्रेस्टो, ट्रिनो और अन्य के लिए विश्लेषणात्मक कार्यभार को तेज़ बनाता है, बड़े पैमाने पर क्लाउड-नेटिव एप्लिकेशन प्रदर्शन के मिनिओ के वादे के साथ अच्छी तरह से मेल खाता है।
उत्पादन में हुडी का उपयोग करने वाली कंपनियों में उबर , अमेज़ॅन , बाइटडांस और रॉबिनहुड शामिल हैं। ये दुनिया की कुछ सबसे बड़ी स्ट्रीमिंग डेटा झीलें हैं। इस उपयोग के मामले में हुडी की कुंजी यह है कि यह एक वृद्धिशील डेटा प्रोसेसिंग स्टैक प्रदान करता है जो स्तंभ डेटा पर कम-विलंबता प्रसंस्करण का संचालन करता है। आमतौर पर, सिस्टम Apache Parquet या ORC जैसे खुले फ़ाइल प्रारूप का उपयोग करके एक बार डेटा लिखते हैं, और इसे अत्यधिक स्केलेबल ऑब्जेक्ट स्टोरेज या वितरित फ़ाइल सिस्टम के शीर्ष पर संग्रहीत करते हैं। हुडी इस डेटा को ग्रहण करने, बदलने और प्रबंधित करने के लिए एक डेटा प्लेन के रूप में कार्य करता है। हुडी Hadoop FileSystem API का उपयोग करके स्टोरेज के साथ इंटरैक्ट करता है, जो HDFS से लेकर ऑब्जेक्ट स्टोरेज से लेकर इन-मेमोरी फाइल सिस्टम तक के कार्यान्वयन के साथ संगत है (लेकिन जरूरी नहीं कि इसके लिए इष्टतम हो)।
हुडी एक बेस फ़ाइल और डेल्टा लॉग फ़ाइलों का उपयोग करता है जो किसी दिए गए बेस फ़ाइल में अपडेट/परिवर्तन संग्रहीत करता है। बेस फ़ाइलें Parquet (स्तंभकार) या HFile (अनुक्रमित) हो सकती हैं। डेल्टा लॉग को एवरो (पंक्ति) के रूप में सहेजा जाता है क्योंकि आधार फ़ाइल में परिवर्तन होते ही उन्हें रिकॉर्ड करना समझ में आता है।
हुडी किसी दिए गए आधार फ़ाइल में सभी परिवर्तनों को ब्लॉक के अनुक्रम के रूप में एन्कोड करता है। ब्लॉक डेटा ब्लॉक, डिलीट ब्लॉक या रोलबैक ब्लॉक हो सकते हैं। नई आधार फ़ाइलें प्राप्त करने के लिए इन ब्लॉकों को मर्ज किया जाता है। यह एन्कोडिंग एक स्व-निहित लॉग भी बनाती है।
स्रोत .
तालिका प्रारूप में तालिका का फ़ाइल लेआउट, तालिका की स्कीमा और मेटाडेटा शामिल होता है जो तालिका में परिवर्तनों को ट्रैक करता है। हुडी स्ट्रीम प्रोसेसिंग पर जोर देने के अनुरूप, स्कीम-ऑन-राइट को लागू करता है, ताकि यह सुनिश्चित किया जा सके कि पाइपलाइनें गैर-बैकवर्ड-संगत परिवर्तनों से न टूटे।
हुडी किसी दी गई तालिका/विभाजन के लिए फ़ाइलों को एक साथ समूहित करता है, और रिकॉर्ड कुंजियों और फ़ाइल समूहों के बीच मानचित्र बनाता है। जैसा कि ऊपर बताया गया है, सभी अपडेट एक विशिष्ट फ़ाइल समूह के लिए डेल्टा लॉग फ़ाइलों में दर्ज किए जाते हैं। यह डिज़ाइन हाइव ACID से अधिक कुशल है, जिसे प्रश्नों को संसाधित करने के लिए सभी बेस फ़ाइलों के विरुद्ध सभी डेटा रिकॉर्ड को मर्ज करना होगा। हुडी का डिज़ाइन तेज़ कुंजी-आधारित अपसर्ट और डिलीट की आशा करता है क्योंकि यह फ़ाइल समूह के लिए डेल्टा लॉग के साथ काम करता है, संपूर्ण डेटासेट के लिए नहीं।
हुडी किसी दी गई तालिका/विभाजन के लिए फ़ाइलों को एक साथ समूहित करता है, और रिकॉर्ड कुंजियों और फ़ाइल समूहों के बीच मानचित्र बनाता है। जैसा कि ऊपर बताया गया है, सभी अपडेट एक विशिष्ट फ़ाइल समूह के लिए डेल्टा लॉग फ़ाइलों में दर्ज किए जाते हैं। यह डिज़ाइन हाइव ACID से अधिक कुशल है, जिसे प्रश्नों को संसाधित करने के लिए सभी बेस फ़ाइलों के विरुद्ध सभी डेटा रिकॉर्ड को मर्ज करना होगा। हुडी का डिज़ाइन तेज़ कुंजी-आधारित अपसर्ट और डिलीट की आशा करता है क्योंकि यह फ़ाइल समूह के लिए डेल्टा लॉग के साथ काम करता है, संपूर्ण डेटासेट के लिए नहीं।
स्रोत .
समयरेखा को समझना महत्वपूर्ण है क्योंकि यह हुडी के सभी टेबल मेटाडेटा के लिए सत्य इवेंट लॉग के स्रोत के रूप में कार्य करता है। टाइमलाइन हमारे मामले में .hoodie
फ़ोल्डर या बकेट में संग्रहीत होती है। इवेंट को टाइमलाइन पर तब तक बनाए रखा जाता है जब तक उन्हें हटा नहीं दिया जाता। समयरेखा समग्र तालिका के साथ-साथ फ़ाइल समूहों के लिए भी मौजूद है, जो मूल आधार फ़ाइल में डेल्टा लॉग को लागू करके फ़ाइल समूह के पुनर्निर्माण को सक्षम करती है। बार-बार लिखने/कमिट करने के लिए अनुकूलन करने के लिए, हुडी का डिज़ाइन संपूर्ण तालिका के आकार के सापेक्ष मेटाडेटा को छोटा रखता है।
टाइमलाइन पर नई घटनाओं को एक आंतरिक मेटाडेटा तालिका में सहेजा जाता है और मर्ज-ऑन-रीड तालिकाओं की एक श्रृंखला के रूप में कार्यान्वित किया जाता है, जिससे कम लेखन प्रवर्धन प्रदान किया जाता है। नतीजतन, हुडी मेटाडेटा में तेजी से बदलाव को जल्दी से अवशोषित कर सकता है। इसके अलावा, मेटाडेटा तालिका HFile बेस फ़ाइल प्रारूप का उपयोग करती है, जो कुंजी के अनुक्रमित लुकअप के सेट के साथ प्रदर्शन को और अनुकूलित करती है जो संपूर्ण मेटाडेटा तालिका को पढ़ने की आवश्यकता से बचाती है। महंगी समय लेने वाली क्लाउड फ़ाइल लिस्टिंग से बचने के लिए सभी भौतिक फ़ाइल पथ जो तालिका का हिस्सा हैं, मेटाडेटा में शामिल किए गए हैं।
हुडी लेखक आर्किटेक्चर की सुविधा प्रदान करते हैं जहां हुडी एसीआईडी लेनदेन समर्थन के साथ एक उच्च-प्रदर्शन लेखन परत के रूप में कार्य करता है जो अपडेट और डिलीट जैसे बहुत तेजी से वृद्धिशील परिवर्तनों को सक्षम करता है।
एक विशिष्ट हुडी आर्किटेक्चर हुडी टेबलों पर डेटा पहुंचाने के लिए स्पार्क या फ्लिंक पाइपलाइनों पर निर्भर करता है। हुडी लेखन पथ को डिस्क पर केवल पारक्वेट या एवरो फ़ाइल लिखने की तुलना में अधिक कुशल बनाने के लिए अनुकूलित किया गया है। हुडी लेखन संचालन का विश्लेषण करता है और उन्हें वृद्धिशील ( insert
, upsert
, delete
) या बैच संचालन ( insert_overwrite
, insert_overwrite_table
, delete_partition
, bulk_insert
) के रूप में वर्गीकृत करता है और फिर आवश्यक अनुकूलन लागू करता है।
हुडी लेखक मेटाडेटा बनाए रखने के लिए भी जिम्मेदार हैं। प्रत्येक रिकॉर्ड के लिए, प्रतिबद्ध समय और उस रिकॉर्ड के लिए अद्वितीय अनुक्रम संख्या (यह काफ्का ऑफ़सेट के समान है) लिखी जाती है जिससे रिकॉर्ड स्तर में परिवर्तन प्राप्त करना संभव हो जाता है। उपयोगकर्ता आने वाली डेटा स्ट्रीम में ईवेंट समय फ़ील्ड भी निर्दिष्ट कर सकते हैं और मेटाडेटा और हुडी टाइमलाइन का उपयोग करके उन्हें ट्रैक कर सकते हैं। इससे स्ट्रीम प्रोसेसिंग में नाटकीय सुधार हो सकते हैं क्योंकि हुडी में प्रत्येक रिकॉर्ड के लिए आगमन और घटना का समय दोनों शामिल हैं, जिससे जटिल स्ट्रीम प्रोसेसिंग पाइपलाइनों के लिए मजबूत वॉटरमार्क बनाना संभव हो जाता है।
लेखकों और पाठकों के बीच स्नैपशॉट अलगाव स्पार्क, हाइव, फ्लिंक, प्रेस्ट, ट्रिनो और इम्पाला सहित सभी प्रमुख डेटा लेक क्वेरी इंजनों से टेबल स्नैपशॉट को लगातार क्वेरी करने की अनुमति देता है। Parquet और Avro की तरह, Hudi तालिकाओं को स्नोफ्लेक और SQL सर्वर जैसी बाहरी तालिकाओं के रूप में पढ़ा जा सकता है।
हुडी रीडर को हल्के वजन के लिए विकसित किया गया है। जहां भी संभव हो, इंजन-विशिष्ट वेक्टरकृत रीडर और कैशिंग, जैसे कि प्रेस्टो और स्पार्क, का उपयोग किया जाता है। जब हुडी को किसी क्वेरी के लिए आधार और लॉग फ़ाइलों को मर्ज करना होता है, तो हुडी स्पिलेबल मैप्स और आलसी रीडिंग जैसे तंत्रों का उपयोग करके मर्ज प्रदर्शन में सुधार करता है, साथ ही रीड-अनुकूलित क्वेरी भी प्रदान करता है।
हुडी में कुछ उल्लेखनीय रूप से शक्तिशाली वृद्धिशील क्वेरी क्षमताएं शामिल हैं। मेटाडेटा इसके मूल में है, जो बड़ी प्रतिबद्धताओं को छोटे टुकड़ों के रूप में उपभोग करने की अनुमति देता है और डेटा के लेखन और वृद्धिशील क्वेरी को पूरी तरह से अलग करता है। मेटाडेटा के कुशल उपयोग के माध्यम से, समय यात्रा एक परिभाषित प्रारंभ और स्टॉप बिंदु के साथ एक और वृद्धिशील क्वेरी है। हुडी परमाणु रूप से किसी भी समय एकल फ़ाइल समूहों के लिए कुंजी मैप करता है, हुडी तालिकाओं पर पूर्ण सीडीसी क्षमताओं का समर्थन करता है। जैसा कि हुडी लेखक अनुभाग में ऊपर चर्चा की गई है, प्रत्येक तालिका फ़ाइल समूहों से बनी है, और प्रत्येक फ़ाइल समूह का अपना स्वयं-निहित मेटाडेटा है।
हुडी की सबसे बड़ी ताकत वह गति है जिसके साथ यह स्ट्रीमिंग और बैच डेटा दोनों को ग्रहण करता है। upsert
की क्षमता प्रदान करके, हुडी संपूर्ण तालिकाओं या विभाजनों को फिर से लिखने की तुलना में तेजी से परिमाण के कार्य आदेशों को निष्पादित करता है।
हुडी की अंतर्ग्रहण गति का लाभ उठाने के लिए, डेटा लेकहाउस को उच्च IOPS और थ्रूपुट में सक्षम भंडारण परत की आवश्यकता होती है। मिनियो की स्केलेबिलिटी और उच्च-प्रदर्शन का संयोजन वही है जो हुडी को चाहिए। मिनिओ रियल-टाइम एंटरप्राइज डेटा लेक को पावर देने के लिए आवश्यक प्रदर्शन में सक्षम है - हाल ही में बेंचमार्क ने GETs पर 325 GiB/s (349 GB/s) और PUTs पर 165 GiB/s (177 GB/s) हासिल किया है। ऑफ-द-शेल्फ NVMe SSDs के 32 नोड्स।
एक सक्रिय उद्यम हुडी डेटा लेक बड़ी संख्या में छोटी लकड़ी और एवरो फ़ाइलों को संग्रहीत करता है। मिनिओ में कई छोटे फ़ाइल अनुकूलन शामिल हैं जो तेज़ डेटा लेक को सक्षम करते हैं। छोटी वस्तुओं को मेटाडेटा के साथ इनलाइन सहेजा जाता है, जिससे हुडी मेटाडेटा और सूचकांक जैसी छोटी फ़ाइलों को पढ़ने और लिखने के लिए आवश्यक IOPS कम हो जाता है।
स्कीमा प्रत्येक हुडी तालिका का एक महत्वपूर्ण घटक है। हुडी स्कीमा को लागू कर सकता है, या यह स्कीमा विकास की अनुमति दे सकता है ताकि स्ट्रीमिंग डेटा पाइपलाइन बिना टूटे अनुकूलित हो सके। इसके अलावा, हुडी यह सुनिश्चित करने के लिए स्कीमा-ऑन-राइटर लागू करता है कि परिवर्तन पाइपलाइनों को न तोड़ें। हुडी टेबल के स्कीमा को संग्रहीत करने, प्रबंधित करने और विकसित करने के लिए एवरो पर निर्भर करता है।
हुडी डेटा लेक को ACID लेनदेन संबंधी गारंटी प्रदान करता है। हुडी परमाणु लेखन को सुनिश्चित करता है: प्रतिबद्धताओं को परमाणु रूप से एक समयरेखा के लिए बनाया जाता है और एक समय टिकट दिया जाता है जो उस समय को दर्शाता है जिस पर कार्रवाई हुई मानी जाती है। हुडी राइटर, टेबल और रीडर प्रक्रियाओं के बीच स्नैपशॉट को अलग करता है, इसलिए प्रत्येक टेबल के सुसंगत स्नैपशॉट पर काम करता है। हुडी ने लेखकों के बीच आशावादी समवर्ती नियंत्रण (ओसीसी) और टेबल सेवाओं और लेखकों के बीच और कई टेबल सेवाओं के बीच गैर-अवरुद्ध एमवीसीसी-आधारित समवर्ती नियंत्रण के साथ इसे पूरा किया।
यह ट्यूटोरियल आपको स्पार्क, हुडी और मिनिओ की स्थापना के बारे में बताएगा और कुछ बुनियादी हुडी सुविधाओं से परिचित कराएगा। यह ट्यूटोरियल अपाचे हुडी स्पार्क गाइड पर आधारित है, जिसे क्लाउड-नेटिव मिनिओ ऑब्जेक्ट स्टोरेज के साथ काम करने के लिए अनुकूलित किया गया है।
ध्यान दें कि संस्करणित बाल्टियों के साथ काम करने से हुडी में कुछ रखरखाव ओवरहेड जुड़ जाता है। कोई भी ऑब्जेक्ट जिसे हटाया जाता है वह एक डिलीट मार्कर बनाता है। जैसे ही हुडी क्लीनर उपयोगिता का उपयोग करके फ़ाइलों को साफ करता है, समय के साथ डिलीट मार्करों की संख्या बढ़ जाती है। इन डिलीट मार्करों को साफ करने के लिए जीवनचक्र प्रबंधन को सही ढंग से कॉन्फ़िगर करना महत्वपूर्ण है क्योंकि यदि डिलीट मार्करों की संख्या 1000 तक पहुंच जाती है तो सूची ऑपरेशन अवरुद्ध हो सकता है। हुडी परियोजना अनुरक्षक जीवनचक्र नियमों का उपयोग करके एक दिन के बाद डिलीट मार्करों को साफ करने की सलाह देते हैं।
अपाचे स्पार्क डाउनलोड और इंस्टॉल करें ।
मिनिआईओ डाउनलोड और इंस्टॉल करें । कंसोल के लिए आईपी पता, टीसीपी पोर्ट, एक्सेस कुंजी और गुप्त कुंजी रिकॉर्ड करें।
मिनियो क्लाइंट डाउनलोड और इंस्टॉल करें ।
ऑब्जेक्ट स्टोरेज के साथ काम करने के लिए S3A का उपयोग करने के लिए AWS और AWS Hadoop लाइब्रेरी डाउनलोड करें और उन्हें अपने क्लासपाथ में जोड़ें।
AWS: aws-java-sdk:1.10.34
(या उच्चतर)
Hadoop: hadoop-aws:2.7.3
(या उच्चतर)
जार फ़ाइलें डाउनलोड करें , उन्हें अनज़िप करें और उन्हें /opt/spark/jars
पर कॉपी करें।
हुडी डेटा को संग्रहित करने के लिए एक बकेट बनाने के लिए मिनियो क्लाइंट का उपयोग करें:
mc alias set myminio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi
भंडारण के लिए मिनिओ का उपयोग करने के लिए कॉन्फ़िगर किए गए हुडी के साथ स्पार्क शेल प्रारंभ करें। S3A के लिए प्रविष्टियों को अपनी MiniIO सेटिंग्स के साथ कॉन्फ़िगर करना सुनिश्चित करें।
spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'
फिर, स्पार्क के भीतर हुडी को आरंभ करें।
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord
ध्यान दें कि यहबाहरी कॉन्फ़िगरेशन फ़ाइल बनाने के लिए हुडी का बार-बार उपयोग सरल बना देगा।
इसे आज़माएं और स्काला का उपयोग करके एक साधारण छोटी हुडी टेबल बनाएं। हुडी डेटा जेनरेटर सैंपल ट्रिप स्कीमा के आधार पर सैंपल इंसर्ट और अपडेट जेनरेट करने का एक त्वरित और आसान तरीका है।
val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator
निम्नलिखित नए यात्रा डेटा उत्पन्न करेगा, उन्हें डेटाफ़्रेम में लोड करेगा और हमारे द्वारा बनाए गए डेटाफ़्रेम को मिनिओ में हुडी तालिका के रूप में लिखेगा। mode(Overwrite)
उस स्थिति में तालिका को अधिलेखित और पुन: बनाता है जब वह पहले से मौजूद हो। यात्रा डेटा एक रिकॉर्ड कुंजी ( uuid
), विभाजन फ़ील्ड ( region/country/city
) और तर्क ( ts
) पर निर्भर करता है ताकि यह सुनिश्चित किया जा सके कि यात्रा रिकॉर्ड प्रत्येक विभाजन के लिए अद्वितीय हैं। हम डिफ़ॉल्ट राइट ऑपरेशन, upsert
उपयोग करेंगे। जब आपके पास अपडेट के बिना कार्यभार हो, तो आप insert
या bulk_insert
उपयोग कर सकते हैं जो तेज़ हो सकता है।
val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
एक ब्राउज़र खोलें और अपनी एक्सेस कुंजी और गुप्त कुंजी के साथ http://<your-MinIO-IP>:<port>
पर MiniIO में लॉग इन करें। आपको बाल्टी में हुडी टेबल दिखाई देगी।
बकेट में एक .hoodie
पथ भी होता है जिसमें मेटाडेटा होता है, और americas
और asia
पथ होते हैं जिनमें डेटा होता है।
मेटाडेटा पर एक नज़र डालें. संपूर्ण ट्यूटोरियल पूरा करने के बाद मेरा .hoodie
पथ इस तरह दिखता है। हम देख सकते हैं कि मैंने तालिका को मंगलवार 13 सितंबर, 2022 को 9:02, 10:37, 10:48, 10:52 और 10:56 पर संशोधित किया।
आइए Hudi डेटा को डेटाफ़्रेम में लोड करें और एक उदाहरण क्वेरी चलाएँ।
// spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
नहीं, हम 1988 में हूटी और ब्लोफिश संगीत कार्यक्रम देखने जाने की बात नहीं कर रहे हैं।
हुडी टेबल पर प्रत्येक लेखन नए स्नैपशॉट बनाता है। स्नैपशॉट को तालिका के संस्करणों के रूप में सोचें जिन्हें समय यात्रा प्रश्नों के लिए संदर्भित किया जा सकता है।
कुछ समय यात्रा संबंधी प्रश्नों को आज़माएं (आपके लिए प्रासंगिक होने के लिए आपको टाइमस्टैम्प बदलना होगा)।
spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)
यह प्रक्रिया वैसी ही है जब हमने पहले नया डेटा डाला था। डेटा को अपडेट करने की हुडी की क्षमता को प्रदर्शित करने के लिए, हम मौजूदा यात्रा रिकॉर्ड के लिए अपडेट जेनरेट करेंगे, उन्हें डेटाफ़्रेम में लोड करेंगे और फिर डेटाफ़्रेम को पहले से ही मिनिआईओ में सहेजी गई हुडी तालिका में लिखेंगे।
ध्यान दें कि हम append
सेव मोड का उपयोग कर रहे हैं। एक सामान्य दिशानिर्देश यह है कि जब तक आप एक नई तालिका नहीं बना रहे हों, तब तक append
मोड का उपयोग करें ताकि कोई भी रिकॉर्ड अधिलेखित न हो। हुडी के साथ काम करने का एक विशिष्ट तरीका वास्तविक समय में स्ट्रीमिंग डेटा को शामिल करना, उन्हें तालिका में जोड़ना और फिर कुछ तर्क लिखना है जो अभी जोड़े गए रिकॉर्ड के आधार पर मौजूदा रिकॉर्ड को मर्ज और अपडेट करता है। वैकल्पिक रूप से, overwrite
मोड का उपयोग करके लिखने से तालिका पहले से मौजूद होने पर हटा दी जाती है और पुनः बनाई जाती है।
// spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
डेटा क्वेरी करने पर अद्यतन यात्रा रिकॉर्ड दिखाई देंगे।
हुडी रिकॉर्ड्स की एक स्ट्रीम प्रदान कर सकता है जो वृद्धिशील क्वेरी का उपयोग करके दिए गए टाइमस्टैम्प के बाद से बदल गया है। हमें बस एक प्रारंभ समय प्रदान करने की आवश्यकता है जिससे वर्तमान प्रतिबद्धता के माध्यम से परिवर्तन देखने के लिए परिवर्तन स्ट्रीम किए जाएंगे, और हम स्ट्रीम को सीमित करने के लिए समाप्ति समय का उपयोग कर सकते हैं।
हुडी के लिए वृद्धिशील क्वेरी एक बहुत बड़ी बात है क्योंकि यह आपको बैच डेटा पर स्ट्रीमिंग पाइपलाइन बनाने की अनुमति देती है।
// spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
हुडी एक विशिष्ट समय और तारीख के अनुसार डेटा की क्वेरी कर सकता है।
// spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
हुडी रिकॉर्ड हटाने के दो अलग-अलग तरीकों का समर्थन करता है। एक सॉफ्ट डिलीट रिकॉर्ड कुंजी को बरकरार रखता है और अन्य सभी फ़ील्ड के मानों को समाप्त कर देता है। सॉफ्ट डिलीट मिनिओ में कायम रहते हैं और केवल हार्ड डिलीट का उपयोग करके डेटा लेक से हटाए जाते हैं।
// spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
इसके विपरीत, हार्ड डिलीट वे हैं जिन्हें हम डिलीट के रूप में सोचते हैं। रिकॉर्ड कुंजी और संबंधित फ़ील्ड तालिका से हटा दिए जाते हैं।
// spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
डेटा लेक तब डेटा लेकहाउस बन जाता है जब वह मौजूदा डेटा को अपडेट करने की क्षमता हासिल कर लेता है। हम कुछ नया यात्रा डेटा तैयार करने जा रहे हैं और फिर अपने मौजूदा डेटा को अधिलेखित कर देंगे। यह ऑपरेशन एक upsert
से तेज़ है जहां हुडी आपके लिए एक ही बार में संपूर्ण लक्ष्य विभाजन की गणना करता है। यहां हम स्वचालित अनुक्रमण, पूर्व संयोजन और पुनर्विभाजन को बायपास करने के लिए कॉन्फ़िगरेशन निर्दिष्ट करते हैं जो upsert
आपके लिए करेगा।
// spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)
स्कीमा विकास आपको समय के साथ डेटा में होने वाले परिवर्तनों के अनुकूल हुडी तालिका की स्कीमा को बदलने की अनुमति देता है।
नीचे स्कीमा और विभाजन को क्वेरी करने और विकसित करने के कुछ उदाहरण दिए गए हैं। अधिक गहन चर्चा के लिए, कृपया स्कीमा इवोल्यूशन | देखें अपाचे हुडी . ध्यान दें कि यदि आप ये कमांड चलाते हैं, तो वे इस ट्यूटोरियल से भिन्न आपकी हुडी टेबल स्कीमा को बदल देंगे।
-- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
वर्तमान में, SHOW partitions
केवल फ़ाइल सिस्टम पर काम करता है, क्योंकि यह फ़ाइल सिस्टम तालिका पथ पर आधारित है।
इस ट्यूटोरियल में हुडी की क्षमताओं को प्रदर्शित करने के लिए स्पार्क का उपयोग किया गया। हालाँकि, हुडी कई टेबल प्रकारों/क्वेरी प्रकारों का समर्थन कर सकता है और हुडी टेबल को हाइव, स्पार्क, प्रेस्टो और बहुत कुछ जैसे क्वेरी इंजनों से क्वेरी किया जा सकता है। हुडी प्रोजेक्ट में एक डेमो वीडियो है जो स्थानीय स्तर पर चलने वाले सभी निर्भर सिस्टमों के साथ डॉकर-आधारित सेटअप पर यह सब दिखाता है।
अपाचे हुडी डेटा लेक के लिए पहला ओपन टेबल प्रारूप था, और स्ट्रीमिंग आर्किटेक्चर में विचार करने योग्य है। हुडी समुदाय और पारिस्थितिकी तंत्र जीवित और सक्रिय हैं, क्लाउड-नेटिव स्ट्रीमिंग डेटा लेक के लिए Hadoop/HDFS को Hudi/ऑब्जेक्ट स्टोरेज से बदलने पर जोर बढ़ रहा है। हुडी स्टोरेज के लिए मिनियो का उपयोग मल्टी-क्लाउड डेटा लेक और एनालिटिक्स का मार्ग प्रशस्त करता है। मिनिआईओ में स्थानों के बीच डेटा को सिंक्रनाइज़ करने के लिए सक्रिय-सक्रिय प्रतिकृति शामिल है - ऑन-प्रिमाइस, सार्वजनिक/निजी क्लाउड में और किनारे पर - भौगोलिक लोड संतुलन और तेज़ हॉट-हॉट फेलओवर जैसी महान सामग्री उद्यमों को सक्षम करने में सक्षम है।
आज ही मिनियो पर हुडी आज़माएं। यदि आपके कोई प्रश्न हैं या सुझाव साझा करना चाहते हैं, तो कृपया हमारे स्लैक चैनल के माध्यम से संपर्क करें।
यहाँ भी प्रकाशित किया गया है.