diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java index a8f3c55dfcf9b8..a81718008a27ca 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java +++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java @@ -16,17 +16,20 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; +import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable; import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture; -import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer; import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; +import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.flogger.GoogleLogger; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.actions.Action; import com.google.devtools.build.lib.actions.ActionExecutionMetadata; @@ -44,8 +47,6 @@ import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.util.AsyncTaskCache; -import com.google.devtools.build.lib.remote.util.RxUtils; -import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult; import com.google.devtools.build.lib.remote.util.TempPathGenerator; import com.google.devtools.build.lib.vfs.FileSymlinkLoopException; import com.google.devtools.build.lib.vfs.FileSystemUtils; @@ -53,8 +54,6 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import io.reactivex.rxjava3.core.Completable; -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.core.Single; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -283,6 +282,10 @@ public ListenableFuture prefetchFiles( files.add(input); } + if (files.isEmpty()) { + return immediateVoidFuture(); + } + // Collect the set of directories whose output permissions must be set at the end of this call. // This responsibility cannot lie with the downloading of an individual file, because multiple // files may be concurrently downloaded into the same directory within a single call to @@ -291,30 +294,38 @@ public ListenableFuture prefetchFiles( // it must still synchronize on the output permissions having been set. Set dirsWithOutputPermissions = Sets.newConcurrentHashSet(); - Completable prefetch = - mergeBulkTransfer( - Flowable.fromIterable(files) - .flatMapSingle( - input -> - prefetchFile( - action, - dirsWithOutputPermissions, - metadataSupplier, - input, - priority))) - .doOnComplete( - // Set output permissions on tree artifact subdirectories, matching the behavior of - // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions. - () -> { - for (Path dir : dirsWithOutputPermissions) { - directoryTracker.setOutputPermissions(dir); - } - }); - - return toListenableFuture(prefetch); + // Using plain futures to avoid RxJava overheads. + List> transfers = new ArrayList<>(files.size()); + try (var s = Profiler.instance().profile("compose prefetches")) { + for (var file : files) { + transfers.add( + prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority)); + } + } + + ListenableFuture mergedTransfer; + try (var s = Profiler.instance().profile("mergeBulkTransfer")) { + mergedTransfer = mergeBulkTransfer(transfers); + } + + return Futures.transformAsync( + mergedTransfer, + unused -> { + try { + // Set output permissions on tree artifact subdirectories, matching the behavior of + // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions. + for (Path dir : dirsWithOutputPermissions) { + directoryTracker.setOutputPermissions(dir); + } + } catch (IOException e) { + return immediateFailedFuture(e); + } + return immediateVoidFuture(); + }, + directExecutor()); } - private Single prefetchFile( + private ListenableFuture prefetchFile( ActionExecutionMetadata action, Set dirsWithOutputPermissions, MetadataSupplier metadataSupplier, @@ -323,14 +334,14 @@ private Single prefetchFile( try { if (input instanceof VirtualActionInput) { prefetchVirtualActionInput((VirtualActionInput) input); - return Single.just(TransferResult.ok()); + return immediateVoidFuture(); } PathFragment execPath = input.getExecPath(); FileArtifactValue metadata = metadataSupplier.getMetadata(input); if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) { - return Single.just(TransferResult.ok()); + return immediateVoidFuture(); } @Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier); @@ -357,11 +368,9 @@ private Single prefetchFile( result = result.andThen(plantSymlink(symlink)); } - return RxUtils.toTransferResult(result); - } catch (IOException e) { - return Single.just(TransferResult.error(e)); - } catch (InterruptedException e) { - return Single.just(TransferResult.interrupted()); + return toListenableFuture(result); + } catch (IOException | InterruptedException e) { + return immediateFailedFuture(e); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java index 3e8adc3a9de49c..61c4c4f4b9d9a4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java @@ -13,8 +13,12 @@ // limitations under the License. package com.google.devtools.build.lib.remote.util; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Throwables.getStackTraceAsString; +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; +import static com.google.common.util.concurrent.Futures.immediateVoidFuture; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.stream.Collectors.joining; import build.bazel.remote.execution.v2.Action; @@ -29,7 +33,6 @@ import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ExecutionRequirements; import com.google.devtools.build.lib.actions.Spawn; @@ -417,11 +420,11 @@ public static ListenableFuture downloadAsActionResult( try { return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray())); } catch (InvalidProtocolBufferException e) { - return Futures.immediateFailedFuture(e); + return immediateFailedFuture(e); } }, - MoreExecutors.directExecutor()) - .catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor()); + directExecutor()) + .catching(CacheNotFoundException.class, (e) -> null, directExecutor()); } public static void verifyBlobContents(Digest expected, Digest actual) throws IOException { @@ -483,15 +486,15 @@ public ByteString getContents() { */ public static ListenableFuture refreshIfUnauthenticatedAsync( AsyncCallable call, CallCredentialsProvider callCredentialsProvider) { - Preconditions.checkNotNull(call); - Preconditions.checkNotNull(callCredentialsProvider); + checkNotNull(call); + checkNotNull(callCredentialsProvider); try { return Futures.catchingAsync( call.call(), Throwable.class, (e) -> refreshIfUnauthenticatedAsyncOnException(e, call, callCredentialsProvider), - MoreExecutors.directExecutor()); + directExecutor()); } catch (Throwable t) { return refreshIfUnauthenticatedAsyncOnException(t, call, callCredentialsProvider); } @@ -511,15 +514,15 @@ private static ListenableFuture refreshIfUnauthenticatedAsyncOnException( } } - return Futures.immediateFailedFuture(t); + return immediateFailedFuture(t); } /** Same as {@link #refreshIfUnauthenticatedAsync} but calling a synchronous code block. */ public static V refreshIfUnauthenticated( Callable call, CallCredentialsProvider callCredentialsProvider) throws IOException, InterruptedException { - Preconditions.checkNotNull(call); - Preconditions.checkNotNull(callCredentialsProvider); + checkNotNull(call); + checkNotNull(callCredentialsProvider); try { return call.call(); @@ -618,4 +621,49 @@ public static void waitForBulkTransfer( throw bulkTransferException; } } + + public static ListenableFuture mergeBulkTransfer( + Iterable> transfers) { + return Futures.whenAllComplete(transfers) + .callAsync( + () -> { + BulkTransferException bulkTransferException = null; + + for (var transfer : transfers) { + IOException error = null; + try { + transfer.get(); + } catch (CancellationException e) { + return immediateFailedFuture(new InterruptedException()); + } catch (InterruptedException e) { + return immediateFailedFuture(e); + } catch (ExecutionException e) { + var cause = e.getCause(); + if (cause instanceof InterruptedException) { + return immediateFailedFuture(cause); + } else if (cause instanceof IOException) { + error = (IOException) cause; + } else { + error = new IOException(cause); + } + } + + if (error == null) { + continue; + } + + if (bulkTransferException == null) { + bulkTransferException = new BulkTransferException(); + } + bulkTransferException.add(error); + } + + if (bulkTransferException != null) { + return immediateFailedFuture(bulkTransferException); + } + + return immediateVoidFuture(); + }, + directExecutor()); + } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java index 76b949eaf6d798..b39f2cf6836091 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java @@ -71,7 +71,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -760,7 +759,7 @@ public void prefetchFile_interruptingMetadataSupplier_interruptsDownload() throw prefetcher.prefetchFiles( action, ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM); - assertThrows(CancellationException.class, future::get); + assertThrows(InterruptedException.class, () -> getFromFuture(future)); } @Test