Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,7 @@ public class SparkLauncherSuite {
private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");

private SparkLauncher launcher;

@Before
public void configureLauncher() {
launcher = new SparkLauncher().setSparkHome(System.getProperty("spark.test.home"));
}
private final SparkLauncher launcher = new SparkLauncher();

@Test
public void testSparkArgumentHandling() throws Exception {
Expand Down Expand Up @@ -101,60 +96,6 @@ public void testSparkArgumentHandling() throws Exception {
assertEquals("python3.5", launcher.builder.conf.get(package$.MODULE$.PYSPARK_PYTHON().key()));
}

@Test(expected=IllegalStateException.class)
public void testRedirectTwiceFails() throws Exception {
launcher.setAppResource("fake-resource.jar")
.setMainClass("my.fake.class.Fake")
.redirectError()
.redirectError(ProcessBuilder.Redirect.PIPE)
.launch();
}

@Test(expected=IllegalStateException.class)
public void testRedirectToLogWithOthersFails() throws Exception {
launcher.setAppResource("fake-resource.jar")
.setMainClass("my.fake.class.Fake")
.redirectToLog("fakeLog")
.redirectError(ProcessBuilder.Redirect.PIPE)
.launch();
}

@Test
public void testRedirectErrorToOutput() throws Exception {
launcher.redirectError();
assertTrue(launcher.redirectErrorStream);
}

@Test
public void testRedirectsSimple() throws Exception {
launcher.redirectError(ProcessBuilder.Redirect.PIPE);
assertNotNull(launcher.errorStream);
assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.PIPE);

launcher.redirectOutput(ProcessBuilder.Redirect.PIPE);
assertNotNull(launcher.outputStream);
assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.PIPE);
}

@Test
public void testRedirectLastWins() throws Exception {
launcher.redirectError(ProcessBuilder.Redirect.PIPE)
.redirectError(ProcessBuilder.Redirect.INHERIT);
assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.INHERIT);

launcher.redirectOutput(ProcessBuilder.Redirect.PIPE)
.redirectOutput(ProcessBuilder.Redirect.INHERIT);
assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.INHERIT);
}

@Test
public void testRedirectToLog() throws Exception {
launcher.redirectToLog("fakeLogger");
assertTrue(launcher.redirectToLog);
assertTrue(launcher.builder.getEffectiveConfig()
.containsKey(SparkLauncher.CHILD_PROCESS_LOGGER_NAME));
}

