जब आप बड़े डेटा सेट के साथ काम करते हैं एक सामान्य प्रदर्शन समस्या है यह तब होता है जब कुछ कुंजी डेटा वितरण, जिससे यह मुख्य रूप से उन ऑपरेशनों के दौरान होता है जिनकी आवश्यकता होती है जैसे या नियमित रूप से . Apache Spark data skew dominate uneven shuffling joins aggregations स्केव को कम करने का एक व्यावहारिक तरीका है , जिसमें कई विभाजनों पर कृत्रिम रूप से भारी कुंजीों को फैलाना शामिल है. इस पोस्ट में, मैं आपको इस के माध्यम से एक व्यावहारिक उदाहरण के साथ मार्गदर्शन करूंगा। salting डेटा स्केव मुद्दों को कैसे हल करता है जोड़कर A जोड़ कुंजी पर संख्या उत्पन्न और फिर इस संयुक्त कुंजी पर जोड़कर, हम बड़े कुंजी को अधिक समान रूप से वितरित कर सकते हैं. यह डेटा वितरण को अधिक समान बनाता है और अधिक श्रमिकों पर भार फैलाता है, एक श्रमिकों को अधिकांश डेटा भेजने के बजाय और दूसरों को बेकार छोड़ देता है. randomly नमक के लाभ Spreads data evenly across partitions, preventing a few workers overload and improves utilization. Reduced Skew: Speeds up joins and aggregations by balancing the workload. Improved Performance: Reduces the risk of out-of-memory errors caused by large, uneven partitions. Avoids Resource Contention: जब सलाद का उपयोग करें विचलित कुंजी के साथ जोड़ों या जोड़ों के दौरान, जब आप डेटा विचलन के कारण लंबे shuffle समय या निष्पादक विफलताओं को नोटिस करते हैं तो नमन का उपयोग करें. यह वास्तविक समय स्ट्रीमिंग अनुप्रयोगों में भी उपयोगी है जहां विभाजन डेटा प्रसंस्करण दक्षता को प्रभावित करता है, या जब अधिकांश कर्मचारियों बेकार हैं जबकि कुछ चलने की स्थिति में फंस गए हैं. स्केल में उदाहरण आइए हम एक के साथ कुछ डेटा उत्पन्न करते हैं हम मान सकते हैं कि हमें जोड़ने की आवश्यकता है दो डेटासेट हैं: एक एक बड़ा डेटासेट है, और दूसरा एक छोटा डेटासेट है। unbalanced import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ // Simulated large dataset with skew val largeDF = Seq( (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ).toDF("customer_id", "transaction") // Small dataset val smallDF = Seq( (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ).toDF("customer_id", "name") चलो बड़े डेटा सेट में नमकीन कॉलम जोड़ते हैं, जिसे हम उपयोग करते हैं बड़े कुंजी के मूल्यों को छोटे विभाजनों में फैलाने के लिए randomization // Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", (rand() * numBuckets).cast("int")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedLargeDF.show() +-----------+-----------+----+------------------+ |customer_id|transaction|salt|salted_customer_id| +-----------+-----------+----+------------------+ | 1| txn1| 1| 1_1| | 1| txn2| 1| 1_1| | 1| txn3| 2| 1_2| | 2| txn4| 2| 2_2| | 3| txn5| 0| 3_0| +-----------+-----------+----+------------------+ यह सुनिश्चित करने के लिए कि हम बड़े डेटा सेट में सभी संभावित यादृच्छिक नमकीन कुंजी कवर करते हैं, हमें सभी संभावित नमकीन मूल्यों के साथ छोटे डेटासेट explode // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedSmallDF.show() +----+-----------+------+------------------+ |salt|customer_id| name|salted_customer_id| +----+-----------+------+------------------+ | 0| 1| Ahmed| 1_0| | 1| 1| Ahmed| 1_1| | 2| 1| Ahmed| 1_2| | 0| 2| Ali| 2_0| | 1| 2| Ali| 2_1| | 2| 2| Ali| 2_2| | 0| 3|Hassan| 3_0| | 1| 3|Hassan| 3_1| | 2| 3|Hassan| 3_2| +----+-----------+------+------------------+ अब हम दोनों डेटा सेट को आसानी से जोड़ सकते हैं // Step 3: Perform salted join val joinedDF = saltedLargeDF. join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner"). select("customer_id", "transaction", "name") joinedDF.show() +-----------+-----------+------+ |customer_id|transaction| name| +-----------+-----------+------+ | 1| txn2| Ahmed| | 1| txn1| Ahmed| | 1| txn3| Ahmed| | 2| txn4| Ali| | 3| txn5|Hassan| +-----------+-----------+------+ Python में उदाहरण from pyspark.sql import SparkSession from pyspark.sql.functions import col, rand, lit, concat from pyspark.sql.types import IntegerType # Simulated large dataset with skew largeDF = spark.createDataFrame([ (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ], ["customer_id", "transaction"]) # Small dataset smallDF = spark.createDataFrame([ (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ], ["customer_id", "name"]) # Step 1: create a salting key in the large dataset numBuckets = 3 saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 2: Explode rows in smallDF for possible salted keys salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt") saltedSmallDF = salt_range.crossJoin(smallDF) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 3: Perform salted join joinedDF = saltedLargeDF.join( saltedSmallDF, on=["salted_customer_id", "customer_id"], how="inner" ).select("customer_id", "transaction", "name") नोट्स इस कोड में spark.range(...) का उपयोग Scala का अनुकरण करने के लिए किया जाता है (0 से numBuckets).toDF ("साल") तक। कॉलम अभिव्यक्तियों को col(...), lit(...), और concat(...) का उपयोग करके संसाधित किया जाता है। अद्वितीय का उपयोग .cast (IntegerType()) में किया जाता है। ट्यूनिंग टिप: चुनें numBuckets संख्याएं यदि आप numBuckets = 100 सेट करते हैं, तो प्रत्येक कुंजी को 100 उप विभाजनों में विभाजित किया जा सकता है. हालांकि, सावधान रहें क्योंकि बहुत अधिक बक्के का उपयोग करने से प्रदर्शन कम हो सकता है, खासकर छोटे डेटा वाले कुंजी के लिए। If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal , e.x. 0 // Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) छोटे (उदाहरण के लिए, 10-20) से शुरू करें और ध्यान में रखे गए shuffle आकार और कार्य संचालन समय के आधार पर धीरे-धीरे बढ़ें। Rule of Thumb: अंतिम विचार सॉलिंग एक प्रभावी और सरल तरीका है जो पारंपरिक विभाजन या सुझावों पर Apache Spark में स्क्वाइव को प्रबंधित करने के लिए है ( सही ट्यूनिंग और निगरानी के साथ, यह तकनीक अत्यधिक विचलित डेटा सेट पर कार्य निष्पादन समय को काफी कम कर सकती है। SKEWED JOIN मूल रूप से 11 मई, 2025 को https://practical-software.com पर प्रकाशित किया गया था। मूल में प्रकाशित 11 मई 2025 को। https://practical-software.com https://practical-software.com https://practical-software.com