Neste artigo, pretendo compartilhar minha experiência na padronização do processo ETL em frameworks como Spark e Flink, principalmente para aqueles que não usam frameworks de processamento de dados como dbt.
O processamento de dados é um campo abrangente que abrange o processamento regular de volumes substanciais de informações nos modos lote e fluxo. Esses procedimentos, muitas vezes chamados de processos ETL, são desenvolvidos usando diversas linguagens de programação e estruturas.
Normalmente, as principais etapas do processamento de dados são as seguintes:
Leitura de dados: Os dados são coletados de várias fontes para processamento posterior.
Transformação: Os dados passam por diversas transformações e manipulações para atingir a estrutura e o formato necessários.
Escrita: Os dados transformados são então armazenados no armazenamento de destino.
Esquematicamente, é representado conforme mostrado no diagrama:
Após a implementação das etapas principais, diversas outras etapas podem ser adicionadas para aprimorar e refinar o processo:
Verificação da qualidade dos dados: esta etapa envolve a verificação da conformidade dos dados com os requisitos e expectativas, incluindo a limpeza dos dados para garantir a precisão e integridade.
Registro de dados no catálogo de dados: O processo de registro rastreia e documenta os conjuntos de dados disponíveis. Também pode envolver o gerenciamento de dados e metadados para uma organização eficaz dos dados.
Construindo linhagem de dados: permite a visualização de relacionamentos entre ativos de dados e processos ETL para entender as dependências. Pode ajudar a identificar possíveis problemas de qualidade dos dados e melhorar a qualidade geral dos dados.
Classificação dos dados: O processamento adicional pode definir o nível de confidencialidade e criticidade dos dados, garantindo a devida proteção e segurança dos dados.
Cada uma das etapas mencionadas pode ser implementada utilizando diversas tecnologias e ferramentas, permitindo flexibilidade na escolha da solução adequada para cada etapa específica do processamento de dados.
A primeira etapa do processo ETL envolve a leitura de dados da fonte. Vários tipos de fontes podem ser usados nesta fase, como arquivos, bancos de dados, corretores de mensagens e muito mais. Os dados também podem vir em diferentes formatos, como CSV, JSON, Parquet, Delta, Hudi, etc. Aqui está um código padrão para leitura de dados dessas diferentes fontes (implementado como padrão de estratégia):
from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()
Exemplo de uso:
spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()
Essa abordagem oferece vários benefícios:
ReadConfig
, que pode conter todos os parâmetros necessários de leitura de dados.input_file_name()
por padrão para uma fonte de arquivo.
Após a leitura dos dados, eles devem ser transformados. Isso pode incluir várias operações como filtragem, agregação e união. Como o processo de transformação pode ser complexo e demorado, é crucial padronizá-lo para facilitar o entendimento. Pela minha experiência, usar o método transform
da API DataFrame é uma forma eficaz de padronizar esse processo. Este método nos permite encadear múltiplas transformações, melhorando a legibilidade do código. Cada transformação pode ser declarada como uma função pura separada, realizando uma transformação específica, e pode ser testada de forma independente.
import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )
A última etapa do processo ETL é gravar os dados transformados no armazenamento de destino, que pode ser um sistema de arquivos, um banco de dados ou um intermediário de mensagens. A fase de carregamento também pode incorporar etapas extras, como validação de dados, registro de dados e linhagem de dados. Para esta etapa podemos aplicar os mesmos métodos de leitura de dados, mas com estratégias de escrita.
from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)
Exemplo de uso:
path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)
Esta implementação padroniza o processo ETL, tornando-o mais compreensível e fácil de manter. Facilita a adição de novas estratégias de leitura e escrita sem modificar o código existente. Cada estratégia pode possuir parâmetros exclusivos e lógica adicional pode ser incorporada às estratégias sem interromper o código principal. A abordagem também permite testes separados de cada estratégia. Além disso, podemos introduzir etapas adicionais ao processo ETL, como verificações de qualidade de dados, registro de dados no catálogo de dados, construção de linhagem de dados e classificação de dados.