Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,20 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBackground)
if (this.enableEndpointDiscovery) {

boolean shouldRefresh = this.useMultipleWriteLocations && !this.enableMultipleWriteLocations;
List<URL> readLocationEndpoints = currentLocationInfo.readEndpoints;
if (this.isEndpointUnavailable(readLocationEndpoints.get(0), OperationType.Read)) {
// Since most preferred read endpoint is unavailable, we can only refresh in background if
// we have an alternate read endpoint
canRefreshInBackground.v = anyEndpointsAvailable(readLocationEndpoints,OperationType.Read);
logger.debug("shouldRefreshEndpoints = true, since the first read endpoint " +
"[{}] is not available for read. canRefreshInBackground = [{}]",
readLocationEndpoints.get(0),
canRefreshInBackground.v);
return true;
}

if (!Strings.isNullOrEmpty(mostPreferredLocation)) {
Utils.ValueHolder<URL> mostPreferredReadEndpointHolder = new Utils.ValueHolder<>();
List<URL> readLocationEndpoints = currentLocationInfo.readEndpoints;
logger.debug("getReadEndpoints [{}]", readLocationEndpoints);

if (Utils.tryGetValue(currentLocationInfo.availableReadEndpointByLocation, mostPreferredLocation, mostPreferredReadEndpointHolder)) {
Expand Down Expand Up @@ -237,7 +248,7 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBackground)
if (this.isEndpointUnavailable(writeLocationEndpoints.get(0), OperationType.Write)) {
// Since most preferred write endpoint is unavailable, we can only refresh in background if
// we have an alternate write endpoint
canRefreshInBackground.v = writeLocationEndpoints.size() > 1;
canRefreshInBackground.v = anyEndpointsAvailable(writeLocationEndpoints,OperationType.Write);
logger.debug("shouldRefreshEndpoints = true, most preferred location " +
"[{}] endpoint [{}] is not available for write. canRefreshInBackground = [{}]",
mostPreferredLocation,
Expand Down Expand Up @@ -324,6 +335,18 @@ private boolean isEndpointUnavailable(URL endpoint, OperationType expectedAvaila
}
}

private boolean anyEndpointsAvailable(List<URL> endpoints, OperationType expectedAvailableOperations) {
Utils.ValueHolder<LocationUnavailabilityInfo> unavailabilityInfoHolder = new Utils.ValueHolder<>();
boolean anyEndpointsAvailable = false;
for (URL endpoint : endpoints) {
if (!isEndpointUnavailable(endpoint, expectedAvailableOperations)) {
anyEndpointsAvailable = true;
break;
}
}
return anyEndpointsAvailable;
}

private void markEndpointUnavailable(
URL unavailableEndpoint,
OperationType unavailableOperationType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Single<ShouldRetryResult> shouldRetry(Exception e) {
// Received Connection error (HttpRequestException), initiate the endpoint rediscovery
if (WebExceptionUtility.isNetworkFailure(e)) {
logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString());
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, e);
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false, e);
}

this.retryContext = null;
Expand All @@ -116,7 +116,7 @@ public Single<ShouldRetryResult> shouldRetry(Exception e) {
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN))
{
logger.warn("Endpoint not writable. Will refresh cache and retry. {}", e.toString());
return this.shouldRetryOnEndpointFailureAsync(false, e);
return this.shouldRetryOnEndpointFailureAsync(false, true, e);
}

// Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress)
Expand All @@ -126,7 +126,13 @@ public Single<ShouldRetryResult> shouldRetry(Exception e) {
(this.isReadRequest || this.canUseMultipleWriteLocations))
{
logger.warn("Endpoint not available for reads. Will refresh cache and retry. {}", e.toString());
return this.shouldRetryOnEndpointFailureAsync(true, e);
return this.shouldRetryOnEndpointFailureAsync(true, false, e);
}

// Received Connection error (HttpRequestException), initiate the endpoint rediscovery
if (WebExceptionUtility.isNetworkFailure(e)) {
logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString());
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false, e);
}

if (clientException != null &&
Expand Down Expand Up @@ -169,7 +175,7 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable() {
}
}

