diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/traits/RequestCompressionTrait.java b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/traits/RequestCompressionTrait.java index 122e38d730e8..5168b2e36ed9 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/traits/RequestCompressionTrait.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/traits/RequestCompressionTrait.java @@ -42,13 +42,9 @@ public static CodeBlock create(OperationModel operationModel, IntermediateModel return CodeBlock.of(""); } - // TODO : remove once request compression for streaming operations is supported - if (operationModel.isStreaming()) { - throw new IllegalStateException("Request compression for streaming operations is not yet supported in the AWS SDK " - + "for Java."); - } - - // TODO : remove once S3 checksum interceptors are moved to occur after CompressRequestStage + // TODO : remove once: + // 1) S3 checksum interceptors are moved to occur after CompressRequestStage + // 2) Transfer-Encoding:chunked is supported in S3 if (model.getMetadata().getServiceName().equals("S3")) { throw new IllegalStateException("Request compression for S3 is not yet supported in the AWS SDK for Java."); } diff --git a/core/auth/src/main/java/software/amazon/awssdk/auth/signer/internal/chunkedencoding/AwsSignedChunkedEncodingInputStream.java b/core/auth/src/main/java/software/amazon/awssdk/auth/signer/internal/chunkedencoding/AwsSignedChunkedEncodingInputStream.java index 636fad74f9fc..3174eb7c6caa 100644 --- a/core/auth/src/main/java/software/amazon/awssdk/auth/signer/internal/chunkedencoding/AwsSignedChunkedEncodingInputStream.java +++ b/core/auth/src/main/java/software/amazon/awssdk/auth/signer/internal/chunkedencoding/AwsSignedChunkedEncodingInputStream.java @@ -40,7 +40,6 @@ @SdkInternalApi public final class AwsSignedChunkedEncodingInputStream extends AwsChunkedEncodingInputStream { - private static final String CRLF = "\r\n"; private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature="; private static final String CHECKSUM_SIGNATURE_HEADER = "x-amz-trailer-signature:"; private String previousChunkSignature; diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java index 859f8394ac91..1eadb88d32db 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java @@ -35,6 +35,7 @@ import software.amazon.awssdk.core.internal.http.HttpClientDependencies; import software.amazon.awssdk.core.internal.http.RequestExecutionContext; import software.amazon.awssdk.core.internal.http.pipeline.MutableRequestToRequestPipeline; +import software.amazon.awssdk.core.internal.sync.CompressionContentStreamProvider; import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.SdkHttpFullRequest; import software.amazon.awssdk.utils.IoUtils; @@ -67,10 +68,21 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ compressEntirePayload(input, compressor); updateContentEncodingHeader(input, compressor); updateContentLengthHeader(input); + return input; + } + + if (!isTransferEncodingChunked(input)) { + return input; } - // TODO : streaming - sync & async + if (context.requestProvider() == null) { + // sync streaming + input.contentStreamProvider(new CompressionContentStreamProvider(input.contentStreamProvider(), compressor)); + } + + // TODO : streaming - async + updateContentEncodingHeader(input, compressor); return input; } @@ -123,6 +135,12 @@ private void updateContentLengthHeader(SdkHttpFullRequest.Builder input) { } } + private boolean isTransferEncodingChunked(SdkHttpFullRequest.Builder input) { + return input.firstMatchingHeader("Transfer-Encoding") + .map(headerValue -> headerValue.equals("chunked")) + .orElse(false); + } + private Compressor resolveCompressorType(ExecutionAttributes executionAttributes) { List encodings = executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION).getEncodings(); diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java index f382bd5ced40..ec4870f5e686 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java @@ -22,8 +22,6 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.checksums.SdkChecksum; import software.amazon.awssdk.core.internal.chunked.AwsChunkedEncodingConfig; -import software.amazon.awssdk.core.io.SdkInputStream; -import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; /** @@ -37,37 +35,18 @@ * the wrapped stream. */ @SdkInternalApi -public abstract class AwsChunkedEncodingInputStream extends SdkInputStream { +public abstract class AwsChunkedEncodingInputStream extends AwsChunkedInputStream { - public static final int DEFAULT_CHUNK_SIZE = 128 * 1024; - protected static final int SKIP_BUFFER_SIZE = 256 * 1024; protected static final String CRLF = "\r\n"; protected static final byte[] FINAL_CHUNK = new byte[0]; protected static final String HEADER_COLON_SEPARATOR = ":"; - private static final Logger log = Logger.loggerFor(AwsChunkedEncodingInputStream.class); protected byte[] calculatedChecksum = null; protected final String checksumHeaderForTrailer; protected boolean isTrailingTerminated = true; - private InputStream is = null; private final int chunkSize; private final int maxBufferSize; private final SdkChecksum sdkChecksum; private boolean isLastTrailingCrlf; - /** - * Iterator on the current chunk. - */ - private ChunkContentIterator currentChunkIterator; - - /** - * Iterator on the buffer of the decoded stream, - * Null if the wrapped stream is marksupported, - * otherwise it will be initialized when this wrapper is marked. - */ - private DecodedStreamBuffer decodedStreamBuffer; - - private boolean isAtStart = true; - private boolean isTerminating = false; - /** * Creates a chunked encoding input stream initialized with the originating stream. The configuration allows @@ -89,10 +68,10 @@ protected AwsChunkedEncodingInputStream(InputStream in, AwsChunkedEncodingInputStream originalChunkedStream = (AwsChunkedEncodingInputStream) in; providedMaxBufferSize = Math.max(originalChunkedStream.maxBufferSize, providedMaxBufferSize); is = originalChunkedStream.is; - decodedStreamBuffer = originalChunkedStream.decodedStreamBuffer; + underlyingStreamBuffer = originalChunkedStream.underlyingStreamBuffer; } else { is = in; - decodedStreamBuffer = null; + underlyingStreamBuffer = null; } this.chunkSize = awsChunkedEncodingConfig.chunkSize(); this.maxBufferSize = providedMaxBufferSize; @@ -153,19 +132,6 @@ public T checksumHeaderForTrailer(String checksumHeaderForTrailer) { } - @Override - public int read() throws IOException { - byte[] tmp = new byte[1]; - int count = read(tmp, 0, 1); - if (count > 0) { - log.debug(() -> "One byte read from the stream."); - int unsignedByte = (int) tmp[0] & 0xFF; - return unsignedByte; - } else { - return count; - } - } - @Override public int read(byte[] b, int off, int len) throws IOException { abortIfNeeded(); @@ -211,32 +177,6 @@ private boolean setUpTrailingChunks() { return true; } - @Override - public long skip(long n) throws IOException { - if (n <= 0) { - return 0; - } - long remaining = n; - int toskip = (int) Math.min(SKIP_BUFFER_SIZE, n); - byte[] temp = new byte[toskip]; - while (remaining > 0) { - int count = read(temp, 0, toskip); - if (count < 0) { - break; - } - remaining -= count; - } - return n - remaining; - } - - /** - * @see java.io.InputStream#markSupported() - */ - @Override - public boolean markSupported() { - return true; - } - /** * The readlimit parameter is ignored. */ @@ -256,7 +196,7 @@ public void mark(int readlimit) { } else { log.debug(() -> "AwsChunkedEncodingInputStream marked at the start of the stream " + "(initializing the buffer since the wrapped stream is not mark-supported)."); - decodedStreamBuffer = new DecodedStreamBuffer(maxBufferSize); + underlyingStreamBuffer = new UnderlyingStreamBuffer(maxBufferSize); } } @@ -280,8 +220,8 @@ public void reset() throws IOException { is.reset(); } else { log.debug(() -> "AwsChunkedEncodingInputStream reset (will use the buffer of the decoded stream)."); - Validate.notNull(decodedStreamBuffer, "Cannot reset the stream because the mark is not set."); - decodedStreamBuffer.startReadBuffer(); + Validate.notNull(underlyingStreamBuffer, "Cannot reset the stream because the mark is not set."); + underlyingStreamBuffer.startReadBuffer(); } isAtStart = true; isTerminating = false; @@ -298,14 +238,14 @@ private boolean setUpNextChunk() throws IOException { int chunkSizeInBytes = 0; while (chunkSizeInBytes < chunkSize) { /** Read from the buffer of the decoded stream */ - if (null != decodedStreamBuffer && decodedStreamBuffer.hasNext()) { - chunkData[chunkSizeInBytes++] = decodedStreamBuffer.next(); + if (null != underlyingStreamBuffer && underlyingStreamBuffer.hasNext()) { + chunkData[chunkSizeInBytes++] = underlyingStreamBuffer.next(); } else { /** Read from the wrapped stream */ int bytesToRead = chunkSize - chunkSizeInBytes; int count = is.read(chunkData, chunkSizeInBytes, bytesToRead); if (count != -1) { - if (null != decodedStreamBuffer) { - decodedStreamBuffer.buffer(chunkData, chunkSizeInBytes, count); + if (null != underlyingStreamBuffer) { + underlyingStreamBuffer.buffer(chunkData, chunkSizeInBytes, count); } chunkSizeInBytes += count; } else { @@ -333,13 +273,6 @@ private boolean setUpNextChunk() throws IOException { } } - - @Override - protected InputStream getWrappedInputStream() { - return is; - } - - /** * The final chunk. * @@ -361,5 +294,4 @@ protected InputStream getWrappedInputStream() { * @return ChecksumChunkHeader in bytes based on the Header name field. */ protected abstract byte[] createChecksumChunkHeader(); - -} \ No newline at end of file +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedInputStream.java new file mode 100644 index 000000000000..11beb216f16f --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedInputStream.java @@ -0,0 +1,90 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.io; + +import java.io.IOException; +import java.io.InputStream; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.io.SdkInputStream; +import software.amazon.awssdk.utils.Logger; + +/** + * A wrapper of InputStream that implements streaming in chunks. + */ +@SdkInternalApi +public abstract class AwsChunkedInputStream extends SdkInputStream { + public static final int DEFAULT_CHUNK_SIZE = 128 * 1024; + protected static final int SKIP_BUFFER_SIZE = 256 * 1024; + protected static final Logger log = Logger.loggerFor(AwsChunkedInputStream.class); + protected InputStream is; + /** + * Iterator on the current chunk. + */ + protected ChunkContentIterator currentChunkIterator; + + /** + * Iterator on the buffer of the underlying stream, + * Null if the wrapped stream is marksupported, + * otherwise it will be initialized when this wrapper is marked. + */ + protected UnderlyingStreamBuffer underlyingStreamBuffer; + protected boolean isAtStart = true; + protected boolean isTerminating = false; + + @Override + public int read() throws IOException { + byte[] tmp = new byte[1]; + int count = read(tmp, 0, 1); + if (count > 0) { + log.debug(() -> "One byte read from the stream."); + int unsignedByte = (int) tmp[0] & 0xFF; + return unsignedByte; + } else { + return count; + } + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0) { + return 0; + } + long remaining = n; + int toskip = (int) Math.min(SKIP_BUFFER_SIZE, n); + byte[] temp = new byte[toskip]; + while (remaining > 0) { + int count = read(temp, 0, toskip); + if (count < 0) { + break; + } + remaining -= count; + } + return n - remaining; + } + + /** + * @see InputStream#markSupported() + */ + @Override + public boolean markSupported() { + return true; + } + + @Override + protected InputStream getWrappedInputStream() { + return is; + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java new file mode 100644 index 000000000000..93642bad8c47 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java @@ -0,0 +1,170 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.io; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.internal.compression.Compressor; +import software.amazon.awssdk.utils.Validate; + +/** + * A wrapper class of InputStream that implements compression in chunks. + */ +@SdkInternalApi +public final class AwsCompressionInputStream extends AwsChunkedInputStream { + private final Compressor compressor; + + private AwsCompressionInputStream(InputStream in, Compressor compressor) { + this.compressor = compressor; + if (in instanceof AwsCompressionInputStream) { + // This could happen when the request is retried. + AwsCompressionInputStream originalCompressionStream = (AwsCompressionInputStream) in; + this.is = originalCompressionStream.is; + this.underlyingStreamBuffer = originalCompressionStream.underlyingStreamBuffer; + } else { + this.is = in; + this.underlyingStreamBuffer = null; + } + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + abortIfNeeded(); + Validate.notNull(b, "buff"); + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + if (currentChunkIterator == null || !currentChunkIterator.hasNext()) { + if (isTerminating) { + return -1; + } + isTerminating = setUpNextChunk(); + } + + int count = currentChunkIterator.read(b, off, len); + if (count > 0) { + isAtStart = false; + log.trace(() -> count + " byte read from the stream."); + } + return count; + } + + private boolean setUpNextChunk() throws IOException { + byte[] chunkData = new byte[DEFAULT_CHUNK_SIZE]; + int chunkSizeInBytes = 0; + while (chunkSizeInBytes < DEFAULT_CHUNK_SIZE) { + /** Read from the buffer of the uncompressed stream */ + if (underlyingStreamBuffer != null && underlyingStreamBuffer.hasNext()) { + chunkData[chunkSizeInBytes++] = underlyingStreamBuffer.next(); + } else { /** Read from the wrapped stream */ + int bytesToRead = DEFAULT_CHUNK_SIZE - chunkSizeInBytes; + int count = is.read(chunkData, chunkSizeInBytes, bytesToRead); + if (count != -1) { + if (underlyingStreamBuffer != null) { + underlyingStreamBuffer.buffer(chunkData, chunkSizeInBytes, count); + } + chunkSizeInBytes += count; + } else { + break; + } + } + } + if (chunkSizeInBytes == 0) { + return true; + } + + if (chunkSizeInBytes < chunkData.length) { + chunkData = Arrays.copyOf(chunkData, chunkSizeInBytes); + } + // Compress the chunk + byte[] compressedChunkData = compressor.compress(chunkData); + currentChunkIterator = new ChunkContentIterator(compressedChunkData); + return false; + } + + /** + * The readlimit parameter is ignored. + */ + @Override + public void mark(int readlimit) { + abortIfNeeded(); + if (!isAtStart) { + throw new UnsupportedOperationException("Compression stream only supports mark() at the start of the stream."); + } + if (is.markSupported()) { + log.debug(() -> "AwsCompressionInputStream marked at the start of the stream " + + "(will directly mark the wrapped stream since it's mark-supported)."); + is.mark(readlimit); + } else { + log.debug(() -> "AwsCompressionInputStream marked at the start of the stream " + + "(initializing the buffer since the wrapped stream is not mark-supported)."); + underlyingStreamBuffer = new UnderlyingStreamBuffer(SKIP_BUFFER_SIZE); + } + } + + /** + * Reset the stream, either by resetting the wrapped stream or using the + * buffer created by this class. + */ + @Override + public void reset() throws IOException { + abortIfNeeded(); + // Clear up any encoded data + currentChunkIterator = null; + // Reset the wrapped stream if it is mark-supported, + // otherwise use our buffered data. + if (is.markSupported()) { + log.debug(() -> "AwsCompressionInputStream reset " + + "(will reset the wrapped stream because it is mark-supported)."); + is.reset(); + } else { + log.debug(() -> "AwsCompressionInputStream reset (will use the buffer of the decoded stream)."); + Validate.notNull(underlyingStreamBuffer, "Cannot reset the stream because the mark is not set."); + underlyingStreamBuffer.startReadBuffer(); + } + isAtStart = true; + isTerminating = false; + } + + public static final class Builder { + InputStream inputStream; + Compressor compressor; + + public AwsCompressionInputStream build() { + return new AwsCompressionInputStream( + this.inputStream, this.compressor); + } + + public Builder inputStream(InputStream inputStream) { + this.inputStream = inputStream; + return this; + } + + public Builder compressor(Compressor compressor) { + this.compressor = compressor; + return this; + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/DecodedStreamBuffer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/UnderlyingStreamBuffer.java similarity index 93% rename from core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/DecodedStreamBuffer.java rename to core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/UnderlyingStreamBuffer.java index f6d3c47c0c1e..6fc086983fda 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/DecodedStreamBuffer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/UnderlyingStreamBuffer.java @@ -20,8 +20,8 @@ import software.amazon.awssdk.utils.Logger; @SdkInternalApi -class DecodedStreamBuffer { - private static final Logger log = Logger.loggerFor(DecodedStreamBuffer.class); +class UnderlyingStreamBuffer { + private static final Logger log = Logger.loggerFor(UnderlyingStreamBuffer.class); private byte[] bufferArray; private int maxBufferSize; @@ -29,7 +29,7 @@ class DecodedStreamBuffer { private int pos = -1; private boolean bufferSizeOverflow; - DecodedStreamBuffer(int maxBufferSize) { + UnderlyingStreamBuffer(int maxBufferSize) { bufferArray = new byte[maxBufferSize]; this.maxBufferSize = maxBufferSize; } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/sync/CompressionContentStreamProvider.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/sync/CompressionContentStreamProvider.java new file mode 100644 index 000000000000..52a222bc372c --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/sync/CompressionContentStreamProvider.java @@ -0,0 +1,55 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.sync; + +import java.io.InputStream; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.internal.compression.Compressor; +import software.amazon.awssdk.core.internal.io.AwsCompressionInputStream; +import software.amazon.awssdk.http.ContentStreamProvider; +import software.amazon.awssdk.utils.IoUtils; + +/** + * {@link ContentStreamProvider} implementation for compression. + */ +@SdkInternalApi +public class CompressionContentStreamProvider implements ContentStreamProvider { + private final ContentStreamProvider underlyingInputStreamProvider; + private InputStream currentStream; + private final Compressor compressor; + + public CompressionContentStreamProvider(ContentStreamProvider underlyingInputStreamProvider, Compressor compressor) { + this.underlyingInputStreamProvider = underlyingInputStreamProvider; + this.compressor = compressor; + } + + @Override + public InputStream newStream() { + closeCurrentStream(); + currentStream = AwsCompressionInputStream.builder() + .inputStream(underlyingInputStreamProvider.newStream()) + .compressor(compressor) + .build(); + return currentStream; + } + + private void closeCurrentStream() { + if (currentStream != null) { + IoUtils.closeQuietly(currentStream, null); + currentStream = null; + } + } +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStreamTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStreamTest.java new file mode 100644 index 000000000000..99359dfcd58d --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStreamTest.java @@ -0,0 +1,93 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.io; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static software.amazon.awssdk.core.util.FileUtils.generateRandomAsciiFile; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Random; +import org.junit.BeforeClass; +import org.junit.Test; +import software.amazon.awssdk.core.internal.compression.Compressor; +import software.amazon.awssdk.core.internal.compression.GzipCompressor; + +public class AwsCompressionInputStreamTest { + private static Compressor compressor; + + @BeforeClass + public static void setup() throws IOException { + compressor = new GzipCompressor(); + } + + @Test + public void nonMarkSupportedInputStream_marksAndResetsCorrectly() throws IOException { + File file = generateRandomAsciiFile(100); + InputStream is = new FileInputStream(file); + assertFalse(is.markSupported()); + + AwsCompressionInputStream compressionInputStream = AwsCompressionInputStream.builder() + .inputStream(is) + .compressor(compressor) + .build(); + + compressionInputStream.mark(100); + compressionInputStream.reset(); + String read1 = readInputStream(compressionInputStream); + compressionInputStream.reset(); + String read2 = readInputStream(compressionInputStream); + assertThat(read1).isEqualTo(read2); + } + + @Test + public void markSupportedInputStream_marksAndResetsCorrectly() throws IOException { + InputStream is = new ByteArrayInputStream(generateRandomBody(100)); + assertTrue(is.markSupported()); + AwsCompressionInputStream compressionInputStream = AwsCompressionInputStream.builder() + .inputStream(is) + .compressor(compressor) + .build(); + compressionInputStream.mark(100); + compressionInputStream.reset(); + String read1 = readInputStream(compressionInputStream); + compressionInputStream.reset(); + String read2 = readInputStream(compressionInputStream); + assertThat(read1).isEqualTo(read2); + } + + private byte[] generateRandomBody(int size) { + byte[] randomData = new byte[size]; + new Random().nextBytes(randomData); + return randomData; + } + + private String readInputStream(InputStream is) throws IOException { + byte[] buffer = new byte[512]; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + byteArrayOutputStream.write(buffer, 0, bytesRead); + } + return byteArrayOutputStream.toString(); + } +} diff --git a/services/mediastoredata/pom.xml b/services/mediastoredata/pom.xml index 32d52843ee66..3e5e81a0a216 100644 --- a/services/mediastoredata/pom.xml +++ b/services/mediastoredata/pom.xml @@ -74,5 +74,11 @@ commons-lang3 test + + software.amazon.awssdk + mediastore + ${awsjavasdk.version} + test + diff --git a/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/MediaStoreDataIntegrationTestBase.java b/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/MediaStoreDataIntegrationTestBase.java new file mode 100644 index 000000000000..3a0e7006ef8b --- /dev/null +++ b/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/MediaStoreDataIntegrationTestBase.java @@ -0,0 +1,155 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.mediastoredata; + +import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; + +import io.reactivex.Flowable; +import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.http.ContentStreamProvider; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.mediastore.MediaStoreClient; +import software.amazon.awssdk.services.mediastore.model.Container; +import software.amazon.awssdk.services.mediastore.model.ContainerStatus; +import software.amazon.awssdk.services.mediastore.model.DescribeContainerResponse; +import software.amazon.awssdk.testutils.Waiter; +import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase; + +/** + * Base class for MediaStoreData integration tests. Used for Transfer-Encoding and Request Compression testing. + */ +public class MediaStoreDataIntegrationTestBase extends AwsIntegrationTestBase { + protected static final String CONTAINER_NAME = "java-sdk-test-mediastoredata-" + Instant.now().toEpochMilli(); + protected static AwsCredentialsProvider credentialsProvider; + protected static MediaStoreClient mediaStoreClient; + protected static URI uri; + + @BeforeAll + public static void init() { + credentialsProvider = getCredentialsProvider(); + mediaStoreClient = MediaStoreClient.builder() + .credentialsProvider(credentialsProvider) + .httpClient(ApacheHttpClient.builder().build()) + .build(); + uri = URI.create(createContainer().endpoint()); + } + + @AfterEach + public void reset() { + CaptureTransferEncodingHeaderInterceptor.reset(); + } + + private static Container createContainer() { + mediaStoreClient.createContainer(r -> r.containerName(CONTAINER_NAME)); + DescribeContainerResponse response = waitContainerToBeActive(); + return response.container(); + } + + private static DescribeContainerResponse waitContainerToBeActive() { + return Waiter.run(() -> mediaStoreClient.describeContainer(r -> r.containerName(CONTAINER_NAME))) + .until(r -> r.container().status() == ContainerStatus.ACTIVE) + .orFailAfter(Duration.ofMinutes(3)); + } + + protected AsyncRequestBody customAsyncRequestBodyWithoutContentLength() { + return new AsyncRequestBody() { + @Override + public Optional contentLength() { + return Optional.empty(); + } + + @Override + public void subscribe(Subscriber s) { + Flowable.fromPublisher(AsyncRequestBody.fromBytes("Random text".getBytes())) + .subscribe(s); + } + }; + } + + protected static class CaptureTransferEncodingHeaderInterceptor implements ExecutionInterceptor { + public static boolean isChunked; + + public static void reset() { + isChunked = false; + } + + @Override + public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) { + isChunked = context.httpRequest().matchingHeaders("Transfer-Encoding").contains("chunked"); + } + } + + protected static class TestContentProvider implements ContentStreamProvider { + private final byte[] content; + private final List createdStreams = new ArrayList<>(); + private CloseTrackingInputStream currentStream; + + protected TestContentProvider(byte[] content) { + this.content = content.clone(); + } + + @Override + public InputStream newStream() { + if (currentStream != null) { + invokeSafely(currentStream::close); + } + currentStream = new CloseTrackingInputStream(new ByteArrayInputStream(content)); + createdStreams.add(currentStream); + return currentStream; + } + + List getCreatedStreams() { + return Collections.unmodifiableList(createdStreams); + } + } + + protected static class CloseTrackingInputStream extends FilterInputStream { + private boolean isClosed = false; + + CloseTrackingInputStream(InputStream in) { + super(in); + } + + @Override + public void close() throws IOException { + super.close(); + isClosed = true; + } + + boolean isClosed() { + return isClosed; + } + } +} diff --git a/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/RequestCompressionStreamingIntegrationTest.java b/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/RequestCompressionStreamingIntegrationTest.java new file mode 100644 index 000000000000..9530f2319b38 --- /dev/null +++ b/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/RequestCompressionStreamingIntegrationTest.java @@ -0,0 +1,168 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.mediastoredata; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.RequestCompressionConfiguration; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute; +import software.amazon.awssdk.core.internal.compression.Compressor; +import software.amazon.awssdk.core.internal.compression.GzipCompressor; +import software.amazon.awssdk.core.internal.interceptor.trait.RequestCompression; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.services.mediastoredata.model.DeleteObjectRequest; +import software.amazon.awssdk.services.mediastoredata.model.GetObjectRequest; +import software.amazon.awssdk.services.mediastoredata.model.GetObjectResponse; +import software.amazon.awssdk.services.mediastoredata.model.ObjectNotFoundException; +import software.amazon.awssdk.services.mediastoredata.model.PutObjectRequest; +import software.amazon.awssdk.testutils.Waiter; + +/** + * Integration test to verify Request Compression functionalities for streaming operations. Do not delete. + */ +public class RequestCompressionStreamingIntegrationTest extends MediaStoreDataIntegrationTestBase { + private static final String UNCOMPRESSED_BODY = + "RequestCompressionTest-RequestCompressionTest-RequestCompressionTest-RequestCompressionTest-RequestCompressionTest"; + private static String compressedBody; + private static MediaStoreDataClient syncClient; + private static MediaStoreDataAsyncClient asyncClient; + private static PutObjectRequest putObjectRequest; + private static DeleteObjectRequest deleteObjectRequest; + private static GetObjectRequest getObjectRequest; + + @BeforeAll + public static void setup() { + RequestCompressionConfiguration compressionConfiguration = + RequestCompressionConfiguration.builder() + .minimumCompressionThresholdInBytes(1) + .requestCompressionEnabled(true) + .build(); + + RequestCompression requestCompressionTrait = RequestCompression.builder() + .encodings("gzip") + .isStreaming(true) + .build(); + + syncClient = MediaStoreDataClient.builder() + .endpointOverride(uri) + .credentialsProvider(credentialsProvider) + .httpClient(ApacheHttpClient.builder().build()) + .overrideConfiguration(o -> o.addExecutionInterceptor(new CaptureTransferEncodingHeaderInterceptor()) + .addExecutionInterceptor(new CaptureContentEncodingHeaderInterceptor()) + .putExecutionAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION, + requestCompressionTrait) + .requestCompressionConfiguration(compressionConfiguration)) + .build(); + + asyncClient = MediaStoreDataAsyncClient.builder() + .endpointOverride(uri) + .credentialsProvider(getCredentialsProvider()) + .httpClient(NettyNioAsyncHttpClient.create()) + .overrideConfiguration(o -> o.addExecutionInterceptor(new CaptureTransferEncodingHeaderInterceptor()) + .addExecutionInterceptor(new CaptureContentEncodingHeaderInterceptor()) + .putExecutionAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION, + requestCompressionTrait) + .requestCompressionConfiguration(compressionConfiguration)) + .build(); + + putObjectRequest = PutObjectRequest.builder() + .contentType("application/octet-stream") + .path("/foo") + .overrideConfiguration( + o -> o.requestCompressionConfiguration( + c -> c.requestCompressionEnabled(true))) + .build(); + deleteObjectRequest = DeleteObjectRequest.builder().path("/foo").build(); + getObjectRequest = GetObjectRequest.builder().path("/foo").build(); + + Compressor compressor = new GzipCompressor(); + byte[] compressedBodyBytes = compressor.compress(SdkBytes.fromUtf8String(UNCOMPRESSED_BODY)).asByteArray(); + compressedBody = new String(compressedBodyBytes); + } + + @AfterAll + public static void tearDown() { + syncClient.deleteObject(deleteObjectRequest); + Waiter.run(() -> syncClient.describeObject(r -> r.path("/foo"))) + .untilException(ObjectNotFoundException.class) + .orFailAfter(Duration.ofMinutes(1)); + } + + @AfterEach + public void cleanUp() { + CaptureContentEncodingHeaderInterceptor.reset(); + } + + @Test + public void putObject_withRequestCompressionSyncStreaming_compressesPayloadAndSendsCorrectly() throws IOException { + TestContentProvider provider = new TestContentProvider(UNCOMPRESSED_BODY.getBytes(StandardCharsets.UTF_8)); + syncClient.putObject(putObjectRequest, RequestBody.fromContentProvider(provider, "binary/octet-stream")); + + assertThat(CaptureTransferEncodingHeaderInterceptor.isChunked).isTrue(); + assertThat(CaptureContentEncodingHeaderInterceptor.isGzip).isTrue(); + + ResponseInputStream response = syncClient.getObject(getObjectRequest); + byte[] buffer = new byte[UNCOMPRESSED_BODY.getBytes(StandardCharsets.UTF_8).length]; + response.read(buffer); + String retrievedContent = new String(buffer); + assertThat(UNCOMPRESSED_BODY).isEqualTo(retrievedContent); + } + + // TODO : uncomment once async streaming compression is implemented + /*@Test + public void nettyClientPutObject_withoutContentLength_sendsSuccessfully() throws IOException { + AsyncRequestBody asyncRequestBody = customAsyncRequestBodyWithoutContentLength(); + asyncClient.putObject(putObjectRequest, asyncRequestBody).join(); + + assertThat(CaptureTransferEncodingHeaderInterceptor.isChunked).isTrue(); + assertThat(CaptureContentEncodingHeaderInterceptor.isGzip).isTrue(); + + // verify stored content is correct + ResponseInputStream response = syncClient.getObject(getObjectRequest); + byte[] buffer = new byte[UNCOMPRESSED_BODY.getBytes(StandardCharsets.UTF_8).length]; + response.read(buffer); + String retrievedContent = new String(buffer); + assertThat(UNCOMPRESSED_BODY).isEqualTo(retrievedContent); + assertThat(CaptureTransferEncodingHeaderInterceptor.isChunked).isTrue(); + }*/ + + private static class CaptureContentEncodingHeaderInterceptor implements ExecutionInterceptor { + public static boolean isGzip; + + public static void reset() { + isGzip = false; + } + + @Override + public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) { + isGzip = context.httpRequest().matchingHeaders("Content-Encoding").contains("gzip"); + } + } +} diff --git a/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/TransferEncodingChunkedIntegrationTest.java b/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/TransferEncodingChunkedIntegrationTest.java index acab0a8d6723..80fb67dc6fab 100644 --- a/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/TransferEncodingChunkedIntegrationTest.java +++ b/services/mediastoredata/src/it/java/software/amazon/awssdk/services/mediastoredata/TransferEncodingChunkedIntegrationTest.java @@ -16,70 +16,34 @@ package software.amazon.awssdk.services.mediastoredata; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; -import io.reactivex.Flowable; -import java.io.ByteArrayInputStream; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.reactivestreams.Subscriber; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.interceptor.Context; -import software.amazon.awssdk.core.interceptor.ExecutionAttributes; -import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; -import software.amazon.awssdk.services.mediastore.MediaStoreClient; -import software.amazon.awssdk.services.mediastore.model.Container; -import software.amazon.awssdk.services.mediastore.model.ContainerStatus; -import software.amazon.awssdk.services.mediastore.model.DescribeContainerResponse; import software.amazon.awssdk.services.mediastoredata.model.DeleteObjectRequest; import software.amazon.awssdk.services.mediastoredata.model.ObjectNotFoundException; import software.amazon.awssdk.services.mediastoredata.model.PutObjectRequest; import software.amazon.awssdk.testutils.Waiter; -import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase; /** * Integration test to verify Transfer-Encoding:chunked functionalities for all supported HTTP clients. Do not delete. */ -public class TransferEncodingChunkedIntegrationTest extends AwsIntegrationTestBase { - private static final String CONTAINER_NAME = "java-sdk-test-" + Instant.now().toEpochMilli(); - private static MediaStoreClient mediaStoreClient; +public class TransferEncodingChunkedIntegrationTest extends MediaStoreDataIntegrationTestBase { private static MediaStoreDataClient syncClientWithApache; private static MediaStoreDataClient syncClientWithUrlConnection; private static MediaStoreDataAsyncClient asyncClientWithNetty; - private static AwsCredentialsProvider credentialsProvider; - private static Container container; private static PutObjectRequest putObjectRequest; private static DeleteObjectRequest deleteObjectRequest; @BeforeAll public static void setup() { - credentialsProvider = getCredentialsProvider(); - mediaStoreClient = MediaStoreClient.builder() - .credentialsProvider(credentialsProvider) - .httpClient(ApacheHttpClient.builder().build()) - .build(); - container = createContainer(); - URI uri = URI.create(container.endpoint()); - syncClientWithApache = MediaStoreDataClient.builder() .endpointOverride(uri) .credentialsProvider(credentialsProvider) @@ -117,7 +81,7 @@ public static void tearDown() { Waiter.run(() -> syncClientWithApache.describeObject(r -> r.path("/foo"))) .untilException(ObjectNotFoundException.class) .orFailAfter(Duration.ofMinutes(1)); - CaptureTransferEncodingHeaderInterceptor.reset(); + mediaStoreClient.deleteContainer(r -> r.containerName(CONTAINER_NAME)); } @Test @@ -139,86 +103,4 @@ public void nettyClientPutObject_withoutContentLength_sendsSuccessfully() { asyncClientWithNetty.putObject(putObjectRequest, customAsyncRequestBodyWithoutContentLength()).join(); assertThat(CaptureTransferEncodingHeaderInterceptor.isChunked).isTrue(); } - - private static Container createContainer() { - mediaStoreClient.createContainer(r -> r.containerName(CONTAINER_NAME)); - DescribeContainerResponse response = waitContainerToBeActive(); - return response.container(); - } - - private static DescribeContainerResponse waitContainerToBeActive() { - return Waiter.run(() -> mediaStoreClient.describeContainer(r -> r.containerName(CONTAINER_NAME))) - .until(r -> ContainerStatus.ACTIVE.equals(r.container().status())) - .orFailAfter(Duration.ofMinutes(3)); - } - - private static class CaptureTransferEncodingHeaderInterceptor implements ExecutionInterceptor { - private static boolean isChunked; - - public static void reset() { - isChunked = false; - } - - @Override - public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) { - isChunked = context.httpRequest().matchingHeaders("Transfer-Encoding").contains("chunked"); - } - } - - private AsyncRequestBody customAsyncRequestBodyWithoutContentLength() { - return new AsyncRequestBody() { - @Override - public Optional contentLength() { - return Optional.empty(); - } - - @Override - public void subscribe(Subscriber s) { - Flowable.fromPublisher(AsyncRequestBody.fromBytes("Random text".getBytes())) - .subscribe(s); - } - }; - } - - private static class TestContentProvider implements ContentStreamProvider { - private final byte[] content; - private final List createdStreams = new ArrayList<>(); - private CloseTrackingInputStream currentStream; - - private TestContentProvider(byte[] content) { - this.content = content; - } - - @Override - public InputStream newStream() { - if (currentStream != null) { - invokeSafely(currentStream::close); - } - currentStream = new CloseTrackingInputStream(new ByteArrayInputStream(content)); - createdStreams.add(currentStream); - return currentStream; - } - - List getCreatedStreams() { - return createdStreams; - } - } - - private static class CloseTrackingInputStream extends FilterInputStream { - private boolean isClosed = false; - - CloseTrackingInputStream(InputStream in) { - super(in); - } - - @Override - public void close() throws IOException { - super.close(); - isClosed = true; - } - - boolean isClosed() { - return isClosed; - } - } } diff --git a/services/mediastoredata/src/it/resources/log4j2.properties b/services/mediastoredata/src/it/resources/log4j2.properties new file mode 100644 index 000000000000..ea24f17148e6 --- /dev/null +++ b/services/mediastoredata/src/it/resources/log4j2.properties @@ -0,0 +1,38 @@ +# +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://aws.amazon.com/apache2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. +# + +status = warn + +appender.console.type = Console +appender.console.name = ConsoleAppender +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n%throwable + +rootLogger.level = info +rootLogger.appenderRef.stdout.ref = ConsoleAppender + +# Uncomment below to enable more specific logging +# +#logger.sdk.name = software.amazon.awssdk +#logger.sdk.level = debug +# +#logger.request.name = software.amazon.awssdk.request +#logger.request.level = debug +# +#logger.apache.name = org.apache.http.wire +#logger.apache.level = debug +# +#logger.netty.name = io.netty.handler.logging +#logger.netty.level = debug \ No newline at end of file diff --git a/test/codegen-generated-classes-test/src/main/resources/codegen-resources/customresponsemetadata/service-2.json b/test/codegen-generated-classes-test/src/main/resources/codegen-resources/customresponsemetadata/service-2.json index b1f994fd1d44..8cdb71614e38 100644 --- a/test/codegen-generated-classes-test/src/main/resources/codegen-resources/customresponsemetadata/service-2.json +++ b/test/codegen-generated-classes-test/src/main/resources/codegen-resources/customresponsemetadata/service-2.json @@ -289,6 +289,19 @@ "encodings": ["gzip"] } }, + "PutOperationWithStreamingRequestCompression":{ + "name":"PutOperationWithStreamingRequestCompression", + "http":{ + "method":"PUT", + "requestUri":"/" + }, + "input":{"shape":"RequestCompressionStructureWithStreaming"}, + "output":{"shape":"RequestCompressionStructureWithStreaming"}, + "requestCompression": { + "encodings": ["gzip"] + }, + "authtype":"v4-unsigned-body" + }, "GetOperationWithChecksum":{ "name":"GetOperationWithChecksum", "http":{ @@ -1030,6 +1043,17 @@ } }, "payload":"Body" + }, + "RequestCompressionStructureWithStreaming":{ + "type":"structure", + "members":{ + "Body":{ + "shape":"Body", + "documentation":"

