diff --git a/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GlobalEndPointManagerTest.java b/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GlobalEndPointManagerTest.java new file mode 100644 index 000000000..b115cc570 --- /dev/null +++ b/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GlobalEndPointManagerTest.java @@ -0,0 +1,351 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.cosmosdb.internal.directconnectivity; + +import com.microsoft.azure.cosmosdb.ConnectionPolicy; +import com.microsoft.azure.cosmosdb.DatabaseAccount; +import com.microsoft.azure.cosmosdb.DatabaseAccountManagerInternal; +import com.microsoft.azure.cosmosdb.internal.routing.LocationCache; +import com.microsoft.azure.cosmosdb.rx.internal.Configs; +import com.microsoft.azure.cosmosdb.rx.internal.GlobalEndpointManager; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import rx.Observable; + +import java.lang.reflect.Field; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This test file will cover various scenarios of disaster recovery. + */ +public class GlobalEndPointManagerTest { + + protected static final int TIMEOUT = 6000000; + DatabaseAccountManagerInternal databaseAccountManagerInternal; + + private String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"}]," + + "\"readableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false," + + "\"minReplicaSetSize\":3,\"maxReplicasetSize\":4},\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3," + + "\"maxReplicasetSize\":4},\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144," + + "\\\"maxJoinsPerSqlQuery\\\":5,\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n"; + + + private String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure" + + ".com:443/\"}],\"readableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}]," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; + + private String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure" + + ".com:443/\"}],\"readableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure.com:443/\"}]," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; + + private String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"readableLocations\":[{\"name\":\"East US\"," + + "\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents" + + ".azure.com:443/\"}],\"enableMultipleWriteLocations\":true,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n" + + "0\n" + + "\n"; + + @BeforeClass(groups = "unit") + public void setup() throws Exception { + databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); + } + + /** + * Test for refresh location cache on connectivity issue with no preferred region + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void refreshLocationAsyncForConnectivityIssue() throws Exception { + GlobalEndpointManager globalEndPointManager = getGlobalEndPointManager(); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastus.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).await(); // Cache will be refreshed as there is no preferred active region remaining + LocationCache locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1, "ReadEnpoints should have 1 value"); + + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableReadEndpointByLocation.size(), 1); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastasia.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).await();// Cache will be refreshed as there is no preferred active region remaining + locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + + availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("West US")); + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + /** + * Test for refresh location cache in background on network failure, + * switching to different preferredLocation region + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void refreshLocationAsyncForConnectivityIssueWithPreferredLocation() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.setEnableEndpointDiscovery(true); + List preferredLocation = new ArrayList<>(); + preferredLocation.add("East US"); + preferredLocation.add("East Asia"); + connectionPolicy.setPreferredLocations(preferredLocation); + connectionPolicy.setUsingMultipleWriteLocations(true); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson1); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + globalEndPointManager.init(); + + LocationCache locationCache = getLocationCache(globalEndPointManager); + databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastus.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).await(); // Refreshing location cache due to region outage, moving from East US to East Asia + + locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 2); //Cache will not refresh immediately, other preferred region East Asia is still active + + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableReadEndpointByLocation.size(), 2); + Assert.assertTrue(availableReadEndpointByLocation.keySet().iterator().next().equalsIgnoreCase("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastasia.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).await();// Making eastasia unavailable + + locationCache = this.getLocationCache(globalEndPointManager); + + availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("West US"));// Cache will be refreshed as both the preferred region are unavailable now + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + /** + * Test for refresh location cache on write forbidden + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void refreshLocationAsyncForWriteForbidden() throws Exception { + GlobalEndpointManager globalEndPointManager = getGlobalEndPointManager(); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForWrite(new URL(("https://testaccount-eastus.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, true).await(); // Refreshing location cache due to write forbidden, moving from East US to East Asia + + LocationCache locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + + Map availableWriteEndpointByLocation = this.getAvailableWriteEndpointByLocation(locationCache); + Assert.assertTrue(availableWriteEndpointByLocation.keySet().contains("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForWrite(new URL(("https://testaccount-eastasia.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, true).await();// Refreshing location cache due to write forbidden, moving from East Asia to West US + + locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + + availableWriteEndpointByLocation = this.getAvailableWriteEndpointByLocation(locationCache); + Assert.assertTrue(availableWriteEndpointByLocation.keySet().contains("West US")); + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + /** + * Test for background refresh disable for multimaster + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void backgroundRefreshForMultiMaster() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.setEnableEndpointDiscovery(true); + connectionPolicy.setUsingMultipleWriteLocations(true); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson4); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + globalEndPointManager.init(); + + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshInBackground.get()); + } + + /** + * Test for background refresh cycle, + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void startRefreshLocationTimerAsync() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.setEnableEndpointDiscovery(true); + connectionPolicy.setUsingMultipleWriteLocations(true); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson1); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + setBackgroundRefreshLocationTimeIntervalInMS(globalEndPointManager, 1000); + globalEndPointManager.init(); + + databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + Thread.sleep(2000); + + LocationCache locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableReadEndpointByLocation.size(), 1); + Assert.assertTrue(availableReadEndpointByLocation.keySet().iterator().next().equalsIgnoreCase("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + Thread.sleep(2000); + locationCache = this.getLocationCache(globalEndPointManager); + + availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("West US")); + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + } + + private LocationCache getLocationCache(GlobalEndpointManager globalEndPointManager) throws Exception { + Field locationCacheField = GlobalEndpointManager.class.getDeclaredField("locationCache"); + locationCacheField.setAccessible(true); + LocationCache locationCache = (LocationCache) locationCacheField.get(globalEndPointManager); + return locationCache; + } + + private Map getAvailableWriteEndpointByLocation(LocationCache locationCache) throws Exception { + Field locationInfoField = LocationCache.class.getDeclaredField("locationInfo"); + locationInfoField.setAccessible(true); + Object locationInfo = locationInfoField.get(locationCache); + + Class DatabaseAccountLocationsInfoClass = Class.forName("com.microsoft.azure.cosmosdb.internal.routing.LocationCache$DatabaseAccountLocationsInfo"); + Field availableWriteEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableWriteEndpointByLocation"); + availableWriteEndpointByLocationField.setAccessible(true); + Field availableReadEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableReadEndpointByLocation"); + availableReadEndpointByLocationField.setAccessible(true); + + return (Map) availableWriteEndpointByLocationField.get(locationInfo); + } + + private Map getAvailableReadEndpointByLocation(LocationCache locationCache) throws Exception { + Field locationInfoField = LocationCache.class.getDeclaredField("locationInfo"); + locationInfoField.setAccessible(true); + Object locationInfo = locationInfoField.get(locationCache); + + Class DatabaseAccountLocationsInfoClass = Class.forName("com.microsoft.azure.cosmosdb.internal.routing.LocationCache$DatabaseAccountLocationsInfo"); + Field availableReadEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableReadEndpointByLocation"); + availableReadEndpointByLocationField.setAccessible(true); + + return (Map) availableReadEndpointByLocationField.get(locationInfo); + } + + private AtomicBoolean getIsRefreshing(GlobalEndpointManager globalEndPointManager) throws Exception { + Field isRefreshingField = GlobalEndpointManager.class.getDeclaredField("isRefreshing"); + isRefreshingField.setAccessible(true); + AtomicBoolean isRefreshing = (AtomicBoolean) isRefreshingField.get(globalEndPointManager); + return isRefreshing; + } + + private AtomicBoolean getRefreshInBackground(GlobalEndpointManager globalEndPointManager) throws Exception { + Field isRefreshInBackgroundField = GlobalEndpointManager.class.getDeclaredField("refreshInBackground"); + isRefreshInBackgroundField.setAccessible(true); + AtomicBoolean isRefreshInBackground = (AtomicBoolean) isRefreshInBackgroundField.get(globalEndPointManager); + return isRefreshInBackground; + } + + private void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec) throws Exception { + Field backgroundRefreshLocationTimeIntervalInMSField = GlobalEndpointManager.class.getDeclaredField("backgroundRefreshLocationTimeIntervalInMS"); + backgroundRefreshLocationTimeIntervalInMSField.setAccessible(true); + backgroundRefreshLocationTimeIntervalInMSField.setInt(globalEndPointManager, millSec); + } + + private GlobalEndpointManager getGlobalEndPointManager() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.setEnableEndpointDiscovery(true); + connectionPolicy.setUsingMultipleWriteLocations(true); // currently without this proper, background refresh will not work + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson1); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Observable.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + globalEndPointManager.init(); + + LocationCache locationCache = getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1, "ReadEnpoints should have 1 value"); + + Map availableWriteEndpointByLocation = this.getAvailableWriteEndpointByLocation(locationCache); + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableWriteEndpointByLocation.size(), 1); + Assert.assertEquals(availableReadEndpointByLocation.size(), 2); + Assert.assertTrue(availableWriteEndpointByLocation.keySet().contains("East US")); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("East US")); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("East Asia")); + return globalEndPointManager; + } +} diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/LocationCache.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/LocationCache.java index d73034352..71b1cd840 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/LocationCache.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/LocationCache.java @@ -203,9 +203,20 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBackground) if (this.enableEndpointDiscovery) { boolean shouldRefresh = this.useMultipleWriteLocations && !this.enableMultipleWriteLocations; + List readLocationEndpoints = currentLocationInfo.readEndpoints; + if (this.isEndpointUnavailable(readLocationEndpoints.get(0), OperationType.Read)) { + // Since most preferred read endpoint is unavailable, we can only refresh in background if + // we have an alternate read endpoint + canRefreshInBackground.v = anyEndpointsAvailable(readLocationEndpoints,OperationType.Read); + logger.debug("shouldRefreshEndpoints = true, since the first read endpoint " + + "[{}] is not available for read. canRefreshInBackground = [{}]", + readLocationEndpoints.get(0), + canRefreshInBackground.v); + return true; + } + if (!Strings.isNullOrEmpty(mostPreferredLocation)) { Utils.ValueHolder mostPreferredReadEndpointHolder = new Utils.ValueHolder<>(); - List readLocationEndpoints = currentLocationInfo.readEndpoints; logger.debug("getReadEndpoints [{}]", readLocationEndpoints); if (Utils.tryGetValue(currentLocationInfo.availableReadEndpointByLocation, mostPreferredLocation, mostPreferredReadEndpointHolder)) { @@ -237,7 +248,7 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBackground) if (this.isEndpointUnavailable(writeLocationEndpoints.get(0), OperationType.Write)) { // Since most preferred write endpoint is unavailable, we can only refresh in background if // we have an alternate write endpoint - canRefreshInBackground.v = writeLocationEndpoints.size() > 1; + canRefreshInBackground.v = anyEndpointsAvailable(writeLocationEndpoints,OperationType.Write); logger.debug("shouldRefreshEndpoints = true, most preferred location " + "[{}] endpoint [{}] is not available for write. canRefreshInBackground = [{}]", mostPreferredLocation, @@ -324,6 +335,18 @@ private boolean isEndpointUnavailable(URL endpoint, OperationType expectedAvaila } } + private boolean anyEndpointsAvailable(List endpoints, OperationType expectedAvailableOperations) { + Utils.ValueHolder unavailabilityInfoHolder = new Utils.ValueHolder<>(); + boolean anyEndpointsAvailable = false; + for (URL endpoint : endpoints) { + if (!isEndpointUnavailable(endpoint, expectedAvailableOperations)) { + anyEndpointsAvailable = true; + break; + } + } + return anyEndpointsAvailable; + } + private void markEndpointUnavailable( URL unavailableEndpoint, OperationType unavailableOperationType) { diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicy.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicy.java index 75efb0083..f5cc3b5a9 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicy.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicy.java @@ -102,7 +102,7 @@ public Single shouldRetry(Exception e) { // Received Connection error (HttpRequestException), initiate the endpoint rediscovery if (WebExceptionUtility.isNetworkFailure(e)) { logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString()); - return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, e); + return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false, e); } this.retryContext = null; @@ -116,7 +116,7 @@ public Single shouldRetry(Exception e) { Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN)) { logger.warn("Endpoint not writable. Will refresh cache and retry. {}", e.toString()); - return this.shouldRetryOnEndpointFailureAsync(false, e); + return this.shouldRetryOnEndpointFailureAsync(false, true, e); } // Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress) @@ -126,7 +126,13 @@ public Single shouldRetry(Exception e) { (this.isReadRequest || this.canUseMultipleWriteLocations)) { logger.warn("Endpoint not available for reads. Will refresh cache and retry. {}", e.toString()); - return this.shouldRetryOnEndpointFailureAsync(true, e); + return this.shouldRetryOnEndpointFailureAsync(true, false, e); + } + + // Received Connection error (HttpRequestException), initiate the endpoint rediscovery + if (WebExceptionUtility.isNetworkFailure(e)) { + logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString()); + return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false, e); } if (clientException != null && @@ -169,7 +175,7 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable() { } } - private Single shouldRetryOnEndpointFailureAsync(boolean isReadRequest, Exception e) { + private Single shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh, Exception e) { if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); return Single.just(ShouldRetryResult.noRetry()); @@ -202,7 +208,7 @@ private Single shouldRetryOnEndpointFailureAsync(boolean isRe } this.retryContext = new RetryContext(this.failoverRetryCount, false); - Completable refreshCompletion = this.globalEndpointManager.refreshLocationAsync(null); + Completable refreshCompletion = this.globalEndpointManager.refreshLocationAsync(null, forceRefresh); if (isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) { // refresh cache and diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/GlobalEndpointManager.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/GlobalEndpointManager.java index 6e304f39b..e8c52275b 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/GlobalEndpointManager.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/GlobalEndpointManager.java @@ -65,6 +65,7 @@ public class GlobalEndpointManager implements AutoCloseable { private final ConnectionPolicy connectionPolicy; private final DatabaseAccountManagerInternal owner; private final AtomicBoolean isRefreshing; + private final AtomicBoolean refreshInBackground; private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Scheduler scheduler = Schedulers.from(executor); private volatile boolean isClosed; @@ -87,6 +88,7 @@ public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPol this.connectionPolicy = connectionPolicy; this.isRefreshing = new AtomicBoolean(false); + this.refreshInBackground = new AtomicBoolean(false); this.isClosed = false; } catch (Exception e) { throw new IllegalArgumentException(e); @@ -155,9 +157,24 @@ public void close() { logger.debug("GlobalEndpointManager closed."); } - public Completable refreshLocationAsync(DatabaseAccount databaseAccount) { + public Completable refreshLocationAsync(DatabaseAccount databaseAccount, boolean forceRefresh) { return Completable.defer(() -> { logger.debug("refreshLocationAsync() invoked"); + + if (forceRefresh) { + Single databaseAccountObs = getDatabaseAccountFromAnyLocationsAsync( + this.defaultEndpoint, + new ArrayList<>(this.connectionPolicy.getPreferredLocations()), + this::getDatabaseAccountAsync); + + return databaseAccountObs.map(dbAccount -> { + this.locationCache.onDatabaseAccountRead(dbAccount); + return dbAccount; + }).flatMapCompletable(dbAccount -> { + return Completable.complete(); + }); + } + if (!isRefreshing.compareAndSet(false, true)) { logger.debug("in the middle of another refresh. Not invoking a new refresh."); return Completable.complete(); @@ -190,17 +207,23 @@ private Completable refreshLocationPrivateAsync(DatabaseAccount databaseAccount) return databaseAccountObs.map(dbAccount -> { this.locationCache.onDatabaseAccountRead(dbAccount); + this.isRefreshing.set(false); return dbAccount; }).flatMapCompletable(dbAccount -> { // trigger a startRefreshLocationTimerAsync don't wait on it. - this.startRefreshLocationTimerAsync(); + if (!this.refreshInBackground.get()) { + this.startRefreshLocationTimerAsync(); + } return Completable.complete(); }); } // trigger a startRefreshLocationTimerAsync don't wait on it. - this.startRefreshLocationTimerAsync(); + if (!this.refreshInBackground.get()) { + this.startRefreshLocationTimerAsync(); + } + this.isRefreshing.set(false); return Completable.complete(); } else { logger.debug("shouldRefreshEndpoints: false, nothing to do."); @@ -227,6 +250,8 @@ private Observable startRefreshLocationTimerAsync(boolean initialization) { int delayInMillis = initialization ? 0: this.backgroundRefreshLocationTimeIntervalInMS; + this.refreshInBackground.set(true); + return Observable.timer(delayInMillis, TimeUnit.MILLISECONDS) .toSingle().flatMapCompletable( t -> { @@ -242,6 +267,7 @@ private Observable startRefreshLocationTimerAsync(boolean initialization) { return databaseAccountObs.flatMapCompletable(dbAccount -> { logger.debug("db account retrieved"); + this.refreshInBackground.set(false); return this.refreshLocationPrivateAsync(dbAccount); }); }).onErrorResumeNext(ex -> { diff --git a/gateway/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/LocationCacheTest.java b/gateway/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/LocationCacheTest.java index bb2d519a7..a7063f344 100644 --- a/gateway/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/LocationCacheTest.java +++ b/gateway/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/LocationCacheTest.java @@ -252,7 +252,11 @@ private void validateLocationCacheAsync( endpointDiscoveryEnabled, preferredAvailableWriteEndpoints, preferredAvailableReadEndpoints, - writeLocationIndex > 0); + readLocationIndex > 0 && !currentReadEndpoints.get(0).equals(DefaultEndpoint), + writeLocationIndex > 0, + currentReadEndpoints.size() > 1, + currentWriteEndpoints.size() > 1 + ); this.validateGlobalEndpointLocationCacheRefreshAsync(); @@ -277,14 +281,17 @@ private void validateEndpointRefresh( boolean endpointDiscoveryEnabled, URL[] preferredAvailableWriteEndpoints, URL[] preferredAvailableReadEndpoints, - boolean isFirstWriteEndpointUnavailable) { + boolean isFirstReadEndpointUnavailable, + boolean isFirstWriteEndpointUnavailable, + boolean hasMoreThanOneReadEndpoints, + boolean hasMoreThanOneWriteEndpoints) { Utils.ValueHolder canRefreshInBackgroundHolder = new Utils.ValueHolder<>(); canRefreshInBackgroundHolder.v = false; boolean shouldRefreshEndpoints = this.cache.shouldRefreshEndpoints(canRefreshInBackgroundHolder); - boolean isMostPreferredLocationUnavailableForRead = false; + boolean isMostPreferredLocationUnavailableForRead = isFirstReadEndpointUnavailable; boolean isMostPreferredLocationUnavailableForWrite = useMultipleWriteLocations ? false : isFirstWriteEndpointUnavailable; if (this.preferredLocations.size() > 0) { @@ -318,7 +325,11 @@ private void validateEndpointRefresh( } if (shouldRefreshEndpoints) { - assertThat(canRefreshInBackgroundHolder.v).isTrue(); + if (isMostPreferredLocationUnavailableForRead) { + assertThat(canRefreshInBackgroundHolder.v).isEqualTo(hasMoreThanOneReadEndpoints); + } else if (isMostPreferredLocationUnavailableForWrite) { + assertThat(canRefreshInBackgroundHolder.v).isEqualTo(hasMoreThanOneWriteEndpoints); + } } } @@ -330,7 +341,7 @@ private void validateGlobalEndpointLocationCacheRefreshAsync() throws Exception mockedClient.reset(); List list = IntStream.range(0, 10) - .mapToObj(index -> this.endpointManager.refreshLocationAsync(null)) + .mapToObj(index -> this.endpointManager.refreshLocationAsync(null, false)) .collect(Collectors.toList()); Completable.merge(list).await(); @@ -339,7 +350,7 @@ private void validateGlobalEndpointLocationCacheRefreshAsync() throws Exception mockedClient.reset(); IntStream.range(0, 10) - .mapToObj(index -> this.endpointManager.refreshLocationAsync(null)) + .mapToObj(index -> this.endpointManager.refreshLocationAsync(null, false)) .collect(Collectors.toList()); for (Completable completable : list) { completable.await(); diff --git a/gateway/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicyTest.java b/gateway/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicyTest.java index 7f007c771..54dceeb48 100644 --- a/gateway/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicyTest.java +++ b/gateway/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ClientRetryPolicyTest.java @@ -45,7 +45,7 @@ public void networkFailureOnRead() throws Exception { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); - Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; @@ -75,7 +75,7 @@ public void networkFailureOnWrite() throws Exception { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); - Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; @@ -103,7 +103,7 @@ public void networkFailureOnUpsert() throws Exception { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); - Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; @@ -131,7 +131,7 @@ public void networkFailureOnDelete() throws Exception { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); - Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; @@ -159,7 +159,7 @@ public void onBeforeSendRequestNotInvoked() { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; diff --git a/gateway/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/RenameCollectionAwareClientRetryPolicyTest.java b/gateway/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/RenameCollectionAwareClientRetryPolicyTest.java index bfb362d7b..19955b6b8 100644 --- a/gateway/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/RenameCollectionAwareClientRetryPolicyTest.java +++ b/gateway/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/RenameCollectionAwareClientRetryPolicyTest.java @@ -46,7 +46,7 @@ public class RenameCollectionAwareClientRetryPolicyTest { @Test(groups = "unit", timeOut = TIMEOUT) public void onBeforeSendRequestNotInvoked() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.GetDefault()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); @@ -74,7 +74,7 @@ public void onBeforeSendRequestNotInvoked() { @Test(groups = "unit", timeOut = TIMEOUT) public void shouldRetryWithNotFoundStatusCode() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.GetDefault()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); @@ -100,7 +100,7 @@ public void shouldRetryWithNotFoundStatusCode() { @Test(groups = "unit", timeOut = TIMEOUT) public void shouldRetryWithNotFoundStatusCodeAndReadSessionNotAvailableSubStatusCode() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.GetDefault()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); @@ -137,7 +137,7 @@ public void shouldRetryWithNotFoundStatusCodeAndReadSessionNotAvailableSubStatus @Test(groups = "unit", timeOut = TIMEOUT) public void shouldRetryWithGenericException() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Completable.complete()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.GetDefault()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); diff --git a/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java b/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java index 0d90e8598..67d93b3e7 100644 --- a/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java +++ b/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java @@ -299,7 +299,7 @@ private void initializeGatewayConfigurationReader() { // TODO: add support for openAsync // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 - this.globalEndpointManager.refreshLocationAsync(databaseAccount).await(); + this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).await(); } public void init() {