Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,9 @@ public static CodeBlock create(OperationModel operationModel, IntermediateModel
return CodeBlock.of("");
}

// TODO : remove once request compression for streaming operations is supported
if (operationModel.isStreaming()) {
throw new IllegalStateException("Request compression for streaming operations is not yet supported in the AWS SDK "
+ "for Java.");
}

// TODO : remove once S3 checksum interceptors are moved to occur after CompressRequestStage
// TODO : remove once:
// 1) S3 checksum interceptors are moved to occur after CompressRequestStage
// 2) Transfer-Encoding:chunked is supported in S3
if (model.getMetadata().getServiceName().equals("S3")) {
throw new IllegalStateException("Request compression for S3 is not yet supported in the AWS SDK for Java.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
@SdkInternalApi
public final class AwsSignedChunkedEncodingInputStream extends AwsChunkedEncodingInputStream {

private static final String CRLF = "\r\n";
private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature=";
private static final String CHECKSUM_SIGNATURE_HEADER = "x-amz-trailer-signature:";
private String previousChunkSignature;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,41 @@
import java.util.concurrent.atomic.AtomicLong;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
* Class that will buffer incoming BufferBytes of totalBytes length to chunks of bufferSize*
*/
@SdkInternalApi
public final class ChunkBuffer {
private final AtomicLong remainingBytes;
private AtomicLong remainingBytes;
private final ByteBuffer currentBuffer;
private final int bufferSize;
private int bufferSize;

private ChunkBuffer(Long totalBytes, Integer bufferSize) {
Validate.notNull(totalBytes, "The totalBytes must not be null");

int chunkSize = bufferSize != null ? bufferSize : DEFAULT_ASYNC_CHUNK_SIZE;
this.bufferSize = chunkSize;
this.currentBuffer = ByteBuffer.allocate(chunkSize);
this.remainingBytes = new AtomicLong(totalBytes);

if (totalBytes != null) {
this.remainingBytes = new AtomicLong(totalBytes);
}
}

public static Builder builder() {
return new DefaultBuilder();
}

public synchronized Iterable<ByteBuffer> bufferAndCreateChunks(ByteBuffer buffer) {
if (remainingBytes == null) {
return bufferAndCreateChunksWithUnknownLength(buffer);
} else {
return bufferAndCreateChunksWithKnownLength(buffer);
}
}

// currentBuffer and bufferedList can get over written if concurrent Threads calls this method at the same time.
public synchronized Iterable<ByteBuffer> bufferAndCreateChunks(ByteBuffer buffer) {
private synchronized Iterable<ByteBuffer> bufferAndCreateChunksWithKnownLength(ByteBuffer buffer) {
int startPosition = 0;
List<ByteBuffer> bufferedList = new ArrayList<>();
int currentBytesRead = buffer.remaining();
Expand Down Expand Up @@ -97,6 +104,26 @@ public synchronized Iterable<ByteBuffer> bufferAndCreateChunks(ByteBuffer buffer
return bufferedList;
}

private synchronized Iterable<ByteBuffer> bufferAndCreateChunksWithUnknownLength(ByteBuffer buffer) {
List<ByteBuffer> bufferedList = new ArrayList<>();
while (buffer.hasRemaining()) {
int bytesToCopy = Math.min(buffer.remaining(), currentBuffer.remaining());
byte[] bytes = new byte[bytesToCopy];
Comment thread
zoewangg marked this conversation as resolved.
Outdated
buffer.get(bytes);
currentBuffer.put(bytes);

if (!currentBuffer.hasRemaining() || !buffer.hasRemaining()) {
currentBuffer.flip();
ByteBuffer bufferToSend = ByteBuffer.allocate(currentBuffer.limit());
Comment thread
davidh44 marked this conversation as resolved.
Outdated
bufferToSend.put(currentBuffer);
bufferToSend.flip();
bufferedList.add(bufferToSend);
currentBuffer.clear();
}
}
return bufferedList;
}

public interface Builder extends SdkBuilder<Builder, ChunkBuffer> {

Builder bufferSize(int bufferSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import java.nio.ByteBuffer;
import java.util.Optional;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.compression.Compressor;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
* Wrapper class to wrap an AsyncRequestBody.
* This will chunk and compress the payload with the provided {@link Compressor}.
*/
@SdkInternalApi
public class CompressionAsyncRequestBody implements AsyncRequestBody {
Comment thread
zoewangg marked this conversation as resolved.

private static final int COMPRESSION_CHUNK_SIZE = 128 * 1024;
Comment thread
cenedhryn marked this conversation as resolved.
Outdated
Comment thread
davidh44 marked this conversation as resolved.
Outdated
private final AsyncRequestBody wrapped;
private final Compressor compressor;

private CompressionAsyncRequestBody(DefaultBuilder builder) {
Validate.notNull(builder.asyncRequestBody, "wrapped AsyncRequestBody cannot be null");
Validate.notNull(builder.compressor, "compressor cannot be null");
this.wrapped = builder.asyncRequestBody;
Comment thread
cenedhryn marked this conversation as resolved.
Outdated
this.compressor = builder.compressor;
}

/**
* @return Builder instance to construct a {@link CompressionAsyncRequestBody}.
*/
public static Builder builder() {
return new DefaultBuilder();
}

public interface Builder extends SdkBuilder<CompressionAsyncRequestBody.Builder, CompressionAsyncRequestBody> {
Comment thread
davidh44 marked this conversation as resolved.

/**
* Sets the AsyncRequestBody that will be wrapped.
* @param asyncRequestBody
* @return This builder for method chaining.
*/
Builder asyncRequestBody(AsyncRequestBody asyncRequestBody);

/**
* Sets the compressor to compress the request.
* @param compressor
* @return This builder for method chaining.
*/
Builder compressor(Compressor compressor);
}

private static final class DefaultBuilder implements Builder {

private AsyncRequestBody asyncRequestBody;
private Compressor compressor;

@Override
public CompressionAsyncRequestBody build() {
return new CompressionAsyncRequestBody(this);
}

@Override
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
this.asyncRequestBody = asyncRequestBody;
return this;
}

@Override
public Builder compressor(Compressor compressor) {
this.compressor = compressor;
return this;
}
}

@Override
public Optional<Long> contentLength() {
return Optional.empty();
Comment thread
davidh44 marked this conversation as resolved.
Outdated
}

@Override
public String contentType() {
return wrapped.contentType();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
Validate.notNull(s, "Subscription MUST NOT be null.");

ChunkBuffer chunkBuffer = ChunkBuffer.builder()
.bufferSize(COMPRESSION_CHUNK_SIZE)
.build();

wrapped.flatMapIterable(chunkBuffer::bufferAndCreateChunks)
.subscribe(new CompressionSubscriber(s, compressor));
}

private static final class CompressionSubscriber implements Subscriber<ByteBuffer> {

private final Subscriber<? super ByteBuffer> subscriber;
private final Compressor compressor;

CompressionSubscriber(Subscriber<? super ByteBuffer> subscriber, Compressor compressor) {
this.subscriber = subscriber;
this.compressor = compressor;
}

@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}

@Override
public void onNext(ByteBuffer byteBuffer) {
ByteBuffer compressedBuffer = compressor.compress(byteBuffer);
subscriber.onNext(compressedBuffer);
}

@Override
public void onError(Throwable t) {
subscriber.onError(t);
}

@Override
public void onComplete() {
subscriber.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
import software.amazon.awssdk.core.internal.async.CompressionAsyncRequestBody;
import software.amazon.awssdk.core.internal.compression.Compressor;
import software.amazon.awssdk.core.internal.compression.CompressorType;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.MutableRequestToRequestPipeline;
import software.amazon.awssdk.core.internal.sync.CompressionContentStreamProvider;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.utils.IoUtils;
Expand Down Expand Up @@ -62,15 +64,27 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ

Compressor compressor = resolveCompressorType(context.executionAttributes());

// non-streaming
if (!isStreaming(context)) {
compressEntirePayload(input, compressor);
updateContentEncodingHeader(input, compressor);
updateContentLengthHeader(input);
return input;
}

if (!isTransferEncodingChunked(input)) {
return input;
}

// TODO : streaming - sync & async
if (context.requestProvider() == null) {
input.contentStreamProvider(new CompressionContentStreamProvider(input.contentStreamProvider(), compressor));
} else {
context.requestProvider(CompressionAsyncRequestBody.builder()
.asyncRequestBody(context.requestProvider())
.compressor(compressor)
.build());
}

updateContentEncodingHeader(input, compressor);
return input;
}

Expand Down Expand Up @@ -123,6 +137,12 @@ private void updateContentLengthHeader(SdkHttpFullRequest.Builder input) {
}
}

private boolean isTransferEncodingChunked(SdkHttpFullRequest.Builder input) {
return input.firstMatchingHeader("Transfer-Encoding")
.map(headerValue -> headerValue.equals("chunked"))
.orElse(false);
}

private Compressor resolveCompressorType(ExecutionAttributes executionAttributes) {
List<String> encodings =
executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION).getEncodings();
Expand Down
Loading