paint-brush
Tiêu chuẩn dành cho nhà phát triển cho công việc Sparktừ tác giả@lexaneon
138 lượt đọc

Tiêu chuẩn dành cho nhà phát triển cho công việc Spark

từ tác giả Alexey Artemov6m2024/02/12
Read on Terminal Reader

dài quá đọc không nổi

Khám phá các chiến lược thực tế để chuẩn hóa quy trình ETL trong khung Spark và Flink, bao gồm các giai đoạn chính như đọc, chuyển đổi và ghi dữ liệu. Khám phá những thách thức và giải pháp, bao gồm kiểm tra chất lượng dữ liệu, đăng ký, xây dựng dòng dõi và phân loại để tối ưu hóa quy trình xử lý dữ liệu của bạn nhằm cải thiện hiệu quả và khả năng mở rộng.
featured image - Tiêu chuẩn dành cho nhà phát triển cho công việc Spark
Alexey Artemov HackerNoon profile picture
0-item


Trong bài viết này, tôi muốn chia sẻ kinh nghiệm tiêu chuẩn hóa quy trình ETL trong các khung như Spark và Flink, đặc biệt đối với những người không sử dụng các khung xử lý dữ liệu như dbt.


Bối cảnh

Xử lý dữ liệu là một lĩnh vực toàn diện bao gồm việc xử lý thường xuyên khối lượng thông tin đáng kể ở cả chế độ hàng loạt và luồng. Các quy trình này, thường được gọi là quy trình ETL, được phát triển bằng nhiều ngôn ngữ và khung lập trình khác nhau.


Thông thường, các giai đoạn chính của xử lý dữ liệu như sau:


  1. Đọc dữ liệu: Dữ liệu được thu thập từ nhiều nguồn khác nhau để xử lý tiếp theo.

  2. Chuyển đổi: Dữ liệu trải qua nhiều biến đổi và thao tác khác nhau để đạt được cấu trúc và định dạng cần thiết.

  3. Viết: Dữ liệu được chuyển đổi sau đó được lưu trữ trong bộ lưu trữ đích.


Về mặt sơ đồ, nó được biểu diễn như trong sơ đồ: Các giai đoạn xử lý dữ liệu


Thử thách

Sau khi các giai đoạn chính được triển khai, một số giai đoạn khác có thể được thêm vào để nâng cao và hoàn thiện quy trình:


  • Kiểm tra chất lượng dữ liệu: Giai đoạn này bao gồm việc xác minh việc tuân thủ các yêu cầu và mong đợi của dữ liệu, bao gồm cả việc làm sạch dữ liệu để đảm bảo tính chính xác và đầy đủ.

  • Đăng ký dữ liệu trong danh mục dữ liệu: Quá trình đăng ký theo dõi và ghi lại các bộ dữ liệu có sẵn. Nó cũng có thể liên quan đến việc quản lý dữ liệu và siêu dữ liệu để tổ chức dữ liệu hiệu quả.

  • Xây dựng dòng dữ liệu: Điều này cho phép trực quan hóa mối quan hệ giữa nội dung dữ liệu và quy trình ETL để hiểu các mối quan hệ phụ thuộc. Nó có thể giúp xác định các vấn đề tiềm ẩn về chất lượng dữ liệu và cải thiện chất lượng dữ liệu tổng thể.

  • Phân loại dữ liệu: Quá trình xử lý bổ sung có thể xác định mức độ bảo mật và mức độ quan trọng của dữ liệu, đảm bảo bảo vệ và bảo mật dữ liệu thích hợp.


Mỗi giai đoạn được đề cập có thể được thực hiện bằng nhiều công nghệ và công cụ khác nhau, cho phép linh hoạt trong việc lựa chọn giải pháp phù hợp cho từng giai đoạn xử lý dữ liệu cụ thể.


Giải pháp

Đọc dữ liệu

Bước đầu tiên trong quy trình ETL liên quan đến việc đọc dữ liệu từ nguồn. Nhiều loại nguồn khác nhau có thể được sử dụng ở giai đoạn này, chẳng hạn như tệp, cơ sở dữ liệu, nhà môi giới tin nhắn, v.v. Dữ liệu cũng có thể có các định dạng khác nhau, chẳng hạn như CSV, JSON, Parquet, Delta, Hudi, v.v. Dưới đây là một số mã soạn sẵn để đọc dữ liệu từ các nguồn khác nhau này (được triển khai dưới dạng mẫu Chiến lược):


 from abc import ABC, abstractmethod from pyspark.sql import SparkSession, DataFrame # Define the strategy interface class ReadStrategy(ABC): @abstractmethod def read_data(self) -> DataFrame: pass # File System Read Strategy - represents a file system reading class FileSystemReadStrategy(ReadStrategy): def __init__(self, spark: SparkSession, path: str): self.spark = spark self.path = path @abstractmethod def read_data(self): pass # Implement the strategy for CSV class CSVReadStrategy(FileSystemReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("csv").option("header", "true").load(self.path) # Implement the strategy for Delta class DeltaReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("delta").load(self.path) # Implement the strategy for Parquet class ParquetReadStrategy(ReadStrategy): def read_data(self) -> DataFrame: return self.spark.read.format("parquet").load(self.path) # Context class class DataReader: def __init__(self, read_strategy: ReadStrategy): self.read_strategy = read_strategy def read(self) -> DataFrame: return self.read_strategy.read_data()


