From 0956af95e24bc37303525fde6f85e0b3aeebd946 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Sun, 8 Jun 2014 16:16:53 -0700 Subject: [PATCH 1/5] Ported SPARK-1870 from 1.0 branch to 0.9 branch --- .../org/apache/spark/deploy/yarn/Client.scala | 23 +++++++++++++---- .../org/apache/spark/deploy/yarn/Client.scala | 25 +++++++++++++------ 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6956cd65c720..dcacf8779dba 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,8 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer import scala.collection.JavaConversions._ -import scala.collection.mutable.HashMap -import scala.collection.mutable.Map +import scala.collection.mutable.{HashMap, ListBuffer, Map} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil} @@ -264,6 +263,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } // handle any add jars + var cachedSecondaryJarLinks = ListBuffer.empty[String] if ((args.addJars != null) && (!args.addJars.isEmpty())){ args.addJars.split(',').foreach { case file: String => val localURI = new URI(file.trim()) @@ -271,9 +271,11 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) val destPath = copyRemoteFile(dst, localPath, replication) distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, - linkname, statCache, true) + linkname, statCache) + cachedSecondaryJarLinks += linkname } } + sparkConf.set(Client.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) // handle any distributed cache files if ((args.files != null) && (!args.files.isEmpty())){ @@ -462,9 +464,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } object Client { - val SPARK_JAR: String = "spark.jar" - val APP_JAR: String = "app.jar" + val SPARK_JAR: String = "__spark__.jar" + val APP_JAR: String = "__app__.jar" val LOG4J_PROP: String = "log4j.properties" + val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" def main(argStrings: Array[String]) { // Set an env variable indicating we are running in YARN mode. @@ -491,11 +494,18 @@ object Client { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + LOG4J_PROP) } + + val cachedSecondaryJarLinks = + sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") + // Normally the users app.jar is last in case conflicts with spark jars val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean if (userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) + cachedSecondaryJarLinks.foreach(jarLink => + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + jarLink)) } Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + SPARK_JAR) @@ -504,6 +514,9 @@ object Client { if (!userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) + cachedSecondaryJarLinks.foreach(jarLink => + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + jarLink)) } Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + "*") diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8ccdea663a11..c460ae78628b 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,8 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer import scala.collection.JavaConversions._ -import scala.collection.mutable.HashMap -import scala.collection.mutable.Map +import scala.collection.mutable.{ListBuffer, HashMap, Map} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil} @@ -281,18 +280,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } // Handle jars local to the ApplicationMaster. + var cachedSecondaryJarLinks = ListBuffer.empty[String] if ((args.addJars != null) && (!args.addJars.isEmpty())){ args.addJars.split(',').foreach { case file: String => val localURI = new URI(file.trim()) val localPath = new Path(localURI) val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) val destPath = copyRemoteFile(dst, localPath, replication) - // Only add the resource to the Spark ApplicationMaster. - val appMasterOnly = true distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, - linkname, statCache, appMasterOnly) + linkname, statCache) + cachedSecondaryJarLinks += linkname } } + sparkConf.set(Client.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) // Handle any distributed cache files if ((args.files != null) && (!args.files.isEmpty())){ @@ -478,9 +478,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } object Client { - val SPARK_JAR: String = "spark.jar" - val APP_JAR: String = "app.jar" + val SPARK_JAR: String = "__spark__.jar" + val APP_JAR: String = "__app__.jar" val LOG4J_PROP: String = "log4j.properties" + val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" def main(argStrings: Array[String]) { // Set an env variable indicating we are running in YARN mode. @@ -507,12 +508,19 @@ object Client { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + LOG4J_PROP) } + + val cachedSecondaryJarLinks = + sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") + // Normally the users app.jar is last in case conflicts with spark jars val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false") .toBoolean if (userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) + cachedSecondaryJarLinks.foreach(jarLink => + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + jarLink)) } Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + SPARK_JAR) @@ -521,6 +529,9 @@ object Client { if (!userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) + cachedSecondaryJarLinks.foreach(jarLink => + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + jarLink)) } Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + "*") From ab94aa18173115b18e2458bbf9782daa618c9a74 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 9 Jun 2014 14:00:20 -0700 Subject: [PATCH 2/5] Code formatting. --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index dcacf8779dba..783cfedeaa60 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -505,7 +505,7 @@ object Client { Path.SEPARATOR + APP_JAR) cachedSecondaryJarLinks.foreach(jarLink => Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + - Path.SEPARATOR + jarLink)) + Path.SEPARATOR + jarLink)) } Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + SPARK_JAR) From 3cc1085fe4b1fb606a70a4c757cec6ce3685a36a Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 9 Jun 2014 18:02:11 -0700 Subject: [PATCH 3/5] changed from var to val --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c460ae78628b..a8ebe37cd477 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -280,7 +280,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } // Handle jars local to the ApplicationMaster. - var cachedSecondaryJarLinks = ListBuffer.empty[String] + val cachedSecondaryJarLinks = ListBuffer.empty[String] if ((args.addJars != null) && (!args.addJars.isEmpty())){ args.addJars.split(',').foreach { case file: String => val localURI = new URI(file.trim()) From b085f10d0c8903e62705fdd95ca2b4f9d20f068b Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 9 Jun 2014 18:21:02 -0700 Subject: [PATCH 4/5] Make sure that empty string is filtered out when we get secondary jars --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++-- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 783cfedeaa60..090b313e8cab 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -263,7 +263,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } // handle any add jars - var cachedSecondaryJarLinks = ListBuffer.empty[String] + val cachedSecondaryJarLinks = ListBuffer.empty[String] if ((args.addJars != null) && (!args.addJars.isEmpty())){ args.addJars.split(',').foreach { case file: String => val localURI = new URI(file.trim()) @@ -496,7 +496,7 @@ object Client { } val cachedSecondaryJarLinks = - sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") + sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",").filter(_.nonEmpty) // Normally the users app.jar is last in case conflicts with spark jars val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a8ebe37cd477..9013a1b304c3 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -510,11 +510,10 @@ object Client { } val cachedSecondaryJarLinks = - sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") + sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",").filter(_.nonEmpty) // Normally the users app.jar is last in case conflicts with spark jars - val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false") - .toBoolean + val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean if (userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) From c5696f426e9bfc56f8cbaeef5ed115882426bb08 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 9 Jun 2014 21:45:24 -0700 Subject: [PATCH 5/5] fix line too long --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 ++- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 090b313e8cab..9e5e2d5ceaca 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -496,7 +496,8 @@ object Client { } val cachedSecondaryJarLinks = - sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",").filter(_.nonEmpty) + sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") + .filter(_.nonEmpty) // Normally the users app.jar is last in case conflicts with spark jars val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9013a1b304c3..6ff8c6c3b249 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -510,7 +510,8 @@ object Client { } val cachedSecondaryJarLinks = - sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",").filter(_.nonEmpty) + sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") + .filter(_.nonEmpty) // Normally the users app.jar is last in case conflicts with spark jars val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean