Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
* This is meant to be internally used only by our sdk.
**/
public interface ICollectionRoutingMapCache {
default Mono<CollectionRoutingMap> tryLookupAsync(
default Mono<Utils.ValueHolder<CollectionRoutingMap>> tryLookupAsync(
String collectionRid,
CollectionRoutingMap previousValue,
Map<String, Object> properties) {
return tryLookupAsync(collectionRid, previousValue, false, properties);
}

Mono<CollectionRoutingMap> tryLookupAsync(
Mono<Utils.ValueHolder<CollectionRoutingMap>> tryLookupAsync(
String collectionRid,
CollectionRoutingMap previousValue,
boolean forceRefreshCollectionRoutingMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public interface IRoutingMapProvider {
/// <param name="range">This method will return all ranges which overlap this range.</param>
/// <param name="forceRefresh">Whether forcefully refreshing the routing map is necessary</param>
/// <returns>List of effective partition key ranges for a collection or null if collection doesn't exist.</returns>
Mono<List<PartitionKeyRange>> tryGetOverlappingRangesAsync(String collectionResourceId, Range<String> range,
boolean forceRefresh /* = false */, Map<String, Object> properties);
Mono<Utils.ValueHolder<List<PartitionKeyRange>>> tryGetOverlappingRangesAsync(String collectionResourceId, Range<String> range,
boolean forceRefresh /* = false */, Map<String, Object> properties);

Mono<PartitionKeyRange> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId,
Mono<Utils.ValueHolder<PartitionKeyRange>> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId,
boolean forceRefresh /* = false */, Map<String, Object> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ public PartitionKeyRangeGoneRetryPolicy(
this.feedOptions = feedOptions;
}

/// <summary>
/// <summary>
/// Should the caller retry the operation.
/// </summary>
/// <param name="exception">Exception that occured when the operation was tried</param>
/// <param name="cancellationToken"></param>
/// <returns>True indicates caller should retry, False otherwise</returns>
public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
CosmosClientException clientException = Utils.as(exception, CosmosClientException.class);
if (clientException != null &&
if (clientException != null &&
Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.GONE) &&
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE)) {

Expand All @@ -64,28 +64,30 @@ public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
if (this.feedOptions != null) {
request.properties = this.feedOptions.properties();
}
Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);

return collectionObs.flatMap(collection -> {
return collectionObs.flatMap(collectionValueHolder -> {

Mono<CollectionRoutingMap> routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collection.getResourceId(), null, request.properties);
Mono<Utils.ValueHolder<CollectionRoutingMap>> routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(collectionValueHolder.v.getResourceId(),
null, request.properties);

Mono<CollectionRoutingMap> refreshedRoutingMapObs = routingMapObs.flatMap(routingMap -> {
// Force refresh.
return this.partitionKeyRangeCache.tryLookupAsync(
collection.getResourceId(),
routingMap,
Mono<Utils.ValueHolder<CollectionRoutingMap>> refreshedRoutingMapObs = routingMapObs.flatMap(routingMapValueHolder -> {
if (routingMapValueHolder.v != null) {
// Force refresh.
return this.partitionKeyRangeCache.tryLookupAsync(
collectionValueHolder.v.getResourceId(),
routingMapValueHolder.v,
request.properties);
}).switchIfEmpty(Mono.defer(Mono::empty));
} else {
return Mono.just(new Utils.ValueHolder<>(null));
}
});

// TODO: Check if this behavior can be replaced by doOnSubscribe
return refreshedRoutingMapObs.flatMap(rm -> {
this.retried = true;
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}).switchIfEmpty(Mono.defer(() -> {
this.retried = true;
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}));
});

});

Expand All @@ -96,7 +98,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception exception) {

@Override
public void onBeforeSendRequest(RxDocumentServiceRequest request) {
this.nextRetryPolicy.onBeforeSendRequest(request);
this.nextRetryPolicy.onBeforeSendRequest(request);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,16 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
request.forceNameCacheRefresh = true;
request.requestContext.resolvedCollectionRid = null;

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);

return collectionObs.flatMap(collectionInfo -> {
if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionInfo.getResourceId())) {
return collectionObs.flatMap(collectionValueHolder -> {
if (collectionValueHolder.v == null) {
logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress());
} else if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionValueHolder.v.getResourceId())) {
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}
return Mono.just(shouldRetryResult);
}).switchIfEmpty(Mono.defer(() -> {
logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress());
return Mono.just(shouldRetryResult);
})).onErrorResume(throwable -> {
}).onErrorResume(throwable -> {
// When resolveCollectionAsync throws an exception ignore it because it's an attempt to recover an existing
// error. When the recovery fails we return ShouldRetryResult.noRetry and propagate the original exception to the client

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,19 +869,19 @@ private Map<String, String> getRequestHeaders(RequestOptions options) {
private Mono<RxDocumentServiceRequest> addPartitionKeyInformation(RxDocumentServiceRequest request, Document document,
RequestOptions options) {

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
return collectionObs
.map(collection -> {
addPartitionKeyInformation(request, document, options, collection);
.map(collectionValueHolder -> {
addPartitionKeyInformation(request, document, options, collectionValueHolder.v);
return request;
});
}

private Mono<RxDocumentServiceRequest> addPartitionKeyInformation(RxDocumentServiceRequest request, Document document, RequestOptions options,
Mono<DocumentCollection> collectionObs) {
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs) {

return collectionObs.map(collection -> {
addPartitionKeyInformation(request, document, options, collection);
return collectionObs.map(collectionValueHolder -> {
addPartitionKeyInformation(request, document, options, collectionValueHolder.v);
return request;
});
}
Expand Down Expand Up @@ -969,7 +969,7 @@ private Mono<RxDocumentServiceRequest> getCreateDocumentRequest(String documentC
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(operationType, ResourceType.Document, path,
typedDocument, requestHeaders, options);

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
return addPartitionKeyInformation(request, typedDocument, options, collectionObs);
}

Expand Down Expand Up @@ -1217,7 +1217,7 @@ private Flux<ResourceResponse<Document>> replaceDocumentInternal(String document

validateResource(document);

Mono<DocumentCollection> collectionObs = collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(request);
Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, document, options, collectionObs);

return requestObs.flux().flatMap(req -> {
Expand Down Expand Up @@ -1247,7 +1247,7 @@ private Flux<ResourceResponse<Document>> deleteDocumentInternal(String documentL
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Delete,
ResourceType.Document, path, requestHeaders, options);

Mono<DocumentCollection> collectionObs = collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(request);

Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, options, collectionObs);

Expand Down Expand Up @@ -1283,7 +1283,7 @@ private Flux<ResourceResponse<Document>> readDocumentInternal(String documentLin
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Read,
ResourceType.Document, path, requestHeaders, options);

Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);

Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, options, collectionObs);

Expand Down Expand Up @@ -2587,7 +2587,7 @@ private <T extends Resource> Flux<FeedResponse<T>> readFeedCollectionChild(FeedO

Function<RxDocumentServiceRequest, Flux<FeedResponse<T>>> executeFunc = request -> {
return ObservableHelper.inlineIfPossibleAsObs(() -> {
Mono<DocumentCollection> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(request);
Mono<RxDocumentServiceRequest> requestObs = this.addPartitionKeyInformation(request, null, requestOptions, collectionObs);

return requestObs.flux().flatMap(req -> this.readFeed(req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation.caches;

import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.implementation.ICollectionRoutingMapCache;
Expand All @@ -13,18 +14,18 @@
import java.util.Map;

/**
*
*
*/
public interface IPartitionKeyRangeCache extends IRoutingMapProvider, ICollectionRoutingMapCache {

Mono<CollectionRoutingMap> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map<String, Object> properties);
Mono<Utils.ValueHolder<CollectionRoutingMap>> tryLookupAsync(String collectionRid, CollectionRoutingMap previousValue, Map<String, Object> properties);

Mono<List<PartitionKeyRange>> tryGetOverlappingRangesAsync(String collectionRid, Range<String> range, boolean forceRefresh,
Map<String, Object> properties);
Mono<Utils.ValueHolder<List<PartitionKeyRange>>> tryGetOverlappingRangesAsync(String collectionRid, Range<String> range, boolean forceRefresh,
Map<String, Object> properties);

Mono<PartitionKeyRange> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh,
Map<String, Object> properties);
Mono<Utils.ValueHolder<PartitionKeyRange>> tryGetPartitionKeyRangeByIdAsync(String collectionResourceId, String partitionKeyRangeId, boolean forceRefresh,
Map<String, Object> properties);

Mono<PartitionKeyRange> tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map<String, Object> properties);
Mono<Utils.ValueHolder<PartitionKeyRange>> tryGetRangeByPartitionKeyRangeId(String collectionRid, String partitionKeyRangeId, Map<String, Object> properties);

}
Loading