When you work with large datasets in Apache Spark, a common performance problem is data skew: a few keys dominate the data distribution, which leads to uneven partitions and slow queries. It mainly shows up in operations that require shuffling, such as joins and aggregations.

A practical way to reduce skew is salting, which artificially spreads heavy keys across multiple partitions. In this article, I will guide you through it with a practical example.

How Salting Solves Data Skew

By appending a randomly generated number to the join key and then joining on this combined key, we distribute the heavy keys more evenly. The load is spread across more workers instead of most of the data going to a single worker while the others sit idle.

Benefits of Salting

Reduced Skew: Spreads data evenly across partitions, preventing a few workers from being overloaded and improving utilization.
Improved Performance: Speeds up joins and aggregations by balancing the workload.
Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.

When to Use Salting

Use salting when you join or aggregate on skewed keys, when you notice long shuffle times or executor failures caused by data skew, or when most workers sit idle while only a few are still running. It is also useful in real-time streaming applications where partitioning affects processing efficiency.

Example in Scala

Let's join some data on an unbalanced key. Assume we need to join two datasets: a large one and a small one.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumes a running SparkSession (as in spark-shell or a notebook); create one otherwise:
val spark = SparkSession.builder().master("local[*]").appName("salting-example").getOrCreate()
import spark.implicits._ // required for .toDF and the $"..." column syntax

// Simulated large dataset with skew
val largeDF = Seq(
  (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")

// Small dataset
val smallDF = Seq(
  (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")

Let's add a salt column to the large dataset. We use randomization to spread the values of the heavy keys over smaller partitions.

// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
  withColumn("salt", (rand() * numBuckets).cast("int")).
  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
|          1|       txn1|   1|               1_1|
|          1|       txn2|   1|               1_1|
|          1|       txn3|   2|               1_2|
|          2|       txn4|   2|               2_2|
|          3|       txn5|   0|               3_0|
+-----------+-----------+----+------------------+

To make sure we cover every random salt key that can appear in the large dataset, we need to explode the small dataset so that it contains all possible salt values.

// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
  crossJoin(smallDF).
  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id|  name|salted_customer_id|
+----+-----------+------+------------------+
|   0|          1| Ahmed|               1_0|
|   1|          1| Ahmed|               1_1|
|   2|          1| Ahmed|               1_2|
|   0|          2|   Ali|               2_0|
|   1|          2|   Ali|               2_1|
|   2|          2|   Ali|               2_2|
|   0|          3|Hassan|               3_0|
|   1|          3|Hassan|               3_1|
|   2|          3|Hassan|               3_2|
+----+-----------+------+------------------+

Now we can easily join the two datasets.

// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
  join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
  select("customer_id", "transaction", "name")

joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction|  name|
+-----------+-----------+------+
|          1|       txn2| Ahmed|
|          1|       txn1| Ahmed|
|          1|       txn3| Ahmed|
|          2|       txn4|   Ali|
|          3|       txn5|Hassan|
+-----------+-----------+------+
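To see what the salt actually did, it can help to compare the per-key row distribution before and after salting. This quick check is not part of the original walkthrough; it is a small optional sketch that only reuses largeDF and saltedLargeDF from the snippets above.

// Optional sanity check: key distribution before vs. after salting.
// Before salting, customer_id 1 owns most of the rows; after salting, those rows
// are split across several salted_customer_id values (1_1 and 1_2 in the output above).
largeDF.groupBy("customer_id").count().orderBy(desc("count")).show()
saltedLargeDF.groupBy("salted_customer_id").count().orderBy(desc("count")).show()

On a real, heavily skewed dataset, the first of these counts is also what tells you which keys are worth salting, which is useful for the targeted variant shown later in the tuning section.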
select("customer_id", "transaction", "name") joinedDF.show() +-----------+-----------+------+ |customer_id|transaction| name| +-----------+-----------+------+ | 1| txn2| Ahmed| | 1| txn1| Ahmed| | 1| txn3| Ahmed| | 2| txn4| Ali| | 3| txn5|Hassan| +-----------+-----------+------+ 使用 Python 的例子 from pyspark.sql import SparkSession from pyspark.sql.functions import col, rand, lit, concat from pyspark.sql.types import IntegerType # Simulated large dataset with skew largeDF = spark.createDataFrame([ (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ], ["customer_id", "transaction"]) # Small dataset smallDF = spark.createDataFrame([ (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ], ["customer_id", "name"]) # Step 1: create a salting key in the large dataset numBuckets = 3 saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 2: Explode rows in smallDF for possible salted keys salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt") saltedSmallDF = salt_range.crossJoin(smallDF) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 3: Perform salted join joinedDF = saltedLargeDF.join( saltedSmallDF, on=["salted_customer_id", "customer_id"], how="inner" ).select("customer_id", "transaction", "name") 笔记 此代码使用 spark.range(...) 来模仿 Scala 的 (0 到 numBuckets).toDF(“盐”). 列表达式是使用col(...), lit(...),和 concat(...)来处理的。 演示到整数使用 .cast(IntegerType())。 调制方法:选择 numBuckets 号码 如果您将 numBuckets 设置为 100,每个密钥可以分为 100 个子分区,但是要小心,因为使用过多的桶可能会降低性能,特别是对于数据少的密钥。 If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal , e.x. 0 // Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) 开始小(例如,10-20),并根据观察到的尺寸和任务运行时间逐步增加。 Rule of Thumb: 最后的想法 有了正确的调节和监控,这种技术可以在高度扭曲的数据集上显著减少任务执行时间。 SKEWED JOIN 最初发表在 https://practical-software.com 于 2025 年 5 月 11 日。 最初发表在 2025年5月11日。 https://practical-software.com https://practical-software.com https://practical-software.com