diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index 7a1738f51d9..cad4236d241 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -30,8 +31,10 @@ import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody; import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody; +import software.amazon.awssdk.core.internal.async.SplittingPublisher; import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.utils.BinaryUtils; +import software.amazon.awssdk.utils.Validate; /** * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where @@ -246,4 +249,42 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content static AsyncRequestBody empty() { return fromBytes(new byte[0]); } + + + /** + * Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific + * portion of the original data, based on the configured {code chunkSizeInBytes}. + * + *

+ * If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the + * subscriber right after it's initialized. + *

+ * // TODO: API Surface Area review: should we make this behavior configurable? + * If content length is null, it is sent after the entire content for that chunk is buffered. + * In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}. + * + * @param chunkSizeInBytes the size for each divided chunk. The last chunk may be smaller than the configured size. + * @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content + * @return SplitAsyncRequestBodyResult + */ + default SplitAsyncRequestBodyResponse split(long chunkSizeInBytes, long maxMemoryUsageInBytes) { + Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes"); + Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes"); + + if (!this.contentLength().isPresent()) { + Validate.isTrue(maxMemoryUsageInBytes >= chunkSizeInBytes, + "maxMemoryUsageInBytes must be larger than or equal to " + + "chunkSizeInBytes if the content length is unknown"); + } + + CompletableFuture future = new CompletableFuture<>(); + SplittingPublisher splittingPublisher = SplittingPublisher.builder() + .asyncRequestBody(this) + .chunkSizeInBytes(chunkSizeInBytes) + .maxMemoryUsageInBytes(maxMemoryUsageInBytes) + .resultFuture(future) + .build(); + + return SplitAsyncRequestBodyResponse.create(splittingPublisher, future); + } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SplitAsyncRequestBodyResponse.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SplitAsyncRequestBodyResponse.java new file mode 100644 index 00000000000..0035c87520e --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SplitAsyncRequestBodyResponse.java @@ -0,0 +1,80 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async; + + +import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.utils.Validate; + +/** + * Containing the result from {@link AsyncRequestBody#split(long, long)} + */ +@SdkPublicApi +public final class SplitAsyncRequestBodyResponse { + private final SdkPublisher asyncRequestBody; + private final CompletableFuture future; + + private SplitAsyncRequestBodyResponse(SdkPublisher asyncRequestBody, CompletableFuture future) { + this.asyncRequestBody = Validate.paramNotNull(asyncRequestBody, "asyncRequestBody"); + this.future = Validate.paramNotNull(future, "future"); + } + + public static SplitAsyncRequestBodyResponse create(SdkPublisher asyncRequestBody, + CompletableFuture future) { + return new SplitAsyncRequestBodyResponse(asyncRequestBody, future); + } + + /** + * Returns the converted {@link SdkPublisher} of {@link AsyncRequestBody}s. Each {@link AsyncRequestBody} publishes a specific + * portion of the original data. + */ + public SdkPublisher asyncRequestBodyPublisher() { + return asyncRequestBody; + } + + /** + * Returns {@link CompletableFuture} that will be notified when all data has been consumed or if an error occurs. + */ + public CompletableFuture future() { + return future; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SplitAsyncRequestBodyResponse that = (SplitAsyncRequestBodyResponse) o; + + if (!asyncRequestBody.equals(that.asyncRequestBody)) { + return false; + } + return future.equals(that.future); + } + + @Override + public int hashCode() { + int result = asyncRequestBody.hashCode(); + result = 31 * result + future.hashCode(); + return result; + } +} + diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index 8152e13980a..99cf1e7c338 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -31,13 +31,11 @@ import software.amazon.awssdk.utils.async.SimplePublisher; /** - * Splits an {@link SdkPublisher} to multiple smaller {@link AsyncRequestBody}s, each of which publishes a specific portion of the - * original data. + * Splits an {@link AsyncRequestBody} to multiple smaller {@link AsyncRequestBody}s, each of which publishes a specific portion of + * the original data. * *

If content length is known, each {@link AsyncRequestBody} is sent to the subscriber right after it's initialized. * Otherwise, it is sent after the entire content for that chunk is buffered. This is required to get content length. - * - * // TODO: create a default method in AsyncRequestBody for this */ @SdkInternalApi public class SplittingPublisher implements SdkPublisher { @@ -51,9 +49,9 @@ public class SplittingPublisher implements SdkPublisher { private SplittingPublisher(Builder builder) { this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody"); - this.chunkSizeInBytes = Validate.paramNotNull(builder.chunkSizeInBytes, "chunkSizeInBytes"); + this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes"); this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null)); - this.maxMemoryUsageInBytes = builder.maxMemoryUsageInBytes == null ? Long.MAX_VALUE : builder.maxMemoryUsageInBytes; + this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes"); this.future = builder.future; // We need to cancel upstream subscription if the future gets cancelled. @@ -304,13 +302,13 @@ public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) { * @param chunkSizeInBytes The new chunkSizeInBytes value. * @return This object for method chaining. */ - public Builder chunkSizeInBytes(Long chunkSizeInBytes) { + public Builder chunkSizeInBytes(long chunkSizeInBytes) { this.chunkSizeInBytes = chunkSizeInBytes; return this; } /** - * Sets the maximum memory usage in bytes. By default, it uses unlimited memory. + * Sets the maximum memory usage in bytes. * * @param maxMemoryUsageInBytes The new maxMemoryUsageInBytes value. * @return This object for method chaining. @@ -319,7 +317,7 @@ public Builder chunkSizeInBytes(Long chunkSizeInBytes) { // on a new byte buffer. But we don't know for sure what the size of a buffer we request will be (we do use the size // for the last byte buffer as a hint), so I don't think we can have a truly accurate max. Maybe we call it minimum // buffer size instead? - public Builder maxMemoryUsageInBytes(Long maxMemoryUsageInBytes) { + public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) { this.maxMemoryUsageInBytes = maxMemoryUsageInBytes; return this; } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java index e0252c9ba6d..2dd4cb029ea 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java @@ -15,31 +15,23 @@ package software.amazon.awssdk.core.async; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; import io.reactivex.Flowable; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; -import java.time.Instant; -import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import org.assertj.core.util.Lists; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.reactivestreams.Publisher; @@ -47,7 +39,6 @@ import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.http.async.SimpleSubscriber; import software.amazon.awssdk.utils.BinaryUtils; -import software.amazon.awssdk.utils.StringInputStream; @RunWith(Parameterized.class) public class AsyncRequestBodyTest { @@ -177,4 +168,25 @@ public void fromBytes_byteArrayNotNull_createsCopy() { ByteBuffer publishedBb = Flowable.fromPublisher(body).toList().blockingGet().get(0); assertThat(BinaryUtils.copyAllBytesFrom(publishedBb)).isEqualTo(original); } + + @Test + public void split_nonPositiveInput_shouldThrowException() { + AsyncRequestBody body = AsyncRequestBody.fromString("test"); + assertThatThrownBy(() -> body.split(0, 4)).hasMessageContaining("must be positive"); + assertThatThrownBy(() -> body.split(-1, 4)).hasMessageContaining("must be positive"); + assertThatThrownBy(() -> body.split(5, 0)).hasMessageContaining("must be positive"); + assertThatThrownBy(() -> body.split(5, -1)).hasMessageContaining("must be positive"); + } + + @Test + public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() { + AsyncRequestBody body = AsyncRequestBody.fromPublisher(new Publisher() { + @Override + public void subscribe(Subscriber s) { + + } + }); + assertThatThrownBy(() -> body.split(10, 4)) + .hasMessageContaining("must be larger than or equal"); + } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SplitAsyncRequestBodyResponseTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SplitAsyncRequestBodyResponseTest.java new file mode 100644 index 00000000000..2d1e50bcd59 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SplitAsyncRequestBodyResponseTest.java @@ -0,0 +1,29 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.jupiter.api.Test; + +public class SplitAsyncRequestBodyResponseTest { + + @Test + void equalsHashcode() { + EqualsVerifier.forClass(SplitAsyncRequestBodyResponse.class) + .withNonnullFields("asyncRequestBody", "future") + .verify(); + } +} diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java index 0dd017d0fd1..a3aea4a9bdf 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java @@ -25,7 +25,7 @@ import java.util.function.Function; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.internal.async.SplittingPublisher; +import software.amazon.awssdk.core.async.SplitAsyncRequestBodyResponse; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; @@ -169,26 +169,26 @@ private CompletableFuture sendUploadPartRequests(MpuRequestContext mpuRequ CompletableFuture returnFuture, Collection> futures) { - CompletableFuture splittingPublisherFuture = new CompletableFuture<>(); + AsyncRequestBody asyncRequestBody = mpuRequestContext.request.right(); - SplittingPublisher splittingPublisher = SplittingPublisher.builder() - .asyncRequestBody(asyncRequestBody) - .chunkSizeInBytes(mpuRequestContext.partSize) - .maxMemoryUsageInBytes(maxMemoryUsageInBytes) - .resultFuture(splittingPublisherFuture) - .build(); - - splittingPublisher.map(new BodyToRequestConverter(mpuRequestContext.request.left(), mpuRequestContext.uploadId)) - .subscribe(pair -> sendIndividualUploadPartRequest(mpuRequestContext.uploadId, - completedParts, - futures, - pair, - splittingPublisherFuture)) - .exceptionally(throwable -> { - returnFuture.completeExceptionally(throwable); - return null; - }); + + SplitAsyncRequestBodyResponse result = asyncRequestBody.split(mpuRequestContext.partSize, maxMemoryUsageInBytes); + + CompletableFuture splittingPublisherFuture = result.future(); + + result.asyncRequestBodyPublisher() + .map(new BodyToRequestConverter(mpuRequestContext.request.left(), + mpuRequestContext.uploadId)) + .subscribe(pair -> sendIndividualUploadPartRequest(mpuRequestContext.uploadId, + completedParts, + futures, + pair, + splittingPublisherFuture)) + .exceptionally(throwable -> { + returnFuture.completeExceptionally(throwable); + return null; + }); return splittingPublisherFuture; }