diff --git a/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Configuration.java b/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Configuration.java
index 1ab63af0d5cf..f1e3cc2612d5 100644
--- a/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Configuration.java
+++ b/sdk/cosmos/microsoft-azure-cosmos-benchmark/src/main/java/com/azure/data/cosmos/benchmark/Configuration.java
@@ -55,7 +55,7 @@ class Configuration {
private int documentDataFieldSize = 20;
@Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size")
- private Integer maxConnectionPoolSize = 1000;
+ private Integer maxConnectionPoolSize = 500;
@Parameter(names = "-consistencyLevel", description = "Consistency Level", converter = ConsistencyLevelConverter.class)
private ConsistencyLevel consistencyLevel = ConsistencyLevel.SESSION;
diff --git a/sdk/cosmos/microsoft-azure-cosmos/pom.xml b/sdk/cosmos/microsoft-azure-cosmos/pom.xml
index d845ecd66627..cb8c12a72491 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/pom.xml
+++ b/sdk/cosmos/microsoft-azure-cosmos/pom.xml
@@ -140,6 +140,7 @@ Licensed under the MIT License.
io.projectreactor.netty
reactor-netty
+ ${reactor-netty.version}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java
index 2740ead28186..ad8a4754b42c 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosClient.java
@@ -9,6 +9,7 @@
import com.azure.data.cosmos.internal.Permission;
import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdMetrics;
import io.micrometer.core.instrument.MeterRegistry;
+import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -167,14 +168,15 @@ public Mono createDatabaseIfNotExists(String id) {
}
private Mono createDatabaseIfNotExistsInternal(CosmosDatabase database){
- return database.read().onErrorResume(exception -> {
- if (exception instanceof CosmosClientException) {
- CosmosClientException cosmosClientException = (CosmosClientException) exception;
+ return database.read().onErrorResume(t -> {
+ Throwable throwable = Exceptions.unwrap(t);
+ if (throwable instanceof CosmosClientException) {
+ CosmosClientException cosmosClientException = (CosmosClientException) throwable;
if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createDatabase(new CosmosDatabaseProperties(database.id()), new CosmosDatabaseRequestOptions());
}
}
- return Mono.error(exception);
+ return Mono.error(throwable);
});
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosDatabase.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosDatabase.java
index 0b8ce7d03e1b..685d559f37b1 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosDatabase.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/CosmosDatabase.java
@@ -7,6 +7,7 @@
import com.azure.data.cosmos.internal.Offer;
import com.azure.data.cosmos.internal.Paths;
import org.apache.commons.lang3.StringUtils;
+import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -308,13 +309,14 @@ public Mono createContainerIfNotExists(String id, Strin
private Mono createContainerIfNotExistsInternal(
CosmosContainerProperties containerProperties, CosmosContainer container, CosmosContainerRequestOptions options) {
return container.read(options).onErrorResume(exception -> {
- if (exception instanceof CosmosClientException) {
- CosmosClientException cosmosClientException = (CosmosClientException) exception;
+ Throwable unwrappedException = Exceptions.unwrap(exception);
+ if (unwrappedException instanceof CosmosClientException) {
+ CosmosClientException cosmosClientException = (CosmosClientException) unwrappedException;
if (cosmosClientException.statusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createContainer(containerProperties, options);
}
}
- return Mono.error(exception);
+ return Mono.error(unwrappedException);
});
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Configs.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Configs.java
index e747ffa0ed6f..2bf3bd18196f 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Configs.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/Configs.java
@@ -55,6 +55,10 @@ public class Configs {
private static final int CPU_CNT = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_DIRECT_HTTPS_POOL_SIZE = CPU_CNT * 500;
+ // Reactor Netty Constants
+ private static final int MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS = 60 * 1000;
+ private static final int CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS = 45 * 1000;
+ private static final int REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE = 500;
private static final String REACTOR_NETTY_CONNECTION_POOL_NAME = "reactor-netty-connection-pool";
public Configs() {
@@ -147,6 +151,18 @@ public String getReactorNettyConnectionPoolName() {
return REACTOR_NETTY_CONNECTION_POOL_NAME;
}
+ public int getMaxIdleConnectionTimeoutInMillis() {
+ return MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS;
+ }
+
+ public int getConnectionAcquireTimeoutInMillis() {
+ return CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS;
+ }
+
+ public int getReactorNettyMaxConnectionPoolSize() {
+ return REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE;
+ }
+
private static String getJVMConfigAsString(String propName, String defaultValue) {
String propValue = System.getProperty(propName);
return StringUtils.defaultString(propValue, defaultValue);
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java
index d76d639d5036..07891c82eaba 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxGatewayStoreModel.java
@@ -24,7 +24,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -43,7 +42,6 @@
*/
class RxGatewayStoreModel implements RxStoreModel {
- private final static int INITIAL_RESPONSE_BUFFER_SIZE = 1024;
private final Logger logger = LoggerFactory.getLogger(RxGatewayStoreModel.class);
private final Map defaultHeaders;
private final HttpClient httpClient;
@@ -229,22 +227,6 @@ private String ensureSlashPrefixed(String path) {
return "/" + path;
}
- private Mono toString(Flux contentObservable) {
- return contentObservable
- .reduce(
- new ByteArrayOutputStream(INITIAL_RESPONSE_BUFFER_SIZE),
- (out, bb) -> {
- try {
- bb.readBytes(out, bb.readableBytes());
- return out;
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .map(out -> new String(out.toByteArray(), StandardCharsets.UTF_8));
- }
-
/**
* Transforms the reactor netty's client response Observable to RxDocumentServiceResponse Observable.
*
@@ -259,102 +241,63 @@ private Mono toString(Flux contentObservable) {
private Flux toDocumentServiceResponse(Mono httpResponseMono,
RxDocumentServiceRequest request) {
- if (request.getIsMedia()) {
- return httpResponseMono.flatMap(httpResponse -> {
+ return httpResponseMono.flatMap(httpResponse -> {
- // header key/value pairs
- HttpHeaders httpResponseHeaders = httpResponse.headers();
- int httpResponseStatus = httpResponse.statusCode();
+ // header key/value pairs
+ HttpHeaders httpResponseHeaders = httpResponse.headers();
+ int httpResponseStatus = httpResponse.statusCode();
- Flux inputStreamObservable;
-
- if (request.getOperationType() == OperationType.Delete) {
- // for delete we don't expect any body
- inputStreamObservable = Flux.just(IOUtils.toInputStream("", StandardCharsets.UTF_8));
- } else {
- // transforms the ByteBufFlux to Flux
- inputStreamObservable = httpResponse
- .body()
- .flatMap(byteBuf ->
- Flux.just(IOUtils.toInputStream(byteBuf.toString(StandardCharsets.UTF_8), StandardCharsets.UTF_8)));
- }
-
- return inputStreamObservable
- .flatMap(contentInputStream -> {
- try {
- // If there is any error in the header response this throws exception
- // TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
- validateOrThrow(request,
- HttpResponseStatus.valueOf(httpResponseStatus),
- httpResponseHeaders,
- null,
- contentInputStream);
-
- // transforms to Observable
- StoreResponse rsp = new StoreResponse(httpResponseStatus, HttpUtils
- .unescape(httpResponseHeaders.toMap().entrySet()), contentInputStream);
- return Flux.just(rsp);
- } catch (Exception e) {
- return Flux.error(e);
- }
- }).single();
-
- }).map(RxDocumentServiceResponse::new).flux();
-
- } else {
- return httpResponseMono.flatMap(httpResponse -> {
+ Flux contentObservable;
- // header key/value pairs
- HttpHeaders httpResponseHeaders = httpResponse.headers();
- int httpResponseStatus = httpResponse.statusCode();
-
- Flux contentObservable;
-
- if (request.getOperationType() == OperationType.Delete) {
- // for delete we don't expect any body
- contentObservable = Flux.just(StringUtils.EMPTY);
- } else {
- // transforms the ByteBufFlux to Flux
- contentObservable = toString(httpResponse.body()).flux();
- }
+ if (request.getOperationType() == OperationType.Delete) {
+ // for delete we don't expect any body
+ contentObservable = Flux.just(StringUtils.EMPTY);
+ } else {
+ // transforms the ByteBufFlux to Flux
+ contentObservable = httpResponse
+ .bodyAsString()
+ .switchIfEmpty(Mono.just(StringUtils.EMPTY))
+ .flux();
+ }
- return contentObservable
- .flatMap(content -> {
- try {
- // If there is any error in the header response this throws exception
- // TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
- validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);
-
- // transforms to Observable
- StoreResponse rsp = new StoreResponse(httpResponseStatus,
- HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
- content);
- return Flux.just(rsp);
- } catch (Exception e) {
- return Flux.error(e);
- }
- }).single();
-
- }).map(RxDocumentServiceResponse::new)
- .onErrorResume(throwable -> {
- if (!(throwable instanceof Exception)) {
- // fatal error
- logger.error("Unexpected failure {}", throwable.getMessage(), throwable);
- return Mono.error(throwable);
- }
-
- Exception exception = (Exception) throwable;
- if (!(exception instanceof CosmosClientException)) {
- // wrap in CosmosClientException
- logger.error("Network failure", exception);
- CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception);
- BridgeInternal.setRequestHeaders(dce, request.getHeaders());
- return Mono.error(dce);
- }
-
- return Mono.error(exception);
- }).flux();
- }
+ return contentObservable
+ .flatMap(content -> {
+ try {
+ // If there is any error in the header response this throws exception
+ // TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
+ validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);
+
+ // transforms to Observable
+ StoreResponse rsp = new StoreResponse(httpResponseStatus,
+ HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
+ content);
+ return Flux.just(rsp);
+ } catch (Exception e) {
+ return Flux.error(e);
+ }
+ })
+ .single();
+
+ }).map(RxDocumentServiceResponse::new)
+ .onErrorResume(throwable -> {
+ Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
+ if (!(unwrappedException instanceof Exception)) {
+ // fatal error
+ logger.error("Unexpected failure {}", unwrappedException.getMessage(), unwrappedException);
+ return Mono.error(unwrappedException);
+ }
+
+ Exception exception = (Exception) unwrappedException;
+ if (!(exception instanceof CosmosClientException)) {
+ // wrap in CosmosClientException
+ logger.error("Network failure", exception);
+ CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception);
+ BridgeInternal.setRequestHeaders(dce, request.getHeaders());
+ return Mono.error(dce);
+ }
+
+ return Mono.error(exception);
+ }).flux();
}
private void validateOrThrow(RxDocumentServiceRequest request, HttpResponseStatus status, HttpHeaders headers, String body,
@@ -498,4 +441,4 @@ private void applySessionToken(RxDocumentServiceRequest request) {
headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken);
}
}
-}
\ No newline at end of file
+}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java
index a3c707a02f80..231cb463f787 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/RxCollectionCache.java
@@ -11,6 +11,7 @@
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity;
import org.apache.commons.lang3.StringUtils;
+import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import java.util.Map;
@@ -105,13 +106,14 @@ private Mono resolveByPartitionKeyRangeIdentityAsync(Partiti
// which contains value ",", then resolve to collection rid in this header.
if (partitionKeyRangeIdentity != null && partitionKeyRangeIdentity.getCollectionRid() != null) {
return this.resolveByRidAsync(partitionKeyRangeIdentity.getCollectionRid(), properties)
- .onErrorResume(e -> {
- if (e instanceof NotFoundException) {
+ .onErrorResume(t -> {
+ Throwable unwrappedException = Exceptions.unwrap(t);
+ if (unwrappedException instanceof NotFoundException) {
// This is signal to the upper logic either to refresh
// collection cache and retry.
return Mono.error(new InvalidPartitionException(RMResources.InvalidDocumentCollection));
}
- return Mono.error(e);
+ return Mono.error(unwrappedException);
});
}
@@ -165,7 +167,7 @@ private Mono refreshAsync(RxDocumentServiceRequest request) {
});
}).then();
} else {
- // In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we
+ // In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we
// need to refresh unconditionally.
mono = Mono.fromRunnable(() -> this.refresh(request.getResourceAddress(), request.properties));
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java
index c4a38cf2abde..3c51d58fd756 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ConsistencyWriter.java
@@ -21,6 +21,7 @@
import org.apache.commons.collections4.ComparatorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -164,7 +165,8 @@ Mono writePrivateAsync(
.doOnError(
t -> {
try {
- CosmosClientException ex = Utils.as(t, CosmosClientException.class);
+ Throwable unwrappedException = Exceptions.unwrap(t);
+ CosmosClientException ex = Utils.as(unwrappedException, CosmosClientException.class);
try {
BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request,
storeReader.createStoreResult(null, ex, false, false, primaryUri));
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ErrorUtils.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ErrorUtils.java
index 0956a1460b7c..230e28463c69 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ErrorUtils.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ErrorUtils.java
@@ -17,7 +17,7 @@ public class ErrorUtils {
private static final Logger logger = LoggerFactory.getLogger(ErrorUtils.class);
static Mono getErrorResponseAsync(HttpResponse responseMessage, HttpRequest request) {
- Mono responseAsString = ResponseUtils.toString(responseMessage.body());
+ Mono responseAsString = responseMessage.bodyAsString();
if (request.httpMethod() == HttpMethod.DELETE) {
return Mono.just(StringUtils.EMPTY);
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java
index e9427767cf87..ff7528b21c26 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java
@@ -202,13 +202,14 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque
}
return addresses;
- }).onErrorResume(ex -> {
- CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(ex, CosmosClientException.class);
+ }).onErrorResume(throwable -> {
+ Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
+ CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(unwrappedException, CosmosClientException.class);
if (dce == null) {
if (forceRefreshPartitionAddressesModified) {
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
}
- return Mono.error(ex);
+ return Mono.error(unwrappedException);
} else {
if (Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND) ||
Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.GONE) ||
@@ -217,7 +218,7 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
return null;
}
- return Mono.error(ex);
+ return Mono.error(unwrappedException);
}
});
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientUtils.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientUtils.java
index 279a16b4c5a9..48d9f571890e 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientUtils.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientUtils.java
@@ -31,7 +31,7 @@ static Mono parseResponseAsync(Mono htt
}
private static Mono createDocumentClientException(HttpResponse httpResponse) {
- Mono readStream = ResponseUtils.toString(httpResponse.body());
+ Mono readStream = httpResponse.bodyAsString();
return readStream.map(body -> {
CosmosError cosmosError = BridgeInternal.createCosmosError(body);
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ResponseUtils.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ResponseUtils.java
index 724b1e31cc01..e1bdf9f3b3f5 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ResponseUtils.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/ResponseUtils.java
@@ -6,33 +6,11 @@
import com.azure.data.cosmos.internal.http.HttpHeaders;
import com.azure.data.cosmos.internal.http.HttpRequest;
import com.azure.data.cosmos.internal.http.HttpResponse;
-import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import org.apache.commons.lang3.StringUtils;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
class ResponseUtils {
- private final static int INITIAL_RESPONSE_BUFFER_SIZE = 1024;
-
- public static Mono toString(Flux contentObservable) {
- return contentObservable
- .reduce(
- new ByteArrayOutputStream(INITIAL_RESPONSE_BUFFER_SIZE),
- (out, bb) -> {
- try {
- bb.readBytes(out, bb.readableBytes());
- return out;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .map(out -> new String(out.toByteArray(), StandardCharsets.UTF_8));
- }
static Mono toStoreResponse(HttpResponse httpClientResponse, HttpRequest httpRequest) {
@@ -44,7 +22,7 @@ static Mono toStoreResponse(HttpResponse httpClientResponse, Http
// for delete we don't expect any body
contentObservable = Mono.just(StringUtils.EMPTY);
} else {
- contentObservable = toString(httpClientResponse.body());
+ contentObservable = httpClientResponse.bodyAsString();
}
return contentObservable.flatMap(content -> {
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreClient.java
index e41245c011c1..976bb0e02366 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreClient.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreClient.java
@@ -88,7 +88,8 @@ public Mono processMessageAsync(RxDocumentServiceRequ
storeResponse = storeResponse.doOnError(e -> {
try {
- CosmosClientException exception = Utils.as(e, CosmosClientException.class);
+ Throwable unwrappedException = reactor.core.Exceptions.unwrap(e);
+ CosmosClientException exception = Utils.as(unwrappedException, CosmosClientException.class);
if (exception == null) {
return;
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java
index 35f97b27fea2..e130afaba629 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/StoreReader.java
@@ -158,13 +158,13 @@ private Flux toStoreResult(RxDocumentServiceRequest request,
return Flux.error(e);
}
}
- ).onErrorResume(t -> {
-
+ ).onErrorResume(throwable -> {
+ Throwable unwrappedException = Exceptions.unwrap(throwable);
try {
- logger.debug("Exception {} is thrown while doing readMany", t);
- Exception storeException = Utils.as(t, Exception.class);
+ logger.debug("Exception {} is thrown while doing readMany", unwrappedException);
+ Exception storeException = Utils.as(unwrappedException, Exception.class);
if (storeException == null) {
- return Flux.error(t);
+ return Flux.error(unwrappedException);
}
// Exception storeException = readTask.Exception != null ? readTask.Exception.InnerException : null;
@@ -534,12 +534,13 @@ private Mono readPrimaryInternalAsync(
}
}
- ).onErrorResume(t -> {
- logger.debug("Exception {} is thrown while doing READ Primary", t);
+ ).onErrorResume(throwable -> {
+ Throwable unwrappedException = Exceptions.unwrap(throwable);
+ logger.debug("Exception {} is thrown while doing READ Primary", unwrappedException);
- Exception storeTaskException = Utils.as(t, Exception.class);
+ Exception storeTaskException = Utils.as(unwrappedException, Exception.class);
if (storeTaskException == null) {
- return Mono.error(t);
+ return Mono.error(unwrappedException);
}
try {
@@ -721,7 +722,8 @@ StoreResult createStoreResult(StoreResponse storeResponse,
/* itemLSN: */ itemLSN,
/* sessionToken: */ sessionToken);
} else {
- CosmosClientException cosmosClientException = Utils.as(responseException, CosmosClientException.class);
+ Throwable unwrappedResponseExceptions = Exceptions.unwrap(responseException);
+ CosmosClientException cosmosClientException = Utils.as(unwrappedResponseExceptions, CosmosClientException.class);
if (cosmosClientException != null) {
StoreReader.verifyCanContinueOnException(cosmosClientException);
long quorumAckedLSN = -1;
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java
index bad4c3cc57e7..e59445ca1cc4 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/BufferedHttpResponse.java
@@ -7,6 +7,8 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java
index 7a4ed04a838e..9ea150006cf7 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClient.java
@@ -5,6 +5,8 @@
import reactor.core.publisher.Mono;
import reactor.netty.resources.ConnectionProvider;
+import java.time.Duration;
+
/**
* A generic interface for sending HTTP requests and getting responses.
*/
@@ -19,7 +21,7 @@ public interface HttpClient {
Mono send(HttpRequest request);
/**
- * Create fixed HttpClient with {@link HttpClientConfig}
+ * Create HttpClient with FixedChannelPool {@link HttpClientConfig}
*
* @return the HttpClient
*/
@@ -28,12 +30,40 @@ static HttpClient createFixed(HttpClientConfig httpClientConfig) {
throw new IllegalArgumentException("HttpClientConfig is null");
}
- if (httpClientConfig.getMaxPoolSize() == null) {
- return new ReactorNettyClient(ConnectionProvider.fixed(httpClientConfig.getConfigs().getReactorNettyConnectionPoolName()), httpClientConfig);
+ Integer maxIdleConnectionTimeoutInMillis = httpClientConfig.getConfigs().getMaxIdleConnectionTimeoutInMillis();
+ if (httpClientConfig.getMaxIdleConnectionTimeoutInMillis() != null) {
+ maxIdleConnectionTimeoutInMillis = httpClientConfig.getMaxIdleConnectionTimeoutInMillis();
+ }
+
+ // Default pool size
+ Integer maxPoolSize = httpClientConfig.getConfigs().getReactorNettyMaxConnectionPoolSize();
+ if (httpClientConfig.getMaxPoolSize() != null) {
+ maxPoolSize = httpClientConfig.getMaxPoolSize();
}
- return new ReactorNettyClient(ConnectionProvider.fixed(httpClientConfig.getConfigs().getReactorNettyConnectionPoolName(), httpClientConfig.getMaxPoolSize()), httpClientConfig);
+
+ int connectionAcquireTimeoutInMillis = httpClientConfig.getConfigs().getConnectionAcquireTimeoutInMillis();
+
+ ConnectionProvider fixedConnectionProvider =
+ ConnectionProvider.fixed(httpClientConfig.getConfigs().getReactorNettyConnectionPoolName(),
+ maxPoolSize, connectionAcquireTimeoutInMillis, Duration.ofMillis(maxIdleConnectionTimeoutInMillis));
+
+ return ReactorNettyClient.createWithConnectionProvider(fixedConnectionProvider, httpClientConfig);
}
+ /**
+ * Create HttpClient with un-pooled connection {@link HttpClientConfig}
+ *
+ * @return the HttpClient
+ */
+ static HttpClient create(HttpClientConfig httpClientConfig) {
+ if (httpClientConfig.getConfigs() == null) {
+ throw new IllegalArgumentException("HttpClientConfig is null");
+ }
+
+ return ReactorNettyClient.create(httpClientConfig);
+ }
+
+
/**
* Shutdown the Http Client and clean up resources
*/
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClientConfig.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClientConfig.java
index f83d8dbf1b65..6be4ea5ccf49 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClientConfig.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpClientConfig.java
@@ -18,6 +18,7 @@ public class HttpClientConfig {
private Integer maxIdleConnectionTimeoutInMillis;
private Integer requestTimeoutInMillis;
private InetSocketAddress proxy;
+ private boolean connectionKeepAlive = true;
public HttpClientConfig(Configs configs) {
this.configs = configs;
@@ -43,6 +44,11 @@ public HttpClientConfig withRequestTimeoutInMillis(int requestTimeoutInMillis) {
return this;
}
+ public HttpClientConfig withConnectionKeepAlive(boolean connectionKeepAlive) {
+ this.connectionKeepAlive = connectionKeepAlive;
+ return this;
+ }
+
public Configs getConfigs() {
return configs;
}
@@ -62,4 +68,8 @@ public Integer getRequestTimeoutInMillis() {
public InetSocketAddress getProxy() {
return proxy;
}
+
+ public boolean isConnectionKeepAlive() {
+ return connectionKeepAlive;
+ }
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java
index 4ccc799132ee..e57f3d0fb312 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/HttpResponse.java
@@ -8,6 +8,7 @@
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
+import java.io.InputStream;
import java.nio.charset.Charset;
/**
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java
index abf63363dffe..07d59e724dd7 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/http/ReactorNettyClient.java
@@ -4,6 +4,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LogLevel;
import org.reactivestreams.Publisher;
@@ -18,8 +19,8 @@
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.ProxyProvider;
-import reactor.netty.tcp.TcpResources;
+import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.function.BiFunction;
@@ -31,30 +32,52 @@
*/
class ReactorNettyClient implements HttpClient {
- private final Logger logger = LoggerFactory.getLogger(getClass().getSimpleName());
+ private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class.getSimpleName());
private HttpClientConfig httpClientConfig;
private reactor.netty.http.client.HttpClient httpClient;
private ConnectionProvider connectionProvider;
+ private ReactorNettyClient() {}
+
+ /**
+ * Creates ReactorNettyClient with un-pooled connection.
+ */
+ public static ReactorNettyClient create(HttpClientConfig httpClientConfig) {
+ ReactorNettyClient reactorNettyClient = new ReactorNettyClient();
+ reactorNettyClient.httpClientConfig = httpClientConfig;
+ reactorNettyClient.httpClient = reactor.netty.http.client.HttpClient.newConnection();
+ reactorNettyClient.configureChannelPipelineHandlers();
+ return reactorNettyClient;
+ }
+
/**
* Creates ReactorNettyClient with {@link ConnectionProvider}.
*/
- ReactorNettyClient(ConnectionProvider connectionProvider, HttpClientConfig httpClientConfig) {
- this.connectionProvider = connectionProvider;
- this.httpClientConfig = httpClientConfig;
- this.httpClient = reactor.netty.http.client.HttpClient.create(connectionProvider);
- configureChannelPipelineHandlers();
+ public static ReactorNettyClient createWithConnectionProvider(ConnectionProvider connectionProvider, HttpClientConfig httpClientConfig) {
+ ReactorNettyClient reactorNettyClient = new ReactorNettyClient();
+ reactorNettyClient.connectionProvider = connectionProvider;
+ reactorNettyClient.httpClientConfig = httpClientConfig;
+ reactorNettyClient.httpClient = reactor.netty.http.client.HttpClient.create(connectionProvider);
+ reactorNettyClient.configureChannelPipelineHandlers();
+ return reactorNettyClient;
}
private void configureChannelPipelineHandlers() {
this.httpClient = this.httpClient.tcpConfiguration(tcpClient -> {
- if (LoggerFactory.getLogger(REACTOR_NETWORK_LOG_CATEGORY).isTraceEnabled()) {
- tcpClient = tcpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.TRACE);
- }
if (this.httpClientConfig.getProxy() != null) {
- tcpClient = tcpClient.proxy(typeSpec -> typeSpec.type(ProxyProvider.Proxy.HTTP).address(this.httpClientConfig.getProxy()));
+ tcpClient =
+ tcpClient.proxy(typeSpec -> typeSpec.type(ProxyProvider.Proxy.HTTP).address(this.httpClientConfig.getProxy()));
+ }
+ tcpClient =
+ tcpClient.secure(sslContextSpec -> sslContextSpec.sslContext(this.httpClientConfig.getConfigs().getSslContext()));
+ if (LoggerFactory.getLogger(REACTOR_NETWORK_LOG_CATEGORY).isTraceEnabled()) {
+ tcpClient = tcpClient.wiretap(REACTOR_NETWORK_LOG_CATEGORY, LogLevel.INFO);
}
+ // By default, keep alive is enabled on http client
+ tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+ httpClientConfig.getConfigs().getConnectionAcquireTimeoutInMillis());
+
return tcpClient;
});
}
@@ -66,6 +89,7 @@ public Mono send(final HttpRequest request) {
Objects.requireNonNull(this.httpClientConfig);
return this.httpClient
+ .keepAlive(this.httpClientConfig.isConnectionKeepAlive())
.port(request.port())
.request(HttpMethod.valueOf(request.httpMethod().toString()))
.uri(request.uri().toString())
@@ -87,9 +111,9 @@ private static BiFunction> bod
}
if (restRequest.body() != null) {
Flux nettyByteBufFlux = restRequest.body().map(Unpooled::wrappedBuffer);
- return reactorNettyOutbound.options(sendOptions -> sendOptions.flushOnEach(false)).send(nettyByteBufFlux);
+ return reactorNettyOutbound.send(nettyByteBufFlux);
} else {
- return reactorNettyOutbound.options(sendOptions -> sendOptions.flushOnEach(false));
+ return reactorNettyOutbound;
}
};
}
@@ -107,8 +131,9 @@ private static BiFunction> executeAsync() {
- Function super Throwable, ? extends Flux extends FeedResponse>> func = t -> {
+ Function super Throwable, ? extends Flux extends FeedResponse>> func = throwable -> {
- logger.debug("Received non result message from gateway", t);
- if (!(t instanceof Exception)) {
- logger.error("Unexpected failure", t);
- return Flux.error(t);
+ Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
+
+ logger.debug("Received non result message from gateway", unwrappedException);
+ if (!(unwrappedException instanceof Exception)) {
+ logger.error("Unexpected failure", unwrappedException);
+ return Flux.error(unwrappedException);
}
-
- if (!isCrossPartitionQuery((Exception) t)) {
+
+ if (!isCrossPartitionQuery((Exception) unwrappedException)) {
// If this is not a cross partition query then propagate error
- logger.debug("Failure from gateway", t);
- return Flux.error(t);
+ logger.debug("Failure from gateway", unwrappedException);
+ return Flux.error(unwrappedException);
}
logger.debug("Setting up query pipeline using the query plan received form gateway");
// cross partition query construct pipeline
- CosmosClientException dce = (CosmosClientException) t;
+ CosmosClientException dce = (CosmosClientException) unwrappedException;
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = new
PartitionedQueryExecutionInfo(dce.error().getPartitionedQueryExecutionInfo());
@@ -161,6 +163,6 @@ public static Flux> c
resourceLink,
collection,
isContinuationExpected,
- correlatedActivityId));
+ correlatedActivityId));
}
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java
index 616e36422665..927b1f375a52 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java
@@ -173,7 +173,7 @@ public Builder allPagesSatisfy(FeedResponseValidator pageValidator) {
public void validate(List> feedList) {
for(FeedResponse fp: feedList) {
- pageValidator.validate(fp);
+ pageValidator.validate(fp);
}
}
});
@@ -191,15 +191,15 @@ public void validate(List> feedList) {
if (value instanceof Double) {
Double d = result.getDouble("_aggregate");
- assertThat(d).isEqualTo(value);
+ assertThat(d).isEqualTo(value);
} else if (value instanceof Integer) {
Integer d = result.getInt("_aggregate");
- assertThat(d).isEqualTo(value);
+ assertThat(d).isEqualTo(value);
} else if (value instanceof String) {
String d = result.getString("_aggregate");
- assertThat(d).isEqualTo(value);
+ assertThat(d).isEqualTo(value);
} else if (value instanceof Document){
assertThat(result.toString()).isEqualTo(value.toString());
@@ -276,11 +276,11 @@ public void validate(List> feedList) {
if (shouldHaveMetrics) {
QueryMetrics queryMetrics = BridgeInternal.createQueryMetricsFromCollection(BridgeInternal.queryMetricsFromFeedResponse(feedPage).values());
assertThat(queryMetrics.getIndexHitDocumentCount()).isGreaterThanOrEqualTo(0);
- assertThat(queryMetrics.getRetrievedDocumentSize()).isGreaterThan(0);
+ assertThat(queryMetrics.getRetrievedDocumentSize()).isGreaterThanOrEqualTo(0);
assertThat(queryMetrics.getTotalQueryExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0);
assertThat(queryMetrics.getOutputDocumentCount()).isGreaterThan(0);
- assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThan(0);
- assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThan(0);
+ assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThanOrEqualTo(0);
+ assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0);
assertThat(queryMetrics.getDocumentWriteTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0);
assertThat(queryMetrics.getVMExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0);
assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThan(0);
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RetryUtilsTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RetryUtilsTest.java
index bfddbc7286b9..4a7ad273c7aa 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RetryUtilsTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RetryUtilsTest.java
@@ -13,6 +13,7 @@
import org.mockito.stubbing.Answer;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import java.time.Duration;
@@ -100,7 +101,8 @@ private void validateFailure(Mono single, long timeout, Class e
testSubscriber.assertNotComplete();
testSubscriber.assertTerminated();
assertThat(testSubscriber.errorCount()).isEqualTo(1);
- if (!(testSubscriber.getEvents().get(1).get(0).getClass().equals(class1))) {
+ Throwable throwable = Exceptions.unwrap(testSubscriber.errors().get(0));
+ if (!(throwable.getClass().equals(class1))) {
fail("Not expecting " + testSubscriber.getEvents().get(1).get(0));
}
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java
index b78b9e6cc4c1..c34cbb3ffa9b 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java
@@ -147,7 +147,7 @@ private HttpResponse getMockResponse(String databaseAccountJson) {
Mockito.doReturn(Flux.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, databaseAccountJson)))
.when(httpResponse).body();
Mockito.doReturn(Mono.just(databaseAccountJson))
- .when(httpResponse).bodyAsString(StandardCharsets.UTF_8);
+ .when(httpResponse).bodyAsString();
Mockito.doReturn(new HttpHeaders()).when(httpResponse).headers();
return httpResponse;
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientMockWrapper.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientMockWrapper.java
index fa67ed06cc39..f86b2207c251 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientMockWrapper.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpClientMockWrapper.java
@@ -3,7 +3,6 @@
package com.azure.data.cosmos.internal.directconnectivity;
-import com.azure.data.cosmos.internal.directconnectivity.WFConstants;
import com.azure.data.cosmos.internal.http.HttpClient;
import com.azure.data.cosmos.internal.http.HttpHeaders;
import com.azure.data.cosmos.internal.http.HttpRequest;
@@ -20,8 +19,6 @@
import java.util.Collections;
import java.util.List;
-;
-
public class HttpClientMockWrapper {
public static HttpClientBehaviourBuilder httpClientBehaviourBuilder() {
return new HttpClientBehaviourBuilder();
@@ -88,7 +85,7 @@ public HttpResponse asHttpResponse() {
HttpResponse resp = Mockito.mock(HttpResponse.class);
Mockito.doReturn(this.status).when(resp).statusCode();
Mockito.doReturn(Flux.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, this.content))).when(resp).body();
- Mockito.doReturn(Mono.just(this.content)).when(resp).bodyAsString(StandardCharsets.UTF_8);
+ Mockito.doReturn(Mono.just(this.content)).when(resp).bodyAsString();
Mockito.doReturn(this.httpHeaders).when(resp).headers();
return resp;
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpTransportClientTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpTransportClientTest.java
index 92af307dfb96..bfebc6844a51 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpTransportClientTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/HttpTransportClientTest.java
@@ -7,9 +7,14 @@
import com.azure.data.cosmos.ConflictException;
import com.azure.data.cosmos.ForbiddenException;
import com.azure.data.cosmos.GoneException;
+import com.azure.data.cosmos.InternalServerErrorException;
+import com.azure.data.cosmos.InvalidPartitionException;
import com.azure.data.cosmos.LockedException;
import com.azure.data.cosmos.MethodNotAllowedException;
+import com.azure.data.cosmos.NotFoundException;
+import com.azure.data.cosmos.PartitionIsMigratingException;
import com.azure.data.cosmos.PartitionKeyRangeGoneException;
+import com.azure.data.cosmos.PartitionKeyRangeIsSplittingException;
import com.azure.data.cosmos.PreconditionFailedException;
import com.azure.data.cosmos.RequestEntityTooLargeException;
import com.azure.data.cosmos.RequestRateTooLargeException;
@@ -17,26 +22,17 @@
import com.azure.data.cosmos.RetryWithException;
import com.azure.data.cosmos.ServiceUnavailableException;
import com.azure.data.cosmos.UnauthorizedException;
-import com.azure.data.cosmos.internal.directconnectivity.HttpTransportClient;
-import com.azure.data.cosmos.internal.directconnectivity.HttpUtils;
import com.azure.data.cosmos.internal.Configs;
+import com.azure.data.cosmos.internal.FailureValidator;
import com.azure.data.cosmos.internal.HttpConstants;
-import com.azure.data.cosmos.InternalServerErrorException;
-import com.azure.data.cosmos.InvalidPartitionException;
-import com.azure.data.cosmos.NotFoundException;
import com.azure.data.cosmos.internal.OperationType;
-import com.azure.data.cosmos.PartitionIsMigratingException;
-import com.azure.data.cosmos.PartitionKeyRangeIsSplittingException;
import com.azure.data.cosmos.internal.ResourceType;
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.UserAgentContainer;
-import com.azure.data.cosmos.internal.directconnectivity.HttpTransportClient;
-import com.azure.data.cosmos.internal.directconnectivity.HttpUtils;
import com.azure.data.cosmos.internal.http.HttpClient;
import com.azure.data.cosmos.internal.http.HttpHeaders;
import com.azure.data.cosmos.internal.http.HttpRequest;
import com.azure.data.cosmos.internal.http.HttpResponse;
-import com.azure.data.cosmos.internal.FailureValidator;
import io.netty.channel.ConnectTimeoutException;
import io.reactivex.subscribers.TestSubscriber;
import org.assertj.core.api.Assertions;
@@ -67,7 +63,7 @@ public class HttpTransportClientTest {
private final long lsn = 5;
private final String partitionKeyRangeId = "3";
-
+
@Test(groups = "unit")
public void getResourceFeedUri_Document() throws Exception {
RxDocumentServiceRequest req = RxDocumentServiceRequest.createFromName(
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/ReplicatedResourceClientTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/ReplicatedResourceClientTest.java
index 712276d8b4ad..875048d1f5d4 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/ReplicatedResourceClientTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/ReplicatedResourceClientTest.java
@@ -19,6 +19,7 @@
import org.mockito.Mockito;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import java.util.concurrent.TimeUnit;
@@ -69,6 +70,7 @@ public static void validateFailure(Mono single, FailureValidator
testSubscriber.assertNotComplete();
testSubscriber.assertTerminated();
Assertions.assertThat(testSubscriber.errorCount()).isEqualTo(1);
- validator.validate(testSubscriber.errors().get(0));
+ Throwable throwable = Exceptions.unwrap(testSubscriber.errors().get(0));
+ validator.validate(throwable);
}
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/BackPressureCrossPartitionTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/BackPressureCrossPartitionTest.java
index 60b31f0c8d93..6490384fca4e 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/BackPressureCrossPartitionTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/BackPressureCrossPartitionTest.java
@@ -47,7 +47,7 @@ public class BackPressureCrossPartitionTest extends TestSuiteBase {
private static final int TIMEOUT = 1800000;
private static final int SETUP_TIMEOUT = 60000;
- private int numberOfDocs = 4000;
+ private int numberOfDocs = 1000;
private CosmosDatabase createdDatabase;
private CosmosContainer createdCollection;
private List createdDocuments;
@@ -105,12 +105,12 @@ private void warmUp() {
public Object[][] queryProvider() {
return new Object[][] {
// query, maxItemCount, max expected back pressure buffered, total number of expected query results
- { "SELECT * FROM r", 1, 2 * Queues.SMALL_BUFFER_SIZE, numberOfDocs},
- { "SELECT * FROM r", 100, 2 * Queues.SMALL_BUFFER_SIZE, numberOfDocs},
- { "SELECT * FROM r ORDER BY r.prop", 100, 2 * Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions, numberOfDocs},
- { "SELECT TOP 1000 * FROM r", 1, 2 * Queues.SMALL_BUFFER_SIZE, 1000},
- { "SELECT TOP 1000 * FROM r", 100, 2 * Queues.SMALL_BUFFER_SIZE, 1000},
- { "SELECT TOP 1000 * FROM r ORDER BY r.prop", 100, 2 * Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions , 1000},
+ { "SELECT * FROM r", 1, Queues.SMALL_BUFFER_SIZE, numberOfDocs},
+ { "SELECT * FROM r", 100, Queues.SMALL_BUFFER_SIZE, numberOfDocs},
+ { "SELECT * FROM r ORDER BY r.prop", 100, Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions, numberOfDocs},
+ { "SELECT TOP 500 * FROM r", 1, Queues.SMALL_BUFFER_SIZE, 500},
+ { "SELECT TOP 500 * FROM r", 100, Queues.SMALL_BUFFER_SIZE, 500},
+ { "SELECT TOP 500 * FROM r ORDER BY r.prop", 100, Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions , 500},
};
}
@@ -131,7 +131,7 @@ public void query(String query, int maxItemCount, int maxExpectedBufferedCountFo
log.info("instantiating subscriber ...");
TestSubscriber> subscriber = new TestSubscriber<>(1);
queryObservable.publishOn(Schedulers.elastic(), 1).subscribe(subscriber);
- int sleepTimeInMillis = 40000;
+ int sleepTimeInMillis = 10000;
int i = 0;
// use a test subscriber and request for more result and sleep in between
@@ -168,7 +168,7 @@ public void beforeClass() {
CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
client = new ClientUnderTestBuilder(clientBuilder()).build();
createdDatabase = getSharedCosmosDatabase(client);
- createdCollection = createCollection(createdDatabase, getCollectionDefinition(), options, 20000);
+ createdCollection = createCollection(createdDatabase, getCollectionDefinition(), options, 50000);
ArrayList docDefList = new ArrayList<>();
for(int i = 0; i < numberOfDocs; i++) {
diff --git a/sdk/cosmos/pom.xml b/sdk/cosmos/pom.xml
index 532301e16c4d..ff3ca479c806 100644
--- a/sdk/cosmos/pom.xml
+++ b/sdk/cosmos/pom.xml
@@ -43,7 +43,8 @@ Licensed under the MIT License.
2.0.25.Final
${project.version}
27.0.1-jre
- 3.2.9.RELEASE
+ 3.3.0.RELEASE
+ 0.9.0.RELEASE
unit
${project.basedir}/target/collectedArtifactsForRelease