大規模なデータセットで作業する場合 共通のパフォーマンス問題は これは、いくつかのキーが データの配布は、導く パーティションや遅いクエリで、主に必要な操作中に発生します。 まるで もしくは定期 . Apache Spark data skew dominate uneven shuffling joins aggregations スカウトを減らす実践的な方法は、 , これは、複数のパーティションに重いキーを人工的に広げることを意味します. この投稿では、私は実用的な例であなたを導きます。 salting How Salting Solves Data Skew 問題を解決する方法 を追加するA 結合キーに数を生成し、この結合キーを結合すると、大きなキーをより均等に配布できます。これはデータの配布をより均一にし、より多くの従業員に負荷を配布する代わりに、ほとんどのデータを一人の従業員に送信し、他の従業員を無職にします。 randomly 塩の利点 Spreads data evenly across partitions, preventing a few workers overload and improves utilization. Reduced Skew: Speeds up joins and aggregations by balancing the workload. Improved Performance: Reduces the risk of out-of-memory errors caused by large, uneven partitions. Avoids Resource Contention: いつソーセージを使うか 歪んだキーを使用する合併または合併の際には、長いシャフルタイムやデータの歪みによる実行器の失敗に気付いたときにソールを使用します. It is also useful in real-time streaming applications where partitioning affects data processing efficiency, or when most workers are idle while a few are stuck in a running state. Salting Example in Scala(スカラ) いくつかのデータを生成し、A 2 つのデータセットを組み合わせる必要があると仮定できます: 1 つは大きなデータセットで、もう一つは小さなデータセットです。 unbalanced import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ // Simulated large dataset with skew val largeDF = Seq( (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ).toDF("customer_id", "transaction") // Small dataset val smallDF = Seq( (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ).toDF("customer_id", "name") 私たちが使用する大きなデータセットに塩分列を追加します。 大きなキーの値を小さなパーティションに分散する randomization // Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", (rand() * numBuckets).cast("int")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedLargeDF.show() +-----------+-----------+----+------------------+ |customer_id|transaction|salt|salted_customer_id| +-----------+-----------+----+------------------+ | 1| txn1| 1| 1_1| | 1| txn2| 1| 1_1| | 1| txn3| 2| 1_2| | 2| txn4| 2| 2_2| | 3| txn5| 0| 3_0| +-----------+-----------+----+------------------+ 大規模なデータセットのすべての可能なランダム化ソースキーをカバーするために、私たちは すべての可能な塩値を持つ小さなデータセット explode // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedSmallDF.show() +----+-----------+------+------------------+ |salt|customer_id| name|salted_customer_id| +----+-----------+------+------------------+ | 0| 1| Ahmed| 1_0| | 1| 1| Ahmed| 1_1| | 2| 1| Ahmed| 1_2| | 0| 2| Ali| 2_0| | 1| 2| Ali| 2_1| | 2| 2| Ali| 2_2| | 0| 3|Hassan| 3_0| | 1| 3|Hassan| 3_1| | 2| 3|Hassan| 3_2| +----+-----------+------+------------------+ 2つのデータセットを簡単に組み合わせることができます。 // Step 3: Perform salted join val joinedDF = saltedLargeDF. join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner"). select("customer_id", "transaction", "name") joinedDF.show() +-----------+-----------+------+ |customer_id|transaction| name| +-----------+-----------+------+ | 1| txn2| Ahmed| | 1| txn1| Ahmed| | 1| txn3| Ahmed| | 2| txn4| Ali| | 3| txn5|Hassan| +-----------+-----------+------+ Python でのサンプルサンプル from pyspark.sql import SparkSession from pyspark.sql.functions import col, rand, lit, concat from pyspark.sql.types import IntegerType # Simulated large dataset with skew largeDF = spark.createDataFrame([ (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ], ["customer_id", "transaction"]) # Small dataset smallDF = spark.createDataFrame([ (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ], ["customer_id", "name"]) # Step 1: create a salting key in the large dataset numBuckets = 3 saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 2: Explode rows in smallDF for possible salted keys salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt") saltedSmallDF = salt_range.crossJoin(smallDF) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 3: Perform salted join joinedDF = saltedLargeDF.join( saltedSmallDF, on=["salted_customer_id", "customer_id"], how="inner" ).select("customer_id", "transaction", "name") ノート このコードでは、spark.range(...) を使用して Scala の (0 から numBuckets.toDF (「塩」) を模します。 コラム表現は col(...), lit(...), and concat(...) を使用して処理します。 Cast to integer は .cast(IntegerType()) を使用します。 タグ : 選択 ナンバー ナンバー numBuckets = 100 を設定すると、各キーは 100 個のサブパーティションに分割できますが、データセットのスクワウプロフィールに基づいて異なる値をテストしてください。 If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal , e.x. 0 // Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) 小さい(例えば、10-20)を開始し、観察されたシャフルサイズとタスクランタイムに基づいて徐々に増加します。 Rule of Thumb: 最終思考 シンプルでシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシンプルなシ 適切な調節とモニタリングにより、このテクニックは非常に歪んだデータセットでの作業実行時間を大幅に減らすことができます。 SKEWED JOIN 最初に掲載されました https://practical-software.com on May 11, 2025. オリジナル・Published at 2025年5月11日 https://practical-software.com https://practical-software.com https://practical-software.com