
Commit 0956af9

Ported SPARK-1870 from 1.0 branch to 0.9 branch
1 parent: 51f677e

File tree

2 files changed: 36 additions, 12 deletions
  • yarn
    • alpha/src/main/scala/org/apache/spark/deploy/yarn
    • stable/src/main/scala/org/apache/spark/deploy/yarn


yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 18 additions & 5 deletions
@@ -21,8 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
@@ -264,16 +263,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
 
     // handle any add jars
+    var cachedSecondaryJarLinks = ListBuffer.empty[String]
     if ((args.addJars != null) && (!args.addJars.isEmpty())){
       args.addJars.split(',').foreach { case file: String =>
         val localURI = new URI(file.trim())
         val localPath = new Path(localURI)
         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
         val destPath = copyRemoteFile(dst, localPath, replication)
         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, true)
+          linkname, statCache)
+        cachedSecondaryJarLinks += linkname
       }
     }
+    sparkConf.set(Client.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
 
     // handle any distributed cache files
     if ((args.files != null) && (!args.files.isEmpty())){
@@ -462,9 +464,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
 }
 
 object Client {
-  val SPARK_JAR: String = "spark.jar"
-  val APP_JAR: String = "app.jar"
+  val SPARK_JAR: String = "__spark__.jar"
+  val APP_JAR: String = "__app__.jar"
   val LOG4J_PROP: String = "log4j.properties"
+  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
 
   def main(argStrings: Array[String]) {
     // Set an env variable indicating we are running in YARN mode.
@@ -491,11 +494,18 @@ object Client {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + LOG4J_PROP)
     }
+
+    val cachedSecondaryJarLinks =
+      sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
+
     // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
+      cachedSecondaryJarLinks.foreach(jarLink =>
+        Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+          Path.SEPARATOR + jarLink))
     }
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
       Path.SEPARATOR + SPARK_JAR)
@@ -504,6 +514,9 @@ object Client {
     if (!userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
+      cachedSecondaryJarLinks.foreach(jarLink =>
+        Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+          Path.SEPARATOR + jarLink))
     }
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
       Path.SEPARATOR + "*")
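
The two halves of this change meet through the SparkConf: prepareLocalResources records the link name of every --addJars entry under spark.yarn.secondary.jars, and populateClasspath reads the key back when building the container classpath. Below is a minimal sketch of that round trip; the object and method names (SecondaryJarsRoundTrip, recordSecondaryJars, secondaryJarLinks) are illustrative, not from the patch, and the link-name extraction approximates Hadoop's Path.getName() with plain URI parsing:

  import java.net.URI
  import scala.collection.mutable.ListBuffer
  import org.apache.spark.SparkConf

  object SecondaryJarsRoundTrip {
    val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

    // Client side: collect one link name per --addJars entry and publish
    // the comma-joined list on the SparkConf.
    def recordSecondaryJars(sparkConf: SparkConf, addJars: String): Unit = {
      val links = ListBuffer.empty[String]
      if (addJars != null && !addJars.isEmpty()) {
        addJars.split(',').foreach { file =>
          val uri = new URI(file.trim())
          // A fragment ("dep.jar#alias") overrides the plain file name.
          val linkname = Option(uri.getFragment()).getOrElse(
            uri.getPath().split('/').last)
          links += linkname
        }
      }
      sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, links.mkString(","))
    }

    // Classpath side: read the list back when assembling the container
    // classpath; an unset or empty key yields Array("").
    def secondaryJarLinks(sparkConf: SparkConf): Array[String] =
      sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
  }

One side effect visible in the sketch: with no jars added, mkString produces "" and split(",") returns Array(""), so the later foreach appends a single empty link name to the classpath.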

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 18 additions & 7 deletions
@@ -21,8 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
+import scala.collection.mutable.{ListBuffer, HashMap, Map}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
@@ -281,18 +280,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
 
     // Handle jars local to the ApplicationMaster.
+    var cachedSecondaryJarLinks = ListBuffer.empty[String]
     if ((args.addJars != null) && (!args.addJars.isEmpty())){
       args.addJars.split(',').foreach { case file: String =>
         val localURI = new URI(file.trim())
         val localPath = new Path(localURI)
         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
         val destPath = copyRemoteFile(dst, localPath, replication)
-        // Only add the resource to the Spark ApplicationMaster.
-        val appMasterOnly = true
         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, appMasterOnly)
+          linkname, statCache)
+        cachedSecondaryJarLinks += linkname
       }
     }
+    sparkConf.set(Client.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
 
     // Handle any distributed cache files
     if ((args.files != null) && (!args.files.isEmpty())){
@@ -478,9 +478,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
 }
 
 object Client {
-  val SPARK_JAR: String = "spark.jar"
-  val APP_JAR: String = "app.jar"
+  val SPARK_JAR: String = "__spark__.jar"
+  val APP_JAR: String = "__app__.jar"
   val LOG4J_PROP: String = "log4j.properties"
+  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
 
   def main(argStrings: Array[String]) {
     // Set an env variable indicating we are running in YARN mode.
@@ -507,12 +508,19 @@ object Client {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + LOG4J_PROP)
     }
+
+    val cachedSecondaryJarLinks =
+      sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
+
     // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
+      cachedSecondaryJarLinks.foreach(jarLink =>
+        Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+          Path.SEPARATOR + jarLink))
     }
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
       Path.SEPARATOR + SPARK_JAR)
@@ -521,6 +529,9 @@ object Client {
     if (!userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
+      cachedSecondaryJarLinks.foreach(jarLink =>
+        Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+          Path.SEPARATOR + jarLink))
     }
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
       Path.SEPARATOR + "*")
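
In both files the userClasspathFirst branch and the default branch gain the same foreach, so the secondary jars always sit next to __app__.jar and follow the same spark.yarn.user.classpath.first ordering as the app jar itself. A sketch of the resulting order, with a hypothetical containerClasspath helper standing in for the Apps.addToEnvironment calls and ignoring the Hadoop entries appended in between:

  // Illustrative only: models the ordering decision, not the real API.
  def containerClasspath(
      userClasspathFirst: Boolean,
      secondaryJars: Seq[String]): Seq[String] = {
    // The app jar and its secondary jars always move together.
    val userEntries = "__app__.jar" +: secondaryJars
    if (userClasspathFirst) (userEntries :+ "__spark__.jar") :+ "*"
    else ("__spark__.jar" +: userEntries) :+ "*"
  }

  // containerClasspath(false, Seq("dep1.jar", "dep2.jar"))
  //   == Seq("__spark__.jar", "__app__.jar", "dep1.jar", "dep2.jar", "*")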
