From 67a5ccf7a9d02a8b930ab97e10c0858b4d046496 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Fri, 28 Oct 2016 15:50:59 +0800
Subject: [PATCH 1/4] [SPARK-18160][CORE][YARN] SparkContext.addFile doesn't work in yarn-cluster mode

---
 .../scala/org/apache/spark/SparkContext.scala | 31 +++++++++++++------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4694790c72cd..5fa504823be8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1440,14 +1440,16 @@ class SparkContext(config: SparkConf) extends Logging {
     val scheme = new URI(schemeCorrectedPath).getScheme
     if (!Array("http", "https", "ftp").contains(scheme)) {
       val fs = hadoopPath.getFileSystem(hadoopConfiguration)
-      val isDir = fs.getFileStatus(hadoopPath).isDirectory
-      if (!isLocal && scheme == "file" && isDir) {
-        throw new SparkException(s"addFile does not support local directories when not running " +
-          "local mode.")
-      }
-      if (!recursive && isDir) {
-        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
-          "turned on.")
+      if (fs.exists(hadoopPath)) {
+        val isDir = fs.getFileStatus(hadoopPath).isDirectory
+        if (!isLocal && scheme == "file" && isDir) {
+          throw new SparkException(s"addFile does not support local directories when not running " +
+            "local mode.")
+        }
+        if (!recursive && isDir) {
+          throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
+            "turned on.")
+        }
       }
     } else {
       // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
@@ -1455,7 +1457,18 @@ class SparkContext(config: SparkConf) extends Logging {
     }
 
     val key = if (!isLocal && scheme == "file") {
-      env.rpcEnv.fileServer.addFile(new File(uri.getPath))
+      if (master == "yarn" && deployMode == "cluster") {
+        // fallback to container working directory if the absolute path file is not
+        // found in yarn-cluster mode. Because --files would be copied to working directory of
+        // container through yarn dist cache.
+        if (new File(uri.getPath).exists()) {
+          env.rpcEnv.fileServer.addFile(new File(uri.getPath))
+        } else {
+          env.rpcEnv.fileServer.addFile(new File(new Path(uri.getPath).getName))
+        }
+      } else {
+        env.rpcEnv.fileServer.addFile(new File(uri.getPath))
+      }
     } else {
       schemeCorrectedPath
     }
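Patch 1 addresses the problem inside SparkContext.addFile: in yarn-cluster mode the driver runs inside a YARN container, so an absolute path recorded on the submitting machine may not exist there, while anything shipped with --files is localized into the container's working directory by the YARN distributed cache. A minimal standalone sketch of that fallback logic follows; the helper name and example path are hypothetical and are not part of the patch.

    import java.io.File

    import org.apache.hadoop.fs.Path

    // Hypothetical helper isolating the fallback from patch 1: prefer the absolute path,
    // otherwise fall back to the bare file name, which the YARN distributed cache links
    // into the container's current working directory.
    object AddFilePathFallback {
      def resolveAddedFile(uriPath: String): File = {
        val absolute = new File(uriPath)
        if (absolute.exists()) {
          absolute
        } else {
          // e.g. "/home/alice/lookup.txt" -> "lookup.txt" in the container working directory
          new File(new Path(uriPath).getName)
        }
      }
    }

Patch 2 reverts this driver-side special case; patches 3 and 4 then fix the issue on the YARN client side instead.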
From 8033bd1bce9aa2fa0b05fe53c66ea072656dbd23 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Tue, 1 Nov 2016 07:23:53 +0800
Subject: [PATCH 2/4] Revert "[SPARK-18160][CORE][YARN] SparkContext.addFile doesn't work in yarn-cluster mode"

This reverts commit 67a5ccf7a9d02a8b930ab97e10c0858b4d046496.
---
 .../scala/org/apache/spark/SparkContext.scala | 31 ++++++-------------
 1 file changed, 9 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5fa504823be8..4694790c72cd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1440,16 +1440,14 @@ class SparkContext(config: SparkConf) extends Logging {
     val scheme = new URI(schemeCorrectedPath).getScheme
     if (!Array("http", "https", "ftp").contains(scheme)) {
       val fs = hadoopPath.getFileSystem(hadoopConfiguration)
-      if (fs.exists(hadoopPath)) {
-        val isDir = fs.getFileStatus(hadoopPath).isDirectory
-        if (!isLocal && scheme == "file" && isDir) {
-          throw new SparkException(s"addFile does not support local directories when not running " +
-            "local mode.")
-        }
-        if (!recursive && isDir) {
-          throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
-            "turned on.")
-        }
+      val isDir = fs.getFileStatus(hadoopPath).isDirectory
+      if (!isLocal && scheme == "file" && isDir) {
+        throw new SparkException(s"addFile does not support local directories when not running " +
+          "local mode.")
+      }
+      if (!recursive && isDir) {
+        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
+          "turned on.")
       }
     } else {
       // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
@@ -1457,18 +1455,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
 
     val key = if (!isLocal && scheme == "file") {
-      if (master == "yarn" && deployMode == "cluster") {
-        // fallback to container working directory if the absolute path file is not
-        // found in yarn-cluster mode. Because --files would be copied to working directory of
-        // container through yarn dist cache.
-        if (new File(uri.getPath).exists()) {
-          env.rpcEnv.fileServer.addFile(new File(uri.getPath))
-        } else {
-          env.rpcEnv.fileServer.addFile(new File(new Path(uri.getPath).getName))
-        }
-      } else {
-        env.rpcEnv.fileServer.addFile(new File(uri.getPath))
-      }
+      env.rpcEnv.fileServer.addFile(new File(uri.getPath))
     } else {
       schemeCorrectedPath
     }
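With the fallback reverted, patches 3 and 4 move the fix to the YARN client. The underlying failure is that in yarn-cluster mode spark.files and spark.jars still carry client-side paths, and SparkContext re-registers every entry of those properties through addFile / addJar during startup, so the driver looks for files that exist only on the submitting machine. A hypothetical reproduction of the reported behaviour (application, path, and file names below are made up for illustration):

    // Submitted, for example, with:
    //   spark-submit --master yarn --deploy-mode cluster --files /home/alice/lookup.txt repro.jar
    import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

    object AddFileRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("SPARK-18160 repro"))
        // Before this patch series, startup itself could fail in yarn-cluster mode because
        // spark.files still pointed at /home/alice/lookup.txt, a path that exists only on
        // the submitting machine, not inside the ApplicationMaster container.
        println(SparkFiles.get("lookup.txt"))
        sc.stop()
      }
    }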
From b8aa1fb8904f983f564f05e11a5074a5f43a2f57 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Tue, 1 Nov 2016 11:58:08 +0800
Subject: [PATCH 3/4] remove spark.files & spark.jars from SparkConf in yarn mode

---
 .../scala/org/apache/spark/SparkContext.scala | 29 ++++----------
 .../org/apache/spark/deploy/yarn/Client.scala |  4 ++-
 2 files changed, 9 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4694790c72cd..63478c88b057 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1716,29 +1716,12 @@ class SparkContext(config: SparkConf) extends Logging {
       key = uri.getScheme match {
         // A JAR file which exists only on the driver node
         case null | "file" =>
-          if (master == "yarn" && deployMode == "cluster") {
-            // In order for this to work in yarn cluster mode the user must specify the
-            // --addJars option to the client to upload the file into the distributed cache
-            // of the AM to make it show up in the current working directory.
-            val fileName = new Path(uri.getPath).getName()
-            try {
-              env.rpcEnv.fileServer.addJar(new File(fileName))
-            } catch {
-              case e: Exception =>
-                // For now just log an error but allow to go through so spark examples work.
-                // The spark examples don't really need the jar distributed since its also
-                // the app jar.
-                logError("Error adding jar (" + e + "), was the --addJars option used?")
-                null
-            }
-          } else {
-            try {
-              env.rpcEnv.fileServer.addJar(new File(uri.getPath))
-            } catch {
-              case exc: FileNotFoundException =>
-                logError(s"Jar not found at $path")
-                null
-            }
+          try {
+            env.rpcEnv.fileServer.addJar(new File(uri.getPath))
+          } catch {
+            case exc: FileNotFoundException =>
+              logError(s"Jar not found at $path")
+              null
           }
         // A JAR file which exists locally on every worker node
         case "local" =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 55e4a833b670..878f2bef54d7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1202,7 +1202,9 @@ private object Client extends Logging {
     // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf
-
+    // yarn would use dist cache to distribute files & jars, so remove them from sparkConf
+    sparkConf.remove("spark.jars")
+    sparkConf.remove("spark.files")
     val args = new ClientArguments(argStrings)
     new Client(args, sparkConf).run()
   }

From 0dd5486f266eff8a76d23236916b0ea458e75de1 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Wed, 2 Nov 2016 08:57:04 +0800
Subject: [PATCH 4/4] update comment

---
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 878f2bef54d7..053a78617d4e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1202,7 +1202,8 @@ private object Client extends Logging {
     // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf
-    // yarn would use dist cache to distribute files & jars, so remove them from sparkConf
+    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
+    // so remove them from sparkConf here for yarn mode.
     sparkConf.remove("spark.jars")
     sparkConf.remove("spark.files")
     val args = new ClientArguments(argStrings)
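Taken together, patches 3 and 4 drop the driver-side YARN special casing from addJar and instead clear spark.jars and spark.files on the client, since SparkSubmit already distributes --jars and --files through the YARN distributed cache in yarn mode; the driver then no longer tries to re-add client-local paths at startup. A standalone sketch of the conf handling, with illustrative values rather than real paths, that can be pasted into spark-shell:

    import org.apache.spark.SparkConf

    // Standalone illustration of the approach in patches 3 and 4: the values stand in for
    // what SparkSubmit would have recorded on the submitting machine.
    val sparkConf = new SparkConf(loadDefaults = false)
      .set("spark.files", "/home/alice/lookup.txt")
      .set("spark.jars", "/home/alice/extra-lib.jar")

    // In yarn mode these entries are shipped via the YARN distributed cache instead,
    // so they are removed before the application is launched.
    sparkConf.remove("spark.jars")
    sparkConf.remove("spark.files")

    assert(sparkConf.getOption("spark.jars").isEmpty)
    assert(sparkConf.getOption("spark.files").isEmpty)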