इस लेख में, मेरा लक्ष्य स्पार्क और फ्लिंक जैसे ढांचे के भीतर ईटीएल प्रक्रिया को मानकीकृत करने के अपने अनुभव को साझा करना है, खासकर उन लोगों के लिए जो डीबीटी जैसे डेटा प्रोसेसिंग ढांचे का उपयोग नहीं करते हैं।
डेटा प्रोसेसिंग एक व्यापक क्षेत्र है जिसमें बैच और स्ट्रीम दोनों मोड में पर्याप्त सूचना मात्रा की नियमित प्रोसेसिंग शामिल है। इन प्रक्रियाओं को, जिन्हें अक्सर ईटीएल प्रक्रियाओं के रूप में जाना जाता है, विभिन्न प्रोग्रामिंग भाषाओं और रूपरेखाओं का उपयोग करके विकसित की जाती हैं।
आमतौर पर, डेटा प्रोसेसिंग के मुख्य चरण इस प्रकार हैं:
डेटा पढ़ना: बाद की प्रोसेसिंग के लिए विभिन्न स्रोतों से डेटा एकत्र किया जाता है।
परिवर्तन: आवश्यक संरचना और प्रारूप प्राप्त करने के लिए डेटा विभिन्न परिवर्तनों और हेरफेर से गुजरता है।
लेखन: परिवर्तित डेटा को फिर लक्ष्य भंडारण में संग्रहीत किया जाता है।
योजनाबद्ध रूप से, इसे चित्र में दिखाए अनुसार दर्शाया गया है:
मुख्य चरणों के लागू होने के बाद, प्रक्रिया को बढ़ाने और परिष्कृत करने के लिए कई अन्य चरण जोड़े जा सकते हैं:
डेटा गुणवत्ता जांच: इस चरण में सटीकता और पूर्णता सुनिश्चित करने के लिए डेटा सफाई सहित आवश्यकताओं और अपेक्षाओं के साथ डेटा अनुपालन की पुष्टि करना शामिल है।
डेटा कैटलॉग में डेटा पंजीकरण: पंजीकरण प्रक्रिया उपलब्ध डेटा सेट को ट्रैक और दस्तावेज़ित करती है। इसमें प्रभावी डेटा संगठन के लिए डेटा और मेटाडेटा प्रबंधन भी शामिल हो सकता है।
डेटा वंश का निर्माण: यह निर्भरता को समझने के लिए डेटा परिसंपत्तियों और ईटीएल प्रक्रियाओं के बीच संबंधों के दृश्य की अनुमति देता है। यह संभावित डेटा गुणवत्ता समस्याओं की पहचान करने और समग्र डेटा गुणवत्ता में सुधार करने में मदद कर सकता है।
डेटा वर्गीकरण: अतिरिक्त प्रसंस्करण डेटा गोपनीयता और गंभीरता के स्तर को परिभाषित कर सकता है, जिससे उचित डेटा सुरक्षा और सुरक्षा सुनिश्चित हो सके।
उल्लिखित प्रत्येक चरण को विभिन्न प्रौद्योगिकियों और उपकरणों का उपयोग करके कार्यान्वित किया जा सकता है, जिससे डेटा प्रोसेसिंग के प्रत्येक विशिष्ट चरण के लिए सही समाधान चुनने में लचीलापन मिलता है।
ईटीएल प्रक्रिया में पहले चरण में स्रोत से डेटा पढ़ना शामिल है। इस स्तर पर विभिन्न प्रकार के स्रोतों का उपयोग किया जा सकता है, जैसे फ़ाइलें, डेटाबेस, संदेश ब्रोकर और बहुत कुछ। डेटा विभिन्न स्वरूपों में भी आ सकता है, जैसे CSV, JSON, Parquet, Delta, Hudi, आदि। इन विभिन्न स्रोतों से डेटा पढ़ने के लिए यहां कुछ बॉयलरप्लेट कोड दिए गए हैं (रणनीति पैटर्न के रूप में कार्यान्वित):
from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()
उपयोग उदाहरण:
spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()
यह दृष्टिकोण कई लाभ प्रदान करता है:
ReadConfig
जैसा एक समर्पित वर्ग बना सकते हैं, जो सभी आवश्यक डेटा रीडिंग पैरामीटर रख सकता है।input_file_name()
कॉल कर सकते हैं।
डेटा को पढ़ने के बाद उसे बदलना होगा। इसमें फ़िल्टरिंग, एकत्रीकरण और जुड़ाव जैसे विभिन्न ऑपरेशन शामिल हो सकते हैं। चूँकि परिवर्तन प्रक्रिया जटिल और समय लेने वाली हो सकती है, इसलिए समझने में आसानी के लिए इसे मानकीकृत करना महत्वपूर्ण है। मेरे अनुभव से, डेटाफ़्रेम एपीआई की transform
विधि का उपयोग करना इस प्रक्रिया को मानकीकृत करने का एक प्रभावी तरीका है। यह विधि हमें कोड पठनीयता को बढ़ाते हुए कई परिवर्तनों को श्रृंखलाबद्ध करने देती है। प्रत्येक परिवर्तन को एक विशिष्ट परिवर्तन करते हुए एक अलग शुद्ध कार्य के रूप में घोषित किया जा सकता है, और स्वतंत्र रूप से परीक्षण किया जा सकता है।
import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )
ईटीएल प्रक्रिया में अंतिम चरण रूपांतरित डेटा को लक्ष्य भंडारण में लिखना है, जो एक फ़ाइल सिस्टम, एक डेटाबेस या एक संदेश ब्रोकर हो सकता है। लोडिंग चरण में डेटा सत्यापन, डेटा पंजीकरण और डेटा वंशावली जैसे अतिरिक्त चरण भी शामिल हो सकते हैं। इस चरण के लिए, हम डेटा पढ़ने के समान तरीकों को लागू कर सकते हैं, लेकिन लेखन रणनीतियों के साथ।
from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)
उपयोग उदाहरण:
path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)
यह कार्यान्वयन ईटीएल प्रक्रिया को मानकीकृत करता है, जिससे यह अधिक समझने योग्य और रखरखाव योग्य बन जाता है। यह मौजूदा कोड को संशोधित किए बिना नई पढ़ने और लिखने की रणनीतियों को जोड़ने की सुविधा प्रदान करता है। प्रत्येक रणनीति में अद्वितीय पैरामीटर हो सकते हैं, और मुख्य कोड को बाधित किए बिना अतिरिक्त तर्क को रणनीतियों में शामिल किया जा सकता है। यह दृष्टिकोण प्रत्येक रणनीति के अलग-अलग परीक्षण को भी सक्षम बनाता है। इसके अलावा, हम ईटीएल प्रक्रिया में अतिरिक्त चरण पेश कर सकते हैं, जैसे डेटा गुणवत्ता जांच, डेटा कैटलॉग में डेटा पंजीकरण, डेटा वंश का निर्माण और डेटा वर्गीकरण।