diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java index b0f7adf8ec42..85456e2e7d3a 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java @@ -14,11 +14,12 @@ import java.net.URL; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; /** * While this class is public, but it is not part of our published public APIs. * This is meant to be internally used only by our sdk. - * + * * Client policy is combination of endpoint change retry + throttling retry. */ public class ClientRetryPolicy implements IDocumentClientRetryPolicy { @@ -39,6 +40,7 @@ public class ClientRetryPolicy implements IDocumentClientRetryPolicy { private URL locationEndpoint; private RetryContext retryContext; private CosmosResponseDiagnostics cosmosResponseDiagnostics; + private AtomicInteger cnt = new AtomicInteger(0); public ClientRetryPolicy(GlobalEndpointManager globalEndpointManager, boolean enableEndpointDiscovery, @@ -57,6 +59,11 @@ public ClientRetryPolicy(GlobalEndpointManager globalEndpointManager, @Override public Mono shouldRetry(Exception e) { + logger.debug("retry count {}, isReadRequest {}, canUseMultipleWriteLocations {}, due to failure:", + cnt.incrementAndGet(), + isReadRequest, + canUseMultipleWriteLocations, + e); if (this.locationEndpoint == null) { // on before request is not invoked because Document Service Request creation failed. logger.error("locationEndpoint is null because ClientRetryPolicy::onBeforeRequest(.) is not invoked, " + @@ -70,7 +77,7 @@ public Mono shouldRetry(Exception e) { if (clientException != null && clientException.cosmosResponseDiagnostics() != null) { this.cosmosResponseDiagnostics = clientException.cosmosResponseDiagnostics(); } - if (clientException != null && + if (clientException != null && Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.FORBIDDEN) && Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN)) { @@ -94,7 +101,7 @@ public Mono shouldRetry(Exception e) { return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest); } - if (clientException != null && + if (clientException != null && Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.NOTFOUND) && Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE)) { return Mono.just(this.shouldRetryOnSessionNotAvailable()); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/JavaStreamUtils.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/JavaStreamUtils.java index 8cbbf459b910..86c02b89c867 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/JavaStreamUtils.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/JavaStreamUtils.java @@ -3,6 +3,7 @@ package com.azure.data.cosmos.internal; +import java.util.Arrays; import java.util.Collection; import java.util.stream.Collectors; @@ -12,11 +13,27 @@ private static String safeToString(T t) { return t != null ? t.toString() : "null"; } + public static String info(Collection collection) { + return collection == null ? "null collection" : + "collection size: " + collection.size(); + } + + public static String info(T[] collection) { + return collection == null ? "null collection" : + "collection size: " + collection.length; + } + public static String toString(Collection collection, String delimiter) { - return collection.stream() - .map( t -> safeToString(t) ) - .collect(Collectors.joining(delimiter)); + return collection == null ? "null collection" : + collection.isEmpty() ? "empty collection" : + collection.stream() + .map(t -> safeToString(t)) + .collect(Collectors.joining(delimiter)); } + public static String toString(T[] array, String delimiter) { + return array == null ? "null array" : + toString(Arrays.asList(array), delimiter); + } } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/AsyncCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/AsyncCache.java index 2f8354a894ad..37d4dd0b2a9e 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/AsyncCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/caches/AsyncCache.java @@ -81,7 +81,7 @@ public Mono getAsync( }, err -> { - logger.debug("cache[{}] resulted in error {}, computing new value", key, err); + logger.debug("cache[{}] resulted in error, computing new value", key, err); AsyncLazy asyncLazy = new AsyncLazy<>(singleValueInitFunc); AsyncLazy resultAsyncLazy = values.merge(key, asyncLazy, (lazyValue1, lazyValu2) -> lazyValue1 == initialLazyValue ? lazyValu2 : lazyValue1); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolver.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolver.java index d5af634a825a..e05497cb956a 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolver.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/AddressResolver.java @@ -230,6 +230,8 @@ private Mono tryResolveServerPartitionAsync( if (range == null) { // Collection cache or routing map cache is potentially outdated. Return empty - // upper logic will refresh cache and retry. + logger.debug("Collection cache or routing map cache is potentially outdated." + + " Returning null. Upper logic will refresh cache and retry."); return Mono.empty(); } @@ -271,6 +273,7 @@ private PartitionKeyRange tryResolveSinglePartitionCollection( return routingMap.getOrderedPartitionKeyRanges().get(0); } + logger.debug("tryResolveSinglePartitionCollection: collectionCacheIsUptoDate = {}", collectionCacheIsUptoDate); if (collectionCacheIsUptoDate) { throw BridgeInternal.setResourceAddress(new BadRequestException(RMResources.MissingPartitionKeyValue), request.getResourceAddress()); } else { @@ -548,6 +551,7 @@ private ResolutionResult handleRangeAddressResolutionFailure( throw BridgeInternal.setResourceAddress(new PartitionKeyRangeGoneException(errorMessage), request.getResourceAddress()); } + logger.debug("handleRangeAddressResolutionFailure returns null"); return null; } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java index e9427767cf87..d15e5ebc0cf9 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GatewayAddressCache.java @@ -12,6 +12,7 @@ import com.azure.data.cosmos.internal.Exceptions; import com.azure.data.cosmos.internal.HttpConstants; import com.azure.data.cosmos.internal.IAuthorizationTokenProvider; +import com.azure.data.cosmos.internal.JavaStreamUtils; import com.azure.data.cosmos.internal.OperationType; import com.azure.data.cosmos.internal.PartitionKeyRange; import com.azure.data.cosmos.internal.Paths; @@ -140,6 +141,10 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque com.azure.data.cosmos.internal.Utils.checkNotNullOrThrow(request, "request", ""); com.azure.data.cosmos.internal.Utils.checkNotNullOrThrow(partitionKeyRangeIdentity, "partitionKeyRangeIdentity", ""); + logger.debug("PartitionKeyRangeIdentity {}, forceRefreshPartitionAddresses {}", + partitionKeyRangeIdentity, + forceRefreshPartitionAddresses); + if (StringUtils.equals(partitionKeyRangeIdentity.getPartitionKeyRangeId(), PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) { @@ -150,6 +155,8 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque Instant suboptimalServerPartitionTimestamp = this.suboptimalServerPartitionTimestamps.get(partitionKeyRangeIdentity); if (suboptimalServerPartitionTimestamp != null) { + logger.debug("suboptimalServerPartitionTimestamp is {}", suboptimalServerPartitionTimestamp); + boolean forceRefreshDueToSuboptimalPartitionReplicaSet = Duration.between(suboptimalServerPartitionTimestamp, Instant.now()).getSeconds() > this.suboptimalPartitionForceRefreshIntervalInSeconds; @@ -158,6 +165,8 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque // and if they are equal, updates the key with a third value. Instant newValue = this.suboptimalServerPartitionTimestamps.computeIfPresent(partitionKeyRangeIdentity, (key, oldVal) -> { + logger.debug("key = {}, oldValue = {}", key, oldVal); + if (suboptimalServerPartitionTimestamp.equals(oldVal)) { return Instant.MAX; } else { @@ -165,7 +174,9 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque } }); + logger.debug("newValue is {}", newValue); if (!newValue.equals(suboptimalServerPartitionTimestamp)) { + logger.debug("setting forceRefreshPartitionAddresses to true"); // the value was replaced; forceRefreshPartitionAddresses = true; } @@ -175,6 +186,8 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque final boolean forceRefreshPartitionAddressesModified = forceRefreshPartitionAddresses; if (forceRefreshPartitionAddressesModified) { + logger.debug("refresh serverPartitionAddressCache for {}", partitionKeyRangeIdentity); + this.serverPartitionAddressCache.refresh( partitionKeyRangeIdentity, () -> this.getAddressesForRangeId( @@ -198,6 +211,10 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque return addressesObs.map( addresses -> { if (notAllReplicasAvailable(addresses)) { + if (logger.isDebugEnabled()) { + logger.debug("not all replicas available {}", JavaStreamUtils.info(addresses)); + } + this.suboptimalServerPartitionTimestamps.putIfAbsent(partitionKeyRangeIdentity, Instant.now()); } @@ -205,17 +222,21 @@ public Mono tryGetAddresses(RxDocumentServiceRequest reque }).onErrorResume(ex -> { CosmosClientException dce = com.azure.data.cosmos.internal.Utils.as(ex, CosmosClientException.class); if (dce == null) { + logger.error("unexpected failure", ex); if (forceRefreshPartitionAddressesModified) { this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity); } return Mono.error(ex); } else { + logger.debug("tryGetAddresses dce", dce); if (Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.NOTFOUND) || Exceptions.isStatusCode(dce, HttpConstants.StatusCodes.GONE) || Exceptions.isSubStatusCode(dce, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE)) { //remove from suboptimal cache in case the collection+pKeyRangeId combo is gone. this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity); - return null; + + logger.debug("tryGetAddresses: inner onErrorResumeNext return null", dce); + return Mono.empty(); } return Mono.error(ex); } @@ -228,6 +249,10 @@ public Mono> getServerAddressesViaGatewayAsync( String collectionRid, List partitionKeyRangeIds, boolean forceRefresh) { + if (logger.isDebugEnabled()) { + logger.debug("getServerAddressesViaGatewayAsync collectionRid {}, partitionKeyRangeIds {}", collectionRid, + JavaStreamUtils.toString(partitionKeyRangeIds, ",")); + } String entryUrl = PathsHelper.generatePath(ResourceType.Document, collectionRid, true); HashMap addressQuery = new HashMap<>(); @@ -285,6 +310,9 @@ public Mono> getServerAddressesViaGatewayAsync( Mono dsrObs = HttpClientUtils.parseResponseAsync(httpResponseMono, httpRequest); return dsrObs.map( dsr -> { + if (logger.isDebugEnabled()) { + logger.debug("getServerAddressesViaGatewayAsync deserializes result"); + } logAddressResolutionEnd(request, identifier); return dsr.getQueryResponse(Address.class); }); @@ -296,6 +324,7 @@ public void dispose() { } private Mono> resolveMasterAsync(RxDocumentServiceRequest request, boolean forceRefresh, Map properties) { + logger.debug("resolveMasterAsync forceRefresh: {}", forceRefresh); Pair masterAddressAndRangeInitial = this.masterPartitionAddressCache; forceRefresh = forceRefresh || @@ -347,18 +376,25 @@ private Mono getAddressesForRangeId( String collectionRid, String partitionKeyRangeId, boolean forceRefresh) { + logger.debug("getAddressesForRangeId collectionRid {}, partitionKeyRangeId {}, forceRefresh {}", + collectionRid, partitionKeyRangeId, forceRefresh); Mono> addressResponse = this.getServerAddressesViaGatewayAsync(request, collectionRid, Collections.singletonList(partitionKeyRangeId), forceRefresh); Mono>> addressInfos = addressResponse.map( - addresses -> - addresses.stream().filter(addressInfo -> - this.protocolScheme.equals(addressInfo.getProtocolScheme())) - .collect(Collectors.groupingBy( - Address::getParitionKeyRangeId)) - .values().stream() - .map(groupedAddresses -> toPartitionAddressAndRange(collectionRid, addresses)) - .collect(Collectors.toList())); + addresses -> { + if (logger.isDebugEnabled()) { + logger.debug("addresses from getServerAddressesViaGatewayAsync in getAddressesForRangeId {}", + JavaStreamUtils.info(addresses)); + } + return addresses.stream().filter(addressInfo -> + this.protocolScheme.equals(addressInfo.getProtocolScheme())) + .collect(Collectors.groupingBy( + Address::getParitionKeyRangeId)) + .values().stream() + .map(groupedAddresses -> toPartitionAddressAndRange(collectionRid, addresses)) + .collect(Collectors.toList()); + }); Mono>> result = addressInfos.map(addressInfo -> addressInfo.stream() .filter(a -> @@ -367,6 +403,10 @@ private Mono getAddressesForRangeId( return result.flatMap( list -> { + if (logger.isDebugEnabled()) { + logger.debug("getAddressesForRangeId flatMap got result {}", JavaStreamUtils.info(list)); + } + if (list.isEmpty()) { String errorMessage = String.format( @@ -381,7 +421,9 @@ private Mono getAddressesForRangeId( } else { return Mono.just(list.get(0).getRight()); } - }); + }).doOnError(e -> { + logger.debug("getAddressesForRangeId", e); + }); } public Mono> getMasterAddressesViaGatewayAsync( @@ -392,6 +434,20 @@ public Mono> getMasterAddressesViaGatewayAsync( boolean forceRefresh, boolean useMasterCollectionResolver, Map properties) { + + logger.debug("getMasterAddressesViaGatewayAsync " + + "resourceType {}, " + + "resourceAddress {}, " + + "entryUrl {}, " + + "forceRefresh {}, " + + "useMasterCollectionResolver {}", + resourceType, + resourceAddress, + entryUrl, + forceRefresh, + useMasterCollectionResolver + ); + HashMap queryParameters = new HashMap<>(); queryParameters.put(HttpConstants.QueryStrings.URL, HttpUtils.urlEncode(entryUrl)); HashMap headers = new HashMap<>(defaultRequestHeaders); @@ -441,6 +497,7 @@ public Mono> getMasterAddressesViaGatewayAsync( } private Pair toPartitionAddressAndRange(String collectionRid, List
addresses) { + logger.debug("toPartitionAddressAndRange"); Address address = addresses.get(0); AddressInformation[] addressInfos = @@ -457,6 +514,11 @@ private static AddressInformation toAddressInformation(Address address) { public Mono openAsync( DocumentCollection collection, List partitionKeyRangeIdentities) { + + if (logger.isDebugEnabled()) { + logger.debug("openAsync collection: {}, partitionKeyRangeIdentities: {}", collection, JavaStreamUtils.toString(partitionKeyRangeIdentities, ",")); + } + List>> tasks = new ArrayList<>(); int batchSize = GatewayAddressCache.DefaultBatchSize; diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GoneAndRetryWithRetryPolicy.java index 8b80b2c9efd0..7be99d94fc6c 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -64,7 +64,7 @@ public Mono shouldRetry(Exception exception) { (this.request.getPartitionKeyRangeIdentity() == null || this.request.getPartitionKeyRangeIdentity().getCollectionRid() == null)) && !(exception instanceof PartitionKeyRangeIsSplittingException)) { - logger.debug("Operation will NOT be retried. Current attempt {}, Exception: {} ", this.attemptCount, + logger.debug("Operation will NOT be retried. Current attempt {}, Exception:", this.attemptCount, exception); stopStopWatch(this.durationTimer); return Mono.just(ShouldRetryResult.noRetry()); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/PartitionKeyRangeIdentity.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/PartitionKeyRangeIdentity.java index 7aa1bdcf8f5d..734f5e2785c7 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/PartitionKeyRangeIdentity.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/PartitionKeyRangeIdentity.java @@ -62,6 +62,14 @@ public String toHeader() { return String.format("%s", this.partitionKeyRangeId); } + @Override + public String toString() { + return "PartitionKeyRangeIdentity{" + + "collectionRid='" + collectionRid + '\'' + + ", partitionKeyRangeId='" + partitionKeyRangeId + '\'' + + '}'; + } + @Override public boolean equals(Object other) { if (null == other) { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java index 616e36422665..927b1f375a52 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/FeedResponseListValidator.java @@ -173,7 +173,7 @@ public Builder allPagesSatisfy(FeedResponseValidator pageValidator) { public void validate(List> feedList) { for(FeedResponse fp: feedList) { - pageValidator.validate(fp); + pageValidator.validate(fp); } } }); @@ -191,15 +191,15 @@ public void validate(List> feedList) { if (value instanceof Double) { Double d = result.getDouble("_aggregate"); - assertThat(d).isEqualTo(value); + assertThat(d).isEqualTo(value); } else if (value instanceof Integer) { Integer d = result.getInt("_aggregate"); - assertThat(d).isEqualTo(value); + assertThat(d).isEqualTo(value); } else if (value instanceof String) { String d = result.getString("_aggregate"); - assertThat(d).isEqualTo(value); + assertThat(d).isEqualTo(value); } else if (value instanceof Document){ assertThat(result.toString()).isEqualTo(value.toString()); @@ -276,11 +276,11 @@ public void validate(List> feedList) { if (shouldHaveMetrics) { QueryMetrics queryMetrics = BridgeInternal.createQueryMetricsFromCollection(BridgeInternal.queryMetricsFromFeedResponse(feedPage).values()); assertThat(queryMetrics.getIndexHitDocumentCount()).isGreaterThanOrEqualTo(0); - assertThat(queryMetrics.getRetrievedDocumentSize()).isGreaterThan(0); + assertThat(queryMetrics.getRetrievedDocumentSize()).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getTotalQueryExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getOutputDocumentCount()).isGreaterThan(0); - assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThan(0); - assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThan(0); + assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThanOrEqualTo(0); + assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getDocumentWriteTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getVMExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThan(0);