पिछली पोस्ट में, मैंने अपाचे आइसबर्ग का परिचय दिया था और दिखाया था कि यह भंडारण के लिए मिनिओ का उपयोग कैसे करता है। मैंने यह भी दिखाया कि विकास मशीन कैसे स्थापित करें। ऐसा करने के लिए, मैंने प्रोसेसिंग इंजन के रूप में अपाचे स्पार्क कंटेनर, एक REST कैटलॉग और स्टोरेज के लिए MiniIO स्थापित करने के लिए डॉकर कंपोज़ का उपयोग किया। मैंने एक बहुत ही सरल उदाहरण के साथ निष्कर्ष निकाला जिसमें डेटा को ग्रहण करने के लिए अपाचे स्पार्क का उपयोग किया गया और डेटा को क्वेरी करने के लिए PyIceberg का उपयोग किया गया। यदि आप अपाचे आइसबर्ग में नए हैं या यदि आपको अपाचे आइसबर्ग को अपनी विकास मशीन पर स्थापित करने की आवश्यकता है, तो इस परिचयात्मक पोस्ट को पढ़ें।
इस पोस्ट में, मैं वहीं से शुरू करूंगा जहां मेरी पिछली पोस्ट खत्म हुई थी और एक सामान्य बड़ी डेटा समस्या की जांच करूंगा - कच्चे डेटा, असंरचित डेटा और संरचित डेटा (डेटा जो कच्चे से क्यूरेट किया गया है) के लिए भंडारण प्रदान करने के लिए एकल समाधान की आवश्यकता है डेटा)। इसके अतिरिक्त, उसी समाधान को एक प्रोसेसिंग इंजन प्रदान करना चाहिए जो क्यूरेटेड डेटा के विरुद्ध कुशल रिपोर्टिंग की अनुमति देता है। यह डेटा लेकहाउस का वादा है - संरचित डेटा के लिए डेटा वेयरहाउस की क्षमताएं और असंरचित डेटा के लिए डेटा लेक्स की क्षमताएं - सभी एक केंद्रीकृत समाधान में।
आइए हमारे बड़े डेटा परिदृश्य को अधिक विस्तार से देखें।
नीचे दिया गया चित्र एक सामान्य समस्या और एक काल्पनिक समाधान दर्शाता है। डेटा कई स्थानों से और कई प्रारूपों में डेटा सेंटर में आ रहा है। एक केंद्रीकृत समाधान की आवश्यकता है जो कच्चे डेटा को इस तरह परिवर्तित करने की अनुमति देता है कि एक प्रोसेसिंग इंजन कुशलतापूर्वक बिजनेस इंटेलिजेंस, डेटा एनालिटिक्स और मशीन लर्निंग का समर्थन कर सके। साथ ही, यह समाधान डेटा अन्वेषण और मशीन लर्निंग के लिए असंरचित डेटा (पाठ, चित्र, ऑडियो और वीडियो) संग्रहीत करने में भी सक्षम होना चाहिए। यदि परिवर्तन को दोबारा चलाने की आवश्यकता हो या डेटा अखंडता मुद्दे की जांच की आवश्यकता हो तो इसे किसी भी डेटा को उसके मूल प्रारूप में भी रखना चाहिए।
एक ठोस उदाहरण के रूप में, एक वैश्विक कस्टोडियल बैंक की कल्पना करें जो अपने ग्राहकों के लिए म्यूचुअल फंड का प्रबंधन कर रहा है। प्रत्येक ग्राहक के लिए प्रत्येक फंड के लिए रिकॉर्ड की लेखांकन पुस्तक और रिकॉर्ड की निवेश पुस्तक का प्रतिनिधित्व करने वाला डेटा दुनिया भर के भौगोलिक क्षेत्रों से लगातार डेटा लेकहाउस में प्रवाहित हो रहा है। वहां से, सुरक्षित मार्ग की जांच होनी चाहिए (क्या भेजी गई सभी चीजें प्राप्त हुईं), और डेटा गुणवत्ता जांच चलनी चाहिए। अंत में, डेटा को विभाजित किया जा सकता है और दूसरे स्टोर में लोड किया जा सकता है जो दिन की शुरुआत और दिन के अंत की रिपोर्टिंग का समर्थन करेगा।
वैकल्पिक रूप से, शायद यह आरेख एक IOT परिदृश्य का प्रतिनिधित्व करता है जहां मौसम स्टेशन तापमान और अन्य मौसम-संबंधी डेटा भेज रहे हैं। परिदृश्य चाहे जो भी हो, डेटा को उसके मूल प्रारूप में सुरक्षित रूप से संग्रहीत करने का एक तरीका आवश्यक है और फिर किसी भी डेटा को अधिक संरचित तरीके से संग्रहीत करने और संसाधित करने की आवश्यकता है - सभी एक केंद्रीकृत समाधान में। यह डेटा लेकहाउस का वादा है - डेटा वेयरहाउस और डेटा लेक का सबसे अच्छा संयोजन एक केंद्रीकृत समाधान में।
आइए ऊपर वर्णित काल्पनिक समाधान को वास्तविक बनाएं। इसे नीचे दिए गए चित्र में दर्शाया गया है।
हमारे डेटा लेकहाउस में दो तार्किक घटक हैं। पहला संरचित डेटा के लिए अपाचे आइसबर्ग का कार्यान्वयन है - डेटा वेयरहाउस के बराबर। (यह वही है जो मैंने अपनी पिछली पोस्ट में बनाया था - इसलिए मैं यहां विस्तार में नहीं जाऊंगा।) दूसरा तार्किक घटक असंरचित डेटा के लिए मिनिओ है - हमारे डेटा लेकहाउस का डेटा लेक पक्ष। लेकहाउस में आने वाला सारा डेटा मिनियो के इस तार्किक उदाहरण तक पहुंचाया जाता है। वास्तव में, ऊपर दिखाए गए मिनिओ के दो तार्किक उदाहरण आपके डेटा सेंटर में मिनिओ के एक ही उदाहरण हो सकते हैं। यदि आप जिस क्लस्टर में मिनिआईओ चला रहे हैं, वह आने वाले सभी डेटा के अंतर्ग्रहण और अपाचे आइसबर्ग की प्रसंस्करण आवश्यकताओं को संभाल सकता है, तो ऐसी तैनाती से पैसे की बचत होगी। वास्तव में, मैं इस पोस्ट में यही करूँगा। मैं सभी असंरचित और कच्चे डेटा को रखने के लिए अपाचे आइसबर्ग के मिनिओ के उदाहरण के भीतर एक बाल्टी का उपयोग करूंगा।
आइए उस डेटासेट का परिचय देकर डेटा के साथ खेलना शुरू करें जिसका उपयोग मैं इस अभ्यास के लिए करूंगा और इसे मिनियो में डालूंगा।
इस पोस्ट में हम जिस डेटासेट का प्रयोग करेंगे वह एक सार्वजनिक डेटासेट है जिसे ग्लोबल सरफेस समरी ऑफ द डे (जीएसओडी) के रूप में जाना जाता है, जिसे नेशनल ओशनिक एंड एटमॉस्फेरिक एडमिनिस्ट्रेशन (एनओएए) द्वारा प्रबंधित किया जाता है। एनओएए वर्तमान में दुनिया भर के 9000 से अधिक स्टेशनों से डेटा बनाए रखता है और जीएसओडी डेटासेट में इन स्टेशनों से प्रति दिन सारांश जानकारी शामिल होती है। आप यहां डेटा डाउनलोड कर सकते हैं. प्रति वर्ष एक gzip फ़ाइल होती है। यह 1929 में शुरू होता है और 2022 में (इस लेखन के समय) समाप्त होता है। अपना डेटा लेकहाउस बनाने के लिए, मैंने हर साल की फ़ाइल डाउनलोड की और इसे हमारे डेटा लेकहाउस के लिए उपयोग किए जा रहे मिनिओ इंस्टेंस में रखा। मैंने सभी फाइलों को `लेक` नाम की एक बाल्टी में डाल दिया। मिनिओ के हमारे उदाहरण में दो बकेट नीचे दिखाए गए हैं। `वेयरहाउस' बकेट तब बनाया गया था जब हमने अपाचे आइसबर्ग स्थापित किया था।
मैंने कच्चे डेटा को मैन्युअल रूप से ग्रहण करने के लिए मिनिओ कंसोल का उपयोग किया। एक पेशेवर पाइपलाइन में, आप इसे स्वचालित तरीके से करना चाहेंगे। मिनिओ में डेटा प्राप्त करने के लिए काफ्का और कुबेरनेट्स का उपयोग कैसे करें यह देखने के लिए कुबेरनेट्स में मिनियो में काफ्का और स्ट्रीम डेटा कैसे सेट करें, इसकी जांच करें।
इन फ़ाइलों को डाउनलोड करने की सुविधा के लिए पैक किया गया है - यदि आप रिपोर्ट या ग्राफ़ बनाने के लिए सीधे उनका उपयोग करने का प्रयास करते हैं, तो यह एक बहुत ही आईओ-गहन ऑपरेशन (और संभावित रूप से सीपीयू-गहन) होगा। कल्पना करें कि आप किसी निर्दिष्ट स्टेशन से प्रति वर्ष औसत तापमान का चार्ट बनाना चाहते हैं। ऐसा करने के लिए, आपको प्रत्येक फ़ाइल को खोलना होगा और प्रत्येक पंक्ति में उन प्रविष्टियों को खोजना होगा जो रुचि के दिन आपके स्टेशन से मेल खाती हों। डेटा को क्यूरेट करने और क्यूरेटेड डेटा पर रिपोर्टिंग के लिए हमारी डेटा लेकहाउस क्षमताओं का उपयोग करना एक बेहतर विकल्प है। पहला कदम एक नया ज्यूपिटर नोटबुक स्थापित करना है।
सबसे पहले, ज्यूपिटर नोटबुक सर्वर पर जाएँ जो अपाचे स्पार्क प्रोसेसिंग इंजन में स्थापित है। इसे http://localhost:8888 पर पाया जा सकता है। एक नई नोटबुक बनाएं और पहले सेल में, नीचे दिखाए गए आयात जोड़ें। (इस पोस्ट में बनाई गई सभी पूर्ण नोटबुक यहां पाई जा सकती हैं।)
from collections import namedtuple import csv import json import logging import tarfile from time import time from typing import List from minio import Minio from minio.error import S3Error import pandas as pd import pyarrow as pa import pyarrow.parquet as pq pd.options.mode.chained_assignment = None bucket_name = 'lake'
ध्यान दें कि हम मिनिओ लाइब्रेरी आयात कर रहे हैं। हम जो नोटबुक बना रहे हैं वह अनस्ट्रक्चर्ड स्टोरेज (मिनियो डेटा लेक) से स्ट्रक्चर्ड स्टोरेज (अपाचे आइसबर्ग, जो हुड के नीचे मिनिओ का उपयोग करता है) तक एक ईटीएल पाइपलाइन है। आपकी नोटबुक की शुरुआत इस तरह दिखनी चाहिए।
अब, हम अपने डेटा के लिए एक आइसबर्ग डेटाबेस और तालिका बना सकते हैं।
जीएसओडी डेटासेट के लिए डेटाबेस और तालिका बनाना सीधा है। नीचे दी गई स्क्रिप्ट डेटाबेस बनाएगी जिसे हम `noaa` नाम देंगे। आयात के बाद इसे एक सेल में जोड़ें।
%%sql CREATE DATABASE IF NOT EXISTS noaa;
नीचे दी गई स्क्रिप्ट gsod
तालिका बनाएगी।
%%sql CREATE TABLE IF NOT EXISTS noaa.gsod ( station string, date timestamp, latitude double, longitude double, name string, temp double ) USING iceberg PARTITIONED BY (station)
जैसे ही आप अपाचे आइसबर्ग के साथ खेलते हैं, आप अक्सर एक टेबल गिराना चाहेंगे ताकि आप फिर से एक प्रयोग शुरू कर सकें। यदि आप इसके सेटअप के बारे में कुछ भी बदलना चाहते हैं तो नीचे दी गई स्क्रिप्ट gsod
तालिका को हटा देगी।
%%sql DROP TABLE IF EXISTS noaa.gsod;
अब जबकि हमारे लेकहाउस में कच्ची वर्ष-आधारित ज़िप फ़ाइलें हैं, हम उन्हें निकाल सकते हैं, बदल सकते हैं और अपने डेटा लेकहाउस में लोड कर सकते हैं। आइए पहले कुछ सहायक कार्यों का परिचय दें। नीचे दिया गया फ़ंक्शन एक निर्दिष्ट बकेट में मिनिओ ऑब्जेक्ट की एक सूची लौटाएगा जो एक उपसर्ग से मेल खाता है।
def get_object_list(bucket_name: str, prefix: str) -> List[str]: ''' Gets a list of objects from a bucket. ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_list = [] objects = client.list_objects(bucket_name, prefix=prefix, recursive=True) for obj in objects: object_list.append(obj.object_name) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return object_list
ध्यान दें कि उपरोक्त कोड में, एक MiniIO क्रेडेंशियल फ़ाइल की आवश्यकता है। इसे मिनिओ कंसोल से प्राप्त किया जा सकता है। यदि आप नहीं जानते कि मिनिओ क्रेडेंशियल्स कैसे प्राप्त करें, तो इस पोस्ट का एक अनुभाग है जो दिखाता है कि उन्हें कैसे उत्पन्न और डाउनलोड किया जाए।
इसके बाद, हमें MiniIO से ऑब्जेक्ट प्राप्त करने के लिए एक फ़ंक्शन की आवश्यकता है। चूँकि ऑब्जेक्ट टार फ़ाइलें हैं, इसलिए हमें टार संग्रह से डेटा निकालने और उसे पांडा डेटाफ़्रेम में बदलने के लिए भी इस फ़ंक्शन की आवश्यकता होती है। यह नीचे दिए गए फ़ंक्शन का उपयोग करके किया जाता है।
def tar_to_df(bucket_name: str, object_name: str) -> pd.DataFrame: ''' This function will take a tarfile reference in MinIO and do the following: - unzip the tarfile - turn the data into a single DataFrame object ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Temp file to use for processing the tar files. temp_file_name = 'temp.tar.gz' # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_info = client.fget_object(bucket_name, object_name, temp_file_name) Row = namedtuple('Row', ('station', 'date', 'latitude', 'longitude', 'elevation', 'name', 'temp', 'temp_attributes', 'dewp', 'dewp_attributes', 'slp', 'SLP_attributes', 'stp', 'stp_attributes', 'visib', 'visib_attributes', 'wdsp', 'wdsp_attributes', 'mxspd', 'gust', 'max', 'max_attributes', 'min', 'min_attributes', 'prcp', 'prcp_attributes', 'sndp', 'frshtt')) # Columns of interest and their data types. dtypes={ 'station': 'string', 'date': 'datetime64[ns]', 'latitude': 'float64', 'longitude': 'float64', 'name': 'string', 'temp': 'float64' } tar = tarfile.open(temp_file_name, 'r:gz') all_rows = [] for member in tar.getmembers(): member_handle = tar.extractfile(member) byte_data = member_handle.read() decoded_string = byte_data.decode() lines = decoded_string.splitlines() reader = csv.reader(lines, delimiter=',') # Get all the rows in the member. Skip the header. _ = next(reader) file_rows = [Row(*l) for l in reader] all_rows += file_rows df = pd.DataFrame.from_records(all_rows, columns=Row._fields) df = df[list(dtypes.keys())] for c in df.columns: if dtypes[c] == 'float64': df[c] = pd.to_numeric(df[c], errors='coerce') df = df.astype(dtype=dtypes) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return df
ये दोनों फ़ंक्शन सामान्य उपयोगिताएँ हैं जिनका पुन: उपयोग किया जा सकता है, चाहे आप मिनिओ के साथ कुछ भी कर रहे हों। उन्हें कोड स्निपेट्स के अपने व्यक्तिगत संग्रह या अपने संगठन के Github Gist में डालने पर विचार करें।
अब, हम अपने लेकहाउस के गोदाम वाले हिस्से में डेटा भेजने के लिए तैयार हैं। यह नीचे दिए गए कोड के साथ किया जा सकता है, जो स्पार्क सत्र शुरू करता है, सभी जीएसओडी टार फाइलों के माध्यम से लूप करता है, निकालता है, रूपांतरित करता है और इसे हमारी आइसबर्ग तालिका में भेजता है।
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Jupyter').getOrCreate() objects = get_object_list(bucket_name, 'noaa/gsod') for obj in reversed(objects): print(obj) df = tar_to_df(bucket_name, obj) table = pa.Table.from_pandas(df) pq.write_table(table, 'temp.parquet') df = spark.read.parquet('temp.parquet') df.write.mode('append').saveAsTable('noaa.gsod')
इस अनुभाग में कोड ने मैन्युअल रूप से मिनियो बकेट से डेटा लोड किया है। उत्पादन परिवेश में, आप इस कोड को एक सेवा में तैनात करना चाहेंगे और स्वचालित अंतर्ग्रहण के लिए मिनिओ बकेट इवेंट का उपयोग करना चाहेंगे।
आइए रिपोर्टिंग के लिए एक नई नोटबुक शुरू करें। नीचे दिया गया सेल उन उपयोगिताओं को आयात करता है जिनकी हमें आवश्यकता होगी। विशेष रूप से, हम डेटा पुनर्प्राप्ति के लिए PyIceberg, डेटा गड़बड़ी के लिए पांडा और डेटा को विज़ुअलाइज़ करने के लिए सीबॉर्न का उपयोग करेंगे।
from pyiceberg.catalog import load_catalog from pyiceberg.expressions import GreaterThanOrEqual, EqualTo import pandas as pd import seaborn as sns pd.options.mode.chained_assignment = None catalog = load_catalog('default')
हम जो करना चाहते हैं वह किसी दिए गए मौसम स्टेशन के लिए प्रति वर्ष औसत तापमान की गणना करना है। यह हमें प्रति वर्ष एक संख्या देता है और वर्ष के सभी मौसमों को ध्यान में रखता है। पहला कदम किसी दिए गए स्टेशन के सभी डेटा के लिए आइसबर्ग से पूछताछ करना है। यह नीचे PyIceberg का उपयोग करके किया गया है।
tbl = catalog.load_table('noaa.gsod') sc = tbl.scan(row_filter="station == '72502014734'") df = sc.to_arrow().to_pandas() df.head(10)
उपरोक्त कोड में प्रयुक्त स्टेशन आईडी नेवार्क लिबर्टी इंटरनेशनल एयरपोर्ट, एनजे, यूएस में स्थित एक स्टेशन के लिए है। यह 1973 (लगभग 50 वर्षों का डेटा) से परिचालन में है। जब कोड चलेगा, तो आपको नीचे आउटपुट मिलेगा। (मैं नमूना प्राप्त करने के लिए डेटाफ़्रेम हेड() फ़ंक्शन का उपयोग कर रहा हूं।)
इसके बाद, हमें वर्ष के अनुसार समूह बनाना होगा और माध्य की गणना करनी होगी। पांडा का उपयोग करते हुए, यह कोड की कुछ पंक्तियाँ हैं। किसी लूपिंग की आवश्यकता नहीं है.
df['year'] = df['date'].dt.year df = df[['year','temp']] grouped_by_year = df.groupby('year') average_by_year = grouped_by_year.mean() average_by_year
एक बार यह सेल चलने के बाद, आपको प्रत्येक वर्ष के लिए एक ही मान दिखाई देगा। शीर्ष कुछ वर्ष नीचे दिखाए गए हैं।
अंत में, हम अपने वार्षिक औसत की कल्पना कर सकते हैं। हम एक लाइन प्लॉट बनाने के लिए सीबॉर्न का उपयोग करेंगे। इसमें कोड की केवल एक पंक्ति लगती है।
sns.lineplot(data=df, x="year", y="temp", errorbar=None)
लाइन प्लॉट नीचे दिखाया गया है.
एक अन्य कमांड जो आपको पहली बार रिपोर्ट चलाने के बाद हमेशा चलानी चाहिए वह नीचे है।
[task.file.file_path for task in sc.plan_files()]
यह एक सूची समझ है जो आपको अपाचे आइसबर्ग में उन सभी डेटा फ़ाइलों की एक सूची देगी जिनमें आपकी क्वेरी से मेल खाने वाला डेटा है। बहुत कुछ होगा, भले ही आइसबर्ग का मेटाडेटा कई को फ़िल्टर कर सकता है। इसमें शामिल सभी फाइलों को देखने से यह तथ्य सामने आता है कि हाई-स्पीड ऑब्जेक्ट स्टोरेज लेकहाउस का एक महत्वपूर्ण हिस्सा है।
इस पोस्ट में, हमने मिनिओ और अपाचे आइसबर्ग का उपयोग करके एक डेटा लेकहाउस बनाया। हमने जीएसओडी डेटासेट का उपयोग करके ऐसा किया। सबसे पहले, कच्चा डेटा हमारे डेटा लेकहाउस (मिनियो) के लेक साइड पर अपलोड किया गया था। वहां से, हमने अपाचे आइसबर्ग (हमारे डेटा लेकहाउस का डेटा वेयरहाउस पक्ष) में एक डेटाबेस और एक तालिका बनाई। फिर हमने डेटा को लेक से डेटा लेकहाउस के भीतर वेयरहाउस तक ले जाने के लिए एक सरल ईटीएल पाइपलाइन का निर्माण किया।
एक बार जब हमारे पास अपाचे आइसबर्ग पूरी तरह से डेटा से भर गया, तो हम एक औसत वार्षिक तापमान रिपोर्ट बनाने और उसकी कल्पना करने में सक्षम हो गए।
ध्यान रखें कि यदि आप उत्पादन में डेटा लेकहाउस बनाना चाहते हैं, तो आपको मिनिओ की एंटरप्राइज़ सुविधाओं की आवश्यकता होगी। ऑब्जेक्ट जीवनचक्र प्रबंधन , सुरक्षा सर्वोत्तम अभ्यास , काफ्का स्ट्रीमिंग और बकेट इवेंट को देखने पर विचार करें।
यहाँ भी प्रकाशित किया गया है.