diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java index 85456e2e7d3a..b9d2160a5c80 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java @@ -82,7 +82,7 @@ public Mono shouldRetry(Exception e) { Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN)) { logger.warn("Endpoint not writable. Will refresh cache and retry. {}", e.toString()); - return this.shouldRetryOnEndpointFailureAsync(false); + return this.shouldRetryOnEndpointFailureAsync(false, true); } // Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress) @@ -92,13 +92,13 @@ public Mono shouldRetry(Exception e) { this.isReadRequest) { logger.warn("Endpoint not available for reads. Will refresh cache and retry. {}", e.toString()); - return this.shouldRetryOnEndpointFailureAsync(true); + return this.shouldRetryOnEndpointFailureAsync(true, false); } // Received Connection error (HttpRequestException), initiate the endpoint rediscovery if (WebExceptionUtility.isNetworkFailure(e)) { logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString()); - return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest); + return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false); } if (clientException != null && @@ -141,7 +141,7 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable() { } } - private Mono shouldRetryOnEndpointFailureAsync(boolean isReadRequest) { + private Mono shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) { if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); return Mono.just(ShouldRetryResult.noRetry()); @@ -173,7 +173,7 @@ private Mono shouldRetryOnEndpointFailureAsync(boolean isRead retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS); } this.retryContext = new RetryContext(this.failoverRetryCount, false); - return this.globalEndpointManager.refreshLocationAsync(null) + return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh) .then(Mono.just(ShouldRetryResult.retryAfter(retryDelay))); } diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/GlobalEndpointManager.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/GlobalEndpointManager.java index 22643869cbaa..7592952b25f5 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/GlobalEndpointManager.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/GlobalEndpointManager.java @@ -41,6 +41,7 @@ public class GlobalEndpointManager implements AutoCloseable { private final ConnectionPolicy connectionPolicy; private final DatabaseAccountManagerInternal owner; private final AtomicBoolean isRefreshing; + private final AtomicBoolean refreshInBackground; private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Scheduler scheduler = Schedulers.fromExecutor(executor); private volatile boolean isClosed; @@ -63,6 +64,7 @@ public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPol this.connectionPolicy = connectionPolicy; this.isRefreshing = new AtomicBoolean(false); + this.refreshInBackground = new AtomicBoolean(false); this.isClosed = false; } catch (Exception e) { throw new IllegalArgumentException(e); @@ -129,9 +131,24 @@ public void close() { logger.debug("GlobalEndpointManager closed."); } - public Mono refreshLocationAsync(DatabaseAccount databaseAccount) { + public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean forceRefresh) { return Mono.defer(() -> { logger.debug("refreshLocationAsync() invoked"); + + if (forceRefresh) { + Mono databaseAccountObs = getDatabaseAccountFromAnyLocationsAsync( + this.defaultEndpoint, + new ArrayList<>(this.connectionPolicy.preferredLocations()), + this::getDatabaseAccountAsync); + + return databaseAccountObs.map(dbAccount -> { + this.locationCache.onDatabaseAccountRead(dbAccount); + return dbAccount; + }).flatMap(dbAccount -> { + return Mono.empty(); + }); + } + if (!isRefreshing.compareAndSet(false, true)) { logger.debug("in the middle of another refresh. Not invoking a new refresh."); return Mono.empty(); @@ -164,17 +181,23 @@ private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) return databaseAccountObs.map(dbAccount -> { this.locationCache.onDatabaseAccountRead(dbAccount); + this.isRefreshing.set(false); return dbAccount; }).flatMap(dbAccount -> { // trigger a startRefreshLocationTimerAsync don't wait on it. - this.startRefreshLocationTimerAsync(); + if (!this.refreshInBackground.get()) { + this.startRefreshLocationTimerAsync(); + } return Mono.empty(); }); } // trigger a startRefreshLocationTimerAsync don't wait on it. - this.startRefreshLocationTimerAsync(); + if (!this.refreshInBackground.get()) { + this.startRefreshLocationTimerAsync(); + } + this.isRefreshing.set(false); return Mono.empty(); } else { logger.debug("shouldRefreshEndpoints: false, nothing to do."); @@ -201,6 +224,8 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { int delayInMillis = initialization ? 0: this.backgroundRefreshLocationTimeIntervalInMS; + this.refreshInBackground.set(true); + return Mono.delay(Duration.ofMillis(delayInMillis)) .flatMap( t -> { @@ -216,6 +241,7 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { return databaseAccountObs.flatMap(dbAccount -> { logger.debug("db account retrieved"); + this.refreshInBackground.set(false); return this.refreshLocationPrivateAsync(dbAccount); }); }).onErrorResume(ex -> { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxDocumentClientImpl.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxDocumentClientImpl.java index 3e0bcc0d1506..1d4617d6cd5f 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxDocumentClientImpl.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RxDocumentClientImpl.java @@ -248,7 +248,7 @@ private void initializeGatewayConfigurationReader() { // TODO: add support for openAsync // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 - this.globalEndpointManager.refreshLocationAsync(databaseAccount).block(); + this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block(); } public void init() { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/LocationCache.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/LocationCache.java index ba0596c8e5f5..b6b3ecb5ae56 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/LocationCache.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/routing/LocationCache.java @@ -184,9 +184,20 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBackground) if (this.enableEndpointDiscovery) { boolean shouldRefresh = this.useMultipleWriteLocations && !this.enableMultipleWriteLocations; + List readLocationEndpoints = currentLocationInfo.readEndpoints; + if (this.isEndpointUnavailable(readLocationEndpoints.get(0), OperationType.Read)) { + // Since most preferred read endpoint is unavailable, we can only refresh in background if + // we have an alternate read endpoint + canRefreshInBackground.v = anyEndpointsAvailable(readLocationEndpoints,OperationType.Read); + logger.debug("shouldRefreshEndpoints = true, since the first read endpoint " + + "[{}] is not available for read. canRefreshInBackground = [{}]", + readLocationEndpoints.get(0), + canRefreshInBackground.v); + return true; + } + if (!Strings.isNullOrEmpty(mostPreferredLocation)) { Utils.ValueHolder mostPreferredReadEndpointHolder = new Utils.ValueHolder<>(); - List readLocationEndpoints = currentLocationInfo.readEndpoints; logger.debug("getReadEndpoints [{}]", readLocationEndpoints); if (Utils.tryGetValue(currentLocationInfo.availableReadEndpointByLocation, mostPreferredLocation, mostPreferredReadEndpointHolder)) { @@ -218,7 +229,7 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBackground) if (this.isEndpointUnavailable(writeLocationEndpoints.get(0), OperationType.Write)) { // Since most preferred write endpoint is unavailable, we can only refresh in background if // we have an alternate write endpoint - canRefreshInBackground.v = writeLocationEndpoints.size() > 1; + canRefreshInBackground.v = anyEndpointsAvailable(writeLocationEndpoints,OperationType.Write); logger.debug("shouldRefreshEndpoints = true, most preferred location " + "[{}] endpoint [{}] is not available for write. canRefreshInBackground = [{}]", mostPreferredLocation, @@ -305,6 +316,18 @@ private boolean isEndpointUnavailable(URL endpoint, OperationType expectedAvaila } } + private boolean anyEndpointsAvailable(List endpoints, OperationType expectedAvailableOperations) { + Utils.ValueHolder unavailabilityInfoHolder = new Utils.ValueHolder<>(); + boolean anyEndpointsAvailable = false; + for (URL endpoint : endpoints) { + if (!isEndpointUnavailable(endpoint, expectedAvailableOperations)) { + anyEndpointsAvailable = true; + break; + } + } + return anyEndpointsAvailable; + } + private void markEndpointUnavailable( URL unavailableEndpoint, OperationType unavailableOperationType) { diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ClientRetryPolicyTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ClientRetryPolicyTest.java index 64ba5fe5feb4..e71dd73a27f6 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ClientRetryPolicyTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ClientRetryPolicyTest.java @@ -6,6 +6,7 @@ import com.azure.data.cosmos.RetryOptions; import io.netty.handler.timeout.ReadTimeoutException; import io.reactivex.subscribers.TestSubscriber; +import org.mockito.Matchers; import org.mockito.Mockito; import org.testng.annotations.Test; import reactor.core.publisher.Mono; @@ -22,7 +23,7 @@ public void networkFailureOnRead() throws Exception { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; @@ -52,7 +53,7 @@ public void networkFailureOnWrite() throws Exception { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; @@ -80,7 +81,7 @@ public void onBeforeSendRequestNotInvoked() { RetryOptions retryOptions = new RetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions); Exception exception = ReadTimeoutException.INSTANCE; diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java index e5b551e2e401..08de0deb6f12 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/RenameCollectionAwareClientRetryPolicyTest.java @@ -22,7 +22,7 @@ public class RenameCollectionAwareClientRetryPolicyTest { @Test(groups = "unit", timeOut = TIMEOUT) public void onBeforeSendRequestNotInvoked() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.defaultPolicy()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); @@ -51,7 +51,7 @@ public void onBeforeSendRequestNotInvoked() { @Test(groups = "unit", timeOut = TIMEOUT) public void shouldRetryWithNotFoundStatusCode() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null),Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.defaultPolicy()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); @@ -77,7 +77,7 @@ public void shouldRetryWithNotFoundStatusCode() { @Test(groups = "unit", timeOut = TIMEOUT) public void shouldRetryWithNotFoundStatusCodeAndReadSessionNotAvailableSubStatusCode() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.defaultPolicy()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); @@ -114,7 +114,7 @@ public void shouldRetryWithNotFoundStatusCodeAndReadSessionNotAvailableSubStatus @Test(groups = "unit", timeOut = TIMEOUT) public void shouldRetryWithGenericException() { GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null)); + Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); IRetryPolicyFactory retryPolicyFactory = new RetryPolicy(endpointManager, ConnectionPolicy.defaultPolicy()); RxClientCollectionCache rxClientCollectionCache = Mockito.mock(RxClientCollectionCache.class); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GlobalEndPointManagerTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GlobalEndPointManagerTest.java new file mode 100644 index 000000000000..45ebb2f9ea3f --- /dev/null +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/directconnectivity/GlobalEndPointManagerTest.java @@ -0,0 +1,351 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.data.cosmos.internal.directconnectivity; + +import com.azure.data.cosmos.ConnectionPolicy; +import com.azure.data.cosmos.DatabaseAccount; +import com.azure.data.cosmos.internal.Configs; +import com.azure.data.cosmos.internal.DatabaseAccountManagerInternal; +import com.azure.data.cosmos.internal.GlobalEndpointManager; +import com.azure.data.cosmos.internal.routing.LocationCache; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; + +import java.lang.reflect.Field; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This test file will cover various scenarios of disaster recovery. + */ +public class GlobalEndPointManagerTest { + + protected static final int TIMEOUT = 6000000; + DatabaseAccountManagerInternal databaseAccountManagerInternal; + + private String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"}]," + + "\"readableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false," + + "\"minReplicaSetSize\":3,\"maxReplicasetSize\":4},\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3," + + "\"maxReplicasetSize\":4},\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144," + + "\\\"maxJoinsPerSqlQuery\\\":5,\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n"; + + + private String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure" + + ".com:443/\"}],\"readableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}]," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; + + private String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure" + + ".com:443/\"}],\"readableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure.com:443/\"}]," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; + + private String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"readableLocations\":[{\"name\":\"East US\"," + + "\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents" + + ".azure.com:443/\"}],\"enableMultipleWriteLocations\":true,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n" + + "0\n" + + "\n"; + + @BeforeClass(groups = "unit") + public void setup() throws Exception { + databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); + } + + /** + * Test for refresh location cache on connectivity issue with no preferred region + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void refreshLocationAsyncForConnectivityIssue() throws Exception { + GlobalEndpointManager globalEndPointManager = getGlobalEndPointManager(); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastus.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).block(); // Cache will be refreshed as there is no preferred active region remaining + LocationCache locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1, "ReadEnpoints should have 1 value"); + + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableReadEndpointByLocation.size(), 1); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastasia.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).block();// Cache will be refreshed as there is no preferred active region remaining + locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + + availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("West US")); + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + /** + * Test for refresh location cache in background on network failure, + * switching to different preferredLocation region + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void refreshLocationAsyncForConnectivityIssueWithPreferredLocation() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.enableEndpointDiscovery(true); + List preferredLocation = new ArrayList<>(); + preferredLocation.add("East US"); + preferredLocation.add("East Asia"); + connectionPolicy.preferredLocations(preferredLocation); + connectionPolicy.usingMultipleWriteLocations(true); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson1); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + globalEndPointManager.init(); + + LocationCache locationCache = getLocationCache(globalEndPointManager); + databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastus.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).block(); // Refreshing location cache due to region outage, moving from East US to East Asia + + locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 2); //Cache will not refresh immediately, other preferred region East Asia is still active + + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableReadEndpointByLocation.size(), 2); + Assert.assertTrue(availableReadEndpointByLocation.keySet().iterator().next().equalsIgnoreCase("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForRead(new URL(("https://testaccount-eastasia.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, false).block();// Making eastasia unavailable + + locationCache = this.getLocationCache(globalEndPointManager); + + availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("West US"));// Cache will be refreshed as both the preferred region are unavailable now + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + /** + * Test for refresh location cache on write forbidden + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void refreshLocationAsyncForWriteForbidden() throws Exception { + GlobalEndpointManager globalEndPointManager = getGlobalEndPointManager(); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForWrite(new URL(("https://testaccount-eastus.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, true).block(); // Refreshing location cache due to write forbidden, moving from East US to East Asia + + LocationCache locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + + Map availableWriteEndpointByLocation = this.getAvailableWriteEndpointByLocation(locationCache); + Assert.assertTrue(availableWriteEndpointByLocation.keySet().contains("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + globalEndPointManager.markEndpointUnavailableForWrite(new URL(("https://testaccount-eastasia.documents.azure.com:443/"))); + globalEndPointManager.refreshLocationAsync(null, true).block();// Refreshing location cache due to write forbidden, moving from East Asia to West US + + locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + + availableWriteEndpointByLocation = this.getAvailableWriteEndpointByLocation(locationCache); + Assert.assertTrue(availableWriteEndpointByLocation.keySet().contains("West US")); + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + isRefreshInBackground = this.getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + Assert.assertTrue(isRefreshInBackground.get()); + } + + /** + * Test for background refresh disable for multimaster + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void backgroundRefreshForMultiMaster() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.enableEndpointDiscovery(true); + connectionPolicy.usingMultipleWriteLocations(true); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson4); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + globalEndPointManager.init(); + + AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager); + Assert.assertFalse(isRefreshInBackground.get()); + } + + /** + * Test for background refresh cycle, + */ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void startRefreshLocationTimerAsync() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.enableEndpointDiscovery(true); + connectionPolicy.usingMultipleWriteLocations(true); + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson1); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + setBackgroundRefreshLocationTimeIntervalInMS(globalEndPointManager, 1000); + globalEndPointManager.init(); + + databaseAccount = new DatabaseAccount(dbAccountJson2); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Thread.sleep(2000); + + LocationCache locationCache = this.getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1); + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableReadEndpointByLocation.size(), 1); + Assert.assertTrue(availableReadEndpointByLocation.keySet().iterator().next().equalsIgnoreCase("East Asia")); + + AtomicBoolean isRefreshing = getIsRefreshing(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + + databaseAccount = new DatabaseAccount(dbAccountJson3); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Thread.sleep(2000); + locationCache = this.getLocationCache(globalEndPointManager); + + availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("West US")); + + isRefreshing = this.getIsRefreshing(globalEndPointManager); + Assert.assertFalse(isRefreshing.get()); + } + + private LocationCache getLocationCache(GlobalEndpointManager globalEndPointManager) throws Exception { + Field locationCacheField = GlobalEndpointManager.class.getDeclaredField("locationCache"); + locationCacheField.setAccessible(true); + LocationCache locationCache = (LocationCache) locationCacheField.get(globalEndPointManager); + return locationCache; + } + + private Map getAvailableWriteEndpointByLocation(LocationCache locationCache) throws Exception { + Field locationInfoField = LocationCache.class.getDeclaredField("locationInfo"); + locationInfoField.setAccessible(true); + Object locationInfo = locationInfoField.get(locationCache); + + Class DatabaseAccountLocationsInfoClass = Class.forName("com.azure.data.cosmos.internal.routing.LocationCache$DatabaseAccountLocationsInfo"); + Field availableWriteEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableWriteEndpointByLocation"); + availableWriteEndpointByLocationField.setAccessible(true); + Field availableReadEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableReadEndpointByLocation"); + availableReadEndpointByLocationField.setAccessible(true); + + return (Map) availableWriteEndpointByLocationField.get(locationInfo); + } + + private Map getAvailableReadEndpointByLocation(LocationCache locationCache) throws Exception { + Field locationInfoField = LocationCache.class.getDeclaredField("locationInfo"); + locationInfoField.setAccessible(true); + Object locationInfo = locationInfoField.get(locationCache); + + Class DatabaseAccountLocationsInfoClass = Class.forName("com.azure.data.cosmos.internal.routing.LocationCache$DatabaseAccountLocationsInfo"); + Field availableReadEndpointByLocationField = DatabaseAccountLocationsInfoClass.getDeclaredField("availableReadEndpointByLocation"); + availableReadEndpointByLocationField.setAccessible(true); + + return (Map) availableReadEndpointByLocationField.get(locationInfo); + } + + private AtomicBoolean getIsRefreshing(GlobalEndpointManager globalEndPointManager) throws Exception { + Field isRefreshingField = GlobalEndpointManager.class.getDeclaredField("isRefreshing"); + isRefreshingField.setAccessible(true); + AtomicBoolean isRefreshing = (AtomicBoolean) isRefreshingField.get(globalEndPointManager); + return isRefreshing; + } + + private AtomicBoolean getRefreshInBackground(GlobalEndpointManager globalEndPointManager) throws Exception { + Field isRefreshInBackgroundField = GlobalEndpointManager.class.getDeclaredField("refreshInBackground"); + isRefreshInBackgroundField.setAccessible(true); + AtomicBoolean isRefreshInBackground = (AtomicBoolean) isRefreshInBackgroundField.get(globalEndPointManager); + return isRefreshInBackground; + } + + private void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec) throws Exception { + Field backgroundRefreshLocationTimeIntervalInMSField = GlobalEndpointManager.class.getDeclaredField("backgroundRefreshLocationTimeIntervalInMS"); + backgroundRefreshLocationTimeIntervalInMSField.setAccessible(true); + backgroundRefreshLocationTimeIntervalInMSField.setInt(globalEndPointManager, millSec); + } + + private GlobalEndpointManager getGlobalEndPointManager() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.enableEndpointDiscovery(true); + connectionPolicy.usingMultipleWriteLocations(true); // currently without this proper, background refresh will not work + DatabaseAccount databaseAccount = new DatabaseAccount(dbAccountJson1); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(databaseAccount)); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443")); + GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + globalEndPointManager.init(); + + LocationCache locationCache = getLocationCache(globalEndPointManager); + Assert.assertEquals(locationCache.getReadEndpoints().size(), 1, "ReadEnpoints should have 1 value"); + + Map availableWriteEndpointByLocation = this.getAvailableWriteEndpointByLocation(locationCache); + Map availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache); + Assert.assertEquals(availableWriteEndpointByLocation.size(), 1); + Assert.assertEquals(availableReadEndpointByLocation.size(), 2); + Assert.assertTrue(availableWriteEndpointByLocation.keySet().contains("East US")); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("East US")); + Assert.assertTrue(availableReadEndpointByLocation.keySet().contains("East Asia")); + return globalEndPointManager; + } +} diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/LocationCacheTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/LocationCacheTest.java index a6f2d7953f4d..f019d2c8cd96 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/LocationCacheTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/routing/LocationCacheTest.java @@ -227,7 +227,11 @@ private void validateLocationCacheAsync( endpointDiscoveryEnabled, preferredAvailableWriteEndpoints, preferredAvailableReadEndpoints, - writeLocationIndex > 0); + readLocationIndex > 0 && !currentReadEndpoints.get(0).equals(DefaultEndpoint), + writeLocationIndex > 0, + currentReadEndpoints.size() > 1, + currentWriteEndpoints.size() > 1 + ); this.validateGlobalEndpointLocationCacheRefreshAsync(); @@ -252,14 +256,17 @@ private void validateEndpointRefresh( boolean endpointDiscoveryEnabled, URL[] preferredAvailableWriteEndpoints, URL[] preferredAvailableReadEndpoints, - boolean isFirstWriteEndpointUnavailable) { + boolean isFirstReadEndpointUnavailable, + boolean isFirstWriteEndpointUnavailable, + boolean hasMoreThanOneReadEndpoints, + boolean hasMoreThanOneWriteEndpoints) { Utils.ValueHolder canRefreshInBackgroundHolder = new Utils.ValueHolder<>(); canRefreshInBackgroundHolder.v = false; boolean shouldRefreshEndpoints = this.cache.shouldRefreshEndpoints(canRefreshInBackgroundHolder); - boolean isMostPreferredLocationUnavailableForRead = false; + boolean isMostPreferredLocationUnavailableForRead = isFirstReadEndpointUnavailable; boolean isMostPreferredLocationUnavailableForWrite = useMultipleWriteLocations ? false : isFirstWriteEndpointUnavailable; if (this.preferredLocations.size() > 0) { @@ -293,7 +300,11 @@ private void validateEndpointRefresh( } if (shouldRefreshEndpoints) { - assertThat(canRefreshInBackgroundHolder.v).isTrue(); + if (isMostPreferredLocationUnavailableForRead) { + assertThat(canRefreshInBackgroundHolder.v).isEqualTo(hasMoreThanOneReadEndpoints); + } else if (isMostPreferredLocationUnavailableForWrite) { + assertThat(canRefreshInBackgroundHolder.v).isEqualTo(hasMoreThanOneWriteEndpoints); + } } } @@ -305,7 +316,7 @@ private void validateGlobalEndpointLocationCacheRefreshAsync() throws Exception mockedClient.reset(); List> list = IntStream.range(0, 10) - .mapToObj(index -> this.endpointManager.refreshLocationAsync(null)) + .mapToObj(index -> this.endpointManager.refreshLocationAsync(null, false)) .collect(Collectors.toList()); Flux.merge(list).then().block(); @@ -314,7 +325,7 @@ private void validateGlobalEndpointLocationCacheRefreshAsync() throws Exception mockedClient.reset(); IntStream.range(0, 10) - .mapToObj(index -> this.endpointManager.refreshLocationAsync(null)) + .mapToObj(index -> this.endpointManager.refreshLocationAsync(null, false)) .collect(Collectors.toList()); for (Mono completable : list) { completable.block(); diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/CollectionCrudTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/CollectionCrudTest.java index ddd5157eb6fb..215c3889c6ff 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/CollectionCrudTest.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/rx/CollectionCrudTest.java @@ -35,6 +35,7 @@ import reactor.core.publisher.Mono; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -83,7 +84,7 @@ private CosmosContainerProperties getCollectionDefinition(String collectionName) @Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "collectionCrudArgProvider") public void createCollection(String collectionName) throws InterruptedException { CosmosContainerProperties collectionDefinition = getCollectionDefinition(collectionName); - + Mono createObservable = database .createContainer(collectionDefinition); @@ -146,14 +147,12 @@ public void createCollectionWithCompositeIndexAndSpatialSpec() throws Interrupte }; List spatialIndexes = new ArrayList(); for (int index = 0; index < 2; index++) { - List collectionOfSpatialTypes = new ArrayList(); SpatialSpec spec = new SpatialSpec(); spec.path("/path" + index + "/*"); - for (int i = index; i < index + 3; i++) { - collectionOfSpatialTypes.add(spatialTypes[i]); - } + List collectionOfSpatialTypes = new ArrayList(Arrays.asList(spatialTypes).subList(0, index + 3)); + spec.spatialTypes(collectionOfSpatialTypes); spatialIndexes.add(spec); } @@ -161,7 +160,7 @@ public void createCollectionWithCompositeIndexAndSpatialSpec() throws Interrupte indexingPolicy.spatialIndexes(spatialIndexes); collection.indexingPolicy(indexingPolicy); - + Mono createObservable = database .createContainer(collection, new CosmosContainerRequestOptions()); @@ -178,7 +177,7 @@ public void createCollectionWithCompositeIndexAndSpatialSpec() throws Interrupte @Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "collectionCrudArgProvider") public void readCollection(String collectionName) throws InterruptedException { CosmosContainerProperties collectionDefinition = getCollectionDefinition(collectionName); - + Mono createObservable = database.createContainer(collectionDefinition); CosmosContainer collection = createObservable.block().container(); @@ -203,7 +202,7 @@ public void readCollection_DoesntExist(String collectionName) throws Exception { @Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "collectionCrudArgProvider") public void deleteCollection(String collectionName) throws InterruptedException { CosmosContainerProperties collectionDefinition = getCollectionDefinition(collectionName); - + Mono createObservable = database.createContainer(collectionDefinition); CosmosContainer collection = createObservable.block().container(); @@ -223,13 +222,13 @@ public void replaceCollection(String collectionName) throws InterruptedException CosmosContainerProperties collectionSettings = collection.read().block().properties(); // sanity check assertThat(collectionSettings.indexingPolicy().indexingMode()).isEqualTo(IndexingMode.CONSISTENT); - + // replace indexing mode IndexingPolicy indexingMode = new IndexingPolicy(); indexingMode.indexingMode(IndexingMode.LAZY); collectionSettings.indexingPolicy(indexingMode); Mono readObservable = collection.replace(collectionSettings, new CosmosContainerRequestOptions()); - + // validate CosmosResponseValidator validator = new CosmosResponseValidator.Builder() .indexingMode(IndexingMode.LAZY).build();