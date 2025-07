जब आप बड़े डेटा सेट के साथ काम करते हैंApache Sparkएक सामान्य प्रदर्शन समस्या हैdata skewयह तब होता है जब कुछ कुंजीdominateडेटा वितरण, जिससेunevenयह मुख्य रूप से उन ऑपरेशनों के दौरान होता है जिनकी आवश्यकता होती हैshufflingजैसेjoinsया नियमित रूप सेaggregations.





स्केव को कम करने का एक व्यावहारिक तरीका हैsalting, जिसमें कई विभाजनों पर कृत्रिम रूप से भारी कुंजीों को फैलाना शामिल है. इस पोस्ट में, मैं आपको इस के माध्यम से एक व्यावहारिक उदाहरण के साथ मार्गदर्शन करूंगा।





डेटा स्केव मुद्दों को कैसे हल करता है

जोड़कर Arandomlyजोड़ कुंजी पर संख्या उत्पन्न और फिर इस संयुक्त कुंजी पर जोड़कर, हम बड़े कुंजी को अधिक समान रूप से वितरित कर सकते हैं. यह डेटा वितरण को अधिक समान बनाता है और अधिक श्रमिकों पर भार फैलाता है, एक श्रमिकों को अधिकांश डेटा भेजने के बजाय और दूसरों को बेकार छोड़ देता है.

नमक के लाभ

Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.

Improved Performance: Speeds up joins and aggregations by balancing the workload.

Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.



जब सलाद का उपयोग करें

विचलित कुंजी के साथ जोड़ों या जोड़ों के दौरान, जब आप डेटा विचलन के कारण लंबे shuffle समय या निष्पादक विफलताओं को नोटिस करते हैं तो नमन का उपयोग करें. यह वास्तविक समय स्ट्रीमिंग अनुप्रयोगों में भी उपयोगी है जहां विभाजन डेटा प्रसंस्करण दक्षता को प्रभावित करता है, या जब अधिकांश कर्मचारियों बेकार हैं जबकि कुछ चलने की स्थिति में फंस गए हैं.





स्केल में उदाहरण

आइए हम एक के साथ कुछ डेटा उत्पन्न करते हैंunbalancedहम मान सकते हैं कि हमें जोड़ने की आवश्यकता है दो डेटासेट हैं: एक एक बड़ा डेटासेट है, और दूसरा एक छोटा डेटासेट है।

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ // Simulated large dataset with skew val largeDF = Seq( (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ).toDF("customer_id", "transaction") // Small dataset val smallDF = Seq( (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ).toDF("customer_id", "name")





चलो बड़े डेटा सेट में नमकीन कॉलम जोड़ते हैं, जिसे हम उपयोग करते हैंrandomizationबड़े कुंजी के मूल्यों को छोटे विभाजनों में फैलाने के लिए

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", (rand() * numBuckets).cast("int")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedLargeDF.show() +-----------+-----------+----+------------------+ |customer_id|transaction|salt|salted_customer_id| +-----------+-----------+----+------------------+ | 1| txn1| 1| 1_1| | 1| txn2| 1| 1_1| | 1| txn3| 2| 1_2| | 2| txn4| 2| 2_2| | 3| txn5| 0| 3_0| +-----------+-----------+----+------------------+





यह सुनिश्चित करने के लिए कि हम बड़े डेटा सेट में सभी संभावित यादृच्छिक नमकीन कुंजी कवर करते हैं, हमेंexplodeसभी संभावित नमकीन मूल्यों के साथ छोटे डेटासेट

// Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedSmallDF.show() +----+-----------+------+------------------+ |salt|customer_id| name|salted_customer_id| +----+-----------+------+------------------+ | 0| 1| Ahmed| 1_0| | 1| 1| Ahmed| 1_1| | 2| 1| Ahmed| 1_2| | 0| 2| Ali| 2_0| | 1| 2| Ali| 2_1| | 2| 2| Ali| 2_2| | 0| 3|Hassan| 3_0| | 1| 3|Hassan| 3_1| | 2| 3|Hassan| 3_2| +----+-----------+------+------------------+





अब हम दोनों डेटा सेट को आसानी से जोड़ सकते हैं

// Step 3: Perform salted join val joinedDF = saltedLargeDF. join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner"). select("customer_id", "transaction", "name") joinedDF.show() +-----------+-----------+------+ |customer_id|transaction| name| +-----------+-----------+------+ | 1| txn2| Ahmed| | 1| txn1| Ahmed| | 1| txn3| Ahmed| | 2| txn4| Ali| | 3| txn5|Hassan| +-----------+-----------+------+





Python में उदाहरण

from pyspark.sql import SparkSession from pyspark.sql.functions import col, rand, lit, concat from pyspark.sql.types import IntegerType # Simulated large dataset with skew largeDF = spark.createDataFrame([ (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ], ["customer_id", "transaction"]) # Small dataset smallDF = spark.createDataFrame([ (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ], ["customer_id", "name"]) # Step 1: create a salting key in the large dataset numBuckets = 3 saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 2: Explode rows in smallDF for possible salted keys salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt") saltedSmallDF = salt_range.crossJoin(smallDF) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 3: Perform salted join joinedDF = saltedLargeDF.join( saltedSmallDF, on=["salted_customer_id", "customer_id"], how="inner" ).select("customer_id", "transaction", "name")

नोट्स

इस कोड में spark.range(...) का उपयोग Scala का अनुकरण करने के लिए किया जाता है (0 से numBuckets).toDF ("साल") तक।

कॉलम अभिव्यक्तियों को col(...), lit(...), और concat(...) का उपयोग करके संसाधित किया जाता है।

अद्वितीय का उपयोग .cast (IntegerType()) में किया जाता है।





ट्यूनिंग टिप: चुनें numBuckets

संख्याएं

यदि आप numBuckets = 100 सेट करते हैं, तो प्रत्येक कुंजी को 100 उप विभाजनों में विभाजित किया जा सकता है. हालांकि, सावधान रहें क्योंकि बहुत अधिक बक्के का उपयोग करने से प्रदर्शन कम हो सकता है, खासकर छोटे डेटा वाले कुंजी के लिए।

If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal 0 , e.x.

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

, e.x.





Rule of Thumb:छोटे (उदाहरण के लिए, 10-20) से शुरू करें और ध्यान में रखे गए shuffle आकार और कार्य संचालन समय के आधार पर धीरे-धीरे बढ़ें।

अंतिम विचार

सॉलिंग एक प्रभावी और सरल तरीका है जो पारंपरिक विभाजन या सुझावों पर Apache Spark में स्क्वाइव को प्रबंधित करने के लिए है ( SKEWED JOIN सही ट्यूनिंग और निगरानी के साथ, यह तकनीक अत्यधिक विचलित डेटा सेट पर कार्य निष्पादन समय को काफी कम कर सकती है।





मूल रूप से 11 मई, 2025 को https://practical-software.com पर प्रकाशित किया गया था।

https://practical-software.com