Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ public void testChildProcLauncher() throws Exception {
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.redirectError()
.addAppArgs("proc");
final Process app = launcher.launch();

new OutputRedirector(app.getInputStream(), TF);
new OutputRedirector(app.getErrorStream(), TF);
new OutputRedirector(app.getInputStream(), getClass().getName() + ".child", TF);
assertEquals(0, app.waitFor());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ChildProcAppHandle implements SparkAppHandle {
private final String secret;
private final LauncherServer server;

private Process childProc;
private volatile Process childProc;
private boolean disposed;
private LauncherConnection connection;
private List<Listener> listeners;
Expand Down Expand Up @@ -96,18 +96,14 @@ public synchronized void disconnect() {

@Override
public synchronized void kill() {
if (!disposed) {
disconnect();
}
disconnect();
if (childProc != null) {
try {
childProc.exitValue();
} catch (IllegalThreadStateException e) {
if (childProc.isAlive()) {
childProc.destroyForcibly();
} finally {
childProc = null;
}
childProc = null;
}
setState(State.KILLED);
}

String getSecret() {
Expand All @@ -118,7 +114,13 @@ void setChildProc(Process childProc, String loggerName, InputStream logStream) {
this.childProc = childProc;
if (logStream != null) {
this.redirector = new OutputRedirector(logStream, loggerName,
SparkLauncher.REDIRECTOR_FACTORY);
SparkLauncher.REDIRECTOR_FACTORY, this);
} else {
// If there is no log redirection, spawn a thread that will wait for the child process
// to finish.
Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild);
waiter.setDaemon(true);
waiter.start();
}
}

Expand All @@ -134,7 +136,7 @@ LauncherConnection getConnection() {
return connection;
}

void setState(State s) {
synchronized void setState(State s) {
if (!state.isFinal()) {
state = s;
fireEvent(false);
Expand All @@ -144,17 +146,48 @@ void setState(State s) {
}
}

void setAppId(String appId) {
synchronized void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}

// Visible for testing.
boolean isRunning() {
return childProc == null || childProc.isAlive() || (redirector != null && redirector.isAlive());
/**
* Wait for the child process to exit and update the handle's state if necessary, accoding to
* the exit code.
*/
void monitorChild() {
while (childProc.isAlive()) {
try {
childProc.waitFor();
} catch (Exception e) {
LOG.log(Level.WARNING, "Exception waiting for child process to exit.", e);
}
}

synchronized (this) {
if (disposed) {
return;
}

disconnect();

int ec;
try {
ec = childProc.exitValue();
} catch (Exception e) {
LOG.log(Level.WARNING, "Exception getting child process exit code, assuming failure.", e);
ec = 1;
}

// Only override the success state; leave other fail states alone.
if (!state.isFinal() || (ec != 0 && state == State.FINISHED)) {
state = State.LOST;
fireEvent(false);
}
}
}

private synchronized void fireEvent(boolean isInfoChanged) {
private void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,24 @@ class OutputRedirector {
private final BufferedReader reader;
private final Logger sink;
private final Thread thread;
private final ChildProcAppHandle callback;

private volatile boolean active;

OutputRedirector(InputStream in, ThreadFactory tf) {
this(in, OutputRedirector.class.getName(), tf);
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
this(in, loggerName, tf, null);
}

OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
OutputRedirector(
InputStream in,
String loggerName,
ThreadFactory tf,
ChildProcAppHandle callback) {
this.active = true;
this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
this.thread = tf.newThread(this::redirect);
this.sink = Logger.getLogger(loggerName);
this.callback = callback;
thread.start();
}

Expand All @@ -59,6 +65,10 @@ private void redirect() {
}
} catch (IOException e) {
sink.log(Level.FINE, "Error reading child process output.", e);
} finally {
if (callback != null) {
callback.monitorChild();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.nio.file.attribute.PosixFilePermission.*;

Expand All @@ -39,7 +40,7 @@

import static org.apache.spark.launcher.CommandBuilderUtils.*;

public class OutputRedirectionSuite extends BaseSuite {
public class ChildProcAppHandleSuite extends BaseSuite {

private static final List<String> MESSAGES = new ArrayList<>();

Expand Down Expand Up @@ -99,7 +100,8 @@ public void testRedirectLastWins() throws Exception {
public void testRedirectToLog() throws Exception {
assumeFalse(isWindows());

ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher().startApplication();
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.startApplication();
waitFor(handle);

assertTrue(MESSAGES.contains("output"));
Expand All @@ -112,7 +114,7 @@ public void testRedirectErrorToLog() throws Exception {

Path err = Files.createTempFile("stderr", "txt");

ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.redirectError(err.toFile())
.startApplication();
waitFor(handle);
Expand All @@ -127,7 +129,7 @@ public void testRedirectOutputToLog() throws Exception {

Path out = Files.createTempFile("stdout", "txt");

ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.redirectOutput(out.toFile())
.startApplication();
waitFor(handle);
Expand Down Expand Up @@ -173,17 +175,37 @@ public void testRedirectErrorTwiceFails() throws Exception {
.waitFor();
}

private void waitFor(ChildProcAppHandle handle) throws Exception {
@Test
public void testProcMonitorWithOutputRedirection() throws Exception {
File err = Files.createTempFile("out", "txt").toFile();
SparkAppHandle handle = new TestSparkLauncher()
.redirectError()
.redirectOutput(err)
.startApplication();
waitFor(handle);
assertEquals(SparkAppHandle.State.LOST, handle.getState());
}

@Test
public void testProcMonitorWithLogRedirection() throws Exception {
SparkAppHandle handle = new TestSparkLauncher()
.redirectToLog(getClass().getName())
.startApplication();
waitFor(handle);
assertEquals(SparkAppHandle.State.LOST, handle.getState());
}

private void waitFor(SparkAppHandle handle) throws Exception {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
try {
while (handle.isRunning()) {
Thread.sleep(10);
while (!handle.getState().isFinal()) {
assertTrue("Timed out waiting for handle to transition to final state.",
System.nanoTime() < deadline);
TimeUnit.MILLISECONDS.sleep(10);
}
} finally {
// Explicit unregister from server since the handle doesn't yet do that when the
// process finishes by itself.
LauncherServer server = LauncherServer.getServerInstance();
if (server != null) {
server.unregister(handle);
if (!handle.getState().isFinal()) {
handle.kill();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion launcher/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ log4j.appender.childproc.target=System.err
log4j.appender.childproc.layout=org.apache.log4j.PatternLayout
log4j.appender.childproc.layout.ConversionPattern=%t: %m%n

log4j.appender.outputredirtest=org.apache.spark.launcher.OutputRedirectionSuite$LogAppender
log4j.appender.outputredirtest=org.apache.spark.launcher.ChildProcAppHandleSuite$LogAppender
log4j.logger.org.apache.spark.launcher.app.outputredirtest=INFO, outputredirtest
log4j.logger.org.apache.spark.launcher.app.outputredirtest.additivity=false

Expand Down