
Commit 6904750

Marcelo Vanzin authored and tgravescs committed
[SPARK-1395] Allow "local:" URIs to work on Yarn.
This only works for the three paths defined in the environment
(SPARK_JAR, SPARK_YARN_APP_JAR and SPARK_LOG4J_CONF).

Tested by running SparkPi with "local:" and "file:" URIs against a Yarn cluster
(no "upload" shows up in the logs in the local case).

Author: Marcelo Vanzin <[email protected]>

Closes #303 from vanzin/yarn-local and squashes the following commits:

82219c1 [Marcelo Vanzin] [SPARK-1395] Allow "local:" URIs to work on Yarn.
1 parent bb76eae commit 6904750

File tree: 6 files changed (+142, -79 lines)

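A note on the mechanism before the per-file diffs: everything in this commit hinges on the URI scheme. A "local:" URI names a file that is already present on every cluster node, so the client must skip the upload to the YARN distributed cache and reference the node-local path directly. A minimal sketch, not part of the commit (the paths are made up), of how java.net.URI, which the patch uses throughout, takes these strings apart:

import java.net.URI

object LocalUriDemo {
  def main(args: Array[String]): Unit = {
    val local = new URI("local:/opt/spark/examples.jar")
    println(local.getScheme)   // "local" -> use the node-local path, skip the upload
    println(local.getPath)     // "/opt/spark/examples.jar"

    val remote = new URI("hdfs:///user/me/examples.jar")
    println(remote.getScheme)  // "hdfs" -> copy into the YARN distributed cache
  }
}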

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 2 additions & 2 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{PrintStream, File}
-import java.net.URL
+import java.net.{URI, URL}
 
 import org.apache.spark.executor.ExecutorURLClassLoader
 
@@ -216,7 +216,7 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(localJar)
+    val localJarFile = new File(new URI(localJar).getPath())
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
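Why the addJarToClasspath change is needed: previously the raw argument was handed to java.io.File, so a scheme-qualified string such as "file:/x.jar" or "local:/x.jar" was treated as a literal file name and always failed the exists() check. Going through new URI(...).getPath() strips the scheme. A hedged before/after sketch with a made-up jar path (bare paths keep working, since a scheme-less URI's getPath() is the input itself):

import java.io.File
import java.net.URI

object JarPathDemo {
  def main(args: Array[String]): Unit = {
    val localJar = "local:/opt/app.jar"
    val before = new File(localJar)                     // a file literally named "local:/opt/app.jar"
    val after  = new File(new URI(localJar).getPath())  // the intended "/opt/app.jar"
    println(s"before=${before.getPath} after=${after.getPath}")

    println(new URI("/plain/path.jar").getPath())       // "/plain/path.jar" -- unchanged behavior
  }
}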

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 1 addition & 1 deletion

@@ -82,7 +82,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 128 additions & 62 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
@@ -209,53 +209,35 @@ trait ClientBase extends Logging {
 
     Map(
       ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
-      ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
+      ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
     ).foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val localURI = new URI(localPath)
-        val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
-        val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          destName, statCache)
+        if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+          val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
+          val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
+          distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+            destName, statCache)
+        }
       }
     }
 
-    // Handle jars local to the ApplicationMaster.
-    if ((args.addJars != null) && (!args.addJars.isEmpty())){
-      args.addJars.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        // Only add the resource to the Spark ApplicationMaster.
-        val appMasterOnly = true
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, appMasterOnly)
-      }
-    }
-
-    // Handle any distributed cache files
-    if ((args.files != null) && (!args.files.isEmpty())){
-      args.files.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache)
-      }
-    }
-
-    // Handle any distributed cache archives
-    if ((args.archives != null) && (!args.archives.isEmpty())) {
-      args.archives.split(',').foreach { case file:String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
-          linkname, statCache)
+    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+      (args.files, LocalResourceType.FILE, false),
+      (args.archives, LocalResourceType.ARCHIVE, false) )
+    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+      if (flist != null && !flist.isEmpty()) {
+        flist.split(',').foreach { case file: String =>
+          val localURI = new URI(file.trim())
+          if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+            val localPath = new Path(localURI)
+            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+            val destPath = copyRemoteFile(dst, localPath, replication)
+            distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
+              linkname, statCache, appMasterOnly)
+          }
+        }
       }
     }
 
@@ -269,12 +251,14 @@ trait ClientBase extends Logging {
     logInfo("Setting up the launch environment")
 
     val env = new HashMap[String, String]()
-
-    ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
-      env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
@@ -345,10 +329,7 @@ trait ClientBase extends Logging {
     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
       JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
     }
-
-    if (!localResources.contains(ClientBase.LOG4J_PROP)) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = List[String](
@@ -377,6 +358,8 @@ object ClientBase {
   val SPARK_JAR: String = "spark.jar"
   val APP_JAR: String = "app.jar"
   val LOG4J_PROP: String = "log4j.properties"
+  val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
+  val LOCAL_SCHEME = "local"
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
@@ -428,30 +411,113 @@ object ClientBase {
     }
   }
 
-  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+  /**
+   * Returns the java command line argument for setting up log4j. If there is a log4j.properties
+   * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
+   * is checked.
+   */
+  def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
+    var log4jConf = LOG4J_PROP
+    if (!localResources.contains(log4jConf)) {
+      log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match {
+        case conf: String =>
+          val confUri = new URI(conf)
+          if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) {
+            "file://" + confUri.getPath()
+          } else {
+            ClientBase.LOG4J_PROP
+          }
+        case null => "log4j-spark-container.properties"
+      }
+    }
+    " -Dlog4j.configuration=" + log4jConf
+  }
+
+  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
+      log4jConf: String, env: HashMap[String, String]) {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
       File.pathSeparator)
-    // If log4j present, ensure ours overrides all others
-    if (addLog4j) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
+    if (log4jConf != null) {
+      // If a custom log4j config file is provided as a local: URI, add its parent directory to the
+      // classpath. Note that this only works if the custom config's file name is
+      // "log4j.properties".
+      val localPath = getLocalPath(log4jConf)
+      if (localPath != null) {
+        val parentPath = new File(localPath).getParent()
+        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
+          File.pathSeparator)
+      }
     }
     // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
+    addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
     ClientBase.populateHadoopClasspath(conf, env)
-
     if (!userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
+    }
+    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
+  }
+
+  /**
+   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+   * to the classpath.
+   */
+  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
+    if (args != null) {
+      addClasspathEntry(args.userJar, APP_JAR, env)
+    }
+
+    if (args != null && args.addJars != null) {
+      args.addJars.split(",").foreach { case file: String =>
+        addClasspathEntry(file, null, env)
+      }
+    }
+  }
+
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If not a "local:" file and no alternate name, the environment is not modified.
+   *
+   * @param path     Path to add to classpath (optional).
+   * @param fileName Alternate name for the file (optional).
+   * @param env      Map holding the environment variables.
+   */
+  private def addClasspathEntry(path: String, fileName: String,
+      env: HashMap[String, String]) : Unit = {
+    if (path != null) {
+      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+        val localPath = getLocalPath(path)
+        if (localPath != null) {
+          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
            File.pathSeparator)
+          return
+        }
+      }
+    }
+    if (fileName != null) {
+      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
+    }
+  }
+
+  /**
+   * Returns the local path if the URI is a "local:" URI, or null otherwise.
+   */
+  private def getLocalPath(resource: String): String = {
+    val uri = new URI(resource)
+    if (LOCAL_SCHEME.equals(uri.getScheme())) {
+      return uri.getPath()
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + "*", File.pathSeparator)
+    null
   }
+
 }
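The addClasspathEntry/getLocalPath pair added above is the core of the classpath side of the change: a "local:" URI contributes its node-local path directly; anything else falls back to the alternate name (e.g. app.jar) under the container's working directory; an unparseable path with no alternate name leaves the environment untouched. A simplified, self-contained restatement of that dispatch, under the assumption that returning the would-be entry instead of mutating the env map preserves the logic (names and paths here are illustrative):

import java.net.{URI, URISyntaxException}

object ClasspathEntryDemo {
  val LOCAL_SCHEME = "local"

  // Returns the classpath entry the real method would append, or None.
  def entryFor(path: String, fileName: Option[String]): Option[String] = {
    try {
      val uri = new URI(path)
      if (LOCAL_SCHEME == uri.getScheme()) {
        return Some(uri.getPath())      // node-local file: use its path as-is
      }
    } catch {
      case _: URISyntaxException =>     // bad URI: fall through to the alternate name
    }
    fileName.map("$PWD/" + _)           // distributed file, linked into the work dir
  }

  def main(args: Array[String]): Unit = {
    println(entryFor("local:/opt/deps/dep.jar", None))          // Some(/opt/deps/dep.jar)
    println(entryFor("hdfs:///jars/app.jar", Some("app.jar")))  // Some($PWD/app.jar)
    println(entryFor("hdfs:///jars/extra.jar", None))           // None
  }
}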

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala

Lines changed: 9 additions & 8 deletions (two hunks only strip trailing whitespace)

@@ -52,7 +52,7 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      userSpecifiedLogFile: Boolean) = {
+      localResources: HashMap[String, LocalResource]) = {
    // Extra options for the JVM
    var JAVA_OPTS = ""
    // Set the JVM memory
@@ -64,10 +64,7 @@ trait ExecutorRunnableUtil extends Logging {
 
    JAVA_OPTS += " -Djava.io.tmpdir=" +
      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    if (!userSpecifiedLogFile) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
    // Commenting it out for now - so that people can refer to the properties if required. Remove
    // it once cpuset version is pushed out.
@@ -120,7 +117,7 @@ trait ExecutorRunnableUtil extends Logging {
       rtype: LocalResourceType,
       localResources: HashMap[String, LocalResource],
       timestamp: String,
-      size: String,
+      size: String,
       vis: String) = {
     val uri = new URI(file)
     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
@@ -153,7 +150,7 @@ trait ExecutorRunnableUtil extends Logging {
       val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
       val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
       for( i <- 0 to distArchives.length - 1) {
-        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
+        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
          timeStamps(i), fileSizes(i), visibilities(i))
       }
     }
@@ -165,7 +162,11 @@ trait ExecutorRunnableUtil extends Logging {
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Allow users to specify some environment variables
     YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
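With this refactoring, the ApplicationMaster and the executors both derive their -Dlog4j.configuration flag from the shared ClientBase.getLog4jConfiguration, whose precedence is: a log4j.properties shipped as a local resource, then SPARK_LOG4J_CONF (rewritten to a file:// URL when it is a "local:" URI), then Spark's bundled container default. A simplified sketch of that precedence with the inputs passed in explicitly, so it runs without a YARN cluster:

import java.net.URI

object Log4jArgDemo {
  val LOG4J_PROP = "log4j.properties"
  val LOCAL_SCHEME = "local"

  def log4jArg(uploadedWithApp: Boolean, envConf: Option[String]): String = {
    val conf =
      if (uploadedWithApp) LOG4J_PROP                    // shipped with the app
      else envConf match {
        case Some(c) =>
          val uri = new URI(c)
          if (LOCAL_SCHEME == uri.getScheme()) "file://" + uri.getPath()  // node-local config
          else LOG4J_PROP
        case None => "log4j-spark-container.properties"  // bundled default
      }
    " -Dlog4j.configuration=" + conf
  }

  def main(args: Array[String]): Unit = {
    println(log4jArg(uploadedWithApp = false, Some("local:/etc/spark/log4j.properties")))
    println(log4jArg(uploadedWithApp = true, None))
  }
}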

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 1 addition & 5 deletions (one hunk only strips trailing whitespace)

@@ -54,7 +54,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
-  override def getCurrentUserCredentials(): Credentials = {
+  override def getCurrentUserCredentials(): Credentials = {
     UserGroupInformation.getCurrentUser().getCredentials()
   }
 
@@ -76,10 +76,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 }
 
 object YarnSparkHadoopUtil {
-  def getLoggingArgsForContainerCommandLine(): String = {
-    "-Dlog4j.configuration=log4j-spark-container.properties"
-  }
-
   def addToEnvironment(
       env: HashMap[String, String],
       variable: String,

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 1 addition & 1 deletion

@@ -79,7 +79,7 @@ class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
 
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
