Adi Polak

@adipolak

Apache Spark — Tips and Tricks for better performance

Apache Spark is quickly gaining steam both in the headlines and real-world adoption. Top use cases are Streaming Data, Machine Learning, Interactive Analysis and more. Many known companies uses it like Uber, Pinterest and more. So after working with Spark for more than 3 years in production, I’m happy to share my tips and tricks for better performance.

Lets start :)

1 - Avoid using your own custom UDFs:

UDF (user defined function) :

Column-based functions that extend the vocabulary of Spark SQL’s DSL.

Why we should avoid them?

From the Spark Apache docs:

“Use the higher-level standard Column-based functions with
Dataset operators whenever possible before reverting to
using your own custom UDF functions since UDFs are a
blackbox for Spark and so it does not even try to optimize them.

What actually happens behind the screens, is that the Catalyst can’t process and optimize UDFs at all, and it threats them as blackbox, which result in losing many optimisations like: Predicate pushdown , Constant folding and many others.

How to avoid it?

Try to avoid UDFs as much as possible and instead use Spark SQL function — make sure to find the ones that are relevant to your Spark version

Make sure your query are optimized using
dataframe.explain(true)

Noteworthy

Avoiding UDFs is not always possible , not all functionality exists in Apache Spark functions. But, try using built-in Spark SQL functions, as with it we cut down our testing effort as everything is performed on Spark’s side. These functions are designed by Databricks experts .

for example the following code can be replaced with notNull function

//udf example
def
notNull(s:String):Boolean = {
s != null
}
sparkSession.udf.register[Boolean,String]("notNull",notNull)
val newQuery = "select * from ${table} where notNull(some_column)"
val dataframe = sparkSession.sqlContext.sql(newQuery)

//builtin function
val dataframe = dataframe.filter(col("some_column").isNull)

When there is no built-in replacement, it is still possible to implement and extend Catalyst’s (Spark’s SQL optimizer) expression class. It will play well with code generation. For more details, Chris Fregly talked about it here (see slide 56). By doing this we directly access Tungsten format, it solves the serialization problem and bumps performance. Implementing expression are bounded to newer Spark versions and is still considered experimental.

Avoid UDFs or UDAFs that perform more than one thing

Split your function, if you have a function that does more than one thing? split it, clean code principles works here as well :). By splitting UDFs we are able to use built-in functions and chain them in the desired way. it makes testing easier and is a known best practice for software developers as a whole.

2 — Look under the hood —Or, what is Catalyst?

From Dataset object or Dataframe object you can call the explain method like this:

//always check yourself using 
dataframe.explain(true)

The output of this function is the Spark’s execution plan which is the output of Spark query engine — the catalyst. Here you can check yourself and see if there are ‘redundent’ calculation.

Make sure you are checking yourself using explain method since Map reduce actions includes shuffling (sending data over the network). This is expensive due to network traffic, data serialization and disk I/O. Even with in-memory database those are still expensive. Although Spark does in memory map-reduce, during shuffling Spark still uses the disk.

In order to reduce the number of stages and shuffling, best practice is first to understand the stages and then search for a way to reduce the complexity.

This is an example of calling explain method of a query with UDF :

Filter using UDF

From the filtering stage, you can see that casting takes place and it happens on each time an entry goes through the UDF . In our case it cast it to string.

In the physical plan we see what will actually happen in our executors, we see the partition filters, pushdown filters, the schema, the project method (here it is file scan because it’s a CSV file).

Without UDF — we might benefit from the pushdown filter which will happen at the storage level, that means that it won’t load all the data into Spark memory because the Spark process reads the data after the storage already filtered what’s needed to be filtered. Read here more about why pushdown is extremely important for performance.

This is an example with explain method over our second query where we used the Spark sql function — is null.

Filter using builtin functions

From the output, we can see that we are using the power of Pushdown filter (last line in the physical plan).

3- Do you really need distributed data?

Know your data. what is the size of your data? do you need it saved in a distributed manner? what is your storage? is it columnar based? also, look into the format file. For example, Avro file format is schema based and considered compact, but not readable when looking at the files themselves. Spark also support csv file and you can use Spark connector to other DB. Or write your own Apache Spark connector.

4- On-premise or on the cloud

It doesn’t matter if it’s in the cloud or on-premise, you should know your configurations .I’m working with Azure and there are many ways to run Spark on Azure with varies configurations. Apache Spark on HDInsight, Azure Databricks and more. The way to work with them is strictly depended on the end goal. Where you can combine it with specific storage that is built for searches and/or add pipelines and scheduler.

For now, read Apache Spark — Catalyst deep dive to understand how Spark query engine works.

Follow me on Medium for more posts about Scala, Kotlin, Big data, clean code and software engineers nonsense. Cheers !

Topics of interest

More Related Stories