Skip to content

Commit 1a519bb

Browse files
larsrc-googlecopybara-github
authored andcommitted
Makes singleplex requests be handled in separate threads in WorkRequestHandler.
Preparation for cancellation. RELNOTES: None. PiperOrigin-RevId: 372134373
1 parent ebe3a40 commit 1a519bb

File tree

3 files changed

+86
-26
lines changed

3 files changed

+86
-26
lines changed

src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java

+80-21
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424
import java.io.StringWriter;
2525
import java.lang.management.ManagementFactory;
2626
import java.time.Duration;
27+
import java.util.ArrayDeque;
2728
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Optional;
31+
import java.util.Queue;
32+
import java.util.concurrent.ConcurrentHashMap;
2833
import java.util.concurrent.atomic.AtomicReference;
2934
import java.util.function.BiFunction;
3035

@@ -34,22 +39,47 @@
3439
* (https://docs.bazel.build/versions/master/multiplex-worker.html).
3540
*/
3641
public class WorkRequestHandler implements AutoCloseable {
37-
3842
/** Contains the logic for reading {@link WorkRequest}s and writing {@link WorkResponse}s. */
3943
public interface WorkerMessageProcessor {
4044
/** Reads the next incoming request from this worker's stdin. */
41-
public WorkRequest readWorkRequest() throws IOException;
45+
WorkRequest readWorkRequest() throws IOException;
4246

4347
/**
4448
* Writes the provided {@link WorkResponse} to this worker's stdout. This function is also
4549
* responsible for flushing the stdout.
4650
*/
47-
public void writeWorkResponse(WorkResponse workResponse) throws IOException;
51+
void writeWorkResponse(WorkResponse workResponse) throws IOException;
4852

4953
/** Clean up. */
50-
public void close() throws IOException;
54+
void close() throws IOException;
55+
}
56+
57+
/** Holds information necessary to properly handle a request, especially for cancellation. */
58+
static class RequestInfo {
59+
/**
60+
* The builder for the response to this request. Since only one response must be sent per
61+
* request, this builder must be accessed through takeBuilder(), which zeroes this field and
62+
* returns the builder.
63+
*/
64+
private WorkResponse.Builder responseBuilder = WorkResponse.newBuilder();
65+
66+
/**
67+
* Returns the response builder. If called more than once on the same instance, subsequent calls
68+
* will return {@code null}.
69+
*/
70+
synchronized Optional<WorkResponse.Builder> takeBuilder() {
71+
WorkResponse.Builder b = responseBuilder;
72+
responseBuilder = null;
73+
return Optional.ofNullable(b);
74+
}
5175
}
5276

77+
/** Requests that are currently being processed. Visible for testing. */
78+
final Map<Integer, RequestInfo> activeRequests = new ConcurrentHashMap<>();
79+
80+
/** WorkRequests that have been received but could not be processed yet. */
81+
private final Queue<WorkRequest> availableRequests = new ArrayDeque<>();
82+
5383
/** The function to be called after each {@link WorkRequest} is read. */
5484
private final BiFunction<List<String>, PrintWriter, Integer> callback;
5585

@@ -58,6 +88,7 @@ public interface WorkerMessageProcessor {
5888

5989
final WorkerMessageProcessor messageProcessor;
6090

91+
6192
private final CpuTimeBasedGcScheduler gcScheduler;
6293

6394
/**
@@ -160,34 +191,61 @@ public void processRequests() throws IOException {
160191
if (request == null) {
161192
break;
162193
}
163-
if (request.getRequestId() != 0) {
164-
Thread t = createResponseThread(request);
165-
t.start();
166-
} else {
167-
respondToRequest(request);
194+
availableRequests.add(request);
195+
startRequestThreads();
196+
}
197+
}
198+
199+
/**
200+
* Starts threads for as many outstanding requests as possible. This is the only method that adds
201+
* to {@code activeRequests}.
202+
*/
203+
private synchronized void startRequestThreads() {
204+
while (!availableRequests.isEmpty()) {
205+
// If there's a singleplex request in process, don't start more processes.
206+
if (activeRequests.containsKey(0)) {
207+
return;
168208
}
209+
WorkRequest request = availableRequests.peek();
210+
// Don't start new singleplex requests if there are other requests running.
211+
if (request.getRequestId() == 0 && !activeRequests.isEmpty()) {
212+
return;
213+
}
214+
availableRequests.remove();
215+
Thread t = createResponseThread(request);
216+
activeRequests.put(request.getRequestId(), new RequestInfo());
217+
t.start();
169218
}
170219
}
171220

172221
/** Creates a new {@link Thread} to process a multiplex request. */
173-
public Thread createResponseThread(WorkRequest request) {
222+
Thread createResponseThread(WorkRequest request) {
174223
Thread currentThread = Thread.currentThread();
224+
String threadName =
225+
request.getRequestId() > 0
226+
? "multiplex-request-" + request.getRequestId()
227+
: "singleplex-request";
175228
return new Thread(
176229
() -> {
230+
RequestInfo requestInfo = activeRequests.get(request.getRequestId());
177231
try {
178-
respondToRequest(request);
232+
respondToRequest(request, requestInfo);
179233
} catch (IOException e) {
180234
e.printStackTrace(stderr);
181235
// In case of error, shut down the entire worker.
182236
currentThread.interrupt();
237+
} finally {
238+
activeRequests.remove(request.getRequestId());
239+
// A good time to start more requests, especially if we finished a singleplex request
240+
startRequestThreads();
183241
}
184242
},
185-
"multiplex-request-" + request.getRequestId());
243+
threadName);
186244
}
187245

188246
/** Handles and responds to the given {@link WorkRequest}. */
189247
@VisibleForTesting
190-
void respondToRequest(WorkRequest request) throws IOException {
248+
void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOException {
191249
try (StringWriter sw = new StringWriter();
192250
PrintWriter pw = new PrintWriter(sw)) {
193251
int exitCode;
@@ -198,14 +256,15 @@ void respondToRequest(WorkRequest request) throws IOException {
198256
exitCode = 1;
199257
}
200258
pw.flush();
201-
WorkResponse workResponse =
202-
WorkResponse.newBuilder()
203-
.setOutput(sw.toString())
204-
.setExitCode(exitCode)
205-
.setRequestId(request.getRequestId())
206-
.build();
207-
synchronized (this) {
208-
messageProcessor.writeWorkResponse(workResponse);
259+
Optional<WorkResponse.Builder> optBuilder = requestInfo.takeBuilder();
260+
if (optBuilder.isPresent()) {
261+
WorkResponse.Builder builder = optBuilder.get();
262+
builder.setRequestId(request.getRequestId());
263+
builder.setOutput(builder.getOutput() + sw.toString()).setExitCode(exitCode);
264+
WorkResponse response = builder.build();
265+
synchronized (this) {
266+
messageProcessor.writeWorkResponse(response);
267+
}
209268
}
210269
gcScheduler.maybePerformGc();
211270
}

src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void processRequests() throws IOException {
9898
Thread t = createResponseThread(request);
9999
t.start();
100100
} else {
101-
respondToRequest(request);
101+
respondToRequest(request, new RequestInfo());
102102
}
103103
if (workerOptions.exitAfter > 0 && workUnitCounter > workerOptions.exitAfter) {
104104
System.exit(0);

src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import static com.google.common.truth.Truth.assertThat;
1818

19+
import com.google.devtools.build.lib.worker.WorkRequestHandler.RequestInfo;
1920
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
2021
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
2122
import java.io.ByteArrayInputStream;
@@ -50,7 +51,7 @@ public void testNormalWorkRequest() throws IOException {
5051

5152
List<String> args = Arrays.asList("--sources", "A.java");
5253
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
53-
handler.respondToRequest(request);
54+
handler.respondToRequest(request, new RequestInfo());
5455

5556
WorkResponse response =
5657
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
@@ -70,7 +71,7 @@ public void testMultiplexWorkRequest() throws IOException {
7071

7172
List<String> args = Arrays.asList("--sources", "A.java");
7273
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).setRequestId(42).build();
73-
handler.respondToRequest(request);
74+
handler.respondToRequest(request, new RequestInfo());
7475

7576
WorkResponse response =
7677
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
@@ -93,7 +94,7 @@ public void testOutput() throws IOException {
9394

9495
List<String> args = Arrays.asList("--sources", "A.java");
9596
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
96-
handler.respondToRequest(request);
97+
handler.respondToRequest(request, new RequestInfo());
9798

9899
WorkResponse response =
99100
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
@@ -115,7 +116,7 @@ public void testException() throws IOException {
115116

116117
List<String> args = Arrays.asList("--sources", "A.java");
117118
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
118-
handler.respondToRequest(request);
119+
handler.respondToRequest(request, new RequestInfo());
119120

120121
WorkResponse response =
121122
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));

0 commit comments

Comments
 (0)