Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -810,12 +810,12 @@ private class MultipartUploadCompareAndExchangeOperation {
this.threadPool = threadPool;
}

void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
innerRun(expected, updated, listener.delegateResponse((delegate, e) -> {
void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) {
ActionListener.run(listener.delegateResponse((delegate, e) -> {
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
if (e instanceof AwsServiceException awsServiceException
&& (awsServiceException.statusCode() == 404
|| awsServiceException.statusCode() == 200
if ((e instanceof AwsServiceException awsServiceException)
&& (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()
|| awsServiceException.statusCode() == RestStatus.OK.getStatus()
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
Expand All @@ -824,7 +824,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
} else {
delegate.onFailure(e);
}
}));
}), l -> innerRun(expected, updated, l));
}

void innerRun(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
Expand Down Expand Up @@ -1120,16 +1120,13 @@ public void compareAndExchangeRegister(
ActionListener<OptionalBytesReference> listener
) {
final var clientReference = blobStore.clientReference();
ActionListener.run(
ActionListener.releaseBefore(clientReference, listener),
l -> new MultipartUploadCompareAndExchangeOperation(
purpose,
clientReference.client(),
blobStore.bucket(),
key,
blobStore.getThreadPool()
).run(expected, updated, l)
);
new MultipartUploadCompareAndExchangeOperation(
purpose,
clientReference.client(),
blobStore.bucket(),
key,
blobStore.getThreadPool()
).run(expected, updated, ActionListener.releaseBefore(clientReference, listener));
}

@Override
Expand Down