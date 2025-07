Cuando trabajamos con grandes conjuntos de datos enApache SparkUn problema de rendimiento común esdata skewEsto ocurre cuando algunas clavesdominateLa distribución de los datos, que conduce aunevenparticiones y consultas lentas. Esto ocurre principalmente durante las operaciones que requierenshufflingcomojoinsO incluso regularesaggregations.





Una forma práctica de reducir el desgaste essalting, que implica la difusión artificial de claves pesadas en múltiples particiones. En este post, te guiaré a través de esto con un ejemplo práctico.





Cómo Salting resuelve problemas de escape de datos

Al agregar arandomlygenerado número a la clave de unión y luego uniéndose sobre esta clave combinada, podemos distribuir las claves grandes de manera más uniforme. Esto hace que la distribución de los datos sea más uniforme y distribuye la carga a más trabajadores, en lugar de enviar la mayor parte de los datos a un trabajador y dejar a los demás inactivos.

Beneficios del salado

Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.

Improved Performance: Speeds up joins and aggregations by balancing the workload.

Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.



Cuándo usar salsa

Durante las juntas o agregaciones con teclas distorsionadas, utilice saltar cuando observe largos tiempos de ruido o fallas del ejecutor debido a la desviación de datos. También es útil en aplicaciones de streaming en tiempo real donde la partición afecta a la eficiencia del procesamiento de datos, o cuando la mayoría de los trabajadores están vacíos mientras algunos están atrapados en un estado de ejecución.





Un ejemplo en la escala

Vamos a generar algunos datos con ununbalancedPodemos suponer que hay dos conjuntos de datos que necesitamos unir: uno es un conjunto de datos grande, y el otro es un conjunto de datos pequeño.

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ // Simulated large dataset with skew val largeDF = Seq( (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ).toDF("customer_id", "transaction") // Small dataset val smallDF = Seq( (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ).toDF("customer_id", "name")





Añadamos la columna salada a los grandes conjuntos de datos, que utilizamosrandomizationpara distribuir los valores de la clave grande en particiones más pequeñas

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", (rand() * numBuckets).cast("int")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedLargeDF.show() +-----------+-----------+----+------------------+ |customer_id|transaction|salt|salted_customer_id| +-----------+-----------+----+------------------+ | 1| txn1| 1| 1_1| | 1| txn2| 1| 1_1| | 1| txn3| 2| 1_2| | 2| txn4| 2| 2_2| | 3| txn5| 0| 3_0| +-----------+-----------+----+------------------+





Para asegurarnos de que cubrimos todas las posibles claves saladas aleatorias en los grandes conjuntos de datos, necesitamosexplodeEl pequeño conjunto de datos con todos los posibles valores salados

// Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedSmallDF.show() +----+-----------+------+------------------+ |salt|customer_id| name|salted_customer_id| +----+-----------+------+------------------+ | 0| 1| Ahmed| 1_0| | 1| 1| Ahmed| 1_1| | 2| 1| Ahmed| 1_2| | 0| 2| Ali| 2_0| | 1| 2| Ali| 2_1| | 2| 2| Ali| 2_2| | 0| 3|Hassan| 3_0| | 1| 3|Hassan| 3_1| | 2| 3|Hassan| 3_2| +----+-----------+------+------------------+





Ahora podemos unir fácilmente los dos conjuntos de datos

// Step 3: Perform salted join val joinedDF = saltedLargeDF. join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner"). select("customer_id", "transaction", "name") joinedDF.show() +-----------+-----------+------+ |customer_id|transaction| name| +-----------+-----------+------+ | 1| txn2| Ahmed| | 1| txn1| Ahmed| | 1| txn3| Ahmed| | 2| txn4| Ali| | 3| txn5|Hassan| +-----------+-----------+------+





Un ejemplo en Python

from pyspark.sql import SparkSession from pyspark.sql.functions import col, rand, lit, concat from pyspark.sql.types import IntegerType # Simulated large dataset with skew largeDF = spark.createDataFrame([ (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ], ["customer_id", "transaction"]) # Small dataset smallDF = spark.createDataFrame([ (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ], ["customer_id", "name"]) # Step 1: create a salting key in the large dataset numBuckets = 3 saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 2: Explode rows in smallDF for possible salted keys salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt") saltedSmallDF = salt_range.crossJoin(smallDF) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 3: Perform salted join joinedDF = saltedLargeDF.join( saltedSmallDF, on=["salted_customer_id", "customer_id"], how="inner" ).select("customer_id", "transaction", "name")

Notas

Este código utiliza spark.range(...) para imitar Scala (0 hasta numBuckets).toDF("salt").

Las expresiones de columna se manejan utilizando col(...), lit(...), y concat(...).

El cast a integer utiliza .cast (IntegerType()).





Tipo de Tuning: Elegir Números

Números

Si establece numBuckets = 100, cada tecla se puede dividir en 100 sub-particiones. Sin embargo, tenga cuidado porque el uso de demasiados buckets puede disminuir el rendimiento, especialmente para las teclas con pocos datos.

If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal 0 , e.x.

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

, e.x.





Rule of Thumb:Comience pequeño (por ejemplo, 10-20) y aumente gradualmente en función de los tamaños observados y el tiempo de ejecución de tareas.

Pensamientos finales

El salto es un método eficaz y sencillo para gestionar el desvío en Apache Spark cuando la partición tradicional o las pistas ( SKEWED JOIN Con el ajuste y el seguimiento adecuados, esta técnica puede reducir significativamente los tiempos de ejecución de tareas en conjuntos de datos altamente distorsionados.





Publicado originalmente en https://practical-software.com el 11 de mayo de 2025.

https://practical-software.com