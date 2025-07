Büyük verilerle çalıştığınızdaApache SparkOrtak bir performans sorunudata skewBu, birkaç anahtarındominateVerilerin dağıtımı,unevenpartisyonlar ve yavaş sorgular. esas olarak gerektiren işlemler sırasında meydana gelir.shufflinggibijoinsya da düzenliaggregations.





Döviz azaltmanın pratik bir yolusalting, bu, çoklu bölünmeler üzerinde ağır anahtarları yapay olarak yaymayı içerir. bu yazıda, bunu pratik bir örnekle size rehberlik edeceğim.





Salting Data Skew Sorunlarını Nasıl Çözer

eklemek için arandomlyBirleşik anahtarın sayısını oluşturduktan sonra bu kombine anahtarı birleştirerek, büyük anahtarları daha eşit şekilde dağıtabiliriz.Bu, veri dağılımını daha eşit hale getirir ve yükü daha fazla işçiye yayar, verilerin çoğunu bir işçiye göndermek yerine diğerlerini boş bırakır.

Salinanın Faydaları

Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.

Improved Performance: Speeds up joins and aggregations by balancing the workload.

Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.



Salatayı ne zaman kullanmalısınız

Kaydırılmış anahtarlarla birleştirme veya birleştirme sırasında, veri kaydırması nedeniyle uzun şeffaf zamanlar veya yürütücü hataları fark ettiğinizde tuzlama kullanın. Aynı zamanda, bölünmenin veri işleme verimliliğini etkilediği gerçek zamanlı akış uygulamalarında da yararlıdır, ya da çoğu işçi boşalırken birkaç kişi çalıştırma durumunda sıkışırsa.





Scala ile ilgili örnekler

Bazı verileri birunbalancedBirleştirmek istediğimiz iki veri kümesi var: biri büyük bir veri kümesi, diğeri ise küçük bir veri kümesi.

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ // Simulated large dataset with skew val largeDF = Seq( (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ).toDF("customer_id", "transaction") // Small dataset val smallDF = Seq( (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ).toDF("customer_id", "name")





Kullandığımız büyük veri kümelerine tuzlu sütunu ekleyelim.randomizationBüyük anahtarın değerlerini daha küçük bölümlere dağıtmak için

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", (rand() * numBuckets).cast("int")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedLargeDF.show() +-----------+-----------+----+------------------+ |customer_id|transaction|salt|salted_customer_id| +-----------+-----------+----+------------------+ | 1| txn1| 1| 1_1| | 1| txn2| 1| 1_1| | 1| txn3| 2| 1_2| | 2| txn4| 2| 2_2| | 3| txn5| 0| 3_0| +-----------+-----------+----+------------------+





Büyük veri kümelerindeki tüm olası rastgele tuzlu anahtarları kapsayacağımızı sağlamak için,explodeTüm olası tuzlu değerlerle küçük veri kümesi

// Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedSmallDF.show() +----+-----------+------+------------------+ |salt|customer_id| name|salted_customer_id| +----+-----------+------+------------------+ | 0| 1| Ahmed| 1_0| | 1| 1| Ahmed| 1_1| | 2| 1| Ahmed| 1_2| | 0| 2| Ali| 2_0| | 1| 2| Ali| 2_1| | 2| 2| Ali| 2_2| | 0| 3|Hassan| 3_0| | 1| 3|Hassan| 3_1| | 2| 3|Hassan| 3_2| +----+-----------+------+------------------+





Şimdi iki veritabanını kolayca birleştirebiliriz

// Step 3: Perform salted join val joinedDF = saltedLargeDF. join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner"). select("customer_id", "transaction", "name") joinedDF.show() +-----------+-----------+------+ |customer_id|transaction| name| +-----------+-----------+------+ | 1| txn2| Ahmed| | 1| txn1| Ahmed| | 1| txn3| Ahmed| | 2| txn4| Ali| | 3| txn5|Hassan| +-----------+-----------+------+





Python ile ilgili örnekler

from pyspark.sql import SparkSession from pyspark.sql.functions import col, rand, lit, concat from pyspark.sql.types import IntegerType # Simulated large dataset with skew largeDF = spark.createDataFrame([ (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ], ["customer_id", "transaction"]) # Small dataset smallDF = spark.createDataFrame([ (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ], ["customer_id", "name"]) # Step 1: create a salting key in the large dataset numBuckets = 3 saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 2: Explode rows in smallDF for possible salted keys salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt") saltedSmallDF = salt_range.crossJoin(smallDF) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 3: Perform salted join joinedDF = saltedLargeDF.join( saltedSmallDF, on=["salted_customer_id", "customer_id"], how="inner" ).select("customer_id", "transaction", "name")

Notlar

Bu kod, Scala’yı taklit etmek için spark.range(...) kullanır (0 numBuckets).toDF (“salt”).

Sütun ifadeleri col(...), lit(...), ve concat(...) kullanılarak işlenir.

Tam sayıyı oluşturmak için .cast (IntegerType()) kullanın.





Tuning Tipi: Seçme numBuckets

Numaralar

NumBuckets = 100'e ayarlanırsa, her anahtar 100 alt bölüme bölünebilir. Ancak, çok fazla kutu kullanarak performans düşebilir çünkü dikkatli olun, özellikle de az veri olan anahtarlar için. Her zaman veritabanının sapma profili temelinde farklı değerleri test edin.

If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal 0 , e.x.

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

, e.x.





Rule of Thumb:Küçük (örneğin, 10-20) başlatın ve gözlemlenen shuffle boyutlarına ve görev sürelerine göre yavaş yavaş artırın.

Son Düşünceler

Geleneksel bölünme veya ipuçları sırasında Apache Spark'ta kaydırma yönetimi için etkili ve basit bir yöntemdir ( SKEWED JOIN Doğru ayarlanma ve izleme ile, bu teknik, çok sapmış veri kümelerinde işin gerçekleştirme süresini önemli ölçüde azaltabilir.





Orijinal olarak 11 Mayıs 2025 tarihinde https://practical-software.com adresinde yayınlandı.

https://practical-software.com