Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

import java.net.URL;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

/**
* While this class is public, but it is not part of our published public APIs.
* This is meant to be internally used only by our sdk.
*
*
* Client policy is combination of endpoint change retry + throttling retry.
*/
public class ClientRetryPolicy implements IDocumentClientRetryPolicy {
Expand All @@ -39,6 +40,7 @@ public class ClientRetryPolicy implements IDocumentClientRetryPolicy {
private URL locationEndpoint;
private RetryContext retryContext;
private CosmosResponseDiagnostics cosmosResponseDiagnostics;
private AtomicInteger cnt = new AtomicInteger(0);

public ClientRetryPolicy(GlobalEndpointManager globalEndpointManager,
boolean enableEndpointDiscovery,
Expand All @@ -57,6 +59,11 @@ public ClientRetryPolicy(GlobalEndpointManager globalEndpointManager,

@Override
public Mono<ShouldRetryResult> shouldRetry(Exception e) {
logger.debug("retry count {}, isReadRequest {}, canUseMultipleWriteLocations {}, due to failure:",
cnt.incrementAndGet(),
isReadRequest,
canUseMultipleWriteLocations,
e);
if (this.locationEndpoint == null) {
// on before request is not invoked because Document Service Request creation failed.
logger.error("locationEndpoint is null because ClientRetryPolicy::onBeforeRequest(.) is not invoked, " +
Expand All @@ -70,7 +77,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
if (clientException != null && clientException.cosmosResponseDiagnostics() != null) {
this.cosmosResponseDiagnostics = clientException.cosmosResponseDiagnostics();
}
if (clientException != null &&
if (clientException != null &&
Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.FORBIDDEN) &&
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN))
{
Expand All @@ -94,7 +101,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest);
}

if (clientException != null &&
if (clientException != null &&
Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.NOTFOUND) &&
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE)) {
return Mono.just(this.shouldRetryOnSessionNotAvailable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.data.cosmos.internal;

import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;

Expand All @@ -12,11 +13,27 @@ private static <T> String safeToString(T t) {
return t != null ? t.toString() : "null";
}

public static <T> String info(Collection<T> collection) {
return collection == null ? "null collection" :
"collection size: " + collection.size();
}

public static <T> String info(T[] collection) {
return collection == null ? "null collection" :
"collection size: " + collection.length;
}

public static <T> String toString(Collection<T> collection, String delimiter) {
return collection.stream()
.map( t -> safeToString(t) )
.collect(Collectors.joining(delimiter));
return collection == null ? "null collection" :
collection.isEmpty() ? "empty collection" :
collection.stream()
.map(t -> safeToString(t))
.collect(Collectors.joining(delimiter));
}

public static <T> String toString(T[] array, String delimiter) {
return array == null ? "null array" :
toString(Arrays.asList(array), delimiter);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Mono<TValue> getAsync(

}, err -> {

logger.debug("cache[{}] resulted in error {}, computing new value", key, err);
logger.debug("cache[{}] resulted in error, computing new value", key, err);
AsyncLazy<TValue> asyncLazy = new AsyncLazy<>(singleValueInitFunc);
AsyncLazy<TValue> resultAsyncLazy = values.merge(key, asyncLazy,
(lazyValue1, lazyValu2) -> lazyValue1 == initialLazyValue ? lazyValu2 : lazyValue1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ private Mono<ResolutionResult> tryResolveServerPartitionAsync(
if (range == null) {
// Collection cache or routing map cache is potentially outdated. Return empty -
// upper logic will refresh cache and retry.
logger.debug("Collection cache or routing map cache is potentially outdated." +
" Returning null. Upper logic will refresh cache and retry.");
return Mono.empty();
}

Expand Down Expand Up @@ -271,6 +273,7 @@ private PartitionKeyRange tryResolveSinglePartitionCollection(
return routingMap.getOrderedPartitionKeyRanges().get(0);
}

logger.debug("tryResolveSinglePartitionCollection: collectionCacheIsUptoDate = {}", collectionCacheIsUptoDate);
if (collectionCacheIsUptoDate) {
throw BridgeInternal.setResourceAddress(new BadRequestException(RMResources.MissingPartitionKeyValue), request.getResourceAddress());
} else {
Expand Down Expand Up @@ -548,6 +551,7 @@ private ResolutionResult handleRangeAddressResolutionFailure(
throw BridgeInternal.setResourceAddress(new PartitionKeyRangeGoneException(errorMessage), request.getResourceAddress());
}

logger.debug("handleRangeAddressResolutionFailure returns null");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.data.cosmos.internal.Exceptions;
import com.azure.data.cosmos.internal.HttpConstants;
import com.azure.data.cosmos.internal.IAuthorizationTokenProvider;
import com.azure.data.cosmos.internal.JavaStreamUtils;
import com.azure.data.cosmos.internal.OperationType;
import com.azure.data.cosmos.internal.PartitionKeyRange;
import com.azure.data.cosmos.internal.Paths;
Expand Down Expand Up @@ -140,6 +141,10 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
com.azure.data.cosmos.internal.Utils.checkNotNullOrThrow(request, "request", "");
com.azure.data.cosmos.internal.Utils.checkNotNullOrThrow(partitionKeyRangeIdentity, "partitionKeyRangeIdentity", "");

logger.debug("PartitionKeyRangeIdentity {}, forceRefreshPartitionAddresses {}",
partitionKeyRangeIdentity,
forceRefreshPartitionAddresses);

if (StringUtils.equals(partitionKeyRangeIdentity.getPartitionKeyRangeId(),
PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) {

Expand All @@ -150,6 +155,8 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
Instant suboptimalServerPartitionTimestamp = this.suboptimalServerPartitionTimestamps.get(partitionKeyRangeIdentity);

if (suboptimalServerPartitionTimestamp != null) {
logger.debug("suboptimalServerPartitionTimestamp is {}", suboptimalServerPartitionTimestamp);

boolean forceRefreshDueToSuboptimalPartitionReplicaSet = Duration.between(suboptimalServerPartitionTimestamp, Instant.now()).getSeconds()
> this.suboptimalPartitionForceRefreshIntervalInSeconds;

Expand All @@ -158,14 +165,18 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
// and if they are equal, updates the key with a third value.
Instant newValue = this.suboptimalServerPartitionTimestamps.computeIfPresent(partitionKeyRangeIdentity,
(key, oldVal) -> {
logger.debug("key = {}, oldValue = {}", key, oldVal);

if (suboptimalServerPartitionTimestamp.equals(oldVal)) {
return Instant.MAX;
} else {
return oldVal;
}
});

logger.debug("newValue is {}", newValue);
if (!newValue.equals(suboptimalServerPartitionTimestamp)) {
logger.debug("setting forceRefreshPartitionAddresses to true");
// the value was replaced;
forceRefreshPartitionAddresses = true;
}
Expand All @@ -175,6 +186,8 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
final boolean forceRefreshPartitionAddressesModified = forceRefreshPartitionAddresses;

if (forceRefreshPartitionAddressesModified) {
logger.debug("refresh serverPartitionAddressCache for {}", partitionKeyRangeIdentity);

this.serverPartitionAddressCache.refresh(
partitionKeyRangeIdentity,
() -> this.getAddressesForRangeId(
Expand All @@ -198,24 +211,32 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
return addressesObs.map(
addresses -> {
if (notAllReplicasAvailable(addresses)) {
if (logger.isDebugEnabled()) {
logger.debug("not all replicas available {}", JavaStreamUtils.info(addresses));
}

this.suboptimalServerPartitionTimestamps.putIfAbsent(partitionKeyRangeIdentity, Instant.now());
}

return addresses;
}).onErrorResume(ex -> {
CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(ex, CosmosClientException.class);
if (dce == null) {
logger.error("unexpected failure", ex);
if (forceRefreshPartitionAddressesModified) {
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
}
return Mono.error(ex);
} else {
logger.debug("tryGetAddresses dce", dce);
if (Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND) ||
Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.GONE) ||
Exceptions.isSubStatusCode(dce, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE)) {
//remove from suboptimal cache in case the collection+pKeyRangeId combo is gone.
this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
return null;

logger.debug("tryGetAddresses: inner onErrorResumeNext return null", dce);
return Mono.empty();
}
return Mono.error(ex);
}
Expand All @@ -228,6 +249,10 @@ public Mono<List<Address>> getServerAddressesViaGatewayAsync(
String collectionRid,
List<String> partitionKeyRangeIds,
boolean forceRefresh) {
if (logger.isDebugEnabled()) {
logger.debug("getServerAddressesViaGatewayAsync collectionRid {}, partitionKeyRangeIds {}", collectionRid,
JavaStreamUtils.toString(partitionKeyRangeIds, ","));
}
String entryUrl = PathsHelper.generatePath(ResourceType.Document, collectionRid, true);
HashMap<String, String> addressQuery = new HashMap<>();

Expand Down Expand Up @@ -285,6 +310,9 @@ public Mono<List<Address>> getServerAddressesViaGatewayAsync(
Mono<RxDocumentServiceResponse> dsrObs = HttpClientUtils.parseResponseAsync(httpResponseMono, httpRequest);
return dsrObs.map(
dsr -> {
if (logger.isDebugEnabled()) {
logger.debug("getServerAddressesViaGatewayAsync deserializes result");
}
logAddressResolutionEnd(request, identifier);
return dsr.getQueryResponse(Address.class);
});
Expand All @@ -296,6 +324,7 @@ public void dispose() {
}

private Mono<Pair<PartitionKeyRangeIdentity, AddressInformation[]>> resolveMasterAsync(RxDocumentServiceRequest request, boolean forceRefresh, Map<String, Object> properties) {
logger.debug("resolveMasterAsync forceRefresh: {}", forceRefresh);
Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterAddressAndRangeInitial = this.masterPartitionAddressCache;

forceRefresh = forceRefresh ||
Expand Down Expand Up @@ -347,18 +376,25 @@ private Mono<AddressInformation[]> getAddressesForRangeId(
String collectionRid,
String partitionKeyRangeId,
boolean forceRefresh) {
logger.debug("getAddressesForRangeId collectionRid {}, partitionKeyRangeId {}, forceRefresh {}",
collectionRid, partitionKeyRangeId, forceRefresh);
Mono<List<Address>> addressResponse = this.getServerAddressesViaGatewayAsync(request, collectionRid, Collections.singletonList(partitionKeyRangeId), forceRefresh);

Mono<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> addressInfos =
addressResponse.map(
addresses ->
addresses.stream().filter(addressInfo ->
this.protocolScheme.equals(addressInfo.getProtocolScheme()))
.collect(Collectors.groupingBy(
Address::getParitionKeyRangeId))
.values().stream()
.map(groupedAddresses -> toPartitionAddressAndRange(collectionRid, addresses))
.collect(Collectors.toList()));
addresses -> {
if (logger.isDebugEnabled()) {
logger.debug("addresses from getServerAddressesViaGatewayAsync in getAddressesForRangeId {}",
JavaStreamUtils.info(addresses));
}
return addresses.stream().filter(addressInfo ->
this.protocolScheme.equals(addressInfo.getProtocolScheme()))
.collect(Collectors.groupingBy(
Address::getParitionKeyRangeId))
.values().stream()
.map(groupedAddresses -> toPartitionAddressAndRange(collectionRid, addresses))
.collect(Collectors.toList());
});

Mono<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> result = addressInfos.map(addressInfo -> addressInfo.stream()
.filter(a ->
Expand All @@ -367,6 +403,10 @@ private Mono<AddressInformation[]> getAddressesForRangeId(

return result.flatMap(
list -> {
if (logger.isDebugEnabled()) {
logger.debug("getAddressesForRangeId flatMap got result {}", JavaStreamUtils.info(list));
}

if (list.isEmpty()) {

String errorMessage = String.format(
Expand All @@ -381,7 +421,9 @@ private Mono<AddressInformation[]> getAddressesForRangeId(
} else {
return Mono.just(list.get(0).getRight());
}
});
}).doOnError(e -> {
logger.debug("getAddressesForRangeId", e);
});
}

public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
Expand All @@ -392,6 +434,20 @@ public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
boolean forceRefresh,
boolean useMasterCollectionResolver,
Map<String, Object> properties) {

logger.debug("getMasterAddressesViaGatewayAsync " +
"resourceType {}, " +
"resourceAddress {}, " +
"entryUrl {}, " +
"forceRefresh {}, " +
"useMasterCollectionResolver {}",
resourceType,
resourceAddress,
entryUrl,
forceRefresh,
useMasterCollectionResolver
);

HashMap<String, String> queryParameters = new HashMap<>();
queryParameters.put(HttpConstants.QueryStrings.URL, HttpUtils.urlEncode(entryUrl));
HashMap<String, String> headers = new HashMap<>(defaultRequestHeaders);
Expand Down Expand Up @@ -441,6 +497,7 @@ public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
}

private Pair<PartitionKeyRangeIdentity, AddressInformation[]> toPartitionAddressAndRange(String collectionRid, List<Address> addresses) {
logger.debug("toPartitionAddressAndRange");
Address address = addresses.get(0);

AddressInformation[] addressInfos =
Expand All @@ -457,6 +514,11 @@ private static AddressInformation toAddressInformation(Address address) {
public Mono<Void> openAsync(
DocumentCollection collection,
List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities) {

if (logger.isDebugEnabled()) {
logger.debug("openAsync collection: {}, partitionKeyRangeIdentities: {}", collection, JavaStreamUtils.toString(partitionKeyRangeIdentities, ","));
}

List<Flux<List<Address>>> tasks = new ArrayList<>();
int batchSize = GatewayAddressCache.DefaultBatchSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
(this.request.getPartitionKeyRangeIdentity() == null ||
this.request.getPartitionKeyRangeIdentity().getCollectionRid() == null)) &&
!(exception instanceof PartitionKeyRangeIsSplittingException)) {
logger.debug("Operation will NOT be retried. Current attempt {}, Exception: {} ", this.attemptCount,
logger.debug("Operation will NOT be retried. Current attempt {}, Exception:", this.attemptCount,
exception);
stopStopWatch(this.durationTimer);
return Mono.just(ShouldRetryResult.noRetry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ public String toHeader() {
return String.format("%s", this.partitionKeyRangeId);
}

@Override
public String toString() {
return "PartitionKeyRangeIdentity{" +
"collectionRid='" + collectionRid + '\'' +
", partitionKeyRangeId='" + partitionKeyRangeId + '\'' +
'}';
}

@Override
public boolean equals(Object other) {
if (null == other) {
Expand Down
Loading