paint-brush
Стандарты разработки для Spark-заданийк@lexaneon
138 чтения

Стандарты разработки для Spark-заданий

к Alexey Artemov6m2024/02/12
Read on Terminal Reader

Слишком долго; Читать

Откройте для себя практические стратегии стандартизации процесса ETL в рамках Spark и Flink, охватывающие такие ключевые этапы, как чтение, преобразование и запись данных. Изучите проблемы и решения, включая проверки качества данных, регистрацию, построение родословной и классификацию, чтобы оптимизировать рабочие процессы обработки данных для повышения эффективности и масштабируемости.
featured image - Стандарты разработки для Spark-заданий
Alexey Artemov HackerNoon profile picture
0-item


В этой статье я хочу поделиться своим опытом стандартизации процесса ETL в таких средах, как Spark и Flink, особенно для тех, кто не использует такие платформы обработки данных, как dbt.


Контекст

Обработка данных — это комплексная область, включающая регулярную обработку значительных объемов информации как в пакетном, так и в потоковом режиме. Эти процедуры, часто называемые процессами ETL, разрабатываются с использованием различных языков программирования и платформ.


Обычно основные этапы обработки данных следующие:


  1. Чтение данных: Данные собираются из различных источников для последующей обработки.

  2. Преобразование: данные подвергаются различным преобразованиям и манипуляциям для достижения необходимой структуры и формата.

  3. Запись: преобразованные данные затем сохраняются в целевом хранилище.


Схематически это изображается так, как показано на схеме: Этапы обработки данных.


Проблемы

После реализации основных этапов можно добавить еще несколько этапов для улучшения и совершенствования процесса:


  • Проверка качества данных. Этот этап включает проверку соответствия данных требованиям и ожиданиям, включая очистку данных для обеспечения точности и полноты.

  • Регистрация данных в каталоге данных: процесс регистрации отслеживает и документирует доступные наборы данных. Это также может включать управление данными и метаданными для эффективной организации данных.

  • Создание происхождения данных. Это позволяет визуализировать связи между активами данных и процессами ETL, чтобы понять зависимости. Это может помочь выявить потенциальные проблемы с качеством данных и улучшить общее качество данных.

  • Классификация данных: дополнительная обработка может определить уровень конфиденциальности и критичности данных, обеспечивая надлежащую защиту и безопасность данных.


Каждый из упомянутых этапов может быть реализован с использованием различных технологий и инструментов, что позволяет гибко выбирать подходящее решение для каждого конкретного этапа обработки данных.


Решение

Чтение данных

Первый шаг в процессе ETL включает чтение данных из источника. На этом этапе могут использоваться различные типы источников, такие как файлы, базы данных, брокеры сообщений и т. д. Данные также могут поступать в разных форматах, таких как CSV, JSON, Parquet, Delta, Hudi и т. д. Вот некоторый шаблонный код для чтения данных из этих разных источников (реализованный как шаблон стратегии):


 from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()


Пример использования:

 spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()


Этот подход дает несколько преимуществ:

  • Это позволяет изменить стратегию чтения (формат источника).
  • Это облегчает добавление новых стратегий чтения без изменения существующего кода.
  • Каждая стратегия может иметь свои уникальные параметры. Например, CSV может иметь разделитель, а Delta — версию и т. д.
  • Вместо предоставления пути мы можем создать специальный класс, например ReadConfig , который может хранить все необходимые параметры чтения данных.
  • Это позволяет включать в стратегии дополнительную логику, не затрагивая основной код. Например:
    • Мы можем вызвать input_file_name() по умолчанию для источника файла.
    • Если у нас есть каталог данных, мы можем проверить, существует ли там таблица, и зарегистрировать ее, если нет.
  • Каждую стратегию можно протестировать индивидуально.


Трансформация

После прочтения данных их необходимо преобразовать. Это может включать в себя различные операции, такие как фильтрация, агрегирование и объединение. Поскольку процесс преобразования может быть сложным и трудоемким, крайне важно стандартизировать его для простоты понимания. По моему опыту, использование метода transform DataFrame API — эффективный способ стандартизировать этот процесс. Этот метод позволяет нам связывать несколько преобразований, улучшая читаемость кода. Каждое преобразование может быть объявлено как отдельная чистая функция, выполняющая определенное преобразование, и может быть протестировано независимо.

 import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )


Письмо

Последний этап процесса ETL — запись преобразованных данных в целевое хранилище, которым может быть файловая система, база данных или брокер сообщений. Фаза загрузки также может включать дополнительные этапы, такие как проверка данных, регистрация данных и происхождение данных. На этом этапе мы можем применить те же методы, что и для чтения данных, но со стратегиями записи.


 from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)

Пример использования:

 path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)


Заключение

Эта реализация стандартизирует процесс ETL, делая его более понятным и удобным в сопровождении. Это облегчает добавление новых стратегий чтения и записи без изменения существующего кода. Каждая стратегия может обладать уникальными параметрами, а в стратегии можно включать дополнительную логику, не нарушая основной код. Этот подход также позволяет проводить отдельное тестирование каждой стратегии. Более того, мы можем ввести дополнительные этапы в процесс ETL, такие как проверки качества данных, регистрация данных в каталоге данных, построение происхождения данных и классификация данных.