diff --git a/.changes/next-release/bugfix-AWSCRTbasedS3client-87a642c.json b/.changes/next-release/bugfix-AWSCRTbasedS3client-87a642c.json
new file mode 100644
index 000000000000..9a8add588223
--- /dev/null
+++ b/.changes/next-release/bugfix-AWSCRTbasedS3client-87a642c.json
@@ -0,0 +1,6 @@
+{
+ "type": "bugfix",
+ "category": "AWS CRT-based S3 client",
+ "contributor": "",
+ "description": "Fixed memory leak issue when a request was cancelled in the AWS CRT-based S3 client."
+}
diff --git a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml
index 7d0333b86eac..17c617c0d12b 100644
--- a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml
+++ b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml
@@ -53,4 +53,15 @@
+
+
+
+
+
+
+
+
diff --git a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml
index e06660009921..532429b1cfd2 100644
--- a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml
+++ b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml
@@ -359,6 +359,14 @@
+
+
+
+
+
+
+
+
diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtFileUploadTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtFileUploadTest.java
index 9aecc1c05f29..2235da4b5660 100644
--- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtFileUploadTest.java
+++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtFileUploadTest.java
@@ -40,6 +40,7 @@
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.crt.s3.S3MetaRequest;
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
+import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestWrapper;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.internal.model.CrtFileUpload;
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot;
@@ -53,7 +54,7 @@ class CrtFileUploadTest {
private static final int NUM_OF_PARTS_COMPLETED = 5;
private static final long PART_SIZE_IN_BYTES = 8 * MB;
private static final String MULTIPART_UPLOAD_ID = "someId";
- private S3MetaRequest metaRequest;
+ private S3MetaRequestPauseObservable observable;
private static FileSystem fileSystem;
private static File file;
private static ResumeToken token;
@@ -77,7 +78,7 @@ public static void tearDown() throws IOException {
@BeforeEach
void setUpBeforeEachTest() {
- metaRequest = Mockito.mock(S3MetaRequest.class);
+ observable = Mockito.mock(S3MetaRequestPauseObservable.class);
}
@Test
@@ -102,17 +103,13 @@ void pause_futureCompleted_shouldReturnNormally() {
.sdkResponse(putObjectResponse)
.transferredBytes(0L)
.build());
- S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
-
UploadFileRequest request = uploadFileRequest();
CrtFileUpload fileUpload =
new CrtFileUpload(future, transferProgress, observable, request);
- observable.subscribe(metaRequest);
-
ResumableFileUpload resumableFileUpload = fileUpload.pause();
- Mockito.verify(metaRequest, Mockito.never()).pause();
+ Mockito.verify(observable, Mockito.never()).pause();
assertThat(resumableFileUpload.totalParts()).isEmpty();
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
@@ -130,10 +127,7 @@ void pauseTwice_shouldReturnTheSame() {
.transferredBytes(1000L)
.build());
UploadFileRequest request = uploadFileRequest();
-
- S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
- when(metaRequest.pause()).thenReturn(token);
- observable.subscribe(metaRequest);
+ when(observable.pause()).thenReturn(token);
CrtFileUpload fileUpload =
new CrtFileUpload(future, transferProgress, observable, request);
@@ -154,10 +148,8 @@ void pause_crtThrowException_shouldPropogate() {
.build());
UploadFileRequest request = uploadFileRequest();
- S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
CrtRuntimeException exception = new CrtRuntimeException("exception");
- when(metaRequest.pause()).thenThrow(exception);
- observable.subscribe(metaRequest);
+ when(observable.pause()).thenThrow(exception);
CrtFileUpload fileUpload =
new CrtFileUpload(future, transferProgress, observable, request);
@@ -173,17 +165,14 @@ void pause_futureNotComplete_shouldPause() {
when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder()
.transferredBytes(0L)
.build());
- S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
- when(metaRequest.pause()).thenReturn(token);
+ when(observable.pause()).thenReturn(token);
UploadFileRequest request = uploadFileRequest();
CrtFileUpload fileUpload =
new CrtFileUpload(future, transferProgress, observable, request);
- observable.subscribe(metaRequest);
-
ResumableFileUpload resumableFileUpload = fileUpload.pause();
- Mockito.verify(metaRequest).pause();
+ Mockito.verify(observable).pause();
assertThat(resumableFileUpload.totalParts()).hasValue(TOTAL_PARTS);
assertThat(resumableFileUpload.partSizeInBytes()).hasValue(PART_SIZE_IN_BYTES);
assertThat(resumableFileUpload.multipartUploadId()).hasValue(MULTIPART_UPLOAD_ID);
@@ -204,17 +193,14 @@ void pause_singlePart_shouldPause() {
.sdkResponse(putObjectResponse)
.transferredBytes(0L)
.build());
- S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
- when(metaRequest.pause()).thenThrow(new CrtRuntimeException(6));
+ when(observable.pause()).thenThrow(new CrtRuntimeException(6));
UploadFileRequest request = uploadFileRequest();
CrtFileUpload fileUpload =
new CrtFileUpload(future, transferProgress, observable, request);
- observable.subscribe(metaRequest);
-
ResumableFileUpload resumableFileUpload = fileUpload.pause();
- Mockito.verify(metaRequest).pause();
+ Mockito.verify(observable).pause();
assertThat(resumableFileUpload.totalParts()).isEmpty();
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java
index 580cced1808a..10aa2fd68e2e 100644
--- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java
+++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java
@@ -160,7 +160,9 @@ void listeners_reports_ErrorsWhenCancelled(WireMockRuntimeInfo wm) throws Interr
assertThat(transferListener.getExceptionCaught()).isInstanceOf(CancellationException.class);
assertThat(transferListener.isTransferComplete()).isFalse();
assertThat(transferListener.isTransferInitiated()).isTrue();
- assertMockOnFailure(transferListenerMock);
+ Mockito.verify(transferListenerMock, times(1)).transferFailed(ArgumentMatchers.any());
+ Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any());
+ Mockito.verify(transferListenerMock, times(0)).transferComplete(ArgumentMatchers.any());
}
diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java
index 14a2eb9a1af0..6dbaa550628f 100644
--- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java
+++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java
@@ -45,7 +45,6 @@
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.crt.s3.S3Client;
import software.amazon.awssdk.crt.s3.S3ClientOptions;
-import software.amazon.awssdk.crt.s3.S3MetaRequest;
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
@@ -54,7 +53,6 @@
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.utils.AttributeMap;
-import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.NumericUtils;
import software.amazon.awssdk.utils.http.SdkHttpUtils;
@@ -64,7 +62,6 @@
*/
@SdkInternalApi
public final class S3CrtAsyncHttpClient implements SdkAsyncHttpClient {
- private static final Logger log = Logger.loggerFor(S3CrtAsyncHttpClient.class);
private final S3Client crtS3Client;
@@ -133,10 +130,12 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) {
URI uri = asyncRequest.request().getUri();
HttpRequest httpRequest = toCrtRequest(asyncRequest);
SdkHttpExecutionAttributes httpExecutionAttributes = asyncRequest.httpExecutionAttributes();
+ CompletableFuture s3MetaRequestFuture = new CompletableFuture<>();
S3CrtResponseHandlerAdapter responseHandler =
new S3CrtResponseHandlerAdapter(executeFuture,
asyncRequest.responseHandler(),
- httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER));
+ httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
+ s3MetaRequestFuture);
S3MetaRequestOptions.MetaRequestType requestType = requestType(asyncRequest);
@@ -160,16 +159,19 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) {
.withRequestFilePath(requestFilePath)
.withSigningConfig(signingConfig);
- S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions);
- S3MetaRequestPauseObservable observable =
- httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE);
+ try {
+ S3MetaRequestWrapper requestWrapper = new S3MetaRequestWrapper(crtS3Client.makeMetaRequest(requestOptions));
+ s3MetaRequestFuture.complete(requestWrapper);
- responseHandler.metaRequest(s3MetaRequest);
+ S3MetaRequestPauseObservable observable =
+ httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE);
- if (observable != null) {
- observable.subscribe(s3MetaRequest);
+ if (observable != null) {
+ observable.subscribe(requestWrapper);
+ }
+ } finally {
+ signingConfig.close();
}
- closeResourceCallback(executeFuture, s3MetaRequest, responseHandler, signingConfig);
return executeFuture;
}
@@ -215,23 +217,6 @@ private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequ
return S3MetaRequestOptions.MetaRequestType.DEFAULT;
}
- private static void closeResourceCallback(CompletableFuture executeFuture,
- S3MetaRequest s3MetaRequest,
- S3CrtResponseHandlerAdapter responseHandler,
- AwsSigningConfig signingConfig) {
- executeFuture.whenComplete((r, t) -> {
- if (executeFuture.isCancelled()) {
- log.debug(() -> "The request is cancelled, cancelling meta request");
- responseHandler.cancelRequest();
- s3MetaRequest.cancel();
- signingConfig.close();
- } else {
- s3MetaRequest.close();
- signingConfig.close();
- }
- });
- }
-
private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) {
SdkHttpRequest sdkRequest = asyncRequest.request();
diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java
index de04329326a5..c4fa2519527b 100644
--- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java
+++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java
@@ -25,10 +25,8 @@
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.s3.S3FinishedResponseContext;
-import software.amazon.awssdk.crt.s3.S3MetaRequest;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler;
-import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.utils.Logger;
@@ -46,7 +44,7 @@ public final class S3CrtResponseHandlerAdapter implements S3MetaRequestResponseH
private final SimplePublisher responsePublisher = new SimplePublisher<>();
private final SdkHttpResponse.Builder initialHeadersResponse = SdkHttpResponse.builder();
- private volatile S3MetaRequest metaRequest;
+ private final CompletableFuture metaRequestFuture;
private final PublisherListener progressListener;
@@ -54,12 +52,35 @@ public final class S3CrtResponseHandlerAdapter implements S3MetaRequestResponseH
public S3CrtResponseHandlerAdapter(CompletableFuture executeFuture,
SdkAsyncHttpResponseHandler responseHandler,
- PublisherListener progressListener) {
+ PublisherListener progressListener,
+ CompletableFuture metaRequestFuture) {
this.resultFuture = executeFuture;
+ this.metaRequestFuture = metaRequestFuture;
+
+ resultFuture.whenComplete((r, t) -> {
+ S3MetaRequestWrapper s3MetaRequest = s3MetaRequest();
+ if (s3MetaRequest == null) {
+ return;
+ }
+
+ if (t != null) {
+ s3MetaRequest.cancel();
+ }
+ s3MetaRequest.close();
+ });
+
this.responseHandler = responseHandler;
this.progressListener = progressListener == null ? new NoOpPublisherListener() : progressListener;
}
+ private S3MetaRequestWrapper s3MetaRequest() {
+ if (!metaRequestFuture.isDone()) {
+ return null;
+ }
+
+ return metaRequestFuture.join();
+ }
+
@Override
public void onResponseHeaders(int statusCode, HttpHeader[] headers) {
// Note, we cannot call responseHandler.onHeaders() here because the response status code and headers may not represent
@@ -87,6 +108,13 @@ public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long ob
return;
}
+ S3MetaRequestWrapper metaRequest = s3MetaRequest();
+ if (metaRequest == null) {
+ // should not happen
+ failResponseHandlerAndFuture(SdkClientException.create("Unexpected exception occurred: s3metaRequest is not "
+ + "initialized yet"));
+ return;
+ }
metaRequest.incrementReadWindow(bytesReceived);
});
@@ -115,22 +143,10 @@ private void onSuccessfulResponseComplete() {
return;
}
this.progressListener.subscriberOnComplete();
- completeFutureAndCloseRequest();
+ resultFuture.complete(null);
});
}
- private void completeFutureAndCloseRequest() {
- resultFuture.complete(null);
- runAndLogError(log.logger(), "Exception thrown in S3MetaRequest#close, ignoring",
- () -> metaRequest.close());
- }
-
- public void cancelRequest() {
- SdkCancellationException sdkClientException =
- new SdkCancellationException("request is cancelled");
- failResponseHandlerAndFuture(sdkClientException);
- }
-
private void handleError(S3FinishedResponseContext context) {
int crtCode = context.getErrorCode();
HttpHeader[] headers = context.getErrorHeaders();
@@ -168,27 +184,21 @@ private void onErrorResponseComplete(byte[] errorPayload) {
failResponseHandlerAndFuture(throwable);
return null;
}
- completeFutureAndCloseRequest();
+ resultFuture.complete(null);
return null;
});
}
private void failResponseHandlerAndFuture(Throwable exception) {
- resultFuture.completeExceptionally(exception);
runAndLogError(log.logger(), "Exception thrown in SdkAsyncHttpResponseHandler#onError, ignoring",
() -> responseHandler.onError(exception));
- runAndLogError(log.logger(), "Exception thrown in S3MetaRequest#close, ignoring",
- () -> metaRequest.close());
+ resultFuture.completeExceptionally(exception);
}
private static boolean isErrorResponse(int responseStatus) {
return responseStatus != 0;
}
- public void metaRequest(S3MetaRequest s3MetaRequest) {
- metaRequest = s3MetaRequest;
- }
-
@Override
public void onProgress(S3MetaRequestProgress progress) {
this.progressListener.subscriberOnNext(progress);
diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestPauseObservable.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestPauseObservable.java
index ce7b78fdd538..5ddb41219d39 100644
--- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestPauseObservable.java
+++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestPauseObservable.java
@@ -18,7 +18,6 @@
import java.util.function.Function;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.s3.ResumeToken;
-import software.amazon.awssdk.crt.s3.S3MetaRequest;
/**
* An observable that notifies the observer {@link S3CrtAsyncHttpClient} to pause the request.
@@ -26,17 +25,17 @@
@SdkInternalApi
public class S3MetaRequestPauseObservable {
- private final Function pause;
- private volatile S3MetaRequest request;
+ private final Function pause;
+ private volatile S3MetaRequestWrapper request;
public S3MetaRequestPauseObservable() {
- this.pause = S3MetaRequest::pause;
+ this.pause = S3MetaRequestWrapper::pause;
}
/**
- * Subscribe {@link S3MetaRequest} to be potentially paused later.
+ * Subscribe {@link S3MetaRequestWrapper} to be potentially paused later.
*/
- public void subscribe(S3MetaRequest request) {
+ public void subscribe(S3MetaRequestWrapper request) {
this.request = request;
}
diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestWrapper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestWrapper.java
new file mode 100644
index 000000000000..72074e1b47a3
--- /dev/null
+++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestWrapper.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.services.s3.internal.crt;
+
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.crt.s3.ResumeToken;
+import software.amazon.awssdk.crt.s3.S3MetaRequest;
+
+/**
+ * A wrapper class that manages the lifecycle of the underlying {@link S3MetaRequest}. This class is needed to ensure we don't
+ * invoke methods on {@link S3MetaRequest} after it's closed, otherwise CRT will crash.
+ */
+@SdkInternalApi
+public class S3MetaRequestWrapper {
+ private final S3MetaRequest delegate;
+ private volatile boolean isClosed;
+ private final Object lock = new Object();
+
+ public S3MetaRequestWrapper(S3MetaRequest delegate) {
+ this.delegate = delegate;
+ }
+
+ public void close() {
+ synchronized (lock) {
+ if (!isClosed) {
+ isClosed = true;
+ delegate.close();
+ }
+ }
+ }
+
+ public void incrementReadWindow(long windowSize) {
+ synchronized (lock) {
+ if (!isClosed) {
+ delegate.incrementReadWindow(windowSize);
+ }
+ }
+ }
+
+ public ResumeToken pause() {
+ synchronized (lock) {
+ if (!isClosed) {
+ return delegate.pause();
+ }
+ }
+ return null;
+ }
+
+ public void cancel() {
+ synchronized (lock) {
+ if (!isClosed) {
+ delegate.cancel();
+ }
+ }
+ }
+}
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/crt/CrtDownloadErrorTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/crt/CrtDownloadErrorTest.java
index df1d717d866e..8819702cb957 100644
--- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/crt/CrtDownloadErrorTest.java
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/crt/CrtDownloadErrorTest.java
@@ -15,65 +15,74 @@
package software.amazon.awssdk.services.s3.crt;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.head;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
-import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
+import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.Log;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.S3Exception;
+@WireMockTest
+@Timeout(10)
public class CrtDownloadErrorTest {
private static final String BUCKET = "my-bucket";
private static final String KEY = "my-key";
- private static final WireMockServer WM = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort());
private S3AsyncClient s3;
@BeforeAll
- public static void setup() {
- WM.start();
- // Execute this statement before constructing the SDK service client.
- Log.initLoggingToStdout(Log.LogLevel.Trace);
+ public static void setUpBeforeAll() {
+ System.setProperty("aws.crt.debugnative", "true");
+ Log.initLoggingToStdout(Log.LogLevel.Warn);
}
- @AfterAll
- public static void teardown() {
- WM.stop();
+ @BeforeEach
+ public void setup(WireMockRuntimeInfo wiremock) {
+ s3 = S3AsyncClient.crtBuilder()
+ .endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort()))
+ .forcePathStyle(true)
+ .region(Region.US_EAST_1)
+ .build();
+
}
@AfterEach
- public void methodTeardown() {
- if (s3 != null) {
- s3.close();
- }
- s3 = null;
+ public void tearDown() {
+ s3.close();
}
+ @AfterAll
+ public static void verifyCrtResource() {
+ CrtResource.waitForNoResources();
+ }
+
+
@Test
public void getObject_headObjectOk_getObjectThrows_operationThrows() {
- s3 = S3AsyncClient.crtBuilder()
- .endpointOverride(URI.create("http://localhost:" + WM.port()))
- .forcePathStyle(true)
- .region(Region.US_EAST_1)
- .build();
-
String path = String.format("/%s/%s", BUCKET, KEY);
- WM.stubFor(WireMock.head(WireMock.urlPathEqualTo(path))
- .willReturn(WireMock.aResponse()
- .withStatus(200)
- .withHeader("ETag", "etag")
- .withHeader("Content-Length", "5")));
+ stubFor(head(urlPathEqualTo(path))
+ .willReturn(WireMock.aResponse()
+ .withStatus(200)
+ .withHeader("ETag", "etag")
+ .withHeader("Content-Length", "5")));
String errorContent = ""
+ "\n"
@@ -82,39 +91,35 @@ public void getObject_headObjectOk_getObjectThrows_operationThrows() {
+ " request-id\n"
+ " host-id\n"
+ "";
- WM.stubFor(WireMock.get(WireMock.urlPathEqualTo(path))
- .willReturn(WireMock.aResponse()
- .withStatus(403)
- .withBody(errorContent)));
+ stubFor(get(urlPathEqualTo(path))
+ .willReturn(WireMock.aResponse()
+ .withStatus(403)
+ .withBody(errorContent)));
assertThatThrownBy(s3.getObject(r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes())::join)
.hasCauseInstanceOf(S3Exception.class)
.hasMessageContaining("User does not have permission")
.hasMessageContaining("Status Code: 403");
+
+
}
@Test
public void getObject_headObjectOk_getObjectOk_operationSucceeds() {
- s3 = S3AsyncClient.crtBuilder()
- .endpointOverride(URI.create("http://localhost:" + WM.port()))
- .forcePathStyle(true)
- .region(Region.US_EAST_1)
- .build();
-
String path = String.format("/%s/%s", BUCKET, KEY);
byte[] content = "hello".getBytes(StandardCharsets.UTF_8);
- WM.stubFor(WireMock.head(WireMock.urlPathEqualTo(path))
- .willReturn(WireMock.aResponse()
- .withStatus(200)
- .withHeader("ETag", "etag")
- .withHeader("Content-Length", Integer.toString(content.length))));
- WM.stubFor(WireMock.get(WireMock.urlPathEqualTo(path))
- .willReturn(WireMock.aResponse()
- .withStatus(200)
- .withHeader("Content-Type", "text/plain")
- .withBody(content)));
+ stubFor(head(urlPathEqualTo(path))
+ .willReturn(WireMock.aResponse()
+ .withStatus(200)
+ .withHeader("ETag", "etag")
+ .withHeader("Content-Length", Integer.toString(content.length))));
+ stubFor(get(urlPathEqualTo(path))
+ .willReturn(WireMock.aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "text/plain")
+ .withBody(content)));
String objectContent = s3.getObject(r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes())
.join()
@@ -125,18 +130,11 @@ public void getObject_headObjectOk_getObjectOk_operationSucceeds() {
@Test
public void getObject_headObjectThrows_operationThrows() {
- s3 = S3AsyncClient.crtBuilder()
- .endpointOverride(URI.create("http://localhost:" + WM.port()))
- .forcePathStyle(true)
- .region(Region.US_EAST_1)
- .build();
-
String path = String.format("/%s/%s", BUCKET, KEY);
-
- WM.stubFor(WireMock.head(WireMock.urlPathEqualTo(path))
- .willReturn(WireMock.aResponse()
- .withStatus(403)));
+ stubFor(head(urlPathEqualTo(path))
+ .willReturn(WireMock.aResponse()
+ .withStatus(403)));
assertThatThrownBy(s3.getObject(r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes())::join)
.hasCauseInstanceOf(S3Exception.class)
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressCreateSessionTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressCreateSessionTest.java
index a2751cba65f0..815736f9d1e6 100644
--- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressCreateSessionTest.java
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressCreateSessionTest.java
@@ -64,10 +64,12 @@
import software.amazon.awssdk.services.s3.model.Protocol;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.http.SdkHttpUtils;
@WireMockTest(httpsEnabled = true)
public class S3ExpressCreateSessionTest extends BaseRuleSetClientTest {
+ private static final Logger log = Logger.loggerFor(S3ExpressCreateSessionTest.class);
private static final Function WM_HTTP_ENDPOINT = wm -> URI.create(wm.getHttpBaseUrl());
private static final Function WM_HTTPS_ENDPOINT = wm -> URI.create(wm.getHttpsBaseUrl());
@@ -329,9 +331,8 @@ private static final class CapturingInterceptor implements ExecutionInterceptor
public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
SdkHttpRequest sdkHttpRequest = context.httpRequest();
this.headers = sdkHttpRequest.headers();
- System.out.printf("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath());
- headers.forEach((k, strings) -> System.out.printf("%s, %s%n", k, strings));
- System.out.println();
+ log.debug(() -> String.format("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath()));
+ headers.forEach((k, strings) -> log.debug(() -> String.format("%s, %s%n", k, strings)));
}
}
}
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressTest.java
index 47bb384e6fd4..71f009165590 100644
--- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressTest.java
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressTest.java
@@ -70,11 +70,12 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.http.SdkHttpUtils;
@WireMockTest(httpsEnabled = true)
public class S3ExpressTest extends BaseRuleSetClientTest {
-
+ private static final Logger log = Logger.loggerFor(S3ExpressTest.class);
private static final Function WM_HTTP_ENDPOINT = wm -> URI.create(wm.getHttpBaseUrl());
private static final Function WM_HTTPS_ENDPOINT = wm -> URI.create(wm.getHttpsBaseUrl());
private static final AwsCredentialsProvider CREDENTIALS_PROVIDER =
@@ -431,9 +432,8 @@ private static final class CapturingInterceptor implements ExecutionInterceptor
public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
SdkHttpRequest sdkHttpRequest = context.httpRequest();
this.headers = sdkHttpRequest.headers();
- System.out.printf("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath());
- headers.forEach((k, strings) -> System.out.printf("%s, %s%n", k, strings));
- System.out.println();
+ log.debug(() -> String.format("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath()));
+ headers.forEach((k, strings) -> log.debug(() -> String.format("%s, %s%n", k, strings)));
}
}
}
\ No newline at end of file
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/CrtCredentialProviderAdapterTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/CrtCredentialProviderAdapterTest.java
index ecdaeec905a7..d4a8a1e5faf3 100644
--- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/CrtCredentialProviderAdapterTest.java
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/CrtCredentialProviderAdapterTest.java
@@ -21,6 +21,7 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
@@ -28,6 +29,7 @@
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.HttpCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.auth.credentials.Credentials;
import software.amazon.awssdk.crt.auth.credentials.CredentialsProvider;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
@@ -36,6 +38,10 @@
public class CrtCredentialProviderAdapterTest {
+ @AfterAll
+ public static void verifyCrtResource() {
+ CrtResource.waitForNoResources();
+ }
@Test
void crtCredentials_withSession_shouldConvert() {
IdentityProvider extends AwsCredentialsIdentity> awsCredentialsProvider = StaticCredentialsProvider
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClientTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClientTest.java
index ee44bf18839f..c88cdd24cfc9 100644
--- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClientTest.java
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClientTest.java
@@ -19,6 +19,8 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -29,6 +31,7 @@
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
+import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.endpoints.S3ClientContextParams;
@@ -37,6 +40,11 @@
class DefaultS3CrtAsyncClientTest {
+ @AfterAll
+ public static void verifyCrtResource() {
+ CrtResource.waitForNoResources();
+ }
+
@Test
void requestSignerOverrideProvided_shouldThrowException() {
try (S3AsyncClient s3AsyncClient = S3CrtAsyncClient.builder().build()) {
@@ -96,21 +104,24 @@ void invalidConfig_shouldThrowException(long value) {
}
@Test
- void crtClient_with_crossRegionAccessEnabled_asTrue(){
- S3AsyncClient crossRegionCrtClient = S3AsyncClient.crtBuilder().crossRegionAccessEnabled(true).build();
- assertThat(crossRegionCrtClient).isInstanceOf(DefaultS3CrtAsyncClient.class);
- assertThat(((DelegatingS3AsyncClient)crossRegionCrtClient).delegate()).isInstanceOf(S3CrossRegionAsyncClient.class);
+ void crtClient_with_crossRegionAccessEnabled_asTrue() {
+ try (S3AsyncClient crossRegionCrtClient = S3AsyncClient.crtBuilder().crossRegionAccessEnabled(true).build()) {
+ assertThat(crossRegionCrtClient).isInstanceOf(DefaultS3CrtAsyncClient.class);
+ assertThat(((DelegatingS3AsyncClient)crossRegionCrtClient).delegate()).isInstanceOf(S3CrossRegionAsyncClient.class);
+ }
}
@Test
- void crtClient_with_crossRegionAccessEnabled_asFalse(){
- S3AsyncClient crossRegionDisabledCrtClient = S3AsyncClient.crtBuilder().crossRegionAccessEnabled(false).build();
- assertThat(crossRegionDisabledCrtClient).isInstanceOf(DefaultS3CrtAsyncClient.class);
- assertThat(((DelegatingS3AsyncClient)crossRegionDisabledCrtClient).delegate()).isNotInstanceOf(S3CrossRegionAsyncClient.class);
-
- S3AsyncClient defaultCrtClient = S3AsyncClient.crtBuilder().build();
- assertThat(defaultCrtClient).isInstanceOf(DefaultS3CrtAsyncClient.class);
- assertThat(((DelegatingS3AsyncClient)defaultCrtClient).delegate()).isNotInstanceOf(S3CrossRegionAsyncClient.class);
+ void crtClient_with_crossRegionAccessEnabled_asFalse() {
+ try (S3AsyncClient crossRegionDisabledCrtClient = S3AsyncClient.crtBuilder().crossRegionAccessEnabled(false).build()) {
+ assertThat(crossRegionDisabledCrtClient).isInstanceOf(DefaultS3CrtAsyncClient.class);
+ assertThat(((DelegatingS3AsyncClient) crossRegionDisabledCrtClient).delegate()).isNotInstanceOf(S3CrossRegionAsyncClient.class);
+ }
+
+ try (S3AsyncClient defaultCrtClient = S3AsyncClient.crtBuilder().build()) {
+ assertThat(defaultCrtClient).isInstanceOf(DefaultS3CrtAsyncClient.class);
+ assertThat(((DelegatingS3AsyncClient)defaultCrtClient).delegate()).isNotInstanceOf(S3CrossRegionAsyncClient.class);
+ }
}
}
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientGetObjectResourceManagementTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientGetObjectResourceManagementTest.java
new file mode 100644
index 000000000000..fe2b1d5308bd
--- /dev/null
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientGetObjectResourceManagementTest.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.services.s3.internal.crt;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.head;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+
+import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
+import com.github.tomakehurst.wiremock.junit5.WireMockTest;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+import org.assertj.core.util.Files;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.crt.CrtResource;
+import software.amazon.awssdk.crt.Log;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.testutils.RandomTempFile;
+import software.amazon.awssdk.utils.IoUtils;
+
+/**
+ * Tests to make sure all CRT resources are cleaned up for get object.
+ */
+@WireMockTest
+@Timeout(10)
+public class S3CrtClientGetObjectResourceManagementTest {
+
+ private static final String BUCKET = "Example-Bucket";
+ private static final String KEY = "Example-Object";
+ private static final long PART_SIZE = 1024 * 1024 * 5L;
+ private S3AsyncClient s3AsyncClient;
+
+ @BeforeAll
+ public static void setUpBeforeAll() {
+ System.setProperty("aws.crt.debugnative", "true");
+ Log.initLoggingToStdout(Log.LogLevel.Warn);
+ }
+
+ @BeforeEach
+ public void setup(WireMockRuntimeInfo wiremock) {
+ stubGetObjectCalls();
+ s3AsyncClient = S3AsyncClient.crtBuilder()
+ .region(Region.US_EAST_1)
+ .endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort()))
+ .credentialsProvider(
+ StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret")))
+ .minimumPartSizeInBytes(PART_SIZE)
+ .maxConcurrency(2)
+ .initialReadBufferSizeInBytes(1024L)
+ .build();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ s3AsyncClient.close();
+ }
+
+ @AfterAll
+ public static void verifyCrtResource() {
+ CrtResource.waitForNoResources();
+ }
+
+ @Test
+ void toBlockingInputStream_abortStream_shouldCloseResources() throws IOException {
+ ResponseInputStream response = s3AsyncClient.getObject(
+ r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBlockingInputStream()).join();
+ response.read();
+ response.abort();
+ }
+
+ @Test
+ void toBlockingInputStream_closeStream_shouldCloseResources() throws IOException {
+ ResponseInputStream response = s3AsyncClient.getObject(
+ r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBlockingInputStream()).join();
+ response.read();
+ response.close();
+ }
+
+ @Test
+ void toFile_cancelRequest_shouldCloseResource() throws IOException {
+ CompletableFuture future = s3AsyncClient.getObject(
+ r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toFile(Files.newTemporaryFile()));
+ future.cancel(false);
+ }
+
+ @Test
+ void toFile_happyCase_shouldCloseResource() throws IOException {
+ File file = RandomTempFile.randomUncreatedFile();
+ CompletableFuture future = s3AsyncClient.getObject(
+ r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toFile(file));
+ future.join();
+ }
+
+ @Test
+ void toBlockingInputStream_happyCase_shouldCloseResource() throws IOException {
+ try (ResponseInputStream response = s3AsyncClient.getObject(
+ r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBlockingInputStream()).join()) {
+ IoUtils.drainInputStream(response);
+ }
+ }
+
+ private static void stubGetObjectCalls() {
+ int numOfParts = 3;
+ long finalPartSize = 1024 * 1024 * 4;
+ long totalContentSize = PART_SIZE * (numOfParts - 1) + finalPartSize;
+
+ stubFor(head(anyUrl()).willReturn(aResponse().withStatus(200)
+ .withHeader("content-length", String.valueOf(totalContentSize))
+ .withHeader("etag", "1234")));
+
+ for (int i = 0; i < numOfParts - 1; i++) {
+ int partNumberIndex = i + 1;
+ String contentRange = "bytes " + PART_SIZE * i + "-" + (PART_SIZE * partNumberIndex - 1) + "/" + totalContentSize;
+ String range = "bytes=" + PART_SIZE * i + "-" + (PART_SIZE * partNumberIndex - 1);
+ stubFor(get(anyUrl()).withHeader("Range", equalTo(range)).willReturn(aResponse().withStatus(200)
+ .withHeader("content-length",
+ String.valueOf(PART_SIZE))
+ .withHeader("Content-Range",
+ contentRange)
+ .withHeader("etag", "1234")
+ .withBodyFile("part" + partNumberIndex)));
+ }
+
+ // final part
+ String contentRange = "bytes " + PART_SIZE * numOfParts + "-" + (totalContentSize - 1) + "/" + totalContentSize;
+ String range = "bytes=" + PART_SIZE * (numOfParts - 1) + "-" + (totalContentSize - 1);
+ stubFor(get(anyUrl()).withHeader("Range", equalTo(range)).willReturn(aResponse().withStatus(200)
+ .withHeader("content-length", String.valueOf(finalPartSize))
+ .withHeader("Content-Range",
+ contentRange)
+ .withHeader("etag", "1234")
+ .withBodyFile("part" + (numOfParts - 1))));
+ }
+}
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java
index 2f863bb42c18..2776358ae64e 100644
--- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java
@@ -29,6 +29,7 @@
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import java.net.URI;
import java.util.concurrent.Executor;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -84,6 +85,10 @@ public void setup(WireMockRuntimeInfo wiremock) {
@AfterEach
public void tearDown() {
s3AsyncClient.close();
+ }
+
+ @AfterAll
+ public static void verifyCrtResource() {
CrtResource.waitForNoResources();
}
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapterTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapterTest.java
index dbd86d3be6d8..f20df7c9aa4a 100644
--- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapterTest.java
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapterTest.java
@@ -53,7 +53,7 @@ public class S3CrtResponseHandlerAdapterTest {
private S3FinishedResponseContext context;
@Mock
- private S3MetaRequest s3MetaRequest;
+ private S3MetaRequestWrapper s3MetaRequest;
private CompletableFuture future;
@Before
@@ -62,8 +62,8 @@ public void setup() {
sdkResponseHandler = spy(new TestResponseHandler());
responseHandlerAdapter = new S3CrtResponseHandlerAdapter(future,
sdkResponseHandler,
- null);
- responseHandlerAdapter.metaRequest(s3MetaRequest);
+ null,
+ CompletableFuture.completedFuture(s3MetaRequest));
}
@Test
diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestWrapperTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestWrapperTest.java
new file mode 100644
index 000000000000..68d57ee2ba48
--- /dev/null
+++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestWrapperTest.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.services.s3.internal.crt;
+
+import java.util.concurrent.CompletableFuture;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import software.amazon.awssdk.crt.s3.S3MetaRequest;
+
+@ExtendWith(MockitoExtension.class)
+public class S3MetaRequestWrapperTest {
+
+ @Mock
+ private S3MetaRequest request;
+
+ private S3MetaRequestWrapper wrapper;
+
+ @BeforeEach
+ void setUp() {
+ wrapper = new S3MetaRequestWrapper(request);
+ }
+
+ @Test
+ void close_concurrentCalls_onlyExecuteOnce() {
+ CompletableFuture.allOf(CompletableFuture.runAsync(() -> wrapper.close()),
+ CompletableFuture.runAsync(() -> wrapper.close())).join();
+ Mockito.verify(request, Mockito.times(1)).close();
+ }
+
+ @Test
+ void incrementWindow_afterClose_shouldBeNoOp() {
+ wrapper.close();
+ wrapper.incrementReadWindow(10L);
+ Mockito.verify(request, Mockito.times(1)).close();
+ Mockito.verify(request, Mockito.never()).incrementReadWindow(Mockito.anyLong());
+ }
+
+ @Test
+ void pause_afterClose_shouldBeNoOp() {
+ wrapper.close();
+ wrapper.pause();
+ Mockito.verify(request, Mockito.times(1)).close();
+ Mockito.verify(request, Mockito.never()).pause();
+ }
+
+ @Test
+ void cancel_afterClose_shouldBeNoOp() {
+ wrapper.close();
+ wrapper.cancel();
+ Mockito.verify(request, Mockito.times(1)).close();
+ Mockito.verify(request, Mockito.never()).cancel();
+ }
+}
diff --git a/services/s3/src/test/resources/__files/part1 b/services/s3/src/test/resources/__files/part1
new file mode 100644
index 000000000000..93cadc986855
Binary files /dev/null and b/services/s3/src/test/resources/__files/part1 differ
diff --git a/services/s3/src/test/resources/__files/part2 b/services/s3/src/test/resources/__files/part2
new file mode 100644
index 000000000000..ebd8722df565
Binary files /dev/null and b/services/s3/src/test/resources/__files/part2 differ
diff --git a/services/s3/src/test/resources/__files/part3 b/services/s3/src/test/resources/__files/part3
new file mode 100644
index 000000000000..065033159e6b
Binary files /dev/null and b/services/s3/src/test/resources/__files/part3 differ