From 78b030cd555bfb2470ddfb2e44a50631d7e2d93c Mon Sep 17 00:00:00 2001 From: Naveen Kumar Singh Date: Mon, 11 Nov 2019 18:40:22 -0500 Subject: [PATCH 1/3] DR fix in V4 --- .../cosmos/internal/ClientRetryPolicy.java | 10 +- .../internal/GlobalEndpointManager.java | 32 +- .../cosmos/internal/RxDocumentClientImpl.java | 2 +- .../internal/routing/LocationCache.java | 27 +- .../internal/ClientRetryPolicyTest.java | 6 +- ...eCollectionAwareClientRetryPolicyTest.java | 8 +- .../GlobalEndPointManagerTest.java | 355 ++++++++++++++++++ .../internal/routing/LocationCacheTest.java | 23 +- 8 files changed, 439 insertions(+), 24 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/directconnectivity/GlobalEndPointManagerTest.java diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/ClientRetryPolicy.java index 545bfae20d1f..73c0214b3979 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/ClientRetryPolicy.java @@ -83,7 +83,7 @@ public Mono shouldRetry(Exception e) { Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN)) { logger.warn("Endpoint not writable. Will refresh cache and retry. {}", e.toString()); - return this.shouldRetryOnEndpointFailureAsync(false); + return this.shouldRetryOnEndpointFailureAsync(false, true); } // Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress) @@ -93,13 +93,13 @@ public Mono shouldRetry(Exception e) { this.isReadRequest) { logger.warn("Endpoint not available for reads. Will refresh cache and retry. {}", e.toString()); - return this.shouldRetryOnEndpointFailureAsync(true); + return this.shouldRetryOnEndpointFailureAsync(true, false); } // Received Connection error (HttpRequestException), initiate the endpoint rediscovery if (WebExceptionUtility.isNetworkFailure(e)) { logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString()); - return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest); + return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false); } if (clientException != null && @@ -142,7 +142,7 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable() { } } - private Mono shouldRetryOnEndpointFailureAsync(boolean isReadRequest) { + private Mono shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) { if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); return Mono.just(ShouldRetryResult.noRetry()); @@ -174,7 +174,7 @@ private Mono shouldRetryOnEndpointFailureAsync(boolean isRead retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS); } this.retryContext = new RetryContext(this.failoverRetryCount, false); - return this.globalEndpointManager.refreshLocationAsync(null) + return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh) .then(Mono.just(ShouldRetryResult.retryAfter(retryDelay))); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/GlobalEndpointManager.java index 5ee071a04398..3cd3ab70da87 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/GlobalEndpointManager.java @@ -43,6 +43,7 @@ public class GlobalEndpointManager implements AutoCloseable { private final ConnectionPolicy connectionPolicy; private final DatabaseAccountManagerInternal owner; private final AtomicBoolean isRefreshing; + private final AtomicBoolean refreshInBackground; private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Scheduler scheduler = Schedulers.fromExecutor(executor); private volatile boolean isClosed; @@ -65,6 +66,7 @@ public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPol this.connectionPolicy = connectionPolicy; this.isRefreshing = new AtomicBoolean(false); + this.refreshInBackground = new AtomicBoolean(false); this.isClosed = false; } catch (Exception e) { throw new IllegalArgumentException(e); @@ -131,9 +133,24 @@ public void close() { logger.debug("GlobalEndpointManager closed."); } - public Mono refreshLocationAsync(DatabaseAccount databaseAccount) { + public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean forceRefresh) { return Mono.defer(() -> { logger.debug("refreshLocationAsync() invoked"); + + if (forceRefresh) { + Mono databaseAccountObs = getDatabaseAccountFromAnyLocationsAsync( + this.defaultEndpoint, + new ArrayList<>(this.connectionPolicy.getPreferredLocations()), + this::getDatabaseAccountAsync); + + return databaseAccountObs.map(dbAccount -> { + this.locationCache.onDatabaseAccountRead(dbAccount); + return dbAccount; + }).flatMap(dbAccount -> { + return Mono.empty(); + }); + } + if (!isRefreshing.compareAndSet(false, true)) { logger.debug("in the middle of another refresh. Not invoking a new refresh."); return Mono.empty(); @@ -166,17 +183,23 @@ private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) return databaseAccountObs.map(dbAccount -> { this.locationCache.onDatabaseAccountRead(dbAccount); + this.isRefreshing.set(false); return dbAccount; }).flatMap(dbAccount -> { // trigger a startRefreshLocationTimerAsync don't wait on it. - this.startRefreshLocationTimerAsync(); + if (!this.refreshInBackground.get()) { + this.startRefreshLocationTimerAsync(); + } return Mono.empty(); }); } // trigger a startRefreshLocationTimerAsync don't wait on it. - this.startRefreshLocationTimerAsync(); + if (!this.refreshInBackground.get()) { + this.startRefreshLocationTimerAsync(); + } + this.isRefreshing.set(false); return Mono.empty(); } else { logger.debug("shouldRefreshEndpoints: false, nothing to do."); @@ -203,6 +226,8 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { int delayInMillis = initialization ? 0: this.backgroundRefreshLocationTimeIntervalInMS; + this.refreshInBackground.set(true); + return Mono.delay(Duration.ofMillis(delayInMillis)) .flatMap( t -> { @@ -218,6 +243,7 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { return databaseAccountObs.flatMap(dbAccount -> { logger.debug("db account retrieved"); + this.refreshInBackground.set(false); return this.refreshLocationPrivateAsync(dbAccount); }); }).onErrorResume(ex -> { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/RxDocumentClientImpl.java index e68e6cd580d6..1b2bb9109ef7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/RxDocumentClientImpl.java @@ -264,7 +264,7 @@ private void initializeGatewayConfigurationReader() { // TODO: add support for openAsync // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 - this.globalEndpointManager.refreshLocationAsync(databaseAccount).block(); + this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block(); } public void init() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/routing/LocationCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/routing/LocationCache.java index d4f4064e4cbd..e3dacbd5a688 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/routing/LocationCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/internal/routing/LocationCache.java @@ -185,9 +185,20 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBackground) if (this.enableEndpointDiscovery) { boolean shouldRefresh = this.useMultipleWriteLocations && !this.enableMultipleWriteLocations; + List readLocationEndpoints = currentLocationInfo.readEndpoints; + if (this.isEndpointUnavailable(readLocationEndpoints.get(0), OperationType.Read)) { + // Since most preferred read endpoint is unavailable, we can only refresh in background if + // we have an alternate read endpoint + canRefreshInBackground.v = anyEndpointsAvailable(readLocationEndpoints,OperationType.Read); + logger.debug("shouldRefreshEndpoints = true, since the first read endpoint " + + "[{}] is not available for read. canRefreshInBackground = [{}]", + readLocationEndpoints.get(0), + canRefreshInBackground.v); + return true; + } + if (!Strings.isNullOrEmpty(mostPreferredLocation)) { Utils.ValueHolder mostPreferredReadEndpointHolder = new Utils.ValueHolder<>(); - List readLocationEndpoints = currentLocationInfo.readEndpoints; logger.debug("getReadEndpoints [{}]", readLocationEndpoints); if (Utils.tryGetValue(currentLocationInfo.availableReadEndpointByLocation, mostPreferredLocation, mostPreferredReadEndpointHolder)) { @@ -219,7 +230,7 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBackground) if (this.isEndpointUnavailable(writeLocationEndpoints.get(0), OperationType.Write)) { // Since most preferred write endpoint is unavailable, we can only refresh in background if // we have an alternate write endpoint - canRefreshInBackground.v = writeLocationEndpoints.size() > 1; + canRefreshInBackground.v = anyEndpointsAvailable(writeLocationEndpoints,OperationType.Write); logger.debug("shouldRefreshEndpoints = true, most preferred location " + "[{}] endpoint [{}] is not available for write. canRefreshInBackground = [{}]", mostPreferredLocation, @@ -306,6 +317,18 @@ private boolean isEndpointUnavailable(URL endpoint, OperationType expectedAvaila } } + private boolean anyEndpointsAvailable(List endpoints, OperationType expectedAvailableOperations) { + Utils.ValueHolder unavailabilityInfoHolder = new Utils.ValueHolder<>(); + boolean anyEndpointsAvailable = false; + for (URL endpoint : endpoints) { + if (!isEndpointUnavailable(endpoint, expectedAvailableOperations)) { + anyEndpointsAvailable = true; + break; + } + } + return anyEndpointsAvailable; + } + private void markEndpointUnavailable( URL unavailableEndpoint, OperationType unavailableOperationType) { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/ClientRetryPolicyTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/ClientRetryPolicyTest.java index f9913f6f879b..c896cfd14d20 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/ClientRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/ClientRetryPolicyTest.java @@ -22,7 +22,7 @@ public void networkFailureOnRead() throws Exception { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; @@ -52,7 +52,7 @@ public void networkFailureOnWrite() throws Exception { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; @@ -80,7 +80,7 @@ public void onBeforeSendRequestNotInvoked() { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java index babb0176c91c..4d11ee183656 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java @@ -22,7 +22,7 @@ public class RenameCollectionAwareClientRetryPolicyTest { @Test(groups = "unit", timeOut = TIMEOUT) public void onBeforeSendRequestNotInvoked() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.getDefaultPolicy()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); @@ -51,7 +51,7 @@ public void onBeforeSendRequestNotInvoked() { @Test(groups = "unit", timeOut = TIMEOUT) public void shouldRetryWithNotFoundStatusCode() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null),Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.getDefaultPolicy()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); @@ -77,7 +77,7 @@ public void shouldRetryWithNotFoundStatusCode() { @Test(groups = "unit", timeOut = TIMEOUT) public void shouldRetryWithNotFoundStatusCodeAndReadSessionNotAvailableSubStatusCode() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.getDefaultPolicy()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); @@ -114,7 +114,7 @@ public void shouldRetryWithNotFoundStatusCodeAndReadSessionNotAvailableSubStatus @Test(groups = "unit", timeOut = TIMEOUT) public void shouldRetryWithGenericException() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.getDefaultPolicy()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/directconnectivity/GlobalEndPointManagerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/directconnectivity/GlobalEndPointManagerTest.java new file mode 100644 index 000000000000..9700e8c3e815 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/directconnectivity/GlobalEndPointManagerTest.java @@ -0,0 +1,355 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.internal.directconnectivity; + +import com.azure.cosmos.ConnectionPolicy; +import com.azure.cosmos.DatabaseAccount; +import com.azure.cosmos.internal.Configs; +import com.azure.cosmos.internal.DatabaseAccountManagerInternal; +import com.azure.cosmos.internal.GlobalEndpointManager; +import com.azure.cosmos.internal.routing.LocationCache; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; + +import java.lang.reflect.Field; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This test file will cover various scenarios of disaster recovery. + */ +public class GlobalEndPointManagerTest { + + protected static final int TIMEOUT = 6000000; + DatabaseAccountManagerInternal databaseAccountManagerInternal; + + private String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"}]," + + "\"readableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false," + + "\"minReplicaSetSize\":3,\"maxReplicasetSize\":4},\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3," + + "\"maxReplicasetSize\":4},\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144," + + "\\\"maxJoinsPerSqlQuery\\\":5,\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n"; + + + private String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure" + + ".com:443/\"}],\"readableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}]," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; + + private String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure" + + ".com:443/\"}],\"readableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure.com:443/\"}]," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; + + private String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"readableLocations\":[{\"name\":\"East US\"," + + "\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents" + + ".azure.com:443/\"}],\"enableMultipleWriteLocations\":true,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n" + + "0\n" + + "\n"; + + @BeforeClass(groups = "unit") + public void setup() throws Exception { + databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); + } + + /** + * Test for refresh location cache on connectivity issue with no preferred region + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void refreshLocationAsyncForConnectivityIssue() throws Exception { + GlobalEndpointManager globalEndPointManager = getGlobalEndPointManager(); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastus.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).block(); // Cache will be refreshed as there is no preferred active region remaining + LocationCache locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1, "ReadEnpoints should have 1 value"); + + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableReadEndpointByLocation.size(), 1); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastasia.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).block();// Cache will be refreshed as there is no preferred active region remaining + locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + + availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("West US")); + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + /** + * Test for refresh location cache in background on network failure, + * switching to different preferredLocation region + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void refreshLocationAsyncForConnectivityIssueWithPreferredLocation() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.setEnableEndpointDiscovery(true); + List preferredLocation = new ArrayList<>(); + preferredLocation.add("East US"); + preferredLocation.add("East Asia"); + connectionPolicy.setPreferredLocations(preferredLocation); + connectionPolicy.setUsingMultipleWriteLocations(true); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson1); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + globalEndPointManager.init(); + + LocationCache locationCache = getLocationCache(globalEndPointManager); + databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastus.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).block(); // Refreshing location cache due to region outage, moving from East US to East Asia + + locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 2); //Cache will not refresh immediately, other preferred region East Asia is still active + + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableReadEndpointByLocation.size(), 2); + Assert.assertTrue(availableReadEndpointByLocation.keySet().iterator().next().equalsIgnoreCase("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastasia.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).block();// Making eastasia unavailable + + locationCache = this.getLocationCache(globalEndPointManager); + + availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("West US"));// Cache will be refreshed as both the preferred region are unavailable now + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + /** + * Test for refresh location cache on write forbidden + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void refreshLocationAsyncForWriteForbidden() throws Exception { + GlobalEndpointManager globalEndPointManager = getGlobalEndPointManager(); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForWrite(new URL(("https://testaccount-eastus.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, true).block(); // Refreshing location cache due to write forbidden, moving from East US to East Asia + + LocationCache locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + + Map availableWriteEndpointByLocation = this.getAvailableWriteEndpointByLocation(locationCache); + Assert.assertTrue(availableWriteEndpointByLocation.keySet().contains("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForWrite(new URL(("https://testaccount-eastasia.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, true).block();// Refreshing location cache due to write forbidden, moving from East Asia to West US + + locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + + availableWriteEndpointByLocation = this.getAvailableWriteEndpointByLocation(locationCache); + Assert.assertTrue(availableWriteEndpointByLocation.keySet().contains("West US")); + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + /** + * Test for background refresh disable for multimaster + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void backgroundRefreshForMultiMaster() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.setEnableEndpointDiscovery(true); + connectionPolicy.setUsingMultipleWriteLocations(true); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson4); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + globalEndPointManager.init(); + + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshInBackground.get()); + } + + /** + * Test for background refresh cycle, + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void startRefreshLocationTimerAsync() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.setEnableEndpointDiscovery(true); + connectionPolicy.setUsingMultipleWriteLocations(true); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson1); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + setBackgroundRefreshLocationTimeIntervalInMS(globalEndPointManager, 1000); + globalEndPointManager.init(); + + databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Thread.sleep(2000); + + LocationCache locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableReadEndpointByLocation.size(), 1); + Assert.assertTrue(availableReadEndpointByLocation.keySet().iterator().next().equalsIgnoreCase("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Thread.sleep(2000); + locationCache = this.getLocationCache(globalEndPointManager); + + availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("West US")); + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + private LocationCache getLocationCache(GlobalEndpointManager globalEndPointManager) throws Exception { + Field locationCacheField = GlobalEndpointManager.class.getDeclaredField("locationCache"); + locationCacheField.setAccessible(true); + LocationCache locationCache = (LocationCache) locationCacheField.get(globalEndPointManager); + return locationCache; + } + + private Map getAvailableWriteEndpointByLocation(LocationCache locationCache) throws Exception { + Field locationInfoField = LocationCache.class.getDeclaredField("locationInfo"); + locationInfoField.setAccessible(true); + Object locationInfo = locationInfoField.get(locationCache); + + Class DatabaseAccountLocationsInfoClass = Class.forName("com.azure.cosmos.internal.routing.LocationCache$DatabaseAccountLocationsInfo"); + Field availableWriteEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableWriteEndpointByLocation"); + availableWriteEndpointByLocationField.setAccessible(true); + Field availableReadEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableReadEndpointByLocation"); + availableReadEndpointByLocationField.setAccessible(true); + + return (Map) availableWriteEndpointByLocationField.get(locationInfo); + } + + private Map getAvailableReadEndpointByLocation(LocationCache locationCache) throws Exception { + Field locationInfoField = LocationCache.class.getDeclaredField("locationInfo"); + locationInfoField.setAccessible(true); + Object locationInfo = locationInfoField.get(locationCache); + + Class DatabaseAccountLocationsInfoClass = Class.forName("com.azure.cosmos.internal.routing.LocationCache$DatabaseAccountLocationsInfo"); + Field availableReadEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableReadEndpointByLocation"); + availableReadEndpointByLocationField.setAccessible(true); + + return (Map) availableReadEndpointByLocationField.get(locationInfo); + } + + private AtomicBoolean getIsRefreshing(GlobalEndpointManager globalEndPointManager) throws Exception { + Field isRefreshingField = GlobalEndpointManager.class.getDeclaredField("isRefreshing"); + isRefreshingField.setAccessible(true); + AtomicBoolean isRefreshing = (AtomicBoolean) isRefreshingField.get(globalEndPointManager); + return isRefreshing; + } + + private AtomicBoolean getRefreshInBackground(GlobalEndpointManager globalEndPointManager) throws Exception { + Field isRefreshInBackgroundField = GlobalEndpointManager.class.getDeclaredField("refreshInBackground"); + isRefreshInBackgroundField.setAccessible(true); + AtomicBoolean isRefreshInBackground = (AtomicBoolean) isRefreshInBackgroundField.get(globalEndPointManager); + return isRefreshInBackground; + } + + private void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec) throws Exception { + Field backgroundRefreshLocationTimeIntervalInMSField = GlobalEndpointManager.class.getDeclaredField("backgroundRefreshLocationTimeIntervalInMS"); + backgroundRefreshLocationTimeIntervalInMSField.setAccessible(true); + backgroundRefreshLocationTimeIntervalInMSField.setInt(globalEndPointManager, millSec); + } + + private GlobalEndpointManager getGlobalEndPointManager() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.setEnableEndpointDiscovery(true); + connectionPolicy.setUsingMultipleWriteLocations(true); // currently without this proper, background refresh will not work + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson1); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + globalEndPointManager.init(); + + LocationCache locationCache = getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1, "ReadEnpoints should have 1 value"); + + Map availableWriteEndpointByLocation = this.getAvailableWriteEndpointByLocation(locationCache); + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableWriteEndpointByLocation.size(), 1); + Assert.assertEquals(availableReadEndpointByLocation.size(), 2); + Assert.assertTrue(availableWriteEndpointByLocation.keySet().contains("East US")); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("East US")); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("East Asia")); + return globalEndPointManager; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/routing/LocationCacheTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/routing/LocationCacheTest.java index 3ca9e4b3c49b..2ef4e38eee00 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/routing/LocationCacheTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/internal/routing/LocationCacheTest.java @@ -232,7 +232,11 @@ private void validateLocationCacheAsync( endpointDiscoveryEnabled, preferredAvailableWriteEndpoints, preferredAvailableReadEndpoints, - writeLocationIndex > 0); + readLocationIndex > 0 && !currentReadEndpoints.get(0).equals(DefaultEndpoint), + writeLocationIndex > 0, + currentReadEndpoints.size() > 1, + currentWriteEndpoints.size() > 1 + ); this.validateGlobalEndpointLocationCacheRefreshAsync(); @@ -257,14 +261,17 @@ private void validateEndpointRefresh( boolean endpointDiscoveryEnabled, URL[] preferredAvailableWriteEndpoints, URL[] preferredAvailableReadEndpoints, - boolean isFirstWriteEndpointUnavailable) { + boolean isFirstReadEndpointUnavailable, + boolean isFirstWriteEndpointUnavailable, + boolean hasMoreThanOneReadEndpoints, + boolean hasMoreThanOneWriteEndpoints) { Utils.ValueHolder canRefreshInBackgroundHolder = new Utils.ValueHolder<>(); canRefreshInBackgroundHolder.v = false; boolean shouldRefreshEndpoints = this.cache.shouldRefreshEndpoints(canRefreshInBackgroundHolder); - boolean isMostPreferredLocationUnavailableForRead = false; + boolean isMostPreferredLocationUnavailableForRead = isFirstReadEndpointUnavailable; boolean isMostPreferredLocationUnavailableForWrite = useMultipleWriteLocations ? false : isFirstWriteEndpointUnavailable; if (this.preferredLocations.size() > 0) { @@ -298,7 +305,11 @@ private void validateEndpointRefresh( } if (shouldRefreshEndpoints) { - assertThat(canRefreshInBackgroundHolder.v).isTrue(); + if (isMostPreferredLocationUnavailableForRead) { + assertThat(canRefreshInBackgroundHolder.v).isEqualTo(hasMoreThanOneReadEndpoints); + } else if (isMostPreferredLocationUnavailableForWrite) { + assertThat(canRefreshInBackgroundHolder.v).isEqualTo(hasMoreThanOneWriteEndpoints); + } } } @@ -310,7 +321,7 @@ private void validateGlobalEndpointLocationCacheRefreshAsync() throws Exception mockedClient.reset(); List> list = IntStream.range(0, 10) - .mapToObj(index -> this.endpointManager.refreshLocationAsync(null)) + .mapToObj(index -> this.endpointManager.refreshLocationAsync(null, false)) .collect(Collectors.toList()); Flux.merge(list).then().block(); @@ -319,7 +330,7 @@ private void validateGlobalEndpointLocationCacheRefreshAsync() throws Exception mockedClient.reset(); IntStream.range(0, 10) - .mapToObj(index -> this.endpointManager.refreshLocationAsync(null)) + .mapToObj(index -> this.endpointManager.refreshLocationAsync(null, false)) .collect(Collectors.toList()); for (Mono completable : list) { completable.block(); From 869af40042e0cfe447f7b9dfe957f87379e7e494 Mon Sep 17 00:00:00 2001 From: Naveen Kumar Singh Date: Fri, 15 Nov 2019 15:45:35 -0500 Subject: [PATCH 2/3] fixing build failure occure due to recent renaming of packages --- .../GlobalEndPointManagerTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java index 9700e8c3e815..e84c6d9f6e17 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java @@ -1,14 +1,14 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.cosmos.internal.directconnectivity; +package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.DatabaseAccount; -import com.azure.cosmos.internal.Configs; -import com.azure.cosmos.internal.DatabaseAccountManagerInternal; -import com.azure.cosmos.internal.GlobalEndpointManager; -import com.azure.cosmos.internal.routing.LocationCache; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.DatabaseAccountManagerInternal; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.routing.LocationCache; import org.mockito.Matchers; import org.mockito.Mockito; import org.testng.Assert; @@ -289,7 +289,7 @@ private Map getAvailableWriteEndpointByLocation(LocationCache locat locationInfoField.setAccessible(true); Object locationInfo = locationInfoField.get(locationCache); - Class DatabaseAccountLocationsInfoClass = Class.forName("com.azure.cosmos.internal.routing.LocationCache$DatabaseAccountLocationsInfo"); + Class DatabaseAccountLocationsInfoClass = Class.forName("com.azure.cosmos.implementation.routing.LocationCache$DatabaseAccountLocationsInfo"); Field availableWriteEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableWriteEndpointByLocation"); availableWriteEndpointByLocationField.setAccessible(true); Field availableReadEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableReadEndpointByLocation"); @@ -303,7 +303,7 @@ private Map getAvailableReadEndpointByLocation(LocationCache locati locationInfoField.setAccessible(true); Object locationInfo = locationInfoField.get(locationCache); - Class DatabaseAccountLocationsInfoClass = Class.forName("com.azure.cosmos.internal.routing.LocationCache$DatabaseAccountLocationsInfo"); + Class DatabaseAccountLocationsInfoClass = Class.forName("com.azure.cosmos.implementation.routing.LocationCache$DatabaseAccountLocationsInfo"); Field availableReadEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableReadEndpointByLocation"); availableReadEndpointByLocationField.setAccessible(true); From 7edd90b9b4f1048c31eb1b59138a53d62a2dc3f6 Mon Sep 17 00:00:00 2001 From: Naveen Kumar Singh Date: Fri, 15 Nov 2019 17:27:52 -0500 Subject: [PATCH 3/3] removing flaky assert from the test --- .../directconnectivity/GlobalEndPointManagerTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java index e84c6d9f6e17..edeefae5a9a8 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java @@ -261,7 +261,6 @@ public void startRefreshLocationTimerAsync() throws Exception { AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); Assert.assertFalse(isRefreshing.get()); - Assert.assertTrue(isRefreshInBackground.get()); databaseAccount = new DatabaseAccount(dbAccountJson3); Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); @@ -274,7 +273,6 @@ public void startRefreshLocationTimerAsync() throws Exception { isRefreshing = this.getIsRefreshing(globalEndPointManager); isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); Assert.assertFalse(isRefreshing.get()); - Assert.assertTrue(isRefreshInBackground.get()); } private LocationCache getLocationCache(GlobalEndpointManager globalEndPointManager) throws Exception {