paint-brush
Spark 작업을 위한 개발 표준~에 의해@lexaneon
138 판독값

Spark 작업을 위한 개발 표준

~에 의해 Alexey Artemov6m2024/02/12
Read on Terminal Reader

너무 오래; 읽다

데이터 읽기, 변환, 쓰기와 같은 주요 단계를 다루는 Spark 및 Flink 프레임워크 내에서 ETL 프로세스를 표준화하기 위한 실용적인 전략을 알아보세요. 데이터 품질 검사, 등록, 계보 구축, 분류 등의 과제와 솔루션을 탐색하여 효율성과 확장성을 향상하기 위해 데이터 처리 워크플로를 최적화하세요.
featured image - Spark 작업을 위한 개발 표준
Alexey Artemov HackerNoon profile picture
0-item


이 기사에서는 특히 dbt와 같은 데이터 처리 프레임워크를 사용하지 않는 사람들을 위해 Spark 및 Flink와 같은 프레임워크 내에서 ETL 프로세스를 표준화한 경험을 공유하는 것을 목표로 합니다.


문맥

데이터 처리는 배치 모드와 스트림 모드 모두에서 상당한 양의 정보를 정기적으로 처리하는 포괄적인 분야입니다. ETL 프로세스라고도 하는 이러한 절차는 다양한 프로그래밍 언어와 프레임워크를 사용하여 개발됩니다.


일반적으로 데이터 처리의 주요 단계는 다음과 같습니다.


  1. 데이터 읽기: 데이터는 후속 처리를 위해 다양한 소스에서 수집됩니다.

  2. 변환: 데이터는 필요한 구조와 형식을 얻기 위해 다양한 변환과 조작을 거칩니다.

  3. 쓰기: 변환된 데이터는 대상 스토리지에 저장됩니다.


개략적으로 그림과 같이 표현됩니다. 데이터 처리 단계.


도전과제

기본 단계가 구현된 후 프로세스를 강화하고 개선하기 위해 여러 다른 단계를 추가할 수 있습니다.


  • 데이터 품질 확인: 이 단계에는 정확성과 완전성을 보장하기 위한 데이터 정리를 포함하여 데이터가 요구 사항 및 기대 사항에 부합하는지 확인하는 작업이 포함됩니다.

  • 데이터 카탈로그에 데이터 등록: 등록 프로세스는 사용 가능한 데이터 세트를 추적하고 문서화합니다. 또한 효과적인 데이터 구성을 위한 데이터 및 메타데이터 관리도 포함될 수 있습니다.

  • 데이터 계보 구축: 이를 통해 데이터 자산과 ETL 프로세스 간의 관계를 시각화하여 종속성을 이해할 수 있습니다. 이는 잠재적인 데이터 품질 문제를 식별하고 전반적인 데이터 품질을 개선하는 데 도움이 될 수 있습니다.

  • 데이터 분류: 추가 처리를 통해 데이터 기밀성 및 중요도 수준을 정의하여 적절한 데이터 보호 및 보안을 보장할 수 있습니다.


언급된 각 단계는 다양한 기술과 도구를 사용하여 구현될 수 있으므로 데이터 처리의 각 특정 단계에 적합한 솔루션을 유연하게 선택할 수 있습니다.


해결책

데이터 읽기

ETL 프로세스의 첫 번째 단계는 소스에서 데이터를 읽는 것입니다. 이 단계에서는 파일, 데이터베이스, 메시지 브로커 등과 같은 다양한 유형의 소스를 사용할 수 있습니다. 데이터는 CSV, JSON, Parquet, Delta, Hudi 등과 같은 다양한 형식으로 제공될 수도 있습니다. 다음은 이러한 다양한 소스에서 데이터를 읽기 위한 일부 상용구 코드입니다(전략 패턴으로 구현됨).


 from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()


사용 예:

 spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()


이 접근 방식은 다음과 같은 몇 가지 이점을 제공합니다.

  • 읽기 전략(소스 형식)을 변경할 수 있습니다.
  • 기존 코드를 수정하지 않고도 새로운 읽기 전략을 쉽게 추가할 수 있습니다.
  • 각 전략에는 고유한 매개변수가 있을 수 있습니다. 예를 들어 CSV에는 구분 기호가 있고 Delta에는 버전 등이 있을 수 있습니다.
  • 경로를 제공하는 대신 필요한 모든 데이터 읽기 매개변수를 보유할 수 있는 ReadConfig 와 같은 전용 클래스를 만들 수 있습니다.
  • 이를 통해 기본 코드에 영향을 주지 않고 추가 논리를 전략에 통합할 수 있습니다. 예를 들어:
    • 파일 소스에 대해 기본적으로 input_file_name() 호출할 수 있습니다.
    • 데이터 카탈로그가 있으면 거기에 테이블이 있는지 확인하고 없으면 등록할 수 있습니다.
  • 각 전략은 개별적으로 테스트할 수 있습니다.


변환

데이터를 읽은 후에는 변환해야 합니다. 여기에는 필터링, 집계, 조인과 같은 다양한 작업이 포함될 수 있습니다. 변환 프로세스는 복잡하고 시간이 많이 소요될 수 있으므로 이해하기 쉽도록 표준화하는 것이 중요합니다. 내 경험에 따르면 DataFrame API의 transform 메서드를 사용하는 것은 이 프로세스를 표준화하는 효과적인 방법입니다. 이 방법을 사용하면 여러 변환을 연결하여 코드 가독성을 높일 수 있습니다. 각 변환은 별도의 순수 함수로 선언되어 특정 변환을 수행할 수 있으며 독립적으로 테스트할 수 있습니다.

 import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )


글쓰기

ETL 프로세스의 마지막 단계는 변환된 데이터를 파일 시스템, 데이터베이스 또는 메시지 브로커일 수 있는 대상 스토리지에 쓰는 것입니다. 로딩 단계에는 데이터 검증, 데이터 등록, 데이터 계보와 같은 추가 단계가 포함될 수도 있습니다. 이 단계에서는 데이터 읽기와 동일한 방법을 적용할 수 있지만 쓰기 전략을 사용할 수 있습니다.


 from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)

사용 예:

 path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)


결론

이 구현은 ETL 프로세스를 표준화하여 더 이해하기 쉽고 유지 관리하기 쉽게 만듭니다. 기존 코드를 수정하지 않고도 새로운 읽기 및 쓰기 전략을 쉽게 추가할 수 있습니다. 각 전략은 고유한 매개변수를 가질 수 있으며, 기본 코드를 방해하지 않고 추가 논리를 전략에 통합할 수 있습니다. 이 접근 방식을 사용하면 각 전략을 별도로 테스트할 수도 있습니다. 또한 데이터 품질 검사, 데이터 카탈로그에 데이터 등록, 데이터 계보 구축, 데이터 분류 등 ETL 프로세스에 추가 단계를 도입할 수 있습니다.