Skip to content

Commit ee6e797

Browse files
Marcelo Vanzin authored and cloud-fan committed
[SPARK-23020][CORE][BRANCH-2.3] Fix another race in the in-process launcher test.
First the bad news: there's an unfixable race in the launcher code. (By unfixable I mean it would take a lot more effort than this change to fix it.) The good news is that it should only affect very short-lived applications, such as the one run by the flaky test, so it's possible to work around it in our test.

The fix also uncovered an issue with the recently added "closeAndWait()" method: closing the connection could still cause data loss, so this change waits a while for the connection to finish by itself, and closes the socket if that times out. The existing connection timeout is reused, so that if desired it's possible to control how long to wait.

As part of that I also restored the old behavior that disconnect() would force a disconnection from the child app; the "wait for data to arrive" approach is only taken when disposing of the handle.

I tested this by inserting a bunch of sleeps in the test and the socket handling code in the launcher library; with those I was able to reproduce the error from the Jenkins jobs. With the changes, even with all the sleeps still in place, all tests pass.

Author: Marcelo Vanzin <[email protected]>

Closes #20743 from vanzin/SPARK-23020.
1 parent 8cd6a96 commit ee6e797

File tree

6 files changed

+88
-37
lines changed

6 files changed

+88
-37
lines changed

core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Map;
2626
import java.util.Properties;
2727

28-
import org.junit.Ignore;
2928
import org.junit.Test;
3029
import static org.junit.Assert.*;
3130
import static org.junit.Assume.*;
@@ -122,8 +121,7 @@ public void testChildProcLauncher() throws Exception {
122121
assertEquals(0, app.waitFor());
123122
}
124123

125-
// TODO: [SPARK-23020] Re-enable this
126-
@Ignore
124+
@Test
127125
public void testInProcessLauncher() throws Exception {
128126
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
129127
// properties, and that can cause test failures down the test pipeline. So restore the original
@@ -159,12 +157,24 @@ private void inProcessLauncherTestImpl() throws Exception {
159157

160158
SparkAppHandle handle = null;
161159
try {
162-
handle = new InProcessLauncher()
163-
.setMaster("local")
164-
.setAppResource(SparkLauncher.NO_RESOURCE)
165-
.setMainClass(InProcessTestApp.class.getName())
166-
.addAppArgs("hello")
167-
.startApplication(listener);
160+
synchronized (InProcessTestApp.LOCK) {
161+
handle = new InProcessLauncher()
162+
.setMaster("local")
163+
.setAppResource(SparkLauncher.NO_RESOURCE)
164+
.setMainClass(InProcessTestApp.class.getName())
165+
.addAppArgs("hello")
166+
.startApplication(listener);
167+
168+
// SPARK-23020: see doc for InProcessTestApp.LOCK for a description of the race. Here
169+
// we wait until we know that the connection between the app and the launcher has been
170+
// established before allowing the app to finish.
171+
final SparkAppHandle _handle = handle;
172+
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
173+
assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState());
174+
});
175+
176+
InProcessTestApp.LOCK.wait(5000);
177+
}
168178

