Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer

import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
import scala.collection.mutable.{HashMap, ListBuffer, Map}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
Expand Down Expand Up @@ -264,16 +263,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}

// handle any add jars
var cachedSecondaryJarLinks = ListBuffer.empty[String]
if ((args.addJars != null) && (!args.addJars.isEmpty())){
args.addJars.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache, true)
linkname, statCache)
cachedSecondaryJarLinks += linkname
}
}
sparkConf.set(Client.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))

// handle any distributed cache files
if ((args.files != null) && (!args.files.isEmpty())){
Expand Down Expand Up @@ -462,9 +464,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}

object Client {
val SPARK_JAR: String = "spark.jar"
val APP_JAR: String = "app.jar"
val SPARK_JAR: String = "__spark__.jar"
val APP_JAR: String = "__app__.jar"
val LOG4J_PROP: String = "log4j.properties"
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
Expand All @@ -491,11 +494,18 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}

val cachedSecondaryJarLinks =
sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")

// Normally the users app.jar is last in case conflicts with spark jars
val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
cachedSecondaryJarLinks.foreach(jarLink =>
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + jarLink))
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR)
Expand All @@ -504,6 +514,9 @@ object Client {
if (!userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
cachedSecondaryJarLinks.foreach(jarLink =>
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + jarLink))
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer

import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
import scala.collection.mutable.{ListBuffer, HashMap, Map}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
Expand Down Expand Up @@ -281,18 +280,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}

// Handle jars local to the ApplicationMaster.
var cachedSecondaryJarLinks = ListBuffer.empty[String]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be a val

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thks.

if ((args.addJars != null) && (!args.addJars.isEmpty())){
args.addJars.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
// Only add the resource to the Spark ApplicationMaster.
val appMasterOnly = true
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache, appMasterOnly)
linkname, statCache)
cachedSecondaryJarLinks += linkname
}
}
sparkConf.set(Client.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))

// Handle any distributed cache files
if ((args.files != null) && (!args.files.isEmpty())){
Expand Down Expand Up @@ -478,9 +478,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}

object Client {
val SPARK_JAR: String = "spark.jar"
val APP_JAR: String = "app.jar"
val SPARK_JAR: String = "__spark__.jar"
val APP_JAR: String = "__app__.jar"
val LOG4J_PROP: String = "log4j.properties"
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
Expand All @@ -507,12 +508,19 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}

val cachedSecondaryJarLinks =
sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually returns a one element Array("") if there are no secondary jars. We should .filter(_.nonEmpty) after splitting.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. You are right. It will add empty string to array, and then add the folder without file into classpath. Will fix in master as well.


// Normally the users app.jar is last in case conflicts with spark jars
val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
.toBoolean
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Not your code, but could you bump .toBoolean up one line? I'm pretty sure it's under 100 characters.

if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
cachedSecondaryJarLinks.foreach(jarLink =>
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + jarLink))
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR)
Expand All @@ -521,6 +529,9 @@ object Client {
if (!userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
cachedSecondaryJarLinks.foreach(jarLink =>
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + jarLink))
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")
Expand Down