Skip to content

Commit 003cfcd

Browse files
larsrc-googlecopybara-github
authored andcommitted
Allow use of JSON protocol in multiplex workers.
RELNOTES: Multiplex persistent workers can now use the JSON protocol. PiperOrigin-RevId: 352415016
1 parent 80c03ef commit 003cfcd

File tree

1 file changed

+18
-6
lines changed

1 file changed

+18
-6
lines changed

src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public class WorkerMultiplexer {
7272
* {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed.
7373
*/
7474
private Subprocess process;
75+
/** The implementation of the worker protocol (JSON or Proto). */
76+
private WorkerProtocolImpl workerProtocol;
7577
/** InputStream from the worker process. */
7678
private RecordingInputStream recordingStream;
7779
/** True if this multiplexer was explicitly destroyed. */
@@ -142,6 +144,18 @@ public synchronized void createProcess(Path workDir) throws IOException {
142144
processBuilder.setStderr(logFile.getPathFile());
143145
processBuilder.setEnv(workerKey.getEnv());
144146
this.process = processBuilder.start();
147+
recordingStream = new RecordingInputStream(process.getInputStream());
148+
recordingStream.startRecording(4096);
149+
if (workerProtocol == null) {
150+
switch (workerKey.getProtocolFormat()) {
151+
case JSON:
152+
workerProtocol = new JsonWorkerProtocol(process.getOutputStream(), recordingStream);
153+
break;
154+
case PROTO:
155+
workerProtocol = new ProtoWorkerProtocol(process.getOutputStream(), recordingStream);
156+
break;
157+
}
158+
}
145159
String id = workerKey.getMnemonic() + "-" + workerKey.hashCode();
146160
// TODO(larsrc): Consider moving sender/receiver threads into separate classes.
147161
this.requestSender =
@@ -277,8 +291,7 @@ private boolean sendRequest() {
277291
return false;
278292
}
279293
try {
280-
request.writeDelimitedTo(process.getOutputStream());
281-
process.getOutputStream().flush();
294+
workerProtocol.putRequest(request);
282295
} catch (IOException e) {
283296
// We can't know how much of the request was sent, so we have to assume the worker's input
284297
// now contains garbage, and this request is lost.
@@ -303,11 +316,9 @@ private boolean sendRequest() {
303316
* execution cancellation, but only by a call to {@link #destroyProcess()}.
304317
*/
305318
private boolean readResponse() {
306-
recordingStream = new RecordingInputStream(process.getInputStream());
307-
recordingStream.startRecording(4096);
308319
WorkResponse parsedResponse;
309320
try {
310-
parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
321+
parsedResponse = workerProtocol.getResponse();
311322
} catch (IOException e) {
312323
if (!(e instanceof InterruptedIOException)) {
313324
report(
@@ -320,7 +331,8 @@ private boolean readResponse() {
320331
destroyProcess();
321332
return false;
322333
}
323-
// A null parsedResponse can happen if the input stream is closed, in which case we
334+
335+
// A null parsedResponse can only happen if the input stream is closed, in which case we
324336
// drop everything.
325337
if (parsedResponse == null) {
326338
report(

0 commit comments

Comments
 (0)