Dans cet article, mon objectif est de partager mon expérience de normalisation du processus ETL dans des frameworks comme Spark et Flink, en particulier pour ceux qui n'utilisent pas de frameworks de traitement de données comme dbt.
Le traitement des données est un domaine complet qui englobe le traitement régulier de volumes d’informations substantiels en mode batch et stream. Ces procédures, souvent appelées processus ETL, sont développées à l'aide de divers langages et frameworks de programmation.
Généralement, les principales étapes du traitement des données sont les suivantes :
Lecture des données : les données sont collectées à partir de diverses sources pour un traitement ultérieur.
Transformation : Les données subissent diverses transformations et manipulations pour atteindre la structure et le format requis.
Écriture : Les données transformées sont ensuite stockées dans le stockage cible.
Schématiquement, il est représenté comme le montre le diagramme :
Une fois les principales étapes mises en œuvre, plusieurs autres étapes peuvent être ajoutées pour enrichir et affiner le processus :
Contrôle de la qualité des données : cette étape consiste à vérifier la conformité des données aux exigences et aux attentes, y compris le nettoyage des données pour garantir leur exactitude et leur exhaustivité.
Enregistrement des données dans le catalogue de données : le processus d'enregistrement suit et documente les ensembles de données disponibles. Cela peut également impliquer la gestion des données et des métadonnées pour une organisation efficace des données.
Création d'un lignage des données : cela permet de visualiser les relations entre les actifs de données et les processus ETL pour comprendre les dépendances. Cela peut aider à identifier les problèmes potentiels de qualité des données et à améliorer la qualité globale des données.
Classification des données : un traitement supplémentaire peut définir le niveau de confidentialité et de criticité des données, garantissant ainsi une protection et une sécurité appropriées des données.
Chacune des étapes mentionnées peut être mise en œuvre à l'aide de diverses technologies et outils, permettant une flexibilité dans le choix de la bonne solution pour chaque étape spécifique du traitement des données.
La première étape du processus ETL consiste à lire les données de la source. Différents types de sources peuvent être utilisés à ce stade, tels que des fichiers, des bases de données, des courtiers de messages, etc. Les données peuvent également se présenter sous différents formats, tels que CSV, JSON, Parquet, Delta, Hudi, etc. Voici un code passe-partout pour lire les données de ces différentes sources (implémenté en tant que modèle de stratégie) :
from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()
Exemple d'utilisation :
spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()
Cette approche offre plusieurs avantages :
ReadConfig
, qui peut contenir tous les paramètres de lecture de données nécessaires.input_file_name()
par défaut pour un fichier source.
Après avoir lu les données, celles-ci doivent être transformées. Cela peut inclure diverses opérations telles que le filtrage, l'agrégation et la jointure. Le processus de transformation pouvant être complexe et long, il est crucial de le standardiser pour en faciliter la compréhension. D'après mon expérience, l'utilisation de la méthode transform
de l'API DataFrame est un moyen efficace de standardiser ce processus. Cette méthode nous permet d'enchaîner plusieurs transformations, améliorant ainsi la lisibilité du code. Chaque transformation peut être déclarée comme une fonction pure distincte, effectuant une transformation spécifique, et peut être testée indépendamment.
import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )
La dernière étape du processus ETL consiste à écrire les données transformées dans le stockage cible, qui peut être un système de fichiers, une base de données ou un courtier de messages. La phase de chargement peut également intégrer des étapes supplémentaires telles que la validation des données, l'enregistrement des données et le traçage des données. Pour cette étape, nous pouvons appliquer les mêmes méthodes que pour la lecture de données, mais avec des stratégies d'écriture.
from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)
Exemple d'utilisation :
path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)
Cette implémentation standardise le processus ETL, le rendant plus compréhensible et maintenable. Il facilite l'ajout de nouvelles stratégies de lecture et d'écriture sans modifier le code existant. Chaque stratégie peut posséder des paramètres uniques et une logique supplémentaire peut être incorporée aux stratégies sans perturber le code principal. L’approche permet également de tester séparément chaque stratégie. De plus, nous pouvons introduire des étapes supplémentaires dans le processus ETL, telles que les contrôles de qualité des données, l'enregistrement des données dans le catalogue de données, la création du lignage des données et la classification des données.