paint-brush
Spark işleri için Geliştirme Standartlarıile@lexaneon
138 okumalar

Spark işleri için Geliştirme Standartları

ile Alexey Artemov6m2024/02/12
Read on Terminal Reader

Çok uzun; Okumak

Verileri okuma, dönüştürme ve yazma gibi önemli aşamaları kapsayan, Spark ve Flink çerçevelerinde ETL sürecini standartlaştırmaya yönelik pratik stratejileri keşfedin. Veri işleme iş akışlarınızı gelişmiş verimlilik ve ölçeklenebilirlik için optimize etmek amacıyla veri kalitesi kontrolleri, kayıt, köken oluşturma ve sınıflandırma dahil zorlukları ve çözümleri keşfedin.
featured image - Spark işleri için Geliştirme Standartları
Alexey Artemov HackerNoon profile picture
0-item


Bu yazımda özellikle dbt gibi veri işleme çerçevelerini kullanmayanlar için Spark ve Flink gibi çerçeveler içerisinde ETL sürecini standartlaştırma deneyimimi paylaşmayı amaçlıyorum.


Bağlam

Veri işleme, önemli bilgi hacimlerinin hem toplu hem de akış modlarında düzenli olarak işlenmesini kapsayan kapsamlı bir alandır. Genellikle ETL süreçleri olarak adlandırılan bu prosedürler, çeşitli programlama dilleri ve çerçeveleri kullanılarak geliştirilir.


Tipik olarak veri işlemenin ana aşamaları aşağıdaki gibidir:


  1. Verilerin okunması: Veriler daha sonraki işlemler için çeşitli kaynaklardan toplanır.

  2. Dönüşüm: Veriler gerekli yapı ve formata ulaşmak için çeşitli dönüşümlere ve manipülasyonlara tabi tutulur.

  3. Yazma: Dönüştürülen veriler daha sonra hedef depolamada saklanır.


Şematik olarak şemada gösterildiği gibi temsil edilir: Veri işleme aşamaları.


Zorluklar

Ana aşamalar uygulandıktan sonra süreci geliştirmek ve iyileştirmek için birkaç aşama daha eklenebilir:


  • Veri kalitesi kontrolü: Bu aşama, doğruluk ve eksiksizliği sağlamak için veri temizliği de dahil olmak üzere, verilerin gereksinimlere ve beklentilere uygunluğunu doğrulamayı içerir.

  • Veri kataloğuna veri kaydı: Kayıt işlemi mevcut veri kümelerini izler ve belgelendirir. Etkili veri organizasyonu için veri ve meta veri yönetimini de içerebilir.

  • Veri kökeni oluşturma: Bu, bağımlılıkları anlamak için veri varlıkları ile ETL süreçleri arasındaki ilişkilerin görselleştirilmesine olanak tanır. Potansiyel veri kalitesi sorunlarının belirlenmesine ve genel veri kalitesinin iyileştirilmesine yardımcı olabilir.

  • Veri sınıflandırması: Ek işlemler, uygun veri koruma ve güvenliğini sağlayarak, veri gizliliği ve kritiklik düzeyini tanımlayabilir.


Bahsedilen aşamaların her biri, çeşitli teknolojiler ve araçlar kullanılarak uygulanabilir ve bu, veri işlemenin her bir spesifik aşaması için doğru çözümün seçilmesinde esneklik sağlar.


Çözüm

Veri okuma

ETL sürecindeki ilk adım, kaynaktan veri okumayı içerir. Bu aşamada dosyalar, veritabanları, mesaj aracıları ve daha fazlası gibi çeşitli kaynak türleri kullanılabilir. Veriler ayrıca CSV, JSON, Parquet, Delta, Hudi vb. gibi farklı formatlarda da gelebilir. Bu farklı kaynaklardan veri okumak için bazı standart kodlar aşağıda verilmiştir (Strateji modeli olarak uygulanmıştır):


 from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()


Kullanım örneği:

 spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()


Bu yaklaşım çeşitli avantajlar sunar:

  • Okuma stratejisinin (kaynak formatı) değiştirilmesine olanak sağlar.
  • Mevcut kodu değiştirmeden yeni okuma stratejilerinin eklenmesini kolaylaştırır.
  • Her stratejinin kendine özgü parametreleri olabilir. Örneğin, CSV'nin bir sınırlayıcısı olabilirken Delta'nın bir sürümü vb. olabilir.
  • Bir yol sağlamak yerine, tüm gerekli veri okuma parametrelerini tutabilen ReadConfig gibi özel bir sınıf oluşturabiliriz.
  • Ana kodu etkilemeden stratejilere ek mantığın dahil edilmesine olanak tanır. Örneğin:
    • Bir dosya kaynağı için varsayılan olarak input_file_name() işlevini çağırabiliriz.
    • Veri kataloğumuz varsa orada tablonun var olup olmadığını kontrol edebilir, yoksa kaydedebiliriz.
  • Her strateji ayrı ayrı test edilebilir.


dönüşüm

Verileri okuduktan sonra dönüştürülmesi gerekir. Bu, filtreleme, toplama ve birleştirme gibi çeşitli işlemleri içerebilir. Dönüşüm süreci karmaşık ve zaman alıcı olabileceğinden, anlaşılmasını kolaylaştırmak için onu standartlaştırmak çok önemlidir. Deneyimlerime göre DataFrame API'nin transform yöntemini kullanmak bu süreci standartlaştırmanın etkili bir yoludur. Bu yöntem birden fazla dönüşümü zincirlememize olanak tanıyarak kodun okunabilirliğini artırır. Her dönüşüm, belirli bir dönüşümü gerçekleştiren ayrı bir saf fonksiyon olarak bildirilebilir ve bağımsız olarak test edilebilir.

 import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )


yazı

ETL sürecinin son aşaması, dönüştürülen verileri bir dosya sistemi, veritabanı veya mesaj aracısı olabilecek hedef depolamaya yazmaktır. Yükleme aşaması ayrıca veri doğrulama, veri kaydı ve veri kökeni gibi ekstra adımları da içerebilir. Bu aşama için veri okuma yöntemlerinin aynısını ancak yazma stratejileriyle uygulayabiliriz.


 from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)

Kullanım örneği:

 path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)


Çözüm

Bu uygulama ETL sürecini standartlaştırarak daha anlaşılır ve sürdürülebilir hale getiriyor. Mevcut kodu değiştirmeden yeni okuma ve yazma stratejilerinin eklenmesini kolaylaştırır. Her strateji benzersiz parametrelere sahip olabilir ve ana kodu bozmadan stratejilere ek mantık dahil edilebilir. Bu yaklaşım aynı zamanda her stratejinin ayrı ayrı test edilmesine olanak sağlar. Ayrıca, ETL sürecine veri kalitesi kontrolleri, veri kataloğuna veri kaydı, veri kökeni oluşturma ve veri sınıflandırma gibi ek aşamalar da ekleyebiliriz.