Skip to content

Commit

Permalink
Fix double shutdown of BuildEventArtifactUploader when BES+File outpu…
Browse files Browse the repository at this point in the history
…t enabled.

When both BES uploading and File BEP output are enabled, a single
BuildEventArtifactUploader object is shared by two different
BuildEventTransports. Both were calling #shutdown() which in turn called
ByteStreamUploader#shutdown(). If shutdown is called while one transport is
still uploading, the ByteStreamUploader will fail an assertion and crash.

This change adds reference counting to the BuildEventArtifactUploader interface
and ensures the reference counts are maintained correctly when sharing a
BuildEventArtifactUploader across multiple independent BuildEventTransport
threads.

Fixes bazelbuild#12575.

RELNOTES: None.

TESTED=Made repro modifications to GrpcCacheClient.java described in
bazelbuild#12575 and confirmed crash without this
change. Implemented this fix, observed crash was no longer reproducible. Added logs
to ByteStreamBuildEventArtifactUploader#deallocate() to verify deallocation happened
after both BuildEventTransports had completed.
PiperOrigin-RevId: 349589743
  • Loading branch information
michaeledgar authored and philwo committed Mar 15, 2021
1 parent 960822f commit 4edc0d5
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,8 @@ private static class ThrowingBuildEventArtifactUploaderSupplier {
}

