paint-brush
স্পার্ক-জবসের জন্য ডেভ স্ট্যান্ডার্ডদ্বারা@lexaneon
138 পড়া

স্পার্ক-জবসের জন্য ডেভ স্ট্যান্ডার্ড

দ্বারা Alexey Artemov6m2024/02/12
Read on Terminal Reader

অতিদীর্ঘ; পড়তে

স্পার্ক এবং ফ্লিঙ্ক ফ্রেমওয়ার্কের মধ্যে ETL প্রক্রিয়াটিকে মানক করার জন্য ব্যবহারিক কৌশলগুলি আবিষ্কার করুন, ডেটা পড়া, রূপান্তরকরণ এবং লেখার মতো মূল ধাপগুলিকে কভার করে৷ উন্নত দক্ষতা এবং মাপযোগ্যতার জন্য আপনার ডেটা প্রসেসিং ওয়ার্কফ্লোকে অপ্টিমাইজ করতে ডেটা গুণমান পরীক্ষা, নিবন্ধন, বংশ বিল্ডিং এবং শ্রেণীবিভাগ সহ চ্যালেঞ্জ এবং সমাধানগুলি অন্বেষণ করুন।
featured image - স্পার্ক-জবসের জন্য ডেভ স্ট্যান্ডার্ড
Alexey Artemov HackerNoon profile picture
0-item


এই নিবন্ধে, আমি স্পার্ক এবং ফ্লিঙ্কের মতো ফ্রেমওয়ার্কের মধ্যে ETL প্রক্রিয়াকে মানক করার অভিজ্ঞতা শেয়ার করার লক্ষ্য রাখি, বিশেষ করে যারা dbt-এর মতো ডেটা প্রসেসিং ফ্রেমওয়ার্ক ব্যবহার করেন না তাদের জন্য।


প্রসঙ্গ

ডেটা প্রসেসিং হল একটি ব্যাপক ক্ষেত্র যা ব্যাচ এবং স্ট্রীম মোড উভয় ক্ষেত্রেই যথেষ্ট তথ্য ভলিউমের নিয়মিত প্রক্রিয়াকরণকে অন্তর্ভুক্ত করে। এই পদ্ধতিগুলি, প্রায়ই ETL প্রক্রিয়া হিসাবে উল্লেখ করা হয়, বিভিন্ন প্রোগ্রামিং ভাষা এবং কাঠামো ব্যবহার করে তৈরি করা হয়।


সাধারণত, ডেটা প্রক্রিয়াকরণের প্রধান ধাপগুলি নিম্নরূপ:


  1. ডেটা পড়া: পরবর্তী প্রক্রিয়াকরণের জন্য বিভিন্ন উত্স থেকে ডেটা সংগ্রহ করা হয়।

  2. রূপান্তর: প্রয়োজনীয় কাঠামো এবং বিন্যাস অর্জনের জন্য ডেটা বিভিন্ন রূপান্তর এবং ম্যানিপুলেশনের মধ্য দিয়ে যায়।

  3. লেখা: রূপান্তরিত ডেটা তারপর টার্গেট স্টোরেজে সংরক্ষণ করা হয়।


পরিকল্পিতভাবে, এটি চিত্রে দেখানো হিসাবে উপস্থাপন করা হয়: ডেটা প্রসেসিং পর্যায়।


চ্যালেঞ্জ

প্রধান পর্যায়গুলি বাস্তবায়িত হওয়ার পরে, প্রক্রিয়াটিকে উন্নত এবং পরিমার্জিত করতে আরও কয়েকটি পর্যায় যোগ করা যেতে পারে:


  • ডেটা গুণমান পরীক্ষা: এই পর্যায়ে সঠিকতা এবং সম্পূর্ণতা নিশ্চিত করতে ডেটা পরিষ্কার করা সহ প্রয়োজনীয়তা এবং প্রত্যাশাগুলির সাথে ডেটা সম্মতি যাচাই করা জড়িত।

  • ডেটা ক্যাটালগে ডেটা নিবন্ধন: নিবন্ধন প্রক্রিয়া ট্র্যাক এবং নথি উপলব্ধ ডেটা সেটগুলি। এটি কার্যকর ডেটা সংস্থার জন্য ডেটা এবং মেটাডেটা পরিচালনার সাথে জড়িত থাকতে পারে।

  • বিল্ডিং ডেটা বংশ: এটি নির্ভরতা বোঝার জন্য ডেটা সম্পদ এবং ETL প্রক্রিয়াগুলির মধ্যে সম্পর্কের ভিজ্যুয়ালাইজেশনের অনুমতি দেয়। এটি সম্ভাব্য ডেটা মানের সমস্যাগুলি সনাক্ত করতে এবং সামগ্রিক ডেটা গুণমান উন্নত করতে সহায়তা করতে পারে।

  • ডেটা শ্রেণীবিভাগ: অতিরিক্ত প্রক্রিয়াকরণ ডেটা গোপনীয়তা এবং সমালোচনার স্তরকে সংজ্ঞায়িত করতে পারে, সঠিক ডেটা সুরক্ষা এবং সুরক্ষা নিশ্চিত করে।


উল্লিখিত প্রতিটি ধাপ বিভিন্ন প্রযুক্তি এবং সরঞ্জাম ব্যবহার করে বাস্তবায়িত করা যেতে পারে, ডেটা প্রক্রিয়াকরণের প্রতিটি নির্দিষ্ট পর্যায়ের জন্য সঠিক সমাধান বেছে নেওয়ার ক্ষেত্রে নমনীয়তার অনুমতি দেয়।


