この記事では、特に dbt のようなデータ処理フレームワークを使用していないフレームワークを対象に、Spark や Flink などのフレームワーク内で ETL プロセスを標準化した私の経験を共有することを目的としています。
データ処理は、バッチ モードとストリーム モードの両方で大量の情報を定期的に処理する包括的な分野です。これらの手順は、ETL プロセスと呼ばれることが多く、さまざまなプログラミング言語とフレームワークを使用して開発されます。
通常、データ処理の主な段階は次のとおりです。
データの読み取り:データは、後続の処理のためにさまざまなソースから収集されます。
変換:データは、必要な構造と形式を達成するために、さまざまな変換と操作を受けます。
書き込み:変換されたデータはターゲット ストレージに保存されます。
概略的には、次の図のように表されます。
主要なステージが実装された後、プロセスを強化および改良するために他のいくつかのステージを追加できます。
データ品質チェック:この段階では、正確性と完全性を確保するためのデータ クリーニングを含め、データが要件と期待に準拠していることを確認します。
データ カタログへのデータ登録:登録プロセスでは、利用可能なデータ セットを追跡し、文書化します。効果的なデータ編成のためのデータおよびメタデータの管理も含まれる場合があります。
データリネージの構築:これにより、データ資産と ETL プロセスの間の関係を視覚化し、依存関係を理解できるようになります。これは、潜在的なデータ品質の問題を特定し、全体的なデータ品質を向上させるのに役立ちます。
データの分類:追加の処理により、データの機密性と重要性のレベルが定義され、適切なデータ保護とセキュリティが確保される場合があります。
前述の各段階はさまざまなテクノロジーとツールを使用して実装できるため、データ処理の特定の段階ごとに適切なソリューションを柔軟に選択できます。
ETL プロセスの最初のステップには、ソースからのデータの読み取りが含まれます。この段階では、ファイル、データベース、メッセージ ブローカーなど、さまざまな種類のソースを使用できます。データは、CSV、JSON、Parquet、Delta、Hudi などのさまざまな形式で提供される場合もあります。これらのさまざまなソースからデータを読み取るための定型コードをいくつか示します (戦略パターンとして実装)。
from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()
使用例:
spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()
このアプローチにはいくつかの利点があります。
ReadConfig
などの専用クラスを作成できます。input_file_name()
を呼び出すことができます。
データを読み取った後は、データを変換する必要があります。これには、フィルタリング、集計、結合などのさまざまな操作が含まれる場合があります。変換プロセスは複雑で時間がかかる場合があるため、理解しやすくするために標準化することが重要です。私の経験から言えば、このプロセスを標準化するには、DataFrame API のtransform
メソッドを使用するのが効果的です。この方法により、複数の変換を連鎖させることができ、コードの可読性が向上します。各変換は、特定の変換を実行する個別の純粋関数として宣言でき、独立してテストできます。
import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )
ETL プロセスの最後の段階では、変換されたデータをターゲット ストレージ (ファイル システム、データベース、メッセージ ブローカーなど) に書き込みます。読み込みフェーズには、データ検証、データ登録、データリネージなどの追加の手順が組み込まれる場合もあります。この段階では、データの読み取りと同じ方法を、書き込み戦略に適用できます。
from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)
使用例:
path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)
この実装により ETL プロセスが標準化され、より理解しやすく、保守しやすくなります。既存のコードを変更することなく、新しい読み取りおよび書き込み戦略の追加が容易になります。各戦略は固有のパラメーターを持つことができ、メイン コードを中断することなく追加のロジックを戦略に組み込むことができます。このアプローチにより、各戦略を個別にテストすることも可能になります。さらに、データ品質チェック、データカタログへのデータ登録、データ系統の構築、データ分類などの追加の段階を ETL プロセスに導入できます。