Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,24 @@ private void inProcessLauncherTestImpl() throws Exception {

SparkAppHandle handle = null;
try {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);
synchronized (InProcessTestApp.LOCK) {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

// SPARK-23020: see doc for InProcessTestApp.LOCK for a description of the race. Here
// we wait until we know that the connection between the app and the launcher has been
// established before allowing the app to finish.
final SparkAppHandle _handle = handle;
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState());
});

InProcessTestApp.LOCK.wait(5000);
}

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
Expand Down Expand Up @@ -193,10 +205,26 @@ public static void main(String[] args) throws Exception {

public static class InProcessTestApp {

  /**
   * SPARK-23020: there's a race caused by a child app finishing too quickly. This would cause
   * the InProcessAppHandle to dispose of itself even before the child connection was properly
   * established, so no state changes would be detected for the application and its final
   * state would be LOST.
   *
   * It's not really possible to fix that race safely in the handle code itself without changing
   * the way in-process apps talk to the launcher library, so we work around that in the test by
   * synchronizing on this object.
   */
  public static final Object LOCK = new Object();

  /**
   * Entry point of the in-process test application.
   *
   * Checks that at least one argument was passed and that the first one is "hello", creates
   * and immediately stops a SparkContext, and finally notifies every thread waiting on
   * {@link #LOCK}.
   *
   * @param args application arguments; the first one is expected to be "hello".
   * @throws Exception declared so that any failure propagates to the caller.
   */
  public static void main(String[] args) throws Exception {
    assertNotEquals(0, args.length);
    // Expected value goes first, actual value second (JUnit-style argument order), so that a
    // failure message reports the two values the right way round.
    assertEquals("hello", args[0]);
    new SparkContext().stop();

    // Acquiring the monitor blocks until any thread currently holding LOCK releases it
    // (for instance by calling LOCK.wait()); only then are the waiters woken up.
    synchronized (LOCK) {
      LOCK.notifyAll();
    }
  }

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@
package org.apache.spark.launcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

abstract class AbstractAppHandle implements SparkAppHandle {

private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
private static final Logger LOG = Logger.getLogger(AbstractAppHandle.class.getName());

private final LauncherServer server;

private LauncherServer.ServerConnection connection;
private List<Listener> listeners;
private AtomicReference<State> state;
private String appId;
private volatile String appId;
private volatile boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
Expand All @@ -42,9 +42,9 @@ protected AbstractAppHandle(LauncherServer server) {
}

@Override
public synchronized void addListener(Listener l) {
public void addListener(Listener l) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove synchronized here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should add this one back.

if (listeners == null) {
listeners = new ArrayList<>();
listeners = new CopyOnWriteArrayList<>();
}
listeners.add(l);
}
Expand All @@ -71,16 +71,14 @@ public void stop() {

@Override
public synchronized void disconnect() {
if (!isDisposed()) {
if (connection != null) {
try {
connection.closeAndWait();
} catch (IOException ioe) {
// no-op.
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
dispose();
}
dispose();
}

void setConnection(LauncherServer.ServerConnection connection) {
Expand All @@ -100,7 +98,18 @@ boolean isDisposed() {
*/
synchronized void dispose() {
if (!isDisposed()) {
// First wait for all data from the connection to be read. Then unregister the handle.
// Otherwise, unregistering might cause the server to be stopped and all child connections
// to be closed.
if (connection != null) {
try {
connection.waitForClose();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);

// Set state to LOST if not yet final.
setState(State.LOST, false);
this.disposed = true;
Expand All @@ -127,11 +136,13 @@ void setState(State s, boolean force) {
current = state.get();
}

LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { current, s });
if (s != State.LOST) {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { current, s });
}
}

synchronized void setAppId(String appId) {
void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void monitorChild() {
}
}

disconnect();
dispose();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add more documentation to disconnect and dispose, so that people can clearly understand the difference between them and better understand changes like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disconnect() is actually a public method and already documented in the SparkAppHandle interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm still not able to figure out what the difference between them is after reading the doc. Do you mind leaving a short description here? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the documentation for dispose.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ synchronized void start(String appName, Method main, String[] args) {
setState(State.FAILED);
}

disconnect();
dispose();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we call disconnect here, we would close the connection and then wait for the close to finish in dispose. If we call dispose directly, we also close and wait for the connection (in waitForClose). What is the actual difference here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order in which the connection is closed. waitForClose will wait for the connection to be closed by the remote side (the finished app) before closing it itself, like disconnect does.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see

});

app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void run() {
};
ServerConnection clientConnection = new ServerConnection(client, timeout);
Thread clientThread = factory.newThread(clientConnection);
clientConnection.setConnectionThread(clientThread);
synchronized (clients) {
clients.add(clientConnection);
}
Expand Down Expand Up @@ -290,17 +291,15 @@ class ServerConnection extends LauncherConnection {

private TimerTask timeout;
private volatile Thread connectionThread;
volatile AbstractAppHandle handle;
private volatile AbstractAppHandle handle;

ServerConnection(Socket socket, TimerTask timeout) throws IOException {
super(socket);
this.timeout = timeout;
}

@Override
public void run() {
this.connectionThread = Thread.currentThread();
super.run();
void setConnectionThread(Thread t) {
this.connectionThread = t;
}

@Override
Expand Down Expand Up @@ -363,17 +362,28 @@ public void close() throws IOException {
/**
* Close the connection and wait for any buffered data to be processed before returning.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the documentation, shall we still call it closeAndWait?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That wouldn't be accurate anymore, because the wait happens first now. waitAndClose() is an option but also not totally accurate. Open to suggestions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we update the documentation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think I may have thought of a somewhat simple way to fix the race without needing the workaround in the test. Let me try that. If that doesn't work I'll update the javadoc.

Copy link
Contributor Author

@vanzin vanzin Feb 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea was getting too complicated for a fix to a rare race, so I'll just update the doc here and leave that race for another time.

* This ensures any changes reported by the child application take effect.
*
* This method allows a short period for the connection thread to finish by itself (same amount
* of time as the connection timeout, which is configurable). This should be fine for
* well-behaved applications, where they close the connection when the app handle detects the
* app has finished.
*
* In case the connection is not closed within the grace period, this method forcefully closes
* it and any subsequent data that may arrive will be ignored.
*/
public void closeAndWait() throws IOException {
close();

public void waitForClose() throws IOException {
Thread connThread = this.connectionThread;
if (Thread.currentThread() != connThread) {
try {
connThread.join();
connThread.join(getConnectionTimeout());
} catch (InterruptedException ie) {
// Ignore.
}

if (connThread.isAlive()) {
LOG.log(Level.WARNING, "Timed out waiting for child connection to close.");
close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public void infoChanged(SparkAppHandle handle) {
Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS);
assertTrue(stopMsg instanceof Stop);
} finally {
handle.kill();
close(client);
handle.kill();
client.clientThread.join();
}
}
Expand Down