private Single<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isReadRequest, Exception e) {
private Single<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh, Exception e) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) {
logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount);
return Single.just(ShouldRetryResult.noRetry());
Expand Down Expand Up @@ -202,7 +208,7 @@ private Single<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isRe
}
this.retryContext = new RetryContext(this.failoverRetryCount, false);

Completable refreshCompletion = this.globalEndpointManager.refreshLocationAsync(null);
Completable refreshCompletion = this.globalEndpointManager.refreshLocationAsync(null, forceRefresh);

if (isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) {
// refresh cache and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class GlobalEndpointManager implements AutoCloseable {
private final ConnectionPolicy connectionPolicy;
private final DatabaseAccountManagerInternal owner;
private final AtomicBoolean isRefreshing;
private final AtomicBoolean refreshInBackground;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Scheduler scheduler = Schedulers.from(executor);
private volatile boolean isClosed;
Expand All @@ -87,6 +88,7 @@ public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPol
this.connectionPolicy = connectionPolicy;

this.isRefreshing = new AtomicBoolean(false);
this.refreshInBackground = new AtomicBoolean(false);
this.isClosed = false;
} catch (Exception e) {
throw new IllegalArgumentException(e);
Expand Down Expand Up @@ -155,9 +157,24 @@ public void close() {
logger.debug("GlobalEndpointManager closed.");
}

public Completable refreshLocationAsync(DatabaseAccount databaseAccount) {
public Completable refreshLocationAsync(DatabaseAccount databaseAccount, boolean forceRefresh) {
return Completable.defer(() -> {
logger.debug("refreshLocationAsync() invoked");

if (forceRefresh) {
Single<DatabaseAccount> databaseAccountObs = getDatabaseAccountFromAnyLocationsAsync(
this.defaultEndpoint,
new ArrayList<>(this.connectionPolicy.getPreferredLocations()),
this::getDatabaseAccountAsync);

return databaseAccountObs.map(dbAccount -> {
this.locationCache.onDatabaseAccountRead(dbAccount);
return dbAccount;
}).flatMapCompletable(dbAccount -> {
return Completable.complete();
});
}

if (!isRefreshing.compareAndSet(false, true)) {
logger.debug("in the middle of another refresh. Not invoking a new refresh.");
return Completable.complete();
Expand Down Expand Up @@ -190,17 +207,23 @@ private Completable refreshLocationPrivateAsync(DatabaseAccount databaseAccount)

return databaseAccountObs.map(dbAccount -> {
this.locationCache.onDatabaseAccountRead(dbAccount);
this.isRefreshing.set(false);
return dbAccount;
}).flatMapCompletable(dbAccount -> {
// trigger a startRefreshLocationTimerAsync don't wait on it.
this.startRefreshLocationTimerAsync();
if (!this.refreshInBackground.get()) {
this.startRefreshLocationTimerAsync();
}
return Completable.complete();
});
}

// trigger a startRefreshLocationTimerAsync don't wait on it.
this.startRefreshLocationTimerAsync();
if (!this.refreshInBackground.get()) {
this.startRefreshLocationTimerAsync();
}

this.isRefreshing.set(false);
return Completable.complete();
} else {
logger.debug("shouldRefreshEndpoints: false, nothing to do.");
Expand All @@ -227,6 +250,8 @@ private Observable startRefreshLocationTimerAsync(boolean initialization) {

int delayInMillis = initialization ? 0: this.backgroundRefreshLocationTimeIntervalInMS;

this.refreshInBackground.set(true);

return Observable.timer(delayInMillis, TimeUnit.MILLISECONDS)
.toSingle().flatMapCompletable(
t -> {
Expand All @@ -242,6 +267,7 @@ private Observable startRefreshLocationTimerAsync(boolean initialization) {

return databaseAccountObs.flatMapCompletable(dbAccount -> {
logger.debug("db account retrieved");
this.refreshInBackground.set(false);
return this.refreshLocationPrivateAsync(dbAccount);
});
}).onErrorResumeNext(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,11 @@ private void validateLocationCacheAsync(
endpointDiscoveryEnabled,
preferredAvailableWriteEndpoints,
preferredAvailableReadEndpoints,
writeLocationIndex > 0);
readLocationIndex > 0 && !currentReadEndpoints.get(0).equals(DefaultEndpoint),
writeLocationIndex > 0,
currentReadEndpoints.size() > 1,
currentWriteEndpoints.size() > 1
);

this.validateGlobalEndpointLocationCacheRefreshAsync();

Expand All @@ -277,14 +281,17 @@ private void validateEndpointRefresh(
boolean endpointDiscoveryEnabled,
URL[] preferredAvailableWriteEndpoints,
URL[] preferredAvailableReadEndpoints,
boolean isFirstWriteEndpointUnavailable) {
boolean isFirstReadEndpointUnavailable,
boolean isFirstWriteEndpointUnavailable,
boolean hasMoreThanOneReadEndpoints,
boolean hasMoreThanOneWriteEndpoints) {

Utils.ValueHolder<Boolean> canRefreshInBackgroundHolder = new Utils.ValueHolder<>();
canRefreshInBackgroundHolder.v = false;

boolean shouldRefreshEndpoints = this.cache.shouldRefreshEndpoints(canRefreshInBackgroundHolder);

boolean isMostPreferredLocationUnavailableForRead = false;
boolean isMostPreferredLocationUnavailableForRead = isFirstReadEndpointUnavailable;
boolean isMostPreferredLocationUnavailableForWrite = useMultipleWriteLocations ?
false : isFirstWriteEndpointUnavailable;
if (this.preferredLocations.size() > 0) {
Expand Down Expand Up @@ -318,7 +325,11 @@ private void validateEndpointRefresh(
}

if (shouldRefreshEndpoints) {
assertThat(canRefreshInBackgroundHolder.v).isTrue();
if (isMostPreferredLocationUnavailableForRead) {
assertThat(canRefreshInBackgroundHolder.v).isEqualTo(hasMoreThanOneReadEndpoints);
} else if (isMostPreferredLocationUnavailableForWrite) {
assertThat(canRefreshInBackgroundHolder.v).isEqualTo(hasMoreThanOneWriteEndpoints);
}
}
}

Expand All @@ -330,7 +341,7 @@ private void validateGlobalEndpointLocationCacheRefreshAsync() throws Exception

mockedClient.reset();
List<Completable> list = IntStream.range(0, 10)
.mapToObj(index -> this.endpointManager.refreshLocationAsync(null))
.mapToObj(index -> this.endpointManager.refreshLocationAsync(null, false))
.collect(Collectors.toList());

Completable.merge(list).await();
Expand All @@ -339,7 +350,7 @@ private void validateGlobalEndpointLocationCacheRefreshAsync() throws Exception
mockedClient.reset();

IntStream.range(0, 10)
.mapToObj(index -> this.endpointManager.refreshLocationAsync(null))
.mapToObj(index -> this.endpointManager.refreshLocationAsync(null, false))
.collect(Collectors.toList());
for (Completable completable : list) {
completable.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void networkFailureOnRead() throws Exception {
RetryOptions retryOptions = new RetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null));
Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);

Exception exception = ReadTimeoutException.INSTANCE;
Expand Down Expand Up @@ -75,7 +75,7 @@ public void networkFailureOnWrite() throws Exception {
RetryOptions retryOptions = new RetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null));
Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);

Exception exception = ReadTimeoutException.INSTANCE;
Expand Down Expand Up @@ -103,7 +103,7 @@ public void networkFailureOnUpsert() throws Exception {
RetryOptions retryOptions = new RetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null));
Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);

Exception exception = ReadTimeoutException.INSTANCE;
Expand Down Expand Up @@ -131,7 +131,7 @@ public void networkFailureOnDelete() throws Exception {
RetryOptions retryOptions = new RetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null));
Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);

Exception exception = ReadTimeoutException.INSTANCE;
Expand Down Expand Up @@ -159,7 +159,7 @@ public void onBeforeSendRequestNotInvoked() {
RetryOptions retryOptions = new RetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);

Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null));
Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);

Exception exception = ReadTimeoutException.INSTANCE;
Expand Down
Loading