Apache Spark: The number of cores vs. the number of executors

Short answer: I think tgbaggio is right. You hit HDFS throughput limits on your executors.

I think the answer here may be a little simpler than some of the other recommendations.

The clue for me is in the cluster network graph. For run 1 the utilization is steady at ~50 MB/s. For run 3 the steady utilization is doubled, around 100 MB/s.

From the cloudera blog post shared by DzOrd, you can see this important quote:

I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number.
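If that guideline holds, one way to respect it is to cap spark.executor.cores when building the session. A minimal PySpark sketch (the app name, executor count, and memory below are illustrative values, not the ones from the question):

    from pyspark.sql import SparkSession

    # Keep cores per executor at or below ~5 so each executor's HDFS
    # client is not saturated with concurrent writer threads.
    spark = (
        SparkSession.builder
        .appName("hdfs-throughput-demo")          # hypothetical app name
        .config("spark.executor.cores", "5")      # <= 5 per the quote above
        .config("spark.executor.instances", "6")  # illustrative executor count
        .config("spark.executor.memory", "10g")   # illustrative memory setting
        .getOrCreate()
    )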

So, let's do a few calculations to see what performance we expect if that is true.


Run 1: 19 GB, 7 cores, 3 executors

  • 3 executors x 7 threads = 21 threads
  • with 7 cores per executor, we expect limited IO to HDFS (throughput maxes out at ~5 tasks per executor)
  • effective throughput ~= 3 executors x 5 threads = 15 threads

Run 3: 4 GB, 2 cores, 12 executors

  • 12 executors x 2 threads = 24 threads
  • 2 cores per executor, so HDFS throughput is OK
  • effective throughput ~= 12 executors x 2 threads = 24 threads

If the job is 100% limited by concurrency (the number of threads), we would expect runtime to be perfectly inversely correlated with the number of threads.

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

So ratio_num_threads ~= inv_ratio_runtime, and it looks like we are network limited.
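The same arithmetic as a quick Python sketch (effective_threads is a made-up helper encoding the ~5-tasks-per-executor guess):

    # Effective HDFS writer threads: executors * min(cores, 5), assuming
    # ~5 concurrent writers per executor saturate the HDFS client.
    def effective_threads(executors, cores, hdfs_cap=5):
        return executors * min(cores, hdfs_cap)

    run1 = effective_threads(executors=3, cores=7)   # 15
    run3 = effective_threads(executors=12, cores=2)  # 24

    print(run1 / run3)  # ratio_num_threads = 0.625
    print(31 / 50)      # inv_ratio_runtime = 0.62 (runtimes: 50 min vs 31 min)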

This same effect explains the difference between Run 1 and Run 2.


Run 2: 19 GB, 4 cores, 3 executors

  • 3 executors x 4 threads = 12 threads
  • with 4 cores per executor, IO to HDFS is fine
  • effective throughput ~= 3 executors x 4 threads = 12 threads

Comparing the number of effective threads and the runtime:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91
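Continuing the sketch above with the same hypothetical helper:

    run2 = effective_threads(executors=3, cores=4)  # 12

    print(run2 / run1)  # ratio_num_threads = 0.8
    print(50 / 55)      # inv_ratio_runtime ~= 0.91 (runtimes: 55 min vs 50 min)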

It's not as perfect as the last comparison, but we still see a similar drop in performance when we lose threads.

Now for the last bit: why do we get better performance with more threads, especially more threads than the number of CPUs?

A good explanation of the difference between parallelism (what we get by dividing up data onto multiple CPUs) and concurrency (what we get when we use multiple threads to do work on a single CPU) is provided in this great post by Rob Pike: Concurrency is not parallelism.

The short explanation is that if a Spark job is interacting with a file system or network, the CPU spends a lot of time waiting on communication with those interfaces rather than actually doing work. By giving each CPU more than one task to work on at a time, it spends less time waiting and more time working, and you see better performance.
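A toy illustration of that effect in plain Python, with time.sleep standing in for HDFS or network waits (the task count and sleep duration are arbitrary):

    import time
    from concurrent.futures import ThreadPoolExecutor

    def io_bound_task(_):
        time.sleep(0.1)  # stand-in for waiting on HDFS or the network

    for workers in (1, 4):
        start = time.time()
        with ThreadPoolExecutor(max_workers=workers) as pool:
            list(pool.map(io_bound_task, range(8)))
        print(f"{workers} worker(s): {time.time() - start:.2f} s")
    # ~0.8 s with 1 worker vs ~0.2 s with 4: the waits overlap, so more
    # threads than CPUs still helps when the tasks are IO-bound.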
