@@ -899,7 +899,8 @@ private[spark] class Client(
     val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
       sys.props.get("spark.driver.libraryPath")).flatten
     if (libraryPaths.nonEmpty) {
-      prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(libraryPaths)))
+      prefixEnv = Some(createLibraryPathPrefix(libraryPaths.mkString(File.pathSeparator),
+        sparkConf))
     }
     if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
       logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode")
@@ -921,7 +922,7 @@ private[spark] class Client(
         .map(YarnSparkHadoopUtil.escapeForShell)
     }
     sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
-      prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths))))
+      prefixEnv = Some(createLibraryPathPrefix(paths, sparkConf))
     }
   }

@@ -1485,6 +1486,22 @@ private object Client extends Logging {
     YarnAppReport(report.getYarnApplicationState(), report.getFinalApplicationStatus(), diagsOpt)
   }

+  /**
+   * Create a properly quoted library path string to be added as a prefix to the command executed by
+   * YARN. This is different from plain quoting due to YARN executing the command through "bash -c".
+   */
+  def createLibraryPathPrefix(libpath: String, conf: SparkConf): String = {
Member: maybe pull this into a util class and have unit tests?

Contributor Author: This is so specific to the way YARN runs things that I don't think it would be useful anywhere else. If at some point it becomes useful, the code can be moved.

I think the tests I added are better than just unit testing this function, since that way the code is actually being run through YARN and bash.

Member: k

+    val cmdPrefix = if (Utils.isWindows) {
+      Utils.libraryPathEnvPrefix(Seq(libpath))
+    } else {
+      val envName = Utils.libraryPathEnvName
+      // For quotes, escape both the quote and the escape character when encoding in the command
+      // string.
+      val quoted = libpath.replace("\"", "\\\\\\\"")

Reviewer: Dumb question: I think escaping " would just give \". Not sure why we have so many escapes otherwise. Trying to understand; otherwise the PR looks good.

Contributor Author: This needs to be double escaped because it goes through two rounds of bash interpreting the value.

+      envName + "=\\\"" + quoted + File.pathSeparator + "$" + envName + "\\\""
+    }
+    getClusterPath(conf, cmdPrefix)
+  }
 }

private[spark] class YarnClusterApplication extends SparkApplication {
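To make the double-escaping discussion above concrete, here is a small standalone sketch (not part of the PR) that mirrors the Unix branch of createLibraryPathPrefix. It assumes LD_LIBRARY_PATH as the library path variable and a sample path, and traces the two rounds of bash parsing the author describes: YARN wraps the launch command in bash -c "...", and the resulting environment assignment is then parsed again by the inner shell.

import java.io.File

// Standalone sketch; LD_LIBRARY_PATH and the sample path are assumptions.
object LibraryPathEscapeSketch {
  def main(args: Array[String]): Unit = {
    val envName = "LD_LIBRARY_PATH"
    val libpath = """/tmp/quote":/tmp/plain"""

    // Same double escaping as the patch: each " in the path becomes \\\" in the
    // command string.
    val quoted = libpath.replace("\"", "\\\\\\\"")
    val prefix = envName + "=\\\"" + quoted + File.pathSeparator + "$" + envName + "\\\""

    println(prefix)
    // LD_LIBRARY_PATH=\"/tmp/quote\\\":/tmp/plain:$LD_LIBRARY_PATH\"
    //
    // Round 1: bash parses the outer `bash -c "..."` argument, turning \" into "
    // and \\\" into \", leaving:
    //   LD_LIBRARY_PATH="/tmp/quote\":/tmp/plain:$LD_LIBRARY_PATH"
    // Round 2: the inner shell evaluates that assignment; \" becomes a literal
    // quote and $LD_LIBRARY_PATH expands, so the original path survives intact.
  }
}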
@@ -131,10 +131,6 @@ private[yarn] class ExecutorRunnable(
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()

-    // Set the environment variable through a command prefix
-    // to append to the existing value of the variable
-    var prefixEnv: Option[String] = None
-
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
     javaOpts += "-Xmx" + executorMemoryString
@@ -144,8 +140,11 @@ private[yarn] class ExecutorRunnable(
       val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
       javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
     }
-    sparkConf.get(EXECUTOR_LIBRARY_PATH).foreach { p =>
-      prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p))))
+
+    // Set the library path through a command prefix to append to the existing value of the
+    // env variable.
+    val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
+      Client.createLibraryPathPrefix(libPath, sparkConf)
     }

     javaOpts += "-Djava.io.tmpdir=" +
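For context, a rough sketch of how such a prefix is consumed when the executor launch command is assembled (simplified and abbreviated from ExecutorRunnable.prepareCommand; the sample prefix value, option value, and trailing arguments here are illustrative):

// Simplified sketch, not the PR's code: the prefix, when present, becomes the
// first token of the container command, so YARN's `bash -c` sees an environment
// assignment ahead of the java invocation.
val prefixEnv: Option[String] = Some("""LD_LIBRARY_PATH=\"/opt/native:$LD_LIBRARY_PATH\"""")
val javaOpts = Seq("-Xmx1024m")

val commands: Seq[String] =
  prefixEnv.toSeq ++
    Seq("{{JAVA_HOME}}/bin/java", "-server") ++
    javaOpts ++
    Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", "...")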
@@ -36,6 +36,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.launcher._
 import org.apache.spark.util.Utils

@@ -216,6 +217,14 @@ abstract class BaseYarnClusterSuite
     props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
     props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")

+    // SPARK-24446: make sure special characters in the library path do not break containers.
+    if (!Utils.isWindows) {
+      val libPath = """/tmp/does not exist:$PWD/tmp:/tmp/quote":/tmp/ampersand&"""
+      props.setProperty(AM_LIBRARY_PATH.key, libPath)
+      props.setProperty(DRIVER_LIBRARY_PATH.key, libPath)
+      props.setProperty(EXECUTOR_LIBRARY_PATH.key, libPath)
+    }
+
     yarnCluster.getConfig().asScala.foreach { e =>
       props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
     }
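The path above deliberately packs in the problem characters from SPARK-24446: a space, a $PWD reference (still expected to expand inside the container), a literal double quote, and an ampersand. Assuming LD_LIBRARY_PATH on a Unix box, the Unix branch of createLibraryPathPrefix would encode it roughly as follows (a sketch, not output asserted by the test):

// Sketch of the command-string encoding of the test path (LD_LIBRARY_PATH assumed).
val libPath = """/tmp/does not exist:$PWD/tmp:/tmp/quote":/tmp/ampersand&"""
val quoted = libPath.replace("\"", "\\\\\\\"")
val prefix = "LD_LIBRARY_PATH=\\\"" + quoted + ":$LD_LIBRARY_PATH\\\""
// LD_LIBRARY_PATH=\"/tmp/does not exist:$PWD/tmp:/tmp/quote\\\":/tmp/ampersand&:$LD_LIBRARY_PATH\"
// The surrounding \"...\" keeps the space and & from splitting the command after
// the first round of bash parsing, while $PWD still expands in the second round.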