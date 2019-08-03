
Free & Open Source Advocate. Data Geek - Big or Small.
By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.
Spark can also be configured to share resources fairly between jobs by setting spark.scheduler.mode to FAIR. Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish.
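To make the difference concrete, here is a toy simulation of the two policies. It is not Spark's scheduler: it assumes every task takes exactly one time step, and the names `simulate`, `jobs`, and `slots` are made up for this illustration.

```python
from collections import deque


def simulate(jobs, slots, mode="FIFO"):
    """Toy model of a scheduler; every task takes one time step.

    `jobs` maps job name -> number of tasks, in submission order.
    Returns a dict of job name -> time step at which its last task finished.
    """
    remaining = dict(jobs)  # dicts keep insertion order, i.e. submission order
    finished = {}
    step = 0
    while remaining:
        step += 1
        free = slots
        if mode == "FIFO":
            # Earlier jobs take every slot they can use; leftovers go to later jobs.
            for name in list(remaining):
                used = min(free, remaining[name])
                remaining[name] -= used
                free -= used
                if free == 0:
                    break
        else:
            # FAIR: hand out slots one at a time, cycling over the jobs.
            queue = deque(remaining)
            while free > 0 and queue:
                name = queue.popleft()
                remaining[name] -= 1
                free -= 1
                if remaining[name] > 0:
                    queue.append(name)
        # Record and drop the jobs that completed during this step.
        for name in [n for n, left in remaining.items() if left == 0]:
            finished[name] = step
            del remaining[name]
    return finished


jobs = {"long": 100, "short": 4}
print(simulate(jobs, slots=4, mode="FIFO"))  # short job waits behind the long one
print(simulate(jobs, slots=4, mode="FAIR"))  # short job finishes almost immediately
```

With four slots, the short job finishes at step 26 under FIFO but at step 2 under fair sharing, while the long job is delayed by only one step (25 versus 26).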
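import threading

import py4j.protocol
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, SQLContext
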
def set_spark_context(app_name=None):
    conf = SparkConf().\
        setAppName(app_name).\
        set('spark.hadoop.mapreduce.output.fileoutputformat.compress', 'false').\
        set('spark.sql.parquet.compression.codec', 'uncompressed').\
        set('spark.scheduler.mode', 'FAIR')  # need to set this on the sparkContext
    sc = SparkContext(conf=conf)
    try:
        sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
        sqlCtx = sqlContext = HiveContext(sc)
    except py4j.protocol.Py4JError:
        sqlCtx = sqlContext = SQLContext(sc)
    return sc, sqlContext

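# Function to be called on spawned threads
def saveParquet(df, path, pool_id, sc):
    # Assign the jobs submitted from this thread to the given scheduler pool
    sc.setLocalProperty("spark.scheduler.pool", str(pool_id))
    df.repartition(1).write.parquet(path)
    # Clear the pool assignment for this thread
    sc.setLocalProperty("spark.scheduler.pool", None)
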
sc, sqlContext = set_spark_context("Simple TASK")
# Do processing here to produce three dataframes to be written as parquets.
# The three results are df1, df2, df3
path1 = "/path/1"
path2 = "/path/2"
path3 = "/path/3"
df1_write = threading.Thread(target=saveParquet, args=(df1,path1,1,sc,))
df2_write = threading.Thread(target=saveParquet, args=(df2,path2,2,sc,))
df3_write = threading.Thread(target=saveParquet, args=(df3,path3,3,sc,))
# Start all three writer threads, then wait for each to finish
df1_write.start()
df2_write.start()
df3_write.start()
df1_write.join()
df2_write.join()
df3_write.join()
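
The same pattern can be generalised to any number of writes. The sketch below is one possible variation, not part of the original code: `run_in_pool` and `run_concurrently` are hypothetical helper names, and it swaps the hand-managed `threading.Thread` objects for the standard library's `ThreadPoolExecutor`. It assumes only that `sc` has the `setLocalProperty` method used above.

```python
from concurrent.futures import ThreadPoolExecutor


def run_in_pool(sc, pool_id, action):
    """Run `action()` with this thread's jobs assigned to scheduler pool `pool_id`."""
    sc.setLocalProperty("spark.scheduler.pool", str(pool_id))
    try:
        return action()
    finally:
        # Always clear the property, even if the action raised an exception.
        sc.setLocalProperty("spark.scheduler.pool", None)


def run_concurrently(sc, actions):
    """Run each zero-argument callable in `actions` on its own thread and pool.

    Pools are numbered from 1 in submission order (a hypothetical naming
    choice); results are returned in the same order as `actions`.
    """
    with ThreadPoolExecutor(max_workers=max(1, len(actions))) as executor:
        futures = [
            executor.submit(run_in_pool, sc, pool_id, action)
            for pool_id, action in enumerate(actions, start=1)
        ]
        # result() re-raises any exception that occurred on the worker thread.
        return [future.result() for future in futures]
```

With the dataframes from the example above, a call might look like `run_concurrently(sc, [lambda: df1.repartition(1).write.parquet(path1), lambda: df2.repartition(1).write.parquet(path2)])`. Unlike plain `join()`, reading each future's result surfaces a failed write as an exception in the main thread.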