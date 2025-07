При работе с большими наборами данных вApache SparkОбщая проблема результативности заключается вdata skewЭто происходит, когда несколько ключейdominateраспределение данных, что приводит кunevenразделов и медленных запросов. Это происходит в основном во время операций, требующихshufflingКакjoinsИли даже регулярныеaggregations.





Практическим способом уменьшения разрыва являетсяsalting, который включает в себя искусственное распространение тяжелых клавиш по нескольким разделам.В этой статье я буду направлять вас через это с практическим примером.





Как соль решает проблемы с отключением данных

Добавляя вrandomlyВ результате генерируемого числа к объединенному ключу, а затем к объединенному ключу, мы можем распределять большие ключи более равномерно.Это делает распределение данных более равномерным и распространяет нагрузку на большее количество работников, вместо того, чтобы отправлять большую часть данных одному работнику и оставлять остальные без работы.

Преимущества солярия

Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.

Improved Performance: Speeds up joins and aggregations by balancing the workload.

Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.



Когда употреблять соль

Во время слияний или агрегаций с искривленными клавишами используйте соляцию, когда вы замечаете длительные периоды отключения или сбои исполнителя из-за искажения данных.Это также полезно в приложениях потокового потока в режиме реального времени, где разделение влияет на эффективность обработки данных, или когда большинство работников остаются без работы, а некоторые застряли в рабочем состоянии.





Соль в примере скалы

Давайте создадим некоторые данные с помощьюunbalancedМы можем предположить, что есть два набора данных, которые мы должны объединить: один является большим набором данных, а другой - небольшим набором данных.

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ // Simulated large dataset with skew val largeDF = Seq( (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ).toDF("customer_id", "transaction") // Small dataset val smallDF = Seq( (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ).toDF("customer_id", "name")





Давайте добавим соляную колонку к большим наборам данных, которые мы используемrandomizationраспределить значения большого ключа на более мелкие разделы

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", (rand() * numBuckets).cast("int")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedLargeDF.show() +-----------+-----------+----+------------------+ |customer_id|transaction|salt|salted_customer_id| +-----------+-----------+----+------------------+ | 1| txn1| 1| 1_1| | 1| txn2| 1| 1_1| | 1| txn3| 2| 1_2| | 2| txn4| 2| 2_2| | 3| txn5| 0| 3_0| +-----------+-----------+----+------------------+





Чтобы убедиться, что мы охватываем все возможные случайные соленые ключи в больших наборах данных, нам нужноexplodeнебольшой набор данных со всеми возможными солеными значениями

// Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedSmallDF.show() +----+-----------+------+------------------+ |salt|customer_id| name|salted_customer_id| +----+-----------+------+------------------+ | 0| 1| Ahmed| 1_0| | 1| 1| Ahmed| 1_1| | 2| 1| Ahmed| 1_2| | 0| 2| Ali| 2_0| | 1| 2| Ali| 2_1| | 2| 2| Ali| 2_2| | 0| 3|Hassan| 3_0| | 1| 3|Hassan| 3_1| | 2| 3|Hassan| 3_2| +----+-----------+------+------------------+





Теперь мы можем легко соединить два набора данных

// Step 3: Perform salted join val joinedDF = saltedLargeDF. join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner"). select("customer_id", "transaction", "name") joinedDF.show() +-----------+-----------+------+ |customer_id|transaction| name| +-----------+-----------+------+ | 1| txn2| Ahmed| | 1| txn1| Ahmed| | 1| txn3| Ahmed| | 2| txn4| Ali| | 3| txn5|Hassan| +-----------+-----------+------+





Скачать пример в Python

from pyspark.sql import SparkSession from pyspark.sql.functions import col, rand, lit, concat from pyspark.sql.types import IntegerType # Simulated large dataset with skew largeDF = spark.createDataFrame([ (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ], ["customer_id", "transaction"]) # Small dataset smallDF = spark.createDataFrame([ (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ], ["customer_id", "name"]) # Step 1: create a salting key in the large dataset numBuckets = 3 saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 2: Explode rows in smallDF for possible salted keys salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt") saltedSmallDF = salt_range.crossJoin(smallDF) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 3: Perform salted join joinedDF = saltedLargeDF.join( saltedSmallDF, on=["salted_customer_id", "customer_id"], how="inner" ).select("customer_id", "transaction", "name")

Ноты

Этот код использует spark.range(...) для подражания Scala (0 до numBuckets).toDF («соль»).

Выражения колонн обрабатываются с использованием col (...), lit (...), и concat (...).

Для передачи в целое используется .cast (IntegerType()).





Тунинг Тип: Выбор Номерки

Номерки

Если вы настроите numBuckets = 100, каждый ключ может быть разделен на 100 подразделений. Однако будьте осторожны, потому что использование слишком многих букетов может снизить производительность, особенно для ключей с небольшим количеством данных.

If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal 0 , e.x.

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

Rule of Thumb:Начните с небольших размеров (например, 10-20) и постепенно увеличивайте их, исходя из наблюдаемых размеров шифров и времени выполнения задач.

Окончательные мысли

Сольтирование является эффективным и простым методом управления искажением в Apache Spark при традиционном разделении или подсказках ( SKEWED JOIN При правильном настройке и мониторинге эта техника может значительно сократить время выполнения задач на сильно искаженных наборах данных.





Первоначально опубликовано на https://practical-software.com 11 мая 2025 года.

https://practical-software.com