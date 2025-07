Ao trabalhar com grandes conjuntos de dados emApache SparkUma questão de desempenho comum édata skewIsso acontece quando algumas chavesdominatedistribuição de dados, levando aunevenpartições e consultas lentas. ocorre principalmente durante operações que requeremshufflingAssim comojoinsOu mesmo regularaggregations.





Uma maneira prática de reduzir o desgaste ésalting, que envolve espalhar artificialmente chaves pesadas em várias partições. Neste post, eu vou guiá-lo por isso com um exemplo prático.





Como o Salting Resolve os Problemas de Skew de Dados

Ao acrescentar arandomlygerado número para a chave de junção e, em seguida, unindo-se sobre esta chave combinada, podemos distribuir grandes chaves mais uniformemente.Isso torna a distribuição de dados mais uniforme e espalha a carga sobre mais trabalhadores, em vez de enviar a maior parte dos dados para um trabalhador e deixar os outros vazios.

Benefícios do Salting

Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.

Improved Performance: Speeds up joins and aggregations by balancing the workload.

Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.



Quando usar salsicha

Durante juntas ou agregações com chaves distorcidas, use salting quando você notar longos tempos de ruído ou falhas do executor devido a distorção de dados.Ele também é útil em aplicativos de streaming em tempo real onde a partição afeta a eficiência do processamento de dados, ou quando a maioria dos trabalhadores estão vazios enquanto alguns estão presos em um estado de execução.





Exemplo de salto em escala

Vamos gerar alguns dados com umunbalancedPodemos assumir que existem dois conjuntos de dados que precisamos unir: um é um grande conjunto de dados e o outro é um pequeno conjunto de dados.

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ // Simulated large dataset with skew val largeDF = Seq( (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ).toDF("customer_id", "transaction") // Small dataset val smallDF = Seq( (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ).toDF("customer_id", "name")





Vamos adicionar a coluna salgada aos grandes conjuntos de dados, que usamosrandomizationpara espalhar os valores da grande chave em partições menores

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", (rand() * numBuckets).cast("int")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedLargeDF.show() +-----------+-----------+----+------------------+ |customer_id|transaction|salt|salted_customer_id| +-----------+-----------+----+------------------+ | 1| txn1| 1| 1_1| | 1| txn2| 1| 1_1| | 1| txn3| 2| 1_2| | 2| txn4| 2| 2_2| | 3| txn5| 0| 3_0| +-----------+-----------+----+------------------+





Para garantir que cobrimos todas as possíveis chaves salgadas aleatórias nos grandes conjuntos de dados, precisamosexplodeO pequeno conjunto de dados com todos os possíveis valores salgados

// Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) saltedSmallDF.show() +----+-----------+------+------------------+ |salt|customer_id| name|salted_customer_id| +----+-----------+------+------------------+ | 0| 1| Ahmed| 1_0| | 1| 1| Ahmed| 1_1| | 2| 1| Ahmed| 1_2| | 0| 2| Ali| 2_0| | 1| 2| Ali| 2_1| | 2| 2| Ali| 2_2| | 0| 3|Hassan| 3_0| | 1| 3|Hassan| 3_1| | 2| 3|Hassan| 3_2| +----+-----------+------+------------------+





Agora podemos juntar facilmente os dois conjuntos de dados

// Step 3: Perform salted join val joinedDF = saltedLargeDF. join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner"). select("customer_id", "transaction", "name") joinedDF.show() +-----------+-----------+------+ |customer_id|transaction| name| +-----------+-----------+------+ | 1| txn2| Ahmed| | 1| txn1| Ahmed| | 1| txn3| Ahmed| | 2| txn4| Ali| | 3| txn5|Hassan| +-----------+-----------+------+





Exemplo em Python

from pyspark.sql import SparkSession from pyspark.sql.functions import col, rand, lit, concat from pyspark.sql.types import IntegerType # Simulated large dataset with skew largeDF = spark.createDataFrame([ (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5") ], ["customer_id", "transaction"]) # Small dataset smallDF = spark.createDataFrame([ (1, "Ahmed"), (2, "Ali"), (3, "Hassan") ], ["customer_id", "name"]) # Step 1: create a salting key in the large dataset numBuckets = 3 saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 2: Explode rows in smallDF for possible salted keys salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt") saltedSmallDF = salt_range.crossJoin(smallDF) \ .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt"))) # Step 3: Perform salted join joinedDF = saltedLargeDF.join( saltedSmallDF, on=["salted_customer_id", "customer_id"], how="inner" ).select("customer_id", "transaction", "name")

Notas

Este código usa spark.range(...) para imitar Scala (0 até numBuckets).toDF (“sal”).

As expressões de coluna são tratadas usando col(...), lit(...), e concat(...).

O cast para inteiro usa .cast (IntegerType()).





Tipo de Túnel: Escolha numBuckets

Números

Se você definir numBuckets = 100, cada chave pode ser dividida em 100 sub-partículas. No entanto, tenha cuidado porque usar muitos buckets pode diminuir o desempenho, especialmente para chaves com poucos dados. Teste sempre valores diferentes com base no perfil de desvio do seu conjunto de dados.

If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal 0 , e.x.

// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

, e.x.





Rule of Thumb:Comece pequeno (por exemplo, 10-20) e aumente gradualmente com base no tamanho do shuffle observado e no tempo de execução da tarefa.

Pensamentos finais

O salto é um método eficaz e simples para gerenciar o desvio no Apache Spark quando as partições tradicionais ou pistas ( SKEWED JOIN Com o ajuste e monitoramento corretos, esta técnica pode reduzir significativamente os tempos de execução de tarefas em conjuntos de dados altamente distorcidos.





Publicado originalmente em https://practical-software.com em 11 de maio de 2025.

