Skip to content
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
157fc10
Rename DecodedStreamBuffer to UnderlyingStreamBuffer
davidh44 Jun 26, 2023
29380ab
Update Compressor
davidh44 Jun 26, 2023
228a035
Sync streaming compression
davidh44 Jun 26, 2023
280bd38
Fix Checkstyle issues
davidh44 Jun 26, 2023
e2d6951
Refactor to common class AwsChunkedInputStream
davidh44 Jun 28, 2023
be06647
Refactor Compressor
davidh44 Jun 28, 2023
04b5e7c
Merge from feature/master/request-compression
davidh44 Jun 28, 2023
5b5641c
Refactor CompressionType
davidh44 Jun 28, 2023
7e07424
sync compression
davidh44 Jun 29, 2023
ba7e602
Close GZIPOutputStream
davidh44 Jun 29, 2023
5b37111
Add compress stage to async http client
davidh44 Jun 29, 2023
cd241a6
Add compressed PutMetricData integ test
davidh44 Jun 29, 2023
0f4fd59
Merge branch 'feature/master/request-compression' into hdavidh/sync-s…
davidh44 Jun 29, 2023
5268c0b
Refactoring
davidh44 Jul 5, 2023
d8aa9a5
Add tests
davidh44 Jul 5, 2023
37d962e
Add equals and hashCode
davidh44 Jul 7, 2023
aab6f62
Add TODO
davidh44 Jul 7, 2023
4b52a88
Add tests with retry
davidh44 Jul 7, 2023
d9e28d9
Update content length retrieval
davidh44 Jul 11, 2023
86ff75a
CompressionType test
davidh44 Jul 11, 2023
24d69db
RequestCompressionConfiguration test
davidh44 Jul 11, 2023
9dadea3
Fix Spotbugs error
davidh44 Jul 11, 2023
95e0383
Rename CompressionType to CompressorType
davidh44 Jul 12, 2023
4d1e924
Fix import ordering
davidh44 Jul 12, 2023
ebc0c35
Remove chunk headers and trailers
davidh44 Jul 14, 2023
1d9bb6f
Update tests
davidh44 Jul 14, 2023
36dd8ef
Refactoring
davidh44 Jul 15, 2023
518e4c0
Only compress streaming operations that are chunked
davidh44 Jul 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
@SdkInternalApi
public final class AwsSignedChunkedEncodingInputStream extends AwsChunkedEncodingInputStream {

private static final String CRLF = "\r\n";
private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature=";
private static final String CHECKSUM_SIGNATURE_HEADER = "x-amz-trailer-signature:";
private String previousChunkSignature;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,60 +15,80 @@

package software.amazon.awssdk.core.compression;

import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.internal.EnumUtils;
import software.amazon.awssdk.core.internal.compression.GzipCompressor;
import software.amazon.awssdk.utils.Validate;

/**
* The supported compression algorithms for operations with the requestCompression trait. Each supported algorithm will have an
* {@link Compressor} implementation.
*/
@SdkInternalApi
public enum CompressionType {
public final class CompressionType {

GZIP("gzip"),
public static final CompressionType GZIP = CompressionType.of("gzip");

UNKNOWN_TO_SDK_VERSION(null);
private static Map<String, Compressor> compressorMap = new HashMap<String, Compressor>() {{
put("gzip", new GzipCompressor());
}};

private static final Map<String, CompressionType> VALUE_MAP = EnumUtils.uniqueIndex(
CompressionType.class, CompressionType::toString);
private String id;

private final String value;
private CompressionType(String id) {
this.id = id;
}

CompressionType(String value) {
this.value = value;
/**
* Creates a new {@link CompressionType} of the given value.
*/
public static CompressionType of(String value) {
Validate.paramNotBlank(value, "compressionType");
return CompressionTypeCache.put(value);
}

@Override
public String toString() {
return String.valueOf(value);
/**
* Returns the {@link Set} of {@link String}s of compression types supported by the SDK.
*/
public static Set<String> compressionTypes() {
return compressorMap.keySet();
}

/**
* Use this in place of valueOf to convert the raw string into the enum value.
*
* @param value
* real value
* @return SupportedEncodings corresponding to the value
* Whether or not the compression type is supported by the SDK.
*/
public static CompressionType fromValue(String value) {
if (value == null) {
return null;
}
return VALUE_MAP.getOrDefault(value, UNKNOWN_TO_SDK_VERSION);
public static boolean isSupported(String compressionType) {
return compressionTypes().contains(compressionType);
}

/**
* Use this in place of {@link #values()} to return a {@link Set} of all values known to the SDK. This will return
* all known enum values except {@link #UNKNOWN_TO_SDK_VERSION}.
*
* @return a {@link Set} of known {@link CompressionType}s
* Maps the {@link CompressionType} to its corresponding {@link Compressor}.
*/
public static Set<CompressionType> knownValues() {
Set<CompressionType> knownValues = EnumSet.allOf(CompressionType.class);
knownValues.remove(UNKNOWN_TO_SDK_VERSION);
return knownValues;
public Compressor newCompressor() {
Compressor compressor = compressorMap.getOrDefault(this.id, null);
if (compressor == null) {
throw new UnsupportedOperationException("The compression type " + id + " does not have an implementation of "
+ "Compressor");
}
return compressor;
}

@Override
public String toString() {
Comment thread
davidh44 marked this conversation as resolved.
return id;
}

private static class CompressionTypeCache {
private static final ConcurrentHashMap<String, CompressionType> VALUES = new ConcurrentHashMap<>();

private CompressionTypeCache() {
}

private static CompressionType put(String value) {
return VALUES.computeIfAbsent(value, v -> new CompressionType(value));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,58 @@

import java.io.InputStream;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.compression.GzipCompressor;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage;

/**
* Interface for compressors used by {@link CompressRequestStage} to compress requests.
* TODO: this will be refactored in the other PR
*/
@SdkPublicApi
@SdkInternalApi
public interface Compressor {

/**
* The compression algorithm type.
*
* @return The {@link String} compression algorithm type.
*/
Comment thread
davidh44 marked this conversation as resolved.
String compressorType();
Comment thread
davidh44 marked this conversation as resolved.

/**
* Compress an {@link InputStream}.
* Compress a {@link SdkBytes} payload.
*
* @param content
* @return The compressed {@link SdkBytes}.
*/
InputStream compress(InputStream inputStream);
SdkBytes compress(SdkBytes content);

/**
* Compress an async stream.
* Compress a byte[] payload.
*
* @param content
* @return The compressed byte array.
*/
Publisher<ByteBuffer> compressAsyncStream(Publisher<ByteBuffer> publisher);
default byte[] compress(byte[] content) {
return compress(SdkBytes.fromByteArray(content)).asByteArray();
}

/**
* Compress an {@link InputStream} payload.
*
* @param content
* @return The compressed {@link InputStream}.
*/
default InputStream compress(InputStream content) {
return compress(SdkBytes.fromInputStream(content)).asInputStream();
}

/**
* Maps the {@link CompressionType} to its corresponding {@link Compressor}.
* TODO: Update mappings here when additional compressors are supported in the future
* Compress an {@link ByteBuffer} payload.
*
* @param content
* @return The compressed {@link ByteBuffer}.
*/
static Compressor forCompressorType(CompressionType compressionType) {
switch (compressionType) {
case GZIP:
return new GzipCompressor();
default:
throw new IllegalArgumentException("The compresssion type " + compressionType + "does not have an implemenation"
+ " of Compressor.");
}
default ByteBuffer compress(ByteBuffer content) {
return compress(SdkBytes.fromByteBuffer(content)).asByteBuffer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,42 @@

package software.amazon.awssdk.core.internal.compression;

import java.io.ByteArrayInputStream;
import static software.amazon.awssdk.utils.IoUtils.closeQuietly;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.compression.Compressor;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.utils.IoUtils;

@SdkInternalApi
public final class GzipCompressor implements Compressor {

private static final String COMPRESSOR_TYPE = "gzip";
private static final Logger log = LoggerFactory.getLogger(GzipCompressor.class);

@Override
public String compressorType() {
return COMPRESSOR_TYPE;
}

@Override
public InputStream compress(InputStream inputStream) {
public SdkBytes compress(SdkBytes content) {
GZIPOutputStream gzipOutputStream = null;
try {
byte[] content = IoUtils.toByteArray(inputStream);
ByteArrayOutputStream compressedOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedOutputStream);
gzipOutputStream.write(content);
gzipOutputStream = new GZIPOutputStream(compressedOutputStream);
gzipOutputStream.write(content.asByteArray());
gzipOutputStream.close();
Comment thread
joviegas marked this conversation as resolved.

return new ByteArrayInputStream(compressedOutputStream.toByteArray());
return SdkBytes.fromByteArray(compressedOutputStream.toByteArray());
} catch (IOException e) {
throw SdkClientException.create(e.getMessage(), e);
throw new UncheckedIOException(e);
} finally {
closeQuietly(gzipOutputStream, log);
}
}

@Override
public Publisher<ByteBuffer> compressAsyncStream(Publisher<ByteBuffer> publisher) {
//TODO
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncSigningStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestImmutableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestMutableStage;
Expand Down Expand Up @@ -169,6 +170,7 @@ public <OutputT> CompletableFuture<OutputT> execute(
.then(ApplyUserAgentStage::new)
.then(MergeCustomHeadersStage::new)
.then(MergeCustomQueryParamsStage::new)
.then(CompressRequestStage::new)
Comment thread
davidh44 marked this conversation as resolved.
.then(MakeRequestImmutableStage::new)
.then(RequestPipelineBuilder
.first(AsyncSigningStage::new)
Expand Down
Loading