Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/spark-submit.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ rem
rem This is the entry point for running Spark submit. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.

cmd /V /E /C "%~dp0spark-submit2.cmd" %*
cmd /V /E /C ""%~dp0spark-submit2.cmd" %*"
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ private[deploy] class DriverRunner(
Files.append(header, stderr, StandardCharsets.UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}

if (Utils.isWindows) {
Utils.shortenClasspath(builder)
}

runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ private[deploy] class ExecutorRunner(
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()

if (Utils.isWindows) {
Utils.shortenClasspath(builder)
}

val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $formattedCommand")

Expand Down
50 changes: 50 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
Expand Down Expand Up @@ -1090,6 +1091,50 @@ private[spark] object Utils extends Logging {
bytesToString(megabytes * 1024L * 1024L)
}


/**
* Create a jar file at the given path, containing a manifest with a classpath
* that references all specified entries.
*/
def createShortClassPath(tempDir: File, classPath: String) : String = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, def createShortClassPath(tempDir: File, classPath: String): String = {

if (isWindows) {
val env = new util.HashMap[String, String](System.getenv())
val javaCps = FileUtil
.createJarWithClassPath(classPath, new Path(tempDir.getAbsolutePath), env)
val javaCpStr = javaCps(0) + javaCps(1)
logInfo("Shorten the class path to: " + javaCpStr)
javaCpStr
} else {
classPath
}
}

def createShortClassPath(classPath: String) : String = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

def createShortClassPath(classPath: String): String = {

val tempDir = createTempDir("classpaths")
createShortClassPath(tempDir, classPath)
}

/**
 * Shorten the class path of a process command whose command line is too long.
 *
 * When the quoted command line is longer than 8190 characters (the Windows command line
 * limit is 8191 characters), the argument that follows the `-cp` or `-classpath` option is
 * replaced, in place, by the result of `createShortClassPath`. Commands that are short
 * enough, or that carry no class path option, are left untouched.
 *
 * @param builder process builder whose command list may be rewritten in place
 */
def shortenClasspath(builder: ProcessBuilder): Unit = {
  val command = builder.command()
  // Length of the command with every argument quoted and separated by a single space.
  val commandLineLength = command.asScala.mkString("\"", "\" \"", "\"").length
  if (commandLineLength > 8190) {
    logWarning("Cmd too long, try to shorten the classpath")
    // Look for the class path option; the JVM launcher accepts both spellings. Position 0
    // is never considered an option, so only strictly positive indices qualify.
    val idxCp = Seq("-cp", "-classpath")
      .map(option => command.indexOf(option))
      .find(_ > 0)
      .getOrElse(-1)
    // The option must be followed by a value, otherwise there is nothing to shorten.
    if (idxCp > 0 && idxCp + 1 < command.size()) {
      val classPath = command.get(idxCp + 1)
      command.set(idxCp + 1, createShortClassPath(classPath))
    }
  }
}


/**
* Execute a command and return the process running the command.
*/
Expand All @@ -1103,6 +1148,11 @@ private[spark] object Utils extends Logging {
for ((key, value) <- extraEnvironment) {
environment.put(key, value)
}

if (Utils.isWindows) {
Utils.shortenClasspath(builder)
}

val process = builder.start()
if (redirectStderr) {
val threadName = "redirect stderr for command " + command(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.launcher;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.SystemUtils;
import org.apache.spark.util.Utils;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should reorder imports. (See databricks/scala-style-guide#imports)

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -99,15 +102,28 @@ public void testChildProcLauncher() throws Exception {
String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
"-Dfoo=bar -Dtest.appender=childproc")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.addAppArgs("proc");
final Process app = launcher.launch();

new OutputRedirector(app.getInputStream(), TF);
new OutputRedirector(app.getErrorStream(), TF);
assertEquals(0, app.waitFor());
File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
try {
if (SystemUtils.IS_OS_WINDOWS) {
launcher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH,
Utils.createShortClassPath(tempDir, System.getProperty("java.class.path")));
} else {
launcher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"));
}
final Process app = launcher.launch();

new OutputRedirector(app.getInputStream(), TF);
new OutputRedirector(app.getErrorStream(), TF);
assertEquals(0, app.waitFor());
} finally {
if(tempDir.exists()) {
Utils.deleteRecursively(tempDir);
}
}
}

public static class SparkLauncherTestApp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._

import org.apache.spark._
import org.apache.spark.util.Utils

class LauncherBackendSuite extends SparkFunSuite with Matchers {

private val tests = Seq(
"local" -> "local",
"standalone/client" -> "local-cluster[1,1,1024]")

val tempDir = Utils.createTempDir()

tests.foreach { case (name, master) =>
test(s"$name: launcher handle") {
testWithMaster(master)
Expand All @@ -42,16 +45,22 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
private def testWithMaster(master: String): Unit = {
val env = new java.util.HashMap[String, String]()
env.put("SPARK_PRINT_LAUNCH_COMMAND", "1")
val handle = new SparkLauncher(env)

val launcher = new SparkLauncher(env)
.setSparkHome(sys.props("spark.test.home"))
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.setConf("spark.ui.enabled", "false")
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
.setMaster(master)
.setAppResource("spark-internal")
.setMainClass(TestApp.getClass.getName().stripSuffix("$"))
.startApplication()
if (Utils.isWindows) {
launcher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH,
Utils.createShortClassPath(tempDir, System.getProperty("java.class.path")))
} else {
launcher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
}

val handle = launcher.startApplication()
try {
eventually(timeout(30 seconds), interval(100 millis)) {
handle.getAppId() should not be (null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.spark.launcher;

import java.io.BufferedReader;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -28,8 +30,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.regex.Matcher;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

Expand All @@ -53,6 +59,8 @@ abstract class AbstractCommandBuilder {
final Map<String, String> childEnv;
final Map<String, String> conf;

private static File classPathShortDir = new File(System.getProperty("java.io.tmpdir"), "spark-classpath");

// The merged configuration for the application. Cached to avoid having to read / parse
// properties files multiple times.
private Map<String, String> effectiveConfig;
Expand Down Expand Up @@ -115,10 +123,159 @@ List<String> buildJavaCommand(String extraClassPath) throws IOException {
}

cmd.add("-cp");
cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
List<String> classPathEntries = buildClassPath(extraClassPath);
String classPath = null;
if(isWindows()) {
String[] jarCp = createJarWithClassPath(
classPathEntries.toArray(new String[classPathEntries.size()]),
classPathShortDir,
new HashMap<>(System.getenv()));
classPath = jarCp[0] + File.pathSeparator + jarCp[1];
} else {
classPath = join(File.pathSeparator, classPathEntries);
}
cmd.add(classPath);
return cmd;
}

/**
 * Create a jar file in the given working directory, containing a manifest with a classpath
 * that references all specified entries.
 *
 * Some platforms may have an upper limit on command line length. For example,
 * the maximum command line length on Windows is 8191 characters, but the
 * length of the classpath may exceed this. To work around this limitation,
 * use this method to create a small intermediate jar with a manifest that
 * contains the full classpath. It returns the absolute path to the new jar,
 * which the caller may set as the classpath for a new process.
 *
 * Environment variable evaluation is not supported within a jar manifest, so
 * this method expands environment variables before inserting classpath entries
 * to the manifest. The method parses environment variables according to
 * platform-specific syntax (%VAR% on Windows, or $VAR otherwise). On Windows,
 * environment variables are case-insensitive. For example, %VAR% and %var%
 * evaluate to the same value. Variables that are not defined expand to the
 * empty string.
 *
 * Specifying the classpath in a jar manifest does not support wildcards, so
 * this method expands wildcards internally. Any classpath entry that ends
 * with * is translated to all files at that path with extension .jar or .JAR.
 * A wildcard entry that matches no jar is not written to the manifest; it is
 * returned unexpanded in position 1 of the result instead.
 *
 * This method is adapted from Hadoop's FileUtil#createJarWithClassPath. It is
 * reimplemented here to avoid a heavy dependency.
 *
 * @param classPathEntries String[] input classpath entries to bundle into the jar
 *   manifest; the array itself is not modified
 * @param workingDir File working directory to save the jar in; created if missing
 * @param callerEnv Map<String, String> caller's environment variables to use
 *   for expansion
 * @return String[] with absolute path to new jar in position 0 and
 *   unexpanded wild card entry path in position 1 (each unexpanded entry is
 *   prefixed with the path separator; empty when there is none)
 * @throws IOException if the working directory cannot be created or there is
 *   an I/O error while writing the jar file
 */
public static String[] createJarWithClassPath(String[] classPathEntries, File workingDir,
    Map<String, String> callerEnv) throws IOException {
  boolean windows = isWindows();

  // Environment variable names are case-insensitive on Windows: index them by lower-cased
  // name there, and lower-case every lookup below the same way.
  Map<String, String> env;
  if (windows) {
    env = new HashMap<>();
    for (Map.Entry<String, String> entry : callerEnv.entrySet()) {
      env.put(entry.getKey().toLowerCase(java.util.Locale.ROOT), entry.getValue());
    }
  } else {
    env = callerEnv;
  }

  // Expand the environment variables into a fresh list so the caller's array is untouched.
  Pattern envVarPattern = windows ? Pattern.compile("%(.*?)%") :
    Pattern.compile("\\$([A-Za-z_]{1}[A-Za-z0-9_]*)");
  List<String> expandedEntries = new ArrayList<String>(classPathEntries.length);
  for (String template : classPathEntries) {
    StringBuffer sb = new StringBuffer();
    Matcher matcher = envVarPattern.matcher(template);
    while (matcher.find()) {
      String varName = matcher.group(1);
      if (windows) {
        varName = varName.toLowerCase(java.util.Locale.ROOT);
      }
      String replacement = env.get(varName);
      if (replacement == null) {
        replacement = "";
      }
      // quoteReplacement keeps '\' and '$' in the value (e.g. Windows paths) literal.
      matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
    }
    matcher.appendTail(sb);
    expandedEntries.add(sb.toString());
  }

  // mkdirs() returns false both on failure and when the directory already exists (possibly
  // created concurrently), so only fail when the directory is really missing afterwards.
  if (!workingDir.mkdirs() && !workingDir.isDirectory()) {
    throw new IOException("Failed to create directory " + workingDir.getAbsolutePath());
  }

  StringBuilder unexpandedWildcardClasspath = new StringBuilder();
  // Append all entries
  List<String> classPathEntryList = new ArrayList<String>(expandedEntries.size());
  for (String classPathEntry : expandedEntries) {
    if (classPathEntry.length() == 0) {
      continue;
    }
    if (classPathEntry.endsWith("*")) {
      boolean foundWildCardJar = false;
      // Drop the trailing '*' together with the separator in front of it; a bare "*"
      // refers to the current directory.
      String wildcardDir = classPathEntry.length() > 1
        ? classPathEntry.substring(0, classPathEntry.length() - 2)
        : ".";
      // Append all jars that match the wildcard
      File[] files = new File(wildcardDir).listFiles();
      if (files != null) {
        for (File f : files) {
          if (f.getName().toLowerCase(java.util.Locale.ROOT).endsWith(".jar")) {
            foundWildCardJar = true;
            classPathEntryList.add(f.getAbsoluteFile().toURI().toURL().toExternalForm());
          }
        }
      }

      if (!foundWildCardJar) {
        unexpandedWildcardClasspath.append(File.pathSeparator);
        unexpandedWildcardClasspath.append(classPathEntry);
      }
    } else {
      // Append just this entry
      File fileCpEntry = new File(classPathEntry);
      String classPathEntryUrl = fileCpEntry.toURI().toURL().toExternalForm();

      // File.toURI only appends trailing '/' if it can determine that it is a
      // directory that already exists. (See JavaDocs.) If this entry had a
      // trailing '/' specified by the caller, then guarantee that the
      // classpath entry in the manifest has a trailing '/', and thus refers to
      // a directory instead of a file. This can happen if the caller is
      // creating a classpath jar referencing a directory that hasn't been
      // created yet, but will definitely be created before running.
      if (classPathEntry.endsWith("/") && !classPathEntryUrl.endsWith("/")) {
        classPathEntryUrl = classPathEntryUrl + "/";
      }
      classPathEntryList.add(classPathEntryUrl);
    }
  }
  String jarClassPath = join(" ", classPathEntryList);

  // Create the manifest
  Manifest jarManifest = new Manifest();
  jarManifest.getMainAttributes().putValue(
    Attributes.Name.MANIFEST_VERSION.toString(), "1.0");
  jarManifest.getMainAttributes().putValue(
    Attributes.Name.CLASS_PATH.toString(), jarClassPath);

  // Write the manifest to the output JAR file. try-with-resources closes every stream that
  // was successfully opened, even when a later constructor throws.
  File classPathJar = File.createTempFile("classpath-", ".jar", workingDir);
  try (FileOutputStream fos = new FileOutputStream(classPathJar);
       BufferedOutputStream bos = new BufferedOutputStream(fos);
       JarOutputStream jos = new JarOutputStream(bos, jarManifest)) {
    // The jar carries nothing but its manifest.
    jos.finish();
  }
  return new String[] {
    classPathJar.getCanonicalPath(),
    unexpandedWildcardClasspath.toString()
  };
}

void addOptionString(List<String> cmd, String options) {
if (!isEmpty(options)) {
for (String opt : parseOptionString(options)) {
Expand Down
Loading