diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java index 105a3da30e8f..13ea67b4d0e3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java @@ -81,7 +81,7 @@ public Mono shouldRetry(Exception e) { Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.FORBIDDEN) && Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN)) { - logger.warn("Endpoint not writable. Will refresh cache and retry. {}", e.toString()); + logger.warn("Endpoint not writable. Will refresh cache and retry ", e); return this.shouldRetryOnEndpointFailureAsync(false, true); } @@ -91,14 +91,18 @@ public Mono shouldRetry(Exception e) { Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.DATABASE_ACCOUNT_NOTFOUND) && this.isReadRequest) { - logger.warn("Endpoint not available for reads. Will refresh cache and retry. {}", e.toString()); + logger.warn("Endpoint not available for reads. Will refresh cache and retry. ", e); return this.shouldRetryOnEndpointFailureAsync(true, false); } - // Received Connection error (HttpRequestException), initiate the endpoint rediscovery + // Received Connection error (HttpException), initiate the endpoint rediscovery if (WebExceptionUtility.isNetworkFailure(e)) { - logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString()); - return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false); + if (this.isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) { + logger.warn("Endpoint not reachable. Will refresh cache and retry. ", e); + return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false); + } else { + return this.shouldNotRetryOnEndpointFailureAsync(this.isReadRequest, false); + } } if (clientException != null && @@ -147,23 +151,14 @@ private Mono shouldRetryOnEndpointFailureAsync(boolean isRead return Mono.just(ShouldRetryResult.noRetry()); } - this.failoverRetryCount++; - - // Mark the current read endpoint as unavailable - if (this.isReadRequest) { - logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint); - this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint); - } else { - logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint); - this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint); - } + Mono refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh); // Some requests may be in progress when the endpoint manager and client are closed. // In that case, the request won't succeed since the http client is closed. // Therefore just skip the retry here to avoid the delay because retrying won't go through in the end. Duration retryDelay = Duration.ZERO; - if (!this.isReadRequest) { + if (!isReadRequest) { logger.debug("Failover happening. retryCount {}", this.failoverRetryCount); if (this.failoverRetryCount > 1) { //if retried both endpoints, follow regular retry interval. @@ -172,9 +167,32 @@ private Mono shouldRetryOnEndpointFailureAsync(boolean isRead } else { retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS); } + return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay))); + } + + private Mono shouldNotRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) { + if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { + logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); + return Mono.just(ShouldRetryResult.noRetry()); + } + Mono refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh); + return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.noRetry())); + } + + private Mono refreshLocation(boolean isReadRequest, boolean forceRefresh) { + this.failoverRetryCount++; + + // Mark the current read endpoint as unavailable + if (isReadRequest) { + logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint); + this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint); + } else { + logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint); + this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint); + } + this.retryContext = new RetryContext(this.failoverRetryCount, false); - return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh) - .then(Mono.just(ShouldRetryResult.retryAfter(retryDelay))); + return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java index 2311c8565ad0..2ca91c2df27a 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java @@ -66,8 +66,7 @@ public void networkFailureOnWrite() throws Exception { Mono shouldRetry = clientRetryPolicy.shouldRetry(exception); validateSuccess(shouldRetry, ShouldRetryValidator.builder() .nullException() - .shouldRetry(true) - .backOfTime(i > 0 ? Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS) : Duration.ZERO) + .shouldRetry(false) .build()); Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any()); @@ -75,6 +74,60 @@ public void networkFailureOnWrite() throws Exception { } } + @Test(groups = "unit") + public void networkFailureOnUpsert() throws Exception { + RetryOptions retryOptions = new RetryOptions(); + GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); + Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); + ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); + + Exception exception = ReadTimeoutException.INSTANCE; + + RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName( + OperationType.Upsert, "/dbs/db/colls/col/docs/docId", ResourceType.Document); + dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class); + + clientRetryPolicy.onBeforeSendRequest(dsr); + for (int i = 0; i < 10; i++) { + Mono shouldRetry = clientRetryPolicy.shouldRetry(exception); + validateSuccess(shouldRetry, ShouldRetryValidator.builder() + .nullException() + .shouldRetry(false) + .build()); + + Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any()); + Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any()); + } + } + + @Test(groups = "unit") + public void networkFailureOnDelete() throws Exception { + RetryOptions retryOptions = new RetryOptions(); + GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); + Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); + ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); + + Exception exception = ReadTimeoutException.INSTANCE; + + RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName( + OperationType.Delete, "/dbs/db/colls/col/docs/docId", ResourceType.Document); + dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class); + + clientRetryPolicy.onBeforeSendRequest(dsr); + for (int i = 0; i < 10; i++) { + Mono shouldRetry = clientRetryPolicy.shouldRetry(exception); + validateSuccess(shouldRetry, ShouldRetryValidator.builder() + .nullException() + .shouldRetry(false) + .build()); + + Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any()); + Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any()); + } + } + @Test(groups = "unit") public void onBeforeSendRequestNotInvoked() { RetryOptions retryOptions = new RetryOptions();