diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4f63adac6703..43887a7f0cd7 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -52,7 +52,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFileOpt: Option[String],
     resourceProfile: ResourceProfile)
@@ -124,7 +123,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private def createClassLoader(): MutableURLClassLoader = {
     val currentLoader = Utils.getContextOrSparkClassLoader
-    val urls = userClassPath.toArray
+    val urls = getUserClassPath.toArray
     if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
@@ -149,6 +148,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
   }
 
+  def getUserClassPath: Seq[URL] = Nil
+
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
@@ -165,7 +166,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       try {
-        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
+        executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
          resources = _resources)
         driver.get.send(LaunchedExecutor(executorId))
       } catch {
@@ -398,7 +399,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: mutable.ListBuffer[URL],
       resourcesFileOpt: Option[String],
       resourceProfileId: Int)
 
@@ -406,7 +406,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -494,7 +494,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var resourcesFileOpt: Option[String] = None
     var appId: String = null
     var workerUrl: Option[String] = None
-    val userClassPath = new mutable.ListBuffer[URL]()
     var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
 
     var argv = args.toList
@@ -525,9 +524,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
           workerUrl = Some(value)
           argv = tail
-        case ("--user-class-path") :: value :: tail =>
-          userClassPath += new URL(value)
-          argv = tail
         case ("--resourceProfileId") :: value :: tail =>
           resourceProfileId = value.toInt
           argv = tail
@@ -554,7 +550,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     }
 
     Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
-      userClassPath, resourcesFileOpt, resourceProfileId)
+      resourcesFileOpt, resourceProfileId)
   }
 
   private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -572,7 +568,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       |   --resourcesFile
       |   --app-id
       |   --worker-url
-      |   --user-class-path
       |   --resourceProfileId
       |""".stripMargin)
     // scalastyle:on println
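The `--user-class-path` launcher argument and the `userClassPath` constructor parameter are removed above; backends now publish their user classpath through the overridable `getUserClassPath` hook (defaulting to `Nil`), which `createClassLoader` and the `Executor` constructor consume at call time. A minimal, self-contained sketch of that hook-based pattern follows; the class names and jar path are illustrative only, not part of this patch:

import java.io.File
import java.net.URL

// Illustrative stand-ins for the real backend classes: the point is that the user
// classpath is resolved by an overridable method at use time on the executor host,
// rather than being passed as URLs on the launch command line.
abstract class BackendSketch {
  def getUserClassPath: Seq[URL] = Nil                 // mirrors the new default of Nil
  def classLoaderUrls: Array[URL] = getUserClassPath.toArray
}

class YarnLikeBackendSketch(jars: Seq[String]) extends BackendSketch {
  override def getUserClassPath: Seq[URL] =
    jars.map(p => new File(p).getAbsoluteFile.toURI.toURL)
}

object BackendSketchDemo extends App {
  new YarnLikeBackendSketch(Seq("lib/app.jar")).classLoaderUrls.foreach(println)
}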
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ef02c93f0afa..a50516067628 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -886,6 +886,8 @@ private[spark] class Executor(
     val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
     }
+    logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " +
+      urls.mkString("'", ",", "'"))
     if (userClassPathFirst) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 3aa181cd3ace..9bbfdc76e4f6 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.executor
 
 import java.io.File
-import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -56,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
       val ja = JArray(List(testResourceArgs))
@@ -77,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val ja = Extraction.decompose(Seq(ra))
@@ -111,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
 
     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -138,7 +137,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     // not enough gpu's on the executor
     withTempDir { tmpDir =>
@@ -191,7 +190,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
 
     // executor resources < required
     withTempDir { tmpDir =>
@@ -222,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     val parsedResources = backend.parseOrFindResources(None)
 
@@ -269,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(dir, "resources", ja)
@@ -294,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
       val env = createMockEnv(conf, serializer, Some(rpcEnv))
       backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-        "host1", "host1", 4, Seq.empty[URL], env, None,
+        "host1", "host1", 4, env, None,
         resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
 
       assert(backend.taskResources.isEmpty)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
index 481561a51cc9..dd06688da349 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
@@ -50,7 +50,7 @@ private[spark] object KubernetesExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, Seq.empty,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a25edb0e5423..42e661cd47b1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{File, IOException}
+import java.io.IOException
 import java.lang.reflect.{InvocationTargetException, Modifier}
-import java.net.{URI, URL, URLEncoder}
+import java.net.{URI, URLEncoder}
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}
 
@@ -85,10 +85,7 @@ private[spark] class ApplicationMaster(
   private var metricsSystem: Option[MetricsSystem] = None
 
   private val userClassLoader = {
-    val classpath = Client.getUserClasspath(sparkConf)
-    val urls = classpath.map { entry =>
-      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
-    }
+    val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode)
 
     if (isClusterMode) {
       if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ed54044a13ff..e85dcd284f7e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.{FileSystem => _, _}
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URL}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -1313,7 +1313,7 @@ private[spark] class Client(
 
 }
 
-private object Client extends Logging {
+private[spark] object Client extends Logging {
 
   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
@@ -1475,6 +1475,34 @@ private object Client extends Logging {
     (mainUri ++ secondaryUris).toArray
   }
 
+  /**
+   * Returns a list of local, absolute file URLs representing the user classpath. Note that this
+   * must be executed on the same host which will access the URLs, as it will resolve relative
+   * paths based on the current working directory, as well as environment variables.
+   * See SPARK-35672 for discussion of why it is necessary to do environment variable substitution.
+   *
+   * @param conf Spark configuration.
+   * @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
+   *                       `local` scheme. This should be used when running on the cluster, but
+   *                       not when running on the gateway (i.e. for the driver in `client` mode).
+   * @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]].
+   */
+  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
+    Client.getUserClasspath(conf).map { uri =>
+      val inputPath = uri.getPath
+      val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
+        Client.getClusterPath(conf, inputPath)
+      } else {
+        // Any other URI schemes should have been resolved by this point
+        assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
+          "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
+        inputPath
+      }
+      val envVarResolvedFilePath = YarnSparkHadoopUtil.replaceEnvVars(replacedFilePath, sys.env)
+      Paths.get(envVarResolvedFilePath).toAbsolutePath.toUri.toURL
+    }
+  }
+
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
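A sketch of the resolution rules documented in getUserClasspathUrls, reduced to plain JDK calls so it runs standalone; it is not the Spark implementation, and the gateway root, replacement root, and environment map are made-up stand-ins for the gateway/replacement path configs and sys.env:

import java.net.URL
import java.nio.file.Paths

object UserClasspathResolutionSketch extends App {
  val gatewayRoot = "/opt/gateway/libs"   // stand-in for the gateway root config
  val clusterRoot = "/opt/cluster/libs"   // stand-in for the replacement root config
  val env = Map("LIB_DIR" -> "extra")     // stand-in for the process environment

  // 1. rewrite the gateway prefix of `local:` paths when resolving for the cluster,
  // 2. substitute environment variables, 3. absolutize and convert to a file: URL.
  def resolve(path: String, useClusterPath: Boolean): URL = {
    val replaced =
      if (useClusterPath && path.startsWith(gatewayRoot)) {
        clusterRoot + path.stripPrefix(gatewayRoot)
      } else {
        path
      }
    val substituted = env.foldLeft(replaced) { case (s, (name, value)) => s.replace("$" + name, value) }
    Paths.get(substituted).toAbsolutePath.toUri.toURL
  }

  // prints a file: URL pointing at /opt/cluster/libs/extra/foo.jar
  println(resolve(s"$gatewayRoot/$$LIB_DIR/foo.jar", useClusterPath = true))
}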
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 717ce57b902c..dbf4a0a80525 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.nio.ByteBuffer
 import java.util.Collections
 
@@ -190,16 +189,6 @@ private[yarn] class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
-    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
-      val absPath =
-        if (new File(uri.getPath()).isAbsolute()) {
-          Client.getClusterPath(sparkConf, uri.getPath())
-        } else {
-          Client.buildPath(Environment.PWD.$(), uri.getPath())
-        }
-      Seq("--user-class-path", "file:" + absPath)
-    }.toSeq
-
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
     val commands = prefixEnv ++
       Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -211,7 +200,6 @@ private[yarn] class ExecutorRunnable(
       "--cores", executorCores.toString,
       "--app-id", appId,
       "--resourceProfileId", resourceProfileId.toString) ++
-      userClassPath ++
       Seq(
         s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
         s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 09766bf97d8f..f347e37ba24a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -19,7 +19,9 @@ package org.apache.spark.deploy.yarn
 
 import java.util.regex.{Matcher, Pattern}
 
+import scala.collection.immutable.{Map => IMap}
 import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.util.matching.Regex
 
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
@@ -93,14 +95,83 @@ object YarnSparkHadoopUtil {
     }
   }
 
+  /**
+   * Regex pattern to match the name of an environment variable. Note that Unix variable naming
+   * conventions (alphanumeric plus underscore, case-sensitive, can't start with a digit)
+   * are used for both Unix and Windows, following the convention of Hadoop's `Shell` class
+   * (see specifically [[org.apache.hadoop.util.Shell.getEnvironmentVariableRegex]]).
+   */
+  private val envVarNameRegex: String = "[A-Za-z_][A-Za-z0-9_]*"
+
+  /**
+   * Note that this regex only supports the `$VAR_NAME` and `%VAR_NAME%` syntax, for Unix and
+   * Windows respectively, and does not perform any handling of escapes. The Unix `${VAR_NAME}`
+   * syntax is not supported.
+   */
   private val environmentVariableRegex: String = {
     if (Utils.isWindows) {
-      "%([A-Za-z_][A-Za-z0-9_]*?)%"
+      s"%($envVarNameRegex)%"
     } else {
-      "\\$([A-Za-z_][A-Za-z0-9_]*)"
+      s"\\$$($envVarNameRegex)"
     }
   }
 
+  // scalastyle:off line.size.limit
+  /**
+   * Replace environment variables in a string according to the same rules as
+   * [[org.apache.hadoop.yarn.api.ApplicationConstants.Environment]]:
+   * `$VAR_NAME` for Unix, `%VAR_NAME%` for Windows, and `{{VAR_NAME}}` for all OS.
+   * The `${VAR_NAME}` syntax is also supported for Unix.
+   * This supports escapes for `$` and `\` (on Unix) and `%` and `^` characters (on Windows), e.g.
+   * `\$FOO`, `^%FOO^%`, and `%%FOO%%` will be resolved to `$FOO`, `%FOO%`, and `%FOO%`,
+   * respectively, instead of being treated as variable names.
+   *
+   * @param unresolvedString The unresolved string which may contain variable references.
+   * @param env The System environment
+   * @param isWindows True iff running in a Windows environment
+   * @return The input string with variables replaced with their values from `env`
+   * @see [[https://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html Environment Variables (IEEE Std 1003.1-2017)]]
+   * @see [[https://en.wikibooks.org/wiki/Windows_Batch_Scripting#Quoting_and_escaping Windows Batch Scripting | Quoting and Escaping]]
+   */
+  // scalastyle:on line.size.limit
+  def replaceEnvVars(
+      unresolvedString: String,
+      env: IMap[String, String],
+      isWindows: Boolean = Utils.isWindows): String = {
+    val osResolvedString = if (isWindows) {
+      // ^% or %% can both be used as escapes for Windows
+      val windowsPattern = ("""(?i)(?:\^\^|\^%|%%|%(""" + envVarNameRegex + ")%)").r
+      windowsPattern.replaceAllIn(unresolvedString, m =>
+        Regex.quoteReplacement(m.matched match {
+          case "^^" => "^"
+          case "^%" => "%"
+          case "%%" => "%"
+          case _ => env.getOrElse(m.group(1), "")
+        })
+      )
+    } else {
+      val unixPattern =
+        ("""(?i)(?:\\\\|\\\$|\$(""" + envVarNameRegex + """)|\$\{(""" + envVarNameRegex + ")})").r
+      unixPattern.replaceAllIn(unresolvedString, m =>
+        Regex.quoteReplacement(m.matched match {
+          case """\\""" => """\"""
+          case """\$""" => """$"""
+          case str if str.startsWith("${") => env.getOrElse(m.group(2), "")
+          case _ => env.getOrElse(m.group(1), "")
+        })
+      )
+    }
+
+    // YARN uses `{{...}}` to represent OS-agnostic variable expansion strings. Normally the
+    // NodeManager would replace this string with an OS-specific replacement before launching
+    // the container. Here, it gets directly treated as an additional expansion string, which
+    // has the same net result.
+    // Ref: Javadoc for org.apache.hadoop.yarn.api.ApplicationConstants.Environment.$$()
+    val yarnPattern = ("""(?i)\{\{(""" + envVarNameRegex + ")}}").r
+    yarnPattern.replaceAllIn(osResolvedString,
+      m => Regex.quoteReplacement(env.getOrElse(m.group(1), "")))
+  }
+
   /**
    * Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
    * Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
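A short usage sketch of the new helper; the expected values mirror the Unix-mode cases in the YarnSparkHadoopUtilSuite additions further down, and the snippet assumes the spark-yarn module is on the classpath:

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

object ReplaceEnvVarsExample extends App {
  val env = Map("FOO" -> "BAR")

  // POSIX-style, braced, and YARN {{...}} references all resolve against `env`.
  assert(YarnSparkHadoopUtil.replaceEnvVars("$FOO", env, isWindows = false) == "BAR")
  assert(YarnSparkHadoopUtil.replaceEnvVars("${FOO}", env, isWindows = false) == "BAR")
  assert(YarnSparkHadoopUtil.replaceEnvVars("{{FOO}}", env, isWindows = false) == "BAR")

  // A backslash escapes the dollar sign, so no substitution happens.
  assert(YarnSparkHadoopUtil.replaceEnvVars("""\$FOO""", env, isWindows = false) == "$FOO")

  // Unknown variable names resolve to the empty string.
  assert(YarnSparkHadoopUtil.replaceEnvVars("$MISSING/x", env, isWindows = false) == "/x")
}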
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index ce46ffa06f0f..3dd51f174b01 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -21,6 +21,7 @@ import java.net.URL
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.Client
 import org.apache.spark.internal.Logging
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
@@ -38,7 +39,6 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFile: Option[String],
     resourceProfile: ResourceProfile)
@@ -49,13 +49,15 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress,
     hostname,
     cores,
-    userClassPath,
     env,
     resourcesFile,
     resourceProfile) with Logging {
 
   private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
 
+  override def getUserClassPath: Seq[URL] =
+    Client.getUserClasspathUrls(env.conf, useClusterPath = true)
+
   override def extractLogUrls: Map[String, String] = {
     YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
       .getOrElse(Map())
@@ -73,7 +75,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 32dab6db0442..86956672205c 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
 import java.net.URI
+import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 
@@ -614,6 +615,40 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("SPARK-35672: test Client.getUserClasspathUrls") {
+    val gatewayRootPath = "/local/matching/replace"
+    val replacementRootPath = "/replaced/path"
+    val conf = new SparkConf()
+      .set(SECONDARY_JARS, Seq(
+        s"local:$gatewayRootPath/foo.jar",
+        "local:/local/not/matching/replace/foo.jar",
+        "file:/absolute/file/path/foo.jar",
+        s"$gatewayRootPath/but-not-actually-local/foo.jar",
+        "/absolute/path/foo.jar",
+        "relative/path/foo.jar"
+      ))
+      .set(GATEWAY_ROOT_PATH, gatewayRootPath)
+      .set(REPLACEMENT_ROOT_PATH, replacementRootPath)
+
+    def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: String): Unit = {
+      val expectedUrls = Seq(
+        Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString,
+        s"file:$expectedReplacementPath/foo.jar",
+        "file:/local/not/matching/replace/foo.jar",
+        "file:/absolute/file/path/foo.jar",
+        // since this path wasn't a local URI, it should never be replaced
+        s"file:$gatewayRootPath/but-not-actually-local/foo.jar",
+        "file:/absolute/path/foo.jar",
+        Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString
+      ).map(URI.create(_).toURL).toArray
+      assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls)
+    }
+    // assert that no replacement happens when cluster = false by expecting the replacement
+    // path to be the same as the original path
+    assertUserClasspathUrls(cluster = false, gatewayRootPath)
+    assertUserClasspathUrls(cluster = true, replacementRootPath)
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 26ff3bf2971f..9fd3c70fa86f 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
+import java.net.URL
 import java.nio.charset.StandardCharsets
+import java.nio.file.Paths
 import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
@@ -150,12 +152,70 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  test("run Spark in yarn-client mode with additional jar") {
-    testWithAddJar(true)
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local'") {
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    testWithAddJar(clientMode = true, s"local:$jarPath")
   }
 
-  test("run Spark in yarn-cluster mode with additional jar") {
-    testWithAddJar(false)
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local'") {
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    testWithAddJar(clientMode = false, s"local:$jarPath")
+  }
+
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' " +
+      "and gateway-replacement path") {
+    // Use the original jar URL, but set up the gateway/replacement configs such that if
+    // replacement occurs, things will break. This ensures the replacement doesn't apply to the
+    // driver in 'client' mode. Executors will fail in this case because they still apply the
+    // replacement in client mode.
+    val jarUrl = createJarWithOriginalResourceFile()
+    testWithAddJar(clientMode = true, s"local:${jarUrl.getPath}", Map(
+      GATEWAY_ROOT_PATH.key -> Paths.get(jarUrl.toURI).getParent.toString,
+      REPLACEMENT_ROOT_PATH.key -> "/nonexistent/path/"
+    ), expectExecutorFailure = true)
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local' " +
+      "and gateway-replacement path") {
+    // Put a prefix in front of the original jar URL which causes it to be an invalid path.
+    // Set up the gateway/replacement configs such that if replacement occurs, it is a valid
+    // path again (by removing the prefix). This ensures the replacement is applied.
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    val gatewayPath = "/replaceme/nonexistent/"
+    testWithAddJar(clientMode = false, s"local:$gatewayPath$jarPath", Map(
+      GATEWAY_ROOT_PATH.key -> gatewayPath,
+      REPLACEMENT_ROOT_PATH.key -> ""
+    ))
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local' " +
+      "and gateway-replacement path containing an environment variable") {
+    // Treat the entire jar path as a string which needs to be replaced, and which will be replaced
+    // (using the gateway/replacement logic) by two environment variables, both of which have to be
+    // resolved properly for the resulting path to be correct. Two environment variables are
+    // used to test the two different styles of variable substitution (OS-style vs. YARN-style)
+    val jarPath = Paths.get(createJarWithOriginalResourceFile().toURI)
+
+    val envVarConfigs = for (
+      envVar <- Map("PARENT" -> jarPath.getParent, "FILENAME" -> jarPath.getFileName);
+      prefix <- Seq("spark.yarn.appMasterEnv.", "spark.executorEnv.")
+    ) yield s"$prefix${envVar._1}" -> envVar._2.toString
+
+    val osSpecificEnvVar = if (Utils.isWindows) "%PARENT%" else "${PARENT}"
+    testWithAddJar(clientMode = false, s"local:/replaceme", Map(
+      GATEWAY_ROOT_PATH.key -> "/replaceme",
+      REPLACEMENT_ROOT_PATH.key -> s"$osSpecificEnvVar/{{FILENAME}}"
+    ) ++ envVarConfigs)
+  }
+
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file'") {
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    testWithAddJar(clientMode = true, s"file:$jarPath")
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'file'") {
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    testWithAddJar(clientMode = false, s"file:$jarPath")
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -286,16 +346,22 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  private def testWithAddJar(clientMode: Boolean): Unit = {
-    val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+  private def createJarWithOriginalResourceFile(): URL =
+    TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+
+  private def testWithAddJar(
+      clientMode: Boolean,
+      jarPath: String,
+      extraConf: Map[String, String] = Map(),
+      expectExecutorFailure: Boolean = false): Unit = {
     val driverResult = File.createTempFile("driver", null, tempDir)
     val executorResult = File.createTempFile("executor", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
-      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
-      extraClassPath = Seq(originalJar.getPath()),
-      extraJars = Seq("local:" + originalJar.getPath()))
+      appArgs = Seq(driverResult.getAbsolutePath, executorResult.getAbsolutePath),
+      extraJars = Seq(jarPath),
+      extraConf = extraConf)
     checkResult(finalState, driverResult, "ORIGINAL")
-    checkResult(finalState, executorResult, "ORIGINAL")
+    checkResult(finalState, executorResult, if (expectExecutorFailure) "failure" else "ORIGINAL")
   }
 
   private def testPySpark(
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 5b762f606112..a36d1fa14d88 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -141,4 +141,44 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     }
   }
+
+  test("SPARK-35672: test replaceEnvVars in Unix mode") {
+    Map(
+      "F_O_O$FOO$BAR" -> "F_O_OBAR",
+      "$FOO" -> "BAR",
+      "$F_O_O$FOO" -> "BarBAR",
+      "${FOO}" -> "BAR",
+      "$FOO.baz$BAR" -> "BAR.baz",
+      "{{FOO}}" -> "BAR",
+      "{{FOO}}$FOO" -> "BARBAR",
+      "%FOO%" -> "%FOO%",
+      """\$FOO\\\$FOO\${FOO}\\$FOO\\\\""" -> """$FOO\$FOO${FOO}\BAR\\"""
+    ).foreach { case (input, expected) =>
+      withClue(s"input string `$input`: ") {
+        val replaced = YarnSparkHadoopUtil
+          .replaceEnvVars(input, Map("F_O_O" -> "Bar", "FOO" -> "BAR"), isWindows = false)
+        assert(replaced === expected)
+      }
+    }
+  }
+
+  test("SPARK-35672: test replaceEnvVars in Windows mode") {
+    Map(
+      "Foo%FOO%%BAR%" -> "FooBAR",
+      "%FOO%" -> "BAR",
+      "%F_O_O%%FOO%" -> "BarBAR",
+      "{{FOO}}%FOO%" -> "BARBAR",
+      "$FOO" -> "$FOO",
+      "${FOO}" -> "${FOO}",
+      "%%FOO%%%FOO%%%%%%FOO%" -> "%FOO%BAR%%BAR",
+      "%FOO%^^^%FOO^%^FOO^^^^%FOO%" -> "BAR^%FOO%^FOO^^BAR"
+    ).foreach { case (input, expected) =>
+      withClue(s"input string `$input`: ") {
+        val replaced = YarnSparkHadoopUtil
+          .replaceEnvVars(input, Map("F_O_O" -> "Bar", "FOO" -> "BAR"), isWindows = true)
+        assert(replaced === expected)
+      }
+    }
+  }
+
 }
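Putting the pieces together, a hedged configuration sketch in the spirit of the gateway-replacement cluster tests above; it assumes access to the yarn module's config entries (as the test code has), and all paths and variable names are invented for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.config.{GATEWAY_ROOT_PATH, REPLACEMENT_ROOT_PATH, SECONDARY_JARS}

// A `local:` jar whose gateway prefix is rewritten on the cluster; the replacement root
// mixes an OS-style reference with a YARN-style one, the same combination the suite tests.
val conf = new SparkConf()
  .set(SECONDARY_JARS, Seq("local:/gateway/libs/app.jar"))
  .set(GATEWAY_ROOT_PATH, "/gateway/libs")
  .set(REPLACEMENT_ROOT_PATH, "${CLUSTER_LIBS}/{{SUBDIR}}")
  .set("spark.executorEnv.CLUSTER_LIBS", "/cluster/libs")
  .set("spark.executorEnv.SUBDIR", "spark")

// On an executor (useClusterPath = true) with CLUSTER_LIBS=/cluster/libs and SUBDIR=spark,
// Client.getUserClasspathUrls(conf, useClusterPath = true) resolves the jar to a file: URL
// under /cluster/libs/spark/, alongside the __app__.jar entry.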