From b0ca91783f660d95ef306d738a67cd2ee0e1cbef Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Fri, 30 Aug 2019 01:15:37 -0700 Subject: [PATCH 01/17] Reactor Netty Milestone release changes --- .../data/cosmos/benchmark/Configuration.java | 2 +- .../azure/data/cosmos/ConnectionPolicy.java | 6 +- .../cosmos/internal/RxGatewayStoreModel.java | 28 +-------- .../directconnectivity/ErrorUtils.java | 2 +- .../directconnectivity/HttpClientUtils.java | 2 +- .../directconnectivity/ResponseUtils.java | 24 +------- .../internal/http/BufferedHttpResponse.java | 7 +++ .../data/cosmos/internal/http/HttpClient.java | 38 ++++++++++-- .../internal/http/HttpClientConfig.java | 10 ++++ .../cosmos/internal/http/HttpResponse.java | 8 +++ .../internal/http/ReactorNettyClient.java | 59 ++++++++++++++----- ...GatewayServiceConfigurationReaderTest.java | 2 +- .../HttpClientMockWrapper.java | 5 +- .../HttpTransportClientTest.java | 18 +++--- sdk/cosmos/pom.xml | 18 +++++- 15 files changed, 139 insertions(+), 90 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Configuration.java b/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Configuration.java index 1ab63af0d5cf..f1e3cc2612d5 100644 --- a/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Configuration.java +++ b/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Configuration.java @@ -55,7 +55,7 @@ class Configuration { private int documentDataFieldSize = 20; @Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size") - private Integer maxConnectionPoolSize = 1000; + private Integer maxConnectionPoolSize = 500; @Parameter(names = "-consistencyLevel", description = "Consistency Level", converter = ConsistencyLevelConverter.class) private ConsistencyLevel consistencyLevel = ConsistencyLevel.SESSION; diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/ConnectionPolicy.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/ConnectionPolicy.java index 7e5992482575..3ff901219d51 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/ConnectionPolicy.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/ConnectionPolicy.java @@ -18,7 +18,7 @@ public final class ConnectionPolicy { private static final int DEFAULT_MEDIA_REQUEST_TIMEOUT_IN_MILLIS = 300 * 1000; private static final int DEFAULT_IDLE_CONNECTION_TIMEOUT_IN_MILLIS = 60 * 1000; - private static final int DEFAULT_MAX_POOL_SIZE = 1000; + private static final int DEFAULT_MAX_POOL_SIZE = 500; private static ConnectionPolicy default_policy = null; private int requestTimeoutInMillis; @@ -38,7 +38,7 @@ public final class ConnectionPolicy { * Constructor. */ public ConnectionPolicy() { - this.connectionMode = ConnectionMode.GATEWAY; + this.connectionMode = ConnectionMode.DIRECT; this.enableReadRequestsFallback = null; this.idleConnectionTimeoutInMillis = DEFAULT_IDLE_CONNECTION_TIMEOUT_IN_MILLIS; this.maxPoolSize = DEFAULT_MAX_POOL_SIZE; @@ -62,7 +62,7 @@ public static ConnectionPolicy defaultPolicy() { /** * Gets the request timeout (time to wait for response from network peer) in - * milliseconds. + * milliseconds. * * @return the request timeout in milliseconds. */ diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java index d76d639d5036..96d62fefa490 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java @@ -24,7 +24,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -43,7 +42,6 @@ */ class RxGatewayStoreModel implements RxStoreModel { - private final static int INITIAL_RESPONSE_BUFFER_SIZE = 1024; private final Logger logger = LoggerFactory.getLogger(RxGatewayStoreModel.class); private final Map defaultHeaders; private final HttpClient httpClient; @@ -229,22 +227,6 @@ private String ensureSlashPrefixed(String path) { return "/" + path; } - private Mono toString(Flux contentObservable) { - return contentObservable - .reduce( - new ByteArrayOutputStream(INITIAL_RESPONSE_BUFFER_SIZE), - (out, bb) -> { - try { - bb.readBytes(out, bb.readableBytes()); - return out; - } - catch (IOException e) { - throw new RuntimeException(e); - } - }) - .map(out -> new String(out.toByteArray(), StandardCharsets.UTF_8)); - } - /** * Transforms the reactor netty's client response Observable to RxDocumentServiceResponse Observable. * @@ -272,11 +254,7 @@ private Flux toDocumentServiceResponse(Mono - inputStreamObservable = httpResponse - .body() - .flatMap(byteBuf -> - Flux.just(IOUtils.toInputStream(byteBuf.toString(StandardCharsets.UTF_8), StandardCharsets.UTF_8))); + inputStreamObservable = httpResponse.bodyAsInputStream(); } return inputStreamObservable @@ -315,7 +293,7 @@ private Flux toDocumentServiceResponse(Mono - contentObservable = toString(httpResponse.body()).flux(); + contentObservable = httpResponse.bodyAsString().flux(); } return contentObservable @@ -498,4 +476,4 @@ private void applySessionToken(RxDocumentServiceRequest request) { headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken); } } -} \ No newline at end of file +} diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ErrorUtils.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ErrorUtils.java index 0956a1460b7c..230e28463c69 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ErrorUtils.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ErrorUtils.java @@ -17,7 +17,7 @@ public class ErrorUtils { private static final Logger logger = LoggerFactory.getLogger(ErrorUtils.class); static Mono getErrorResponseAsync(HttpResponse responseMessage, HttpRequest request) { - Mono responseAsString = ResponseUtils.toString(responseMessage.body()); + Mono responseAsString = responseMessage.bodyAsString(); if (request.httpMethod() == HttpMethod.DELETE) { return Mono.just(StringUtils.EMPTY); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientUtils.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientUtils.java index 279a16b4c5a9..48d9f571890e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientUtils.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientUtils.java @@ -31,7 +31,7 @@ static Mono parseResponseAsync(Mono htt } private static Mono createDocumentClientException(HttpResponse httpResponse) { - Mono readStream = ResponseUtils.toString(httpResponse.body()); + Mono readStream = httpResponse.bodyAsString(); return readStream.map(body -> { CosmosError cosmosError = BridgeInternal.createCosmosError(body); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ResponseUtils.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ResponseUtils.java index 724b1e31cc01..e1bdf9f3b3f5 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ResponseUtils.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ResponseUtils.java @@ -6,33 +6,11 @@ import com.azure.data.cosmos.internal.http.HttpHeaders; import com.azure.data.cosmos.internal.http.HttpRequest; import com.azure.data.cosmos.internal.http.HttpResponse; -import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpMethod; import org.apache.commons.lang3.StringUtils; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; - class ResponseUtils { - private final static int INITIAL_RESPONSE_BUFFER_SIZE = 1024; - - public static Mono toString(Flux contentObservable) { - return contentObservable - .reduce( - new ByteArrayOutputStream(INITIAL_RESPONSE_BUFFER_SIZE), - (out, bb) -> { - try { - bb.readBytes(out, bb.readableBytes()); - return out; - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .map(out -> new String(out.toByteArray(), StandardCharsets.UTF_8)); - } static Mono toStoreResponse(HttpResponse httpClientResponse, HttpRequest httpRequest) { @@ -44,7 +22,7 @@ static Mono toStoreResponse(HttpResponse httpClientResponse, Http // for delete we don't expect any body contentObservable = Mono.just(StringUtils.EMPTY); } else { - contentObservable = toString(httpClientResponse.body()); + contentObservable = httpClientResponse.bodyAsString(); } return contentObservable.flatMap(content -> { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java index bad4c3cc57e7..0ea8845308f7 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java @@ -7,6 +7,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -53,6 +55,11 @@ public Flux body() { return bodyAsByteArray().flatMapMany(bytes -> Flux.just(Unpooled.wrappedBuffer(bytes))); } + @Override + public Flux bodyAsInputStream() { + return bodyAsByteArray().flatMapMany(bytes -> Flux.just(new ByteArrayInputStream(bytes))); + } + @Override public Mono bodyAsString() { return bodyAsByteArray() diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java index 7a4ed04a838e..bf323f898b1a 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java @@ -5,6 +5,8 @@ import reactor.core.publisher.Mono; import reactor.netty.resources.ConnectionProvider; +import java.time.Duration; + /** * A generic interface for sending HTTP requests and getting responses. */ @@ -19,7 +21,7 @@ public interface HttpClient { Mono send(HttpRequest request); /** - * Create fixed HttpClient with {@link HttpClientConfig} + * Create HttpClient with FixedChannelPool {@link HttpClientConfig} * * @return the HttpClient */ @@ -28,12 +30,40 @@ static HttpClient createFixed(HttpClientConfig httpClientConfig) { throw new IllegalArgumentException("HttpClientConfig is null"); } - if (httpClientConfig.getMaxPoolSize() == null) { - return new ReactorNettyClient(ConnectionProvider.fixed(httpClientConfig.getConfigs().getReactorNettyConnectionPoolName()), httpClientConfig); + Integer maxIdleConnectionTimeoutInMillis = 60 * 1000; + if (httpClientConfig.getMaxIdleConnectionTimeoutInMillis() != null) { + maxIdleConnectionTimeoutInMillis = httpClientConfig.getMaxIdleConnectionTimeoutInMillis(); + } + + // Default pool size + Integer maxPoolSize = 500; + if (httpClientConfig.getMaxPoolSize() != null) { + maxPoolSize = httpClientConfig.getMaxPoolSize(); } - return new ReactorNettyClient(ConnectionProvider.fixed(httpClientConfig.getConfigs().getReactorNettyConnectionPoolName(), httpClientConfig.getMaxPoolSize()), httpClientConfig); + + int connectionAcquireTimeoutInMillis = 45 * 1000; + + ConnectionProvider fixedConnectionProvider = + ConnectionProvider.fixed(httpClientConfig.getConfigs().getReactorNettyConnectionPoolName(), + maxPoolSize, connectionAcquireTimeoutInMillis, Duration.ofMillis(maxIdleConnectionTimeoutInMillis)); + + return ReactorNettyClient.createWithConnectionProvider(fixedConnectionProvider, httpClientConfig); } + /** + * Create HttpClient with un-pooled connection {@link HttpClientConfig} + * + * @return the HttpClient + */ + static HttpClient create(HttpClientConfig httpClientConfig) { + if (httpClientConfig.getConfigs() == null) { + throw new IllegalArgumentException("HttpClientConfig is null"); + } + + return ReactorNettyClient.create(httpClientConfig); + } + + /** * Shutdown the Http Client and clean up resources */ diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClientConfig.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClientConfig.java index f83d8dbf1b65..6be4ea5ccf49 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClientConfig.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClientConfig.java @@ -18,6 +18,7 @@ public class HttpClientConfig { private Integer maxIdleConnectionTimeoutInMillis; private Integer requestTimeoutInMillis; private InetSocketAddress proxy; + private boolean connectionKeepAlive = true; public HttpClientConfig(Configs configs) { this.configs = configs; @@ -43,6 +44,11 @@ public HttpClientConfig withRequestTimeoutInMillis(int requestTimeoutInMillis) { return this; } + public HttpClientConfig withConnectionKeepAlive(boolean connectionKeepAlive) { + this.connectionKeepAlive = connectionKeepAlive; + return this; + } + public Configs getConfigs() { return configs; } @@ -62,4 +68,8 @@ public Integer getRequestTimeoutInMillis() { public InetSocketAddress getProxy() { return proxy; } + + public boolean isConnectionKeepAlive() { + return connectionKeepAlive; + } } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java index 4ccc799132ee..2363da2dde7e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java @@ -8,6 +8,7 @@ import reactor.core.publisher.Mono; import reactor.netty.Connection; +import java.io.InputStream; import java.nio.charset.Charset; /** @@ -68,6 +69,13 @@ public abstract class HttpResponse implements AutoCloseable { */ public abstract Flux body(); + /** + * Get the response content as InputStream. + * + * @return this response content as InputStream + */ + public abstract Flux bodyAsInputStream(); + /** * Get the response content as a byte[]. * diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java index abf63363dffe..b6d3ae161ca5 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java @@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelOption; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.logging.LogLevel; import org.reactivestreams.Publisher; @@ -18,8 +19,8 @@ import reactor.netty.http.client.HttpClientResponse; import reactor.netty.resources.ConnectionProvider; import reactor.netty.tcp.ProxyProvider; -import reactor.netty.tcp.TcpResources; +import java.io.InputStream; import java.nio.charset.Charset; import java.util.Objects; import java.util.function.BiFunction; @@ -31,30 +32,51 @@ */ class ReactorNettyClient implements HttpClient { - private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName()); + private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class.getSimpleName()); private HttpClientConfig httpClientConfig; private reactor.netty.http.client.HttpClient httpClient; private ConnectionProvider connectionProvider; + private ReactorNettyClient() {} + + /** + * Creates ReactorNettyClient with un-pooled connection. + */ + public static ReactorNettyClient create(HttpClientConfig httpClientConfig) { + ReactorNettyClient reactorNettyClient = new ReactorNettyClient(); + reactorNettyClient.httpClientConfig = httpClientConfig; + reactorNettyClient.httpClient = reactor.netty.http.client.HttpClient.newConnection(); + reactorNettyClient.configureChannelPipelineHandlers(); + return reactorNettyClient; + } + /** * Creates ReactorNettyClient with {@link ConnectionProvider}. */ - ReactorNettyClient(ConnectionProvider connectionProvider, HttpClientConfig httpClientConfig) { - this.connectionProvider = connectionProvider; - this.httpClientConfig = httpClientConfig; - this.httpClient = reactor.netty.http.client.HttpClient.create(connectionProvider); - configureChannelPipelineHandlers(); + public static ReactorNettyClient createWithConnectionProvider(ConnectionProvider connectionProvider, HttpClientConfig httpClientConfig) { + ReactorNettyClient reactorNettyClient = new ReactorNettyClient(); + reactorNettyClient.connectionProvider = connectionProvider; + reactorNettyClient.httpClientConfig = httpClientConfig; + reactorNettyClient.httpClient = reactor.netty.http.client.HttpClient.create(connectionProvider); + reactorNettyClient.configureChannelPipelineHandlers(); + return reactorNettyClient; } private void configureChannelPipelineHandlers() { this.httpClient = this.httpClient.tcpConfiguration(tcpClient -> { - if (LoggerFactory.getLogger(REACTOR_NETWORK_LOG_CATEGORY).isTraceEnabled()) { - tcpClient = tcpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.TRACE); - } if (this.httpClientConfig.getProxy() != null) { - tcpClient = tcpClient.proxy(typeSpec -> typeSpec.type(ProxyProvider.Proxy.HTTP).address(this.httpClientConfig.getProxy())); + tcpClient = + tcpClient.proxy(typeSpec -> typeSpec.type(ProxyProvider.Proxy.HTTP).address(this.httpClientConfig.getProxy())); } + tcpClient = + tcpClient.secure(sslContextSpec -> sslContextSpec.sslContext(this.httpClientConfig.getConfigs().getSslContext())); + if (LoggerFactory.getLogger(REACTOR_NETWORK_LOG_CATEGORY).isTraceEnabled()) { + tcpClient = tcpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.INFO); + } + // By default, keep alive is enabled on http client + tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); + return tcpClient; }); } @@ -66,6 +88,7 @@ public Mono send(final HttpRequest request) { Objects.requireNonNull(this.httpClientConfig); return this.httpClient + .keepAlive(this.httpClientConfig.isConnectionKeepAlive()) .port(request.port()) .request(HttpMethod.valueOf(request.httpMethod().toString())) .uri(request.uri().toString()) @@ -87,9 +110,9 @@ private static BiFunction> bod } if (restRequest.body() != null) { Flux nettyByteBufFlux = restRequest.body().map(Unpooled::wrappedBuffer); - return reactorNettyOutbound.options(sendOptions -> sendOptions.flushOnEach(false)).send(nettyByteBufFlux); + return reactorNettyOutbound.send(nettyByteBufFlux); } else { - return reactorNettyOutbound.options(sendOptions -> sendOptions.flushOnEach(false)); + return reactorNettyOutbound; } }; } @@ -107,8 +130,9 @@ private static BiFunction body() { return bodyIntern().doFinally(s -> this.close()); } + @Override + public Flux bodyAsInputStream() { + return bodyIntern().asInputStream().doFinally(s -> this.close()); + } + @Override public Mono bodyAsByteArray() { return bodyIntern().aggregate().asByteArray().doFinally(s -> this.close()); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java index 0401d60021aa..013cc48ca65c 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java @@ -147,7 +147,7 @@ private HttpResponse getMockResponse(String databaseAccountJson) { Mockito.doReturn(Flux.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, databaseAccountJson))) .when(httpResponse).body(); Mockito.doReturn(Mono.just(databaseAccountJson)) - .when(httpResponse).bodyAsString(StandardCharsets.UTF_8); + .when(httpResponse).bodyAsString(); Mockito.doReturn(new HttpHeaders()).when(httpResponse).headers(); return httpResponse; diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientMockWrapper.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientMockWrapper.java index fa67ed06cc39..f86b2207c251 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientMockWrapper.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientMockWrapper.java @@ -3,7 +3,6 @@ package com.azure.data.cosmos.internal.directconnectivity; -import com.azure.data.cosmos.internal.directconnectivity.WFConstants; import com.azure.data.cosmos.internal.http.HttpClient; import com.azure.data.cosmos.internal.http.HttpHeaders; import com.azure.data.cosmos.internal.http.HttpRequest; @@ -20,8 +19,6 @@ import java.util.Collections; import java.util.List; -; - public class HttpClientMockWrapper { public static HttpClientBehaviourBuilder httpClientBehaviourBuilder() { return new HttpClientBehaviourBuilder(); @@ -88,7 +85,7 @@ public HttpResponse asHttpResponse() { HttpResponse resp = Mockito.mock(HttpResponse.class); Mockito.doReturn(this.status).when(resp).statusCode(); Mockito.doReturn(Flux.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, this.content))).when(resp).body(); - Mockito.doReturn(Mono.just(this.content)).when(resp).bodyAsString(StandardCharsets.UTF_8); + Mockito.doReturn(Mono.just(this.content)).when(resp).bodyAsString(); Mockito.doReturn(this.httpHeaders).when(resp).headers(); return resp; } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpTransportClientTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpTransportClientTest.java index 92af307dfb96..bfebc6844a51 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpTransportClientTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpTransportClientTest.java @@ -7,9 +7,14 @@ import com.azure.data.cosmos.ConflictException; import com.azure.data.cosmos.ForbiddenException; import com.azure.data.cosmos.GoneException; +import com.azure.data.cosmos.InternalServerErrorException; +import com.azure.data.cosmos.InvalidPartitionException; import com.azure.data.cosmos.LockedException; import com.azure.data.cosmos.MethodNotAllowedException; +import com.azure.data.cosmos.NotFoundException; +import com.azure.data.cosmos.PartitionIsMigratingException; import com.azure.data.cosmos.PartitionKeyRangeGoneException; +import com.azure.data.cosmos.PartitionKeyRangeIsSplittingException; import com.azure.data.cosmos.PreconditionFailedException; import com.azure.data.cosmos.RequestEntityTooLargeException; import com.azure.data.cosmos.RequestRateTooLargeException; @@ -17,26 +22,17 @@ import com.azure.data.cosmos.RetryWithException; import com.azure.data.cosmos.ServiceUnavailableException; import com.azure.data.cosmos.UnauthorizedException; -import com.azure.data.cosmos.internal.directconnectivity.HttpTransportClient; -import com.azure.data.cosmos.internal.directconnectivity.HttpUtils; import com.azure.data.cosmos.internal.Configs; +import com.azure.data.cosmos.internal.FailureValidator; import com.azure.data.cosmos.internal.HttpConstants; -import com.azure.data.cosmos.InternalServerErrorException; -import com.azure.data.cosmos.InvalidPartitionException; -import com.azure.data.cosmos.NotFoundException; import com.azure.data.cosmos.internal.OperationType; -import com.azure.data.cosmos.PartitionIsMigratingException; -import com.azure.data.cosmos.PartitionKeyRangeIsSplittingException; import com.azure.data.cosmos.internal.ResourceType; import com.azure.data.cosmos.internal.RxDocumentServiceRequest; import com.azure.data.cosmos.internal.UserAgentContainer; -import com.azure.data.cosmos.internal.directconnectivity.HttpTransportClient; -import com.azure.data.cosmos.internal.directconnectivity.HttpUtils; import com.azure.data.cosmos.internal.http.HttpClient; import com.azure.data.cosmos.internal.http.HttpHeaders; import com.azure.data.cosmos.internal.http.HttpRequest; import com.azure.data.cosmos.internal.http.HttpResponse; -import com.azure.data.cosmos.internal.FailureValidator; import io.netty.channel.ConnectTimeoutException; import io.reactivex.subscribers.TestSubscriber; import org.assertj.core.api.Assertions; @@ -67,7 +63,7 @@ public class HttpTransportClientTest { private final long lsn = 5; private final String partitionKeyRangeId = "3"; - + @Test(groups = "unit") public void getResourceFeedUri_Document() throws Exception { RxDocumentServiceRequest req = RxDocumentServiceRequest.createFromName( diff --git a/sdk/cosmos/pom.xml b/sdk/cosmos/pom.xml index 3477993a4597..62d978602f6f 100644 --- a/sdk/cosmos/pom.xml +++ b/sdk/cosmos/pom.xml @@ -41,14 +41,23 @@ Licensed under the MIT License. 1.10.19 4.1.38.Final 2.0.25.Final + 0.9.0.M3 3.1.0 27.0.1-jre - 3.2.9.RELEASE + 3.3.0.M3 unit ${project.basedir}/target/collectedArtifactsForRelease + + + spring-milestones + Spring Milestones Repository + https://repo.spring.io/milestone + + + @@ -70,6 +79,13 @@ Licensed under the MIT License. ${reactor-core.version} + + io.projectreactor.netty + reactor-netty + ${reactor-netty.version} + + + io.netty netty-codec-http From 5b4d442c07163fc1e466ca2434dd3d836ad6a3c6 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Wed, 4 Sep 2019 14:57:52 -0700 Subject: [PATCH 02/17] Updated Milestone version to RC version --- sdk/cosmos/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/pom.xml b/sdk/cosmos/pom.xml index 62d978602f6f..adfe814952b1 100644 --- a/sdk/cosmos/pom.xml +++ b/sdk/cosmos/pom.xml @@ -41,10 +41,10 @@ Licensed under the MIT License. 1.10.19 4.1.38.Final 2.0.25.Final - 0.9.0.M3 + 0.9.0.RC1 3.1.0 27.0.1-jre - 3.3.0.M3 + 3.3.0.RC1 unit ${project.basedir}/target/collectedArtifactsForRelease From 93d9a91311105aac34e1bfff39663caf1eff45be Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Wed, 4 Sep 2019 17:43:10 -0700 Subject: [PATCH 03/17] Code review comments: 1. Reverted the RC1 back to M3 version. 2. Added constants for literal values in Configs.java 3. Removed inputStream implementation from HttpResponse.java as its not required --- .../azure/data/cosmos/internal/Configs.java | 16 +++ .../cosmos/internal/RxGatewayStoreModel.java | 136 +++++++----------- .../internal/http/BufferedHttpResponse.java | 5 - .../data/cosmos/internal/http/HttpClient.java | 6 +- .../cosmos/internal/http/HttpResponse.java | 7 - .../internal/http/ReactorNettyClient.java | 8 +- sdk/cosmos/pom.xml | 4 +- 7 files changed, 71 insertions(+), 111 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Configs.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Configs.java index e747ffa0ed6f..2bf3bd18196f 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Configs.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Configs.java @@ -55,6 +55,10 @@ public class Configs { private static final int CPU_CNT = Runtime.getRuntime().availableProcessors(); private static final int DEFAULT_DIRECT_HTTPS_POOL_SIZE = CPU_CNT * 500; + // Reactor Netty Constants + private static final int MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS = 60 * 1000; + private static final int CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS = 45 * 1000; + private static final int REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE = 500; private static final String REACTOR_NETTY_CONNECTION_POOL_NAME = "reactor-netty-connection-pool"; public Configs() { @@ -147,6 +151,18 @@ public String getReactorNettyConnectionPoolName() { return REACTOR_NETTY_CONNECTION_POOL_NAME; } + public int getMaxIdleConnectionTimeoutInMillis() { + return MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS; + } + + public int getConnectionAcquireTimeoutInMillis() { + return CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS; + } + + public int getReactorNettyMaxConnectionPoolSize() { + return REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE; + } + private static String getJVMConfigAsString(String propName, String defaultValue) { String propValue = System.getProperty(propName); return StringUtils.defaultString(propValue, defaultValue); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java index 96d62fefa490..f43457f55775 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java @@ -241,98 +241,58 @@ private String ensureSlashPrefixed(String path) { private Flux toDocumentServiceResponse(Mono httpResponseMono, RxDocumentServiceRequest request) { - if (request.getIsMedia()) { - return httpResponseMono.flatMap(httpResponse -> { + return httpResponseMono.flatMap(httpResponse -> { - // header key/value pairs - HttpHeaders httpResponseHeaders = httpResponse.headers(); - int httpResponseStatus = httpResponse.statusCode(); + // header key/value pairs + HttpHeaders httpResponseHeaders = httpResponse.headers(); + int httpResponseStatus = httpResponse.statusCode(); - Flux inputStreamObservable; + Flux contentObservable; - if (request.getOperationType() == OperationType.Delete) { - // for delete we don't expect any body - inputStreamObservable = Flux.just(IOUtils.toInputStream("", StandardCharsets.UTF_8)); - } else { - inputStreamObservable = httpResponse.bodyAsInputStream(); - } - - return inputStreamObservable - .flatMap(contentInputStream -> { - try { - // If there is any error in the header response this throws exception - // TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception - validateOrThrow(request, - HttpResponseStatus.valueOf(httpResponseStatus), - httpResponseHeaders, - null, - contentInputStream); - - // transforms to Observable - StoreResponse rsp = new StoreResponse(httpResponseStatus, HttpUtils - .unescape(httpResponseHeaders.toMap().entrySet()), contentInputStream); - return Flux.just(rsp); - } catch (Exception e) { - return Flux.error(e); - } - }).single(); - - }).map(RxDocumentServiceResponse::new).flux(); - - } else { - return httpResponseMono.flatMap(httpResponse -> { - - // header key/value pairs - HttpHeaders httpResponseHeaders = httpResponse.headers(); - int httpResponseStatus = httpResponse.statusCode(); - - Flux contentObservable; - - if (request.getOperationType() == OperationType.Delete) { - // for delete we don't expect any body - contentObservable = Flux.just(StringUtils.EMPTY); - } else { - // transforms the ByteBufFlux to Flux - contentObservable = httpResponse.bodyAsString().flux(); - } + if (request.getOperationType() == OperationType.Delete) { + // for delete we don't expect any body + contentObservable = Flux.just(StringUtils.EMPTY); + } else { + // transforms the ByteBufFlux to Flux + contentObservable = httpResponse.bodyAsString().flux(); + } - return contentObservable - .flatMap(content -> { - try { - // If there is any error in the header response this throws exception - // TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception - validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null); - - // transforms to Observable - StoreResponse rsp = new StoreResponse(httpResponseStatus, - HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()), - content); - return Flux.just(rsp); - } catch (Exception e) { - return Flux.error(e); - } - }).single(); - - }).map(RxDocumentServiceResponse::new) - .onErrorResume(throwable -> { - if (!(throwable instanceof Exception)) { - // fatal error - logger.error("Unexpected failure {}", throwable.getMessage(), throwable); - return Mono.error(throwable); - } - - Exception exception = (Exception) throwable; - if (!(exception instanceof CosmosClientException)) { - // wrap in CosmosClientException - logger.error("Network failure", exception); - CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception); - BridgeInternal.setRequestHeaders(dce, request.getHeaders()); - return Mono.error(dce); - } - - return Mono.error(exception); - }).flux(); - } + return contentObservable + .flatMap(content -> { + try { + // If there is any error in the header response this throws exception + // TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception + validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null); + + // transforms to Observable + StoreResponse rsp = new StoreResponse(httpResponseStatus, + HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()), + content); + return Flux.just(rsp); + } catch (Exception e) { + return Flux.error(e); + } + }).single(); + + }).map(RxDocumentServiceResponse::new) + .onErrorResume(throwable -> { + if (!(throwable instanceof Exception)) { + // fatal error + logger.error("Unexpected failure {}", throwable.getMessage(), throwable); + return Mono.error(throwable); + } + + Exception exception = (Exception) throwable; + if (!(exception instanceof CosmosClientException)) { + // wrap in CosmosClientException + logger.error("Network failure", exception); + CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception); + BridgeInternal.setRequestHeaders(dce, request.getHeaders()); + return Mono.error(dce); + } + + return Mono.error(exception); + }).flux(); } private void validateOrThrow(RxDocumentServiceRequest request, HttpResponseStatus status, HttpHeaders headers, String body, diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java index 0ea8845308f7..e59445ca1cc4 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java @@ -55,11 +55,6 @@ public Flux body() { return bodyAsByteArray().flatMapMany(bytes -> Flux.just(Unpooled.wrappedBuffer(bytes))); } - @Override - public Flux bodyAsInputStream() { - return bodyAsByteArray().flatMapMany(bytes -> Flux.just(new ByteArrayInputStream(bytes))); - } - @Override public Mono bodyAsString() { return bodyAsByteArray() diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java index bf323f898b1a..9ea150006cf7 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java @@ -30,18 +30,18 @@ static HttpClient createFixed(HttpClientConfig httpClientConfig) { throw new IllegalArgumentException("HttpClientConfig is null"); } - Integer maxIdleConnectionTimeoutInMillis = 60 * 1000; + Integer maxIdleConnectionTimeoutInMillis = httpClientConfig.getConfigs().getMaxIdleConnectionTimeoutInMillis(); if (httpClientConfig.getMaxIdleConnectionTimeoutInMillis() != null) { maxIdleConnectionTimeoutInMillis = httpClientConfig.getMaxIdleConnectionTimeoutInMillis(); } // Default pool size - Integer maxPoolSize = 500; + Integer maxPoolSize = httpClientConfig.getConfigs().getReactorNettyMaxConnectionPoolSize(); if (httpClientConfig.getMaxPoolSize() != null) { maxPoolSize = httpClientConfig.getMaxPoolSize(); } - int connectionAcquireTimeoutInMillis = 45 * 1000; + int connectionAcquireTimeoutInMillis = httpClientConfig.getConfigs().getConnectionAcquireTimeoutInMillis(); ConnectionProvider fixedConnectionProvider = ConnectionProvider.fixed(httpClientConfig.getConfigs().getReactorNettyConnectionPoolName(), diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java index 2363da2dde7e..e57f3d0fb312 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java @@ -69,13 +69,6 @@ public abstract class HttpResponse implements AutoCloseable { */ public abstract Flux body(); - /** - * Get the response content as InputStream. - * - * @return this response content as InputStream - */ - public abstract Flux bodyAsInputStream(); - /** * Get the response content as a byte[]. * diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java index b6d3ae161ca5..07d59e724dd7 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java @@ -75,7 +75,8 @@ private void configureChannelPipelineHandlers() { tcpClient = tcpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.INFO); } // By default, keep alive is enabled on http client - tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); + tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, + httpClientConfig.getConfigs().getConnectionAcquireTimeoutInMillis()); return tcpClient; }); @@ -166,11 +167,6 @@ public Flux body() { return bodyIntern().doFinally(s -> this.close()); } - @Override - public Flux bodyAsInputStream() { - return bodyIntern().asInputStream().doFinally(s -> this.close()); - } - @Override public Mono bodyAsByteArray() { return bodyIntern().aggregate().asByteArray().doFinally(s -> this.close()); diff --git a/sdk/cosmos/pom.xml b/sdk/cosmos/pom.xml index adfe814952b1..62d978602f6f 100644 --- a/sdk/cosmos/pom.xml +++ b/sdk/cosmos/pom.xml @@ -41,10 +41,10 @@ Licensed under the MIT License. 1.10.19 4.1.38.Final 2.0.25.Final - 0.9.0.RC1 + 0.9.0.M3 3.1.0 27.0.1-jre - 3.3.0.RC1 + 3.3.0.M3 unit ${project.basedir}/target/collectedArtifactsForRelease From d6087c46adb60c652bd214d535764bcb6d75f01c Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 30 Sep 2019 18:58:23 -0700 Subject: [PATCH 04/17] Setting default connections to 1000, and fixed Exception handling by unwrapping exception --- .../src/main/java/com/azure/data/cosmos/ConnectionPolicy.java | 2 +- .../data/cosmos/internal/directconnectivity/StoreReader.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/ConnectionPolicy.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/ConnectionPolicy.java index 3ff901219d51..328a3453d1b6 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/ConnectionPolicy.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/ConnectionPolicy.java @@ -18,7 +18,7 @@ public final class ConnectionPolicy { private static final int DEFAULT_MEDIA_REQUEST_TIMEOUT_IN_MILLIS = 300 * 1000; private static final int DEFAULT_IDLE_CONNECTION_TIMEOUT_IN_MILLIS = 60 * 1000; - private static final int DEFAULT_MAX_POOL_SIZE = 500; + private static final int DEFAULT_MAX_POOL_SIZE = 1000; private static ConnectionPolicy default_policy = null; private int requestTimeoutInMillis; diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java index 35f97b27fea2..2ab30182c5e2 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java @@ -721,7 +721,8 @@ StoreResult createStoreResult(StoreResponse storeResponse, /* itemLSN: */ itemLSN, /* sessionToken: */ sessionToken); } else { - CosmosClientException cosmosClientException = Utils.as(responseException, CosmosClientException.class); + Throwable unwrappedResponseExceptions = Exceptions.unwrap(responseException); + CosmosClientException cosmosClientException = Utils.as(unwrappedResponseExceptions, CosmosClientException.class); if (cosmosClientException != null) { StoreReader.verifyCanContinueOnException(cosmosClientException); long quorumAckedLSN = -1; From fc123c3ce56870d59a687a6a43dbf236a892610b Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Tue, 1 Oct 2019 12:51:13 -0700 Subject: [PATCH 05/17] Unwrapping exception --- .../directconnectivity/ReplicatedResourceClientTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/ReplicatedResourceClientTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/ReplicatedResourceClientTest.java index 712276d8b4ad..875048d1f5d4 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/ReplicatedResourceClientTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/ReplicatedResourceClientTest.java @@ -19,6 +19,7 @@ import org.mockito.Mockito; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import java.util.concurrent.TimeUnit; @@ -69,6 +70,7 @@ public static void validateFailure(Mono single, FailureValidator testSubscriber.assertNotComplete(); testSubscriber.assertTerminated(); Assertions.assertThat(testSubscriber.errorCount()).isEqualTo(1); - validator.validate(testSubscriber.errors().get(0)); + Throwable throwable = Exceptions.unwrap(testSubscriber.errors().get(0)); + validator.validate(throwable); } } From 3d672fbf74f75026a1e0f7de728304411921afbe Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Tue, 1 Oct 2019 16:52:56 -0700 Subject: [PATCH 06/17] Fixed validate failure by unwrapping exception --- .../java/com/azure/data/cosmos/internal/RetryUtilsTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RetryUtilsTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RetryUtilsTest.java index bfddbc7286b9..4a7ad273c7aa 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RetryUtilsTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RetryUtilsTest.java @@ -13,6 +13,7 @@ import org.mockito.stubbing.Answer; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import java.time.Duration; @@ -100,7 +101,8 @@ private void validateFailure(Mono single, long timeout, Class Date: Wed, 2 Oct 2019 14:39:48 -0700 Subject: [PATCH 07/17] Fixed multi-master conflict resolution test --- .../rx/MultiMasterConflictResolutionTest.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/MultiMasterConflictResolutionTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/MultiMasterConflictResolutionTest.java index 0f8996e31132..863197e2f83d 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/MultiMasterConflictResolutionTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/MultiMasterConflictResolutionTest.java @@ -104,18 +104,19 @@ public void conflictResolutionPolicyCRUD() { } // Tests the following scenarios - // 1. CUSTOM with valid sprocLink - // 2. CUSTOM with null sprocLink, should default to empty string - // 3. CUSTOM with empty sprocLink, should default to empty string - testConflictResolutionPolicyRequiringPath(ConflictResolutionMode.CUSTOM, - new String[] { "randomSprocName", null, "" }, new String[] { "randomSprocName", "", "" }); + // 1. Custom with valid sprocLink + // 2. Custom with null sprocLink, should default to empty string + // 3. Custom with empty sprocLink, should default to empty string + testConflictResolutionPolicyRequiringPath(ConflictResolutionMode.CUSTOM, new String[] { "dbs/mydb/colls" + + "/mycoll/sprocs/randomSprocName", null, "" }, new String[] { "dbs/mydb/colls/mycoll/sprocs" + + "/randomSprocName", "", "" }); } private void testConflictResolutionPolicyRequiringPath(ConflictResolutionMode conflictResolutionMode, String[] paths, String[] expectedPaths) { - for (int i = 0; i < paths.length; i++) { + for (int i = 0; i < paths.length; i++) { CosmosContainerProperties collectionSettings = new CosmosContainerProperties(UUID.randomUUID().toString(), partitionKeyDef); - + if (conflictResolutionMode == ConflictResolutionMode.LAST_WRITER_WINS) { collectionSettings.conflictResolutionPolicy(ConflictResolutionPolicy.createLastWriterWinsPolicy(paths[i])); } else { @@ -123,7 +124,7 @@ private void testConflictResolutionPolicyRequiringPath(ConflictResolutionMode co } collectionSettings = database.createContainer(collectionSettings, new CosmosContainerRequestOptions()).block().properties(); assertThat(collectionSettings.conflictResolutionPolicy().mode()).isEqualTo(conflictResolutionMode); - + if (conflictResolutionMode == ConflictResolutionMode.LAST_WRITER_WINS) { assertThat(collectionSettings.conflictResolutionPolicy().conflictResolutionPath()).isEqualTo(expectedPaths[i]); } else { @@ -131,7 +132,7 @@ private void testConflictResolutionPolicyRequiringPath(ConflictResolutionMode co } } } - + @Test(groups = "multi-master", timeOut = TIMEOUT) public void invalidConflictResolutionPolicy_LastWriterWinsWithStoredProc() throws Exception { CosmosContainerProperties collection = new CosmosContainerProperties(UUID.randomUUID().toString(), partitionKeyDef); From a201feabe0778190a36bd4706eda2aa8a1349353 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Wed, 2 Oct 2019 16:51:13 -0700 Subject: [PATCH 08/17] Unwrapping exceptions wherever possible to make sure we check on inner exception cause --- .../com/azure/data/cosmos/CosmosClient.java | 13 ++++++---- .../com/azure/data/cosmos/CosmosDatabase.java | 8 +++--- .../cosmos/internal/RxGatewayStoreModel.java | 9 ++++--- .../internal/caches/RxCollectionCache.java | 10 ++++--- .../GatewayAddressCache.java | 9 ++++--- .../directconnectivity/StoreReader.java | 19 +++++++------- .../ProxyDocumentQueryExecutionContext.java | 26 ++++++++++--------- 7 files changed, 53 insertions(+), 41 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java index 2740ead28186..bc5717311fb8 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java @@ -9,6 +9,7 @@ import com.azure.data.cosmos.internal.Permission; import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdMetrics; import io.micrometer.core.instrument.MeterRegistry; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -167,16 +168,18 @@ public Mono createDatabaseIfNotExists(String id) { } private Mono createDatabaseIfNotExistsInternal(CosmosDatabase database){ - return database.read().onErrorResume(exception -> { - if (exception instanceof CosmosClientException) { - CosmosClientException cosmosClientException = (CosmosClientException) exception; + return database.read().onErrorResume(t -> { + Throwable throwable = Exceptions.unwrap(t); + if (throwable instanceof CosmosClientException) { + CosmosClientException cosmosClientException = (CosmosClientException) throwable; if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) { return createDatabase(new CosmosDatabaseProperties(database.id()), new CosmosDatabaseRequestOptions()); } } - return Mono.error(exception); + return Mono.error(throwable); }); - } + }CosmosDatabase.java + CosmosDatabase.java /** * Creates a database. diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosDatabase.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosDatabase.java index 0b8ce7d03e1b..685d559f37b1 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosDatabase.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosDatabase.java @@ -7,6 +7,7 @@ import com.azure.data.cosmos.internal.Offer; import com.azure.data.cosmos.internal.Paths; import org.apache.commons.lang3.StringUtils; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -308,13 +309,14 @@ public Mono createContainerIfNotExists(String id, Strin private Mono createContainerIfNotExistsInternal( CosmosContainerProperties containerProperties, CosmosContainer container, CosmosContainerRequestOptions options) { return container.read(options).onErrorResume(exception -> { - if (exception instanceof CosmosClientException) { - CosmosClientException cosmosClientException = (CosmosClientException) exception; + Throwable unwrappedException = Exceptions.unwrap(exception); + if (unwrappedException instanceof CosmosClientException) { + CosmosClientException cosmosClientException = (CosmosClientException) unwrappedException; if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) { return createContainer(containerProperties, options); } } - return Mono.error(exception); + return Mono.error(unwrappedException); }); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java index f43457f55775..3868d105cb69 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java @@ -276,13 +276,14 @@ private Flux toDocumentServiceResponse(Mono { - if (!(throwable instanceof Exception)) { + Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable); + if (!(unwrappedException instanceof Exception)) { // fatal error - logger.error("Unexpected failure {}", throwable.getMessage(), throwable); - return Mono.error(throwable); + logger.error("Unexpected failure {}", unwrappedException.getMessage(), unwrappedException); + return Mono.error(unwrappedException); } - Exception exception = (Exception) throwable; + Exception exception = (Exception) unwrappedException; if (!(exception instanceof CosmosClientException)) { // wrap in CosmosClientException logger.error("Network failure", exception); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java index a3c707a02f80..231cb463f787 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java @@ -11,6 +11,7 @@ import com.azure.data.cosmos.internal.RxDocumentServiceRequest; import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity; import org.apache.commons.lang3.StringUtils; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import java.util.Map; @@ -105,13 +106,14 @@ private Mono resolveByPartitionKeyRangeIdentityAsync(Partiti // which contains value ",", then resolve to collection rid in this header. if (partitionKeyRangeIdentity != null && partitionKeyRangeIdentity.getCollectionRid() != null) { return this.resolveByRidAsync(partitionKeyRangeIdentity.getCollectionRid(), properties) - .onErrorResume(e -> { - if (e instanceof NotFoundException) { + .onErrorResume(t -> { + Throwable unwrappedException = Exceptions.unwrap(t); + if (unwrappedException instanceof NotFoundException) { // This is signal to the upper logic either to refresh // collection cache and retry. return Mono.error(new InvalidPartitionException(RMResources.InvalidDocumentCollection)); } - return Mono.error(e); + return Mono.error(unwrappedException); }); } @@ -165,7 +167,7 @@ private Mono refreshAsync(RxDocumentServiceRequest request) { }); }).then(); } else { - // In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we + // In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we // need to refresh unconditionally. mono = Mono.fromRunnable(() -> this.refresh(request.getResourceAddress(), request.properties)); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java index e9427767cf87..ff7528b21c26 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java @@ -202,13 +202,14 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque } return addresses; - }).onErrorResume(ex -> { - CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(ex, CosmosClientException.class); + }).onErrorResume(throwable -> { + Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable); + CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(unwrappedException, CosmosClientException.class); if (dce == null) { if (forceRefreshPartitionAddressesModified) { this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity); } - return Mono.error(ex); + return Mono.error(unwrappedException); } else { if (Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND) || Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.GONE) || @@ -217,7 +218,7 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity); return null; } - return Mono.error(ex); + return Mono.error(unwrappedException); } }); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java index 2ab30182c5e2..e130afaba629 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java @@ -158,13 +158,13 @@ private Flux toStoreResult(RxDocumentServiceRequest request, return Flux.error(e); } } - ).onErrorResume(t -> { - + ).onErrorResume(throwable -> { + Throwable unwrappedException = Exceptions.unwrap(throwable); try { - logger.debug("Exception {} is thrown while doing readMany", t); - Exception storeException = Utils.as(t, Exception.class); + logger.debug("Exception {} is thrown while doing readMany", unwrappedException); + Exception storeException = Utils.as(unwrappedException, Exception.class); if (storeException == null) { - return Flux.error(t); + return Flux.error(unwrappedException); } // Exception storeException = readTask.Exception != null ? readTask.Exception.InnerException : null; @@ -534,12 +534,13 @@ private Mono readPrimaryInternalAsync( } } - ).onErrorResume(t -> { - logger.debug("Exception {} is thrown while doing READ Primary", t); + ).onErrorResume(throwable -> { + Throwable unwrappedException = Exceptions.unwrap(throwable); + logger.debug("Exception {} is thrown while doing READ Primary", unwrappedException); - Exception storeTaskException = Utils.as(t, Exception.class); + Exception storeTaskException = Utils.as(unwrappedException, Exception.class); if (storeTaskException == null) { - return Mono.error(t); + return Mono.error(unwrappedException); } try { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ProxyDocumentQueryExecutionContext.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ProxyDocumentQueryExecutionContext.java index 9f9310935921..a0bf1a54df7c 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ProxyDocumentQueryExecutionContext.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/query/ProxyDocumentQueryExecutionContext.java @@ -26,7 +26,7 @@ /** * While this class is public, but it is not part of our published public APIs. * This is meant to be internally used only by our sdk. - * + * * This class is used as a proxy to wrap the * DefaultDocumentQueryExecutionContext which is needed for sending the query to * GATEWAY first and then uses PipelinedDocumentQueryExecutionContext after it @@ -74,25 +74,27 @@ public ProxyDocumentQueryExecutionContext( @Override public Flux> executeAsync() { - Function>> func = t -> { + Function>> func = throwable -> { - logger.debug("Received non result message from gateway", t); - if (!(t instanceof Exception)) { - logger.error("Unexpected failure", t); - return Flux.error(t); + Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable); + + logger.debug("Received non result message from gateway", unwrappedException); + if (!(unwrappedException instanceof Exception)) { + logger.error("Unexpected failure", unwrappedException); + return Flux.error(unwrappedException); } - - if (!isCrossPartitionQuery((Exception) t)) { + + if (!isCrossPartitionQuery((Exception) unwrappedException)) { // If this is not a cross partition query then propagate error - logger.debug("Failure from gateway", t); - return Flux.error(t); + logger.debug("Failure from gateway", unwrappedException); + return Flux.error(unwrappedException); } logger.debug("Setting up query pipeline using the query plan received form gateway"); // cross partition query construct pipeline - CosmosClientException dce = (CosmosClientException) t; + CosmosClientException dce = (CosmosClientException) unwrappedException; PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = new PartitionedQueryExecutionInfo(dce.error().getPartitionedQueryExecutionInfo()); @@ -161,6 +163,6 @@ public static Flux> c resourceLink, collection, isContinuationExpected, - correlatedActivityId)); + correlatedActivityId)); } } From f6dfd061c17cc41d68540a6447eae65f3dee149e Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Wed, 2 Oct 2019 17:16:07 -0700 Subject: [PATCH 09/17] Fixed compilation errors --- .../src/main/java/com/azure/data/cosmos/CosmosClient.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java index bc5717311fb8..ad8a4754b42c 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java @@ -178,8 +178,7 @@ private Mono createDatabaseIfNotExistsInternal(CosmosDat } return Mono.error(throwable); }); - }CosmosDatabase.java - CosmosDatabase.java + } /** * Creates a database. From 4e54cc3e413f636e19e8c5579c16449b04ebfd34 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Thu, 3 Oct 2019 11:36:34 -0700 Subject: [PATCH 10/17] Fixed doOnError in Store Client and Consistency Writer to handle unwrapped exceptions --- .../cosmos/internal/directconnectivity/ConsistencyWriter.java | 4 +++- .../data/cosmos/internal/directconnectivity/StoreClient.java | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java index c4a38cf2abde..3c51d58fd756 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java @@ -21,6 +21,7 @@ import org.apache.commons.collections4.ComparatorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -164,7 +165,8 @@ Mono writePrivateAsync( .doOnError( t -> { try { - CosmosClientException ex = Utils.as(t, CosmosClientException.class); + Throwable unwrappedException = Exceptions.unwrap(t); + CosmosClientException ex = Utils.as(unwrappedException, CosmosClientException.class); try { BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request, storeReader.createStoreResult(null, ex, false, false, primaryUri)); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreClient.java index e41245c011c1..976bb0e02366 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreClient.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreClient.java @@ -88,7 +88,8 @@ public Mono processMessageAsync(RxDocumentServiceRequ storeResponse = storeResponse.doOnError(e -> { try { - CosmosClientException exception = Utils.as(e, CosmosClientException.class); + Throwable unwrappedException = reactor.core.Exceptions.unwrap(e); + CosmosClientException exception = Utils.as(unwrappedException, CosmosClientException.class); if (exception == null) { return; From 9533896ee1f8e2ad15c4b1af0dcc549cbfb1d24b Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Thu, 3 Oct 2019 19:20:29 -0700 Subject: [PATCH 11/17] Handling empty response from backend --- .../azure/data/cosmos/internal/RxGatewayStoreModel.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java index 3868d105cb69..8f2291d29d6c 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java @@ -17,6 +17,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.internal.StringUtil; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -254,7 +255,10 @@ private Flux toDocumentServiceResponse(Mono - contentObservable = httpResponse.bodyAsString().flux(); + contentObservable = httpResponse + .bodyAsString() + .switchIfEmpty(Mono.just(StringUtils.EMPTY)) + .flux(); } return contentObservable @@ -272,7 +276,8 @@ private Flux toDocumentServiceResponse(Mono { From 3ffcb75c51c91358e07acdeb88f76f90c4d8c126 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Fri, 11 Oct 2019 23:40:12 -0500 Subject: [PATCH 12/17] Updating number of documents and collection throughput size --- .../rx/BackPressureCrossPartitionTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/BackPressureCrossPartitionTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/BackPressureCrossPartitionTest.java index 60b31f0c8d93..6490384fca4e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/BackPressureCrossPartitionTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/BackPressureCrossPartitionTest.java @@ -47,7 +47,7 @@ public class BackPressureCrossPartitionTest extends TestSuiteBase { private static final int TIMEOUT = 1800000; private static final int SETUP_TIMEOUT = 60000; - private int numberOfDocs = 4000; + private int numberOfDocs = 1000; private CosmosDatabase createdDatabase; private CosmosContainer createdCollection; private List createdDocuments; @@ -105,12 +105,12 @@ private void warmUp() { public Object[][] queryProvider() { return new Object[][] { // query, maxItemCount, max expected back pressure buffered, total number of expected query results - { "SELECT * FROM r", 1, 2 * Queues.SMALL_BUFFER_SIZE, numberOfDocs}, - { "SELECT * FROM r", 100, 2 * Queues.SMALL_BUFFER_SIZE, numberOfDocs}, - { "SELECT * FROM r ORDER BY r.prop", 100, 2 * Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions, numberOfDocs}, - { "SELECT TOP 1000 * FROM r", 1, 2 * Queues.SMALL_BUFFER_SIZE, 1000}, - { "SELECT TOP 1000 * FROM r", 100, 2 * Queues.SMALL_BUFFER_SIZE, 1000}, - { "SELECT TOP 1000 * FROM r ORDER BY r.prop", 100, 2 * Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions , 1000}, + { "SELECT * FROM r", 1, Queues.SMALL_BUFFER_SIZE, numberOfDocs}, + { "SELECT * FROM r", 100, Queues.SMALL_BUFFER_SIZE, numberOfDocs}, + { "SELECT * FROM r ORDER BY r.prop", 100, Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions, numberOfDocs}, + { "SELECT TOP 500 * FROM r", 1, Queues.SMALL_BUFFER_SIZE, 500}, + { "SELECT TOP 500 * FROM r", 100, Queues.SMALL_BUFFER_SIZE, 500}, + { "SELECT TOP 500 * FROM r ORDER BY r.prop", 100, Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions , 500}, }; } @@ -131,7 +131,7 @@ public void query(String query, int maxItemCount, int maxExpectedBufferedCountFo log.info("instantiating subscriber ..."); TestSubscriber> subscriber = new TestSubscriber<>(1); queryObservable.publishOn(Schedulers.elastic(), 1).subscribe(subscriber); - int sleepTimeInMillis = 40000; + int sleepTimeInMillis = 10000; int i = 0; // use a test subscriber and request for more result and sleep in between @@ -168,7 +168,7 @@ public void beforeClass() { CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); client = new ClientUnderTestBuilder(clientBuilder()).build(); createdDatabase = getSharedCosmosDatabase(client); - createdCollection = createCollection(createdDatabase, getCollectionDefinition(), options, 20000); + createdCollection = createCollection(createdDatabase, getCollectionDefinition(), options, 50000); ArrayList docDefList = new ArrayList<>(); for(int i = 0; i < numberOfDocs; i++) { From fcbc6794567a21a04747f985d8a8a1afb09d405f Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Sat, 12 Oct 2019 18:33:04 -0500 Subject: [PATCH 13/17] Updated Feed Response Validator for query metrics --- .../cosmos/internal/FeedResponseListValidator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java index 616e36422665..cd078eaab524 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java @@ -173,7 +173,7 @@ public Builder allPagesSatisfy(FeedResponseValidator pageValidator) { public void validate(List> feedList) { for(FeedResponse fp: feedList) { - pageValidator.validate(fp); + pageValidator.validate(fp); } } }); @@ -191,15 +191,15 @@ public void validate(List> feedList) { if (value instanceof Double) { Double d = result.getDouble("_aggregate"); - assertThat(d).isEqualTo(value); + assertThat(d).isEqualTo(value); } else if (value instanceof Integer) { Integer d = result.getInt("_aggregate"); - assertThat(d).isEqualTo(value); + assertThat(d).isEqualTo(value); } else if (value instanceof String) { String d = result.getString("_aggregate"); - assertThat(d).isEqualTo(value); + assertThat(d).isEqualTo(value); } else if (value instanceof Document){ assertThat(result.toString()).isEqualTo(value.toString()); @@ -280,10 +280,10 @@ public void validate(List> feedList) { assertThat(queryMetrics.getTotalQueryExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getOutputDocumentCount()).isGreaterThan(0); assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThan(0); - assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThan(0); + assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getDocumentWriteTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getVMExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); - assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThan(0); + assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getQueryPreparationTimes().getPhysicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getQueryPreparationTimes().getQueryCompilationTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getRuntimeExecutionTimes().getQueryEngineExecutionTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); From e3c0920c64791bd0593dd5940869be33f445515a Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 14 Oct 2019 12:35:50 -0700 Subject: [PATCH 14/17] Fixed query documents with aggregate test --- .../azure/data/cosmos/internal/FeedResponseListValidator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java index cd078eaab524..5d4aa1806596 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java @@ -276,7 +276,7 @@ public void validate(List> feedList) { if (shouldHaveMetrics) { QueryMetrics queryMetrics = BridgeInternal.createQueryMetricsFromCollection(BridgeInternal.queryMetricsFromFeedResponse(feedPage).values()); assertThat(queryMetrics.getIndexHitDocumentCount()).isGreaterThanOrEqualTo(0); - assertThat(queryMetrics.getRetrievedDocumentSize()).isGreaterThan(0); + assertThat(queryMetrics.getRetrievedDocumentSize()).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getTotalQueryExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getOutputDocumentCount()).isGreaterThan(0); assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThan(0); From 4f01fd2620da1db366ff27ead4f97249bde54156 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 14 Oct 2019 13:59:05 -0700 Subject: [PATCH 15/17] Updated retrieved documents count --- .../azure/data/cosmos/internal/FeedResponseListValidator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java index 5d4aa1806596..773f1ccd6967 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java @@ -279,7 +279,7 @@ public void validate(List> feedList) { assertThat(queryMetrics.getRetrievedDocumentSize()).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getTotalQueryExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getOutputDocumentCount()).isGreaterThan(0); - assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThan(0); + assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getDocumentWriteTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getVMExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); From 7641a2daa2c76e62921d8370b46a51992846bad4 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 14 Oct 2019 18:16:05 -0700 Subject: [PATCH 16/17] Updated getLogicalPlanBuildTime for query metrics validation --- .../azure/data/cosmos/internal/FeedResponseListValidator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java index 773f1ccd6967..927b1f375a52 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java @@ -283,7 +283,7 @@ public void validate(List> feedList) { assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getDocumentWriteTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getVMExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); - assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); + assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getQueryPreparationTimes().getPhysicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getQueryPreparationTimes().getQueryCompilationTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getRuntimeExecutionTimes().getQueryEngineExecutionTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); From 5973dfedcdf65d2a3b6f738add9d49ac8aa0a7a2 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 14 Oct 2019 23:41:36 -0700 Subject: [PATCH 17/17] Unused imports --- .../java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java index 8f2291d29d6c..07891c82eaba 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java @@ -17,7 +17,6 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.util.internal.StringUtil; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger;