在本文中,我的目标是分享我在 Spark 和 Flink 等框架内标准化 ETL 流程的经验,特别是对于那些不使用 dbt 等数据处理框架的人。
数据处理是一个综合领域,包括以批处理和流模式对大量信息进行常规处理。这些过程通常称为 ETL 过程,是使用各种编程语言和框架开发的。
通常,数据处理的主要阶段如下:
读取数据:从各种来源收集数据以进行后续处理。
转换:数据经过各种转换和操作以获得所需的结构和格式。
写入:将转换后的数据存储到目标存储中。
示意性地表示如下图所示:
实施主要阶段后,可以添加其他几个阶段来增强和细化流程:
数据质量检查:此阶段涉及验证数据是否符合要求和期望,包括数据清理以确保准确性和完整性。
数据目录中的数据注册:注册过程跟踪并记录可用的数据集。它还可能涉及有效数据组织的数据和元数据管理。
构建数据沿袭:这使得数据资产和 ETL 流程之间的关系可视化,以了解依赖关系。它可以帮助识别潜在的数据质量问题并提高整体数据质量。
数据分类:附加处理可以定义数据机密性和关键性的级别,确保适当的数据保护和安全。
上述每个阶段都可以使用各种技术和工具来实现,从而可以灵活地为数据处理的每个特定阶段选择正确的解决方案。
ETL 过程的第一步涉及从源读取数据。在此阶段可以使用各种类型的源,例如文件、数据库、消息代理等等。数据也可能采用不同的格式,例如 CSV、JSON、Parquet、Delta、Hudi 等。以下是一些用于从这些不同来源读取数据的样板代码(作为策略模式实现):
from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()
使用示例:
spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()
这种方法有几个好处:
ReadConfig
,而不是提供路径,它可以保存所有必要的数据读取参数。input_file_name()
来获取文件源。
读取数据后,必须对其进行转换。这可以包括各种操作,例如过滤、聚合和连接。由于转换过程可能复杂且耗时,因此对其进行标准化以便于理解至关重要。根据我的经验,使用 DataFrame API 的transform
方法是标准化此过程的有效方法。这种方法让我们能够链接多个转换,从而增强代码的可读性。每个转换都可以声明为单独的纯函数,执行特定的转换,并且可以独立测试。
import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )
ETL 过程的最后一个阶段是将转换后的数据写入目标存储,目标存储可以是文件系统、数据库或消息代理。加载阶段还可能包含额外的步骤,例如数据验证、数据注册和数据沿袭。对于这个阶段,我们可以应用与读取数据相同的方法,但采用写入策略。
from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)
使用示例:
path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)
此实现标准化了 ETL 流程,使其更易于理解和维护。它有助于添加新的读写策略,而无需修改现有代码。每个策略都可以拥有独特的参数,并且可以在不破坏主代码的情况下将附加逻辑合并到策略中。该方法还可以对每个策略进行单独测试。此外,我们可以在 ETL 过程中引入其他阶段,例如数据质量检查、数据目录中的数据注册、构建数据沿袭和数据分类。