В этой статье я хочу поделиться своим опытом стандартизации процесса ETL в таких средах, как Spark и Flink, особенно для тех, кто не использует такие платформы обработки данных, как dbt.
Обработка данных — это комплексная область, включающая регулярную обработку значительных объемов информации как в пакетном, так и в потоковом режиме. Эти процедуры, часто называемые процессами ETL, разрабатываются с использованием различных языков программирования и платформ.
Обычно основные этапы обработки данных следующие:
Чтение данных: Данные собираются из различных источников для последующей обработки.
Преобразование: данные подвергаются различным преобразованиям и манипуляциям для достижения необходимой структуры и формата.
Запись: преобразованные данные затем сохраняются в целевом хранилище.
Схематически это изображается так, как показано на схеме:
После реализации основных этапов можно добавить еще несколько этапов для улучшения и совершенствования процесса:
Проверка качества данных. Этот этап включает проверку соответствия данных требованиям и ожиданиям, включая очистку данных для обеспечения точности и полноты.
Регистрация данных в каталоге данных: процесс регистрации отслеживает и документирует доступные наборы данных. Это также может включать управление данными и метаданными для эффективной организации данных.
Создание происхождения данных. Это позволяет визуализировать связи между активами данных и процессами ETL, чтобы понять зависимости. Это может помочь выявить потенциальные проблемы с качеством данных и улучшить общее качество данных.
Классификация данных: дополнительная обработка может определить уровень конфиденциальности и критичности данных, обеспечивая надлежащую защиту и безопасность данных.
Каждый из упомянутых этапов может быть реализован с использованием различных технологий и инструментов, что позволяет гибко выбирать подходящее решение для каждого конкретного этапа обработки данных.
Первый шаг в процессе ETL включает чтение данных из источника. На этом этапе могут использоваться различные типы источников, такие как файлы, базы данных, брокеры сообщений и т. д. Данные также могут поступать в разных форматах, таких как CSV, JSON, Parquet, Delta, Hudi и т. д. Вот некоторый шаблонный код для чтения данных из этих разных источников (реализованный как шаблон стратегии):
from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()
Пример использования:
spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()
Этот подход дает несколько преимуществ:
ReadConfig
, который может хранить все необходимые параметры чтения данных.input_file_name()
по умолчанию для источника файла.
После прочтения данных их необходимо преобразовать. Это может включать в себя различные операции, такие как фильтрация, агрегирование и объединение. Поскольку процесс преобразования может быть сложным и трудоемким, крайне важно стандартизировать его для простоты понимания. По моему опыту, использование метода transform
DataFrame API — эффективный способ стандартизировать этот процесс. Этот метод позволяет нам связывать несколько преобразований, улучшая читаемость кода. Каждое преобразование может быть объявлено как отдельная чистая функция, выполняющая определенное преобразование, и может быть протестировано независимо.
import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )
Последний этап процесса ETL — запись преобразованных данных в целевое хранилище, которым может быть файловая система, база данных или брокер сообщений. Фаза загрузки также может включать дополнительные этапы, такие как проверка данных, регистрация данных и происхождение данных. На этом этапе мы можем применить те же методы, что и для чтения данных, но со стратегиями записи.
from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)
Пример использования:
path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)
Эта реализация стандартизирует процесс ETL, делая его более понятным и удобным в сопровождении. Это облегчает добавление новых стратегий чтения и записи без изменения существующего кода. Каждая стратегия может обладать уникальными параметрами, а в стратегии можно включать дополнительную логику, не нарушая основной код. Этот подход также позволяет проводить отдельное тестирование каждой стратегии. Более того, мы можем ввести дополнительные этапы в процесс ETL, такие как проверки качества данных, регистрация данных в каталоге данных, построение происхождения данных и классификация данных.