Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.FORBIDDEN) &&
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN))
{
logger.warn("Endpoint not writable. Will refresh cache and retry. {}", e.toString());
logger.warn("Endpoint not writable. Will refresh cache and retry ", e);
return this.shouldRetryOnEndpointFailureAsync(false, true);
}

Expand All @@ -91,14 +91,18 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.DATABASE_ACCOUNT_NOTFOUND) &&
this.isReadRequest)
{
logger.warn("Endpoint not available for reads. Will refresh cache and retry. {}", e.toString());
logger.warn("Endpoint not available for reads. Will refresh cache and retry. ", e);
return this.shouldRetryOnEndpointFailureAsync(true, false);
}

// Received Connection error (HttpRequestException), initiate the endpoint rediscovery
// Received Connection error (HttpException), initiate the endpoint rediscovery
if (WebExceptionUtility.isNetworkFailure(e)) {
logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString());
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false);
if (this.isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) {
logger.warn("Endpoint not reachable. Will refresh cache and retry. ", e);
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false);
} else {
return this.shouldNotRetryOnEndpointFailureAsync(this.isReadRequest, false);
}
}

if (clientException != null &&
Expand Down Expand Up @@ -147,23 +151,14 @@ private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isRead
return Mono.just(ShouldRetryResult.noRetry());
}

this.failoverRetryCount++;

// Mark the current read endpoint as unavailable
if (this.isReadRequest) {
logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint);
} else {
logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint);
}
Mono<Void> refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh);

// Some requests may be in progress when the endpoint manager and client are closed.
// In that case, the request won't succeed since the http client is closed.
// Therefore just skip the retry here to avoid the delay because retrying won't go through in the end.

Duration retryDelay = Duration.ZERO;
if (!this.isReadRequest) {
if (!isReadRequest) {
logger.debug("Failover happening. retryCount {}", this.failoverRetryCount);
if (this.failoverRetryCount > 1) {
//if retried both endpoints, follow regular retry interval.
Expand All @@ -172,9 +167,32 @@ private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isRead
} else {
retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS);
}
return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay)));
}

private Mono<ShouldRetryResult> shouldNotRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) {
logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount);
return Mono.just(ShouldRetryResult.noRetry());
}
Mono<Void> refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh);
return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.noRetry()));
}

private Mono<Void> refreshLocation(boolean isReadRequest, boolean forceRefresh) {
this.failoverRetryCount++;

// Mark the current read endpoint as unavailable
if (isReadRequest) {
logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint);
} else {
logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint);
}

this.retryContext = new RetryContext(this.failoverRetryCount, false);
return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh)
.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay)));
return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,68 @@ public void networkFailureOnWrite() throws Exception {
Mono<IRetryPolicy.ShouldRetryResult> shouldRetry = clientRetryPolicy.shouldRetry(exception);
validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(true)
.backOfTime(i > 0 ? Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS) : Duration.ZERO)
.shouldRetry(false)
.build());

Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any());
}
}

@Test(groups = "unit")
public void networkFailureOnUpsert() throws Exception {
RetryOptions retryOptions = new RetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);

Exception exception = ReadTimeoutException.INSTANCE;

RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
OperationType.Upsert, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class);

clientRetryPolicy.onBeforeSendRequest(dsr);
for (int i = 0; i < 10; i++) {
Mono<IRetryPolicy.ShouldRetryResult> shouldRetry = clientRetryPolicy.shouldRetry(exception);
validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(false)
.build());

Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any());
}
}

@Test(groups = "unit")
public void networkFailureOnDelete() throws Exception {
RetryOptions retryOptions = new RetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);

Exception exception = ReadTimeoutException.INSTANCE;

RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
OperationType.Delete, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class);

clientRetryPolicy.onBeforeSendRequest(dsr);
for (int i = 0; i < 10; i++) {
Mono<IRetryPolicy.ShouldRetryResult> shouldRetry = clientRetryPolicy.shouldRetry(exception);
validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(false)
.build());

Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any());
}
}

@Test(groups = "unit")
public void onBeforeSendRequestNotInvoked() {
RetryOptions retryOptions = new RetryOptions();
Expand Down