Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ Licensed under the MIT License.
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>${reactor-netty.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.data.cosmos.internal.Permission;
import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdMetrics;
import io.micrometer.core.instrument.MeterRegistry;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -172,13 +173,14 @@ public Mono<CosmosAsyncDatabaseResponse> createDatabaseIfNotExists(String id) {

private Mono<CosmosAsyncDatabaseResponse> createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database){
return database.read().onErrorResume(exception -> {
if (exception instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) exception;
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosClientException) {
final CosmosClientException cosmosClientException = (CosmosClientException) unwrappedException;
if (cosmosClientException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createDatabase(new CosmosDatabaseProperties(database.getId()), new CosmosDatabaseRequestOptions());
}
}
return Mono.error(exception);
return Mono.error(unwrappedException);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.data.cosmos.internal.Offer;
import com.azure.data.cosmos.internal.Paths;
import org.apache.commons.lang3.StringUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -308,13 +309,14 @@ public Mono<CosmosAsyncContainerResponse> createContainerIfNotExists(String id,
private Mono<CosmosAsyncContainerResponse> createContainerIfNotExistsInternal(
CosmosContainerProperties containerProperties, CosmosAsyncContainer container, CosmosContainerRequestOptions options) {
return container.read(options).onErrorResume(exception -> {
if (exception instanceof CosmosClientException) {
CosmosClientException cosmosClientException = (CosmosClientException) exception;
final Throwable unwrappedException = Exceptions.unwrap(exception);
if (unwrappedException instanceof CosmosClientException) {
final CosmosClientException cosmosClientException = (CosmosClientException) unwrappedException;
if (cosmosClientException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
return createContainer(containerProperties, options);
}
}
return Mono.error(exception);
return Mono.error(unwrappedException);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.net.URL;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

/**
* While this class is public, but it is not part of our published public APIs.
Expand All @@ -39,6 +40,7 @@ public class ClientRetryPolicy implements IDocumentClientRetryPolicy {
private URL locationEndpoint;
private RetryContext retryContext;
private CosmosResponseDiagnostics cosmosResponseDiagnostics;
private AtomicInteger cnt = new AtomicInteger(0);

public ClientRetryPolicy(GlobalEndpointManager globalEndpointManager,
boolean enableEndpointDiscovery,
Expand All @@ -57,6 +59,11 @@ public ClientRetryPolicy(GlobalEndpointManager globalEndpointManager,

@Override
public Mono<ShouldRetryResult> shouldRetry(Exception e) {
logger.debug("retry count {}, isReadRequest {}, canUseMultipleWriteLocations {}, due to failure:",
cnt.incrementAndGet(),
isReadRequest,
canUseMultipleWriteLocations,
e);
if (this.locationEndpoint == null) {
// on before request is not invoked because Document Service Request creation failed.
logger.error("locationEndpoint is null because ClientRetryPolicy::onBeforeRequest(.) is not invoked, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public class Configs {
private static final int CPU_CNT = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_DIRECT_HTTPS_POOL_SIZE = CPU_CNT * 500;

// Reactor Netty Constants
private static final int MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS = 60 * 1000;
private static final int CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS = 45 * 1000;
private static final int REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE = 1000;
private static final String REACTOR_NETTY_CONNECTION_POOL_NAME = "reactor-netty-connection-pool";

public Configs() {
Expand Down Expand Up @@ -147,6 +151,18 @@ public String getReactorNettyConnectionPoolName() {
return REACTOR_NETTY_CONNECTION_POOL_NAME;
}

public int getMaxIdleConnectionTimeoutInMillis() {
return MAX_IDLE_CONNECTION_TIMEOUT_IN_MILLIS;
}

public int getConnectionAcquireTimeoutInMillis() {
return CONNECTION_ACQUIRE_TIMEOUT_IN_MILLIS;
}

public int getReactorNettyMaxConnectionPoolSize() {
return REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE;
}

private static String getJVMConfigAsString(String propName, String defaultValue) {
String propValue = System.getProperty(propName);
return StringUtils.defaultString(propValue, defaultValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.data.cosmos.internal;

import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;

Expand All @@ -12,11 +13,28 @@ private static <T> String safeToString(T t) {
return t != null ? t.toString() : "null";
}

public static <T> String info(Collection<T> collection) {
return collection == null ? "null collection" :
"collection size: " + collection.size();
}

public static <T> String info(T[] collection) {
return collection == null ? "null collection" :
"collection size: " + collection.length;
}


public static <T> String toString(Collection<T> collection, String delimiter) {
return collection.stream()
.map( t -> safeToString(t) )
.collect(Collectors.joining(delimiter));
return collection == null ? "null collection" :
collection.isEmpty() ? "empty collection" :
collection.stream()
.map(t -> safeToString(t))
.collect(Collectors.joining(delimiter));
}

public static <T> String toString(T[] array, String delimiter) {
return array == null ? "null array" :
toString(Arrays.asList(array), delimiter);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -43,7 +42,6 @@
*/
class RxGatewayStoreModel implements RxStoreModel {

private final static int INITIAL_RESPONSE_BUFFER_SIZE = 1024;
private final Logger logger = LoggerFactory.getLogger(RxGatewayStoreModel.class);
private final Map<String, String> defaultHeaders;
private final HttpClient httpClient;
Expand Down Expand Up @@ -229,22 +227,6 @@ private String ensureSlashPrefixed(String path) {
return "/" + path;
}

private Mono<String> toString(Flux<ByteBuf> contentObservable) {
return contentObservable
.reduce(
new ByteArrayOutputStream(INITIAL_RESPONSE_BUFFER_SIZE),
(out, bb) -> {
try {
bb.readBytes(out, bb.readableBytes());
return out;
}
catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(out -> new String(out.toByteArray(), StandardCharsets.UTF_8));
}

/**
* Transforms the reactor netty's client response Observable to RxDocumentServiceResponse Observable.
*
Expand All @@ -259,102 +241,63 @@ private Mono<String> toString(Flux<ByteBuf> contentObservable) {
private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpResponse> httpResponseMono,
RxDocumentServiceRequest request) {

if (request.getIsMedia()) {
return httpResponseMono.flatMap(httpResponse -> {
return httpResponseMono.flatMap(httpResponse -> {

// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();
// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();

Flux<InputStream> inputStreamObservable;
Flux<String> contentObservable;

if (request.getOperationType() == OperationType.Delete) {
// for delete we don't expect any body
inputStreamObservable = Flux.just(IOUtils.toInputStream("", StandardCharsets.UTF_8));
} else {
// transforms the ByteBufFlux to Flux<InputStream>
inputStreamObservable = httpResponse
.body()
.flatMap(byteBuf ->
Flux.just(IOUtils.toInputStream(byteBuf.toString(StandardCharsets.UTF_8), StandardCharsets.UTF_8)));
}

return inputStreamObservable
.flatMap(contentInputStream -> {
try {
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request,
HttpResponseStatus.valueOf(httpResponseStatus),
httpResponseHeaders,
null,
contentInputStream);

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus, HttpUtils
.unescape(httpResponseHeaders.toMap().entrySet()), contentInputStream);
return Flux.just(rsp);
} catch (Exception e) {
return Flux.error(e);
}
}).single();

}).map(RxDocumentServiceResponse::new).flux();

} else {
return httpResponseMono.flatMap(httpResponse -> {

// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();

Flux<String> contentObservable;

if (request.getOperationType() == OperationType.Delete) {
// for delete we don't expect any body
contentObservable = Flux.just(StringUtils.EMPTY);
} else {
// transforms the ByteBufFlux to Flux<String>
contentObservable = toString(httpResponse.body()).flux();
}
if (request.getOperationType() == OperationType.Delete) {
// for delete we don't expect any body
contentObservable = Flux.just(StringUtils.EMPTY);
} else {
// transforms the ByteBufFlux to Flux<String>
contentObservable = httpResponse
.bodyAsString()
.switchIfEmpty(Mono.just(StringUtils.EMPTY))
.flux();
}

return contentObservable
.flatMap(content -> {
try {
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
content);
return Flux.just(rsp);
} catch (Exception e) {
return Flux.error(e);
}
}).single();

}).map(RxDocumentServiceResponse::new)
.onErrorResume(throwable -> {
if (!(throwable instanceof Exception)) {
// fatal error
logger.error("Unexpected failure {}", throwable.getMessage(), throwable);
return Mono.error(throwable);
}

Exception exception = (Exception) throwable;
if (!(exception instanceof CosmosClientException)) {
// wrap in CosmosClientException
logger.error("Network failure", exception);
CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
return Mono.error(dce);
}

return Mono.error(exception);
}).flux();
}
return contentObservable
.flatMap(content -> {
try {
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);

// transforms to Observable<StoreResponse>
StoreResponse rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
content);
return Flux.just(rsp);
} catch (Exception e) {
return Flux.error(e);
}
})
.single();

}).map(RxDocumentServiceResponse::new)
.onErrorResume(throwable -> {
Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
if (!(unwrappedException instanceof Exception)) {
// fatal error
logger.error("Unexpected failure {}", unwrappedException.getMessage(), unwrappedException);
return Mono.error(unwrappedException);
}

Exception exception = (Exception) unwrappedException;
if (!(exception instanceof CosmosClientException)) {
// wrap in CosmosClientException
logger.error("Network failure", exception);
CosmosClientException dce = BridgeInternal.createCosmosClientException(0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
return Mono.error(dce);
}

return Mono.error(exception);
}).flux();
}

private void validateOrThrow(RxDocumentServiceRequest request, HttpResponseStatus status, HttpHeaders headers, String body,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Mono<TValue> getAsync(

}, err -> {

logger.debug("cache[{}] resulted in error {}, computing new value", key, err);
logger.debug("cache[{}] resulted in error, computing new value", key, err);
AsyncLazy<TValue> asyncLazy = new AsyncLazy<>(singleValueInitFunc);
AsyncLazy<TValue> resultAsyncLazy = values.merge(key, asyncLazy,
(lazyValue1, lazyValu2) -> lazyValue1 == initialLazyValue ? lazyValu2 : lazyValue1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity;
import org.apache.commons.lang3.StringUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

import java.util.Map;
Expand Down Expand Up @@ -106,12 +107,13 @@ private Mono<DocumentCollection> resolveByPartitionKeyRangeIdentityAsync(Partiti
if (partitionKeyRangeIdentity != null && partitionKeyRangeIdentity.getCollectionRid() != null) {
return this.resolveByRidAsync(partitionKeyRangeIdentity.getCollectionRid(), properties)
.onErrorResume(e -> {
if (e instanceof NotFoundException) {
Throwable unwrappedException = Exceptions.unwrap(e);
if (unwrappedException instanceof NotFoundException) {
// This is signal to the upper logic either to refresh
// collection cache and retry.
return Mono.error(new InvalidPartitionException(RMResources.InvalidDocumentCollection));
}
return Mono.error(e);
return Mono.error(unwrappedException);

});
}
Expand Down
Loading