Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT-based S3 client",
"contributor": "",
"description": "Fixed memory leak issue when a request was cancelled in the AWS CRT-based S3 client."
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@
<!-- Allow private field declaration before public, to have correct initialization order -->
<suppress checks="DeclarationOrder"
files=".*SdkAdvancedClientOption\.java$"/>

<!-- Ignore usage of S3MetaRequest in S3MetaRequestWrapper. !-->
<suppress checks="Regexp"
files="software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestWrapper.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,14 @@
<property name="ignoreComments" value="true"/>
</module>

<!-- Checks that we don't use S3MetaRequest -->
<module name="Regexp">
<property name="format" value="\bS3MetaRequest\b"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Don't use S3MetaRequest directly. Use software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestWrapper instead"/>
<property name="ignoreComments" value="true"/>
</module>

<!-- Checks that we don't implement AutoCloseable/Closeable -->
<module name="Regexp">
<property name="format" value="(class|interface).*(implements|extends).*[^\w](Closeable|AutoCloseable)[^\w]"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.crt.s3.S3Client;
import software.amazon.awssdk.crt.s3.S3ClientOptions;
import software.amazon.awssdk.crt.s3.S3MetaRequest;
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
Expand Down Expand Up @@ -133,10 +132,12 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
URI uri = asyncRequest.request().getUri();
HttpRequest httpRequest = toCrtRequest(asyncRequest);
SdkHttpExecutionAttributes httpExecutionAttributes = asyncRequest.httpExecutionAttributes();
CompletableFuture<S3MetaRequestWrapper> s3MetaRequestFuture = new CompletableFuture<>();
S3CrtResponseHandlerAdapter responseHandler =
new S3CrtResponseHandlerAdapter(executeFuture,
asyncRequest.responseHandler(),
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER));
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
s3MetaRequestFuture);

S3MetaRequestOptions.MetaRequestType requestType = requestType(asyncRequest);

Expand All @@ -160,16 +161,19 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
.withRequestFilePath(requestFilePath)
.withSigningConfig(signingConfig);

S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions);
S3MetaRequestPauseObservable observable =
httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE);
try {
S3MetaRequestWrapper requestWrapper = new S3MetaRequestWrapper(crtS3Client.makeMetaRequest(requestOptions));
s3MetaRequestFuture.complete(requestWrapper);

responseHandler.metaRequest(s3MetaRequest);
S3MetaRequestPauseObservable observable =
httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE);

if (observable != null) {
observable.subscribe(s3MetaRequest);
if (observable != null) {
observable.subscribe(requestWrapper);
}
} finally {
signingConfig.close();
}
closeResourceCallback(executeFuture, s3MetaRequest, responseHandler, signingConfig);

return executeFuture;
}
Expand Down Expand Up @@ -215,23 +219,6 @@ private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequ
return S3MetaRequestOptions.MetaRequestType.DEFAULT;
}

private static void closeResourceCallback(CompletableFuture<Void> executeFuture,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to S3CrtResponseHandlerAdapter because I figured it makes more sense to manage the closure of the S3MetaRequest in one place.

S3MetaRequest s3MetaRequest,
S3CrtResponseHandlerAdapter responseHandler,
AwsSigningConfig signingConfig) {
executeFuture.whenComplete((r, t) -> {
if (executeFuture.isCancelled()) {
log.debug(() -> "The request is cancelled, cancelling meta request");
responseHandler.cancelRequest();
s3MetaRequest.cancel();
signingConfig.close();
} else {
s3MetaRequest.close();
signingConfig.close();
}
});
}

