Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.1.0] Optimize prefetchInputs. #20719

Merged
merged 4 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.Action;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
Expand All @@ -44,17 +47,13 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.RxUtils;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.OutputPermissions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -283,6 +282,10 @@ public ListenableFuture<Void> prefetchFiles(
files.add(input);
}

if (files.isEmpty()) {
return immediateVoidFuture();
}

// Collect the set of directories whose output permissions must be set at the end of this call.
// This responsibility cannot lie with the downloading of an individual file, because multiple
// files may be concurrently downloaded into the same directory within a single call to
Expand All @@ -291,30 +294,38 @@ public ListenableFuture<Void> prefetchFiles(
// it must still synchronize on the output permissions having been set.
Set<Path> dirsWithOutputPermissions = Sets.newConcurrentHashSet();

Completable prefetch =
mergeBulkTransfer(
Flowable.fromIterable(files)
.flatMapSingle(
input ->
prefetchFile(
action,
dirsWithOutputPermissions,
metadataSupplier,
input,
priority)))
.doOnComplete(
// Set output permissions on tree artifact subdirectories, matching the behavior of
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
() -> {
for (Path dir : dirsWithOutputPermissions) {
directoryTracker.setOutputPermissions(dir);
}
});

return toListenableFuture(prefetch);
// Using plain futures to avoid RxJava overheads.
List<ListenableFuture<Void>> transfers = new ArrayList<>(files.size());
try (var s = Profiler.instance().profile("compose prefetches")) {
for (var file : files) {
transfers.add(
prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority));
}
}

ListenableFuture<Void> mergedTransfer;
try (var s = Profiler.instance().profile("mergeBulkTransfer")) {
mergedTransfer = mergeBulkTransfer(transfers);
}

return Futures.transformAsync(
mergedTransfer,
unused -> {
try {
// Set output permissions on tree artifact subdirectories, matching the behavior of
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
for (Path dir : dirsWithOutputPermissions) {
directoryTracker.setOutputPermissions(dir);
}
} catch (IOException e) {
return immediateFailedFuture(e);
}
return immediateVoidFuture();
},
directExecutor());
}

private Single<TransferResult> prefetchFile(
private ListenableFuture<Void> prefetchFile(
ActionExecutionMetadata action,
Set<Path> dirsWithOutputPermissions,
MetadataSupplier metadataSupplier,
Expand All @@ -323,14 +334,14 @@ private Single<TransferResult> prefetchFile(
try {
if (input instanceof VirtualActionInput) {
prefetchVirtualActionInput((VirtualActionInput) input);
return Single.just(TransferResult.ok());
return immediateVoidFuture();
}

PathFragment execPath = input.getExecPath();

FileArtifactValue metadata = metadataSupplier.getMetadata(input);
if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) {
return Single.just(TransferResult.ok());
return immediateVoidFuture();
}

@Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier);
Expand All @@ -357,11 +368,9 @@ private Single<TransferResult> prefetchFile(
result = result.andThen(plantSymlink(symlink));
}

return RxUtils.toTransferResult(result);
} catch (IOException e) {
return Single.just(TransferResult.error(e));
} catch (InterruptedException e) {
return Single.just(TransferResult.interrupted());
return toListenableFuture(result);
} catch (IOException | InterruptedException e) {
return immediateFailedFuture(e);
}
}

Expand Down
68 changes: 58 additions & 10 deletions src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.util;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.stream.Collectors.joining;

import build.bazel.remote.execution.v2.Action;
Expand All @@ -29,7 +33,6 @@
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecutionRequirements;
import com.google.devtools.build.lib.actions.Spawn;
Expand Down Expand Up @@ -417,11 +420,11 @@ public static ListenableFuture<ActionResult> downloadAsActionResult(
try {
return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray()));
} catch (InvalidProtocolBufferException e) {
return Futures.immediateFailedFuture(e);
return immediateFailedFuture(e);
}
},
MoreExecutors.directExecutor())
.catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
directExecutor())
.catching(CacheNotFoundException.class, (e) -> null, directExecutor());
}

public static void verifyBlobContents(Digest expected, Digest actual) throws IOException {
Expand Down Expand Up @@ -483,15 +486,15 @@ public ByteString getContents() {
*/
public static <V> ListenableFuture<V> refreshIfUnauthenticatedAsync(
AsyncCallable<V> call, CallCredentialsProvider callCredentialsProvider) {
Preconditions.checkNotNull(call);
Preconditions.checkNotNull(callCredentialsProvider);
checkNotNull(call);
checkNotNull(callCredentialsProvider);

try {
return Futures.catchingAsync(
call.call(),
Throwable.class,
(e) -> refreshIfUnauthenticatedAsyncOnException(e, call, callCredentialsProvider),
MoreExecutors.directExecutor());
directExecutor());
} catch (Throwable t) {
return refreshIfUnauthenticatedAsyncOnException(t, call, callCredentialsProvider);
}
Expand All @@ -511,15 +514,15 @@ private static <V> ListenableFuture<V> refreshIfUnauthenticatedAsyncOnException(
}
}

return Futures.immediateFailedFuture(t);
return immediateFailedFuture(t);
}

/** Same as {@link #refreshIfUnauthenticatedAsync} but calling a synchronous code block. */
public static <V> V refreshIfUnauthenticated(
Callable<V> call, CallCredentialsProvider callCredentialsProvider)
throws IOException, InterruptedException {
Preconditions.checkNotNull(call);
Preconditions.checkNotNull(callCredentialsProvider);
checkNotNull(call);
checkNotNull(callCredentialsProvider);

try {
return call.call();
Expand Down Expand Up @@ -618,4 +621,49 @@ public static void waitForBulkTransfer(
throw bulkTransferException;
}
}

public static ListenableFuture<Void> mergeBulkTransfer(
Iterable<ListenableFuture<Void>> transfers) {
return Futures.whenAllComplete(transfers)
.callAsync(
() -> {
BulkTransferException bulkTransferException = null;

for (var transfer : transfers) {
IOException error = null;
try {
transfer.get();
} catch (CancellationException e) {
return immediateFailedFuture(new InterruptedException());
} catch (InterruptedException e) {
return immediateFailedFuture(e);
} catch (ExecutionException e) {
var cause = e.getCause();
if (cause instanceof InterruptedException) {
return immediateFailedFuture(cause);
} else if (cause instanceof IOException) {
error = (IOException) cause;
} else {
error = new IOException(cause);
}
}

if (error == null) {
continue;
}

if (bulkTransferException == null) {
bulkTransferException = new BulkTransferException();
}
bulkTransferException.add(error);
}

if (bulkTransferException != null) {
return immediateFailedFuture(bulkTransferException);
}

return immediateVoidFuture();
},
directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -760,7 +759,7 @@ public void prefetchFile_interruptingMetadataSupplier_interruptsDownload() throw
prefetcher.prefetchFiles(
action, ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM);

assertThrows(CancellationException.class, future::get);
assertThrows(InterruptedException.class, () -> getFromFuture(future));
}

@Test
Expand Down
Loading