Ví dụ sử dụng:

 spark = SparkSession.builder.appName("our-ETL-job").getOrCreate() path = "path-to-our-data" src_df = DataReader(DeltaReadStrategy(spark, path)).read()


Cách tiếp cận này mang lại một số lợi ích:

  • Nó cho phép thay đổi chiến lược đọc (định dạng nguồn).
  • Nó tạo điều kiện cho việc bổ sung các chiến lược đọc mới mà không cần sửa đổi mã hiện có.
  • Mỗi chiến lược có thể có các thông số riêng. Ví dụ: CSV có thể có dấu phân cách, trong khi Delta có thể có phiên bản, v.v.
  • Thay vì cung cấp đường dẫn, chúng ta có thể tạo một lớp chuyên dụng, như ReadConfig , có thể chứa tất cả các tham số đọc dữ liệu cần thiết.
  • Nó cho phép kết hợp logic bổ sung vào các chiến lược mà không ảnh hưởng đến mã chính. Ví dụ:
    • Theo mặc định, chúng ta có thể gọi input_file_name() cho nguồn tệp.
    • Nếu chúng ta có một danh mục dữ liệu, chúng ta có thể kiểm tra xem bảng có tồn tại ở đó hay không và đăng ký nó nếu không.
  • Mỗi chiến lược có thể được thử nghiệm riêng lẻ.


Chuyển đổi

Sau khi đọc dữ liệu, nó phải được chuyển đổi. Điều này có thể bao gồm các hoạt động khác nhau như lọc, tổng hợp và nối. Vì quá trình chuyển đổi có thể phức tạp và tốn thời gian nên điều quan trọng là phải chuẩn hóa nó để dễ hiểu. Theo kinh nghiệm của tôi, sử dụng phương thức transform của API DataFrame là một cách hiệu quả để chuẩn hóa quy trình này. Phương pháp này cho phép chúng ta xâu chuỗi nhiều phép biến đổi, nâng cao khả năng đọc mã. Mỗi phép biến đổi có thể được khai báo dưới dạng một hàm thuần túy riêng biệt, thực hiện một phép biến đổi cụ thể và có thể được kiểm tra độc lập.

 import pyspark.sql.functions as F from pyspark.sql import DataFrame desired_columns = ["name", "age", "country"] def with_country_code(df: DataFrame) -> DataFrame: return df.withColumn("country_code", F.when(F.col("country") == "USA", "US").otherwise("Other")) def with_age_category(df: DataFrame) -> DataFrame: return df.withColumn("age_category", F.when(F.col("age") < 18, "child").when((F.col("age") >= 18) & (F.col("age") < 65), "adult").otherwise("senior")) def filter_adult(df: DataFrame) -> DataFrame: return df.filter(F.col("age_category") == "adult") def select_columns(columns: list) -> callable[[DataFrame], DataFrame]: def inner(df: DataFrame) -> DataFrame: return df.select(columns) return inner transformed_df = (src_df .transform(with_country_code) .transform(with_age_category) .transform(filter_adult) .transform(select_columns(desired_columns)) )


Viết

Giai đoạn cuối cùng trong quy trình ETL là ghi dữ liệu đã chuyển đổi vào bộ lưu trữ đích, có thể là hệ thống tệp, cơ sở dữ liệu hoặc trình trung chuyển tin nhắn. Giai đoạn tải cũng có thể kết hợp các bước bổ sung như xác thực dữ liệu, đăng ký dữ liệu và truyền dữ liệu. Đối với giai đoạn này, chúng ta có thể áp dụng các phương pháp tương tự như đọc dữ liệu, nhưng với chiến lược viết.


 from abc import ABC, abstractmethod from pyspark.sql import DataFrame, SparkSession class WriteStrategy(ABC): @abstractmethod def write_data(self, df: DataFrame): pass class FileSystemWriteStrategy(WriteStrategy): def __init__(self, spark: SparkSession, path: str, format: str, mode: str = "overwrite"): self.spark = spark self.path = path self.format = format self.mode = mode def write_data(self, df: DataFrame): df.write.format(self.format).mode(self.mode).save(self.path) class DataWriter: def __init__(self, write_strategy: WriteStrategy): self.write_strategy = write_strategy def write(self, df: DataFrame): self.write_strategy.write_data(df)

Ví dụ sử dụng:

 path = "path-to-our-data" writer = DataWriter(FileSystemWriteStrategy(spark, path, "delta")) writer.write(transformed_df)


Phần kết luận

Việc triển khai này tiêu chuẩn hóa quy trình ETL, làm cho quy trình này dễ hiểu và dễ bảo trì hơn. Nó tạo điều kiện bổ sung các chiến lược đọc và viết mới mà không cần sửa đổi mã hiện có. Mỗi chiến lược có thể sở hữu các tham số duy nhất và logic bổ sung có thể được tích hợp vào các chiến lược mà không làm gián đoạn mã chính. Cách tiếp cận này cũng cho phép thử nghiệm riêng biệt từng chiến lược. Hơn nữa, chúng tôi có thể giới thiệu các giai đoạn bổ sung cho quy trình ETL, chẳng hạn như kiểm tra chất lượng dữ liệu, đăng ký dữ liệu trong danh mục dữ liệu, xây dựng dòng dữ liệu và phân loại dữ liệu.