private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) {
SdkHttpRequest sdkRequest = asyncRequest.request();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.s3.S3FinishedResponseContext;
import software.amazon.awssdk.crt.s3.S3MetaRequest;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.utils.Logger;
Expand All @@ -46,20 +44,44 @@ public final class S3CrtResponseHandlerAdapter implements S3MetaRequestResponseH
private final SimplePublisher<ByteBuffer> responsePublisher = new SimplePublisher<>();

private final SdkHttpResponse.Builder initialHeadersResponse = SdkHttpResponse.builder();
private volatile S3MetaRequest metaRequest;
private final CompletableFuture<S3MetaRequestWrapper> metaRequestFuture;

private final PublisherListener<S3MetaRequestProgress> progressListener;

private volatile boolean responseHandlingInitiated;

public S3CrtResponseHandlerAdapter(CompletableFuture<Void> executeFuture,
SdkAsyncHttpResponseHandler responseHandler,
PublisherListener<S3MetaRequestProgress> progressListener) {
PublisherListener<S3MetaRequestProgress> progressListener,
CompletableFuture<S3MetaRequestWrapper> metaRequestFuture) {
this.resultFuture = executeFuture;
this.metaRequestFuture = metaRequestFuture;

resultFuture.whenComplete((r, t) -> {
S3MetaRequestWrapper s3MetaRequest = s3MetaRequest();
if (s3MetaRequest == null) {
return;
}

if (executeFuture.isCancelled()) {
s3MetaRequest.cancel();
}
s3MetaRequest.close();
});

this.responseHandler = responseHandler;
this.progressListener = progressListener == null ? new NoOpPublisherListener() : progressListener;
}

private S3MetaRequestWrapper s3MetaRequest() {
if (!metaRequestFuture.isDone()) {
return null;
}

S3MetaRequestWrapper s3MetaRequest = metaRequestFuture.join();
return s3MetaRequest;
}

@Override
public void onResponseHeaders(int statusCode, HttpHeader[] headers) {
// Note, we cannot call responseHandler.onHeaders() here because the response status code and headers may not represent
Expand All @@ -86,8 +108,7 @@ public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long ob
failResponseHandlerAndFuture(failure);
Comment thread
zoewangg marked this conversation as resolved.
return;
}

metaRequest.incrementReadWindow(bytesReceived);
s3MetaRequest().incrementReadWindow(bytesReceived);
Comment thread
zoewangg marked this conversation as resolved.
Outdated
});

// Returning 0 to disable flow control because we manually increase read window above
Expand Down Expand Up @@ -115,22 +136,10 @@ private void onSuccessfulResponseComplete() {
return;
}
this.progressListener.subscriberOnComplete();
completeFutureAndCloseRequest();
resultFuture.complete(null);
});
}

private void completeFutureAndCloseRequest() {
resultFuture.complete(null);
runAndLogError(log.logger(), "Exception thrown in S3MetaRequest#close, ignoring",
() -> metaRequest.close());
}

public void cancelRequest() {
SdkCancellationException sdkClientException =
new SdkCancellationException("request is cancelled");
failResponseHandlerAndFuture(sdkClientException);
}

private void handleError(S3FinishedResponseContext context) {
int crtCode = context.getErrorCode();
HttpHeader[] headers = context.getErrorHeaders();
Expand Down Expand Up @@ -168,27 +177,21 @@ private void onErrorResponseComplete(byte[] errorPayload) {
failResponseHandlerAndFuture(throwable);
return null;
}
completeFutureAndCloseRequest();
resultFuture.complete(null);
return null;
});
}

private void failResponseHandlerAndFuture(Throwable exception) {
resultFuture.completeExceptionally(exception);
runAndLogError(log.logger(), "Exception thrown in SdkAsyncHttpResponseHandler#onError, ignoring",
() -> responseHandler.onError(exception));
runAndLogError(log.logger(), "Exception thrown in S3MetaRequest#close, ignoring",
() -> metaRequest.close());
resultFuture.completeExceptionally(exception);
}

private static boolean isErrorResponse(int responseStatus) {
return responseStatus != 0;
}

public void metaRequest(S3MetaRequest s3MetaRequest) {
metaRequest = s3MetaRequest;
}

