diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicy.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicy.java index f5cc3b5a9..24bb35ac5 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicy.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicy.java @@ -99,12 +99,6 @@ public Single shouldRetry(Exception e) { return rxNettyConnectionPoolExhaustedRetry.shouldRetry(e); } - // Received Connection error (HttpRequestException), initiate the endpoint rediscovery - if (WebExceptionUtility.isNetworkFailure(e)) { - logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString()); - return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false, e); - } - this.retryContext = null; // Received 403.3 on write region, initiate the endpoint re-discovery DocumentClientException clientException = Utils.as(e, DocumentClientException.class); @@ -115,8 +109,8 @@ public Single shouldRetry(Exception e) { Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.FORBIDDEN) && Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN)) { - logger.warn("Endpoint not writable. Will refresh cache and retry. {}", e.toString()); - return this.shouldRetryOnEndpointFailureAsync(false, true, e); + logger.warn("Endpoint not writable. Will refresh cache and retry. ", e); + return this.shouldRetryOnEndpointFailureAsync(false, true); } // Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress) @@ -125,14 +119,18 @@ public Single shouldRetry(Exception e) { Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.DATABASE_ACCOUNT_NOTFOUND) && (this.isReadRequest || this.canUseMultipleWriteLocations)) { - logger.warn("Endpoint not available for reads. Will refresh cache and retry. {}", e.toString()); - return this.shouldRetryOnEndpointFailureAsync(true, false, e); + logger.warn("Endpoint not available for reads. Will refresh cache and retry. ", e); + return this.shouldRetryOnEndpointFailureAsync(true, false); } // Received Connection error (HttpRequestException), initiate the endpoint rediscovery if (WebExceptionUtility.isNetworkFailure(e)) { - logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString()); - return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false, e); + if (this.isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) { + logger.warn("Endpoint not reachable. Will refresh cache and retry. ", e); + return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false); + } else { + return this.shouldNotRetryOnEndpointFailureAsync(this.isReadRequest, false); + } } if (clientException != null && @@ -175,29 +173,20 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable() { } } - private Single shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh, Exception e) { + private Single shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) { if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); return Single.just(ShouldRetryResult.noRetry()); } - this.failoverRetryCount++; - - // Mark the current read endpoint as unavailable - if (this.isReadRequest) { - logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint); - this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint); - } else { - logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint); - this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint); - } + Completable refreshCompletion = this.refreshLocation(isReadRequest, forceRefresh); // Some requests may be in progress when the endpoint manager and client are closed. // In that case, the request won't succeed since the http client is closed. // Therefore just skip the retry here to avoid the delay because retrying won't go through in the end. Duration retryDelay = Duration.ZERO; - if (!this.isReadRequest) { + if (!isReadRequest) { logger.debug("Failover happening. retryCount {}", this.failoverRetryCount); if (this.failoverRetryCount > 1) { //if retried both endpoints, follow regular retry interval. @@ -206,22 +195,33 @@ private Single shouldRetryOnEndpointFailureAsync(boolean isRe } else { retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS); } - this.retryContext = new RetryContext(this.failoverRetryCount, false); + return refreshCompletion.andThen(Single.just(ShouldRetryResult.retryAfter(retryDelay))); + } + + private Single shouldNotRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) { + if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { + logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); + return Single.just(ShouldRetryResult.noRetry()); + } + Completable refreshCompletion = this.refreshLocation(isReadRequest, forceRefresh); + return refreshCompletion.andThen(Single.just(ShouldRetryResult.noRetry())); + } - Completable refreshCompletion = this.globalEndpointManager.refreshLocationAsync(null, forceRefresh); + private Completable refreshLocation(boolean isReadRequest, boolean forceRefresh) { + this.failoverRetryCount++; - if (isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) { - // refresh cache and - // if it is a read request or if it is a write but we are sure the write - // hasn't reached the service retry - return refreshCompletion - .andThen(Single.just(ShouldRetryResult.retryAfter(retryDelay))); + // Mark the current read endpoint as unavailable + if (isReadRequest) { + logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint); + this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint); } else { - // refresh cache and - // no retry for writes which we are not sure if have reached to the service or not - return refreshCompletion - .andThen(Single.just(ShouldRetryResult.noRetry())); + logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint); + this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint); } + + this.retryContext = new RetryContext(this.failoverRetryCount, false); + + return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh); } @Override