Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -30,4 +31,9 @@ public Flux<ByteBuffer> getBody() {
public Mono<byte[]> getBodyAsByteArray() {
return Mono.defer(() -> Mono.just(body));
}

@Override
public HttpResponse buffer() {
return this; // This response is already buffered.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
int statusCode = innerResponse.statusCode();
HttpHeaders headers = fromJdkHttpHeaders(innerResponse.headers());

return FluxUtil.collectBytesInByteBufferStream(JdkFlowAdapter
return FluxUtil.collectBytesFromNetworkResponse(JdkFlowAdapter
.flowPublisherToFlux(innerResponse.body())
.flatMapSequential(Flux::fromIterable))
.flatMapSequential(Flux::fromIterable), headers)
.map(bytes -> new BufferedJdkHttpResponse(request, statusCode, headers, bytes));
} else {
return Mono.just(new JdkHttpResponse(request, innerResponse));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public Flux<ByteBuffer> getBody() {

@Override
public Mono<byte[]> getBodyAsByteArray() {
return FluxUtil.collectBytesInByteBufferStream(getBody());
return FluxUtil.collectBytesFromNetworkResponse(getBody(), getHeaders());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.implementation.NettyAsyncHttpResponse;
import com.azure.core.http.netty.implementation.NettyAsyncHttpBufferedResponse;
import com.azure.core.http.netty.implementation.NettyAsyncHttpResponse;
import com.azure.core.http.netty.implementation.NettyToAzureCoreHttpHeadersWrapper;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -144,7 +145,8 @@ private static BiFunction<HttpClientResponse, Connection, Publisher<HttpResponse
Flux<ByteBuffer> body = reactorNettyConnection.inbound().receive().asByteBuffer()
.doFinally(ignored -> closeConnection(reactorNettyConnection));

return FluxUtil.collectBytesInByteBufferStream(body)
return FluxUtil.collectBytesFromNetworkResponse(body,
new NettyToAzureCoreHttpHeadersWrapper(reactorNettyResponse.responseHeaders()))
.map(bytes -> new NettyAsyncHttpBufferedResponse(reactorNettyResponse, restRequest, bytes));

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.core.http.netty.implementation;

import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.CoreUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -42,4 +43,9 @@ public Mono<String> getBodyAsString() {
public Mono<String> getBodyAsString(Charset charset) {
return Mono.defer(() -> Mono.just(new String(body, charset)));
}

@Override
public HttpResponse buffer() {
return this; // This response is already buffered.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.core.http.okhttp.implementation;

import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import okhttp3.Response;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -30,4 +31,9 @@ public Flux<ByteBuffer> getBody() {
public Mono<byte[]> getBodyAsByteArray() {
return Mono.defer(() -> Mono.just(body));
}

@Override
public HttpResponse buffer() {
return this; // This response is already buffered.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,9 @@ public MockHttpResponse addHeader(String name, String value) {
headers.set(name, value);
return this;
}

@Override
public HttpResponse buffer() {
return this; // This response is already buffered.
}
}
1 change: 1 addition & 0 deletions sdk/core/azure-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@
--add-opens com.azure.core/com.azure.core.http=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.http.policy=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.http.rest=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.implementation=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.implementation.entities=com.fasterxml.jackson.databind
--add-opens com.azure.core/com.azure.core.implementation.entities=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.implementation.http=ALL-UNNAMED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ private Constructor<? extends Response<?>> locateResponseConstructor(Class<?> re
* @return an instance of a {@link Response} implementation
*/
Mono<Response<?>> invoke(final Constructor<? extends Response<?>> constructor,
final HttpResponseDecoder.HttpDecodedResponse decodedResponse,
final Object bodyAsObject) {
final HttpResponseDecoder.HttpDecodedResponse decodedResponse,
final Object bodyAsObject) {
final HttpResponse httpResponse = decodedResponse.getSourceResponse();
final HttpRequest httpRequest = httpResponse.getRequest();
final int responseStatusCode = httpResponse.getStatusCode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.implementation;

import com.azure.core.util.logging.ClientLogger;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
* This class offers functionality similar to {@link ByteArrayOutputStream} but instead of consuming byte arrays it
* consumes ByteBuffers. This class is optimized to reduce the number of memory copies by directly writing a passed
* ByteBuffers data directly into its backing byte array, this differs from handling for {@link ByteArrayOutputStream}
* where ByteBuffer data may need to be first copied into a temporary buffer resulting in an extra memory copy.
*/
public final class ByteBufferCollector {
/*
* Start with a default size of 1 KB as this is small enough to be performant while covering most small response
* sizes.
*/
private static final int DEFAULT_INITIAL_SIZE = 1024;

private static final String INVALID_INITIAL_SIZE = "'initialSize' cannot be equal to or less than 0.";
private static final String REQUESTED_BUFFER_INVALID = "Required capacity is greater than Integer.MAX_VALUE.";

private final ClientLogger logger = new ClientLogger(ByteBufferCollector.class);

private byte[] buffer;
private int position;

/**
* Constructs a new ByteBufferCollector instance with a default sized backing array.
*/
public ByteBufferCollector() {
this(DEFAULT_INITIAL_SIZE);
}

/**
* Constructs a new ByteBufferCollector instance with a specified initial size.
*
* @param initialSize The initial size for the backing array.
* @throws IllegalArgumentException If {@code initialSize} is equal to or less than {@code 0}.
*/
public ByteBufferCollector(int initialSize) {
if (initialSize <= 0) {
throw logger.logExceptionAsError(new IllegalArgumentException(INVALID_INITIAL_SIZE));
}

this.buffer = new byte[initialSize];
this.position = 0;
}

/**
* Writes a ByteBuffers content into the backing array.
*
* @param byteBuffer The ByteBuffer to concatenate into the collector.
* @throws IllegalStateException If the size of the backing array would be larger than {@link Integer#MAX_VALUE}
* when the passed buffer is written.
*/
public synchronized void write(ByteBuffer byteBuffer) {
// Null buffer.
if (byteBuffer == null) {
return;
}

int remaining = byteBuffer.remaining();

// Nothing to write.
if (remaining == 0) {
return;
}

ensureCapacity(remaining);
byteBuffer.get(buffer, position, remaining);
position += remaining;
}

/**
* Creates a copy of the backing array resized to the number of bytes written into the collector.
*
* @return A copy of the backing array.
*/
public synchronized byte[] toByteArray() {
return Arrays.copyOf(buffer, position);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is used internally, can we avoid array copy? Can the buffer be returned directly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll be a performance hit, but for now I'd rather return a copy as this will be safe from additional changes after return if more bytes are written.

}

/*
* This method ensures that the backing buffer has sufficient space to write the data from the passed ByteBuffer.
*/
private void ensureCapacity(int byteBufferRemaining) throws OutOfMemoryError {
int currentCapacity = buffer.length;
int requiredCapacity = position + byteBufferRemaining;

/*
* This validates that adding the current capacity and ByteBuffer remaining doesn't result in an integer
* overflow response by checking that the result uses the same sign as both of the addition arguments.
*/
if (((position ^ requiredCapacity) & (byteBufferRemaining ^ requiredCapacity)) < 0) {
throw logger.logExceptionAsError(new IllegalStateException(REQUESTED_BUFFER_INVALID));
}

// Buffer is already large enough to accept the data being written.
if (currentCapacity >= requiredCapacity) {
return;
}

// Propose a new capacity that is double the size of the current capacity.
int proposedNewCapacity = currentCapacity << 1;

// If the proposed capacity is less than the required capacity use the required capacity.
// Subtraction is used instead of a direct comparison as the bit shift could overflow into a negative int.
if ((proposedNewCapacity - requiredCapacity) < 0) {
proposedNewCapacity = requiredCapacity;
}

// If the proposed capacity doubling overflowed integer use a slightly smaller size than max value.
if (proposedNewCapacity < 0) {
proposedNewCapacity = Integer.MAX_VALUE - 8;
}

buffer = Arrays.copyOf(buffer, proposedNewCapacity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public final class BufferedHttpResponse extends HttpResponse {
public BufferedHttpResponse(HttpResponse innerHttpResponse) {
super(innerHttpResponse.getRequest());
this.innerHttpResponse = innerHttpResponse;
this.cachedBody = FluxUtil.collectBytesInByteBufferStream(innerHttpResponse.getBody())
this.cachedBody = FluxUtil.collectBytesFromNetworkResponse(innerHttpResponse.getBody(),
innerHttpResponse.getHeaders())
.map(ByteBuffer::wrap)
.flux()
.cache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ static Mono<Object> decode(HttpResponse httpResponse, SerializerAdapter serializ
return Mono.empty();
} else {
return Mono.fromCallable(() -> serializer.deserialize(httpResponse.getHeaders(), headerType))
.onErrorResume(IOException.class, e -> Mono.error(new HttpResponseException(
"HTTP response has malformed headers", httpResponse, e)));
.onErrorMap(IOException.class, e -> new HttpResponseException("HTTP response has malformed headers",
httpResponse, e));
}
}
}
Loading