From a607d9dc70ac67f1aa2c32ca954177f9c77860be Mon Sep 17 00:00:00 2001 From: larsrc Date: Wed, 13 Jan 2021 09:50:22 -0800 Subject: [PATCH] Never create more than one process per WorkerMultiplexer. Turns out the ability to re-create a process makes everything complicated. Instead, just let the WorkerMultiplexer instance fall to the floor and create a new one as needed. Also restores interrupts in more places, handles some non-io-exceptions better in WorkerSpawnRunner, checks a few more edge cases around the multiplexer, makes the multiplexer try not to get interrupted during actual read, avoids creating unnecessary WorkerMultiplexer garbage, removes shutdownhooks on workerproxy destruction, sets the multiplexer reporter earlier, and improves some error messages. RELNOTES: n/a PiperOrigin-RevId: 351606949 --- .../build/lib/worker/SingleplexWorker.java | 5 + .../build/lib/worker/WorkerFactory.java | 19 ++- .../build/lib/worker/WorkerModule.java | 3 +- .../build/lib/worker/WorkerMultiplexer.java | 108 ++++++++---------- .../lib/worker/WorkerMultiplexerManager.java | 99 ++++++++-------- .../build/lib/worker/WorkerProxy.java | 17 ++- .../build/lib/worker/WorkerSpawnRunner.java | 18 ++- .../lib/worker/WorkerMultiplexerTest.java | 14 +-- 8 files changed, 142 insertions(+), 141 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java index bc325107d00b8c..a8684d1c229931 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java +++ b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java @@ -163,4 +163,9 @@ String getRecordingStreamMessage() { recordingInputStream.readRemaining(); return recordingInputStream.getRecordedDataAsString(); } + + @Override + public String toString() { + return workerKey.getMnemonic() + " worker #" + workerId; + } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java index 182c0c4b0263b8..337b18b84446ec 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java @@ -53,7 +53,7 @@ public void setOptions(WorkerOptions workerOptions) { } @Override - public Worker create(WorkerKey key) throws Exception { + public Worker create(WorkerKey key) { int workerId = pidCounter.getAndIncrement(); String workTypeName = WorkerKey.makeWorkerTypeName(key.getProxied()); Path logFile = @@ -66,9 +66,7 @@ public Worker create(WorkerKey key) throws Exception { worker = new SandboxedWorker(key, workerId, workDir, logFile); } else if (key.getProxied()) { WorkerMultiplexer workerMultiplexer = WorkerMultiplexerManager.getInstance(key, logFile); - worker = - new WorkerProxy( - key, workerId, key.getExecRoot(), workerMultiplexer.getLogFile(), workerMultiplexer); + worker = new WorkerProxy(key, workerId, workerMultiplexer.getLogFile(), workerMultiplexer); } else { worker = new SingleplexWorker(key, workerId, key.getExecRoot(), logFile); } @@ -112,13 +110,12 @@ public PooledObject wrap(Worker worker) { @Override public void destroyObject(WorkerKey key, PooledObject p) throws Exception { if (workerOptions.workerVerbose) { + int workerId = p.getObject().getWorkerId(); reporter.handle( Event.info( String.format( "Destroying %s %s (id %d)", - key.getMnemonic(), - WorkerKey.makeWorkerTypeName(key.getProxied()), - p.getObject().getWorkerId()))); + key.getMnemonic(), WorkerKey.makeWorkerTypeName(key.getProxied()), workerId))); } p.getObject().destroy(); } @@ -161,10 +158,10 @@ public boolean validateObject(WorkerKey key, PooledObject p) { } return false; } - boolean hashMatches = - key.getWorkerFilesCombinedHash().equals(worker.getWorkerFilesCombinedHash()); + boolean filesChanged = + !key.getWorkerFilesCombinedHash().equals(worker.getWorkerFilesCombinedHash()); - if (workerOptions.workerVerbose && reporter != null && !hashMatches) { + if (workerOptions.workerVerbose && reporter != null && filesChanged) { StringBuilder msg = new StringBuilder(); msg.append( String.format( @@ -191,6 +188,6 @@ public boolean validateObject(WorkerKey key, PooledObject p) { reporter.handle(Event.warn(msg.toString())); } - return hashMatches; + return !filesChanged; } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java index 6ef71aca2bc3b3..1867372ab8f972 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java @@ -67,6 +67,7 @@ public Iterable> getCommandOptions(Command command) public void beforeCommand(CommandEnvironment env) { this.env = env; env.getEventBus().register(this); + WorkerMultiplexerManager.beforeCommand(env); } @Subscribe @@ -236,6 +237,6 @@ public void afterCommand() { if (this.workerFactory != null) { this.workerFactory.setReporter(null); } - WorkerMultiplexerManager.afterCommandCleanup(); + WorkerMultiplexerManager.afterCommand(); } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java index 858dbbe0af189a..1a669477b56bc5 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; +import javax.annotation.Nullable; /** * An intermediate worker that sends requests and receives responses from the worker processes. @@ -48,29 +49,23 @@ public class WorkerMultiplexer extends Thread { * A map of {@code WorkResponse}s received from the worker process. They are stored in this map * keyed by the request id until the corresponding {@code WorkerProxy} picks them up. */ - private final ConcurrentMap workerProcessResponse; + private final ConcurrentMap workerProcessResponse = + new ConcurrentHashMap<>(); /** * A map of semaphores corresponding to {@code WorkRequest}s. After sending the {@code * WorkRequest}, {@code WorkerProxy} will wait on a semaphore to be released. {@code * WorkerMultiplexer} is responsible for releasing the corresponding semaphore in order to signal * {@code WorkerProxy} that the {@code WorkerResponse} has been received. */ - private final ConcurrentMap responseChecker; - /** The worker process that this WorkerMultiplexer should be talking to. */ - private Subprocess process; + private final ConcurrentMap responseChecker = new ConcurrentHashMap<>(); /** - * Set to true if one of the worker processes returns an unparseable response, or for other - * reasons we can't properly handle the remaining responses. We then discard all the responses - * from other work requests and abort. + * The worker process that this WorkerMultiplexer should be talking to. This should only be set + * once, when creating a new process. If the process dies or its stdio streams get corrupted, the + * {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed. */ - private boolean isWorkerStreamCorrupted; + private Subprocess process; /** InputStream from the worker process. */ private RecordingInputStream recordingStream; - /** - * True if we have received EOF on the stream from the worker process. We then stop processing, - * and all workers still waiting for responses will fail. - */ - private boolean isWorkerStreamClosed; /** True if this multiplexer was explicitly destroyed. */ private boolean wasDestroyed; /** @@ -89,25 +84,20 @@ public class WorkerMultiplexer extends Thread { * The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared * at the end of a command execution. */ - public EventHandler reporter; + private EventHandler reporter; WorkerMultiplexer(Path logFile, WorkerKey workerKey) { this.logFile = logFile; this.workerKey = workerKey; - responseChecker = new ConcurrentHashMap<>(); - workerProcessResponse = new ConcurrentHashMap<>(); - isWorkerStreamCorrupted = false; - isWorkerStreamClosed = false; - wasDestroyed = false; } /** Sets or clears the reporter for outputting verbose info. */ - void setReporter(EventHandler reporter) { + synchronized void setReporter(@Nullable EventHandler reporter) { this.reporter = reporter; } /** Reports a string to the user if reporting is enabled. */ - private void report(String s) { + private synchronized void report(String s) { EventHandler r = this.reporter; // Protect against race condition with setReporter(). if (r != null && s != null) { r.handle(Event.info(s)); @@ -119,8 +109,10 @@ private void report(String s) { * exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread. */ public synchronized void createProcess(Path workDir) throws IOException { - // The process may have died in the meanwhile (e.g. between builds). - if (this.process == null || !this.process.isAlive()) { + if (this.process == null) { + if (this.wasDestroyed) { + throw new IOException("Multiplexer destroyed before created process"); + } ImmutableList args = workerKey.getArgs(); File executable = new File(args.get(0)); if (!executable.isAbsolute() && executable.getParent() != null) { @@ -128,8 +120,6 @@ public synchronized void createProcess(Path workDir) throws IOException { newArgs.set(0, new File(workDir.getPathFile(), newArgs.get(0)).getAbsolutePath()); args = ImmutableList.copyOf(newArgs); } - isWorkerStreamCorrupted = false; - isWorkerStreamClosed = false; SubprocessBuilder processBuilder = subprocessFactory != null ? new SubprocessBuilder(subprocessFactory) @@ -139,6 +129,8 @@ public synchronized void createProcess(Path workDir) throws IOException { processBuilder.setStderr(logFile.getPathFile()); processBuilder.setEnv(workerKey.getEnv()); this.process = processBuilder.start(); + } else if (!this.process.isAlive()) { + throw new IOException("Process is dead"); } if (!this.isAlive()) { this.start(); @@ -155,24 +147,24 @@ public Path getLogFile() { /** * Signals this object to destroy itself, including the worker process. The object might not be - * fully destroyed at the end of this call, but will terminate soon. + * fully destroyed at the end of this call, but will terminate soon. This is considered a + * deliberate destruction. */ public synchronized void destroyMultiplexer() { if (this.process != null) { - destroyProcess(this.process); - this.process = null; + destroyProcess(); } wasDestroyed = true; } /** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */ - private void destroyProcess(Subprocess process) { + private synchronized void destroyProcess() { boolean wasInterrupted = false; try { - process.destroy(); + this.process.destroy(); while (true) { try { - process.waitFor(); + this.process.waitFor(); return; } catch (InterruptedException ie) { wasInterrupted = true; @@ -183,7 +175,6 @@ private void destroyProcess(Subprocess process) { if (wasInterrupted) { Thread.currentThread().interrupt(); // preserve interrupted status } - isWorkerStreamClosed = true; } } @@ -200,10 +191,6 @@ public synchronized void putRequest(WorkRequest request) throws IOException { // We can't know how much of the request was sent, so we have to assume the worker's input // now contains garbage. // TODO(b/151767359): Avoid causing garbage! Maybe by sending in a separate thread? - isWorkerStreamCorrupted = true; - if (e instanceof InterruptedIOException) { - Thread.currentThread().interrupt(); - } responseChecker.remove(request.getRequestId()); throw e; } @@ -228,10 +215,8 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException { // Wait for the multiplexer to get our response and release this semaphore. The semaphore will // throw {@code InterruptedException} when the multiplexer is terminated. waitForResponse.acquire(); - report("Acquired response semaphore for " + requestId); WorkResponse workResponse = workerProcessResponse.get(requestId); - report("Response for " + requestId + " is " + workResponse); return workResponse; } finally { responseChecker.remove(requestId); @@ -247,25 +232,25 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException { * execution cancellation. */ private void waitResponse() throws InterruptedException, IOException { - Subprocess p = this.process; - if (p == null || !p.isAlive()) { - // Avoid busy-wait for a new process. + recordingStream = new RecordingInputStream(this.process.getInputStream()); + recordingStream.startRecording(4096); + // TODO(larsrc): Turn this into a loop that also sends requests. + // Allow interrupts while waiting for responses, without conflating it with I/O errors. + while (recordingStream.available() == 0) { + if (!this.process.isAlive()) { + throw new IOException( + String.format("Multiplexer process for %s is dead", workerKey.getMnemonic())); + } Thread.sleep(1); - return; } - recordingStream = new RecordingInputStream(p.getInputStream()); - recordingStream.startRecording(4096); WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); // A null parsedResponse can only happen if the input stream is closed, in which case we // drop everything. if (parsedResponse == null) { - isWorkerStreamClosed = true; - report( + throw new IOException( String.format( - "Multiplexer process for %s has closed its output, aborting multiplexer", - workerKey.getMnemonic())); - return; + "Multiplexer process for %s died while reading response", workerKey.getMnemonic())); } int requestId = parsedResponse.getRequestId(); @@ -287,13 +272,15 @@ private void waitResponse() throws InterruptedException, IOException { /** The multiplexer thread that listens to the WorkResponse from worker process. */ @Override public void run() { - while (!isWorkerStreamClosed && !isWorkerStreamCorrupted) { + while (this.process.isAlive()) { try { waitResponse(); } catch (IOException e) { // We got this exception while reading from the worker's stdout. We can't trust the // output any more at that point. - isWorkerStreamCorrupted = true; + if (this.process.isAlive()) { + destroyProcess(); + } if (e instanceof InterruptedIOException) { report( String.format( @@ -315,17 +302,12 @@ public void run() { // will let fall on the floor, but we still want to leave the process running for the next // build. // TODO(b/151767359): Cancel all outstanding requests when cancellation is implemented. - releaseAllSemaphores(); + for (Semaphore semaphore : responseChecker.values()) { + semaphore.release(); + } } } - // If we get here, the worker process is either dead or corrupted. We could attempt to restart - // it, but the outstanding requests will have failed already. Until we have a way to signal - // transient failures, we have to just reject all further requests and make sure the process - // is really dead synchronized (this) { - if (process != null && process.isAlive()) { - destroyMultiplexer(); - } releaseAllSemaphores(); } } @@ -350,14 +332,14 @@ String getRecordingStreamMessage() { /** Returns true if this process has died for other reasons than a call to {@code #destroy()}. */ boolean diedUnexpectedly() { - Subprocess p = this.process; // Protects against this.process getting null. - return p != null && !p.isAlive() && !wasDestroyed; + return this.process != null && !this.process.isAlive() && !wasDestroyed; } /** Returns the exit value of multiplexer's process, if it has exited. */ Optional getExitValue() { - Subprocess p = this.process; // Protects against this.process getting null. - return p != null && !p.isAlive() ? Optional.of(p.exitValue()) : Optional.empty(); + return this.process != null && !this.process.isAlive() + ? Optional.of(this.process.exitValue()) + : Optional.empty(); } /** For testing only, to verify that maps are cleared after responses are reaped. */ diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java index 35565f965ea190..aefabf0a4306ee 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java @@ -16,13 +16,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.devtools.build.lib.actions.UserExecException; +import com.google.devtools.build.lib.events.EventHandler; +import com.google.devtools.build.lib.runtime.CommandEnvironment; import com.google.devtools.build.lib.server.FailureDetails; import com.google.devtools.build.lib.server.FailureDetails.FailureDetail; import com.google.devtools.build.lib.server.FailureDetails.Worker.Code; import com.google.devtools.build.lib.vfs.Path; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Semaphore; +import javax.annotation.Nullable; /** * A manager to instantiate and destroy multiplexers. There should only be one {@code @@ -36,9 +38,6 @@ public class WorkerMultiplexerManager { */ private static final Map multiplexerInstance = new HashMap<>(); - /** A semaphore to protect {@code multiplexerInstance} and {@code multiplexerRefCount} objects. */ - private static final Semaphore semMultiplexer = new Semaphore(1); - private WorkerMultiplexerManager() {} /** @@ -46,68 +45,65 @@ private WorkerMultiplexerManager() {} * objects with the same {@code WorkerKey} talk to the same {@code WorkerMultiplexer}. Also, * record how many {@code WorkerProxy} objects are talking to this {@code WorkerMultiplexer}. */ - public static WorkerMultiplexer getInstance(WorkerKey key, Path logFile) - throws InterruptedException { - semMultiplexer.acquire(); - multiplexerInstance.putIfAbsent(key, new InstanceInfo(logFile, key)); - multiplexerInstance.get(key).increaseRefCount(); - WorkerMultiplexer workerMultiplexer = multiplexerInstance.get(key).getWorkerMultiplexer(); - semMultiplexer.release(); - return workerMultiplexer; + public static synchronized WorkerMultiplexer getInstance(WorkerKey key, Path logFile) { + InstanceInfo instanceInfo = + multiplexerInstance.computeIfAbsent(key, k -> new InstanceInfo(logFile, k)); + instanceInfo.increaseRefCount(); + return instanceInfo.getWorkerMultiplexer(); + } + + static void beforeCommand(CommandEnvironment env) { + setReporter(env.getReporter()); + } + + static void afterCommand() { + setReporter(null); } /** - * Removes the {@code WorkerMultiplexer} instance and reference count since it is no longer in - * use. + * Sets the reporter for all existing multiplexer instances. This allows reporting problems + * encountered while fetching an instance, e.g. during WorkerProxy validation. */ - public static void removeInstance(WorkerKey key) throws InterruptedException, UserExecException { - semMultiplexer.acquire(); - try { - multiplexerInstance.get(key).decreaseRefCount(); - if (multiplexerInstance.get(key).getRefCount() == 0) { - multiplexerInstance.get(key).getWorkerMultiplexer().interrupt(); - multiplexerInstance.get(key).getWorkerMultiplexer().destroyMultiplexer(); - multiplexerInstance.remove(key); - } - } catch (Exception e) { - String message = "NullPointerException while accessing non-existent multiplexer instance."; - throw createUserExecException(e, message, Code.MULTIPLEXER_INSTANCE_REMOVAL_FAILURE); - } finally { - semMultiplexer.release(); + private static synchronized void setReporter(@Nullable EventHandler reporter) { + for (InstanceInfo m : multiplexerInstance.values()) { + m.workerMultiplexer.setReporter(reporter); } } - /** Is called when a build is done, to do per-build cleanup. */ - static void afterCommandCleanup() { - try { - semMultiplexer.acquire(); - for (InstanceInfo i : multiplexerInstance.values()) { - i.getWorkerMultiplexer().setReporter(null); - } - semMultiplexer.release(); - } catch (InterruptedException e) { - // Interrupted during cleanup, not much we can do. + /** Removes a {@code WorkerProxy} instance and reference count since it is no longer in use. */ + public static synchronized void removeInstance(WorkerKey key) throws UserExecException { + InstanceInfo instanceInfo = multiplexerInstance.get(key); + if (instanceInfo == null) { + throw createUserExecException( + "Attempting to remove non-existent multiplexer instance.", + Code.MULTIPLEXER_INSTANCE_REMOVAL_FAILURE); + } + instanceInfo.decreaseRefCount(); + if (instanceInfo.getRefCount() == 0) { + instanceInfo.getWorkerMultiplexer().interrupt(); + instanceInfo.getWorkerMultiplexer().destroyMultiplexer(); + multiplexerInstance.remove(key); } } @VisibleForTesting static WorkerMultiplexer getMultiplexer(WorkerKey key) throws UserExecException { - try { - return multiplexerInstance.get(key).getWorkerMultiplexer(); - } catch (NullPointerException e) { - String message = "NullPointerException while accessing non-existent multiplexer instance."; - throw createUserExecException(e, message, Code.MULTIPLEXER_DOES_NOT_EXIST); + InstanceInfo instanceInfo = multiplexerInstance.get(key); + if (instanceInfo == null) { + throw createUserExecException( + "Accessing non-existent multiplexer instance.", Code.MULTIPLEXER_DOES_NOT_EXIST); } + return instanceInfo.getWorkerMultiplexer(); } @VisibleForTesting static Integer getRefCount(WorkerKey key) throws UserExecException { - try { - return multiplexerInstance.get(key).getRefCount(); - } catch (NullPointerException e) { - String message = "NullPointerException while accessing non-existent multiplexer instance."; - throw createUserExecException(e, message, Code.MULTIPLEXER_DOES_NOT_EXIST); + InstanceInfo instanceInfo = multiplexerInstance.get(key); + if (instanceInfo == null) { + throw createUserExecException( + "Accessing non-existent multiplexer instance.", Code.MULTIPLEXER_DOES_NOT_EXIST); } + return instanceInfo.getRefCount(); } @VisibleForTesting @@ -115,18 +111,17 @@ static Integer getInstanceCount() { return multiplexerInstance.keySet().size(); } - private static UserExecException createUserExecException( - Exception e, String message, Code detailedCode) { + private static UserExecException createUserExecException(String message, Code detailedCode) { return new UserExecException( FailureDetail.newBuilder() - .setMessage(ErrorMessage.builder().message(message).exception(e).build().toString()) + .setMessage(message) .setWorker(FailureDetails.Worker.newBuilder().setCode(detailedCode)) .build()); } /** Contains the WorkerMultiplexer instance and reference count. */ static class InstanceInfo { - private WorkerMultiplexer workerMultiplexer; + private final WorkerMultiplexer workerMultiplexer; private Integer refCount; public InstanceInfo(Path logFile, WorkerKey workerKey) { diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java index 6b3258c57147cf..f1940369647e89 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java @@ -39,7 +39,6 @@ final class WorkerProxy extends Worker { WorkerProxy( WorkerKey workerKey, int workerId, - Path workDir, Path logFile, WorkerMultiplexer workerMultiplexer) { super(workerKey, workerId, logFile); @@ -57,6 +56,7 @@ final class WorkerProxy extends Worker { @Override void setReporter(EventHandler reporter) { + // We might have created this multiplexer after setting the reporter for existing multiplexers workerMultiplexer.setReporter(reporter); } @@ -67,17 +67,17 @@ public void prepareExecution( workerMultiplexer.createProcess(workDir); } - /** Send the WorkRequest to multiplexer. */ @Override synchronized void destroy() { try { WorkerMultiplexerManager.removeInstance(workerKey); - } catch (InterruptedException e) { - logger.atWarning().withCause(e).log( - "InterruptedException was caught while destroying multiplexer. " - + "It could because the multiplexer was interrupted."); } catch (UserExecException e) { logger.atWarning().withCause(e).log("Exception"); + } finally { + if (this.shutdownHook != null) { + Runtime.getRuntime().removeShutdownHook(this.shutdownHook); + this.shutdownHook = null; + } } } @@ -110,4 +110,9 @@ public Optional getExitValue() { String getRecordingStreamMessage() { return workerMultiplexer.getRecordingStreamMessage(); } + + @Override + public String toString() { + return workerKey.getMnemonic() + " proxy worker #" + workerId; + } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index 7be36ea8a655d3..26f6019fa53136 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -57,6 +57,7 @@ import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; import com.google.protobuf.ByteString; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; @@ -408,6 +409,7 @@ WorkResponse execInWorker( try { inputFiles.materializeVirtualInputs(execRoot); } catch (IOException e) { + restoreInterrupt(e); String message = "IOException while materializing virtual inputs:"; throw createUserExecException(e, message, Code.VIRTUAL_INPUT_MATERIALIZATION_FAILURE); } @@ -415,6 +417,7 @@ WorkResponse execInWorker( try { context.prefetchInputs(); } catch (IOException e) { + restoreInterrupt(e); String message = "IOException while prefetching for worker:"; throw createUserExecException(e, message, Code.PREFETCH_FAILURE); } @@ -426,6 +429,7 @@ WorkResponse execInWorker( worker.setReporter(workerOptions.workerVerbose ? reporter : null); request = createWorkRequest(spawn, context, flagFiles, inputFileCache, key); } catch (IOException e) { + restoreInterrupt(e); String message = "IOException while borrowing a worker from the pool:"; throw createUserExecException(e, message, Code.BORROW_FAILURE); } @@ -442,6 +446,7 @@ WorkResponse execInWorker( worker.prepareExecution(inputFiles, outputs, key.getWorkerFilesWithHashes().keySet()); spawnMetrics.setSetupTime(setupInputsTime.plus(prepareExecutionStopwatch.elapsed())); } catch (IOException e) { + restoreInterrupt(e); String message = ErrorMessage.builder() .message("IOException while preparing the execution environment of a worker:") @@ -456,6 +461,7 @@ WorkResponse execInWorker( try { worker.putRequest(request); } catch (IOException e) { + restoreInterrupt(e); String message = ErrorMessage.builder() .message( @@ -471,6 +477,7 @@ WorkResponse execInWorker( try { response = worker.getResponse(request.getRequestId()); } catch (IOException e) { + restoreInterrupt(e); // If protobuf or json reader couldn't parse the response, try to print whatever the // failing worker wrote to stdout - it's probably a stack trace or some kind of error // message that will help the user figure out why the compiler is failing. @@ -494,6 +501,7 @@ WorkResponse execInWorker( worker.finishExecution(execRoot); spawnMetrics.setProcessOutputsTime(processOutputsStopwatch.elapsed()); } catch (IOException e) { + restoreInterrupt(e); String message = ErrorMessage.builder() .message("IOException while finishing worker execution:") @@ -509,8 +517,10 @@ WorkResponse execInWorker( workers.invalidateObject(key, worker); } catch (IOException e1) { // The original exception is more important / helpful, so we'll just ignore this one. + restoreInterrupt(e1); + } finally { + worker = null; } - worker = null; } throw e; @@ -523,6 +533,12 @@ WorkResponse execInWorker( return response; } + private static void restoreInterrupt(IOException e) { + if (e instanceof InterruptedIOException) { + Thread.currentThread().interrupt(); + } + } + private static UserExecException createUserExecException( IOException e, String message, Code detailedCode) { return createUserExecException( diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java index 9075e2a458c106..45921c36133419 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java @@ -62,7 +62,7 @@ public void testGetResponse_noOutstandingRequests() throws IOException, Interrup multiplexer.setProcessFactory(params -> new FakeSubprocess(serverInputStream)); WorkRequest request1 = WorkRequest.newBuilder().setRequestId(1).build(); - WorkerProxy worker = new WorkerProxy(workerKey, 2, logPath, logPath, multiplexer); + WorkerProxy worker = new WorkerProxy(workerKey, 2, logPath, multiplexer); worker.prepareExecution(null, null, null); worker.putRequest(request1); WorkResponse response1 = WorkResponse.newBuilder().setRequestId(1).build(); @@ -85,12 +85,12 @@ public void testGetResponse_basicConcurrency() OutputStream workerOutputStream = new PipedOutputStream(serverInputStream); multiplexer.setProcessFactory(params -> new FakeSubprocess(serverInputStream)); - WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, logPath, multiplexer); + WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, multiplexer); worker1.prepareExecution(null, null, null); WorkRequest request1 = WorkRequest.newBuilder().setRequestId(3).build(); worker1.putRequest(request1); - WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, logPath, multiplexer); + WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, multiplexer); worker2.prepareExecution(null, null, null); WorkRequest request2 = WorkRequest.newBuilder().setRequestId(42).build(); worker2.putRequest(request2); @@ -121,12 +121,12 @@ public void testGetResponse_slowMultiplexer() OutputStream workerOutputStream = new PipedOutputStream(serverInputStrean); multiplexer.setProcessFactory(params -> new FakeSubprocess(serverInputStrean)); - WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, logPath, multiplexer); + WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, multiplexer); worker1.prepareExecution(null, null, null); WorkRequest request1 = WorkRequest.newBuilder().setRequestId(3).build(); worker1.putRequest(request1); - WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, logPath, multiplexer); + WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, multiplexer); worker2.prepareExecution(null, null, null); WorkRequest request2 = WorkRequest.newBuilder().setRequestId(42).build(); worker2.putRequest(request2); @@ -178,12 +178,12 @@ public void testGetResponse_slowProxy() OutputStream workerOutputStream = new PipedOutputStream(serverInputStream); multiplexer.setProcessFactory(params -> new FakeSubprocess(serverInputStream)); - WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, logPath, multiplexer); + WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, multiplexer); worker1.prepareExecution(null, null, null); WorkRequest request1 = WorkRequest.newBuilder().setRequestId(3).build(); worker1.putRequest(request1); - WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, logPath, multiplexer); + WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, multiplexer); worker2.prepareExecution(null, null, null); WorkRequest request2 = WorkRequest.newBuilder().setRequestId(42).build(); worker2.putRequest(request2);