paint-brush
Normes de développement pour les tâches Sparkpar@lexaneon
138 lectures

Normes de développement pour les tâches Spark

par Alexey Artemov6m2024/02/12
Read on Terminal Reader

Trop long; Pour lire

Découvrez des stratégies pratiques pour standardiser le processus ETL dans les frameworks Spark et Flink, couvrant des étapes clés telles que la lecture, la transformation et l'écriture de données. Explorez les défis et les solutions, notamment les contrôles de qualité des données, l'enregistrement, la création de lignées et la classification, pour optimiser vos flux de traitement de données afin d'améliorer l'efficacité et l'évolutivité.
featured image - Normes de développement pour les tâches Spark
Alexey Artemov HackerNoon profile picture
0-item


Dans cet article, mon objectif est de partager mon expérience de normalisation du processus ETL dans des frameworks comme Spark et Flink, en particulier pour ceux qui n'utilisent pas de frameworks de traitement de données comme dbt.


Contexte

Le traitement des données est un domaine complet qui englobe le traitement régulier de volumes d’informations substantiels en mode batch et stream. Ces procédures, souvent appelées processus ETL, sont développées à l'aide de divers langages et frameworks de programmation.


Généralement, les principales étapes du traitement des données sont les suivantes :


  1. Lecture des données : les données sont collectées à partir de diverses sources pour un traitement ultérieur.

  2. Transformation : Les données subissent diverses transformations et manipulations pour atteindre la structure et le format requis.

  3. Écriture : Les données transformées sont ensuite stockées dans le stockage cible.


Schématiquement, il est représenté comme le montre le diagramme : Étapes de traitement des données.


Défis

Une fois les principales étapes mises en œuvre, plusieurs autres étapes peuvent être ajoutées pour enrichir et affiner le processus :


  • Contrôle de la qualité des données : cette étape consiste à vérifier la conformité des données aux exigences et aux attentes, y compris le nettoyage des données pour garantir leur exactitude et leur exhaustivité.

  • Enregistrement des données dans le catalogue de données : le processus d'enregistrement suit et documente les ensembles de données disponibles. Cela peut également impliquer la gestion des données et des métadonnées pour une organisation efficace des données.

  • Création d'un lignage des données : cela permet de visualiser les relations entre les actifs de données et les processus ETL pour comprendre les dépendances. Cela peut aider à identifier les problèmes potentiels de qualité des données et à améliorer la qualité globale des données.

  • Classification des données : un traitement supplémentaire peut définir le niveau de confidentialité et de criticité des données, garantissant ainsi une protection et une sécurité appropriées des données.


Chacune des étapes mentionnées peut être mise en œuvre à l'aide de diverses technologies et outils, permettant une flexibilité dans le choix de la bonne solution pour chaque étape spécifique du traitement des données.


Solution

Lecture de données

La première étape du processus ETL consiste à lire les données de la source. Différents types de sources peuvent être utilisés à ce stade, tels que des fichiers, des bases de données, des courtiers de messages, etc. Les données peuvent également se présenter sous différents formats, tels que CSV, JSON, Parquet, Delta, Hudi, etc. Voici un code passe-partout pour lire les données de ces différentes sources (implémenté en tant que modèle de stratégie) :


 from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()


Exemple d'utilisation :

 spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()


Cette approche offre plusieurs avantages :

  • Il permet de modifier la stratégie de lecture (format source).
  • Il facilite l'ajout de nouvelles stratégies de lecture sans modifier le code existant.
  • Chaque stratégie peut avoir ses paramètres uniques. Par exemple, CSV peut avoir un délimiteur, tandis que Delta peut avoir une version, etc.
  • Au lieu de fournir un chemin, nous pouvons créer une classe dédiée, comme ReadConfig , qui peut contenir tous les paramètres de lecture de données nécessaires.
  • Il permet l'incorporation de logique supplémentaire dans les stratégies sans affecter le code principal. Par exemple:
    • Nous pouvons appeler input_file_name() par défaut pour un fichier source.
    • Si nous disposons d'un catalogue de données, nous pouvons vérifier si la table y existe et l'enregistrer sinon.
  • Chaque stratégie peut être testée individuellement.


Transformation

Après avoir lu les données, celles-ci doivent être transformées. Cela peut inclure diverses opérations telles que le filtrage, l'agrégation et la jointure. Le processus de transformation pouvant être complexe et long, il est crucial de le standardiser pour en faciliter la compréhension. D'après mon expérience, l'utilisation de la méthode transform de l'API DataFrame est un moyen efficace de standardiser ce processus. Cette méthode nous permet d'enchaîner plusieurs transformations, améliorant ainsi la lisibilité du code. Chaque transformation peut être déclarée comme une fonction pure distincte, effectuant une transformation spécifique, et peut être testée indépendamment.

 import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )


En écrivant

La dernière étape du processus ETL consiste à écrire les données transformées dans le stockage cible, qui peut être un système de fichiers, une base de données ou un courtier de messages. La phase de chargement peut également intégrer des étapes supplémentaires telles que la validation des données, l'enregistrement des données et le traçage des données. Pour cette étape, nous pouvons appliquer les mêmes méthodes que pour la lecture de données, mais avec des stratégies d'écriture.


 from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)

Exemple d'utilisation :

 path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)


Conclusion

Cette implémentation standardise le processus ETL, le rendant plus compréhensible et maintenable. Il facilite l'ajout de nouvelles stratégies de lecture et d'écriture sans modifier le code existant. Chaque stratégie peut posséder des paramètres uniques et une logique supplémentaire peut être incorporée aux stratégies sans perturber le code principal. L’approche permet également de tester séparément chaque stratégie. De plus, nous pouvons introduire des étapes supplémentaires dans le processus ETL, telles que les contrôles de qualité des données, l'enregistrement des données dans le catalogue de données, la création du lignage des données et la classification des données.