169179
waitFor(handle);
170180
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
@@ -195,10 +205,26 @@ public static void main(String[] args) throws Exception {
195205

196206
public static class InProcessTestApp {
197207

208+
/**
209+
* SPARK-23020: there's a race caused by a child app finishing too quickly. This would cause
210+
* the InProcessAppHandle to dispose of itself even before the child connection was properly
211+
* established, so no state changes would be detected for the application and its final
212+
* state would be LOST.
213+
*
214+
* It's not really possible to fix that race safely in the handle code itself without changing
215+
* the way in-process apps talk to the launcher library, so we work around that in the test by
216+
* synchronizing on this object.
217+
*/
218+
public static final Object LOCK = new Object();
219+
198220
public static void main(String[] args) throws Exception {
199221
assertNotEquals(0, args.length);
200222
assertEquals(args[0], "hello");
201223
new SparkContext().stop();
224+
225+
synchronized (LOCK) {
226+
LOCK.notifyAll();
227+
}
202228
}
203229

204230
}

launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@
1818
package org.apache.spark.launcher;
1919

2020
import java.io.IOException;
21-
import java.util.ArrayList;
2221
import java.util.List;
22+
import java.util.concurrent.CopyOnWriteArrayList;
2323
import java.util.concurrent.atomic.AtomicReference;
2424
import java.util.logging.Level;
2525
import java.util.logging.Logger;
2626

2727
abstract class AbstractAppHandle implements SparkAppHandle {
2828

29-
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
29+
private static final Logger LOG = Logger.getLogger(AbstractAppHandle.class.getName());
3030

3131
private final LauncherServer server;
3232

3333
private LauncherServer.ServerConnection connection;
3434
private List<Listener> listeners;
3535
private AtomicReference<State> state;
36-
private String appId;
36+
private volatile String appId;
3737
private volatile boolean disposed;
3838

3939
protected AbstractAppHandle(LauncherServer server) {
@@ -44,7 +44,7 @@ protected AbstractAppHandle(LauncherServer server) {
4444
@Override
4545
public synchronized void addListener(Listener l) {
4646
if (listeners == null) {
47-
listeners = new ArrayList<>();
47+
listeners = new CopyOnWriteArrayList<>();
4848
}
4949
listeners.add(l);
5050
}
@@ -71,16 +71,14 @@ public void stop() {
7171

7272
@Override
7373
public synchronized void disconnect() {
74-
if (!isDisposed()) {
75-
if (connection != null) {
76-
try {
77-
connection.closeAndWait();
78-
} catch (IOException ioe) {
79-
// no-op.
80-
}
74+
if (connection != null && connection.isOpen()) {
75+
try {
76+
connection.close();
77+
} catch (IOException ioe) {
78+
// no-op.
8179
}
82-
dispose();
8380
}
81+
dispose();
8482
}
8583

8684
void setConnection(LauncherServer.ServerConnection connection) {
@@ -97,10 +95,25 @@ boolean isDisposed() {
9795

9896
/**
9997
* Mark the handle as disposed, and set it as LOST in case the current state is not final.
98+
*
99+
* This method should be called only when there's a reasonable expectation that the communication
100+
* with the child application is not needed anymore, either because the code managing the handle
101+
* has said so, or because the child application is finished.
100102
*/
101103
synchronized void dispose() {
102104
if (!isDisposed()) {
105+
// First wait for all data from the connection to be read. Then unregister the handle.
106+
// Otherwise, unregistering might cause the server to be stopped and all child connections
107+
// to be closed.
108+
if (connection != null) {
109+
try {
110+
connection.waitForClose();
111+
} catch (IOException ioe) {
112+
// no-op.
113+
}
114+
}
103115
server.unregister(this);
116+
104117
// Set state to LOST if not yet final.
105118
setState(State.LOST, false);
106119
this.disposed = true;
@@ -127,11 +140,13 @@ void setState(State s, boolean force) {
127140
current = state.get();
128141
}
129142

130-
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
131-
new Object[] { current, s });
143+
if (s != State.LOST) {
144+
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
145+
new Object[] { current, s });
146+
}
132147
}
133148

134-
synchronized void setAppId(String appId) {
149+
void setAppId(String appId) {
135150
this.appId = appId;
136151
fireEvent(true);
137152
}

launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ void monitorChild() {
112112
}
113113
}
114114

115-
disconnect();
115+
dispose();
116116
}
117117
}
118118

launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ synchronized void start(String appName, Method main, String[] args) {
6666
setState(State.FAILED);
6767
}
6868

69-
disconnect();
69+
dispose();
7070
});
7171

7272
app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));

launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ public void run() {
238238
};
239239
ServerConnection clientConnection = new ServerConnection(client, timeout);
240240
Thread clientThread = factory.newThread(clientConnection);
241+
clientConnection.setConnectionThread(clientThread);
241242
synchronized (clients) {
242243
clients.add(clientConnection);
243244
}
@@ -290,17 +291,15 @@ class ServerConnection extends LauncherConnection {
290291

291292
private TimerTask timeout;
292293
private volatile Thread connectionThread;
293-
volatile AbstractAppHandle handle;
294+
private volatile AbstractAppHandle handle;
294295

295296
ServerConnection(Socket socket, TimerTask timeout) throws IOException {
296297
super(socket);
297298
this.timeout = timeout;
298299
}
299300

300-
@Override
301-
public void run() {
302-
this.connectionThread = Thread.currentThread();
303-
super.run();
301+
void setConnectionThread(Thread t) {
302+
this.connectionThread = t;
304303
}
305304

306305
@Override
@@ -361,19 +360,30 @@ public void close() throws IOException {
361360
}
362361

363362
/**
364-
* Close the connection and wait for any buffered data to be processed before returning.
363+
* Wait for the remote side to close the connection so that any pending data is processed.
365364
* This ensures any changes reported by the child application take effect.
365+
*
366+
* This method allows a short period for the above to happen (same amount of time as the
367+
* connection timeout, which is configurable). This should be fine for well-behaved
368+
* applications, where they close the connection around the same time the app handle detects the
369+
* app has finished.
370+
*
371+
* In case the connection is not closed within the grace period, this method forcefully closes
372+
* it and any subsequent data that may arrive will be ignored.
366373
*/
367-
public void closeAndWait() throws IOException {
368-
close();
369-
374+
public void waitForClose() throws IOException {
370375
Thread connThread = this.connectionThread;
371376
if (Thread.currentThread() != connThread) {
372377
try {
373-
connThread.join();
378+
connThread.join(getConnectionTimeout());
374379
} catch (InterruptedException ie) {
375380
// Ignore.
376381
}
382+
383+
if (connThread.isAlive()) {
384+
LOG.log(Level.WARNING, "Timed out waiting for child connection to close.");
385+
close();
386+
}
377387
}
378388
}
379389

launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ public void infoChanged(SparkAppHandle handle) {
9494
Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS);
9595
assertTrue(stopMsg instanceof Stop);
9696
} finally {
97-
handle.kill();
9897
close(client);
98+
handle.kill();
9999
client.clientThread.join();
100100
}
101101
}

0 commit comments

Comments
 (0)