This section contains information on how you can tune your Hadoop cluster and Spark specifically for optimal performance in job execution.
YARN Tuning Overview
This section provides an overview of configuration recommendations to be applied to the Hadoop cluster from the Trifacta platform.
NOTE: The recommendations in this section are optimized for use with the Trifacta platform. These may or may not conform to requirements for other applications using the Hadoop cluster. Trifacta assumes no responsibility for the configuration of the cluster.
YARN manages cluster resources (CPU and memory) by running all processes within allocated containers. Containers restrict the resources available to its process(es). Processes are monitored and killed if they overrun the container allocation.
- Multiple containers can run on a cluster node (if available resources permit).
- A job can request and use multiple containers across the cluster.
- Container requests specify virtual CPU (cores) and memory (in MB).
YARN configuration specifies:
- Per Cluster Node: Available virtual CPUs and memory per cluster node
- Per Container: virtual CPUs and memory for each container
The following parameters are available in
|Per Cluster Node||Amount of physical memory, in MB, that can be allocated for containers|
|Per Cluster Node||Number of CPU cores that can be allocated for containers|
|Per Container||Minimum container memory, in MBs; requests lower than this will be increased to this value|
|Per Container||Maximum container memory, in MBs; requests higher than this will be capped to this value|
|Per Container||Granularity of container memory requests|
|Per Container||Minimum allocation virtual CPU cores per container; requests lower than will increased to this value.|
|Per Container||Maximum allocation virtual CPU cores per container; requests higher than this will be capped to this value|
|Per Container||Granularity of container virtual CPU requests|
Spark Tuning Overview
Spark processes run multiple executors per job. Each executor must run within a YARN container. Therefore, resource requests must fit within YARN’s container limits.
Like YARN containers, multiple executors can run on a single node. More executors provide additional computational power and decreased runtime.
Spark’s dynamic allocation adjusts the number of executors to launch based on the following:
The per-executor resource request sizes can be specified by setting the following properties in the
spark.props section :
trifacta-conf.json, all values in the
spark.propssection must be quoted values.
|Amount of memory to use per executor process (in a specified unit)|
|Number of cores to use on each executor - limit to 5 cores per executor for best performance|
A single special process, the application driver, also runs in a container. Its resources are specified in the spark.props section:
|Amount of memory to use for the driver process (in a specified unit)|
|Number of cores to use for the driver process|
Spark Performance Considerations
Optimizing "Small" Joins
Broadcast, or map-side, joins materialize one side of the join and send it to all executors to be stored in memory. This technique can significantly accelerate joins by skipping the sort and shuffle phases during a "reduce" operation. However, there is also a cost in communicating the table to all executors. Therefore, only "small" tables should be considered for broadcast join. The definition of "small" is set by the
spark.sql.autoBroadcastJoinThreshold parameter which can be added to the
spark.props section of
trifacta-conf.json. By default, Spark sets this to
NOTE: We recommend setting this parameter between 20 and 100MB. It should not exceed 200MB.
In Spark's driver process, the transformation pipeline is compiled down to Spark code and optimized. This process can sometimes fail or take and an inordinately long time. By checkpointing the execution, Spark is forced to materialize the current table (in memory or on disk), thereby simplifying the segments that are optimized. While checkpointing can incur extra cost due to this materialization, it can also reduce end-to-end execution time by speeding up the compilation and optimization phases and by reusing materialized columns downstream.
NOTE: To increase the checkpointing frequency, set
transformer.dataframe.checkpoint.threshold in the
spark.props section of
Limiting Resource Utilization of Spark Jobs
With Spark's dynamic allocation, each job's resource utilization can be limited by setting the maximum number of executors per job. Set
spark.dynamicAllocation.maxExecutors in the
spark.props section of
trifacta-conf.json. When applied, the maximum job memory is then given (approximately due to small overhead added by YARN) by:
spark.dynamicAllocation.maxExecutors * (spark.driver.memory + spark.executor.memory)
The maximum number of cores used per job is given (exactly) by:
spark.dynamicAllocation.maxExecutors * (spark.driver.cores + spark.executor.cores)
To limit the overall cluster utilization of Trifacta jobs, YARN queues should be configured and used by the application.
The following configuration settings can be applied through Trifacta platform configuration based on the number of nodes in the Hadoop cluster.
NOTE: These recommendations should be modified based on the technical capabilities of your network, the nodes in the cluster, and other applications using the cluster.
|Available memory (GB)||16||32||64||160||256|
The specified configuration allows, maximally, the following Spark configuration per node:
|(1 driver + 1 executor) or 1 executor|
|2x1||(1 driver + 2 executor) or 3 executors|
|4x1||(1 driver + 3 executors) or 3 executors|
|10x1||(1 driver + 6 executors) or 6 executors|
|16x1||(1 driver + 10 executors) or 10 executors|
This page has no comments.