En este artículo, mi objetivo es compartir mi experiencia en la estandarización del proceso ETL dentro de marcos como Spark y Flink, particularmente para aquellos que no utilizan marcos de procesamiento de datos como dbt.
El procesamiento de datos es un campo integral que abarca el procesamiento regular de volúmenes sustanciales de información tanto en modo por lotes como en modo continuo. Estos procedimientos, a menudo denominados procesos ETL, se desarrollan utilizando una variedad de lenguajes y marcos de programación.
Normalmente, las principales etapas del procesamiento de datos son las siguientes:
Lectura de datos: Los datos se recopilan de diversas fuentes para su posterior procesamiento.
Transformación: los datos sufren diversas transformaciones y manipulaciones para lograr la estructura y el formato requeridos.
Escritura: los datos transformados se almacenan en el almacenamiento de destino.
Esquemáticamente se representa como se muestra en el diagrama:
Una vez implementadas las etapas principales, se pueden agregar varias otras etapas para mejorar y refinar el proceso:
Verificación de la calidad de los datos: esta etapa implica verificar el cumplimiento de los datos con los requisitos y expectativas, incluida la limpieza de los datos para garantizar su precisión e integridad.
Registro de datos en el catálogo de datos: El proceso de registro rastrea y documenta los conjuntos de datos disponibles. También puede implicar la gestión de datos y metadatos para una organización eficaz de los datos.
Creación de linaje de datos: esto permite la visualización de las relaciones entre los activos de datos y los procesos ETL para comprender las dependencias. Puede ayudar a identificar posibles problemas de calidad de los datos y mejorar la calidad general de los datos.
Clasificación de datos: el procesamiento adicional puede definir el nivel de confidencialidad y criticidad de los datos, garantizando la protección y seguridad adecuadas de los datos.
Cada una de las etapas mencionadas se puede implementar utilizando diversas tecnologías y herramientas, lo que permite flexibilidad a la hora de elegir la solución adecuada para cada etapa específica del procesamiento de datos.
El primer paso en el proceso ETL implica leer datos de la fuente. En esta etapa se pueden utilizar varios tipos de fuentes, como archivos, bases de datos, intermediarios de mensajes y más. Los datos también pueden venir en diferentes formatos, como CSV, JSON, Parquet, Delta, Hudi, etc. Aquí hay un código repetitivo para leer datos de estas diferentes fuentes (implementado como patrón de estrategia):
from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()
Ejemplo de uso:
spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()
Este enfoque ofrece varios beneficios:
ReadConfig
, que puede contener todos los parámetros de lectura de datos necesarios.input_file_name()
de forma predeterminada para una fuente de archivo.
Después de leer los datos, deben transformarse. Esto puede incluir varias operaciones como filtrar, agregar y unirse. Dado que el proceso de transformación puede ser complejo y llevar mucho tiempo, es fundamental estandarizarlo para facilitar su comprensión. Según mi experiencia, utilizar el método transform
de la API DataFrame es una forma eficaz de estandarizar este proceso. Este método nos permite encadenar múltiples transformaciones, mejorando la legibilidad del código. Cada transformación se puede declarar como una función pura independiente, que realiza una transformación específica y se puede probar de forma independiente.
import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )
La última etapa del proceso ETL es escribir los datos transformados en el almacenamiento de destino, que podría ser un sistema de archivos, una base de datos o un intermediario de mensajes. La fase de carga también puede incorporar pasos adicionales como validación de datos, registro de datos y linaje de datos. Para esta etapa podemos aplicar los mismos métodos que la lectura de datos, pero con estrategias de escritura.
from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)
Ejemplo de uso:
path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)
Esta implementación estandariza el proceso ETL, haciéndolo más comprensible y mantenible. Facilita la adición de nuevas estrategias de lectura y escritura sin modificar el código existente. Cada estrategia puede poseer parámetros únicos y se puede incorporar lógica adicional a las estrategias sin alterar el código principal. El enfoque también permite realizar pruebas separadas de cada estrategia. Además, podemos introducir etapas adicionales al proceso ETL, como controles de calidad de datos, registro de datos en el catálogo de datos, creación de linaje de datos y clasificación de datos.