diff --git a/.changes/next-release/bugfix-AWSS3-b8d17fa.json b/.changes/next-release/bugfix-AWSS3-b8d17fa.json new file mode 100644 index 000000000000..277fa7ae9c56 --- /dev/null +++ b/.changes/next-release/bugfix-AWSS3-b8d17fa.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS S3", + "contributor": "", + "description": "Fixed the issue in S3 multipart client where the list of parts could be out of order in CompleteMultipartRequest, causing `The list of parts was not in ascending order` error to be thrown." +} diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java index 4fe64a7bca05..b41fbf836dcb 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java @@ -18,13 +18,15 @@ import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Consumer; +import java.util.stream.IntStream; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -59,12 +61,16 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber< private final Collection> futures = new ConcurrentLinkedQueue<>(); private final PutObjectRequest putObjectRequest; private final CompletableFuture returnFuture; - private final Map completedParts; + private final AtomicReferenceArray completedParts; private final Map existingParts; private final PublisherListener progressListener; private Subscription subscription; private volatile boolean isDone; private volatile boolean isPaused; + /** + * Indicates whether CompleteMultipart has been initiated or not. + */ + private final AtomicBoolean completedMultipartInitiated = new AtomicBoolean(false); private volatile CompletableFuture completeMpuFuture; KnownContentLengthAsyncRequestBodySubscriber(MpuRequestContext mpuRequestContext, @@ -75,9 +81,9 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber< this.putObjectRequest = mpuRequestContext.request().left(); this.returnFuture = returnFuture; this.uploadId = mpuRequestContext.uploadId(); - this.existingParts = mpuRequestContext.existingParts(); + this.existingParts = mpuRequestContext.existingParts() == null ? new HashMap<>() : mpuRequestContext.existingParts(); this.numExistingParts = NumericUtils.saturatedCast(mpuRequestContext.numPartsCompleted()); - this.completedParts = new ConcurrentHashMap<>(); + this.completedParts = new AtomicReferenceArray<>(partCount); this.multipartUploadHelper = multipartUploadHelper; this.progressListener = putObjectRequest.overrideConfiguration().map(c -> c.executionAttributes() .getAttribute(JAVA_PROGRESS_LISTENER)) @@ -154,8 +160,8 @@ public void onNext(AsyncRequestBody asyncRequestBody) { partNumber.getAndIncrement(), uploadId); - Consumer completedPartConsumer = - completedPart -> completedParts.put(completedPart.partNumber(), completedPart); + Consumer completedPartConsumer = completedPart -> completedParts.set(completedPart.partNumber() - 1, + completedPart); multipartUploadHelper.sendIndividualUploadPartRequest(uploadId, completedPartConsumer, futures, Pair.of(uploadRequest, asyncRequestBody), progressListener) .whenComplete((r, t) -> { @@ -193,15 +199,16 @@ public void onComplete() { } private void completeMultipartUploadIfFinished(int requestsInFlight) { - if (isDone && requestsInFlight == 0) { + if (isDone && requestsInFlight == 0 && completedMultipartInitiated.compareAndSet(false, true)) { CompletedPart[] parts; if (existingParts.isEmpty()) { - parts = completedParts.values().toArray(new CompletedPart[0]); - } else if (!completedParts.isEmpty()) { + parts = + IntStream.range(0, completedParts.length()) + .mapToObj(completedParts::get) + .toArray(CompletedPart[]::new); + } else { // List of CompletedParts needs to be in ascending order parts = mergeCompletedParts(); - } else { - parts = existingParts.values().toArray(new CompletedPart[0]); } completeMpuFuture = multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts, putObjectRequest); } @@ -212,7 +219,7 @@ private CompletedPart[] mergeCompletedParts() { int currPart = 1; while (currPart < partCount + 1) { CompletedPart completedPart = existingParts.containsKey(currPart) ? existingParts.get(currPart) : - completedParts.get(currPart); + completedParts.get(currPart - 1); merged[currPart - 1] = completedPart; currPart++; } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MpuRequestContext.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MpuRequestContext.java index 6c4b978e4183..b9d47f6b6c71 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MpuRequestContext.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MpuRequestContext.java @@ -15,7 +15,6 @@ package software.amazon.awssdk.services.s3.internal.multipart; -import java.util.Collections; import java.util.Map; import java.util.Objects; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -94,7 +93,7 @@ public String uploadId() { } public Map existingParts() { - return existingParts != null ? Collections.unmodifiableMap(existingParts) : null; + return existingParts; } public static final class Builder { diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java index 05051ab25cac..cd4ebc4a88b9 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java @@ -136,7 +136,6 @@ private void uploadFromBeginning(Pair reques .contentLength(contentLength) .partSize(partSize) .uploadId(uploadId) - .existingParts(new ConcurrentHashMap<>()) .numPartsCompleted(numPartsCompleted) .build(); diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java index 180754b66388..a4816c1b568b 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java @@ -33,6 +33,7 @@ import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.multipart.S3ResumeToken; import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.utils.Pair; @@ -100,7 +101,8 @@ void pause_withUninitiatedCompleteMpuFuture_shouldReturnToken() { private S3ResumeToken configureSubscriberAndPause(int numExistingParts, CompletableFuture completeMpuFuture) { Map existingParts = existingParts(numExistingParts); - KnownContentLengthAsyncRequestBodySubscriber subscriber = subscriber(putObjectRequest, asyncRequestBody, existingParts); + KnownContentLengthAsyncRequestBodySubscriber subscriber = subscriber(putObjectRequest, asyncRequestBody, existingParts, + new CompletableFuture<>()); when(multipartUploadHelper.completeMultipartUpload(any(CompletableFuture.class), any(String.class), any(CompletedPart[].class), any(PutObjectRequest.class))) @@ -111,7 +113,8 @@ private S3ResumeToken configureSubscriberAndPause(int numExistingParts, private KnownContentLengthAsyncRequestBodySubscriber subscriber(PutObjectRequest putObjectRequest, AsyncRequestBody asyncRequestBody, - Map existingParts) { + Map existingParts, + CompletableFuture returnFuture) { MpuRequestContext mpuRequestContext = MpuRequestContext.builder() .request(Pair.of(putObjectRequest, asyncRequestBody)) @@ -122,7 +125,7 @@ private KnownContentLengthAsyncRequestBodySubscriber subscriber(PutObjectRequest .numPartsCompleted((long) existingParts.size()) .build(); - return new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, new CompletableFuture<>(), multipartUploadHelper); + return new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture, multipartUploadHelper); } private Map existingParts(int numExistingParts) { diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java index f96ddc4ddc48..284a392086df 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java @@ -166,8 +166,9 @@ void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequ List actualRequests = requestArgumentCaptor.getAllValues(); List actualRequestBodies = requestBodyArgumentCaptor.getAllValues(); - assertThat(actualRequestBodies).hasSize(4); - assertThat(actualRequests).hasSize(4); + int numTotalParts = 4; + assertThat(actualRequestBodies).hasSize(numTotalParts); + assertThat(actualRequests).hasSize(numTotalParts); for (int i = 0; i < actualRequests.size(); i++) { UploadPartRequest request = actualRequests.get(i); @@ -182,6 +183,12 @@ void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequ assertThat(requestBody.contentLength()).hasValue(PART_SIZE); } } + + ArgumentCaptor completeMpuArgumentCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class); + verify(s3AsyncClient).completeMultipartUpload(completeMpuArgumentCaptor.capture()); + + CompleteMultipartUploadRequest actualRequest = completeMpuArgumentCaptor.getValue(); + assertThat(actualRequest.multipartUpload().parts()).isEqualTo(completedParts(numTotalParts)); } /** @@ -373,6 +380,40 @@ void uploadObject_withResumeToken_shouldInvokeListPartsAndSkipExistingParts(int assertThat(actualRequest.multipartUpload().parts()).isEqualTo(completedParts(numTotalParts)); } + @Test + void uploadObject_partsFinishedOutOfOrder_shouldSortThemInCompleteMultipart() { + int numTotalParts = 4; + PutObjectRequest putObjectRequest = putObjectRequest(MPU_CONTENT_SIZE); + + stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient); + stubSuccessfulCompleteMultipartCall(BUCKET, KEY, s3AsyncClient); + + CompletableFuture part1Future = new CompletableFuture<>(); + CompletableFuture part2Future = new CompletableFuture<>(); + CompletableFuture part3Future = new CompletableFuture<>(); + CompletableFuture part4Future = new CompletableFuture<>(); + + when(s3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class))).thenReturn(part1Future) + .thenReturn(part2Future) + .thenReturn(part3Future) + .thenReturn(part4Future); + CompletableFuture returnFuture = uploadHelper.uploadObject(putObjectRequest, + AsyncRequestBody.fromBytes(RandomStringUtils.randomAscii((int) MPU_CONTENT_SIZE).getBytes(StandardCharsets.UTF_8))); + + part4Future.complete(UploadPartResponse.builder().build()); + part2Future.complete(UploadPartResponse.builder().build()); + part3Future.complete(UploadPartResponse.builder().build()); + part1Future.complete(UploadPartResponse.builder().build()); + + returnFuture.join(); + + ArgumentCaptor completeMpuArgumentCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class); + verify(s3AsyncClient).completeMultipartUpload(completeMpuArgumentCaptor.capture()); + + CompleteMultipartUploadRequest actualRequest = completeMpuArgumentCaptor.getValue(); + assertThat(actualRequest.multipartUpload().parts()).isEqualTo(completedParts(numTotalParts)); + } + private List completedParts(int totalNumParts) { return IntStream.range(1, totalNumParts + 1).mapToObj(i -> CompletedPart.builder().partNumber(i).build()).collect(Collectors.toList()); }