@Test
public void testChildProcLauncher() throws Exception {
// This test is failed on Windows due to the failure of initiating executors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ String getScalaVersion() {

String getSparkHome() {
String path = getenv(ENV_SPARK_HOME);
if (path == null && "1".equals(getenv("SPARK_TESTING"))) {
path = System.getProperty("spark.test.home");
}
checkState(path != null,
"Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
return path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.launcher;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
Expand Down Expand Up @@ -113,10 +114,12 @@ String getSecret() {
return secret;
}

void setChildProc(Process childProc, String loggerName) {
void setChildProc(Process childProc, String loggerName, InputStream logStream) {
this.childProc = childProc;
this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
SparkLauncher.REDIRECTOR_FACTORY);
if (logStream != null) {
this.redirector = new OutputRedirector(logStream, loggerName,
SparkLauncher.REDIRECTOR_FACTORY);
}
}

void setConnection(LauncherConnection connection) {
Expand Down Expand Up @@ -146,6 +149,11 @@ void setAppId(String appId) {
fireEvent(true);
}

// Visible for testing.
boolean isRunning() {
return childProc == null || childProc.isAlive() || (redirector != null && redirector.isAlive());
}

private synchronized void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,8 @@ void stop() {
active = false;
}

boolean isAlive() {
return thread.isAlive();
}

}
87 changes: 58 additions & 29 deletions launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -110,7 +111,6 @@ public static void setConfig(String name, String value) {
// Visible for testing.
final SparkSubmitCommandBuilder builder;
File workingDir;
boolean redirectToLog;
boolean redirectErrorStream;
ProcessBuilder.Redirect errorStream;
ProcessBuilder.Redirect outputStream;
Expand Down Expand Up @@ -446,7 +446,6 @@ public SparkLauncher redirectOutput(File outFile) {
*/
public SparkLauncher redirectToLog(String loggerName) {
setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
redirectToLog = true;
return this;
}

Expand All @@ -459,11 +458,22 @@ public SparkLauncher redirectToLog(String loggerName) {
* @return A process handle for the Spark app.
*/
public Process launch() throws IOException {
Process childProc = createBuilder().start();
if (redirectToLog) {
String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY);
ProcessBuilder pb = createBuilder();

boolean outputToLog = outputStream == null;
boolean errorToLog = !redirectErrorStream && errorStream == null;

String loggerName = getLoggerName();
if (loggerName != null && outputToLog && errorToLog) {
pb.redirectErrorStream(true);
}

Process childProc = pb.start();
if (loggerName != null) {
InputStream logStream = outputToLog ? childProc.getInputStream() : childProc.getErrorStream();
new OutputRedirector(logStream, loggerName, REDIRECTOR_FACTORY);
}

return childProc;
}

Expand Down Expand Up @@ -498,38 +508,48 @@ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) thr
handle.addListener(l);
}

String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
String loggerName = getLoggerName();
ProcessBuilder pb = createBuilder();

boolean outputToLog = outputStream == null;
boolean errorToLog = !redirectErrorStream && errorStream == null;

// Only setup stderr + stdout to logger redirection if user has not otherwise configured output
// redirection.
if (loggerName == null) {
String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
if (appName == null) {
if (builder.appName != null) {
appName = builder.appName;
} else if (builder.mainClass != null) {
int dot = builder.mainClass.lastIndexOf(".");
if (dot >= 0 && dot < builder.mainClass.length() - 1) {
appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
} else {
appName = builder.mainClass;
}
} else if (builder.appResource != null) {
appName = new File(builder.appResource).getName();
if (loggerName == null && (outputToLog || errorToLog)) {
String appName;
if (builder.appName != null) {
appName = builder.appName;
} else if (builder.mainClass != null) {
int dot = builder.mainClass.lastIndexOf(".");
if (dot >= 0 && dot < builder.mainClass.length() - 1) {
appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
} else {
appName = String.valueOf(COUNTER.incrementAndGet());
appName = builder.mainClass;
}
} else if (builder.appResource != null) {
appName = new File(builder.appResource).getName();
} else {
appName = String.valueOf(COUNTER.incrementAndGet());
}
String loggerPrefix = getClass().getPackage().getName();
loggerName = String.format("%s.app.%s", loggerPrefix, appName);
}

if (outputToLog && errorToLog) {
pb.redirectErrorStream(true);
}

pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
String.valueOf(LauncherServer.getServerInstance().getPort()));
pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
try {
handle.setChildProc(pb.start(), loggerName);
Process child = pb.start();
InputStream logStream = null;
if (loggerName != null) {
logStream = outputToLog ? child.getInputStream() : child.getErrorStream();
}
handle.setChildProc(child, loggerName, logStream);
} catch (IOException ioe) {
handle.kill();
throw ioe;
Expand All @@ -538,10 +558,9 @@ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) thr
return handle;
}

private ProcessBuilder createBuilder() {
private ProcessBuilder createBuilder() throws IOException {
List<String> cmd = new ArrayList<>();
String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
cmd.add(join(File.separator, builder.getSparkHome(), "bin", script));
cmd.add(findSparkSubmit());
cmd.addAll(builder.buildSparkSubmitArgs());

// Since the child process is a batch script, let's quote things so that special characters are
Expand All @@ -568,11 +587,11 @@ private ProcessBuilder createBuilder() {
// Similarly, if redirectToLog is specified, no other redirections should be specified.
checkState(!redirectErrorStream || errorStream == null,
"Cannot specify both redirectError() and redirectError(...) ");
checkState(!redirectToLog ||
(!redirectErrorStream && errorStream == null && outputStream == null),
checkState(getLoggerName() == null ||
((!redirectErrorStream && errorStream == null) || outputStream == null),
"Cannot used redirectToLog() in conjunction with other redirection methods.");

if (redirectErrorStream || redirectToLog) {
if (redirectErrorStream) {
pb.redirectErrorStream(true);
}
if (errorStream != null) {
Expand All @@ -585,6 +604,16 @@ private ProcessBuilder createBuilder() {
return pb;
}

// Visible for testing.
String findSparkSubmit() throws IOException {
String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
return join(File.separator, builder.getSparkHome(), "bin", script);
}

private String getLoggerName() throws IOException {
return builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
}

private static class ArgumentValidator extends SparkSubmitOptionParser {

private final boolean hasValue;
Expand Down
Loading