Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.atomic.AtomicLong;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
Expand All @@ -33,37 +32,43 @@
@SdkInternalApi
public final class ChunkBuffer {
private static final Logger log = Logger.loggerFor(ChunkBuffer.class);
private final AtomicLong transferredBytes;
private AtomicLong transferredBytes;
Comment thread
davidh44 marked this conversation as resolved.
Outdated
private final ByteBuffer currentBuffer;
private final int chunkSize;
private final long totalBytes;
private Long totalBytes;
Comment thread
davidh44 marked this conversation as resolved.
Outdated

private ChunkBuffer(Long totalBytes, Integer bufferSize) {
Validate.notNull(totalBytes, "The totalBytes must not be null");

int chunkSize = bufferSize != null ? bufferSize : DEFAULT_ASYNC_CHUNK_SIZE;
this.chunkSize = chunkSize;
this.currentBuffer = ByteBuffer.allocate(chunkSize);
this.totalBytes = totalBytes;
this.transferredBytes = new AtomicLong(0);
if (totalBytes != null) {
this.totalBytes = totalBytes;
Comment thread
davidh44 marked this conversation as resolved.
Outdated
this.transferredBytes = new AtomicLong(0);
}
}

public static Builder builder() {
return new DefaultBuilder();
}


/**
* Split the input {@link ByteBuffer} into multiple smaller {@link ByteBuffer}s, each of which contains {@link #chunkSize}
* worth of bytes. If the last chunk of the input ByteBuffer contains less than {@link #chunkSize} data, the last chunk will
* be buffered.
*/
public synchronized Iterable<ByteBuffer> split(ByteBuffer inputByteBuffer) {

if (!inputByteBuffer.hasRemaining()) {
return Collections.singletonList(inputByteBuffer);
}

if (totalBytes != null) {
return splitWithKnownLength(inputByteBuffer);
} else {
return splitWithUnknownLength(inputByteBuffer);
}
}

private synchronized Iterable<ByteBuffer> splitWithKnownLength(ByteBuffer inputByteBuffer) {
List<ByteBuffer> byteBuffers = new ArrayList<>();

// If current buffer is not empty, fill the buffer first.
Expand All @@ -88,6 +93,33 @@ public synchronized Iterable<ByteBuffer> split(ByteBuffer inputByteBuffer) {
return byteBuffers;
}

private synchronized Iterable<ByteBuffer> splitWithUnknownLength(ByteBuffer inputByteBuffer) {
boolean isLastChunk = inputByteBuffer.remaining() != chunkSize;
Comment thread
davidh44 marked this conversation as resolved.
Outdated
List<ByteBuffer> bufferedList = new ArrayList<>();

while (inputByteBuffer.hasRemaining()) {
int bytesToCopy = Math.min(inputByteBuffer.remaining(), currentBuffer.remaining());
byte[] bytes = new byte[bytesToCopy];
Comment thread
zoewangg marked this conversation as resolved.
Outdated
inputByteBuffer.get(bytes);
currentBuffer.put(bytes);

if (!currentBuffer.hasRemaining()) {
currentBuffer.flip();
ByteBuffer bufferToSend = ByteBuffer.allocate(currentBuffer.limit());
Comment thread
davidh44 marked this conversation as resolved.
Outdated
bufferToSend.put(currentBuffer);
bufferToSend.flip();
bufferedList.add(bufferToSend);
currentBuffer.clear();
}
}

if (isLastChunk && currentBuffer.position() != 0) {
bufferedList.add((ByteBuffer) currentBuffer.flip());
}

return bufferedList;
}

private boolean isCurrentBufferFull() {
return currentBuffer.position() == chunkSize;
}
Expand Down Expand Up @@ -151,8 +183,6 @@ public interface Builder extends SdkBuilder<Builder, ChunkBuffer> {
Builder bufferSize(int bufferSize);

Builder totalBytes(long totalBytes);


}

private static final class DefaultBuilder implements Builder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import java.nio.ByteBuffer;
import java.util.Optional;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.compression.Compressor;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
* Wrapper class to wrap an AsyncRequestBody.
* This will chunk and compress the payload with the provided {@link Compressor}.
*/
@SdkInternalApi
public class CompressionAsyncRequestBody implements AsyncRequestBody {
Comment thread
zoewangg marked this conversation as resolved.

private static final int COMPRESSION_CHUNK_SIZE = 128 * 1024;
Comment thread
cenedhryn marked this conversation as resolved.
Outdated
Comment thread
davidh44 marked this conversation as resolved.
Outdated
private final AsyncRequestBody wrapped;
private final Compressor compressor;
private final int chunkSize;

private CompressionAsyncRequestBody(DefaultBuilder builder) {
this.wrapped = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.compressor = Validate.paramNotNull(builder.compressor, "compressor");
this.chunkSize = builder.chunkSize != null ? builder.chunkSize : COMPRESSION_CHUNK_SIZE;
}

/**
* @return Builder instance to construct a {@link CompressionAsyncRequestBody}.
*/
public static Builder builder() {
return new DefaultBuilder();
}

public interface Builder extends SdkBuilder<CompressionAsyncRequestBody.Builder, CompressionAsyncRequestBody> {
Comment thread
davidh44 marked this conversation as resolved.

/**
* Sets the AsyncRequestBody that will be wrapped.
* @param asyncRequestBody
* @return This builder for method chaining.
*/
Builder asyncRequestBody(AsyncRequestBody asyncRequestBody);

/**
* Sets the compressor to compress the request.
* @param compressor
* @return This builder for method chaining.
*/
Builder compressor(Compressor compressor);

/**
* Sets the chunk size. Default size is 128 * 1024.
* @param chunkSize
* @return This builder for method chaining.
*/
Builder chunkSize(Integer chunkSize);
}

private static final class DefaultBuilder implements Builder {

private AsyncRequestBody asyncRequestBody;
private Compressor compressor;
private Integer chunkSize;

@Override
public CompressionAsyncRequestBody build() {
return new CompressionAsyncRequestBody(this);
}

@Override
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
this.asyncRequestBody = asyncRequestBody;
return this;
}

@Override
public Builder compressor(Compressor compressor) {
this.compressor = compressor;
return this;
}

@Override
public Builder chunkSize(Integer chunkSize) {
this.chunkSize = chunkSize;
return this;
}
}

@Override
public Optional<Long> contentLength() {
return wrapped.contentLength();
}

@Override
public String contentType() {
return wrapped.contentType();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
Validate.notNull(s, "Subscription MUST NOT be null.");

ChunkBuffer chunkBuffer = ChunkBuffer.builder()
.bufferSize(chunkSize)
.build();

wrapped.flatMapIterable(chunkBuffer::split)
.map(compressor::compress).subscribe(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
import software.amazon.awssdk.core.internal.async.CompressionAsyncRequestBody;
import software.amazon.awssdk.core.internal.compression.Compressor;
import software.amazon.awssdk.core.internal.compression.CompressorType;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
Expand Down Expand Up @@ -63,7 +64,6 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ

Compressor compressor = resolveCompressorType(context.executionAttributes());

// non-streaming
if (!isStreaming(context)) {
compressEntirePayload(input, compressor);
updateContentEncodingHeader(input, compressor);
Expand All @@ -76,12 +76,14 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ
}

if (context.requestProvider() == null) {
// sync streaming
input.contentStreamProvider(new CompressionContentStreamProvider(input.contentStreamProvider(), compressor));
} else {
context.requestProvider(CompressionAsyncRequestBody.builder()
.asyncRequestBody(context.requestProvider())
.compressor(compressor)
.build());
}

// TODO : streaming - async

updateContentEncodingHeader(input, compressor);
return input;
}
Expand Down
Loading