Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.{Map => IMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.util.control.NonFatal
import scala.util.matching.Regex

import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
Expand All @@ -37,7 +38,7 @@ import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.util.{Shell, StringUtils}
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
Expand Down Expand Up @@ -1473,7 +1474,7 @@ private[spark] object Client extends Logging {
/**
* Returns a list of local, absolute file URLs representing the user classpath. Note that this
* must be executed on the same host which will access the URLs, as it will resolve relative
* paths based on the current working directory.
* paths based on the current working directory, as well as environment variables.
*
* @param conf Spark configuration.
* @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
Expand All @@ -1492,10 +1493,51 @@ private[spark] object Client extends Logging {
"getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
inputPath
}
Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL
val envVarResolvedFilePath = replaceEnvVars(replacedFilePath, sys.env)
Paths.get(envVarResolvedFilePath).toAbsolutePath.toUri.toURL
}
}

/**
* Replace environment variables in a string according to the same rules [[Environment]]:
* `$VAR_NAME` for Unix, `%VAR_NAME%` for Windows, and `{{VAR_NAME}}` for all OS.
* Note that this won't properly support escapes for `$` and `%` characters, e.g.
* `\$VAR_NAME` when `VAR_NAME=foo` will resolve to `\foo` (whereas in an actual shell it would
* be `$VAR_NAME` due to the backslash).
*
* @param unresolvedString The unresolved string which may contain variable references.
* @param env The System environment
* @param isWindows True iff running in a Windows environment
* @return The input string with variables replaced with their values from `env`
*/
def replaceEnvVars(
unresolvedString: String,
env: IMap[String, String],
isWindows: Boolean = Shell.WINDOWS): String = {
val osSpecificPatterns = if (isWindows) {
Seq(
// Environment variables can contain pretty much anything
// Ref: https://docs.microsoft.com/en-us/windows/win32/procthread/environment-variables
"%([^=%]+)%".r("varname")
)
} else {
Seq(
// Environment variables are alphanumeric plus underscore, case-sensitive, can't start with
// a digit, based on Shell and Utilities volume of IEEE Std 1003.1-2001
// Ref: https://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html
"\\$([A-z_][A-z_0-9]*)".r("varname"),
"\\$\\{([A-z_][A-z_0-9]*)}".r("varname"),
)
}
// {{...}} is a YARN thing and not OS-specific. Follow Unix shell naming conventions
(osSpecificPatterns :+ "\\{\\{([A-z_][A-z_0-9]*)}}".r("varname"))
.foldLeft(unresolvedString) { (inputStr, pattern) =>
pattern.replaceSomeIn(inputStr, { m =>
Some(Regex.quoteReplacement(env.getOrElse(m.group("varname"), "")))
})
}
}

private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
mainJar.flatMap { path =>
val uri = Utils.resolveURI(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,44 @@ class ClientSuite extends SparkFunSuite with Matchers {
assertUserClasspathUrls(cluster = true, replacementRootPath)
}

test("SPARK-35672: test replaceEnvVars in Unix mode") {
Map(
"F_O_O$FOO$BAR" -> "F_O_OBAR",
"$FOO" -> "BAR",
"$F_O_O$FOO" -> "BarBAR",
"${FOO}" -> "BAR",
"$FOO.$BAR" -> "BAR.",
"{{FOO}}" -> "BAR",
"{{FOO}}$FOO" -> "BARBAR",
"%FOO%" -> "%FOO%",
).foreach { case (input, expected) =>
withClue(s"input string `$input`: ") {
assert(Client.replaceEnvVars(input, Map(
"F_O_O" -> "Bar",
"FOO" -> "BAR"
), isWindows = false) === expected)
}
}
}

test("SPARK-35672: test replaceEnvVars in Windows mode") {
Map(
"Foo%FOO%%BAR%" -> "FooBAR",
"%FOO%" -> "BAR",
"%Foo%%FOO%" -> "BarBAR",
"{{FOO}}%FOO%" -> "BARBAR",
"$FOO" -> "$FOO",
"${FOO}" -> "${FOO}",
).foreach { case (input, expected) =>
withClue(s"input string `$input`: ") {
assert(Client.replaceEnvVars(input, Map(
"Foo" -> "Bar",
"FOO" -> "BAR"
), isWindows = true) === expected)
}
}
}

private val matching = Seq(
("files URI match test1", "file:///file1", "file:///file2"),
("files URI match test2", "file:///c:file1", "file://c:file2"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,20 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
}))
}

test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' " +
"and gateway-replacement path containing an environment variable") {
// Set up the replacement path to point to the correct path, but suffixed with some
// nonexistent environment variable. If environment variable replacement takes place, then
// the var will be replaced with an empty string, and the path will be correct again
def urlToParentPath(url: URL): String = Paths.get(url.toURI).getParent.toString
testWithAddJar(clientMode = true, "local", Some(jarUrl => {
(jarUrl.getPath, Map(
GATEWAY_ROOT_PATH.key -> urlToParentPath(jarUrl),
REPLACEMENT_ROOT_PATH.key -> s"${urlToParentPath(jarUrl)}{{NO_SUCH_ENV_VAR___}}"
))
}))
}

test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file'") {
testWithAddJar(clientMode = true, "file")
}
Expand Down