Automating Spark Jobs with Oozie Spark Action
You can use Apache Spark as part of a complex workflow with multiple processing steps, triggers, and interdependencies. You can automate Apache Spark jobs using Oozie Spark action.
About Oozie Spark Action
If you use Apache Spark as part of a complex workflow with multiple processing steps, triggers, and interdependencies, consider using Apache Oozie to automate jobs. Oozie is a workflow engine that executes sequences of actions structured as directed acyclic graphs (DAGs). Each action is an individual unit of work, such as a Spark job or Hive query.
The Oozie "Spark action" runs a Spark job as part of an Oozie workflow. The workflow waits until the Spark job completes before continuing to the next action.
For additional information about Spark action, see the Apache "Oozie Spark Action Extension" documentation. For general information about Oozie and Workflow Manager, see Workflow Management under Ambari documentation.
![]() | Note |
|---|---|
Support for yarn-client execution mode for Oozie Spark action will be removed in a future release. Oozie will continue to support yarn-cluster execution mode for Oozie Spark action. |
Configure Oozie Spark Action for Spark
- Set up .jar file exclusions.
Oozie distributes its own libraries on the ShareLib, which are included on the classpath. These .jar files may conflict with each other if some components require different versions of a library. You can use the
oozie.action.sharelib.for.<action_type>.exclude=<value>property to address these scenarios.In HDP-3.x, Spark2 uses older
jackson-*.jar versions than Oozie, which creates a runtime conflict in Oozie for Spark and generates aNoClassDefFoundErrorerror. This can be resolved by using theoozie.action.sharelib.for.<action_type>.exclude=<value>property to exclude theoozie/jackson.*.jar files from the classpath. Libraries matching the regex pattern provided as the property value will not be added to the distributed cache.
Notespark2 ShareLib directory will not be created. The named spark directory is used for spark2 libs.Examples
The following examples show how to use a ShareLib exclude on a Java action.
Actual ShareLib content:
* /user/oozie/share/lib/lib_20180701/oozie/lib-one-1.5.jar * /user/oozie/share/lib/lib_20180701/oozie/lib-two-1.5.jar * /user/oozie/share/lib/lib_20180701/java/lib-one-2.6.jar * /user/oozie/share/lib/lib_20180701/java/lib-two-2.6.jar * /user/oozie/share/lib/lib_20180701/java/component-connector.jarSetting the
oozie.action.sharelib.for.java.excludeproperty tooozie/lib-one.*=results in the following distributed cache content:* /user/oozie/share/lib/lib_20180701/oozie/lib-two-1.5.jar * /user/oozie/share/lib/lib_20180701/java/lib-one-2.6.jar * /user/oozie/share/lib/lib_20180701/java/lib-two-2.6.jar * /user/oozie/share/lib/lib_20180701/java/component-connector.jarSetting the
oozie.action.sharelib.for.java.excludeproperty tooozie/lib-one.*|component-connector.jar=results in the following distributed cache content:* /user/oozie/share/lib/lib_20180701/oozie/lib-two-1.5.jar * /user/oozie/share/lib/lib_20180701/java/lib-one-2.6.jar * /user/oozie/share/lib/lib_20180701/java/lib-two-2.6.jar
- Run the Oozie
shareliblistcommand to verify the configuration. You should seesparkin the results.oozie admin –shareliblist spark
The following examples show a workflow definition XML file, an Oozie job configuration file, and a Python script for running a Spark2-Pi job.
Sample Workflow.xml file for spark2-Pi:
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkPythonPi'>
<start to='spark-node' />
<action name='spark-node'>
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>${master}</master>
<name>Python-Spark-Pi</name>
<jar>pi.py</jar>
</spark>
<ok to="end" />
<error to="fail" />
</action>
<kill name="fail">
<message>Workflow failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>
Sample Job.properties file for spark2-Pi:
nameNode=hdfs://host:8020
jobTracker=host:8050
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/pyspark
master=yarn-cluster
oozie.action.sharelib.for.spark=spark2
Sample Python script, lib/pi.py:
import sys
from random import random
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
sc = SparkContext(appName="Python-Spark-Pi")
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions
def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0
count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
sc.stop()
Troubleshooting .jar file conflicts with Oozie Spark action
When using Oozie Spark action, Oozie jobs may fail with the following error if there are .jar file conflicts between the "oozie" ShareLib and the "spark" ShareLib.
2018-06-04 13:27:32,652 WARN SparkActionExecutor:523 - SERVER[XXXX] USER[XXXX] GROUP[-] TOKEN[] APP[XXXX] JOB[0000000-<XXXXX>-oozie-oozi-W] ACTION[0000000-<XXXXXX>-oozie-oozi-W@spark2] Launcher exception: Attempt to add (hdfs://XXXX/user/oozie/share/lib/lib_XXXXX/oozie/aws-java-sdk-kms-1.10.6.jar) multiple times to the distributed cache.
java.lang.IllegalArgumentException: Attempt to add (hdfs://XXXXX/user/oozie/share/lib/lib_20170727191559/oozie/aws-java-sdk-kms-1.10.6.jar) multiple times to the distributed cache.
at org.apache.spark.deploy.yarn.Client$anonfun$prepareLocalResources$13$anonfun$apply$8.apply(Client.scala:632)
at org.apache.spark.deploy.yarn.Client$anonfun$prepareLocalResources$13$anonfun$apply$8.apply(Client.scala:623)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
at org.apache.spark.deploy.yarn.Client$anonfun$prepareLocalResources$13.apply(Client.scala:623)
at org.apache.spark.deploy.yarn.Client$anonfun$prepareLocalResources$13.apply(Client.scala:622)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:622)
at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:895)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:171)
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1231)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1290)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:750)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:311)
at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:232)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:58)
at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:237)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
Run the following commands to resolve this issue.
![]() | Note |
|---|---|
You may need to perform a backup before running the |
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark/aws*
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark/azure*
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark/hadoop-aws*
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark/hadoop-azure*
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark/ok*
hadoop fs -mv /user/oozie/share/lib/lib_<ts>/oozie/jackson* /user/oozie/share/lib/lib_<ts>/oozie.old
Next, run the following command to update the Oozie ShareLib:
oozie admin -oozie http://<oozie-server-hostname>:11000/oozie -sharelibupdate