BuildEventArtifactUploader get() throws IOException {
if (memoizedValue == null && exception == null) {
boolean needsInitialization = memoizedValue == null;
if (needsInitialization && exception == null) {
try {
memoizedValue = callable.call();
} catch (IOException e) {
Expand All @@ -864,6 +865,9 @@ BuildEventArtifactUploader get() throws IOException {
}
}
if (memoizedValue != null) {
if (!needsInitialization) {
memoizedValue.retain();
}
return memoizedValue;
}
throw exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public void run() {
logger.atSevere().log("BES upload failed due to a RuntimeException / Error. This is a bug.");
throw e;
} finally {
buildEventUploader.shutdown();
buildEventUploader.release();
MoreExecutors.shutdownAndAwaitTermination(timeoutExecutor, 0, TimeUnit.MILLISECONDS);
closeFuture.set(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ java_library(
"//third_party:flogger",
"//third_party:guava",
"//third_party:jsr305",
"//third_party:netty",
"//third_party/protobuf:protobuf_java",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType;
import com.google.devtools.build.lib.vfs.Path;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -30,7 +31,7 @@
import javax.annotation.Nullable;

/** Uploads artifacts referenced by the Build Event Protocol (BEP). */
public interface BuildEventArtifactUploader {
public interface BuildEventArtifactUploader extends ReferenceCounted {
/**
* Asynchronously uploads a set of files referenced by the protobuf representation of a {@link
* BuildEvent}. This method is expected to return quickly.
Expand Down Expand Up @@ -78,11 +79,6 @@ public ListenableFuture<String> uriFuture() {
}
};

/**
* Shutdown any resources associated with the uploader.
*/
void shutdown();

/**
* Return true if the upload may be "slow". Examples of slowness include writes to remote storage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
import com.google.devtools.build.lib.buildeventstream.PathConverter.FileUriPathConverter;
import com.google.devtools.build.lib.vfs.Path;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;

/** An uploader that simply turns paths into local file URIs. */
public class LocalFilesArtifactUploader implements BuildEventArtifactUploader {
public class LocalFilesArtifactUploader extends AbstractReferenceCounted
implements BuildEventArtifactUploader {
private static final FileUriPathConverter FILE_URI_PATH_CONVERTER = new FileUriPathConverter();
private final ConcurrentHashMap<Path, Boolean> fileIsDirectory = new ConcurrentHashMap<>();

Expand All @@ -34,10 +37,15 @@ public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
}

@Override
public void shutdown() {
protected void deallocate() {
// Intentionally left empty
}

@Override
public ReferenceCounted touch(Object o) {
return this;
}

@Override
public boolean mayBeSlow() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void run() {
} catch (IOException e) {
logger.atSevere().withCause(e).log("Failed to close BEP file output stream.");
} finally {
uploader.shutdown();
uploader.release();
timeoutExecutor.shutdown();
}
closeFuture.set(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
import io.grpc.Context;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -43,10 +45,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

/**
* A {@link BuildEventArtifactUploader} backed by {@link ByteStreamUploader}.
*/
class ByteStreamBuildEventArtifactUploader implements BuildEventArtifactUploader {
/** A {@link BuildEventArtifactUploader} backed by {@link ByteStreamUploader}. */
class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
implements BuildEventArtifactUploader {

private final ListeningExecutorService uploadExecutor;
private final Context ctx;
Expand Down Expand Up @@ -243,14 +244,19 @@ public boolean mayBeSlow() {
}

@Override
public void shutdown() {
protected void deallocate() {
if (shutdown.getAndSet(true)) {
return;
}
uploader.release();
uploadExecutor.shutdown();
}

@Override
public ReferenceCounted touch(Object o) {
return this;
}

private static class PathConverterImpl implements PathConverter {

private final String remoteServerInstanceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface BuildEventArtifactUploaderFactory {

/**
* Returns a new instance of a {@link BuildEventArtifactUploader}. The call is responsible for
* calling {@link BuildEventArtifactUploader#shutdown()} on the returned instance.
* calling {@link BuildEventArtifactUploader#release()} on the returned instance.
*/
BuildEventArtifactUploader create(CommandEnvironment env)
throws InvalidPackagePathSymlinkException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs",
"//third_party:guava",
"//third_party:junit4",
"//third_party:netty",
"//third_party:truth",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactoryMap;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
import com.google.devtools.build.lib.vfs.Path;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.util.Map;
import org.junit.Before;
Expand Down Expand Up @@ -51,8 +52,38 @@ public boolean mayBeSlow() {
}

@Override
public void shutdown() {
// Intentionally left empty.
public int refCnt() {
return 0;
}

@Override
public ReferenceCounted retain() {
return this;
}

@Override
public ReferenceCounted retain(int i) {
return this;
}

@Override
public ReferenceCounted touch() {
return this;
}

@Override
public ReferenceCounted touch(Object o) {
return this;
}

@Override
public boolean release() {
return false;
}

@Override
public boolean release(int i) {
return false;
}
};
uploaderFactories =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ java_test(
"//third_party:guava",
"//third_party:junit4",
"//third_party:mockito",
"//third_party:netty",
"//third_party:truth",
"//third_party/protobuf:protobuf_java",
"//third_party/protobuf:protobuf_java_util",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import com.google.devtools.build.lib.buildeventstream.PathConverter.FileUriPathConverter;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.common.options.Options;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -137,7 +139,7 @@ public void testCancelledUpload() throws Exception {

BuildEventArtifactUploader uploader =
Mockito.spy(
new BuildEventArtifactUploader() {
new BuildEventArtifactUploaderWithRefCounting() {
@Override
public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
return Futures.immediateCancelledFuture();
Expand All @@ -147,9 +149,6 @@ public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
public boolean mayBeSlow() {
return false;
}

@Override
public void shutdown() {}
});

File output = tmp.newFile();
Expand Down Expand Up @@ -248,7 +247,7 @@ public void testWritesWithUploadDelays() throws Exception {

BuildEventArtifactUploader uploader =
Mockito.spy(
new BuildEventArtifactUploader() {
new BuildEventArtifactUploaderWithRefCounting() {
@Override
public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
if (files.containsKey(file1)) {
Expand All @@ -261,11 +260,6 @@ public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
public boolean mayBeSlow() {
return true;
}

@Override
public void shutdown() {
// Intentionally left empty.
}
});
File output = tmp.newFile();
BufferedOutputStream outputStream =
Expand All @@ -284,7 +278,7 @@ public void shutdown() {
assertThat(in.available()).isEqualTo(0);
}

verify(uploader).shutdown();
verify(uploader).release();
}

/** Regression test for b/207287675 */
Expand All @@ -296,7 +290,7 @@ public void testHandlesDuplicateFiles() throws Exception {

BuildEventArtifactUploader uploader =
Mockito.spy(
new BuildEventArtifactUploader() {
new BuildEventArtifactUploaderWithRefCounting() {
@Override
public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
return Futures.immediateFuture(new FileUriPathConverter());
Expand All @@ -306,11 +300,6 @@ public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
public boolean mayBeSlow() {
return false;
}

@Override
public void shutdown() {
// Intentionally left empty.
}
});
File output = tmp.newFile();
BufferedOutputStream outputStream =
Expand Down Expand Up @@ -338,7 +327,7 @@ public void testCloseWaitsForWritesToFinish() throws Exception {
SettableFuture<PathConverter> upload = SettableFuture.create();
BuildEventArtifactUploader uploader =
Mockito.spy(
new BuildEventArtifactUploader() {
new BuildEventArtifactUploaderWithRefCounting() {
@Override
public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
return upload;
Expand All @@ -348,11 +337,6 @@ public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
public boolean mayBeSlow() {
return false;
}

@Override
public void shutdown() {
// Intentionally left empty.
}
});

File output = tmp.newFile();
Expand All @@ -373,7 +357,7 @@ public void shutdown() {
assertThat(in.available()).isEqualTo(0);
}

verify(uploader).shutdown();
verify(uploader).release();
}

private static class WithLocalFilesEvent implements BuildEvent {
Expand Down Expand Up @@ -418,4 +402,16 @@ public Collection<BuildEventId> getChildrenEvents() {
return ImmutableList.of(BuildEventIdUtil.progressId(id + 1));
}
}

private abstract static class BuildEventArtifactUploaderWithRefCounting
extends AbstractReferenceCounted implements BuildEventArtifactUploader {

@Override
protected void deallocate() {}

@Override
public ReferenceCounted touch(Object o) {
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void uploadsShouldWork() throws Exception {
.isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size);
}

artifactUploader.shutdown();
artifactUploader.release();

assertThat(uploader.refCnt()).isEqualTo(0);
assertThat(refCntChannel.isShutdown()).isTrue();
Expand All @@ -198,7 +198,7 @@ public void testUploadDirectoryDoesNotCrash() throws Exception {

PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
assertThat(pathConverter.apply(dir)).isNull();
artifactUploader.shutdown();
artifactUploader.release();
}

@Test
Expand Down Expand Up @@ -267,7 +267,7 @@ public void onCompleted() {
assertThat(e.getCause().getCause()).isInstanceOf(StatusRuntimeException.class);
assertThat(Status.fromThrowable(e).getCode()).isEqualTo(Status.CANCELLED.getCode());

artifactUploader.shutdown();
artifactUploader.release();

assertThat(uploader.refCnt()).isEqualTo(0);
assertThat(refCntChannel.isShutdown()).isTrue();
Expand Down

0 comments on commit 4edc0d5

Please sign in to comment.