Skip to content

Commit f4b5e02

Browse files
larsrc-googlecopybara-github
authored andcommitted
Let workers finish lost races without delaying dynamic execution.
If a worker is blocked on reading a response, it doesn't listen for interrupts. This changes blocking on reading to blocking on a sleep-loop, and if interrupted, the worker gets to finish in a separate thread before returning to the pool. This lets the action finish immediately. RELNOTES: None. PiperOrigin-RevId: 368207735
1 parent dfe2a31 commit f4b5e02

File tree

5 files changed

+73
-23
lines changed

5 files changed

+73
-23
lines changed

src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java

+6-18
Original file line numberDiff line numberDiff line change
@@ -120,24 +120,12 @@ void putRequest(WorkRequest request) throws IOException {
120120
@Override
121121
WorkResponse getResponse(int requestId) throws IOException, InterruptedException {
122122
recordingInputStream.startRecording(4096);
123-
// Ironically, we don't allow interrupts during dynamic execution, since we can't cancel
124-
// the worker short of destroying it.
125-
if (!workerKey.isSpeculative()) {
126-
while (recordingInputStream.available() == 0) {
127-
try {
128-
Thread.sleep(10);
129-
} catch (InterruptedException e) {
130-
// This should only happen when not in dynamic execution, so we can safely kill the
131-
// worker.
132-
destroy();
133-
throw e;
134-
}
135-
if (!process.isAlive()) {
136-
throw new IOException(
137-
String.format(
138-
"Worker process for %s died while waiting for response",
139-
workerKey.getMnemonic()));
140-
}
123+
while (recordingInputStream.available() == 0) {
124+
Thread.sleep(10);
125+
if (!process.isAlive()) {
126+
throw new IOException(
127+
String.format(
128+
"Worker process for %s died while waiting for response", workerKey.getMnemonic()));
141129
}
142130
}
143131
return workerProtocol.getResponse();

src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java

+39
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,10 @@ WorkResponse execInWorker(
457457

458458
try {
459459
response = worker.getResponse(request.getRequestId());
460+
} catch (InterruptedException e) {
461+
finishWorkAsync(key, worker, request);
462+
worker = null;
463+
throw e;
460464
} catch (IOException e) {
461465
restoreInterrupt(e);
462466
// If protobuf or json reader couldn't parse the response, try to print whatever the
@@ -514,6 +518,41 @@ WorkResponse execInWorker(
514518
return response;
515519
}
516520

521+
/**
522+
* Starts a thread to collect the response from a worker when it's no longer of interest.
523+
*
524+
* <p>This can happen either when we lost the race in dynamic execution or the build got
525+
* interrupted. This takes ownership of the worker for purposes of returning it to the worker
526+
* pool.
527+
*/
528+
private void finishWorkAsync(WorkerKey key, Worker worker, WorkRequest request) {
529+
Thread reaper =
530+
new Thread(
531+
() -> {
532+
Worker w = worker;
533+
try {
534+
w.getResponse(request.getRequestId());
535+
} catch (IOException | InterruptedException e1) {
536+
// If this happens, we either can't trust the output of the worker, or we got
537+
// interrupted while handling being interrupted. In the latter case, let's stop
538+
// trying and just destroy the worker. If it's a singleplex worker, there will
539+
// be a dangling response that we don't want to keep trying to read, so we destroy
540+
// the worker.
541+
try {
542+
workers.invalidateObject(key, w);
543+
w = null;
544+
} catch (IOException | InterruptedException e2) {
545+
// The reaper thread can't do anything useful about this.
546+
}
547+
} finally {
548+
if (w != null) {
549+
workers.returnObject(key, w);
550+
}
551+
}
552+
});
553+
reaper.start();
554+
}
555+
517556
private static void restoreInterrupt(IOException e) {
518557
if (e instanceof InterruptedIOException) {
519558
Thread.currentThread().interrupt();

src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java

+20
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@
4242
import java.util.Map;
4343
import java.util.Random;
4444
import java.util.UUID;
45+
import java.util.concurrent.Semaphore;
4546
import java.util.function.BiFunction;
4647
import java.util.regex.Matcher;
4748
import java.util.regex.Pattern;
49+
import sun.misc.Signal;
50+
import sun.misc.SignalHandler;
4851

4952
/** An example implementation of a worker process that is used for integration tests. */
5053
public final class ExampleWorker {
@@ -144,6 +147,23 @@ private static int doWork(List<String> args, PrintWriter err) {
144147
PrintStream originalStdOut = System.out;
145148
PrintStream originalStdErr = System.err;
146149

150+
if (workerOptions.waitForSignal) {
151+
Semaphore signalSem = new Semaphore(0);
152+
Signal.handle(
153+
new Signal("HUP"),
154+
new SignalHandler() {
155+
@Override
156+
public void handle(Signal sig) {
157+
signalSem.release();
158+
}
159+
});
160+
try {
161+
signalSem.acquire();
162+
} catch (InterruptedException e) {
163+
System.out.println("Interrupted while waiting for signal");
164+
e.printStackTrace();
165+
}
166+
}
147167
try (PrintStream ps = new PrintStream(baos)) {
148168
System.setOut(ps);
149169
System.setErr(ps);

src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java

+8
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ public static class ExampleWorkOptions extends OptionsBase {
135135
)
136136
public boolean hardPoison;
137137

138+
@Option(
139+
name = "wait_for_signal",
140+
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
141+
effectTags = {OptionEffectTag.NO_OP},
142+
defaultValue = "false",
143+
help = "Don't send a response until receiving a SIGXXXX.")
144+
public boolean waitForSignal;
145+
138146
/** Enum converter for --worker_protocol. */
139147
public static class WorkerProtocolEnumConverter
140148
extends EnumConverter<ExecutionRequirements.WorkerProtocolFormat> {

src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java

-5
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,6 @@ private void verifyGetResponseFailure(String responseString, String expectedErro
174174
assertThat(ex).hasMessageThat().contains(expectedError);
175175
}
176176

177-
@Test
178-
public void testGetResponse_json_emptyString_throws() throws IOException {
179-
verifyGetResponseFailure("", "Could not parse json work request correctly");
180-
}
181-
182177
@Test
183178
public void testGetResponse_badJson_throws() throws IOException {
184179
verifyGetResponseFailure(

0 commit comments

Comments
 (0)