
Commit e380767

Marcelo Vanzin authored and tgravescs committed
[SPARK-1395] Fix "local:" URI support in Yarn mode (again).
Recent changes ignored the fact that path may be defined with "local:" URIs, which means they need to be explicitly added to the classpath everywhere a remote process is started. This change fixes that by:

- Using the correct methods to add paths to the classpath
- Creating SparkConf settings for the Spark jar itself and for the user's jar
- Propagating those two settings to the remote processes where needed

This ensures that both in client and in cluster mode, the driver has the necessary info to build the executor's classpath and have things still work when they contain "local:" references.

The change also fixes some confusion in ClientBase about whether to use SparkConf or system properties to propagate config options to the driver and executors, by standardizing on using data held by SparkConf.

On the cleanup front, I removed the hacky way that log4j configuration was being propagated to handle the "local:" case. It's much more cleanly (and generically) handled by using spark-submit arguments (--files to upload a config file, or setting spark.executor.extraJavaOptions to pass JVM arguments and use a local file).

Author: Marcelo Vanzin <[email protected]>

Closes #560 from vanzin/yarn-local-2 and squashes the following commits:

4e7f066 [Marcelo Vanzin] Correctly propagate SPARK_JAVA_OPTS to driver/executor.
6a454ea [Marcelo Vanzin] Use constants for PWD in test.
6dd5943 [Marcelo Vanzin] Fix propagation of config options to driver / executor.
b2e377f [Marcelo Vanzin] Review feedback.
93c3f85 [Marcelo Vanzin] Fix ClassCastException in test.
e5c682d [Marcelo Vanzin] Fix cluster mode, restore SPARK_LOG4J_CONF.
1dfbb40 [Marcelo Vanzin] Add documentation for spark.yarn.jar.
bbdce05 [Marcelo Vanzin] [SPARK-1395] Fix "local:" URI support in Yarn mode (again).
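A rough illustration of the idea behind the change (simplified, hypothetical code, not part of the commit): a "local:" URI points at a file that is expected to already exist on every node, so its path can go straight onto the container's classpath, while any other URI has to be uploaded to the distributed cache and is then referenced through its link name relative to the container's working directory.

    import java.net.URI

    object LocalUriSketch {
      // Placeholder for the container's working directory; in YARN this is the
      // value expanded from the PWD environment variable at container launch.
      val WORK_DIR = "<container-work-dir>"

      // Decide how a single path should reach a remote process's classpath.
      def classpathEntryFor(path: String, linkName: String): String = {
        val uri = new URI(path)
        if (uri.getScheme == "local") {
          uri.getPath                 // node-local file: use the path as-is
        } else {
          s"$WORK_DIR/$linkName"      // distributed file: visible via its link name
        }
      }

      def main(args: Array[String]): Unit = {
        // Example: a node-local Spark jar vs. one uploaded from HDFS.
        println(classpathEntryFor("local:/opt/spark/spark-assembly.jar", "__spark__.jar"))
        println(classpathEntryFor("hdfs:///jars/spark-assembly.jar", "__spark__.jar"))
      }
    }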
1 parent 9cb64b2 commit e380767

File tree

5 files changed: +274 additions, -104 deletions

docs/running-on-yarn.md

Lines changed: 25 additions & 3 deletions
@@ -95,10 +95,19 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.jar</code></td>
+  <td>(none)</td>
+  <td>
+    The location of the Spark jar file, in case overriding the default location is desired.
+    By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be
+    in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't
+    need to be distributed each time an application runs. To point to a jar on HDFS, for example,
+    set this configuration to "hdfs:///some/path".
+  </td>
+</tr>
 </table>
 
-By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`.
-
 # Launching Spark on YARN
 
 Ensure that `HADOOP_CONF_DIR` or `YARN_CONF_DIR` points to the directory which contains the (client side) configuration files for the Hadoop cluster.
@@ -156,7 +165,20 @@ all environment variables used for launching each container. This process is use
 classpath problems in particular. (Note that enabling this requires admin privileges on cluster
 settings and a restart of all node managers. Thus, this is not applicable to hosted clusters).
 
-# Important Notes
+To use a custom log4j configuration for the application master or executors, there are two options:
+
+- upload a custom log4j.properties using spark-submit, by adding it to the "--files" list of files
+  to be uploaded with the application.
+- add "-Dlog4j.configuration=<location of configuration file>" to "spark.driver.extraJavaOptions"
+  (for the driver) or "spark.executor.extraJavaOptions" (for executors). Note that if using a file,
+  the "file:" protocol should be explicitly provided, and the file needs to exist locally on all
+  the nodes.
+
+Note that for the first option, both executors and the application master will share the same
+log4j configuration, which may cause issues when they run on the same node (e.g. trying to write
+to the same log file).
+
+# Important notes
 
 - Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the numbers of cores given via command line arguments cannot be passed to YARN. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
 - The local directories used by Spark executors will be the local directories configured for YARN (Hadoop YARN config `yarn.nodemanager.local-dirs`). If the user specifies `spark.local.dir`, it will be ignored.
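As a usage sketch of the two settings documented above (hypothetical values, not taken from the commit), an application could point YARN at a pre-staged Spark jar on HDFS and give executors a node-local log4j file through SparkConf; the same values can equally go into conf/spark-defaults.conf for spark-submit:

    import org.apache.spark.SparkConf

    // Hypothetical values for the options documented above; paths are placeholders.
    val conf = new SparkConf()
      // Spark jar already staged on HDFS, so YARN can cache it across applications.
      .set("spark.yarn.jar", "hdfs:///some/path/spark-assembly.jar")
      // Custom log4j config that exists locally on every node; note the explicit "file:" scheme.
      .set("spark.executor.extraJavaOptions",
        "-Dlog4j.configuration=file:/etc/spark/log4j.properties")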

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 157 additions & 91 deletions
@@ -154,7 +154,7 @@ trait ClientBase extends Logging {
   }
 
   /** Copy the file into HDFS if needed. */
-  private def copyRemoteFile(
+  private[yarn] def copyRemoteFile(
       dstDir: Path,
       originalPath: Path,
       replication: Short,
@@ -213,10 +213,19 @@ trait ClientBase extends Logging {
 
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
-    Map(
-      ClientBase.SPARK_JAR -> ClientBase.getSparkJar, ClientBase.APP_JAR -> args.userJar,
-      ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
-    ).foreach { case(destName, _localPath) =>
+    val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
+    if (oldLog4jConf.isDefined) {
+      logWarning(
+        "SPARK_LOG4J_CONF detected in the system environment. This variable has been " +
+        "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " +
+        "for alternatives.")
+    }
+
+    List(
+      (ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR),
+      (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR),
+      ("log4j.properties", oldLog4jConf.getOrElse(null), null)
+    ).foreach { case(destName, _localPath, confKey) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val localURI = new URI(localPath)
@@ -225,6 +234,8 @@ trait ClientBase extends Logging {
         val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
           destName, statCache)
+      } else if (confKey != null) {
+        sparkConf.set(confKey, localPath)
       }
     }
   }
@@ -246,6 +257,8 @@ trait ClientBase extends Logging {
           if (addToClasspath) {
            cachedSecondaryJarLinks += linkname
          }
+        } else if (addToClasspath) {
+          cachedSecondaryJarLinks += file.trim()
        }
      }
    }
@@ -265,14 +278,10 @@ trait ClientBase extends Logging {
     val env = new HashMap[String, String]()
 
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
-    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
-    ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp)
+    ClientBase.populateClasspath(args, yarnConf, sparkConf, env, extraCp)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
-    if (log4jConf != null) {
-      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
-    }
 
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
@@ -285,7 +294,6 @@ trait ClientBase extends Logging {
       // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
-
     env
   }
 
@@ -310,6 +318,37 @@ trait ClientBase extends Logging {
     logInfo("Setting up container launch context")
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources)
+
+    // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
+    // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
+    // SparkContext will not let that set spark* system properties, which is expected behavior for
+    // Yarn clients. So propagate it through the environment.
+    //
+    // Note that to warn the user about the deprecation in cluster mode, some code from
+    // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
+    // described above).
+    if (args.amClass == classOf[ApplicationMaster].getName) {
+      sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
+        val warning =
+          s"""
+            |SPARK_JAVA_OPTS was detected (set to '$value').
+            |This is deprecated in Spark 1.0+.
+            |
+            |Please instead use:
+            | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
+            | - ./spark-submit with --driver-java-options to set -X options for a driver
+            | - spark.executor.extraJavaOptions to set -X options for executors
+          """.stripMargin
+        logWarning(warning)
+        for (proc <- Seq("driver", "executor")) {
+          val key = s"spark.$proc.extraJavaOptions"
+          if (sparkConf.contains(key)) {
+            throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
+          }
+        }
+        env("SPARK_JAVA_OPTS") = value
+      }
+    }
     amContainer.setEnvironment(env)
 
     val amMemory = calculateAMMemory(newApp)
@@ -341,30 +380,20 @@ trait ClientBase extends Logging {
       javaOpts += "-XX:CMSIncrementalDutyCycle=10"
     }
 
-    // SPARK_JAVA_OPTS is deprecated, but for backwards compatibility:
-    sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
-      sparkConf.set("spark.executor.extraJavaOptions", opts)
-      sparkConf.set("spark.driver.extraJavaOptions", opts)
-    }
-
+    // Forward the Spark configuration to the application master / executors.
     // TODO: it might be nicer to pass these as an internal environment variable rather than
     // as Java options, due to complications with string parsing of nested quotes.
-    if (args.amClass == classOf[ExecutorLauncher].getName) {
-      // If we are being launched in client mode, forward the spark-conf options
-      // onto the executor launcher
-      for ((k, v) <- sparkConf.getAll) {
-        javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
-      }
-    } else {
-      // If we are being launched in standalone mode, capture and forward any spark
-      // system properties (e.g. set by spark-class).
-      for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
-        javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
-      }
-      sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
-      sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p")
+    for ((k, v) <- sparkConf.getAll) {
+      javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
+    }
+
+    if (args.amClass == classOf[ApplicationMaster].getName) {
+      sparkConf.getOption("spark.driver.extraJavaOptions")
+        .orElse(sys.env.get("SPARK_JAVA_OPTS"))
+        .foreach(opts => javaOpts += opts)
+      sparkConf.getOption("spark.driver.libraryPath")
+        .foreach(p => javaOpts += s"-Djava.library.path=$p")
     }
-    javaOpts += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
@@ -377,7 +406,10 @@ trait ClientBase extends Logging {
       "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
       "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
 
-    logInfo("Command for starting the Spark ApplicationMaster: " + commands)
+    logInfo("Yarn AM launch context:")
+    logInfo(s"  class: ${args.amClass}")
+    logInfo(s"  env: $env")
+    logInfo(s"  command: ${commands.mkString(" ")}")
 
     // TODO: it would be nicer to just make sure there are no null commands here
     val printableCommands = commands.map(s => if (s == null) "null" else s).toList
@@ -391,12 +423,39 @@
 object ClientBase extends Logging {
   val SPARK_JAR: String = "__spark__.jar"
   val APP_JAR: String = "__app__.jar"
-  val LOG4J_PROP: String = "log4j.properties"
-  val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
   val LOCAL_SCHEME = "local"
+  val CONF_SPARK_JAR = "spark.yarn.jar"
+  /**
+   * This is an internal config used to propagate the location of the user's jar file to the
+   * driver/executors.
+   */
+  val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
+  /**
+   * This is an internal config used to propagate the list of extra jars to add to the classpath
+   * of executors.
+   */
   val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
+  val ENV_SPARK_JAR = "SPARK_JAR"
 
-  def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
+  /**
+   * Find the user-defined Spark jar if configured, or return the jar containing this
+   * class if not.
+   *
+   * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the
+   * user environment if that is not found (for backwards compatibility).
+   */
+  def sparkJar(conf: SparkConf) = {
+    if (conf.contains(CONF_SPARK_JAR)) {
+      conf.get(CONF_SPARK_JAR)
+    } else if (System.getenv(ENV_SPARK_JAR) != null) {
+      logWarning(
+        s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
+        s"in favor of the $CONF_SPARK_JAR configuration variable.")
+      System.getenv(ENV_SPARK_JAR)
+    } else {
+      SparkContext.jarOfClass(this.getClass).head
+    }
+  }
 
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = {
     val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
@@ -469,71 +528,74 @@ object ClientBase extends Logging {
     triedDefault.toOption
   }
 
+  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
+      env: HashMap[String, String], extraClassPath: Option[String] = None) {
+    extraClassPath.foreach(addClasspathEntry(_, env))
+    addClasspathEntry(Environment.PWD.$(), env)
+
+    // Normally the users app.jar is last in case conflicts with spark jars
+    if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
+      addUserClasspath(args, sparkConf, env)
+      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
+      ClientBase.populateHadoopClasspath(conf, env)
+    } else {
+      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
+      ClientBase.populateHadoopClasspath(conf, env)
+      addUserClasspath(args, sparkConf, env)
+    }
+
+    // Append all jar files under the working directory to the classpath.
+    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env);
+  }
 
   /**
-   * Returns the java command line argument for setting up log4j. If there is a log4j.properties
-   * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
-   * is checked.
+   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+   * to the classpath.
    */
-  def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
-    var log4jConf = LOG4J_PROP
-    if (!localResources.contains(log4jConf)) {
-      log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match {
-        case conf: String =>
-          val confUri = new URI(conf)
-          if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) {
-            "file://" + confUri.getPath()
-          } else {
-            ClientBase.LOG4J_PROP
-          }
-        case null => "log4j-spark-container.properties"
+  private def addUserClasspath(args: ClientArguments, conf: SparkConf,
+      env: HashMap[String, String]) = {
+    if (args != null) {
+      addFileToClasspath(args.userJar, APP_JAR, env)
+      if (args.addJars != null) {
+        args.addJars.split(",").foreach { case file: String =>
+          addFileToClasspath(file, null, env)
+        }
       }
+    } else {
+      val userJar = conf.get(CONF_SPARK_USER_JAR, null)
+      addFileToClasspath(userJar, APP_JAR, env)
+
+      val cachedSecondaryJarLinks = conf.get(CONF_SPARK_YARN_SECONDARY_JARS, "").split(",")
+      cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env))
     }
-    " -Dlog4j.configuration=" + log4jConf
   }
 
-  def populateClasspath(conf: Configuration, sparkConf: SparkConf, log4jConf: String,
-      env: HashMap[String, String], extraClassPath: Option[String] = None) {
-
-    if (log4jConf != null) {
-      // If a custom log4j config file is provided as a local: URI, add its parent directory to the
-      // classpath. Note that this only works if the custom config's file name is
-      // "log4j.properties".
-      val localPath = getLocalPath(log4jConf)
-      if (localPath != null) {
-        val parentPath = new File(localPath).getParent()
-        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
-          File.pathSeparator)
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If not a "local:" file and no alternate name, the environment is not modified.
+   *
+   * @param path Path to add to classpath (optional).
+   * @param fileName Alternate name for the file (optional).
+   * @param env Map holding the environment variables.
+   */
+  private def addFileToClasspath(path: String, fileName: String,
+      env: HashMap[String, String]) : Unit = {
+    if (path != null) {
+      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+        val localPath = getLocalPath(path)
+        if (localPath != null) {
+          addClasspathEntry(localPath, env)
+          return
+        }
       }
     }
-
-    /** Add entry to the classpath. */
-    def addClasspathEntry(path: String) = YarnSparkHadoopUtil.addToEnvironment(env,
-      Environment.CLASSPATH.name, path, File.pathSeparator)
-    /** Add entry to the classpath. Interpreted as a path relative to the working directory. */
-    def addPwdClasspathEntry(entry: String) =
-      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + entry)
-
-    extraClassPath.foreach(addClasspathEntry)
-
-    val cachedSecondaryJarLinks =
-      sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
-      .filter(_.nonEmpty)
-    // Normally the users app.jar is last in case conflicts with spark jars
-    if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
-      addPwdClasspathEntry(APP_JAR)
-      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
-      addPwdClasspathEntry(SPARK_JAR)
-      ClientBase.populateHadoopClasspath(conf, env)
-    } else {
-      addPwdClasspathEntry(SPARK_JAR)
-      ClientBase.populateHadoopClasspath(conf, env)
-      addPwdClasspathEntry(APP_JAR)
-      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
+    if (fileName != null) {
+      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env);
     }
-    // Append all class files and jar files under the working directory to the classpath.
-    addClasspathEntry(Environment.PWD.$())
-    addPwdClasspathEntry("*")
   }
 
   /**
@@ -547,4 +609,8 @@ object ClientBase extends Logging {
     null
   }
 
+  private def addClasspathEntry(path: String, env: HashMap[String, String]) =
+    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
+      File.pathSeparator)
+
 }
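As a closing note, the resolution order implemented by the new ClientBase.sparkJar helper can be summarized with a small standalone sketch (illustrative only, not the patched code; it uses classOf[SparkContext] to locate the Spark jar rather than ClientBase's own class): an explicit spark.yarn.jar setting wins, then the deprecated SPARK_JAR environment variable, and finally the jar that contains the Spark classes themselves.

    import org.apache.spark.{SparkConf, SparkContext}

    object SparkJarResolutionSketch {
      // Illustrative restatement of the precedence used by ClientBase.sparkJar.
      def resolveSparkJar(conf: SparkConf): String = {
        conf.getOption("spark.yarn.jar")                           // 1. explicit configuration
          .orElse(sys.env.get("SPARK_JAR"))                        // 2. deprecated environment variable
          .getOrElse(SparkContext.jarOfClass(classOf[SparkContext]).head)  // 3. jar containing Spark itself
      }
    }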
