Skip to content

Commit ed53372

Browse files
authored
Merge pull request #3 from jeet1995/thinclientEndpointDiscoveryRefactor
Adding ConsolidatedRegionalEndpoint.
2 parents 77e6b5e + b18feed commit ed53372

File tree

39 files changed

+738
-556
lines changed

39 files changed

+738
-556
lines changed

sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
2424
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUtils;
2525
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
26+
import com.azure.cosmos.implementation.routing.LocationCache;
2627
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
2728
import com.azure.cosmos.test.faultinjection.FaultInjectionCondition;
2829
import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionErrorResult;
@@ -447,6 +448,7 @@ private Mono<List<URI>> resolvePhysicalAddresses(
447448
null);
448449

449450
faultInjectionAddressRequest.requestContext.locationEndpointToRoute = regionEndpoint;
451+
faultInjectionAddressRequest.requestContext.consolidatedRegionalEndpointToRoute = new LocationCache.ConsolidatedRegionalEndpoint(regionEndpoint, null);
450452
faultInjectionAddressRequest.setPartitionKeyRangeIdentity(new PartitionKeyRangeIdentity(pkRangeId));
451453

452454
if (isWriteOnly) {

sdk/cosmos/azure-cosmos-tests/pom.xml

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -222,18 +222,6 @@ Licensed under the MIT License.
222222
<scope>test</scope>
223223
<version>2.17.2</version> <!-- {x-version-update;com.fasterxml.jackson.module:jackson-module-blackbird;external_dependency} -->
224224
</dependency>
225-
<dependency>
226-
<groupId>com.azure</groupId>
227-
<artifactId>azure-cosmos</artifactId>
228-
<version>4.67.0-beta.1</version>
229-
<scope>test</scope>
230-
</dependency>
231-
<dependency>
232-
<groupId>com.azure</groupId>
233-
<artifactId>azure-cosmos</artifactId>
234-
<version>4.67.0-beta.1</version>
235-
<scope>test</scope>
236-
</dependency>
237225
</dependencies>
238226

239227
<build>

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForCircuitBreakerTests.java

Lines changed: 74 additions & 72 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PartitionLevelCircuitBreakerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
2727
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyImpl;
2828
import com.azure.cosmos.implementation.guava25.base.Function;
29+
import com.azure.cosmos.implementation.routing.LocationCache;
2930
import com.azure.cosmos.models.CosmosBatch;
3031
import com.azure.cosmos.models.CosmosBatchResponse;
3132
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
@@ -60,7 +61,6 @@
6061
import reactor.core.publisher.Mono;
6162

6263
import java.lang.reflect.Field;
63-
import java.net.URI;
6464
import java.time.Duration;
6565
import java.util.ArrayList;
6666
import java.util.Collections;
@@ -5144,8 +5144,8 @@ private static double getEstimatedFailureCountSeenPerRegionPerPartitionKeyRange(
51445144
return 0d;
51455145
}
51465146

5147-
ConcurrentHashMap<URI, LocationSpecificHealthContext> locationEndpointToLocationSpecificContextForPartition
5148-
= (ConcurrentHashMap<URI, LocationSpecificHealthContext>) locationEndpointToLocationSpecificContextForPartitionField.get(partitionAndLocationSpecificUnavailabilityInfo);
5147+
ConcurrentHashMap<LocationCache.ConsolidatedRegionalEndpoint, LocationSpecificHealthContext> locationEndpointToLocationSpecificContextForPartition
5148+
= (ConcurrentHashMap<LocationCache.ConsolidatedRegionalEndpoint, LocationSpecificHealthContext>) locationEndpointToLocationSpecificContextForPartitionField.get(partitionAndLocationSpecificUnavailabilityInfo);
51495149

51505150
int count = 0;
51515151
boolean failuresExist = false;

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ProactiveConnectionManagementTest.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
2828
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
2929
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
30+
import com.azure.cosmos.implementation.routing.LocationCache;
3031
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
3132
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
3233
import com.azure.cosmos.models.CosmosContainerIdentity;
@@ -180,11 +181,13 @@ public void openConnectionsAndInitCachesWithContainer(ProactiveConnectionManagem
180181

181182
cosmosAsyncContainer.openConnectionsAndInitCaches(proactiveConnectionRegionCount).block();
182183

183-
UnmodifiableList<URI> readEndpoints =
184+
UnmodifiableList<LocationCache.ConsolidatedRegionalEndpoint> readEndpoints =
184185
globalEndpointManager.getReadEndpoints();
186+
185187
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
186188
0,
187-
Math.min(readEndpoints.size(),proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
189+
Math.min(readEndpoints.size(),proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
190+
.stream().map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint).collect(Collectors.toList());
188191

189192
Mono<CosmosAsyncContainer> asyncContainerMono = Mono.just(cosmosAsyncContainer);
190193

@@ -342,10 +345,14 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
342345
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
343346
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
344347
Set<String> endpoints = ConcurrentHashMap.newKeySet();
345-
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
348+
UnmodifiableList<LocationCache.ConsolidatedRegionalEndpoint> readEndpoints = globalEndpointManager.getReadEndpoints();
349+
346350
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
347351
0,
348-
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
352+
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
353+
.stream()
354+
.map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint)
355+
.collect(Collectors.toList());
349356

350357
Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
351358
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
@@ -488,10 +495,13 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
488495
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
489496
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
490497
Set<String> endpoints = ConcurrentHashMap.newKeySet();
491-
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
498+
UnmodifiableList<LocationCache.ConsolidatedRegionalEndpoint> readEndpoints = globalEndpointManager.getReadEndpoints();
492499
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
493500
0,
494-
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
501+
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
502+
.stream()
503+
.map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint)
504+
.collect(Collectors.toList());;
495505

496506
Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
497507
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
@@ -656,10 +666,13 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
656666
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
657667
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
658668
Set<String> endpoints = ConcurrentHashMap.newKeySet();
659-
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
669+
UnmodifiableList<LocationCache.ConsolidatedRegionalEndpoint> readEndpoints = globalEndpointManager.getReadEndpoints();
660670
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
661671
0,
662-
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
672+
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
673+
.stream()
674+
.map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint)
675+
.collect(Collectors.toList());;
663676

