En este artículo, mi objetivo es compartir mi experiencia en la estandarización del proceso ETL dentro de marcos como Spark y Flink, particularmente para aquellos que no utilizan marcos de procesamiento de datos como dbt. Contexto El procesamiento de datos es un campo integral que abarca el procesamiento regular de volúmenes sustanciales de información tanto en modo por lotes como en modo continuo. Estos procedimientos, a menudo denominados procesos ETL, se desarrollan utilizando una variedad de lenguajes y marcos de programación. Normalmente, las principales etapas del procesamiento de datos son las siguientes: Los datos se recopilan de diversas fuentes para su posterior procesamiento. Lectura de datos: los datos sufren diversas transformaciones y manipulaciones para lograr la estructura y el formato requeridos. Transformación: los datos transformados se almacenan en el almacenamiento de destino. Escritura: Esquemáticamente se representa como se muestra en el diagrama: Desafíos Una vez implementadas las etapas principales, se pueden agregar varias otras etapas para mejorar y refinar el proceso: esta etapa implica verificar el cumplimiento de los datos con los requisitos y expectativas, incluida la limpieza de los datos para garantizar su precisión e integridad. Verificación de la calidad de los datos: El proceso de registro rastrea y documenta los conjuntos de datos disponibles. También puede implicar la gestión de datos y metadatos para una organización eficaz de los datos. Registro de datos en el catálogo de datos: esto permite la visualización de las relaciones entre los activos de datos y los procesos ETL para comprender las dependencias. Puede ayudar a identificar posibles problemas de calidad de los datos y mejorar la calidad general de los datos. Creación de linaje de datos: el procesamiento adicional puede definir el nivel de confidencialidad y criticidad de los datos, garantizando la protección y seguridad adecuadas de los datos. Clasificación de datos: Cada una de las etapas mencionadas se puede implementar utilizando diversas tecnologías y herramientas, lo que permite flexibilidad a la hora de elegir la solución adecuada para cada etapa específica del procesamiento de datos. Solución Leyendo datos El primer paso en el proceso ETL implica leer datos de la fuente. En esta etapa se pueden utilizar varios tipos de fuentes, como archivos, bases de datos, intermediarios de mensajes y más. Los datos también pueden venir en diferentes formatos, como CSV, JSON, Parquet, Delta, Hudi, etc. Aquí hay un código repetitivo para leer datos de estas diferentes fuentes (implementado como patrón de estrategia): from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data() Ejemplo de uso: spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read() Este enfoque ofrece varios beneficios: Permite la alteración de la estrategia de lectura (formato fuente). Facilita la adición de nuevas estrategias de lectura sin modificar el código existente. Cada estrategia puede tener sus parámetros únicos. Por ejemplo, CSV puede tener un delimitador, mientras que Delta puede tener una versión, etc. En lugar de proporcionar una ruta, podemos crear una clase dedicada, como , que puede contener todos los parámetros de lectura de datos necesarios. ReadConfig Permite la incorporación de lógica adicional a las estrategias sin afectar el código principal. Por ejemplo: Podemos llamar de forma predeterminada para una fuente de archivo. input_file_name() Si tenemos un catálogo de datos, podemos comprobar si la tabla existe allí y registrarla en caso contrario. Cada estrategia se puede probar individualmente. Transformación Después de leer los datos, deben transformarse. Esto puede incluir varias operaciones como filtrar, agregar y unirse. Dado que el proceso de transformación puede ser complejo y llevar mucho tiempo, es fundamental estandarizarlo para facilitar su comprensión. Según mi experiencia, utilizar el método de la API DataFrame es una forma eficaz de estandarizar este proceso. Este método nos permite encadenar múltiples transformaciones, mejorando la legibilidad del código. Cada transformación se puede declarar como una función pura independiente, que realiza una transformación específica y se puede probar de forma independiente. transform import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) ) Escribiendo La última etapa del proceso ETL es escribir los datos transformados en el almacenamiento de destino, que podría ser un sistema de archivos, una base de datos o un intermediario de mensajes. La fase de carga también puede incorporar pasos adicionales como validación de datos, registro de datos y linaje de datos. Para esta etapa podemos aplicar los mismos métodos que la lectura de datos, pero con estrategias de escritura. from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df) Ejemplo de uso: path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df) Conclusión Esta implementación estandariza el proceso ETL, haciéndolo más comprensible y mantenible. Facilita la adición de nuevas estrategias de lectura y escritura sin modificar el código existente. Cada estrategia puede poseer parámetros únicos y se puede incorporar lógica adicional a las estrategias sin alterar el código principal. El enfoque también permite realizar pruebas separadas de cada estrategia. Además, podemos introducir etapas adicionales al proceso ETL, como controles de calidad de datos, registro de datos en el catálogo de datos, creación de linaje de datos y clasificación de datos.