Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.1.2] Don't upload remote input to remote cache #21941

Merged
merged 1 commit into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
Expand Down Expand Up @@ -79,8 +80,6 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet
protected final Path execRoot;
protected final RemoteOutputChecker remoteOutputChecker;

private final Set<ActionInput> missingActionInputs = Sets.newConcurrentHashSet();

private final ActionOutputDirectoryHelper outputDirectoryHelper;

/** The state of a directory tracked by {@link DirectoryTracker}, as explained below. */
Expand Down Expand Up @@ -538,7 +537,7 @@ private Completable downloadFileNoCheckRx(
.doOnError(
error -> {
if (error instanceof CacheNotFoundException) {
missingActionInputs.add(actionInput);
reporter.post(new LostInputsEvent());
}
}));

Expand Down Expand Up @@ -700,10 +699,6 @@ public void flushOutputTree() throws InterruptedException {
downloadCache.awaitInProgressTasks();
}

public ImmutableSet<ActionInput> getMissingActionInputs() {
return ImmutableSet.copyOf(missingActionInputs);
}

public RemoteOutputChecker getRemoteOutputChecker() {
return remoteOutputChecker;
}
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event",
"//src/main/java/com/google/devtools/build/lib/remote/disk",
"//src/main/java/com/google/devtools/build/lib/remote/downloader",
"//src/main/java/com/google/devtools/build/lib/remote/grpc",
Expand Down Expand Up @@ -229,6 +230,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/events",
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
Expand All @@ -245,12 +247,13 @@ java_library(
srcs = ["LeaseService.java"],
deps = [
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event",
"//src/main/java/com/google/devtools/build/lib/skyframe:action_execution_value",
"//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions",
"//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
"//src/main/java/com/google/devtools/build/skyframe",
"//third_party:guava",
"//third_party:jsr305",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import com.google.devtools.build.lib.actions.ActionInput;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.cache.ActionCache;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.skyframe.ActionExecutionValue;
import com.google.devtools.build.lib.skyframe.SkyFunctions;
import com.google.devtools.build.lib.skyframe.TreeArtifactValue;
import com.google.devtools.build.skyframe.MemoizingEvaluator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

Expand All @@ -30,6 +31,7 @@ public class LeaseService {
@Nullable private final ActionCache actionCache;
private final AtomicBoolean leaseExtensionStarted = new AtomicBoolean(false);
@Nullable LeaseExtension leaseExtension;
private final AtomicBoolean hasMissingActionInputs = new AtomicBoolean(false);

public LeaseService(
MemoizingEvaluator memoizingEvaluator,
Expand All @@ -48,12 +50,18 @@ public void finalizeAction() {
}
}

public void finalizeExecution(Set<ActionInput> missingActionInputs) {
@AllowConcurrentEvents
@Subscribe
public void onLostInputs(LostInputsEvent event) {
hasMissingActionInputs.set(true);
}

public void finalizeExecution() {
if (leaseExtension != null) {
leaseExtension.stop();
}

if (!missingActionInputs.isEmpty()) {
if (hasMissingActionInputs.getAndSet(false)) {
handleMissingInputs();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
Expand All @@ -22,24 +23,29 @@
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
Expand All @@ -59,13 +65,50 @@
/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

/**
* An interface used to check whether a given {@link Path} is stored in a remote or a disk cache.
*/
public interface RemotePathChecker {
boolean isRemote(RemoteActionExecutionContext context, Path path) throws IOException;
}

private RemotePathChecker remotePathChecker =
new RemotePathChecker() {
@Override
public boolean isRemote(RemoteActionExecutionContext context, Path path)
throws IOException {
var fs = path.getFileSystem();
if (fs instanceof RemoteActionFileSystem) {
var remoteActionFileSystem = (RemoteActionFileSystem) fs;
if (remoteActionFileSystem.isRemote(path)) {
if (context.getReadCachePolicy().allowDiskCache()) {
try (var inputStream = path.getInputStream()) {
// If the file exists in the disk cache, download it and continue the upload.
return false;
} catch (IOException e) {
logger.atWarning().withCause(e).log(
"Failed to get input stream for %s", path.getPathString());
}
}
return true;
}
}
return false;
}
};

public RemoteExecutionCache(
RemoteCacheClient protocolImpl,
RemoteOptions options,
DigestUtil digestUtil) {
RemoteCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) {
super(protocolImpl, options, digestUtil);
}

@VisibleForTesting
void setRemotePathChecker(RemotePathChecker remotePathChecker) {
this.remotePathChecker = remotePathChecker;
}

/**
* Ensures that the tree structure of the inputs, the input files themselves, and the command are
* available in the remote cache, such that the tree can be reassembled and executed on another
Expand All @@ -82,7 +125,8 @@ public void ensureInputsPresent(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
boolean force)
boolean force,
Reporter reporter)
throws IOException, InterruptedException {
Iterable<Digest> merkleTreeAllDigests;
try (SilentCloseable s = Profiler.instance().profile("merkleTree.getAllDigests()")) {
Expand All @@ -95,7 +139,7 @@ public void ensureInputsPresent(
}

Flowable<TransferResult> uploads =
createUploadTasks(context, merkleTree, additionalInputs, allDigests, force)
createUploadTasks(context, merkleTree, additionalInputs, allDigests, force, reporter)
.flatMapPublisher(
result ->
Flowable.using(
Expand All @@ -113,10 +157,7 @@ public void ensureInputsPresent(
}));

try {
// Workaround for https://github.com/bazelbuild/bazel/issues/19513.
if (!mergeBulkTransfer(uploads).blockingAwait(options.remoteTimeout.getSeconds(), SECONDS)) {
throw new IOException("Timed out when waiting for uploads");
}
mergeBulkTransfer(uploads).blockingAwait();
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause != null) {
Expand All @@ -131,7 +172,8 @@ private ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context,
Digest digest,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs) {
Map<Digest, Message> additionalInputs,
Reporter reporter) {
Directory node = merkleTree.getDirectoryByDigest(digest);
if (node != null) {
return cacheProtocol.uploadBlob(context, digest, node.toByteString());
Expand All @@ -142,7 +184,20 @@ private ListenableFuture<Void> uploadBlob(
if (file.getBytes() != null) {
return cacheProtocol.uploadBlob(context, digest, file.getBytes());
}
return cacheProtocol.uploadFile(context, digest, file.getPath());

var path = checkNotNull(file.getPath());
try {
if (remotePathChecker.isRemote(context, path)) {
// If we get here, the remote input was determined to exist in the remote or disk cache at
// some point before action execution, but reported to be missing when querying the remote
// for missing action inputs; possibly because it was evicted in the interim.
reporter.post(new LostInputsEvent());
throw new CacheNotFoundException(digest, path.getPathString());
}
} catch (IOException e) {
return immediateFailedFuture(e);
}
return cacheProtocol.uploadFile(context, digest, path);
}

Message message = additionalInputs.get(digest);
Expand All @@ -169,14 +224,16 @@ private Single<List<UploadTask>> createUploadTasks(
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Iterable<Digest> allDigests,
boolean force) {
boolean force,
Reporter reporter) {
return Single.using(
() -> Profiler.instance().profile("collect digests"),
ignored ->
Flowable.fromIterable(allDigests)
.flatMapMaybe(
digest ->
maybeCreateUploadTask(context, merkleTree, additionalInputs, digest, force))
maybeCreateUploadTask(
context, merkleTree, additionalInputs, digest, force, reporter))
.collect(toImmutableList()),
SilentCloseable::close);
}
Expand All @@ -186,7 +243,8 @@ private Maybe<UploadTask> maybeCreateUploadTask(
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Digest digest,
boolean force) {
boolean force,
Reporter reporter) {
return Maybe.create(
emitter -> {
AsyncSubject<Void> completion = AsyncSubject.create();
Expand All @@ -211,7 +269,11 @@ private Maybe<UploadTask> maybeCreateUploadTask(
return toCompletable(
() ->
uploadBlob(
context, uploadTask.digest, merkleTree, additionalInputs),
context,
uploadTask.digest,
merkleTree,
additionalInputs,
reporter),
directExecutor());
}),
/* onAlreadyRunning= */ () -> emitter.onSuccess(uploadTask),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,8 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
.withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache
merkleTree,
additionalInputs,
force);
force,
reporter);
} finally {
maybeReleaseRemoteActionBuildingSemaphore();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
buildRequestId,
invocationId,
remoteOptions.remoteInstanceName,
remoteOptions.remoteAcceptCached));
remoteOptions.remoteAcceptCached,
env.getReporter()));
} else {
if (enableDiskCache) {
try {
Expand Down Expand Up @@ -990,6 +991,7 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
env.getSkyframeExecutor().getEvaluator(),
env.getBlazeWorkspace().getPersistentActionCache(),
leaseExtension);
env.getEventBus().register(leaseService);

remoteOutputService.setRemoteOutputChecker(remoteOutputChecker);
remoteOutputService.setActionInputFetcher(actionInputFetcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.Subscribe;
import com.google.devtools.build.lib.actions.Action;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputMap;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.ArtifactPathResolver;
Expand Down Expand Up @@ -168,11 +166,7 @@ public void finalizeBuild(boolean buildSuccessful) {
@Subscribe
public void onExecutionPhaseCompleteEvent(ExecutionPhaseCompleteEvent event) {
if (leaseService != null) {
var missingActionInputs = ImmutableSet.<ActionInput>of();
if (actionInputFetcher != null) {
missingActionInputs = actionInputFetcher.getMissingActionInputs();
}
leaseService.finalizeExecution(missingActionInputs);
leaseService.finalizeExecution();
}
}

Expand Down
Loading
Loading