From 6d5c0b8f291e96fcf85ea29a54eb35845b1ab30a Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Mon, 15 Mar 2021 16:29:00 -0700 Subject: [PATCH 1/6] SPARK-35672 [YARN] Pass user classpath entries to executors using config instead of command line. User-provided JARs are made available to executors using a custom classloader, so they do not appear on the standard Java classpath. Instead, they are passed as a list to the executor, which then creates a classloader out of the URLs. Currently, this list of JARs is crafted by the Driver, which then passes the information to the executors by specifying each JAR on the executor command line as `--user-class-path /path/to/myjar.jar`. When there are many JARs, this produces extremely long argument lists, which can exceed the OS limit on argument length (see the JIRA for more details/examples). Instead, we can have the YARN `Client` create the list and put it into the configs, which get written to a file and distributed via the YARN distributed cache. The executor can then load the list from its configs. This bypasses the command line and uses a more scalable approach for passing the list of JARs. --- .../CoarseGrainedExecutorBackend.scala | 30 ++++++++++--------- .../spark/internal/config/package.scala | 10 +++++++ .../yarn/ClientDistributedCacheManager.scala | 11 +++++++ .../spark/deploy/yarn/ExecutorRunnable.scala | 12 -------- .../YarnCoarseGrainedExecutorBackend.scala | 12 ++++---- .../spark/deploy/yarn/YarnClusterSuite.scala | 23 +++++++++----- 6 files changed, 59 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8568fdde4da4..845e15639bda 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -19,6 +19,7 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer +import java.nio.file.Paths import java.util.Locale import java.util.concurrent.atomic.AtomicBoolean @@ -385,16 +386,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { cores: Int, appId: String, workerUrl: Option[String], - userClassPath: mutable.ListBuffer[URL], resourcesFileOpt: Option[String], resourceProfileId: Int) def main(args: Array[String]): Unit = { - val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) => - CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) => - new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, - arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq, - env, arguments.resourcesFileOpt, resourceProfile) + val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, Seq[URL]) => - CoarseGrainedExecutorBackend = { + case (rpcEnv, arguments, env, resourceProfile, userClassPath) => + new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, + arguments.bindAddress, arguments.hostname, arguments.cores, userClassPath, + env, arguments.resourcesFileOpt, resourceProfile) } run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) System.exit(0) @@ -402,7 +403,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { def run( arguments: Arguments, - backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) => + backendCreateFn: (RpcEnv, Arguments, SparkEnv, 
ResourceProfile, Seq[URL]) => CoarseGrainedExecutorBackend): Unit = { Utils.initDaemon(log) @@ -454,12 +455,18 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf) } + // Fetch user classpath entries from conf and make absolute if necessary + val userClassPath = driverConf + .get(EXECUTOR_USER_CLASS_PATH_ENTRIES) + .map(Paths.get(_).toAbsolutePath.toUri.toURL) + logInfo(s"Starting executor with user classpath: ${userClassPath.mkString(":")}") + driverConf.set(EXECUTOR_ID, arguments.executorId) val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false) env.rpcEnv.setupEndpoint("Executor", - backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)) + backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile, userClassPath)) arguments.workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } @@ -476,7 +483,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { var resourcesFileOpt: Option[String] = None var appId: String = null var workerUrl: Option[String] = None - val userClassPath = new mutable.ListBuffer[URL]() var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID var argv = args.toList @@ -507,9 +513,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Worker url is used in spark standalone mode to enforce fate-sharing with worker workerUrl = Some(value) argv = tail - case ("--user-class-path") :: value :: tail => - userClassPath += new URL(value) - argv = tail case ("--resourceProfileId") :: value :: tail => resourceProfileId = value.toInt argv = tail @@ -536,7 +539,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl, - userClassPath, resourcesFileOpt, resourceProfileId) + resourcesFileOpt, resourceProfileId) } private def printUsageAndExit(classNameForEntry: String): Unit = { @@ -554,7 +557,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { | --resourcesFile | --app-id | --worker-url - | --user-class-path | --resourceProfileId |""".stripMargin) // scalastyle:on println diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 613a66d2d5ac..bfa415012520 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -750,6 +750,16 @@ package object config { .toSequence .createWithDefault(Nil) + private[spark] val EXECUTOR_USER_CLASS_PATH_ENTRIES = + ConfigBuilder("spark.executor.userClassPath.entries") + .internal() + .doc("Internal conf used to pass the user classpath entries to executors. 
This is a " + + "comma-separated list of paths which may be absolute or relative and are expected " + + "to exist on the host filesystem where the executor is running.") + .stringConf + .toSequence + .createWithDefault(Nil) + private[spark] val TASK_MAX_DIRECT_RESULT_SIZE = ConfigBuilder("spark.task.maxDirectResultSize") .version("2.0.0") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index e02fbd0c9149..eb0bdfa8d115 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.SparkConf import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.EXECUTOR_USER_CLASS_PATH_ENTRIES +import org.apache.spark.util.Utils private case class CacheEntry( uri: URI, @@ -89,6 +91,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging { /** * Writes down information about cached files needed in executors to the given configuration. + * This includes the user classpath which will be leveraged by executors for loading user classes. */ def updateConfiguration(conf: SparkConf): Unit = { conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString).toSeq) @@ -96,6 +99,14 @@ private[spark] class ClientDistributedCacheManager() extends Logging { conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime).toSeq) conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()).toSeq) conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()).toSeq) + val userClassPath = Client.getUserClasspath(conf).map { uri => + if (Utils.isLocalUri(uri.toString)) { + Client.getClusterPath(conf, uri.getPath) + } else { + uri.getPath + } + }.toSeq + conf.set(EXECUTOR_USER_CLASS_PATH_ENTRIES, userClassPath) } /** diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 717ce57b902c..dbf4a0a80525 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -17,7 +17,6 @@ package org.apache.spark.deploy.yarn -import java.io.File import java.nio.ByteBuffer import java.util.Collections @@ -190,16 +189,6 @@ private[yarn] class ExecutorRunnable( // For log4j configuration to reference javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) - val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri => - val absPath = - if (new File(uri.getPath()).isAbsolute()) { - Client.getClusterPath(sparkConf, uri.getPath()) - } else { - Client.buildPath(Environment.PWD.$(), uri.getPath()) - } - Seq("--user-class-path", "file:" + absPath) - }.toSeq - YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts) val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ @@ -211,7 +200,6 @@ private[yarn] class ExecutorRunnable( "--cores", executorCores.toString, "--app-id", appId, "--resourceProfileId", resourceProfileId.toString) ++ - userClassPath ++ 
Seq( s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout", s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala index ce46ffa06f0f..ab48070fc87a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala @@ -21,6 +21,7 @@ import java.net.URL import org.apache.spark.SparkEnv import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.CoarseGrainedExecutorBackend.Arguments import org.apache.spark.internal.Logging import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.RpcEnv @@ -70,11 +71,12 @@ private[spark] class YarnCoarseGrainedExecutorBackend( private[spark] object YarnCoarseGrainedExecutorBackend extends Logging { def main(args: Array[String]): Unit = { - val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) => - CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) => - new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, - arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq, - env, arguments.resourcesFileOpt, resourceProfile) + val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, Seq[URL]) => + CoarseGrainedExecutorBackend = { + case (rpcEnv, arguments, env, resourceProfile, userClassPath) => + new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, + arguments.bindAddress, arguments.hostname, arguments.cores, userClassPath, + env, arguments.resourcesFileOpt, resourceProfile) } val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 26ff3bf2971f..b368e8dc7851 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -150,12 +150,20 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, result) } - test("run Spark in yarn-client mode with additional jar") { - testWithAddJar(true) + test("run Spark in yarn-client mode with additional jar using URI scheme 'local'") { + testWithAddJar(clientMode = true, "local") } - test("run Spark in yarn-cluster mode with additional jar") { - testWithAddJar(false) + test("run Spark in yarn-cluster mode with additional jar using URI scheme 'local'") { + testWithAddJar(clientMode = false, "local") + } + + test("run Spark in yarn-client mode with additional jar using URI scheme 'file'") { + testWithAddJar(clientMode = true, "file") + } + + test("run Spark in yarn-cluster mode with additional jar using URI scheme 'file'") { + testWithAddJar(clientMode = false, "file") } test("run Spark in yarn-cluster mode unsuccessfully") { @@ -286,14 +294,13 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, result) } - private def testWithAddJar(clientMode: Boolean): Unit = { + private def 
testWithAddJar(clientMode: Boolean, jarUriScheme: String): Unit = { val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) val driverResult = File.createTempFile("driver", null, tempDir) val executorResult = File.createTempFile("executor", null, tempDir) val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), - appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()), - extraClassPath = Seq(originalJar.getPath()), - extraJars = Seq("local:" + originalJar.getPath())) + appArgs = Seq(driverResult.getAbsolutePath, executorResult.getAbsolutePath), + extraJars = Seq(s"$jarUriScheme:${originalJar.getPath}")) checkResult(finalState, driverResult, "ORIGINAL") checkResult(finalState, executorResult, "ORIGINAL") } From 6b7e7b5d91e96a6cb193a0773e723bab1253e15e Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Mon, 7 Jun 2021 15:05:18 -0700 Subject: [PATCH 2/6] Use the correct configuration to address test failures. --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 8 ++++++++ .../deploy/yarn/ClientDistributedCacheManager.scala | 11 ----------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 364bc3b4b0d2..a87a20543061 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -668,6 +668,14 @@ private[spark] class Client( if (cachedSecondaryJarLinks.nonEmpty) { sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks.toSeq) } + val userClassPath = Client.getUserClasspath(sparkConf).map { uri => + if (Utils.isLocalUri(uri.toString)) { + Client.getClusterPath(sparkConf, uri.getPath) + } else { + uri.getPath + } + }.toSeq + sparkConf.set(EXECUTOR_USER_CLASS_PATH_ENTRIES, userClassPath) if (isClusterMode && args.primaryPyFile != null) { distribute(args.primaryPyFile, appMasterOnly = true) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index eb0bdfa8d115..e02fbd0c9149 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -30,8 +30,6 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.SparkConf import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.EXECUTOR_USER_CLASS_PATH_ENTRIES -import org.apache.spark.util.Utils private case class CacheEntry( uri: URI, @@ -91,7 +89,6 @@ private[spark] class ClientDistributedCacheManager() extends Logging { /** * Writes down information about cached files needed in executors to the given configuration. - * This includes the user classpath which will be leveraged by executors for loading user classes. 
*/ def updateConfiguration(conf: SparkConf): Unit = { conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString).toSeq) @@ -99,14 +96,6 @@ private[spark] class ClientDistributedCacheManager() extends Logging { conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime).toSeq) conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()).toSeq) conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()).toSeq) - val userClassPath = Client.getUserClasspath(conf).map { uri => - if (Utils.isLocalUri(uri.toString)) { - Client.getClusterPath(conf, uri.getPath) - } else { - uri.getPath - } - }.toSeq - conf.set(EXECUTOR_USER_CLASS_PATH_ENTRIES, userClassPath) } /** From 479ded6ba468e2cdaba98c849362081208e3a8b2 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Mon, 7 Jun 2021 15:05:18 -0700 Subject: [PATCH 3/6] Simplify passing the classpath: Allow the executor to leverage the existing SECONDARY_JARS and APP_JAR configs to construct the classpath using the same (now shared) logic as the ApplicationMaster uses, instead of adding an additional config to pass the info. --- .../CoarseGrainedExecutorBackend.scala | 29 +++++++------------ .../org/apache/spark/executor/Executor.scala | 2 ++ .../spark/internal/config/package.scala | 10 ------- .../CoarseGrainedExecutorBackendSuite.scala | 17 +++++------ .../spark/deploy/yarn/ApplicationMaster.scala | 9 ++---- .../org/apache/spark/deploy/yarn/Client.scala | 20 ++++++------- .../YarnCoarseGrainedExecutorBackend.scala | 17 +++++------ 7 files changed, 42 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 845e15639bda..f8ff27de7f70 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -19,7 +19,6 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer -import java.nio.file.Paths import java.util.Locale import java.util.concurrent.atomic.AtomicBoolean @@ -53,7 +52,6 @@ private[spark] class CoarseGrainedExecutorBackend( bindAddress: String, hostname: String, cores: Int, - userClassPath: Seq[URL], env: SparkEnv, resourcesFileOpt: Option[String], resourceProfile: ResourceProfile) @@ -125,7 +123,7 @@ private[spark] class CoarseGrainedExecutorBackend( */ private def createClassLoader(): MutableURLClassLoader = { val currentLoader = Utils.getContextOrSparkClassLoader - val urls = userClassPath.toArray + val urls = getUserClassPath.toArray if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) { new ChildFirstURLClassLoader(urls, currentLoader) } else { @@ -150,6 +148,8 @@ private[spark] class CoarseGrainedExecutorBackend( } } + def getUserClassPath: Seq[URL] = Nil + def extractLogUrls: Map[String, String] = { val prefix = "SPARK_LOG_URL_" sys.env.filterKeys(_.startsWith(prefix)) @@ -166,7 +166,7 @@ private[spark] class CoarseGrainedExecutorBackend( case RegisteredExecutor => logInfo("Successfully registered with driver") try { - executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false, + executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false, resources = _resources) driver.get.send(LaunchedExecutor(executorId)) } catch { @@ -390,12 +390,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { resourceProfileId: Int) def main(args: Array[String]): Unit = { 
- val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, Seq[URL]) => - CoarseGrainedExecutorBackend = { - case (rpcEnv, arguments, env, resourceProfile, userClassPath) => - new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, - arguments.bindAddress, arguments.hostname, arguments.cores, userClassPath, - env, arguments.resourcesFileOpt, resourceProfile) + val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) => + CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) => + new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, + arguments.bindAddress, arguments.hostname, arguments.cores, + env, arguments.resourcesFileOpt, resourceProfile) } run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) System.exit(0) @@ -403,7 +402,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { def run( arguments: Arguments, - backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, Seq[URL]) => + backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) => CoarseGrainedExecutorBackend): Unit = { Utils.initDaemon(log) @@ -455,18 +454,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf) } - // Fetch user classpath entries from conf and make absolute if necessary - val userClassPath = driverConf - .get(EXECUTOR_USER_CLASS_PATH_ENTRIES) - .map(Paths.get(_).toAbsolutePath.toUri.toURL) - logInfo(s"Starting executor with user classpath: ${userClassPath.mkString(":")}") - driverConf.set(EXECUTOR_ID, arguments.executorId) val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false) env.rpcEnv.setupEndpoint("Executor", - backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile, userClassPath)) + backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)) arguments.workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 289c24ca3632..ff09a2433de1 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -878,6 +878,8 @@ private[spark] class Executor( val urls = userClassPath.toArray ++ currentJars.keySet.map { uri => new File(uri.split("/").last).toURI.toURL } + logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " + + urls.mkString("'", ":", "'")) if (userClassPathFirst) { new ChildFirstURLClassLoader(urls, currentLoader) } else { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index bfa415012520..613a66d2d5ac 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -750,16 +750,6 @@ package object config { .toSequence .createWithDefault(Nil) - private[spark] val EXECUTOR_USER_CLASS_PATH_ENTRIES = - ConfigBuilder("spark.executor.userClassPath.entries") - .internal() - .doc("Internal conf used to pass the user classpath entries to executors. 
This is a " + - "comma-separated list of paths which may be absolute or relative and are expected " + - "to exist on the host filesystem where the executor is running.") - .stringConf - .toSequence - .createWithDefault(Nil) - private[spark] val TASK_MAX_DIRECT_RESULT_SIZE = ConfigBuilder("spark.task.maxDirectResultSize") .version("2.0.0") diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 4909a586d31a..24182e4471a4 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.executor import java.io.File -import java.net.URL import java.nio.ByteBuffer import java.util.Properties @@ -56,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None, resourceProfile) + 4, env, None, resourceProfile) withTempDir { tmpDir => val testResourceArgs: JObject = ("" -> "") val ja = JArray(List(testResourceArgs)) @@ -77,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf)) + 4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf)) withTempDir { tmpDir => val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) val ja = Extraction.decompose(Seq(ra)) @@ -111,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None, resourceProfile) + 4, env, None, resourceProfile) withTempDir { tmpDir => val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) @@ -138,7 +137,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf)) + 4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf)) // not enough gpu's on the executor withTempDir { tmpDir => @@ -191,7 +190,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None, resourceProfile) + 4, env, None, resourceProfile) // executor resources < required withTempDir { tmpDir => @@ -222,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, 
Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf)) + 4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf)) val parsedResources = backend.parseOrFindResources(None) @@ -269,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None, resourceProfile) + 4, env, None, resourceProfile) val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) val ja = Extraction.decompose(Seq(gpuArgs)) val f1 = createTempJsonFile(dir, "resources", ja) @@ -294,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr) val env = createMockEnv(conf, serializer, Some(rpcEnv)) backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1", - "host1", "host1", 4, Seq.empty[URL], env, None, + "host1", "host1", 4, env, None, resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)) assert(backend.taskResources.isEmpty) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ebe128623554..b5cd837f8b69 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -17,9 +17,9 @@ package org.apache.spark.deploy.yarn -import java.io.{File, IOException} +import java.io.IOException import java.lang.reflect.{InvocationTargetException, Modifier} -import java.net.{URI, URL, URLEncoder} +import java.net.{URI, URLEncoder} import java.security.PrivilegedExceptionAction import java.util.concurrent.{TimeoutException, TimeUnit} @@ -85,10 +85,7 @@ private[spark] class ApplicationMaster( private var metricsSystem: Option[MetricsSystem] = None private val userClassLoader = { - val classpath = Client.getUserClasspath(sparkConf) - val urls = classpath.map { entry => - new URL("file:" + new File(entry.getPath()).getAbsolutePath()) - } + val urls = Client.getUserClasspathUrls(sparkConf) if (isClusterMode) { if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a87a20543061..dc3a8263e41a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.yarn import java.io.{FileSystem => _, _} -import java.net.{InetAddress, UnknownHostException, URI} +import java.net.{InetAddress, UnknownHostException, URI, URL} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.nio.file.Files @@ -668,14 +668,6 @@ private[spark] class Client( if (cachedSecondaryJarLinks.nonEmpty) { sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks.toSeq) } - val userClassPath = Client.getUserClasspath(sparkConf).map { uri => - if (Utils.isLocalUri(uri.toString)) { - Client.getClusterPath(sparkConf, uri.getPath) - } else { - uri.getPath - } - }.toSeq - sparkConf.set(EXECUTOR_USER_CLASS_PATH_ENTRIES, userClassPath) if (isClusterMode && args.primaryPyFile != 
null) { distribute(args.primaryPyFile, appMasterOnly = true) @@ -1316,7 +1308,7 @@ private[spark] class Client( } -private object Client extends Logging { +private[spark] object Client extends Logging { // Alias for the user jar val APP_JAR_NAME: String = "__app__.jar" @@ -1478,6 +1470,14 @@ private object Client extends Logging { (mainUri ++ secondaryUris).toArray } + /** + * Returns a list of local, absolute URLs representing the user classpath. + * + * @param conf Spark configuration. + */ + def getUserClasspathUrls(conf: SparkConf): Array[URL] = + getUserClasspath(conf).map(entry => new URL("file:" + new File(entry.getPath).getAbsolutePath)) + private def getMainJarUri(mainJar: Option[String]): Option[URI] = { mainJar.flatMap { path => val uri = Utils.resolveURI(path) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala index ab48070fc87a..6a2b251af645 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala @@ -21,7 +21,7 @@ import java.net.URL import org.apache.spark.SparkEnv import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.executor.CoarseGrainedExecutorBackend.Arguments +import org.apache.spark.deploy.yarn.Client import org.apache.spark.internal.Logging import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.RpcEnv @@ -39,7 +39,6 @@ private[spark] class YarnCoarseGrainedExecutorBackend( bindAddress: String, hostname: String, cores: Int, - userClassPath: Seq[URL], env: SparkEnv, resourcesFile: Option[String], resourceProfile: ResourceProfile) @@ -50,13 +49,14 @@ private[spark] class YarnCoarseGrainedExecutorBackend( bindAddress, hostname, cores, - userClassPath, env, resourcesFile, resourceProfile) with Logging { private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf) + override def getUserClassPath: Seq[URL] = Client.getUserClasspathUrls(env.conf) + override def extractLogUrls: Map[String, String] = { YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None) .getOrElse(Map()) @@ -71,12 +71,11 @@ private[spark] class YarnCoarseGrainedExecutorBackend( private[spark] object YarnCoarseGrainedExecutorBackend extends Logging { def main(args: Array[String]): Unit = { - val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, Seq[URL]) => - CoarseGrainedExecutorBackend = { - case (rpcEnv, arguments, env, resourceProfile, userClassPath) => - new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, - arguments.bindAddress, arguments.hostname, arguments.cores, userClassPath, - env, arguments.resourcesFileOpt, resourceProfile) + val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) => + CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) => + new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, + arguments.bindAddress, arguments.hostname, arguments.cores, + env, arguments.resourcesFileOpt, resourceProfile) } val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")) From cc90f5814f165e4f66e91745e719403bc1921144 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Fri, 11 Jun 2021 13:22:19 
-0700 Subject: [PATCH 4/6] Change `getUserClasspathUrls` to properly resolve `local:`-style paths which leverage the gateway/replacement path functionality. Enhance test cases in `YarnClusterSuite` for this case, and add tests in `ClientSuite` for the logic. --- .../org/apache/spark/executor/Executor.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 28 ++++++++-- .../YarnCoarseGrainedExecutorBackend.scala | 3 +- .../spark/deploy/yarn/ClientSuite.scala | 26 ++++++++++ .../spark/deploy/yarn/YarnClusterSuite.scala | 52 ++++++++++++++++--- 6 files changed, 99 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index ff09a2433de1..9c257d8b892b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -879,7 +879,7 @@ private[spark] class Executor( new File(uri.split("/").last).toURI.toURL } logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " + - urls.mkString("'", ":", "'")) + urls.mkString("'", ",", "'")) if (userClassPathFirst) { new ChildFirstURLClassLoader(urls, currentLoader) } else { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index b5cd837f8b69..9973db8e8306 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -85,7 +85,7 @@ private[spark] class ApplicationMaster( private var metricsSystem: Option[MetricsSystem] = None private val userClassLoader = { - val urls = Client.getUserClasspathUrls(sparkConf) + val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode) if (isClusterMode) { if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index dc3a8263e41a..0393a81e8e60 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,7 +21,7 @@ import java.io.{FileSystem => _, _} import java.net.{InetAddress, UnknownHostException, URI, URL} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets -import java.nio.file.Files +import java.nio.file.{Files, Paths} import java.util.{Locale, Properties, UUID} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -1471,12 +1471,32 @@ private[spark] object Client extends Logging { } /** - * Returns a list of local, absolute URLs representing the user classpath. + * Returns a list of local, absolute file URLs representing the user classpath. Note that this + * must be executed on the same host which will access the URLs, as it will resolve relative + * paths based on the current working directory. * * @param conf Spark configuration. + * @param useClusterPath Whether to use the 'cluster' path when resolving paths with the + * `local` scheme. This should be used when running on the cluster, but + * not when running on the gateway (i.e. for the driver in `client` mode). 
+ * @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]]. */ - def getUserClasspathUrls(conf: SparkConf): Array[URL] = - getUserClasspath(conf).map(entry => new URL("file:" + new File(entry.getPath).getAbsolutePath)) + def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = { + Client.getUserClasspath(conf).map { uri => + val inputPath = uri.getPath + val localPath = if (Utils.isLocalUri(uri.toString)) { + val localPath = if (useClusterPath) { + Client.getClusterPath(conf, inputPath) + } else { + inputPath + } + Paths.get(localPath) + } else { + new File(inputPath).toPath.toAbsolutePath + } + localPath.toUri.toURL + } + } private def getMainJarUri(mainJar: Option[String]): Option[URI] = { mainJar.flatMap { path => diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala index 6a2b251af645..3dd51f174b01 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala @@ -55,7 +55,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend( private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf) - override def getUserClassPath: Seq[URL] = Client.getUserClasspathUrls(env.conf) + override def getUserClassPath: Seq[URL] = + Client.getUserClasspathUrls(env.conf, useClusterPath = true) override def extractLogUrls: Map[String, String] = { YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index ea3acec3bb78..faad4f20ffc1 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream} import java.net.URI +import java.nio.file.Paths import java.util.Properties import java.util.concurrent.ConcurrentHashMap @@ -583,6 +584,31 @@ class ClientSuite extends SparkFunSuite with Matchers { } } + test("SPARK-35672: test Client.getUserClasspathUrls") { + val conf = new SparkConf() + .set(SECONDARY_JARS, Seq( + "local:/local/matching/replace/foo.jar", + "local:/local/not/matching/replace/foo.jar", + "file:/absolute/file/path/foo.jar", + "relative/file/path/foo.jar" + )) + .set(GATEWAY_ROOT_PATH, "/local/matching/replace") + .set(REPLACEMENT_ROOT_PATH, "/replaced/path") + + def assertUserClasspathUrls(cluster: Boolean, gatewayReplacementPath: String): Unit = { + val expectedUrls = Seq( + Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString, + s"file:$gatewayReplacementPath/foo.jar", + "file:/local/not/matching/replace/foo.jar", + "file:/absolute/file/path/foo.jar", + Paths.get("relative/file/path/foo.jar").toAbsolutePath.toUri.toString + ).map(URI.create(_).toURL).toArray + assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls) + } + assertUserClasspathUrls(cluster = false, "/local/matching/replace") + assertUserClasspathUrls(cluster = true, "/replaced/path") + } + private val matching = Seq( ("files URI match test1", "file:///file1", 
"file:///file2"), ("files URI match test2", "file:///c:file1", "file://c:file2"), diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index b368e8dc7851..6bd9c4d6deee 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -18,7 +18,9 @@ package org.apache.spark.deploy.yarn import java.io.File +import java.net.URL import java.nio.charset.StandardCharsets +import java.nio.file.Paths import java.util.{HashMap => JHashMap} import scala.collection.mutable @@ -150,19 +152,47 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, result) } - test("run Spark in yarn-client mode with additional jar using URI scheme 'local'") { + test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local'") { testWithAddJar(clientMode = true, "local") } - test("run Spark in yarn-cluster mode with additional jar using URI scheme 'local'") { + test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local'") { testWithAddJar(clientMode = false, "local") } - test("run Spark in yarn-client mode with additional jar using URI scheme 'file'") { + test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' " + + "and gateway-replacement path") { + // Use the original jar URL, but set up the gateway/replacement configs such that if + // replacement occurs, things will break. This ensures the replacement doesn't apply to the + // driver in 'client' mode. Executors will fail in this case because they still apply the + // replacement in client mode. + testWithAddJar(clientMode = true, "local", Some(jarUrl => { + (jarUrl.getPath, Map( + GATEWAY_ROOT_PATH.key -> Paths.get(jarUrl.toURI).getParent.toString, + REPLACEMENT_ROOT_PATH.key -> "/nonexistent/path/" + )) + }), expectExecutorFailure = true) + } + + test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local' " + + "and gateway-replacement path") { + // Put a prefix in front of the original jar URL which causes it to be an invalid path. + // Set up the gateway/replacement configs such that if replacement occurs, it is a valid + // path again (by removing the prefix). This ensures the replacement is applied. 
+ val gatewayPath = "/replaceme/nonexistent/" + testWithAddJar(clientMode = false, "local", Some(jarUrl => { + (gatewayPath + jarUrl.getPath, Map( + GATEWAY_ROOT_PATH.key -> gatewayPath, + REPLACEMENT_ROOT_PATH.key -> "" + )) + })) + } + + test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file'") { testWithAddJar(clientMode = true, "file") } - test("run Spark in yarn-cluster mode with additional jar using URI scheme 'file'") { + test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'file'") { testWithAddJar(clientMode = false, "file") } @@ -294,15 +324,23 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, result) } - private def testWithAddJar(clientMode: Boolean, jarUriScheme: String): Unit = { + private def testWithAddJar( + clientMode: Boolean, + jarUriScheme: String, + jarUrlToPathAndConfs: Option[URL => (String, Map[String, String])] = None, + expectExecutorFailure: Boolean = false): Unit = { val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) + val (jarPath, extraConf) = jarUrlToPathAndConfs + .map(_.apply(originalJar)) + .getOrElse((originalJar.getPath, Map[String, String]())) val driverResult = File.createTempFile("driver", null, tempDir) val executorResult = File.createTempFile("executor", null, tempDir) val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), appArgs = Seq(driverResult.getAbsolutePath, executorResult.getAbsolutePath), - extraJars = Seq(s"$jarUriScheme:${originalJar.getPath}")) + extraJars = Seq(s"$jarUriScheme:$jarPath"), + extraConf = extraConf) checkResult(finalState, driverResult, "ORIGINAL") - checkResult(finalState, executorResult, "ORIGINAL") + checkResult(finalState, executorResult, if (expectExecutorFailure) "failure" else "ORIGINAL") } private def testPySpark( From 5f55b9a466c947e283055bd8ae3fc7ee56d9f28e Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Fri, 18 Jun 2021 09:35:32 -0700 Subject: [PATCH 5/6] Expand test cases and extract some shared constants. 
Simplify logic in getUserClasspathUrls and make the assumptions more clear via an assert --- .../org/apache/spark/deploy/yarn/Client.scala | 16 +++++------ .../spark/deploy/yarn/ClientSuite.scala | 27 ++++++++++++------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 0393a81e8e60..caccfac5ef13 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1484,17 +1484,15 @@ private[spark] object Client extends Logging { def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = { Client.getUserClasspath(conf).map { uri => val inputPath = uri.getPath - val localPath = if (Utils.isLocalUri(uri.toString)) { - val localPath = if (useClusterPath) { - Client.getClusterPath(conf, inputPath) - } else { - inputPath - } - Paths.get(localPath) + val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) { + Client.getClusterPath(conf, inputPath) } else { - new File(inputPath).toPath.toAbsolutePath + // Any other URI schemes should have been resolved by this point + assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString), + "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri) + inputPath } - localPath.toUri.toURL + Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index faad4f20ffc1..1650ea2e4491 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -585,28 +585,37 @@ class ClientSuite extends SparkFunSuite with Matchers { } test("SPARK-35672: test Client.getUserClasspathUrls") { + val gatewayRootPath = "/local/matching/replace" + val replacementRootPath = "/replaced/path" val conf = new SparkConf() .set(SECONDARY_JARS, Seq( - "local:/local/matching/replace/foo.jar", + s"local:$gatewayRootPath/foo.jar", "local:/local/not/matching/replace/foo.jar", "file:/absolute/file/path/foo.jar", - "relative/file/path/foo.jar" + s"$gatewayRootPath/but-not-actually-local/foo.jar", + "/absolute/path/foo.jar", + "relative/path/foo.jar" )) - .set(GATEWAY_ROOT_PATH, "/local/matching/replace") - .set(REPLACEMENT_ROOT_PATH, "/replaced/path") + .set(GATEWAY_ROOT_PATH, gatewayRootPath) + .set(REPLACEMENT_ROOT_PATH, replacementRootPath) - def assertUserClasspathUrls(cluster: Boolean, gatewayReplacementPath: String): Unit = { + def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: String): Unit = { val expectedUrls = Seq( Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString, - s"file:$gatewayReplacementPath/foo.jar", + s"file:$expectedReplacementPath/foo.jar", "file:/local/not/matching/replace/foo.jar", "file:/absolute/file/path/foo.jar", - Paths.get("relative/file/path/foo.jar").toAbsolutePath.toUri.toString + // since this path wasn't a local URI, it should never be replaced + s"file:$gatewayRootPath/but-not-actually-local/foo.jar", + "file:/absolute/path/foo.jar", + Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString ).map(URI.create(_).toURL).toArray 
assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls) } - assertUserClasspathUrls(cluster = false, "/local/matching/replace") - assertUserClasspathUrls(cluster = true, "/replaced/path") + // assert that no replacement happens when cluster = false by expecting the replacement + // path to be the same as the original path + assertUserClasspathUrls(cluster = false, gatewayRootPath) + assertUserClasspathUrls(cluster = true, replacementRootPath) } private val matching = Seq( From 7eac14a1f12cc5d18b73bf2e5a4beaf5a16acde0 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Tue, 22 Jun 2021 08:21:01 -0700 Subject: [PATCH 6/6] empty commit to trigger build