diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index caccfac5ef13..4ff636110bbd 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.{Map => IMap}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.util.control.NonFatal
+import scala.util.matching.Regex
 
 import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
@@ -37,7 +38,7 @@ import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.StringUtils
+import org.apache.hadoop.util.{Shell, StringUtils}
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -1473,7 +1474,7 @@ private[spark] object Client extends Logging {
   /**
    * Returns a list of local, absolute file URLs representing the user classpath. Note that this
    * must be executed on the same host which will access the URLs, as it will resolve relative
-   * paths based on the current working directory.
+   * paths based on the current working directory, as well as environment variables.
    *
    * @param conf Spark configuration.
    * @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
@@ -1492,10 +1493,55 @@ private[spark] object Client extends Logging {
           "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
         inputPath
       }
-      Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL
+      val envVarResolvedFilePath = replaceEnvVars(replacedFilePath, sys.env)
+      Paths.get(envVarResolvedFilePath).toAbsolutePath.toUri.toURL
     }
   }
 
+  /**
+   * Replace environment variables in a string according to the same rules as [[Environment]]:
+   * `$VAR_NAME` for Unix, `%VAR_NAME%` for Windows, and `{{VAR_NAME}}` for all OS.
+   * Note that this won't properly support escapes for `$` and `%` characters, e.g.
+   * `\$VAR_NAME` when `VAR_NAME=foo` will resolve to `\foo` (whereas in an actual shell it would
+   * be `$VAR_NAME` due to the backslash).
+   *
+   * All reference styles are resolved in a single left-to-right pass, so a substituted value is
+   * never itself re-scanned for variable references. References to variables which are not
+   * present in `env` are replaced with the empty string.
+   *
+   * @param unresolvedString The unresolved string which may contain variable references.
+   * @param env The System environment
+   * @param isWindows True iff running in a Windows environment
+   * @return The input string with variables replaced with their values from `env`
+   */
+  def replaceEnvVars(
+      unresolvedString: String,
+      env: IMap[String, String],
+      isWindows: Boolean = Shell.WINDOWS): String = {
+    // Environment variable names on Unix are alphanumeric plus underscore, case-sensitive, and
+    // can't start with a digit, based on Shell and Utilities volume of IEEE Std 1003.1-2001
+    // Ref: https://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html
+    // Note that the range `A-z` must not be used here: it also covers the characters
+    // `[`, `\`, `]`, `^` and the backtick, which sit between `Z` and `a` in ASCII.
+    val unixVarName = "([A-Za-z_][A-Za-z_0-9]*)"
+    val osSpecificPatterns = if (isWindows) {
+      // Environment variables can contain pretty much anything
+      // Ref: https://docs.microsoft.com/en-us/windows/win32/procthread/environment-variables
+      Seq("%([^=%]+)%")
+    } else {
+      Seq("\\$" + unixVarName, "\\$\\{" + unixVarName + "}")
+    }
+    // {{...}} is a YARN thing and not OS-specific. Follow Unix shell naming conventions
+    val yarnPattern = "\\{\\{" + unixVarName + "}}"
+    // Each alternative has exactly one capture group, so the name of the referenced variable
+    // is the only non-null subgroup of a match.
+    val anyVarReference = (osSpecificPatterns :+ yarnPattern).mkString("|").r
+    anyVarReference.replaceAllIn(unresolvedString, { m =>
+      val varName = m.subgroups.flatMap(Option(_)).headOption.getOrElse("")
+      Regex.quoteReplacement(env.getOrElse(varName, ""))
+    })
+  }
+
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 1650ea2e4491..d374e18d2ca9 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -618,6 +618,49 @@ class ClientSuite extends SparkFunSuite with Matchers {
     assertUserClasspathUrls(cluster = true, replacementRootPath)
   }
 
+  test("SPARK-35672: test replaceEnvVars in Unix mode") {
+    Map(
+      "F_O_O$FOO$BAR" -> "F_O_OBAR",
+      "$FOO" -> "BAR",
+      "$F_O_O$FOO" -> "BarBAR",
+      "${FOO}" -> "BAR",
+      "$FOO.$BAR" -> "BAR.",
+      "{{FOO}}" -> "BAR",
+      "{{FOO}}$FOO" -> "BARBAR",
+      "%FOO%" -> "%FOO%",
+      // `[` and `]` lie between `Z` and `a` in ASCII but are not valid in variable names
+      "$FOO[0]" -> "BAR[0]",
+      // substituted values must not be expanded a second time
+      "$NESTED" -> "{{FOO}}",
+    ).foreach { case (input, expected) =>
+      withClue(s"input string `$input`: ") {
+        assert(Client.replaceEnvVars(input, Map(
+          "F_O_O" -> "Bar",
+          "FOO" -> "BAR",
+          "NESTED" -> "{{FOO}}"
+        ), isWindows = false) === expected)
+      }
+    }
+  }
+
+  test("SPARK-35672: test replaceEnvVars in Windows mode") {
+    Map(
+      "Foo%FOO%%BAR%" -> "FooBAR",
+      "%FOO%" -> "BAR",
+      "%Foo%%FOO%" -> "BarBAR",
+      "{{FOO}}%FOO%" -> "BARBAR",
+      "$FOO" -> "$FOO",
+      "${FOO}" -> "${FOO}",
+    ).foreach { case (input, expected) =>
+      withClue(s"input string `$input`: ") {
+        assert(Client.replaceEnvVars(input, Map(
+          "Foo" -> "Bar",
+          "FOO" -> "BAR"
+        ), isWindows = true) === expected)
+      }
+    }
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 6bd9c4d6deee..f959aba7739b 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -188,6 +188,20 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     }))
   }
 
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' " +
+    "and gateway-replacement path containing an environment variable") {
+    // Set up the replacement path to point to the correct path, but suffixed with some
+    // nonexistent environment variable. If environment variable replacement takes place, then
+    // the var will be replaced with an empty string, and the path will be correct again
+    def urlToParentPath(url: URL): String = Paths.get(url.toURI).getParent.toString
+    testWithAddJar(clientMode = true, "local", Some(jarUrl => {
+      (jarUrl.getPath, Map(
+        GATEWAY_ROOT_PATH.key -> urlToParentPath(jarUrl),
+        REPLACEMENT_ROOT_PATH.key -> s"${urlToParentPath(jarUrl)}{{NO_SUCH_ENV_VAR___}}"
+      ))
+    }))
+  }
+
   test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file'") {
     testWithAddJar(clientMode = true, "file")
   }