The platform can be configured to work with Spark to execute transform jobs, to generate a visual profile of job results, or both.
The Spark Job Service is a Scala-based capability for executing jobs and profiling your job results as an extension of job execution. This feature leverages the computing power of your existing Hadoop cluster to improve job execution and profiling performance. Features:
NOTE: Spark History Server is not supported. It should be used only for short-term debugging tasks, as it requires considerable resources. |
Before you begin, please verify the following:
For additional pre-requisites for a kerberized environment, see Configure for Kerberos Integration.
Configure Spark Job Service

The Spark Job Service must be enabled for both execution and profiling jobs to work in Spark. Below is a sample configuration and description of each property.
The following properties can be modified based on your needs:
After making any changes, save the file and restart the platform. See Start and Stop the Platform.

Configure service for Hive

Depending on the environment, please apply the following configuration changes to manage Spark interactions with Hive:
If Hive is present on the cluster and either enabled or disabled: the
At this point, the platform only expects that a

Configure Spark

After the Spark Job Service has been enabled, please complete the following sections to configure it for the

Yarn cluster mode

All jobs submitted to the Spark Job Service are executed in YARN cluster mode. No other cluster mode is supported for the Spark Job Service.

Configure access for secure impersonation

The Spark Job Service can run under secure impersonation. For more information, see Configure for Secure Impersonation. When running under secure impersonation, the Spark Job Service requires access to the following folders. Read, write, and execute access must be provided to the
Identify Hadoop libraries on the cluster

The Spark Job Service does not require additional installation on the

This JAR file does not include the Hadoop client libraries. You must point the

Steps:
Locate Hive dependencies location

If the
Through the Admin Settings page, you can specify the YARN queue to which to submit your Spark jobs. All Spark jobs from the platform are submitted to this queue.
Steps:
In platform configuration, locate the following:
"spark.props.spark.yarn.queue": "default", |
Replace default with the name of the queue (see the example below).

Save your changes.
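For example, to submit Spark jobs to a YARN queue named analytics (a hypothetical queue name), the entry would read:

"spark.props.spark.yarn.queue": "analytics",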
The following properties in platform configuration can be modified for Spark.
"spark": { ... "props": { "spark.driver.maxResultSize": "0", "spark.executor.memory": "6GB", "spark.executor.cores": "2", "spark.driver.memory": "2GB" ... } }, |
Setting Spark properties: You can pass additional properties to Spark such as number of cores, executors, and more.
NOTE: The above values are default values. If you are experiencing performance issues, you can modify the values. If you require further assistance, please contact |
If you have sufficient cluster resources, you should pass the following values:
"spark": { ... "props": { "spark.driver.maxResultSize": "0", "spark.executor.memory": "16GB", "spark.executor.cores": "5", "spark.driver.memory": "16GB" ... } }, |
Notes:
The above values must be below the per-container thresholds set by YARN. Please verify your settings against the following parameters in yarn-site.xml (see the illustration below):

yarn.scheduler.maximum-allocation-mb
yarn.scheduler.maximum-allocation-vcores
yarn.nodemanager.resource.memory-mb
yarn.nodemanager.resource.cpu-vcores
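As an illustration, suppose yarn.scheduler.maximum-allocation-mb is 16384 (16 GB) and yarn.scheduler.maximum-allocation-vcores is 8 on your cluster (hypothetical values). Because YARN also accounts for Spark's executor memory overhead, the requested values must leave headroom below those limits, so a configuration such as the following would fit:

"spark": {
  ...
  "props": {
    "spark.executor.memory": "12GB",
    "spark.executor.cores": "4",
    "spark.driver.memory": "12GB"
    ...
  }
},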
Save your changes.
You can modify the following Batch Job Runner configuration settings for the Spark service.
NOTE: Avoid modifying these settings unless you are experiencing issues with the user interface reporting jobs as having failed while the Spark job continues to execute on YARN. |
Setting | Description | Default |
---|---|---|
batchserver.spark.requestTimeoutMillis | Specifies the number of milliseconds that the Batch Job Runner service should wait for a response from Spark. If this timeout is exceeded, the UI changes the job status to failed. The YARN job may continue. | 20000 (20 seconds) |
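For example, to give Spark up to 60 seconds to respond before the UI marks the job as failed, you could raise the timeout as follows (an illustrative value; the appropriate timeout depends on your cluster):

"batchserver.spark.requestTimeoutMillis": 60000,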
By default, the application generates random samples from the first set of rows in the dataset, up to a limit. The volume of this sample set is determined by a configuration parameter. See Configure Application Limits.
For the Spark running environment, you can enable the generation of random samples across the entire dataset, which may increase the quality of your samples.
NOTE: This feature cannot be enabled if relational or JDBC sources are used in your deployment. |
Steps:
In platform configuration, locate the following property, and set its value to true:
"webapp.enableSparkSampling": false, |
If you are using Hortonworks, additional configuration is required to enable integration with Spark.
NOTE: Before you begin, you must acquire the specific version number of your Hortonworks version. This version number should look similar to the following: |
Steps:
Locate the spark-job-service configuration area.

For the jvmOptions property, specify the following values. Do not remove any other values that may be specified:
"spark-job-service" : { ... "jvmOptions" : [ "-Xmx128m", "-Dhdp.version=<HDP_version_number>" ] ... } |
In the spark-job-service area, locate the SPARK_DIST_CLASSPATH configuration. Add a reference to your specific version of the local Hadoop client. The following applies to Hortonworks:
"spark-job-service.env.SPARK_DIST_CLASSPATH" : "/usr/hdp/<HDP_version_number>/hadoop/client/*", |
In the spark.props configuration area, add the following values. Do not remove any other values that may be set for these options:
"spark": { ... "props": { ... "spark.yarn.am.extraJavaOptions": "-Dhdp.version=<HDP_version_number>", "spark.driver.extraJavaOptions": "-Dhdp.version=<HDP_version_number>" }, |
Steps:

Review and set the following parameter based on your Hadoop distribution:
Hadoop Distribution | Parameter Value | Value is required? |
---|---|---|
CDH 6.x | "spark.useVendorSparkLibraries": true, | Yes. Additional configuration is in the next section. |
HDP 3.x | "spark.useVendorSparkLibraries": true, | Yes. Additional configuration is in the next section. |
CDH 5.x and earlier | "spark.useVendorSparkLibraries": false, | No |
HDP 2.x and earlier | "spark.useVendorSparkLibraries": false, | No |
Locate the following setting:
"spark.version" |
Set the above value based on your Hadoop distribution in use:
Hadoop Distribution | spark.version | Notes |
---|---|---|
CDH 6.1, CDH 6.2 | 2.4.0 | Platform must use native Hadoop libraries from the cluster. Additional configuration is required. See below. |
CDH 6.0 | 2.2.0 | Platform must use native Hadoop libraries from the cluster. Additional configuration is required. See below. |
CDH 5.15.x | See below. | |
CDH 5.14.x | See below. | |
HDP 3.1 | 2.3.0 | Platform must use native Hadoop libraries from the cluster. Additional configuration is required. See below. |
HDP 3.0 | 2.3.0 | Platform must use native Hadoop libraries from the cluster. Additional configuration is required. See below. |
HDP 2.6.x | See below. | |
HDP 2.5.x | See below. |
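For example, for a CDH 6.1 cluster, the two settings described in this section would be:

"spark.useVendorSparkLibraries": true,
"spark.version": "2.4.0",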
NOTE: If the |
You must acquire native Hadoop libraries from the cluster if you are using any of the following versions:
Hadoop version | Library location on cluster | |
---|---|---|
Cloudera 6.0 or later | /opt/cloudera/parcels/CDH | See section below. |
Hortonworks 3.0 or later | /usr/hdp/current | See section below. |
NOTE: Whenever the Hadoop distribution is upgraded on the cluster, the new versions of these libraries must be recopied to the following locations on the |
For more information on acquiring these libraries, please see the documentation provided with your Hadoop distribution.
To integrate with CDH 6.x, the platform must use the native Spark libraries. Please add the following properties to your configuration.
Steps:
Set sparkBundleJar to the following:
"sparkBundleJar":"/opt/cloudera/parcels/CDH/lib/spark/jars/*:/opt/cloudera/parcels/CDH/lib/spark/hive/*" |
For the Spark Job Service, the Spark bundle JAR must be added to the classpath:
NOTE: The key modification is to remove the |
"spark-job-service": { "classpath": "%(topOfTree)s/services/spark-job-server/server/build/libs/spark-job-server-bundle.jar:%(topOfTree)s/conf/hadoop-site/:/usr/lib/hdinsight-datalake/*:%(sparkBundleJar)s:%(topOfTree)s/%(hadoopBundleJar)s" }, |
In the spark.props section, add the following property:
"spark.yarn.jars":"local:/opt/cloudera/parcels/CDH/lib/spark/jars/*,local:/opt/cloudera/parcels/CDH/lib/spark/hive/*", |
Save your changes.
To integrate with HDP 3.x, the platform must use the native Spark libraries. Please add the following properties to your configuration.
Steps:
Set the Hadoop bundle JAR to point to the one provided with your distribution. The example below points to HDP 3.0:
"hadoopBundleJar": "hadoop-deps/hdp-3.0/build/libs/hdp-3.0-bundle.jar" |
Enable use of native libraries:
"spark.useVendorSparkLibraries": true, |
Set the path to the Spark bundle JAR:
"sparkBundleJar": "/usr/hdp/current/spark2-client/jars/*" |
Add the Spark bundle JAR to the Spark Job Service classpath (spark-job-service.classpath). Example:
classpath: "%(topOfTree)s/services/spark-job-server/server/build/libs/spark-job-server-bundle.jar:%(sparkBundleJar)s:/etc/hadoop/conf/"
The following property needs to be added or updated in spark.props. If there are other values in this property, the following value must appear first in the list:
"spark.executor.extraClassPath": "/usr/hdp/current/spark2-client/jars/guava-14.0.1.jar" |
The following property needs to be added or updated in spark.props. It does not need to appear in a specific position in the list:
"spark.yarn.jars": "local:/usr/hdp/current/spark2-client/jars/*" |
The platform defaults to using Spark 2.3.0. Depending on the version of your Hadoop distribution, you may need to modify the version of Spark that is used by the platform.
In the following table, you can review the Spark and Java version requirements for the hosting environment.
To change the version of Spark in use by the platform, you change the value of the spark.version property, as listed below. No additional installation is required.
Additional requirements:

When integrating with EMR, you must set the version of Spark (the spark.version property) to match the version of Spark that is used on the EMR cluster.

Required Java JDK Version | Java JDK 1.8 | |
---|---|---|
Spark for |
|
With EMR:
Supported version(s) of EMR | EMR 5.6 and EMR 5.7 | |
---|---|---|
Spark for |
|
Tip: This version of Spark is required for CDH 6.0. CDH 6.1 and later can use other versions. |
Required Java JDK Version | Java JDK 1.8 | |
---|---|---|
Spark for |
|
With EMR:
Supported version(s) of EMR | EMR 5.8, EMR 5.9, and EMR 5.10 | |
---|---|---|
Spark for | Spark version must match the Spark version on the EMR cluster:
|
Required Java JDK Version | Java JDK 1.8 |
---|---|
Spark for | Not supported. |
With EMR:
Supported version(s) of EMR | EMR 5.11 and EMR 5.12 | |
---|---|---|
Spark for | Spark version must match the Spark version on the EMR cluster:
|
NOTE: Spark-job-service fails to start when |
Tip: This version of Spark is required for HDP 3.x. |
Required Java JDK Version | Java JDK 1.8 | |
---|---|---|
Spark for |
|
With EMR:
Supported version(s) of EMR | EMR 5.13 - EMR 5.19 | |
---|---|---|
Spark for | Spark version must match the Spark version on the EMR cluster:
|
NOTE: Spark 2.4.0 does not work with HDP 3.0, HDP 3.1, and CDH 6.0. |
NOTE: For Azure Databricks, you must provide the specific version of Spark through a different configuration parameter. See Configure for Azure Databricks. |
Required Java JDK Version | Java JDK 1.8 | |||
---|---|---|---|---|
Spark for |
|
With EMR:
Supported version(s) of EMR | EMR 5.20 - EMR 5.21 | |
---|---|---|
Spark for | Spark version must match the Spark version on the EMR cluster:
|
You can restart the platform now. See Start and Stop the Platform.
At this point, you should be able to run a job in the platform, which launches a Spark execution job and a profiling job. Results appear normally in the application.
Steps:
To verify that the Spark running environment is working:
Open /opt/trifacta/logs/batch-job-runner.log.

Search for a SPARK JOB INFO block with a timestamp corresponding to your job execution.

For more information, see Verify Operations.
Service logs:
Logs for the Spark Job Service are located in the following location:
/opt/trifacta/logs/spark-job-service.log
Additional log information on the launching of profile jobs is located here:
/opt/trifacta/logs/batch-job-runner.log
Job logs:
When profiling jobs fail, additional log information is written to the following:
/opt/trifacta/logs/jobs/<job-id>/spark-job.log
Below is a list of common errors in the log files and their likely causes.
Whenever a Spark job is executed, it is reported back as having failed. On the cluster, the job appears to have succeeded. However, in the Spark Job Service logs, the Spark Job Service cannot find any of the applications that it has submitted to the resource manager.
In this case, the root problem is that Spark is unable to delete temporary files after the job has completed execution. During job execution, a set of ephemeral files may be written to the designated temporary directory on the cluster, which is typically /trifacta/tempfiles. In most cases, these files are removed transparently to the user.
In some cases, those files may be left behind. To account for this accumulation in the directory, the platform performs a periodic cleanup operation to remove temp files that are over a specified age.
This cleanup operation can fail if HDFS is configured to send Trash to an encrypted zone. The HDFS API does not support the skipTrash option, which is available through the HDFS CLI. In this scenario, the temp files are not successfully removed, and the files continue to accumulate without limit in the temporary directory. Eventually, this accumulation of files can cause the Spark Job Service to crash with Out of Memory errors.
The following are possible solutions:
Disable temp file cleanup in platform configuration:
"job.tempfiles.cleanup.age": 0, |
Clean up the tempfiles directory using an external process.
The Spark Job Service fails to start with an error similar to the following in the spark-job-service.log file:
Exception in thread "main" com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.5.3 |
Some versions of the hadoopBundleJar contain older versions of the Jackson dependencies, which break the spark-job-service.
To ensure that the spark-job-service is provided the correct Jackson dependency versions, the sparkBundleJar must be listed before the hadoopBundleJar in the spark-job-service.classpath, which is inserted as a parameter in platform configuration. Example:
"spark-job-service.classpath": "%(topOfTree)s/services/spark-job-server/server/build/libs/spark-job-server-bundle.jar:%(topOfTree)s/conf/hadoop-site/:%(topOfTree)s/%(sparkBundleJar)s:%(topOfTree)s/%(hadoopBundleJar)s" |
Spark jobs may fail with the following error in the YARN application logs:
ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.IllegalArgumentException: Unknown message type: -22 at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:67) |
This problem may occur if Spark authentication is disabled on the Hadoop cluster but enabled in the platform. Spark authentication must match on the cluster and in the platform.
Steps:
Locate the spark.props entry. Insert the following property and value:
"spark.authenticate": "false" |
When Spark authentication is enabled on the Hadoop cluster, Spark jobs can fail. The YARN log file message looks something like the following:
17/09/22 16:55:42 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, example.com, executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Unable to create executor due to Unable to register with external shuffle server due to : java.lang.IllegalStateException: Expected SaslMessage, received something else (maybe your client does not have SASL enabled?) at org.apache.spark.network.sasl.SaslMessage.decode(SaslMessage.java:69) |
When Spark authentication is enabled on the Hadoop cluster, the platform must also be configured with Spark authentication enabled.
Inside the spark.props entry, insert the following property value:
"spark.authenticate": "true" |
When executing a job through Spark, the job may fail with the following error in the spark-job-service.log:
Required executor memory (6144+614 MB) is above the max threshold (1615 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'. |
The per-container memory allocation in Spark (spark.executor.memory and spark.driver.memory) must not exceed the YARN thresholds. See the Spark tuning properties above.
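For example, given the 1615 MB threshold reported in the error above, values such as the following would fit within the container limit (illustrative only; raising the YARN limits may be the better long-term fix):

"spark": {
  ...
  "props": {
    "spark.executor.memory": "1GB",
    "spark.driver.memory": "1GB"
    ...
  }
},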
When executing a job through Spark, the job may fail with the following error in the spark-job-service.log file:
Job submission failed akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://SparkJobServer/user/ProfileLauncher#1213485950]] after [20000 ms] |
There is a 20-second timeout on the attempt to submit a profiling job to YARN. If the initial upload of the Spark libraries to the cluster takes longer than 20 seconds, the Spark Job Service times out and returns an error to the UI. However, the libraries do finish uploading successfully to the cluster.
The library upload is a one-time operation for each install/upgrade. Despite the error, the libraries are uploaded successfully the first time. This error does not affect subsequent profiler job runs.
Solution:
Try running the job again.
When executing a job through Spark, the job may fail with the following error in the spark-job-service.log file:
java.lang.ClassNotFoundException: com.trifacta.jobserver.profiler.Profiler |
By default, the Spark job service attempts to optimize the distribution of the Spark JAR files across the cluster. This optimization involves a one-time upload of the spark-assembly and profiler-bundle JAR files to HDFS. Then, YARN distributes these JARs to the worker nodes of the cluster, where they are cached for future use.
In some cases, the localized JAR files can get corrupted on the worker nodes, causing this ClassNotFound error to occur.
Solution:
The solution is to disable this optimization through platform configuration.
Steps:
Locate the spark-job-service configuration node. Set the following property to false:
"spark-job-service.optimizeLocalization" : true |
An error similar to the following appears in the spark-job-service.log file:
Exception in thread "LeaseRenewer:trifacta@nameservice1" java.lang.OutOfMemoryError: PermGen space |
Solution:
The solution is to configure the PermGen space for the Spark driver:
Locate the spark configuration node. Set the following property to the given value:
"spark.props.spark.driver.extraJavaOptions" : "-XX:MaxPermSize=1024m -XX:PermSize=256m", |
An error similar to the following appears in the spark-job-service.log file:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token x for trifacta) can't be found in cache |
Solution:
The solution is to set Spark impersonation to true:
Locate the spark-job-service configuration node. Set the following property to the given value:
"spark-job-service.sparkImpersonationOn" : true, |
Issue:
Spark fails with an error similar to the following in the spark-job-service.log:
"Job aborted due to stage failure: Total size of serialized results of 208 tasks (1025.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)" |
Explanation:
The spark.driver.maxResultSize parameter determines the limit of the total size of serialized results of all partitions for each Spark action (e.g. collect). If the total size of the serialized results exceeds this limit, the job is aborted.

To enable serialized results of unlimited size, set this parameter to zero (0).
Solution:
In the spark.props section of the file, remove the size limit by setting this value to zero:
"spark.driver.maxResultSize": "0" |
Issue:
Spark job fails with an error similar to the following in either the spark-job.log or the yarn-app.log file:
"java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState'" |
Explanation:
By default, the Spark running environment attempts to connect to Hive when it creates the Spark Context. This connection attempt may fail if Hive connectivity (in conf/hadoop-site/hive-site.xml) is not configured correctly on the edge node.
Solution:
This issue can be fixed by configuring Hive connectivity on the edge node.
If Hive connectivity is not required, the Spark running environment's default behavior can be changed as follows:
In the spark-job-service section of the file, disable Hive connectivity by setting this value to false:
"spark-job-service.enableHiveSupport": false |
Issue:
Spark job fails with an error similar to the following in either the spark-job.log or the yarn-app.log file:
java.io.FileNotFoundException: No Avro files found. Hadoop option "avro.mapred.ignore.inputs.without.extension" is set to true. Do all input files have ".avro" extension? |
Explanation:
By default, Spark-Avro requires all Avro files to have the .avro extension, which includes all part files in a source directory. Spark-Avro ignores any files that do not have the .avro extension.
If a directory contains part files without an extension (e.g. part-00001, part-00002), Spark-Avro ignores these files and throws the "No Avro files found" error.
Solution:
This issue can be fixed by setting the spark.hadoop.avro.mapred.ignore.inputs.without.extension property to false:
In the spark.props section of the file, add the following setting if it does not already exist, and set its value to false:
"spark.hadoop.avro.mapred.ignore.inputs.without.extension": "false" |
Issue:
After you have submitted a job to be executed on the Spark cluster, the job may fail in the platform after 30 minutes. However, on a busy cluster, the job remains enqueued and is eventually collected and executed. Since the job was canceled in the platform, results are not returned.
Explanation:
This issue is caused by a timeout setting for Batch Job Runner, which cancels management of jobs after a predefined number of seconds. Since these jobs are already queued on the cluster, they may be executed independently of the platform.
Solution:
This issue can be fixed by increasing the Batch Job Runner Spark timeout setting:
Locate the following property. By default, it is set to 172800, which is 48 hours:
"batchserver.spark.progressTimeoutSeconds": 172800, |
If needed, increase this value until it is high enough for your jobs to succeed.
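For example, to allow Spark jobs up to 72 hours before Batch Job Runner stops tracking them:

"batchserver.spark.progressTimeoutSeconds": 259200,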