@@ -72,6 +72,8 @@ public class WorkerMultiplexer {
72
72
* {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed.
73
73
*/
74
74
private Subprocess process ;
75
+ /** The implementation of the worker protocol (JSON or Proto). */
76
+ private WorkerProtocolImpl workerProtocol ;
75
77
/** InputStream from the worker process. */
76
78
private RecordingInputStream recordingStream ;
77
79
/** True if this multiplexer was explicitly destroyed. */
@@ -142,6 +144,18 @@ public synchronized void createProcess(Path workDir) throws IOException {
142
144
processBuilder .setStderr (logFile .getPathFile ());
143
145
processBuilder .setEnv (workerKey .getEnv ());
144
146
this .process = processBuilder .start ();
147
+ recordingStream = new RecordingInputStream (process .getInputStream ());
148
+ recordingStream .startRecording (4096 );
149
+ if (workerProtocol == null ) {
150
+ switch (workerKey .getProtocolFormat ()) {
151
+ case JSON :
152
+ workerProtocol = new JsonWorkerProtocol (process .getOutputStream (), recordingStream );
153
+ break ;
154
+ case PROTO :
155
+ workerProtocol = new ProtoWorkerProtocol (process .getOutputStream (), recordingStream );
156
+ break ;
157
+ }
158
+ }
145
159
String id = workerKey .getMnemonic () + "-" + workerKey .hashCode ();
146
160
// TODO(larsrc): Consider moving sender/receiver threads into separate classes.
147
161
this .requestSender =
@@ -277,8 +291,7 @@ private boolean sendRequest() {
277
291
return false ;
278
292
}
279
293
try {
280
- request .writeDelimitedTo (process .getOutputStream ());
281
- process .getOutputStream ().flush ();
294
+ workerProtocol .putRequest (request );
282
295
} catch (IOException e ) {
283
296
// We can't know how much of the request was sent, so we have to assume the worker's input
284
297
// now contains garbage, and this request is lost.
@@ -303,11 +316,9 @@ private boolean sendRequest() {
303
316
* execution cancellation, but only by a call to {@link #destroyProcess()}.
304
317
*/
305
318
private boolean readResponse () {
306
- recordingStream = new RecordingInputStream (process .getInputStream ());
307
- recordingStream .startRecording (4096 );
308
319
WorkResponse parsedResponse ;
309
320
try {
310
- parsedResponse = WorkResponse . parseDelimitedFrom ( recordingStream );
321
+ parsedResponse = workerProtocol . getResponse ( );
311
322
} catch (IOException e ) {
312
323
if (!(e instanceof InterruptedIOException )) {
313
324
report (
@@ -320,7 +331,8 @@ private boolean readResponse() {
320
331
destroyProcess ();
321
332
return false ;
322
333
}
323
- // A null parsedResponse can happen if the input stream is closed, in which case we
334
+
335
+ // A null parsedResponse can only happen if the input stream is closed, in which case we
324
336
// drop everything.
325
337
if (parsedResponse == null ) {
326
338
report (
0 commit comments