Skip to content

Commit

Permalink
Only request gRPC write when not complete
Browse files Browse the repository at this point in the history
If a queryWriteStatus yields a committedSize which leaves no content
remaining to be uploaded, immediately succeed a blob upload. This can
easily occur if a competing blob write completes asynchronously between
abnormal write termination and a query.

Closes bazelbuild#10284.

PiperOrigin-RevId: 281944912
  • Loading branch information
buchgr authored and copybara-github committed Nov 22, 2019
1 parent a6cd408 commit cc2b3ec
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,12 @@ ListenableFuture<Void> start() {
AtomicLong committedOffset = new AtomicLong(0);
return Futures.transformAsync(
retrier.executeAsync(
() -> ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff)),
() -> {
if (committedOffset.get() < chunker.getSize()) {
return ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff));
}
return Futures.immediateFuture(null);
},
progressiveBackoff),
(result) -> {
long committedSize = committedOffset.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public Chunk next() throws IOException {
return new Chunk(blob, offsetBefore);
}

private long bytesLeft() {
/** Returns the number of bytes remaining: the total input size minus the current offset. */
public long bytesLeft() {
  return getSize() - getOffset();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,65 @@ public void queryWriteStatus(
withEmptyMetadata.detach(prevContext);
}

@Test
public void concurrentlyCompletedUploadIsNotRetried() throws Exception {
  // Verifies that when a write fails but the follow-up QueryWriteStatus call reports
  // the upload as already complete (e.g. a competing writer finished it in the
  // meantime), the uploader succeeds immediately instead of retrying the write.
  Context prevContext = withEmptyMetadata.attach();
  // Retrier permits one retry with no backoff, so a spurious retry would be visible
  // as a second write() call below.
  RemoteRetrier retrier =
      TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
  ByteStreamUploader uploader =
      new ByteStreamUploader(
          INSTANCE_NAME, new ReferenceCountedChannel(channel), null, 1, retrier);

  // Blob spans multiple chunks, so completing it genuinely would need further writes.
  byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
  new Random().nextBytes(blob);

  Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
  HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());

  // Counts write() invocations so we can assert no retry happened.
  AtomicInteger numWriteCalls = new AtomicInteger(0);

  serviceRegistry.addService(
      new ByteStreamImplBase() {
        @Override
        public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
          // Fail every write attempt immediately with DEADLINE_EXCEEDED; the
          // returned observer intentionally ignores all client traffic.
          numWriteCalls.getAndIncrement();
          streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
          return new StreamObserver<WriteRequest>() {
            @Override
            public void onNext(WriteRequest writeRequest) {}

            @Override
            public void onError(Throwable throwable) {}

            @Override
            public void onCompleted() {}
          };
        }

        @Override
        public void queryWriteStatus(
            QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
          // Report the full blob as committed, simulating an upload that a
          // competing writer completed concurrently.
          response.onNext(
              QueryWriteStatusResponse.newBuilder()
                  .setCommittedSize(blob.length)
                  .setComplete(true)
                  .build());
          response.onCompleted();
        }
      });

  uploader.uploadBlob(hash, chunker, true);

  // Exactly one write call: the initial failed attempt. The query's "complete"
  // answer must have short-circuited any retry.
  assertThat(numWriteCalls.get()).isEqualTo(1);

  blockUntilInternalStateConsistent(uploader);

  withEmptyMetadata.detach(prevContext);
}

@Test
public void unimplementedQueryShouldRestartUpload() throws Exception {
Context prevContext = withEmptyMetadata.attach();
Expand Down

0 comments on commit cc2b3ec

Please sign in to comment.