এই নিবন্ধে, আমি স্পার্ক এবং ফ্লিঙ্কের মতো ফ্রেমওয়ার্কের মধ্যে ETL প্রক্রিয়াকে মানক করার অভিজ্ঞতা শেয়ার করার লক্ষ্য রাখি, বিশেষ করে যারা dbt-এর মতো ডেটা প্রসেসিং ফ্রেমওয়ার্ক ব্যবহার করেন না তাদের জন্য।
ডেটা প্রসেসিং হল একটি ব্যাপক ক্ষেত্র যা ব্যাচ এবং স্ট্রীম মোড উভয় ক্ষেত্রেই যথেষ্ট তথ্য ভলিউমের নিয়মিত প্রক্রিয়াকরণকে অন্তর্ভুক্ত করে। এই পদ্ধতিগুলি, প্রায়ই ETL প্রক্রিয়া হিসাবে উল্লেখ করা হয়, বিভিন্ন প্রোগ্রামিং ভাষা এবং কাঠামো ব্যবহার করে তৈরি করা হয়।
সাধারণত, ডেটা প্রক্রিয়াকরণের প্রধান ধাপগুলি নিম্নরূপ:
ডেটা পড়া: পরবর্তী প্রক্রিয়াকরণের জন্য বিভিন্ন উত্স থেকে ডেটা সংগ্রহ করা হয়।
রূপান্তর: প্রয়োজনীয় কাঠামো এবং বিন্যাস অর্জনের জন্য ডেটা বিভিন্ন রূপান্তর এবং ম্যানিপুলেশনের মধ্য দিয়ে যায়।
লেখা: রূপান্তরিত ডেটা তারপর টার্গেট স্টোরেজে সংরক্ষণ করা হয়।
পরিকল্পিতভাবে, এটি চিত্রে দেখানো হিসাবে উপস্থাপন করা হয়:
প্রধান পর্যায়গুলি বাস্তবায়িত হওয়ার পরে, প্রক্রিয়াটিকে উন্নত এবং পরিমার্জিত করতে আরও কয়েকটি পর্যায় যোগ করা যেতে পারে:
ডেটা গুণমান পরীক্ষা: এই পর্যায়ে সঠিকতা এবং সম্পূর্ণতা নিশ্চিত করতে ডেটা পরিষ্কার করা সহ প্রয়োজনীয়তা এবং প্রত্যাশাগুলির সাথে ডেটা সম্মতি যাচাই করা জড়িত।
ডেটা ক্যাটালগে ডেটা নিবন্ধন: নিবন্ধন প্রক্রিয়া ট্র্যাক এবং নথি উপলব্ধ ডেটা সেটগুলি। এটি কার্যকর ডেটা সংস্থার জন্য ডেটা এবং মেটাডেটা পরিচালনার সাথে জড়িত থাকতে পারে।
বিল্ডিং ডেটা বংশ: এটি নির্ভরতা বোঝার জন্য ডেটা সম্পদ এবং ETL প্রক্রিয়াগুলির মধ্যে সম্পর্কের ভিজ্যুয়ালাইজেশনের অনুমতি দেয়। এটি সম্ভাব্য ডেটা মানের সমস্যাগুলি সনাক্ত করতে এবং সামগ্রিক ডেটা গুণমান উন্নত করতে সহায়তা করতে পারে।
ডেটা শ্রেণীবিভাগ: অতিরিক্ত প্রক্রিয়াকরণ ডেটা গোপনীয়তা এবং সমালোচনার স্তরকে সংজ্ঞায়িত করতে পারে, সঠিক ডেটা সুরক্ষা এবং সুরক্ষা নিশ্চিত করে।
উল্লিখিত প্রতিটি ধাপ বিভিন্ন প্রযুক্তি এবং সরঞ্জাম ব্যবহার করে বাস্তবায়িত করা যেতে পারে, ডেটা প্রক্রিয়াকরণের প্রতিটি নির্দিষ্ট পর্যায়ের জন্য সঠিক সমাধান বেছে নেওয়ার ক্ষেত্রে নমনীয়তার অনুমতি দেয়।
ETL প্রক্রিয়ার প্রথম ধাপে উৎস থেকে ডেটা পড়া জড়িত। ফাইল, ডাটাবেস, বার্তা ব্রোকার এবং আরও অনেক কিছুর মতো এই পর্যায়ে বিভিন্ন ধরণের উত্স ব্যবহার করা যেতে পারে। ডেটা বিভিন্ন ফর্ম্যাটেও আসতে পারে, যেমন CSV, JSON, Parquet, Delta, Hudi, ইত্যাদি
from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()
ব্যবহারের উদাহরণ:
spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()
এই পদ্ধতিটি বিভিন্ন সুবিধা প্রদান করে:
ReadConfig
, যা সমস্ত প্রয়োজনীয় ডেটা পড়ার পরামিতি ধরে রাখতে পারে।input_file_name()
কল করতে পারি।
ডেটা পড়ার পরে, এটি অবশ্যই রূপান্তরিত হবে। এতে ফিল্টারিং, এগ্রিগেটিং এবং যোগদানের মতো বিভিন্ন ক্রিয়াকলাপ অন্তর্ভুক্ত থাকতে পারে। যেহেতু রূপান্তর প্রক্রিয়াটি জটিল এবং সময়সাপেক্ষ হতে পারে, তাই বোঝার সুবিধার জন্য এটিকে প্রমিত করা অত্যন্ত গুরুত্বপূর্ণ। আমার অভিজ্ঞতা থেকে, DataFrame API এর transform
পদ্ধতি ব্যবহার করা এই প্রক্রিয়াটিকে মানক করার একটি কার্যকর উপায়। এই পদ্ধতিটি আমাদেরকে একাধিক রূপান্তর চেইন করতে দেয়, কোড পঠনযোগ্যতা বাড়ায়। প্রতিটি রূপান্তরকে একটি পৃথক বিশুদ্ধ ফাংশন হিসাবে ঘোষণা করা যেতে পারে, একটি নির্দিষ্ট রূপান্তর সম্পাদন করে এবং স্বাধীনভাবে পরীক্ষা করা যেতে পারে।
import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )
ETL প্রক্রিয়ার শেষ পর্যায় হল রূপান্তরিত ডেটা টার্গেট স্টোরেজে লেখা, যা হতে পারে একটি ফাইল সিস্টেম, একটি ডাটাবেস বা একটি মেসেজ ব্রোকার। লোডিং পর্বে অতিরিক্ত পদক্ষেপগুলি অন্তর্ভুক্ত করতে পারে যেমন ডেটা বৈধতা, ডেটা নিবন্ধকরণ এবং ডেটা বংশ। এই পর্যায়ে, আমরা ডেটা পড়ার মতো একই পদ্ধতি প্রয়োগ করতে পারি, কিন্তু লেখার কৌশলগুলির সাথে।
from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)
ব্যবহারের উদাহরণ:
path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)
এই বাস্তবায়ন ETL প্রক্রিয়াটিকে মানসম্মত করে, এটিকে আরও বোধগম্য এবং বজায় রাখার যোগ্য করে তোলে। এটি বিদ্যমান কোড পরিবর্তন না করেই নতুন পঠন এবং লেখার কৌশল যোগ করার সুবিধা দেয়। প্রতিটি কৌশল অনন্য পরামিতি ধারণ করতে পারে, এবং অতিরিক্ত যুক্তি প্রধান কোড ব্যাহত না করে কৌশলগুলিতে অন্তর্ভুক্ত করা যেতে পারে। পদ্ধতিটি প্রতিটি কৌশলের পৃথক পরীক্ষাও সক্ষম করে। অধিকন্তু, আমরা ETL প্রক্রিয়ায় অতিরিক্ত পর্যায়গুলি প্রবর্তন করতে পারি, যেমন ডেটা গুণমান পরীক্ষা, ডেটা ক্যাটালগে ডেটা নিবন্ধন, ডেটা লাইনেজ তৈরি করা এবং ডেটা শ্রেণীবিভাগ।