664677
Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
665678
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.azure.cosmos.ThrottlingRetryOptions;
99
import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker;
1010
import com.azure.cosmos.implementation.directconnectivity.ChannelAcquisitionException;
11+
import com.azure.cosmos.implementation.routing.LocationCache;
1112
import io.netty.handler.timeout.ReadTimeoutException;
1213
import io.reactivex.subscribers.TestSubscriber;
1314
import org.mockito.Mockito;
@@ -65,7 +66,7 @@ public void networkFailureOnRead() throws Exception {
6566
ThrottlingRetryOptions throttlingRetryOptions = new ThrottlingRetryOptions();
6667
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
6768
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
68-
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
69+
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
6970
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
7071
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);
7172

@@ -106,7 +107,7 @@ public void shouldRetryOnGatewayTimeout(
106107
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
107108
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
108109

109-
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
110+
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
110111
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(true));
111112
ClientRetryPolicy clientRetryPolicy =
112113
new ClientRetryPolicy(
@@ -149,7 +150,7 @@ public void tcpNetworkFailureOnRead() throws Exception {
149150
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
150151
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
151152

152-
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
153+
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
153154
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
154155
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
155156
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
@@ -197,7 +198,7 @@ public void networkFailureOnWrite() throws Exception {
197198
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
198199
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
199200

200-
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
201+
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
201202
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
202203
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);
203204

@@ -232,7 +233,7 @@ public void tcpNetworkFailureOnWrite(
232233
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
233234
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
234235

235-
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
236+
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
236237
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
237238
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
238239
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
@@ -292,7 +293,7 @@ public void networkFailureOnUpsert() throws Exception {
292293
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
293294
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
294295

295-
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
296+
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
296297
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
297298
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);
298299

@@ -325,7 +326,7 @@ public void tcpNetworkFailureOnUpsert() throws Exception {
325326
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
326327
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
327328

328-
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
329+
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
329330
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
330331
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
331332
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
@@ -361,7 +362,7 @@ public void networkFailureOnDelete() throws Exception {
361362
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
362363
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
363364

364-
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
365+
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
365366
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
366367
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);
367368

@@ -395,7 +396,7 @@ public void tcpNetworkFailureOnDelete() throws Exception {
395396
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
396397
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
397398

398-
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
399+
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
399400
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
400401
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
401402
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConfigsTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
import com.azure.cosmos.implementation.clienttelemetry.MetricCategory;
77
import com.azure.cosmos.implementation.clienttelemetry.TagName;
88
import com.azure.cosmos.implementation.directconnectivity.Protocol;
9-
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
10-
import io.netty.handler.ssl.SslContext;
119
import org.testng.annotations.Test;
1210

1311
import java.net.URI;
@@ -170,12 +168,12 @@ public void http2MaxConcurrentStreams() {
170168
@Test(groups = { "unit" })
171169
public void thinClientEnabledTest() {
172170
Configs config = new Configs();
173-
assertThat(config.getThinclientEnabled()).isFalse();
171+
assertThat(config.isThinClientEnabled()).isFalse();
174172

175173
System.clearProperty("COSMOS.THINCLIENT_ENABLED");
176174
System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
177175
try {
178-
assertThat(config.getThinclientEnabled()).isTrue();
176+
assertThat(config.isThinClientEnabled()).isTrue();
179177
} finally {
180178
System.clearProperty("COSMOS.THINCLIENT_ENABLED");
181179
}

0 commit comments

Comments
 (0)