This section describes how to tune your Hadoop cluster, and Spark specifically, for optimal job execution performance.
This section provides an overview of configuration recommendations to be applied to the Hadoop cluster from the .
NOTE: The recommendations in this section are optimized for use with the . These may or may not conform to requirements for other applications using the Hadoop cluster. assumes no responsibility for the configuration of the cluster.
YARN manages cluster resources (CPU and memory) by running all processes within allocated containers. Each container restricts the resources available to its processes. Processes are monitored and killed if they exceed the container allocation.
YARN configuration specifies:
The following parameters are available in
|Per Cluster Node||Amount of physical memory, in MB, that can be allocated for containers|
|Per Cluster Node||Number of CPU cores that can be allocated for containers|
|Per Container||Minimum container memory, in MB; requests lower than this will be increased to this value|
|Per Container||Maximum container memory, in MB; requests higher than this will be capped to this value|
|Per Container||Granularity of container memory requests|
|Per Container||Minimum number of virtual CPU cores per container; requests lower than this will be increased to this value|
|Per Container||Maximum number of virtual CPU cores per container; requests higher than this will be capped to this value|
|Per Container||Granularity of container virtual CPU requests|
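The per-container rules in the table above can be illustrated with a minimal sketch. It assumes that "granularity" means a request is rounded up to the next multiple of the increment; the function name and the sample values are hypothetical and are not taken from any YARN API.

```python
import math


def normalize_container_memory(requested_mb, min_mb, max_mb, increment_mb):
    """Return the container memory (in MB) granted for a request.

    Illustrative only: follows the rules described in the table,
    not the actual YARN scheduler code.
    """
    # Requests lower than the minimum are increased to the minimum.
    raised = max(requested_mb, min_mb)
    # Assumption: granularity means rounding up to a multiple of the increment.
    rounded = math.ceil(raised / increment_mb) * increment_mb
    # Requests higher than the maximum are capped at the maximum.
    return min(rounded, max_mb)
```

For example, with a hypothetical minimum of 1024 MB, maximum of 8192 MB, and increment of 512 MB, a 1500 MB request would be granted 1536 MB under these assumptions. The same clamping and rounding logic applies to virtual CPU core requests.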
Spark processes run multiple executors per job. Each executor must run within a YARN container. Therefore, resource requests must fit within YARN’s container limits.
Like YARN containers, multiple executors can run on a single node. Adding executors provides more computational power and can reduce runtime.
Spark’s dynamic allocation adjusts the number of executors to launch based on the following:
The per-executor resource request sizes can be specified by setting the following properties in the
spark.props section:
NOTE: In , all values in the
|Amount of memory to use per executor process (in a specified unit)|
|Number of cores to use on each executor; limit to 5 cores per executor for best performance|
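Because each executor must fit inside a YARN container, the per-executor sizes determine how many executors a node can host. The following is a rough sketch of that arithmetic, assuming the executor sizes correspond to `spark.executor.memory` and `spark.executor.cores`. The function name, the `overhead_mb` parameter, and the sample figures are hypothetical.

```python
def max_executors_per_node(node_memory_mb, node_vcores,
                           executor_memory_mb, executor_cores,
                           overhead_mb=0):
    """Estimate how many executors fit on one node.

    node_memory_mb / node_vcores: resources YARN may allocate on the node.
    overhead_mb: hypothetical per-container overhead added on top of the
    executor memory; defaults to 0 for a best-case estimate.
    """
    per_executor_mb = executor_memory_mb + overhead_mb
    # The node is limited by whichever resource runs out first.
    by_memory = node_memory_mb // per_executor_mb
    by_cores = node_vcores // executor_cores
    return min(by_memory, by_cores)
```

With a hypothetical node offering 65536 MB and 16 cores to YARN, executors of 6144 MB and 5 cores would be limited by cores to 3 per node, even though memory alone would allow 10.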
A single special process, the application driver, also runs in a container. Its resources are specified in the spark.props section:
|Amount of memory to use for the driver process (in a specified unit)|
|Number of cores to use for the driver process|
Broadcast, or map-side, joins materialize one side of the join and send it to all executors to be stored in memory. This technique can significantly accelerate joins by skipping the sort and shuffle phases during a "reduce" operation. However, there is also a cost in communicating the table to all executors. Therefore, only "small" tables should be considered for broadcast join. The definition of "small" is set by the
spark.sql.autoBroadcastJoinThreshold parameter, which can be added to the
spark.props section of . By default, Spark sets this to
NOTE: We recommend setting this parameter between 20 MB and 100 MB. It should not exceed 200 MB.
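Spark reads `spark.sql.autoBroadcastJoinThreshold` as a size in bytes, so a value chosen in MB needs converting before it is added to the configuration. The sketch below does that conversion and checks the value against the limits in the note above. The helper name and its return shape are hypothetical, and how the resulting property is written into the spark.props section is not shown.

```python
RECOMMENDED_MIN_MB = 20   # lower bound of the recommended range
RECOMMENDED_MAX_MB = 100  # upper bound of the recommended range
HARD_LIMIT_MB = 200       # value that should not be exceeded


def broadcast_threshold_setting(threshold_mb):
    """Return (property dict, in_recommended_range) for a threshold in MB."""
    if threshold_mb > HARD_LIMIT_MB:
        raise ValueError(
            f"{threshold_mb} MB exceeds the {HARD_LIMIT_MB} MB limit")
    in_recommended_range = (
        RECOMMENDED_MIN_MB <= threshold_mb <= RECOMMENDED_MAX_MB)
    # Convert MB to bytes, the unit Spark expects for this property.
    threshold_bytes = threshold_mb * 1024 * 1024
    setting = {"spark.sql.autoBroadcastJoinThreshold": str(threshold_bytes)}
    return setting, in_recommended_range
```

For instance, `broadcast_threshold_setting(50)` yields a byte value of 52428800 and reports that 50 MB falls inside the recommended range.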
In Spark's driver process, the transformation pipeline is compiled down to Spark code and optimized. This process can sometimes fail or take an inordinately long time. Checkpointing the execution forces Spark to materialize the current table (in memory or on disk), which simplifies the segments that are optimized. While checkpointing can incur extra cost due to this materialization, it can also reduce end-to-end execution time by speeding up the compilation and optimization phases and by reusing materialized columns downstream.
NOTE: To increase the checkpointing frequency, set
With Spark's dynamic allocation, each job's resource utilization can be limited by setting the maximum number of executors per job. Set
spark.dynamicAllocation.maxExecutors in the
spark.props section of . When applied, the maximum job memory is given approximately (YARN adds a small overhead) by:
spark.dynamicAllocation.maxExecutors * (spark.driver.memory + spark.executor.memory)
The maximum number of cores used per job is given (exactly) by:
spark.dynamicAllocation.maxExecutors * (spark.driver.cores + spark.executor.cores)
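As a worked example of the two formulas above, the following sketch evaluates them for a set of hypothetical values. The function name is illustrative, the memory figures are assumed to be in GB, and the YARN overhead mentioned above is ignored.

```python
def max_job_resources(max_executors,
                      driver_memory_gb, executor_memory_gb,
                      driver_cores, executor_cores):
    """Apply the per-job limits exactly as the formulas above state them.

    max_executors corresponds to spark.dynamicAllocation.maxExecutors.
    Returns (maximum memory in GB, maximum cores).
    """
    max_memory_gb = max_executors * (driver_memory_gb + executor_memory_gb)
    max_cores = max_executors * (driver_cores + executor_cores)
    return max_memory_gb, max_cores
```

With a hypothetical limit of 10 executors, a 2 GB single-core driver, and 6 GB five-core executors, the formulas give 10 * (2 + 6) = 80 GB and 10 * (1 + 5) = 60 cores.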
To limit the overall cluster utilization of , YARN queues should be configured and used by the application.
The following configuration settings can be applied through configuration based on the number of nodes in the Hadoop cluster.
NOTE: These recommendations should be modified based on the technical capabilities of your network, the nodes in the cluster, and other applications using the cluster.
|Available memory (GB)||16||32||64||160||256|
The specified configuration allows at most the following Spark configuration per node:
|(1 driver + 1 executor) or 1 executor|
|2x1||(1 driver + 2 executors) or 3 executors|
|4x1||(1 driver + 3 executors) or 3 executors|
|10x1||(1 driver + 6 executors) or 6 executors|
|16x1||(1 driver + 10 executors) or 10 executors|