-
Notifications
You must be signed in to change notification settings - Fork 1k
Fix request cancellation issue in the AWS CRT-based S3 client that co… #4955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "type": "bugfix", | ||
| "category": "AWS CRT-based S3 client", | ||
| "contributor": "", | ||
| "description": "Fixed memory leak issue when a request was cancelled in the AWS CRT-based S3 client." | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,7 +45,6 @@ | |
| import software.amazon.awssdk.crt.s3.ResumeToken; | ||
| import software.amazon.awssdk.crt.s3.S3Client; | ||
| import software.amazon.awssdk.crt.s3.S3ClientOptions; | ||
| import software.amazon.awssdk.crt.s3.S3MetaRequest; | ||
| import software.amazon.awssdk.crt.s3.S3MetaRequestOptions; | ||
| import software.amazon.awssdk.http.Header; | ||
| import software.amazon.awssdk.http.SdkHttpExecutionAttributes; | ||
|
|
@@ -64,7 +63,6 @@ | |
| */ | ||
| @SdkInternalApi | ||
| public final class S3CrtAsyncHttpClient implements SdkAsyncHttpClient { | ||
| private static final Logger log = Logger.loggerFor(S3CrtAsyncHttpClient.class); | ||
|
|
||
| private final S3Client crtS3Client; | ||
|
|
||
|
|
@@ -133,10 +131,12 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) { | |
| URI uri = asyncRequest.request().getUri(); | ||
| HttpRequest httpRequest = toCrtRequest(asyncRequest); | ||
| SdkHttpExecutionAttributes httpExecutionAttributes = asyncRequest.httpExecutionAttributes(); | ||
| CompletableFuture<S3MetaRequestWrapper> s3MetaRequestFuture = new CompletableFuture<>(); | ||
| S3CrtResponseHandlerAdapter responseHandler = | ||
| new S3CrtResponseHandlerAdapter(executeFuture, | ||
| asyncRequest.responseHandler(), | ||
| httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER)); | ||
| httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER), | ||
| s3MetaRequestFuture); | ||
|
|
||
| S3MetaRequestOptions.MetaRequestType requestType = requestType(asyncRequest); | ||
|
|
||
|
|
@@ -160,16 +160,19 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) { | |
| .withRequestFilePath(requestFilePath) | ||
| .withSigningConfig(signingConfig); | ||
|
|
||
| S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions); | ||
| S3MetaRequestPauseObservable observable = | ||
| httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE); | ||
| try { | ||
| S3MetaRequestWrapper requestWrapper = new S3MetaRequestWrapper(crtS3Client.makeMetaRequest(requestOptions)); | ||
| s3MetaRequestFuture.complete(requestWrapper); | ||
|
|
||
| responseHandler.metaRequest(s3MetaRequest); | ||
| S3MetaRequestPauseObservable observable = | ||
| httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE); | ||
|
|
||
| if (observable != null) { | ||
| observable.subscribe(s3MetaRequest); | ||
| if (observable != null) { | ||
| observable.subscribe(requestWrapper); | ||
| } | ||
| } finally { | ||
| signingConfig.close(); | ||
| } | ||
| closeResourceCallback(executeFuture, s3MetaRequest, responseHandler, signingConfig); | ||
|
|
||
| return executeFuture; | ||
| } | ||
|
|
@@ -215,23 +218,6 @@ private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequ | |
| return S3MetaRequestOptions.MetaRequestType.DEFAULT; | ||
| } | ||
|
|
||
| private static void closeResourceCallback(CompletableFuture<Void> executeFuture, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved this to S3CrtResponseHandlerAdapter because I figured it makes more sense to manage the closure of the S3MetaRequest in one place. |
||
| S3MetaRequest s3MetaRequest, | ||
| S3CrtResponseHandlerAdapter responseHandler, | ||
| AwsSigningConfig signingConfig) { | ||
| executeFuture.whenComplete((r, t) -> { | ||
| if (executeFuture.isCancelled()) { | ||
| log.debug(() -> "The request is cancelled, cancelling meta request"); | ||
| responseHandler.cancelRequest(); | ||
| s3MetaRequest.cancel(); | ||
| signingConfig.close(); | ||
| } else { | ||
| s3MetaRequest.close(); | ||
| signingConfig.close(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) { | ||
| SdkHttpRequest sdkRequest = asyncRequest.request(); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,25 +18,24 @@ | |
| import java.util.function.Function; | ||
| import software.amazon.awssdk.annotations.SdkInternalApi; | ||
| import software.amazon.awssdk.crt.s3.ResumeToken; | ||
| import software.amazon.awssdk.crt.s3.S3MetaRequest; | ||
|
|
||
| /** | ||
| * An observable that notifies the observer {@link S3CrtAsyncHttpClient} to pause the request. | ||
| */ | ||
| @SdkInternalApi | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class should've been a protected API. I will create a separate PR to fix it. |
||
| public class S3MetaRequestPauseObservable { | ||
|
|
||
| private final Function<S3MetaRequest, ResumeToken> pause; | ||
| private volatile S3MetaRequest request; | ||
| private final Function<S3MetaRequestWrapper, ResumeToken> pause; | ||
| private volatile S3MetaRequestWrapper request; | ||
|
|
||
| public S3MetaRequestPauseObservable() { | ||
| this.pause = S3MetaRequest::pause; | ||
| this.pause = S3MetaRequestWrapper::pause; | ||
| } | ||
|
|
||
| /** | ||
| * Subscribe {@link S3MetaRequest} to be potentially paused later. | ||
| * Subscribe {@link S3MetaRequestWrapper} to be potentially paused later. | ||
| */ | ||
| public void subscribe(S3MetaRequest request) { | ||
| public void subscribe(S3MetaRequestWrapper request) { | ||
| this.request = request; | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The failure of the test actually verified this fix. Before the fix, the request would never cancelled, so we'd still get data after cancelling future. After the fix, the request would get cancelled properly, so
Mockito.verify(transferListenerMock, times(1)).bytesTransferred(ArgumentMatchers.any());may not be invoked at all.