This section contains information on how you can tune your Hadoop cluster and Spark specifically for optimal performance in job execution.

YARN Tuning Overview

This section provides an overview of configuration recommendations to be applied to the Hadoop cluster from the .

NOTE: The recommendations in this section are optimized for use with the . These may or may not conform to requirements for other applications using the Hadoop cluster. assumes no responsibility for the configuration of the cluster.

YARN manages cluster resources (CPU and memory) by running all processes within allocated containers. Containers restrict the resources available to its process(es). Processes are monitored and killed if they overrun the container allocation.

YARN configuration specifies:

The following parameters are available in yarn-site.xml:



Per Cluster NodeAmount of physical memory, in MB, that can be allocated for containers
yarn.nodemanager.resource.cpu-vcoresPer Cluster NodeNumber of CPU cores that can be allocated for containers
yarn.scheduler.minimum-allocation-mbPer ContainerMinimum container memory, in MBs; requests lower than this will be increased to this value
yarn.scheduler.maximum-allocation-mbPer ContainerMaximum container memory, in MBs; requests higher than this will be capped to this value
yarn.scheduler.increment-allocation-mbPer ContainerGranularity of container memory requests
yarn.scheduler.minimum-allocation-vcoresPer ContainerMinimum allocation virtual CPU cores per container; requests lower than will increased to this value.
yarn.scheduler.maximum-allocation-vcoresPer ContainerMaximum allocation virtual CPU cores per container; requests higher than this will be capped to this value
yarn.scheduler.increment-allocation-vcoresPer ContainerGranularity of container virtual CPU requests

Spark Tuning Overview

Spark processes run multiple executors per job. Each executor must run within a YARN container. Therefore, resource requests must fit within YARN’s container limits.

Like YARN containers, multiple executors can run on a single node. More executors provide additional computational power and decreased runtime.

Spark’s dynamic allocation adjusts the number of executors to launch based on the following:

The per-executor resource request sizes can be specified by setting the following properties in the spark.props section :


NOTE: In , all values in the spark.props section must be quoted values.



Amount of memory to use per executor process (in a specified unit)
spark.executor.coresNumber of cores to use on each executor - limit to 5 cores per executor for best performance

A single special process, the application driver, also runs in a container. Its resources are specified in the spark.props section:



Amount of memory to use for the driver process (in a specified unit)
spark.driver.coresNumber of cores to use for the driver process

Spark Performance Considerations

Optimizing "Small" Joins

Broadcast, or map-side, joins materialize one side of the join and send it to all executors to be stored in memory. This technique can significantly accelerate joins by skipping the sort and shuffle phases during a "reduce" operation. However, there is also a cost in communicating the table to all executors. Therefore, only "small" tables should be considered for broadcast join. The definition of "small" is set by the spark.sql.autoBroadcastJoinThreshold parameter which can be added to the spark.props section of . By default, Spark sets this to 10485760 (10MB).

NOTE:  We recommend setting this parameter between 20 and 100MB. It should not exceed 200MB.


In Spark's driver process, the transformation pipeline is compiled down to Spark code and optimized. This process can sometimes fail or take and an inordinately long time. By checkpointing the execution, Spark is forced to materialize the current table (in memory or on disk), thereby simplifying the segments that are optimized. While checkpointing can incur extra cost due to this materialization, it can also reduce end-to-end execution time by speeding up the compilation and optimization phases and by reusing materialized columns downstream.

NOTE: To increase the checkpointing frequency, set transformer.dataframe.checkpoint.threshold in the spark.props section of .

Limiting Resource Utilization of Spark Jobs

With Spark's dynamic allocation, each job's resource utilization can be limited by setting the maximum number of executors per job. Set spark.dynamicAllocation.maxExecutors in the spark.props section of . When applied, the maximum job memory is then given (approximately due to small overhead added by YARN) by:

spark.dynamicAllocation.maxExecutors * (spark.driver.memory + spark.executor.memory)

The maximum number of cores used per job is given (exactly) by:

spark.dynamicAllocation.maxExecutors * (spark.driver.cores + spark.executor.cores)

To limit the overall cluster utilization of , YARN queues should be configured and used by the application.

Tuning Recommendations

The following configuration settings can be applied through  configuration based on the number of nodes in the Hadoop cluster.

NOTE: These recommendations should be modified based on the technical capabilities of your network, the nodes in the cluster, and other applications using the cluster.

Available memory (GB)163264160256
Available vCPUs48164064

The specified configuration allows, maximally, the following Spark configuration per node:

CoresxNodeConfiguration Options


(1 driver + 1 executor) or 1 executor
2x1(1 driver + 2 executor) or 3 executors
4x1(1 driver + 3 executors) or 3 executors
10x1(1 driver + 6 executors) or 6 executors
16x1(1 driver + 10 executors) or 10 executors