Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
import software.amazon.awssdk.http.async.AbortableInputStreamSubscriber;

/**
* A {@link AsyncResponseTransformer} that allows performing blocking reads on the response data.
Expand Down Expand Up @@ -50,7 +50,7 @@ public void onResponse(ResponseT response) {

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
InputStreamSubscriber inputStreamSubscriber = new InputStreamSubscriber();
AbortableInputStreamSubscriber inputStreamSubscriber = AbortableInputStreamSubscriber.builder().build();
publisher.subscribe(inputStreamSubscriber);
future.complete(new ResponseInputStream<>(response, inputStreamSubscriber));
}
Expand Down
10 changes: 10 additions & 0 deletions http-client-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@
<artifactId>byte-buddy</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,43 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt.internal.response;
package software.amazon.awssdk.http.async;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.http.Abortable;
import software.amazon.awssdk.utils.FunctionalUtils;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;

/**
* Wrapper of {@link InputStreamSubscriber} that also implements {@link Abortable} and closes the underlying connections when
* {@link #close()} or {@link #abort()} is invoked.
* Wrapper of {@link InputStreamSubscriber} that also implements {@link Abortable}. It will invoke {@link #close()}
* when {@link #abort()} is invoked. Upon closing, the underlying {@link InputStreamSubscriber} will be closed, and additional
* action can be added via {@link Builder#doAfterClose(Runnable)}.
*
*/
@SdkInternalApi
@SdkProtectedApi
public final class AbortableInputStreamSubscriber extends InputStream implements Subscriber<ByteBuffer>, Abortable {

private final InputStreamSubscriber delegate;
private final Runnable closeConnection;

public AbortableInputStreamSubscriber(Runnable onClose, InputStreamSubscriber inputStreamSubscriber) {
this.delegate = inputStreamSubscriber;
this.closeConnection = onClose;
private final Runnable doAfterClose;

private AbortableInputStreamSubscriber(Builder builder) {
this(builder, new InputStreamSubscriber());
}

@SdkTestInternalApi
AbortableInputStreamSubscriber(Builder builder, InputStreamSubscriber delegate) {
this.delegate = delegate;
this.doAfterClose = builder.doAfterClose == null ? FunctionalUtils.noOpRunnable() : builder.doAfterClose;
}

public static Builder builder() {
return new Builder();
}

@Override
Expand Down Expand Up @@ -81,7 +94,23 @@ public void onComplete() {

@Override
public void close() {
closeConnection.run();
delegate.close();
FunctionalUtils.invokeSafely(() -> doAfterClose.run());
}

public static final class Builder {
private Runnable doAfterClose;

/**
* Additional action to run when {@link #close()} is invoked
*/
public Builder doAfterClose(Runnable doAfterClose) {
this.doAfterClose = doAfterClose;
return this;
}

public AbortableInputStreamSubscriber build() {
return new AbortableInputStreamSubscriber(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt.internal;
package software.amazon.awssdk.http.async;

import static org.mockito.Mockito.verify;

Expand All @@ -22,7 +22,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.http.crt.internal.response.AbortableInputStreamSubscriber;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;

@ExtendWith(MockitoExtension.class)
Expand All @@ -33,20 +32,39 @@ public class AbortableInputStreamSubscriberTest {
@Mock
private Runnable onClose;

@Mock
private InputStreamSubscriber inputStreamSubscriber;

@BeforeEach
void setUp() {
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(onClose, new InputStreamSubscriber());
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(AbortableInputStreamSubscriber.builder()
.doAfterClose(onClose),
inputStreamSubscriber);


}

@Test
void close_shouldInvokeOnClose() {
void close_closeConfigured_shouldInvokeOnClose() {
abortableInputStreamSubscriber.close();
verify(inputStreamSubscriber).close();
verify(onClose).run();
}

@Test
void abort_shouldInvokeOnClose() {
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(AbortableInputStreamSubscriber.builder()
.doAfterClose(onClose),
inputStreamSubscriber);
abortableInputStreamSubscriber.abort();
verify(onClose).run();
}

@Test
void close_closeNotConfigured_shouldCloseDelegate() {
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(AbortableInputStreamSubscriber.builder(),
inputStreamSubscriber);
abortableInputStreamSubscriber.close();
verify(inputStreamSubscriber).close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.AbortableInputStreamSubscriber;
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
import software.amazon.awssdk.utils.async.SimplePublisher;

/**
Expand Down Expand Up @@ -87,8 +87,10 @@ public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blo
@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
if (inputStreamSubscriber == null) {
inputStreamSubscriber = new AbortableInputStreamSubscriber(() -> responseHandlerHelper.closeConnection(stream),
new InputStreamSubscriber());
inputStreamSubscriber =
AbortableInputStreamSubscriber.builder()
.doAfterClose(() -> responseHandlerHelper.closeConnection(stream))
.build();
simplePublisher.subscribe(inputStreamSubscriber);
// For response with a payload, we need to complete the future here to allow downstream to retrieve the data from
// the stream directly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ private void onCancel() {
try {
SdkCancellationException e = new SdkCancellationException(
"Subscriber cancelled before all events were published");
log.warn(channelContext.channel(), () -> "Subscriber cancelled before all events were published");
log.debug(channelContext.channel(), () -> "Subscriber cancelled before all events were published");
executeFuture.completeExceptionally(e);
} finally {
runAndLogError(channelContext.channel(), () -> "Could not release channel back to the pool",
Expand Down
17 changes: 17 additions & 0 deletions test/codegen-generated-classes-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
Expand Down Expand Up @@ -305,6 +311,17 @@
<skip>true</skip>
</configuration>
</plugin>
<!-- Disable dependency check for this test module to speed up the build. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>${maven-dependency-plugin.version}</version>
<configuration>
<failOnWarning>false</failOnWarning>
<ignoreNonCompile>false</ignoreNonCompile>
</configuration>
</plugin>

</plugins>
</build>

Expand Down
Loading