Khi làm việc với các tập dữ liệu lớn trong Một vấn đề về hiệu suất chung là Điều này xảy ra khi một vài phím phân phối dữ liệu, dẫn đến phân vùng và truy vấn chậm. Nó chủ yếu xảy ra trong các hoạt động đòi hỏi Như hoặc thường xuyên . Apache Spark data skew dominate uneven shuffling joins aggregations Một cách thực tế để giảm lệch là , liên quan đến việc nhân tạo phân tán các phím nặng trên nhiều phân vùng. Trong bài viết này, tôi sẽ hướng dẫn bạn thông qua điều này với một ví dụ thực tế. salting Làm thế nào Salting giải quyết các vấn đề trượt dữ liệu Bằng cách thêm a Điều này làm cho phân phối dữ liệu đồng đều hơn và phân phối tải trên nhiều công nhân hơn, thay vì gửi hầu hết dữ liệu cho một công nhân và để lại những người khác trống rỗng. randomly Lợi ích của Salting Spreads data evenly across partitions, preventing a few workers overload and improves utilization. Reduced Skew: Speeds up joins and aggregations by balancing the workload. Improved Performance: Reduces the risk of out-of-memory errors caused by large, uneven partitions. Avoids Resource Contention: Khi nào nên sử dụng Salting Trong quá trình sáp nhập hoặc tổng hợp với các phím bị lệch, sử dụng salting khi bạn nhận thấy thời gian shuffle dài hoặc lỗi người thực thi do sai lệch dữ liệu. Nó cũng hữu ích trong các ứng dụng phát trực tuyến thời gian thực nơi phân vùng ảnh hưởng đến hiệu quả xử lý dữ liệu, hoặc khi hầu hết người lao động trống rỗng trong khi một số bị mắc kẹt trong trạng thái chạy. Lời bài hát Salting Example in Scala Hãy tạo ra một số dữ liệu với một Chúng ta có thể giả định rằng có hai tập dữ liệu mà chúng ta cần tham gia: một là một tập dữ liệu lớn, và tập dữ liệu nhỏ khác. unbalanced import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ // Simulated large dataset with skew val largeDF = Seq( (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ).toDF("customer_id", "transaction") // Small dataset val smallDF = Seq( (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ).toDF("customer_id", "name") Chúng ta hãy thêm cột muối vào các tập dữ liệu lớn mà chúng ta sử dụng để phân tán các giá trị của khóa lớn thành các phân vùng nhỏ hơn randomization // Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", (rand() * numBuckets).cast("int")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedLargeDF.show() +-----------+-----------+----+------------------+ |customer_id|transaction|salt|salted_customer_id| +-----------+-----------+----+------------------+ | 1| txn1| 1| 1_1| | 1| txn2| 1| 1_1| | 1| txn3| 2| 1_2| | 2| txn4| 2| 2_2| | 3| txn5| 0| 3_0| +-----------+-----------+----+------------------+ Để đảm bảo chúng tôi bao gồm tất cả các phím muối ngẫu nhiên có thể trong các tập dữ liệu lớn, chúng tôi cần tập dữ liệu nhỏ với tất cả các giá trị muối có thể explode // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedSmallDF.show() +----+-----------+------+------------------+ |salt|customer_id| name|salted_customer_id| +----+-----------+------+------------------+ | 0| 1| Ahmed| 1_0| | 1| 1| Ahmed| 1_1| | 2| 1| Ahmed| 1_2| | 0| 2| Ali| 2_0| | 1| 2| Ali| 2_1| | 2| 2| Ali| 2_2| | 0| 3|Hassan| 3_0| | 1| 3|Hassan| 3_1| | 2| 3|Hassan| 3_2| +----+-----------+------+------------------+ Bây giờ chúng ta có thể dễ dàng kết nối hai tập dữ liệu // Step 3: Perform salted join val joinedDF = saltedLargeDF. join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner"). select("customer_id", "transaction", "name") joinedDF.show() +-----------+-----------+------+ |customer_id|transaction| name| +-----------+-----------+------+ | 1| txn2| Ahmed| | 1| txn1| Ahmed| | 1| txn3| Ahmed| | 2| txn4| Ali| | 3| txn5|Hassan| +-----------+-----------+------+ Lấy ví dụ trong Python from pyspark.sql import SparkSession from pyspark.sql.functions import col, rand, lit, concat from pyspark.sql.types import IntegerType # Simulated large dataset with skew largeDF = spark.createDataFrame([ (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ], ["customer_id", "transaction"]) # Small dataset smallDF = spark.createDataFrame([ (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ], ["customer_id", "name"]) # Step 1: create a salting key in the large dataset numBuckets = 3 saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 2: Explode rows in smallDF for possible salted keys salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt") saltedSmallDF = salt_range.crossJoin(smallDF) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 3: Perform salted join joinedDF = saltedLargeDF.join( saltedSmallDF, on=["salted_customer_id", "customer_id"], how="inner" ).select("customer_id", "transaction", "name") ghi chép Mã này sử dụng spark.range(...) để bắt chước Scala (0 cho đến khi numBuckets).toDF ("muối"). Các biểu thức cột được xử lý bằng cách sử dụng col (...), lit (...), và concat (...). Các cast to integer sử dụng .cast(IntegerType()). Lời bài hát: Choose numBuckets Số lượng Nếu bạn đặt numBuckets = 100, mỗi phím có thể được chia thành 100 phân vùng phụ. Tuy nhiên, hãy cẩn thận vì sử dụng quá nhiều buckets có thể làm giảm hiệu suất, đặc biệt là đối với các phím có ít dữ liệu. Luôn kiểm tra các giá trị khác nhau dựa trên hồ sơ lệch của tập dữ liệu của bạn. If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal , e.x. 0 // Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) Bắt đầu nhỏ (ví dụ, 10-20) và tăng dần dựa trên kích thước shuffle quan sát và thời gian chạy nhiệm vụ. Rule of Thumb: Suy nghĩ cuối cùng Tác dụng và cách sử dụng: Tác dụng và cách sử dụng: Tác dụng và cách sử dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Với điều chỉnh và giám sát chính xác, kỹ thuật này có thể làm giảm đáng kể thời gian thực hiện công việc trên các tập dữ liệu bị biến dạng cao. SKEWED JOIN Ban đầu được xuất bản tại https://practical-software.com vào ngày 11 tháng 5 năm 2025. Ban đầu được xuất bản tại Ngày 11 tháng 5 năm 2025. https://practical-software.com https://practical-software.com https://practical-software.com