সমাধান

ডেটা পড়া

ETL প্রক্রিয়ার প্রথম ধাপে উৎস থেকে ডেটা পড়া জড়িত। ফাইল, ডাটাবেস, বার্তা ব্রোকার এবং আরও অনেক কিছুর মতো এই পর্যায়ে বিভিন্ন ধরণের উত্স ব্যবহার করা যেতে পারে। ডেটা বিভিন্ন ফর্ম্যাটেও আসতে পারে, যেমন CSV, JSON, Parquet, Delta, Hudi, ইত্যাদি


 from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()


ব্যবহারের উদাহরণ:

 spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()


এই পদ্ধতিটি বিভিন্ন সুবিধা প্রদান করে:

  • এটি পড়ার কৌশল (উৎস বিন্যাস) পরিবর্তন করতে সক্ষম করে।
  • এটি বিদ্যমান কোড পরিবর্তন না করেই নতুন পড়ার কৌশল যোগ করার সুবিধা দেয়।
  • প্রতিটি কৌশল এর অনন্য পরামিতি থাকতে পারে। উদাহরণস্বরূপ, CSV এর একটি বিভাজন থাকতে পারে, যখন ডেল্টার একটি সংস্করণ থাকতে পারে ইত্যাদি।
  • একটি পাথ প্রদানের পরিবর্তে, আমরা একটি ডেডিকেটেড ক্লাস তৈরি করতে পারি, যেমন ReadConfig , যা সমস্ত প্রয়োজনীয় ডেটা পড়ার পরামিতি ধরে রাখতে পারে।
  • এটি মূল কোডকে প্রভাবিত না করেই কৌশলগুলিতে অতিরিক্ত যুক্তি যুক্ত করার অনুমতি দেয়। এই ক্ষেত্রে:
    • আমরা একটি ফাইল উৎসের জন্য ডিফল্টরূপে input_file_name() কল করতে পারি।
    • যদি আমাদের কাছে একটি ডেটা ক্যাটালগ থাকে তবে আমরা টেবিলটি সেখানে বিদ্যমান কিনা তা পরীক্ষা করতে পারি এবং না থাকলে তা নিবন্ধন করতে পারি।
  • প্রতিটি কৌশল পৃথকভাবে পরীক্ষা করা যেতে পারে।


রূপান্তর

ডেটা পড়ার পরে, এটি অবশ্যই রূপান্তরিত হবে। এতে ফিল্টারিং, এগ্রিগেটিং এবং যোগদানের মতো বিভিন্ন ক্রিয়াকলাপ অন্তর্ভুক্ত থাকতে পারে। যেহেতু রূপান্তর প্রক্রিয়াটি জটিল এবং সময়সাপেক্ষ হতে পারে, তাই বোঝার সুবিধার জন্য এটিকে প্রমিত করা অত্যন্ত গুরুত্বপূর্ণ। আমার অভিজ্ঞতা থেকে, DataFrame API এর transform পদ্ধতি ব্যবহার করা এই প্রক্রিয়াটিকে মানক করার একটি কার্যকর উপায়। এই পদ্ধতিটি আমাদেরকে একাধিক রূপান্তর চেইন করতে দেয়, কোড পঠনযোগ্যতা বাড়ায়। প্রতিটি রূপান্তরকে একটি পৃথক বিশুদ্ধ ফাংশন হিসাবে ঘোষণা করা যেতে পারে, একটি নির্দিষ্ট রূপান্তর সম্পাদন করে এবং স্বাধীনভাবে পরীক্ষা করা যেতে পারে।

 import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )


লেখা

ETL প্রক্রিয়ার শেষ পর্যায় হল রূপান্তরিত ডেটা টার্গেট স্টোরেজে লেখা, যা হতে পারে একটি ফাইল সিস্টেম, একটি ডাটাবেস বা একটি মেসেজ ব্রোকার। লোডিং পর্বে অতিরিক্ত পদক্ষেপগুলি অন্তর্ভুক্ত করতে পারে যেমন ডেটা বৈধতা, ডেটা নিবন্ধকরণ এবং ডেটা বংশ। এই পর্যায়ে, আমরা ডেটা পড়ার মতো একই পদ্ধতি প্রয়োগ করতে পারি, কিন্তু লেখার কৌশলগুলির সাথে।


 from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)

ব্যবহারের উদাহরণ:

 path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)


উপসংহার

এই বাস্তবায়ন ETL প্রক্রিয়াটিকে মানসম্মত করে, এটিকে আরও বোধগম্য এবং বজায় রাখার যোগ্য করে তোলে। এটি বিদ্যমান কোড পরিবর্তন না করেই নতুন পঠন এবং লেখার কৌশল যোগ করার সুবিধা দেয়। প্রতিটি কৌশল অনন্য পরামিতি ধারণ করতে পারে, এবং অতিরিক্ত যুক্তি প্রধান কোড ব্যাহত না করে কৌশলগুলিতে অন্তর্ভুক্ত করা যেতে পারে। পদ্ধতিটি প্রতিটি কৌশলের পৃথক পরীক্ষাও সক্ষম করে। অধিকন্তু, আমরা ETL প্রক্রিয়ায় অতিরিক্ত পর্যায়গুলি প্রবর্তন করতে পারি, যেমন ডেটা গুণমান পরীক্ষা, ডেটা ক্যাটালগে ডেটা নিবন্ধন, ডেটা লাইনেজ তৈরি করা এবং ডেটা শ্রেণীবিভাগ।