This section provides an overview of configuration recommendations to be applied to the Hadoop cluster from the D s platform.

Info

NOTE: The recommendations in this section are optimized for use with the D s platform. They may or may not conform to the requirements of other applications using the Hadoop cluster. D s company assumes no responsibility for the configuration of the cluster.

YARN manages cluster resources (CPU and memory) by running all processes within allocated containers. Containers restrict the resources available to the processes they run. Processes are monitored and killed if they exceed their container allocation.

  • Multiple containers can run on a cluster node (if available resources permit).
  • A job can request and use multiple containers across the cluster.
  • Container requests specify virtual CPU (cores) and memory (in MB).

YARN configuration specifies:

  • Per Cluster Node: Available virtual CPUs and memory per cluster node
  • Per Container: virtual CPUs and memory for each container

The following parameters are available in yarn-site.xml:

Parameter | Type | Description
yarn.nodemanager.resource.memory-mb | Per Cluster Node | Amount of physical memory, in MB, that can be allocated for containers
yarn.nodemanager.resource.cpu-vcores | Per Cluster Node | Number of CPU cores that can be allocated for containers
yarn.scheduler.minimum-allocation-mb | Per Container | Minimum container memory, in MB; requests lower than this will be increased to this value
yarn.scheduler.maximum-allocation-mb | Per Container | Maximum container memory, in MB; requests higher than this will be capped to this value
yarn.scheduler.increment-allocation-mb | Per Container | Granularity of container memory requests
yarn.scheduler.minimum-allocation-vcores | Per Container | Minimum virtual CPU cores per container; requests lower than this will be increased to this value
yarn.scheduler.maximum-allocation-vcores | Per Container | Maximum virtual CPU cores per container; requests higher than this will be capped to this value
yarn.scheduler.increment-allocation-vcores | Per Container | Granularity of container virtual CPU requests
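
As an illustration only, these properties might be set in yarn-site.xml as shown below. The values are placeholders drawn from the tuning recommendations later in this section and should be sized for your own cluster:

  <!-- Example yarn-site.xml entries (placeholder values; size for your own cluster) -->
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>24576</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>6</value>
  </property>
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1024</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>24576</value>
  </property>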

Spark Tuning Overview

Spark processes run multiple executors per job. Each executor must run within a YARN container. Therefore, resource requests must fit within YARN’s container limits.

Like YARN containers, multiple executors can run on a single node. More executors provide additional computational power and decrease runtime.

Dynamic allocation

Spark's dynamic allocation adjusts the number of executors to launch based on the following:

  • job size
  • job complexity
  • available resources

D s config

Parameter | Description
spark.dynamicAllocation.enabled | Set to true to enable Spark's dynamic allocation
spark.dynamicAllocation.minExecutors | Minimum number of executors
spark.dynamicAllocation.maxExecutors | Maximum number of executors

For more information, see https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation.
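
A minimal sketch of how these settings might appear in the spark.props section; the executor counts shown are placeholders, not recommendations, and all values must be quoted, as noted below:

  "spark.props": {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "10"
  }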

Per-executor allocations

The per-executor resource request sizes can be specified by setting the following properties in the spark.props section:

 

Info

NOTE: In D s triconf, all values in the spark.props section must be quoted values.


Parameter | Description
spark.executor.memory | Amount of memory to use per executor process (in a specified unit)
spark.executor.cores | Number of cores to use on each executor; limit to 5 cores per executor for best performance

A single special process, the application driver, also runs in a container. Its resources are specified in the spark.props section:

Parameter | Description
spark.driver.memory | Amount of memory to use for the driver process (in a specified unit)
spark.driver.cores | Number of cores to use for the driver process
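
For example, a sketch of these executor and driver settings in the spark.props section, using values consistent with the tuning recommendations later in this section (all values quoted):

  "spark.props": {
    "spark.executor.memory": "6GB",
    "spark.executor.cores": "2",
    "spark.driver.memory": "4GB",
    "spark.driver.cores": "1"
  }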

Spark Performance Considerations

Optimizing "Small" Joins

Broadcast, or map-side, joins materialize one side of the join and send it to all executors to be stored in memory. This technique can significantly accelerate joins by skipping the sort and shuffle phases during a "reduce" operation. However, there is also a cost in communicating the table to all executors. Therefore, only "small" tables should be considered for broadcast join. The definition of "small" is set by the spark.sql.autoBroadcastJoinThreshold parameter, which can be added to the spark.props section of D s triconf. By default, Spark sets this to 10485760 (10 MB).

Info

NOTE: We recommend setting this parameter to a value between 20 MB and 100 MB. It should not exceed 200 MB.
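
For example, to set the threshold to 50 MB (52428800 bytes, within the recommended range; an illustrative value only), you might add the following to the spark.props section:

  "spark.props": {
    "spark.sql.autoBroadcastJoinThreshold": "52428800"
  }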

Checkpointing

In Spark's driver process, the transformation pipeline is compiled down to Spark code and optimized. This process can sometimes fail or take an inordinately long time. By checkpointing the execution, Spark is forced to materialize the current table (in memory or on disk), thereby simplifying the segments that are optimized. While checkpointing can incur extra cost due to this materialization, it can also reduce end-to-end execution time by speeding up the compilation and optimization phases and by reusing materialized columns downstream.

Info

NOTE: To increase the checkpointing frequency, set transformer.dataframe.checkpoint.threshold in the spark.props section of D s triconf.
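
A sketch of where this setting would go in the spark.props section; the threshold value itself is platform-specific and is left as a placeholder here:

  "spark.props": {
    "transformer.dataframe.checkpoint.threshold": "<threshold>"
  }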

Limiting Resource Utilization of Spark Jobs

With Spark's dynamic allocation, each job's resource utilization can be limited by setting the maximum number of executors per job. Set spark.dynamicAllocation.maxExecutors in the spark.props section of D s triconf. When applied, the maximum job memory is then given approximately (YARN adds a small overhead) by:

spark.dynamicAllocation.maxExecutors * (spark.driver.memory + spark.executor.memory)

The maximum number of cores used per job is given (exactly) by:

spark.dynamicAllocation.maxExecutors * (spark.driver.cores + spark.executor.cores)
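
For example (an illustrative calculation only, assuming spark.dynamicAllocation.maxExecutors is set to 10 and using the first column of the tuning recommendations below): the maximum job memory is approximately 10 * (4GB + 6GB) = 100GB, and the maximum number of cores is 10 * (1 + 2) = 30.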

To limit the overall cluster utilization of D s item jobs, YARN queues should be configured and used by the application.

Tuning Recommendations

The following configuration settings can be applied through the D s platform configuration, based on the number of nodes in the Hadoop cluster.

Info

NOTE: These recommendations should be modified based on the technical capabilities of your network, the nodes in the cluster, and other applications using the cluster.

 



Number of nodes | 1 | 2 | 4 | 10 | 16
Available memory (GB) | 16 | 32 | 64 | 160 | 256
Available vCPUs | 4 | 8 | 16 | 40 | 64
yarn.nodemanager.resource.memory-mb | 12288 | 24576 | 57344 | 147456 | 245760
yarn.nodemanager.resource.cpu-vcores | 3 | 6 | 13 | 32 | 52
yarn.scheduler.minimum-allocation-mb | 1024 | 1024 | 1024 | 1024 | 1024
yarn.scheduler.maximum-allocation-mb | 12288 | 24576 | 57344 | 147456 | 245760
yarn.scheduler.increment-allocation-mb | 512 | 512 | 512 | 512 | 512
yarn.scheduler.minimum-allocation-vcores | 1 | 1 | 1 | 1 | 1
yarn.scheduler.maximum-allocation-vcores | 3 | 6 | 13 | 32 | 52
yarn.scheduler.increment-allocation-vcores | 1 | 1 | 1 | 1 | 1
spark.executor.memory | 6GB | 6GB | 16GB | 20GB | 20GB
spark.executor.cores | 2 | 2 | 4 | 5 | 5
spark.driver.memory | 4GB | 4GB | 4GB | 4GB | 4GB
spark.driver.cores | 1 | 1 | 1 | 1 | 1

The specified configuration allows, at most, the following Spark configurations per node:

Cores x Node | Configuration Options
1x1 | (1 driver + 1 executor) or 1 executor
2x1 | (1 driver + 2 executors) or 3 executors
4x1 | (1 driver + 3 executors) or 3 executors
10x1 | (1 driver + 6 executors) or 6 executors
16x1 | (1 driver + 10 executors) or 10 executors

Spark Job Property Overrides

You can enable a set of Spark properties that users are permitted to override on individual jobs. For more information, see Enable Spark Job Overrides.