@Override
public void onProgress(S3MetaRequestProgress progress) {
this.progressListener.subscriberOnNext(progress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,24 @@
import java.util.function.Function;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.crt.s3.S3MetaRequest;

/**
* An observable that notifies the observer {@link S3CrtAsyncHttpClient} to pause the request.
*/
@SdkInternalApi

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should've been a protected API. I will create a separate PR to fix it.

public class S3MetaRequestPauseObservable {

private final Function<S3MetaRequest, ResumeToken> pause;
private volatile S3MetaRequest request;
private final Function<S3MetaRequestWrapper, ResumeToken> pause;
private volatile S3MetaRequestWrapper request;

public S3MetaRequestPauseObservable() {
this.pause = S3MetaRequest::pause;
this.pause = S3MetaRequestWrapper::pause;
}

/**
* Subscribe {@link S3MetaRequest} to be potentially paused later.
* Subscribe {@link S3MetaRequestWrapper} to be potentially paused later.
*/
public void subscribe(S3MetaRequest request) {
public void subscribe(S3MetaRequestWrapper request) {
this.request = request;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crt;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.crt.s3.S3MetaRequest;

/**
* A wrapper class that manages the lifecycle of the underlying {@link S3MetaRequest}. This class is needed to ensure we don't
* invoke methods on {@link S3MetaRequest} after it's closed, otherwise CRT will crash.
*/
@SdkInternalApi
public class S3MetaRequestWrapper {
private final S3MetaRequest delegate;
private volatile boolean isClosed;
private final Object lock = new Object();

public S3MetaRequestWrapper(S3MetaRequest delegate) {
this.delegate = delegate;
}

public void close() {
synchronized (lock) {
if (!isClosed) {
isClosed = true;
delegate.close();
}
}
}

public void incrementReadWindow(long windowSize) {
synchronized (lock) {
if (!isClosed) {
delegate.incrementReadWindow(windowSize);
}
}
}

public ResumeToken pause() {
synchronized (lock) {
if (!isClosed) {
return delegate.pause();
}
}
return null;
}

public void cancel() {
synchronized (lock) {
if (!isClosed) {
delegate.cancel();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@
import software.amazon.awssdk.services.s3.model.Protocol;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.http.SdkHttpUtils;

@WireMockTest(httpsEnabled = true)
public class S3ExpressCreateSessionTest extends BaseRuleSetClientTest {
private static final Logger log = Logger.loggerFor(S3ExpressCreateSessionTest.class);

private static final Function<WireMockRuntimeInfo, URI> WM_HTTP_ENDPOINT = wm -> URI.create(wm.getHttpBaseUrl());
private static final Function<WireMockRuntimeInfo, URI> WM_HTTPS_ENDPOINT = wm -> URI.create(wm.getHttpsBaseUrl());
Expand Down Expand Up @@ -329,9 +331,8 @@ private static final class CapturingInterceptor implements ExecutionInterceptor
public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
SdkHttpRequest sdkHttpRequest = context.httpRequest();
this.headers = sdkHttpRequest.headers();
System.out.printf("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath());
headers.forEach((k, strings) -> System.out.printf("%s, %s%n", k, strings));
System.out.println();
log.debug(() -> String.format("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath()));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this because I noticed it produced a lot of (unnecessary) logs.

headers.forEach((k, strings) -> log.debug(() -> String.format("%s, %s%n", k, strings)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.http.SdkHttpUtils;

@WireMockTest(httpsEnabled = true)
public class S3ExpressTest extends BaseRuleSetClientTest {

private static final Logger log = Logger.loggerFor(S3ExpressTest.class);
private static final Function<WireMockRuntimeInfo, URI> WM_HTTP_ENDPOINT = wm -> URI.create(wm.getHttpBaseUrl());
private static final Function<WireMockRuntimeInfo, URI> WM_HTTPS_ENDPOINT = wm -> URI.create(wm.getHttpsBaseUrl());
private static final AwsCredentialsProvider CREDENTIALS_PROVIDER =
Expand Down Expand Up @@ -431,9 +432,8 @@ private static final class CapturingInterceptor implements ExecutionInterceptor
public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
SdkHttpRequest sdkHttpRequest = context.httpRequest();
this.headers = sdkHttpRequest.headers();
System.out.printf("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath());
headers.forEach((k, strings) -> System.out.printf("%s, %s%n", k, strings));
System.out.println();
log.debug(() -> String.format("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath()));
headers.forEach((k, strings) -> log.debug(() -> String.format("%s, %s%n", k, strings)));
}
}
}
Loading