diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ICollectionRoutingMapCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ICollectionRoutingMapCache.java index a37ef5618ef1..e6b335a81e86 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ICollectionRoutingMapCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ICollectionRoutingMapCache.java @@ -13,14 +13,14 @@ * This is meant to be internally used only by our sdk. **/ public interface ICollectionRoutingMapCache { - default Mono tryLookupAsync( + default Mono> tryLookupAsync( String collectionRid, CollectionRoutingMap previousValue, Map properties) { return tryLookupAsync(collectionRid, previousValue, false, properties); } - Mono tryLookupAsync( + Mono> tryLookupAsync( String collectionRid, CollectionRoutingMap previousValue, boolean forceRefreshCollectionRoutingMap, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRoutingMapProvider.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRoutingMapProvider.java index af1415f7a8fc..6f7651773a51 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRoutingMapProvider.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRoutingMapProvider.java @@ -22,9 +22,9 @@ public interface IRoutingMapProvider { /// This method will return all ranges which overlap this range. /// Whether forcefully refreshing the routing map is necessary /// List of effective partition key ranges for a collection or null if collection doesn't exist. - Mono> tryGetOverlappingRangesAsync(String collectionResourceId, Range range, - boolean forceRefresh /* = false */, Map properties); + Mono>> tryGetOverlappingRangesAsync(String collectionResourceId, Range range, + boolean forceRefresh /* = false */, Map properties); - Mono tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, + Mono> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh /* = false */, Map properties); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PartitionKeyRangeGoneRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PartitionKeyRangeGoneRetryPolicy.java index 686431eb9207..15407a32ce76 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PartitionKeyRangeGoneRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PartitionKeyRangeGoneRetryPolicy.java @@ -38,7 +38,7 @@ public PartitionKeyRangeGoneRetryPolicy( this.feedOptions = feedOptions; } - /// + /// /// Should the caller retry the operation. /// /// Exception that occured when the operation was tried @@ -46,7 +46,7 @@ public PartitionKeyRangeGoneRetryPolicy( /// True indicates caller should retry, False otherwise public Mono shouldRetry(Exception exception) { CosmosClientException clientException = Utils.as(exception, CosmosClientException.class); - if (clientException != null && + if (clientException != null && Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.GONE) && Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE)) { @@ -64,28 +64,30 @@ public Mono shouldRetry(Exception exception) { if (this.feedOptions != null) { request.properties = this.feedOptions.properties(); } - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); - return collectionObs.flatMap(collection -> { + return collectionObs.flatMap(collectionValueHolder -> { - Mono routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collection.getResourceId(), null, request.properties); + Mono> routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collectionValueHolder.v.getResourceId(), + null, request.properties); - Mono refreshedRoutingMapObs = routingMapObs.flatMap(routingMap -> { - // Force refresh. - return this.partitionKeyRangeCache.tryLookupAsync( - collection.getResourceId(), - routingMap, + Mono> refreshedRoutingMapObs = routingMapObs.flatMap(routingMapValueHolder -> { + if (routingMapValueHolder.v != null) { + // Force refresh. + return this.partitionKeyRangeCache.tryLookupAsync( + collectionValueHolder.v.getResourceId(), + routingMapValueHolder.v, request.properties); - }).switchIfEmpty(Mono.defer(Mono::empty)); + } else { + return Mono.just(new Utils.ValueHolder<>(null)); + } + }); // TODO: Check if this behavior can be replaced by doOnSubscribe return refreshedRoutingMapObs.flatMap(rm -> { this.retried = true; return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO)); - }).switchIfEmpty(Mono.defer(() -> { - this.retried = true; - return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO)); - })); + }); }); @@ -96,7 +98,7 @@ public Mono shouldRetry(Exception exception) { @Override public void onBeforeSendRequest(RxDocumentServiceRequest request) { - this.nextRetryPolicy.onBeforeSendRequest(request); + this.nextRetryPolicy.onBeforeSendRequest(request); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicy.java index 20a7033f8d2d..0706ea2d406d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicy.java @@ -61,17 +61,16 @@ public Mono shouldRetry(Exception e) { request.forceNameCacheRefresh = true; request.requestContext.resolvedCollectionRid = null; - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); - return collectionObs.flatMap(collectionInfo -> { - if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionInfo.getResourceId())) { + return collectionObs.flatMap(collectionValueHolder -> { + if (collectionValueHolder.v == null) { + logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress()); + } else if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionValueHolder.v.getResourceId())) { return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO)); } return Mono.just(shouldRetryResult); - }).switchIfEmpty(Mono.defer(() -> { - logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress()); - return Mono.just(shouldRetryResult); - })).onErrorResume(throwable -> { + }).onErrorResume(throwable -> { // When resolveCollectionAsync throws an exception ignore it because it's an attempt to recover an existing // error. When the recovery fails we return ShouldRetryResult.noRetry and propagate the original exception to the client diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 8a5082d131f8..0e19a666ba43 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -869,19 +869,19 @@ private Map getRequestHeaders(RequestOptions options) { private Mono addPartitionKeyInformation(RxDocumentServiceRequest request, Document document, RequestOptions options) { - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); return collectionObs - .map(collection -> { - addPartitionKeyInformation(request, document, options, collection); + .map(collectionValueHolder -> { + addPartitionKeyInformation(request, document, options, collectionValueHolder.v); return request; }); } private Mono addPartitionKeyInformation(RxDocumentServiceRequest request, Document document, RequestOptions options, - Mono collectionObs) { + Mono> collectionObs) { - return collectionObs.map(collection -> { - addPartitionKeyInformation(request, document, options, collection); + return collectionObs.map(collectionValueHolder -> { + addPartitionKeyInformation(request, document, options, collectionValueHolder.v); return request; }); } @@ -969,7 +969,7 @@ private Mono getCreateDocumentRequest(String documentC RxDocumentServiceRequest request = RxDocumentServiceRequest.create(operationType, ResourceType.Document, path, typedDocument, requestHeaders, options); - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); return addPartitionKeyInformation(request, typedDocument, options, collectionObs); } @@ -1217,7 +1217,7 @@ private Flux> replaceDocumentInternal(String document validateResource(document); - Mono collectionObs = collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = collectionCache.resolveCollectionAsync(request); Mono requestObs = addPartitionKeyInformation(request, document, options, collectionObs); return requestObs.flux().flatMap(req -> { @@ -1247,7 +1247,7 @@ private Flux> deleteDocumentInternal(String documentL RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Delete, ResourceType.Document, path, requestHeaders, options); - Mono collectionObs = collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = collectionCache.resolveCollectionAsync(request); Mono requestObs = addPartitionKeyInformation(request, null, options, collectionObs); @@ -1283,7 +1283,7 @@ private Flux> readDocumentInternal(String documentLin RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.Document, path, requestHeaders, options); - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); Mono requestObs = addPartitionKeyInformation(request, null, options, collectionObs); @@ -2587,7 +2587,7 @@ private Flux> readFeedCollectionChild(FeedO Function>> executeFunc = request -> { return ObservableHelper.inlineIfPossibleAsObs(() -> { - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); Mono requestObs = this.addPartitionKeyInformation(request, null, requestOptions, collectionObs); return requestObs.flux().flatMap(req -> this.readFeed(req) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/IPartitionKeyRangeCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/IPartitionKeyRangeCache.java index 1c8d5f25e498..6797b3ad9102 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/IPartitionKeyRangeCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/IPartitionKeyRangeCache.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation.caches; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.routing.CollectionRoutingMap; import com.azure.cosmos.implementation.routing.Range; import com.azure.cosmos.implementation.ICollectionRoutingMapCache; @@ -13,18 +14,18 @@ import java.util.Map; /** - * + * */ public interface IPartitionKeyRangeCache extends IRoutingMapProvider, ICollectionRoutingMapCache { - Mono tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map properties); + Mono> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map properties); - Mono> tryGetOverlappingRangesAsync(String collectionRid, Range range, boolean forceRefresh, - Map properties); + Mono>> tryGetOverlappingRangesAsync(String collectionRid, Range range, boolean forceRefresh, + Map properties); - Mono tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh, - Map properties); + Mono> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh, + Map properties); - Mono tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map properties); + Mono> tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map properties); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxCollectionCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxCollectionCache.java index 2068a98b7507..5fd376e038c0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxCollectionCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxCollectionCache.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation.caches; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import com.azure.cosmos.implementation.DocumentCollection; import com.azure.cosmos.InvalidPartitionException; @@ -36,7 +37,7 @@ protected RxCollectionCache() { * @param request Request to resolve. * @return an instance of Single<DocumentCollection> */ - public Mono resolveCollectionAsync( + public Mono> resolveCollectionAsync( RxDocumentServiceRequest request) { // Mono Void to represent only terminal events specifically complete and error Mono init = null; @@ -46,14 +47,17 @@ public Mono resolveCollectionAsync( init = mono.then(Mono.fromRunnable(() -> request.setForceNameCacheRefresh(false))); } - Mono collectionInfoObs = this.resolveByPartitionKeyRangeIdentityAsync( + Mono> collectionInfoObs = this.resolveByPartitionKeyRangeIdentityAsync( request.getPartitionKeyRangeIdentity(), request.properties); if (init != null) { collectionInfoObs = init.then(collectionInfoObs); } - return collectionInfoObs.flatMap(Mono::just).switchIfEmpty(Mono.defer(() -> { + return collectionInfoObs.flatMap(collectionValueHolder -> { + if (collectionValueHolder.v != null) { + return Mono.just(collectionValueHolder); + } if (request.requestContext.resolvedCollectionRid == null) { Mono collectionInfoRes = this.resolveByNameAsync(request.getResourceAddress(), request.properties); @@ -67,16 +71,23 @@ public Mono resolveCollectionAsync( request.setResourceId(collection.getResourceId()); request.requestContext.resolvedCollectionRid = collection.getResourceId(); - return Mono.just(collection); + return Mono.just(new Utils.ValueHolder<>(collection)); }); } else { return this.resolveByRidAsync(request.requestContext.resolvedCollectionRid, request.properties); } - })); + }); } else { return resolveByPartitionKeyRangeIdentityAsync(request.getPartitionKeyRangeIdentity(),request.properties) - .flatMap(Mono::just).switchIfEmpty(this.resolveByRidAsync(request.getResourceAddress(), request.properties)); + .flatMap(collectionValueHolder -> { + + if (collectionValueHolder.v != null) { + return Mono.just(collectionValueHolder); + } + + return this.resolveByRidAsync(request.getResourceAddress(), request.properties); + }); } } @@ -101,7 +112,7 @@ public void refresh(String resourceAddress, Map properties) { protected abstract Mono getByNameAsync(String resourceAddress, Map properties); - private Mono resolveByPartitionKeyRangeIdentityAsync(PartitionKeyRangeIdentity partitionKeyRangeIdentity, Map properties) { + private Mono> resolveByPartitionKeyRangeIdentityAsync(PartitionKeyRangeIdentity partitionKeyRangeIdentity, Map properties) { // if request is targeted at specific partition using x-ms-documentd-partitionkeyrangeid header, // which contains value ",", then resolve to collection rid in this header. if (partitionKeyRangeIdentity != null && partitionKeyRangeIdentity.getCollectionRid() != null) { @@ -117,19 +128,20 @@ private Mono resolveByPartitionKeyRangeIdentityAsync(Partiti }); } - return Mono.empty(); + return Mono.just(new Utils.ValueHolder<>(null)); } - private Mono resolveByRidAsync( + private Mono> resolveByRidAsync( String resourceId, Map properties) { ResourceId resourceIdParsed = ResourceId.parse(resourceId); String collectionResourceId = resourceIdParsed.getDocumentCollectionId().toString(); - return this.collectionInfoByIdCache.getAsync( - collectionResourceId, - null, - () -> this.getByRidAsync(collectionResourceId, properties)); + Mono async = this.collectionInfoByIdCache.getAsync( + collectionResourceId, + null, + () -> this.getByRidAsync(collectionResourceId, properties)); + return async.map(Utils.ValueHolder::new); } private Mono resolveByNameAsync( @@ -167,7 +179,7 @@ private Mono refreshAsync(RxDocumentServiceRequest request) { }); }).then(); } else { - // In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we + // In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we // need to refresh unconditionally. mono = Mono.fromRunnable(() -> this.refresh(request.getResourceAddress(), request.properties)); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java index 1b59fda07920..536512009837 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java @@ -25,6 +25,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -52,25 +53,26 @@ public RxPartitionKeyRangeCache(AsyncDocumentClient client, RxCollectionCache co * @see IPartitionKeyRangeCache#tryLookupAsync(java.lang.STRING, com.azure.cosmos.internal.routing.CollectionRoutingMap) */ @Override - public Mono tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map properties) { + public Mono> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map properties) { return routingMapCache.getAsync( collectionRid, previousValue, () -> getRoutingMapForCollectionAsync(collectionRid, previousValue, properties)) - .onErrorResume(err -> { - logger.debug("tryLookupAsync on collectionRid {} encountered failure", collectionRid, err); - CosmosClientException dce = Utils.as(err, CosmosClientException.class); - if (dce != null && Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND)) { - return Mono.empty(); - } - - return Mono.error(err); - }); + .map(Utils.ValueHolder::new) + .onErrorResume(err -> { + logger.debug("tryLookupAsync on collectionRid {} encountered failure", collectionRid, err); + CosmosClientException dce = Utils.as(err, CosmosClientException.class); + if (dce != null && Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND)) { + return Mono.just(new Utils.ValueHolder<>(null)); + } + + return Mono.error(err); + }); } @Override - public Mono tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, boolean forceRefreshCollectionRoutingMap, - Map properties) { + public Mono> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, boolean forceRefreshCollectionRoutingMap, + Map properties) { return tryLookupAsync(collectionRid, previousValue, properties); } @@ -78,63 +80,73 @@ public Mono tryLookupAsync(String collectionRid, Collectio * @see IPartitionKeyRangeCache#tryGetOverlappingRangesAsync(java.lang.STRING, com.azure.cosmos.internal.routing.RANGE, boolean) */ @Override - public Mono> tryGetOverlappingRangesAsync(String collectionRid, Range range, boolean forceRefresh, - Map properties) { + public Mono>> tryGetOverlappingRangesAsync(String collectionRid, Range range, boolean forceRefresh, + Map properties) { - Mono routingMapObs = tryLookupAsync(collectionRid, null, properties); + Mono> routingMapObs = tryLookupAsync(collectionRid, null, properties); - return routingMapObs.flatMap(routingMap -> { - if (forceRefresh) { + return routingMapObs.flatMap(routingMapValueHolder -> { + if (forceRefresh && routingMapValueHolder.v != null) { logger.debug("tryGetOverlappingRangesAsync with forceRefresh on collectionRid {}", collectionRid); - return tryLookupAsync(collectionRid, routingMap, properties); + return tryLookupAsync(collectionRid, routingMapValueHolder.v, properties); } - return Mono.just(routingMap); - }).switchIfEmpty(Mono.empty()).map(routingMap -> routingMap.getOverlappingRanges(range)).switchIfEmpty(Mono.defer(() -> { - logger.debug("Routing Map Null for collection: {} for range: {}, forceRefresh:{}", collectionRid, range.toString(), forceRefresh); - return Mono.empty(); - })); + return Mono.just(routingMapValueHolder); + }).map(routingMapValueHolder -> { + if (routingMapValueHolder.v != null) { + // TODO: the routingMap.getOverlappingRanges(range) returns Collection + // maybe we should consider changing to ArrayList to avoid conversion + return new Utils.ValueHolder<>(new ArrayList<>(routingMapValueHolder.v.getOverlappingRanges(range))); + } else { + logger.debug("Routing Map Null for collection: {} for range: {}, forceRefresh:{}", collectionRid, range.toString(), forceRefresh); + return new Utils.ValueHolder<>(null); + } + }); } /* (non-Javadoc) * @see IPartitionKeyRangeCache#tryGetPartitionKeyRangeByIdAsync(java.lang.STRING, java.lang.STRING, boolean) */ @Override - public Mono tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, - boolean forceRefresh, Map properties) { + public Mono> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, + boolean forceRefresh, Map properties) { - Mono routingMapObs = tryLookupAsync(collectionResourceId, null, properties); + Mono> routingMapObs = tryLookupAsync(collectionResourceId, null, properties); - return routingMapObs.flatMap(routingMap -> { - if (forceRefresh && routingMap != null) { - return tryLookupAsync(collectionResourceId, routingMap, properties); + return routingMapObs.flatMap(routingMapValueHolder -> { + if (forceRefresh && routingMapValueHolder.v != null) { + return tryLookupAsync(collectionResourceId, routingMapValueHolder.v, properties); } - return Mono.justOrEmpty(routingMap); - - }).switchIfEmpty(Mono.defer(Mono::empty)).map(routingMap -> routingMap.getRangeByPartitionKeyRangeId(partitionKeyRangeId)).switchIfEmpty(Mono.defer(() -> { - logger.debug("Routing Map Null for collection: {}, PartitionKeyRangeId: {}, forceRefresh:{}", collectionResourceId, partitionKeyRangeId, forceRefresh); - return null; - })); + return Mono.just(routingMapValueHolder); + + }).map(routingMapValueHolder -> { + if (routingMapValueHolder.v != null) { + return new Utils.ValueHolder<>(routingMapValueHolder.v.getRangeByPartitionKeyRangeId(partitionKeyRangeId)); + } else { + logger.debug("Routing Map Null for collection: {}, PartitionKeyRangeId: {}, forceRefresh:{}", collectionResourceId, partitionKeyRangeId, forceRefresh); + return new Utils.ValueHolder<>(null); + } + }); } /* (non-Javadoc) * @see IPartitionKeyRangeCache#tryGetRangeByPartitionKeyRangeId(java.lang.STRING, java.lang.STRING) */ @Override - public Mono tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map properties) { - Mono routingMapObs = routingMapCache.getAsync( + public Mono> tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map properties) { + Mono> routingMapObs = routingMapCache.getAsync( collectionRid, null, - () -> getRoutingMapForCollectionAsync(collectionRid, null, properties)); + () -> getRoutingMapForCollectionAsync(collectionRid, null, properties)).map(Utils.ValueHolder::new); - return routingMapObs.map(routingMap -> routingMap.getRangeByPartitionKeyRangeId(partitionKeyRangeId)) + return routingMapObs.map(routingMapValueHolder -> new Utils.ValueHolder<>(routingMapValueHolder.v.getRangeByPartitionKeyRangeId(partitionKeyRangeId))) .onErrorResume(err -> { CosmosClientException dce = Utils.as(err, CosmosClientException.class); logger.debug("tryGetRangeByPartitionKeyRangeId on collectionRid {} and partitionKeyRangeId {} encountered failure", collectionRid, partitionKeyRangeId, err); if (dce != null && Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND)) { - return Mono.empty(); + return Mono.just(new Utils.ValueHolder<>(null)); } return Mono.error(dce); @@ -193,7 +205,8 @@ private Mono> getPartitionKeyRange(String collectionRid, ); //this request doesn't actually go to server request.requestContext.resolvedCollectionRid = collectionRid; - Mono collectionObs = collectionCache.resolveCollectionAsync(request); + Mono collectionObs = collectionCache.resolveCollectionAsync(request) + .map(collectionValueHolder -> collectionValueHolder.v); return collectionObs.flatMap(coll -> { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java index 36eac71d83cd..c8f48eaa6a3e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java @@ -20,6 +20,7 @@ import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.Strings; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.caches.RxCollectionCache; import com.azure.cosmos.implementation.routing.CollectionRoutingMap; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; @@ -180,7 +181,7 @@ private static void ensureRoutingMapPresent( } } - private Mono tryResolveServerPartitionAsync( + private Mono> tryResolveServerPartitionAsync( RxDocumentServiceRequest request, DocumentCollection collection, CollectionRoutingMap routingMap, @@ -232,20 +233,25 @@ private Mono tryResolveServerPartitionAsync( // upper logic will refresh cache and retry. logger.debug("Collection cache or routing map cache is potentially outdated." + " Returning null. Upper logic will refresh cache and retry."); - return Mono.empty(); + return Mono.just(new Utils.ValueHolder<>(null)); } - Mono addressesObs = this.addressCache.tryGetAddresses( + Mono> addressesObs = this.addressCache.tryGetAddresses( request, new PartitionKeyRangeIdentity(collection.getResourceId(), range.getId()), forceRefreshPartitionAddresses); - return addressesObs.flatMap(addresses -> Mono.just(new ResolutionResult(range, addresses))).switchIfEmpty(Mono.defer(() -> { - logger.info( + return addressesObs.flatMap(addressesValueHolder -> { + + if (addressesValueHolder.v == null) { + logger.info( "Could not resolve addresses for identity {}/{}. Potentially collection cache or routing map cache is outdated. Return empty - upper logic will refresh and retry. ", new PartitionKeyRangeIdentity(collection.getResourceId(), range.getId())); - return Mono.empty(); - })); + return Mono.just(new Utils.ValueHolder<>(null)); + } + + return Mono.just(new Utils.ValueHolder<>(new ResolutionResult(range, addressesValueHolder.v))); + }); } catch (Exception e) { return Mono.error(e); @@ -287,21 +293,23 @@ private Mono resolveMasterResourceAddress(RxDocumentServiceReq && request.getPartitionKeyRangeIdentity() == null; // ServiceIdentity serviceIdentity = this.masterServiceIdentity; - Mono addressesObs = this.addressCache.tryGetAddresses(request, + Mono> addressesObs = this.addressCache.tryGetAddresses(request, masterPartitionKeyRangeIdentity,forceRefreshPartitionAddresses); - return addressesObs.flatMap(addresses -> { + return addressesObs.flatMap(addressesValueHolder -> { + if (addressesValueHolder.v == null) { + logger.warn("Could not get addresses for master partition"); + + // return Observable.getError() + NotFoundException e = new NotFoundException(); + BridgeInternal.setResourceAddress(e, request.getResourceAddress()); + return Mono.error(e); + } + PartitionKeyRange partitionKeyRange = new PartitionKeyRange(); partitionKeyRange.setId(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID); - return Mono.just(new ResolutionResult(partitionKeyRange, addresses)); - }).switchIfEmpty(Mono.defer(() -> { - logger.warn("Could not get addresses for master partition"); - - // return Observable.getError() - NotFoundException e = new NotFoundException(); - BridgeInternal.setResourceAddress(e, request.getResourceAddress()); - return Mono.error(e); - })); + return Mono.just(new ResolutionResult(partitionKeyRange, addressesValueHolder.v)); + }); } private class RefreshState { @@ -321,36 +329,30 @@ private Mono getOrRefreshRoutingMap(RxDocumentServiceRequest reque (request.getPartitionKeyRangeIdentity() != null && request.getPartitionKeyRangeIdentity().getCollectionRid() != null); state.collectionRoutingMapCacheIsUptoDate = false; - Mono collectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> collectionObs = this.collectionCache.resolveCollectionAsync(request); - Mono stateObs = collectionObs.flatMap(collection -> { - state.collection = collection; - Mono routingMapObs = - this.collectionRoutingMapCache.tryLookupAsync(collection.getResourceId(), null, request.forceCollectionRoutingMapRefresh, request.properties); - final DocumentCollection underlyingCollection = collection; - return routingMapObs.flatMap(routingMap -> { - state.routingMap = routingMap; + Mono stateObs = collectionObs.flatMap(collectionValueHolder -> { + state.collection = collectionValueHolder.v; + Mono> routingMapObs = + this.collectionRoutingMapCache.tryLookupAsync(collectionValueHolder.v.getResourceId(), null, request.forceCollectionRoutingMapRefresh, request.properties); + final Utils.ValueHolder underlyingCollection = collectionValueHolder; + return routingMapObs.flatMap(routingMapValueHolder -> { + state.routingMap = routingMapValueHolder.v; if (request.forcePartitionKeyRangeRefresh) { state.collectionRoutingMapCacheIsUptoDate = true; request.forcePartitionKeyRangeRefresh = false; - if (routingMap != null) { - return this.collectionRoutingMapCache.tryLookupAsync(underlyingCollection.getResourceId(), routingMap, request.properties) - .map(newRoutingMap -> { - state.routingMap = newRoutingMap; + if (routingMapValueHolder.v != null) { + return this.collectionRoutingMapCache.tryLookupAsync(underlyingCollection.v.getResourceId(), routingMapValueHolder.v, request.properties) + .map(newRoutingMapValueHolder -> { + state.routingMap = newRoutingMapValueHolder.v; return state; }); } } return Mono.just(state); - }).switchIfEmpty(Mono.defer(() -> { - if (request.forcePartitionKeyRangeRefresh) { - state.collectionRoutingMapCacheIsUptoDate = true; - request.forcePartitionKeyRangeRefresh = false; - } - return Mono.just(state); - })); + }); }); return stateObs.flatMap(newState -> { @@ -362,17 +364,17 @@ private Mono getOrRefreshRoutingMap(RxDocumentServiceRequest reque newState.collectionCacheIsUptoDate = true; newState.collectionRoutingMapCacheIsUptoDate = false; - Mono newCollectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono> newCollectionObs = this.collectionCache.resolveCollectionAsync(request); - return newCollectionObs.flatMap(collection -> { - newState.collection = collection; - Mono newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync( - collection.getResourceId(), + return newCollectionObs.flatMap(collectionValueHolder -> { + newState.collection = collectionValueHolder.v; + Mono> newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync( + collectionValueHolder.v.getResourceId(), null, request.properties); - return newRoutingMapObs.map(routingMap -> { - newState.routingMap = routingMap; + return newRoutingMapObs.map(routingMapValueHolder -> { + newState.routingMap = routingMapValueHolder.v; return newState; }); } @@ -384,14 +386,11 @@ private Mono getOrRefreshRoutingMap(RxDocumentServiceRequest reque }); } - private Mono getStateWithNewRoutingMap(RefreshState state, Mono routingMapSingle) { - return routingMapSingle.map(r -> { - state.routingMap = r; - return state; - }).switchIfEmpty(Mono.fromSupplier(() -> { - state.routingMap = null; + private Mono getStateWithNewRoutingMap(RefreshState state, Mono> routingMapSingle) { + return routingMapSingle.map(routingMapValueHolder -> { + state.routingMap = routingMapValueHolder.v; return state; - })); + }); } /** @@ -422,7 +421,7 @@ private Mono resolveAddressesAndIdentityAsync( } // At this point we have both collection and routingMap. - Mono resultObs = this.tryResolveServerPartitionAsync( + Mono> resultObs = this.tryResolveServerPartitionAsync( request, state.collection, state.routingMap, @@ -446,16 +445,22 @@ private Mono resolveAddressesAndIdentityAsync( return Mono.just(funcResolutionResult); }; - return resultObs.flatMap(addCollectionRidIfNameBased).switchIfEmpty(Mono.defer(() -> { + return resultObs.flatMap(resolutionResultValueHolder -> { + if (resolutionResultValueHolder.v != null) { + return addCollectionRidIfNameBased.apply(resolutionResultValueHolder.v); + } // result is empty + // result is null: + assert resolutionResultValueHolder.v == null; + Function> ensureCollectionRoutingMapCacheIsUptoDateFunc = funcState -> { if (!funcState.collectionRoutingMapCacheIsUptoDate) { funcState.collectionRoutingMapCacheIsUptoDate = true; - Mono newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync( - funcState.collection.getResourceId(), - funcState.routingMap, - request.properties); + Mono> newRoutingMapObs = this.collectionRoutingMapCache.tryLookupAsync( + funcState.collection.getResourceId(), + funcState.routingMap, + request.properties); return getStateWithNewRoutingMap(funcState, newRoutingMapObs); } else { @@ -463,7 +468,7 @@ private Mono resolveAddressesAndIdentityAsync( } }; - Function> resolveServerPartition = funcState -> { + Function>> resolveServerPartition = funcState -> { try { AddressResolver.ensureRoutingMapPresent(request, funcState.routingMap, funcState.collection); @@ -472,16 +477,16 @@ private Mono resolveAddressesAndIdentityAsync( } return this.tryResolveServerPartitionAsync( - request, - funcState.collection, - funcState.routingMap, - true, - true, - forceRefreshPartitionAddresses); + request, + funcState.collection, + funcState.routingMap, + true, + true, + forceRefreshPartitionAddresses); }; - Function> onNullThrowNotFound = funcResolutionResult -> { - if (funcResolutionResult == null) { + Function, Mono> onNullThrowNotFound = funcResolutionResult -> { + if (funcResolutionResult.v == null) { logger.debug("Couldn't route partitionkeyrange-oblivious request after retry/cache refresh. Collection doesn't exist."); // At this point collection cache and routing map caches are refreshed. @@ -491,7 +496,7 @@ private Mono resolveAddressesAndIdentityAsync( return Mono.error(BridgeInternal.setResourceAddress(new NotFoundException(), request.getResourceAddress())); } - return Mono.just(funcResolutionResult); + return Mono.just(funcResolutionResult.v); }; // Couldn't resolve server partition or its addresses. @@ -500,18 +505,18 @@ private Mono resolveAddressesAndIdentityAsync( request.forceNameCacheRefresh = true; state.collectionCacheIsUptoDate = true; - Mono newCollectionObs = this.collectionCache.resolveCollectionAsync(request); - Mono newRefreshStateObs = newCollectionObs.flatMap(collection -> { - state.collection = collection; + Mono> newCollectionObs = this.collectionCache.resolveCollectionAsync(request); + Mono newRefreshStateObs = newCollectionObs.flatMap(collectionValueHolder -> { + state.collection = collectionValueHolder.v; - if (collection.getResourceId() != state.routingMap.getCollectionUniqueId()) { + if (collectionValueHolder.v.getResourceId() != state.routingMap.getCollectionUniqueId()) { // Collection cache was stale. We resolved to new Rid. routing map cache is potentially stale // for this new collection rid. Mark it as such. state.collectionRoutingMapCacheIsUptoDate = false; - Mono newRoutingMap = this.collectionRoutingMapCache.tryLookupAsync( - collection.getResourceId(), - null, - request.properties); + Mono> newRoutingMap = this.collectionRoutingMapCache.tryLookupAsync( + collectionValueHolder.v.getResourceId(), + null, + request.properties); return getStateWithNewRoutingMap(state, newRoutingMap); } @@ -519,18 +524,18 @@ private Mono resolveAddressesAndIdentityAsync( return Mono.just(state); }); - Mono newResultObs = newRefreshStateObs.flatMap(ensureCollectionRoutingMapCacheIsUptoDateFunc) - .flatMap(resolveServerPartition); + Mono> newResultObs = newRefreshStateObs.flatMap(ensureCollectionRoutingMapCacheIsUptoDateFunc) + .flatMap(resolveServerPartition); return newResultObs.flatMap(onNullThrowNotFound).flatMap(addCollectionRidIfNameBased); } else { return ensureCollectionRoutingMapCacheIsUptoDateFunc.apply(state) - .flatMap(resolveServerPartition) - .flatMap(onNullThrowNotFound) - .flatMap(addCollectionRidIfNameBased); + .flatMap(resolveServerPartition) + .flatMap(onNullThrowNotFound) + .flatMap(addCollectionRidIfNameBased); } - })); + }); } ); } @@ -556,13 +561,13 @@ private ResolutionResult handleRangeAddressResolutionFailure( private Mono returnOrError(Callable function) { try { - return Mono.justOrEmpty(function.call()); + return Mono.just(function.call()); } catch (Exception e) { return Mono.error(e); } } - private Mono tryResolveServerPartitionByPartitionKeyRangeIdAsync( + private Mono> tryResolveServerPartitionByPartitionKeyRangeIdAsync( RxDocumentServiceRequest request, DocumentCollection collection, CollectionRoutingMap routingMap, @@ -573,23 +578,26 @@ private Mono tryResolveServerPartitionByPartitionKeyRangeIdAsy PartitionKeyRange partitionKeyRange = routingMap.getRangeByPartitionKeyRangeId(request.getPartitionKeyRangeIdentity().getPartitionKeyRangeId()); if (partitionKeyRange == null) { logger.debug("Cannot resolve range '{}'", request.getPartitionKeyRangeIdentity().toHeader()); - return returnOrError(() -> this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap)); + return returnOrError(() -> new Utils.ValueHolder<>(this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap))); } - Mono addressesObs = this.addressCache.tryGetAddresses( + Mono> addressesObs = this.addressCache.tryGetAddresses( request, new PartitionKeyRangeIdentity(collection.getResourceId(), request.getPartitionKeyRangeIdentity().getPartitionKeyRangeId()), forceRefreshPartitionAddresses); - return addressesObs.flatMap(addresses -> Mono.just(new ResolutionResult(partitionKeyRange, addresses))).switchIfEmpty(Mono.defer(() -> { - logger.debug("Cannot resolve addresses for range '{}'", request.getPartitionKeyRangeIdentity().toHeader()); + return addressesObs.flatMap(addressesValueHolder -> { + if (addressesValueHolder.v == null) { + logger.debug("Cannot resolve addresses for range '{}'", request.getPartitionKeyRangeIdentity().toHeader()); - try { - return Mono.justOrEmpty(this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap)); - } catch (CosmosClientException e) { - return Mono.error(e); + try { + return Mono.just(new Utils.ValueHolder<>(this.handleRangeAddressResolutionFailure(request, collectionCacheIsUpToDate, routingMapCacheIsUpToDate, routingMap))); + } catch (CosmosClientException e) { + return Mono.error(e); + } } - })); + return Mono.just(new Utils.ValueHolder<>(new ResolutionResult(partitionKeyRange, addressesValueHolder.v))); + }); } private PartitionKeyRange tryResolveServerPartitionByPartitionKey( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java index 49e65b5f9f1e..e056de1f74ff 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java @@ -134,9 +134,9 @@ private URL getServiceEndpoint() { } @Override - public Mono tryGetAddresses(RxDocumentServiceRequest request, - PartitionKeyRangeIdentity partitionKeyRangeIdentity, - boolean forceRefreshPartitionAddresses) { + public Mono> tryGetAddresses(RxDocumentServiceRequest request, + PartitionKeyRangeIdentity partitionKeyRangeIdentity, + boolean forceRefreshPartitionAddresses) { Utils.checkNotNullOrThrow(request, "request", ""); Utils.checkNotNullOrThrow(partitionKeyRangeIdentity, "partitionKeyRangeIdentity", ""); @@ -148,7 +148,8 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) { // if that's master partition return master partition address! - return this.resolveMasterAsync(request, forceRefreshPartitionAddresses, request.properties).map(Pair::getRight); + return this.resolveMasterAsync(request, forceRefreshPartitionAddresses, request.properties) + .map(partitionKeyRangeIdentityPair -> new Utils.ValueHolder<>(partitionKeyRangeIdentityPair.getRight())); } Instant suboptimalServerPartitionTimestamp = this.suboptimalServerPartitionTimestamps.get(partitionKeyRangeIdentity); @@ -194,25 +195,25 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity); } - Mono addressesObs = this.serverPartitionAddressCache.getAsync( + Mono> addressesObs = this.serverPartitionAddressCache.getAsync( partitionKeyRangeIdentity, null, () -> this.getAddressesForRangeId( request, partitionKeyRangeIdentity.getCollectionRid(), partitionKeyRangeIdentity.getPartitionKeyRangeId(), - false)); + false)).map(Utils.ValueHolder::new); return addressesObs.map( - addresses -> { - if (notAllReplicasAvailable(addresses)) { + addressesValueHolder -> { + if (notAllReplicasAvailable(addressesValueHolder.v)) { if (logger.isDebugEnabled()) { - logger.debug("not all replicas available {}", JavaStreamUtils.info(addresses)); + logger.debug("not all replicas available {}", JavaStreamUtils.info(addressesValueHolder.v)); } this.suboptimalServerPartitionTimestamps.putIfAbsent(partitionKeyRangeIdentity, Instant.now()); } - return addresses; + return addressesValueHolder; }).onErrorResume(ex -> { Throwable unwrappedException = reactor.core.Exceptions.unwrap(ex); CosmosClientException dce = Utils.as(unwrappedException, CosmosClientException.class); @@ -230,7 +231,7 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque //remove from suboptimal cache in case the collection+pKeyRangeId combo is gone. this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity); logger.debug("tryGetAddresses: inner onErrorResumeNext return null", dce); - return Mono.empty(); + return Mono.just(new Utils.ValueHolder<>(null)); } return Mono.error(unwrappedException); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java index cde73be67088..d5cd1386fdcf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java @@ -8,15 +8,17 @@ import com.azure.cosmos.implementation.DocumentCollection; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.IAuthorizationTokenProvider; -import com.azure.cosmos.implementation.PartitionKeyRange; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.UserAgentContainer; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.caches.RxCollectionCache; import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.routing.CollectionRoutingMap; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.concurrent.Queues; import java.net.URL; import java.util.ArrayList; @@ -27,8 +29,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -; - public class GlobalAddressResolver implements IAddressResolver { private final static int MaxBackupReadRegions = 3; private final GlobalEndpointManager endpointManager; @@ -78,18 +78,22 @@ public GlobalAddressResolver( } Mono openAsync(DocumentCollection collection) { - Mono routingMap = this.routingMapProvider.tryLookupAsync(collection.getId(), null, null); + Mono> routingMap = this.routingMapProvider.tryLookupAsync(collection.getId(), null, null); return routingMap.flatMap(collectionRoutingMap -> { - List ranges = ((List)collectionRoutingMap.getOrderedPartitionKeyRanges()).stream().map(range -> + if ( collectionRoutingMap.v == null) { + return Mono.empty(); + } + + List ranges = collectionRoutingMap.v.getOrderedPartitionKeyRanges().stream().map(range -> new PartitionKeyRangeIdentity(collection.getResourceId(), range.getId())).collect(Collectors.toList()); List> tasks = new ArrayList<>(); + Mono[] array = new Mono[this.addressCacheByEndpoint.values().size()]; for (EndpointCache endpointCache : this.addressCacheByEndpoint.values()) { tasks.add(endpointCache.addressCache.openAsync(collection, ranges)); } - // TODO: Not sure if this will work. - return Mono.whenDelayError(tasks); - }).switchIfEmpty(Mono.defer(Mono::empty)); + return Flux.mergeDelayError(Queues.SMALL_BUFFER_SIZE, tasks.toArray(array)).then(); + }); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java index f6f51e78cccf..5b01ff483050 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java @@ -4,6 +4,7 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import reactor.core.publisher.Mono; @@ -19,7 +20,7 @@ public interface IAddressCache { * @param forceRefreshPartitionAddresses Whether addresses need to be refreshed as previously resolved addresses were determined to be outdated. * @return Physical addresses. */ - Mono tryGetAddresses( + Mono> tryGetAddresses( RxDocumentServiceRequest request, PartitionKeyRangeIdentity partitionKeyRangeIdentity, boolean forceRefreshPartitionAddresses); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java index eb1816555bd9..c488b60feacb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java @@ -85,16 +85,16 @@ public Flux> executeAsync() { if (feedOptions == null) { feedOptions = new FeedOptions(); } - + FeedOptions newFeedOptions = new FeedOptions(feedOptions); - + // We can not go to backend with the composite continuation token, // but we still need the gateway for the query plan. // The workaround is to try and parse the continuation token as a composite continuation token. // If it is, then we send the query to the gateway with max degree of parallelism to force getting back the query plan - + String originalContinuation = newFeedOptions.requestContinuation(); - + if (isClientSideContinuationToken(originalContinuation)) { // At this point we know we want back a query plan newFeedOptions.requestContinuation(null); @@ -112,7 +112,7 @@ public Flux> executeAsync() { .getPaginatedQueryResultAsObservable(newFeedOptions, createRequestFunc, executeFunc, resourceType, maxPageSize); } - public Mono> getTargetPartitionKeyRanges(String resourceId, List> queryRanges) { + public Mono>> getTargetPartitionKeyRanges(String resourceId, List> queryRanges) { // TODO: FIXME this needs to be revisited Range r = new Range<>("", "FF", true, false); @@ -222,7 +222,7 @@ public RxDocumentServiceRequest createRequestAsync(String continuationToken, Int return request; } - + private static boolean isClientSideContinuationToken(String continuationToken) { if (continuationToken != null) { ValueHolder outCompositeContinuationToken = new ValueHolder(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java index c114945ddf6f..e2bb45f37afa 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java @@ -161,9 +161,9 @@ public Flux produceAsync() { .getPaginatedQueryResultAsObservable( feedOptions.requestContinuation(), sourcePartitionCreateRequestFunc, - executeRequestFuncWithRetries, - resourceType, - top, + executeRequestFuncWithRetries, + resourceType, + top, pageSize) .map(rsp -> { lastResponseContinuationToken = rsp.getContinuationToken(); @@ -186,22 +186,22 @@ private Flux splitProof(Flux> replacementRangesObs = getReplacementRanges(targetRange.toRange()); + Mono>> replacementRangesObs = getReplacementRanges(targetRange.toRange()); // Since new DocumentProducers are instantiated for the new replacement ranges, if for the new // replacement partitions split happens the corresponding DocumentProducer can recursively handle splits. // so this is resilient to split on splits. Flux> replacementProducers = replacementRangesObs.flux().flatMap( - partitionKeyRanges -> { + partitionKeyRangesValueHolder -> { if (logger.isDebugEnabled()) { logger.info("Cross Partition Query Execution detected partition [{}] split into [{}] partitions," + " last continuation token is [{}].", targetRange.toJson(), - partitionKeyRanges.stream() + partitionKeyRangesValueHolder.v.stream() .map(JsonSerializable::toJson).collect(Collectors.joining(", ")), lastResponseContinuationToken); } - return Flux.fromIterable(createReplacingDocumentProducersOnSplit(partitionKeyRanges)); + return Flux.fromIterable(createReplacingDocumentProducersOnSplit(partitionKeyRangesValueHolder.v)); }); return produceOnSplit(replacementProducers); @@ -220,7 +220,7 @@ private List> createReplacingDocumentProducersOnSplit(List

createChildDocumentProducerOnSplit( PartitionKeyRange targetRange, String initialContinuationToken) { @@ -241,7 +241,7 @@ protected DocumentProducer createChildDocumentProducerOnSplit( top); } - private Mono> getReplacementRanges(Range range) { + private Mono>> getReplacementRanges(Range range) { return client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(collectionRid, range, true, feedOptions.properties()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java index 6b4154248222..ad6b763158c2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java @@ -29,8 +29,8 @@ public class DocumentQueryExecutionContextFactory { private final static int PageSizeFactorForTop = 5; - private static Mono resolveCollection(IDocumentQueryClient client, SqlQuerySpec query, - ResourceType resourceTypeEnum, String resourceLink) { + private static Mono> resolveCollection(IDocumentQueryClient client, SqlQuerySpec query, + ResourceType resourceTypeEnum, String resourceLink) { RxCollectionCache collectionCache = client.getCollectionCache(); @@ -54,7 +54,7 @@ public static Flux collectionObs = Flux.empty(); + Flux> collectionObs = Flux.just(new Utils.ValueHolder<>(null)); if (resourceTypeEnum.isCollectionChild()) { collectionObs = resolveCollection(client, query, resourceTypeEnum, resourceLink).flux(); @@ -65,9 +65,10 @@ public static Flux> proxyQueryExecutionContext = - collectionObs.flatMap(collection -> { - if (feedOptions != null && feedOptions.partitionKey() != null && feedOptions.partitionKey().equals(PartitionKey.None)) { - feedOptions.partitionKey(BridgeInternal.getPartitionKey(BridgeInternal.getNonePartitionKey(collection.getPartitionKey()))); + collectionObs.flatMap(collectionValueHolder -> { + + if (collectionValueHolder.v != null && feedOptions != null && feedOptions.partitionKey() != null && feedOptions.partitionKey().equals(PartitionKey.None)) { + feedOptions.partitionKey(BridgeInternal.getPartitionKey(BridgeInternal.getNonePartitionKey(collectionValueHolder.v.getPartitionKey()))); } return ProxyDocumentQueryExecutionContext.createAsync( client, @@ -76,19 +77,10 @@ public static Flux> executeAsync() { logger.error("Unexpected failure", unwrappedException); return Flux.error(unwrappedException); } - + if (!isCrossPartitionQuery((Exception) unwrappedException)) { // If this is not a cross partition query then propagate error logger.debug("Failure from gateway", unwrappedException); @@ -104,11 +104,11 @@ public Flux> executeAsync() { DefaultDocumentQueryExecutionContext queryExecutionContext = (DefaultDocumentQueryExecutionContext) this.innerExecutionContext; - Mono> partitionKeyRanges = queryExecutionContext.getTargetPartitionKeyRanges(collection.getResourceId(), + Mono>> partitionKeyRanges = queryExecutionContext.getTargetPartitionKeyRanges(collection.getResourceId(), partitionedQueryExecutionInfo.getQueryRanges()); Flux> exContext = partitionKeyRanges.flux() - .flatMap(pkranges -> DocumentQueryExecutionContextFactory.createSpecializedDocumentQueryExecutionContextAsync( + .flatMap(pkrangesValueHolder -> DocumentQueryExecutionContextFactory.createSpecializedDocumentQueryExecutionContextAsync( this.client, this.resourceTypeEnum, this.resourceType, @@ -117,7 +117,7 @@ public Flux> executeAsync() { this.resourceLink, isContinuationExpected, partitionedQueryExecutionInfo, - pkranges, + pkrangesValueHolder.v, this.collection.getResourceId(), this.correlatedActivityId)); @@ -163,6 +163,6 @@ public static Flux> c resourceLink, collection, isContinuationExpected, - correlatedActivityId)); + correlatedActivityId)); } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicyTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicyTest.java index e38023372db0..bcba73fe1f21 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicyTest.java @@ -98,7 +98,7 @@ public void shouldRetryWithNotFoundStatusCodeAndReadSessionNotAvailableSubStatus DocumentCollection documentCollection = new DocumentCollection(); documentCollection.setResourceId("rid_1"); - Mockito.when(rxClientCollectionCache.resolveCollectionAsync(request)).thenReturn(Mono.just(documentCollection)); + Mockito.when(rxClientCollectionCache.resolveCollectionAsync(request)).thenReturn(Mono.just(new Utils.ValueHolder<>(documentCollection))); Mono singleShouldRetry = renameCollectionAwareClientRetryPolicy .shouldRetry(notFoundException); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/AddressResolverTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/AddressResolverTest.java index f2b7bf4964ea..1432d6c55008 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/AddressResolverTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/AddressResolverTest.java @@ -16,6 +16,7 @@ import com.azure.cosmos.implementation.PartitionKeyRange; import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.caches.RxCollectionCache; import com.azure.cosmos.implementation.routing.CollectionRoutingMap; import com.azure.cosmos.implementation.routing.IServerIdentity; @@ -319,7 +320,7 @@ private void initializeMocks( currentCollection.setValue(collectionAfterRefresh); AddressResolverTest.this.collectionCacheRefreshedCount++; request.forceNameCacheRefresh = false; - return Mono.just(currentCollection.getValue()); + return Mono.just(new Utils.ValueHolder<>(currentCollection.getValue())); } if (request.forceNameCacheRefresh && collectionAfterRefresh == null) { @@ -335,10 +336,10 @@ private void initializeMocks( } if (!request.forceNameCacheRefresh && currentCollection.getValue() != null) { - return Mono.just(currentCollection.getValue()); + return Mono.just(new Utils.ValueHolder<>(currentCollection.getValue())); } - return Mono.empty(); + return new Utils.ValueHolder<>(null); }).when(this.collectionCache).resolveCollectionAsync(Mockito.any(RxDocumentServiceRequest.class)); // Routing map cache @@ -359,7 +360,7 @@ private void initializeMocks( CollectionRoutingMap previousValue = invocationOnMock.getArgumentAt(1, CollectionRoutingMap.class); if (previousValue == null) { - return Mono.justOrEmpty(currentRoutingMap.get(collectionRid)); + return Mono.just(new Utils.ValueHolder<>(currentRoutingMap.get(collectionRid))); } if (previousValue != null && currentRoutingMap.containsKey(previousValue.getCollectionUniqueId()) && @@ -383,7 +384,7 @@ private void initializeMocks( } - return Mono.justOrEmpty(currentRoutingMap.get(collectionRid)); + return Mono.just(new Utils.ValueHolder<>(currentRoutingMap.get(collectionRid))); } return Mono.error(new NotImplementedException("not mocked")); @@ -403,7 +404,7 @@ private void initializeMocks( Boolean forceRefresh = invocationOnMock.getArgumentAt(2, Boolean.class); if (!forceRefresh) { - return Mono.justOrEmpty(currentAddresses.get(findMatchingServiceIdentity(currentAddresses, pkri))); + return Mono.just(new Utils.ValueHolder<>(currentAddresses.get(findMatchingServiceIdentity(currentAddresses, pkri)))); } else { ServiceIdentity si; @@ -426,8 +427,7 @@ private void initializeMocks( addressesRefreshCount.put(si, addressesRefreshCount.get(si) + 1); } - // TODO: what to return in this case if it is null!! - return Mono.justOrEmpty(currentAddresses.get(si)); + return Mono.just(new Utils.ValueHolder<>(currentAddresses.get(si))); } }).when(fabricAddressCache).tryGetAddresses(Mockito.any(RxDocumentServiceRequest.class), Mockito.any(PartitionKeyRangeIdentity.class), Mockito.anyBoolean()); } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java index 0d4da1ad8ff0..15d108fa0c68 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java @@ -19,6 +19,7 @@ import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.TestSuiteBase; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.http.HttpClientConfig; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; @@ -186,9 +187,10 @@ public void tryGetAddresses_ForDataPartitions(String partitionKeyRangeId, String PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId); boolean forceRefreshPartitionAddresses = false; - Mono addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); + Mono> addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + ArrayList addressInfosFromCache = + Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); Mono> masterAddressFromGatewayObs = cache.getServerAddressesViaGatewayAsync(req, collectionRid, ImmutableList.of(partitionKeyRangeId), false); @@ -242,8 +244,8 @@ public void tryGetAddresses_ForDataPartitions_AddressCachedByOpenAsync_NoHttpReq PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId); boolean forceRefreshPartitionAddresses = false; - Mono addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + Mono> addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); + ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); // no new request is made assertThat(httpClientWrapper.capturedRequests) @@ -296,8 +298,8 @@ public void tryGetAddresses_ForDataPartitions_ForceRefresh( new Database(), new HashMap<>()); PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId); - Mono addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, true); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + Mono> addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, true); + ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); // no new request is made assertThat(httpClientWrapper.capturedRequests) @@ -353,8 +355,8 @@ public void tryGetAddresses_ForDataPartitions_Suboptimal_Refresh( new Database(), new HashMap<>()); PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, partitionKeyRangeId); - Mono addressesInfosFromCacheObs = origCache.tryGetAddresses(req, partitionKeyRangeIdentity, true); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + Mono> addressesInfosFromCacheObs = origCache.tryGetAddresses(req, partitionKeyRangeIdentity, true); + ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); // no new request is made assertThat(httpClientWrapper.capturedRequests) @@ -397,15 +399,15 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro // force refresh to replace existing with sub-optimal addresses addressesInfosFromCacheObs = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, true); - AddressInformation[] suboptimalAddresses = getSuccessResult(addressesInfosFromCacheObs, TIMEOUT); + Utils.ValueHolder suboptimalAddresses = getSuccessResult(addressesInfosFromCacheObs, TIMEOUT); assertThat(httpClientWrapper.capturedRequests) .describedAs("getServerAddressesViaGatewayAsync will read addresses from gateway") .asList().hasSize(1); httpClientWrapper.capturedRequests.clear(); // relaxes one replica being down - assertThat(suboptimalAddresses.length).isLessThanOrEqualTo((ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1)); - assertThat(suboptimalAddresses.length).isGreaterThanOrEqualTo(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 2); + assertThat(suboptimalAddresses.v.length).isLessThanOrEqualTo((ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1)); + assertThat(suboptimalAddresses.v.length).isGreaterThanOrEqualTo(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 2); assertThat(fetchCounter.get()).isEqualTo(1); // no refresh, use cache @@ -414,15 +416,15 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro assertThat(httpClientWrapper.capturedRequests) .describedAs("getServerAddressesViaGatewayAsync will read addresses from gateway") .asList().hasSize(0); - AssertionsForClassTypes.assertThat(suboptimalAddresses).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1); + AssertionsForClassTypes.assertThat(suboptimalAddresses.v).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1); assertThat(fetchCounter.get()).isEqualTo(1); // wait for refresh time TimeUnit.SECONDS.sleep(suboptimalRefreshTime + 1); addressesInfosFromCacheObs = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false); - AddressInformation[] addresses = getSuccessResult(addressesInfosFromCacheObs, TIMEOUT); - AssertionsForClassTypes.assertThat(addresses).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize); + Utils.ValueHolder addresses = getSuccessResult(addressesInfosFromCacheObs, TIMEOUT); + AssertionsForClassTypes.assertThat(addresses.v).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize); assertThat(httpClientWrapper.capturedRequests) .describedAs("getServerAddressesViaGatewayAsync will read addresses from gateway") .asList().hasSize(1); @@ -448,9 +450,9 @@ public void tryGetAddresses_ForMasterPartition(Protocol protocol) throws Excepti PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity("M"); boolean forceRefreshPartitionAddresses = false; - Mono addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); + Mono> addressesInfosFromCacheObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); - ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT)); + ArrayList addressInfosFromCache = Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); Mono> masterAddressFromGatewayObs = cache.getMasterAddressesViaGatewayAsync(req, ResourceType.Database, null, "/dbs/", false, false, null); @@ -498,8 +500,7 @@ public void tryGetAddresses_ForMasterPartition_MasterPartitionAddressAlreadyCach // request master partition info to ensure it is cached. AddressInformation[] expectedAddresses = cache.tryGetAddresses(req, partitionKeyRangeIdentity, - forceRefreshPartitionAddresses) - .block(); + forceRefreshPartitionAddresses).block().v; assertThat(clientWrapper.capturedRequests).asList().hasSize(1); clientWrapper.capturedRequests.clear(); @@ -507,11 +508,11 @@ public void tryGetAddresses_ForMasterPartition_MasterPartitionAddressAlreadyCach TimeUnit.SECONDS.sleep(waitTimeInBetweenAttemptsInSeconds); - Mono addressesObs = cache.tryGetAddresses(req, + Mono> addressesObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); - AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT); + AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT).v; assertExactlyEqual(actualAddresses, expectedAddresses); @@ -546,16 +547,16 @@ public void tryGetAddresses_ForMasterPartition_ForceRefresh() throws Exception { AddressInformation[] expectedAddresses = cache.tryGetAddresses(req, partitionKeyRangeIdentity, false) - .block(); + .block().v; assertThat(clientWrapper.capturedRequests).asList().hasSize(1); clientWrapper.capturedRequests.clear(); - Mono addressesObs = cache.tryGetAddresses(req, + Mono> addressesObs = cache.tryGetAddresses(req, partitionKeyRangeIdentity, true); - AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT); + AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT).v; assertExactlyEqual(actualAddresses, expectedAddresses); @@ -639,16 +640,16 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro AddressInformation[] expectedAddresses = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false) - .block(); + .block().v; assertThat(clientWrapper.capturedRequests).asList().hasSize(1); clientWrapper.capturedRequests.clear(); - Mono addressesObs = spyCache.tryGetAddresses(req, + Mono> addressesObs = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false); - AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT); + AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT).v; assertExactlyEqual(actualAddresses, expectedAddresses); @@ -730,7 +731,7 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro AddressInformation[] subOptimalAddresses = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false) - .block(); + .block().v; assertThat(getMasterAddressesViaGatewayAsyncInvocation.get()).isEqualTo(1); AssertionsForClassTypes.assertThat(subOptimalAddresses).hasSize(ServiceConfig.SystemReplicationPolicy.MaxReplicaSetSize - 1); @@ -743,12 +744,12 @@ public Mono> answer(InvocationOnMock invocationOnMock) throws Thro assertThat(clientWrapper.capturedRequests).asList().hasSize(1); clientWrapper.capturedRequests.clear(); - Mono addressesObs = spyCache.tryGetAddresses(req, + Mono> addressesObs = spyCache.tryGetAddresses(req, partitionKeyRangeIdentity, false); - AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT); + AddressInformation[] actualAddresses = getSuccessResult(addressesObs, TIMEOUT).v; // the cache address is used. no new http request is sent assertThat(clientWrapper.capturedRequests).asList().hasSize(1); assertThat(getMasterAddressesViaGatewayAsyncInvocation.get()).isEqualTo(2); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java index 309643836db6..b4bc1aefa9eb 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java @@ -13,6 +13,7 @@ import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.UserAgentContainer; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.caches.RxCollectionCache; import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; import com.azure.cosmos.implementation.http.HttpClient; @@ -37,8 +38,6 @@ import static org.assertj.core.api.Assertions.assertThat; -; - public class GlobalAddressResolverTest { private HttpClient httpClient; @@ -91,7 +90,7 @@ public void setup() throws Exception { DocumentCollection collectionDefinition = new DocumentCollection(); collectionDefinition.setId(UUID.randomUUID().toString()); collectionCache = Mockito.mock(RxCollectionCache.class); - Mockito.when(collectionCache.resolveCollectionAsync(Matchers.any(RxDocumentServiceRequest.class))).thenReturn(Mono.just(collectionDefinition)); + Mockito.when(collectionCache.resolveCollectionAsync(Matchers.any(RxDocumentServiceRequest.class))).thenReturn(Mono.just(new Utils.ValueHolder<>(collectionDefinition))); routingMapProvider = Mockito.mock(RxPartitionKeyRangeCache.class); userAgentContainer = Mockito.mock(UserAgentContainer.class); serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); @@ -148,7 +147,7 @@ public void openAsync() throws Exception { List partitionKeyRanges = new ArrayList<>(); partitionKeyRanges.add(range); Mockito.when(collectionRoutingMap.getOrderedPartitionKeyRanges()).thenReturn(partitionKeyRanges); - Mono collectionRoutingMapSingle = Mono.just(collectionRoutingMap); + Mono> collectionRoutingMapSingle = Mono.just(new Utils.ValueHolder<>(collectionRoutingMap)); Mockito.when(routingMapProvider.tryLookupAsync(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(collectionRoutingMapSingle); List ranges = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java index f9062b026619..2ecd0d45cf7b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java @@ -14,6 +14,7 @@ import com.azure.cosmos.implementation.PartitionKeyRange; import com.azure.cosmos.implementation.RetryPolicy; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult; import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer; @@ -519,7 +520,7 @@ private IDocumentQueryClient mockQueryClient(List replacement IDocumentQueryClient client = Mockito.mock(IDocumentQueryClient.class); RxPartitionKeyRangeCache cache = Mockito.mock(RxPartitionKeyRangeCache.class); doReturn(cache).when(client).getPartitionKeyRangeCache(); - doReturn(Mono.just(replacementRanges)).when(cache). + doReturn(Mono.just(new Utils.ValueHolder<>(replacementRanges))).when(cache). tryGetOverlappingRangesAsync(anyString(), any(Range.class), anyBoolean(), Matchers.anyMap()); return client; }