Object data.

", + "streaming":true + } + }, + "payload":"Body" } } } diff --git a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java index e66f5f47bd1e..29664c5f53f3 100644 --- a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java +++ b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java @@ -16,9 +16,16 @@ package software.amazon.awssdk.services; import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; +import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; +import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.AfterEach; @@ -26,6 +33,9 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.internal.compression.Compressor; import software.amazon.awssdk.core.internal.compression.GzipCompressor; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.HttpExecuteResponse; import software.amazon.awssdk.http.SdkHttpFullRequest; import software.amazon.awssdk.http.SdkHttpResponse; @@ -33,6 +43,7 @@ import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient; import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonClient; import software.amazon.awssdk.services.protocolrestjson.model.PutOperationWithRequestCompressionRequest; +import software.amazon.awssdk.services.protocolrestjson.model.PutOperationWithStreamingRequestCompressionRequest; import software.amazon.awssdk.testutils.service.http.MockAsyncHttpClient; import software.amazon.awssdk.testutils.service.http.MockSyncHttpClient; @@ -46,6 +57,7 @@ public class RequestCompressionTest { private ProtocolRestJsonClient syncClient; private ProtocolRestJsonAsyncClient asyncClient; private Compressor compressor; + private RequestBody requestBody; @BeforeEach public void setUp() { @@ -65,6 +77,8 @@ public void setUp() { byte[] compressedBodyBytes = compressor.compress(SdkBytes.fromUtf8String(UNCOMPRESSED_BODY)).asByteArray(); compressedLen = compressedBodyBytes.length; compressedBody = new String(compressedBodyBytes); + TestContentProvider provider = new TestContentProvider(UNCOMPRESSED_BODY.getBytes(StandardCharsets.UTF_8)); + requestBody = RequestBody.fromContentProvider(provider, "binary/octet-stream"); } @AfterEach @@ -118,6 +132,24 @@ public void async_nonStreaming_compression_compressesCorrectly() { assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); } + @Test + public void sync_streaming_compression_compressesCorrectly() { + mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500)); + + PutOperationWithStreamingRequestCompressionRequest request = + PutOperationWithStreamingRequestCompressionRequest.builder().build(); + syncClient.putOperationWithStreamingRequestCompression(request, requestBody, ResponseTransformer.toBytes()); + + SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest(); + InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream(); + String loggedBody = new String(SdkBytes.fromInputStream(loggedStream).asByteArray()); + + assertThat(loggedBody).isEqualTo(compressedBody); + assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); + assertThat(loggedRequest.matchingHeaders("Content-Length")).isEmpty(); + assertThat(loggedRequest.firstMatchingHeader("Transfer-Encoding").get()).isEqualTo("chunked"); + } + @Test public void sync_nonStreaming_compression_withRetry_compressesCorrectly() { mockHttpClient.stubNextResponse(mockErrorResponse(), Duration.ofMillis(500)); @@ -165,6 +197,25 @@ public void async_nonStreaming_compression_withRetry_compressesCorrectly() { assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); } + @Test + public void sync_streaming_compression_withRetry_compressesCorrectly() { + mockHttpClient.stubNextResponse(mockErrorResponse(), Duration.ofMillis(500)); + mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500)); + + PutOperationWithStreamingRequestCompressionRequest request = + PutOperationWithStreamingRequestCompressionRequest.builder().build(); + syncClient.putOperationWithStreamingRequestCompression(request, requestBody, ResponseTransformer.toBytes()); + + SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest(); + InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream(); + String loggedBody = new String(SdkBytes.fromInputStream(loggedStream).asByteArray()); + + assertThat(loggedBody).isEqualTo(compressedBody); + assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); + assertThat(loggedRequest.matchingHeaders("Content-Length")).isEmpty(); + assertThat(loggedRequest.firstMatchingHeader("Transfer-Encoding").get()).isEqualTo("chunked"); + } + private HttpExecuteResponse mockResponse() { return HttpExecuteResponse.builder() .response(SdkHttpResponse.builder().statusCode(200).build()) @@ -176,4 +227,46 @@ private HttpExecuteResponse mockErrorResponse() { .response(SdkHttpResponse.builder().statusCode(500).build()) .build(); } + + private static final class TestContentProvider implements ContentStreamProvider { + private final byte[] content; + private final List createdStreams = new ArrayList<>(); + private CloseTrackingInputStream currentStream; + + private TestContentProvider(byte[] content) { + this.content = content; + } + + @Override + public InputStream newStream() { + if (currentStream != null) { + invokeSafely(currentStream::close); + } + currentStream = new CloseTrackingInputStream(new ByteArrayInputStream(content)); + createdStreams.add(currentStream); + return currentStream; + } + + List getCreatedStreams() { + return createdStreams; + } + } + + private static class CloseTrackingInputStream extends FilterInputStream { + private boolean isClosed = false; + + CloseTrackingInputStream(InputStream in) { + super(in); + } + + @Override + public void close() throws IOException { + super.close(); + isClosed = true; + } + + boolean isClosed() { + return isClosed; + } + } }