In diesem Artikel möchte ich meine Erfahrungen mit der Standardisierung des ETL-Prozesses innerhalb von Frameworks wie Spark und Flink teilen, insbesondere für diejenigen, die keine Datenverarbeitungs-Frameworks wie dbt verwenden.
Die Datenverarbeitung ist ein umfassender Bereich, der die regelmäßige Verarbeitung erheblicher Informationsmengen sowohl im Batch- als auch im Stream-Modus umfasst. Diese Verfahren, oft auch als ETL-Prozesse bezeichnet, werden mithilfe verschiedener Programmiersprachen und Frameworks entwickelt.
Typischerweise sind die Hauptphasen der Datenverarbeitung wie folgt:
Auslesen von Daten: Es werden Daten aus verschiedenen Quellen zur späteren Verarbeitung gesammelt.
Transformation: Die Daten werden verschiedenen Transformationen und Manipulationen unterzogen, um die erforderliche Struktur und das erforderliche Format zu erhalten.
Schreiben: Die transformierten Daten werden dann im Zielspeicher gespeichert.
Schematisch ist es wie im Diagramm dargestellt dargestellt:
Nachdem die Hauptphasen implementiert wurden, können mehrere weitere Phasen hinzugefügt werden, um den Prozess zu verbessern und zu verfeinern:
Datenqualitätsprüfung: In dieser Phase wird die Übereinstimmung der Daten mit den Anforderungen und Erwartungen überprüft, einschließlich der Datenbereinigung, um Genauigkeit und Vollständigkeit sicherzustellen.
Datenregistrierung im Datenkatalog: Der Registrierungsprozess verfolgt und dokumentiert verfügbare Datensätze. Es kann auch die Verwaltung von Daten und Metadaten für eine effektive Datenorganisation umfassen.
Aufbau einer Datenherkunft: Dies ermöglicht die Visualisierung von Beziehungen zwischen Datenbeständen und ETL-Prozessen, um Abhängigkeiten zu verstehen. Es kann dabei helfen, potenzielle Datenqualitätsprobleme zu erkennen und die Datenqualität insgesamt zu verbessern.
Datenklassifizierung: Durch zusätzliche Verarbeitung kann der Grad der Datenvertraulichkeit und -kritikalität festgelegt werden, wodurch ein angemessener Datenschutz und eine angemessene Datensicherheit gewährleistet werden.
Jede der genannten Phasen kann mit verschiedenen Technologien und Tools umgesetzt werden, was eine flexible Auswahl der richtigen Lösung für jede spezifische Phase der Datenverarbeitung ermöglicht.
Der erste Schritt im ETL-Prozess besteht darin, Daten aus der Quelle zu lesen. In dieser Phase können verschiedene Arten von Quellen verwendet werden, z. B. Dateien, Datenbanken, Nachrichtenbroker und mehr. Die Daten können auch in verschiedenen Formaten vorliegen, z. B. CSV, JSON, Parquet, Delta, Hudi usw. Hier ist ein Mustercode zum Lesen von Daten aus diesen verschiedenen Quellen (implementiert als Strategiemuster):
from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()
Anwendungsbeispiel:
spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()
Dieser Ansatz bietet mehrere Vorteile:
ReadConfig
erstellen, die alle erforderlichen Datenleseparameter enthalten kann.input_file_name()
standardmäßig für eine Dateiquelle aufrufen.
Nach dem Lesen der Daten müssen diese transformiert werden. Dies kann verschiedene Vorgänge wie Filtern, Aggregieren und Zusammenführen umfassen. Da der Transformationsprozess komplex und zeitaufwändig sein kann, ist es wichtig, ihn zu standardisieren, um das Verständnis zu erleichtern. Meiner Erfahrung nach ist die Verwendung der transform
der DataFrame-API eine effektive Möglichkeit, diesen Prozess zu standardisieren. Mit dieser Methode können wir mehrere Transformationen verketten und so die Lesbarkeit des Codes verbessern. Jede Transformation kann als separate reine Funktion deklariert werden, die eine bestimmte Transformation durchführt, und kann unabhängig getestet werden.
import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )
Der letzte Schritt im ETL-Prozess ist das Schreiben der transformierten Daten in den Zielspeicher, bei dem es sich um ein Dateisystem, eine Datenbank oder einen Nachrichtenbroker handeln kann. Die Ladephase kann auch zusätzliche Schritte wie Datenvalidierung, Datenregistrierung und Datenherkunft umfassen. In dieser Phase können wir dieselben Methoden wie beim Lesen von Daten anwenden, jedoch mit Schreibstrategien.
from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)
Anwendungsbeispiel:
path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)
Diese Implementierung standardisiert den ETL-Prozess und macht ihn verständlicher und wartbarer. Es erleichtert das Hinzufügen neuer Lese- und Schreibstrategien, ohne den vorhandenen Code zu ändern. Jede Strategie kann einzigartige Parameter besitzen und zusätzliche Logik kann in Strategien integriert werden, ohne den Hauptcode zu stören. Der Ansatz ermöglicht auch das separate Testen jeder Strategie. Darüber hinaus können wir zusätzliche Phasen in den ETL-Prozess einführen, wie z. B. Datenqualitätsprüfungen, Datenregistrierung im Datenkatalog, Aufbau der Datenherkunft und Datenklassifizierung.