В этой статье я хочу поделиться своим опытом стандартизации процесса ETL в таких средах, как Spark и Flink, особенно для тех, кто не использует такие платформы обработки данных, как dbt. Контекст Обработка данных — это комплексная область, включающая регулярную обработку значительных объемов информации как в пакетном, так и в потоковом режиме. Эти процедуры, часто называемые процессами ETL, разрабатываются с использованием различных языков программирования и платформ. Обычно основные этапы обработки данных следующие: Данные собираются из различных источников для последующей обработки. Чтение данных: данные подвергаются различным преобразованиям и манипуляциям для достижения необходимой структуры и формата. Преобразование: преобразованные данные затем сохраняются в целевом хранилище. Запись: Схематически это изображается так, как показано на схеме: Проблемы После реализации основных этапов можно добавить еще несколько этапов для улучшения и совершенствования процесса: Этот этап включает проверку соответствия данных требованиям и ожиданиям, включая очистку данных для обеспечения точности и полноты. Проверка качества данных. процесс регистрации отслеживает и документирует доступные наборы данных. Это также может включать управление данными и метаданными для эффективной организации данных. Регистрация данных в каталоге данных: Это позволяет визуализировать связи между активами данных и процессами ETL, чтобы понять зависимости. Это может помочь выявить потенциальные проблемы с качеством данных и улучшить общее качество данных. Создание происхождения данных. дополнительная обработка может определить уровень конфиденциальности и критичности данных, обеспечивая надлежащую защиту и безопасность данных. Классификация данных: Каждый из упомянутых этапов может быть реализован с использованием различных технологий и инструментов, что позволяет гибко выбирать подходящее решение для каждого конкретного этапа обработки данных. Решение Чтение данных Первый шаг в процессе ETL включает чтение данных из источника. На этом этапе могут использоваться различные типы источников, такие как файлы, базы данных, брокеры сообщений и т. д. Данные также могут поступать в разных форматах, таких как CSV, JSON, Parquet, Delta, Hudi и т. д. Вот некоторый шаблонный код для чтения данных из этих разных источников (реализованный как шаблон стратегии): from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data() Пример использования: spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read() Этот подход дает несколько преимуществ: Это позволяет изменить стратегию чтения (формат источника). Это облегчает добавление новых стратегий чтения без изменения существующего кода. Каждая стратегия может иметь свои уникальные параметры. Например, CSV может иметь разделитель, а Delta — версию и т. д. Вместо предоставления пути мы можем создать специальный класс, например , который может хранить все необходимые параметры чтения данных. ReadConfig Это позволяет включать в стратегии дополнительную логику, не затрагивая основной код. Например: Мы можем вызвать по умолчанию для источника файла. input_file_name() Если у нас есть каталог данных, мы можем проверить, существует ли там таблица, и зарегистрировать ее, если нет. Каждую стратегию можно протестировать индивидуально. Трансформация После прочтения данных их необходимо преобразовать. Это может включать в себя различные операции, такие как фильтрация, агрегирование и объединение. Поскольку процесс преобразования может быть сложным и трудоемким, крайне важно стандартизировать его для простоты понимания. По моему опыту, использование метода DataFrame API — эффективный способ стандартизировать этот процесс. Этот метод позволяет нам связывать несколько преобразований, улучшая читаемость кода. Каждое преобразование может быть объявлено как отдельная чистая функция, выполняющая определенное преобразование, и может быть протестировано независимо. transform import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) ) Письмо Последний этап процесса ETL — запись преобразованных данных в целевое хранилище, которым может быть файловая система, база данных или брокер сообщений. Фаза загрузки также может включать дополнительные этапы, такие как проверка данных, регистрация данных и происхождение данных. На этом этапе мы можем применить те же методы, что и для чтения данных, но со стратегиями записи. from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df) Пример использования: path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df) Заключение Эта реализация стандартизирует процесс ETL, делая его более понятным и удобным в сопровождении. Это облегчает добавление новых стратегий чтения и записи без изменения существующего кода. Каждая стратегия может обладать уникальными параметрами, а в стратегии можно включать дополнительную логику, не нарушая основной код. Этот подход также позволяет проводить отдельное тестирование каждой стратегии. Более того, мы можем ввести дополнительные этапы в процесс ETL, такие как проверки качества данных, регистрация данных в каталоге данных, построение происхождения данных и классификация данных.