diff --git a/sdk/cosmos/azure-cosmos-test/CHANGELOG.md b/sdk/cosmos/azure-cosmos-test/CHANGELOG.md index 7a6c1ad5c486..838918e3fce2 100644 --- a/sdk/cosmos/azure-cosmos-test/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-test/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed an issue where `CONNECTION_DELAY` fault injection rule is not applied during `openConnectionsAndInitCaches` - See [PR 34096](https://github.com/Azure/azure-sdk-for-java/pull/34096) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos-test/README.md b/sdk/cosmos/azure-cosmos-test/README.md index e67b00a5d955..42863fd9adfd 100644 --- a/sdk/cosmos/azure-cosmos-test/README.md +++ b/sdk/cosmos/azure-cosmos-test/README.md @@ -170,5 +170,3 @@ or contact [opencode@microsoft.com][coc_contact] with any additional questions o [troubleshooting]: https://docs.microsoft.com/azure/cosmos-db/troubleshoot-java-sdk-v4-sql [perf_guide]: https://docs.microsoft.com/azure/cosmos-db/performance-tips-java-sdk-v4-sql?tabs=api-async [quickstart]: https://docs.microsoft.com/azure/cosmos-db/create-sql-api-java?tabs=sync - -![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Fcosmos%2Fazure-cosmos-encryption%2FREADME.png) diff --git a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionConditionInternal.java b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionConditionInternal.java index 029199344e8d..bbcd61524099 100644 --- a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionConditionInternal.java +++ b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionConditionInternal.java @@ -51,11 +51,15 @@ public List getAddresses() { return physicalAddresses; } - public void setAddresses(List physicalAddresses) { + public void setAddresses(List physicalAddresses, boolean primaryOnly) { this.physicalAddresses = physicalAddresses; if (physicalAddresses != null && physicalAddresses.size() > 0) { this.validators.add(new AddressValidator(physicalAddresses)); } + + if (primaryOnly) { + this.validators.add(new PrimaryAddressValidator()); + } } public boolean isApplicable(RxDocumentServiceRequest request) { @@ -115,11 +119,18 @@ public boolean isApplicable(RxDocumentServiceRequest request) { && addresses.size() > 0) { return this.addresses .stream() - .anyMatch(address -> request.requestContext.storePhysicalAddress.toString().startsWith(address.toString())); + .anyMatch(address -> request.requestContext.storePhysicalAddressUri.getURIAsString().startsWith(address.toString())); } return true; } } + + static class PrimaryAddressValidator implements IFaultInjectionConditionValidator { + @Override + public boolean isApplicable(RxDocumentServiceRequest request) { + return request.requestContext.storePhysicalAddressUri.isPrimary(); + } + } //endregion } diff --git a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleProcessor.java b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleProcessor.java index 1a8beb8c46fe..7539b775ce10 100644 --- a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleProcessor.java +++ b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleProcessor.java @@ -168,11 +168,12 @@ private Mono getEffectiveServerErrorRule( // TODO: add handling for gateway mode // Direct connection mode, populate physical addresses + boolean primaryAddressesOnly = this.isWriteOnly(rule.getCondition()); return BackoffRetryUtility.executeRetry( () -> this.resolvePhysicalAddresses( regionEndpoints, rule.getCondition().getEndpoints(), - this.isWriteOnlyEndpoint(rule.getCondition()), + primaryAddressesOnly, documentCollection), new FaultInjectionRuleProcessorRetryPolicy(this.retryOptions) ) @@ -186,7 +187,7 @@ private Mono getEffectiveServerErrorRule( .collect(Collectors.toList()); } - effectiveCondition.setAddresses(effectiveAddresses); + effectiveCondition.setAddresses(effectiveAddresses, primaryAddressesOnly); return effectiveCondition; }); }) @@ -226,7 +227,7 @@ private Mono getEffectiveConnectionErrorRule( return this.resolvePhysicalAddresses( regionEndpoints, rule.getCondition().getEndpoints(), - this.isWriteOnlyEndpoint(rule.getCondition()), + this.isWriteOnly(rule.getCondition()), documentCollection) .map(physicalAddresses -> { List effectiveAddresses = @@ -258,7 +259,7 @@ private Mono getEffectiveConnectionErrorRule( * @return the region service endpoints. */ private List getRegionEndpoints(FaultInjectionCondition condition) { - boolean isWriteOnlyEndpoints = this.isWriteOnlyEndpoint(condition); + boolean isWriteOnlyEndpoints = this.isWriteOnly(condition); if (StringUtils.isNotEmpty(condition.getRegion())) { return Arrays.asList( @@ -362,7 +363,7 @@ private Mono> resolvePhysicalAddresses( .collectList(); } - private boolean isWriteOnlyEndpoint(FaultInjectionCondition condition) { + private boolean isWriteOnly(FaultInjectionCondition condition) { return condition.getOperationType() != null && this.getEffectiveOperationType(condition.getOperationType()).isWriteOperation(); } diff --git a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/RntbdServerErrorInjector.java b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/RntbdServerErrorInjector.java index 654333951703..6f88d70c6286 100644 --- a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/RntbdServerErrorInjector.java +++ b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/RntbdServerErrorInjector.java @@ -5,6 +5,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.directconnectivity.rntbd.IRequestRecord; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestRecord; import com.azure.cosmos.implementation.faultinjection.IRntbdServerErrorInjector; @@ -67,7 +68,7 @@ public boolean injectRntbdServerResponseError(RntbdRequestRecord requestRecord) @Override public boolean injectRntbdServerConnectionDelay( - RntbdRequestRecord requestRecord, + IRequestRecord requestRecord, Consumer openConnectionWithDelayConsumer) { if (requestRecord == null) { return false; @@ -79,11 +80,10 @@ public boolean injectRntbdServerConnectionDelay( if (serverConnectionDelayRule != null) { request.faultInjectionRequestContext .applyFaultInjectionRule( - requestRecord.transportRequestId(), + requestRecord.getRequestId(), serverConnectionDelayRule.getId()); openConnectionWithDelayConsumer.accept(serverConnectionDelayRule.getResult().getDelay()); return true; - } return false; diff --git a/sdk/cosmos/azure-cosmos-test/src/test/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorRuleTests.java b/sdk/cosmos/azure-cosmos-test/src/test/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorRuleTests.java index bbb78f4fea46..273afdbf62b5 100644 --- a/sdk/cosmos/azure-cosmos-test/src/test/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorRuleTests.java +++ b/sdk/cosmos/azure-cosmos-test/src/test/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorRuleTests.java @@ -91,6 +91,19 @@ public static Object[][] operationTypeProvider() { }; } + @DataProvider(name = "faultInjectionOperationTypeProvider") + public static Object[][] faultInjectionOperationTypeProvider() { + return new Object[][]{ + // fault injection operation type, primaryAddressOnly + { FaultInjectionOperationType.READ_ITEM, false }, + { FaultInjectionOperationType.REPLACE_ITEM, true }, + { FaultInjectionOperationType.CREATE_ITEM, true }, + { FaultInjectionOperationType.DELETE_ITEM, true}, + { FaultInjectionOperationType.QUERY_ITEM, false }, + { FaultInjectionOperationType.PATCH_ITEM, true } + }; + } + @DataProvider(name = "faultInjectionServerErrorResponseProvider") public static Object[][] faultInjectionServerErrorResponseProvider() { return new Object[][]{ @@ -565,6 +578,64 @@ public void faultInjectionServerErrorRuleTests_ServerConnectionDelay() throws Js } } + @Test(groups = {"multi-region"}, dataProvider = "faultInjectionOperationTypeProvider", timeOut = TIMEOUT) + public void faultInjectionServerErrorRuleTests_ServerConnectionDelay_warmup( + FaultInjectionOperationType operationType, + boolean primaryAddressesOnly) { + + CosmosAsyncClient newClient = null; // creating new client to force creating new connections + // simulate high channel acquisition/connectionTimeout during openConnection flow + String ruleId = "serverErrorRule-serverConnectionDelay-warmup" + UUID.randomUUID(); + FaultInjectionRule serverConnectionDelayRule = + new FaultInjectionRuleBuilder(ruleId) + .condition( + new FaultInjectionConditionBuilder() + .operationType(operationType) + .build() + ) + .result( + FaultInjectionResultBuilders + .getResultBuilder(FaultInjectionServerErrorType.CONNECTION_DELAY) + .delay(Duration.ofSeconds(2)) + .times(1) + .build() + ) + .duration(Duration.ofMinutes(5)) + .build(); + + try { + DirectConnectionConfig directConnectionConfig = DirectConnectionConfig.getDefaultConfig(); + directConnectionConfig.setConnectTimeout(Duration.ofSeconds(1)); + + newClient = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .contentResponseOnWriteEnabled(true) + .consistencyLevel(BridgeInternal.getContextClient(this.client).getConsistencyLevel()) + .directMode(directConnectionConfig) + .buildAsyncClient(); + + CosmosAsyncContainer container = + newClient + .getDatabase(cosmosAsyncContainer.getDatabase().getId()) + .getContainer(cosmosAsyncContainer.getId()); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(container, Arrays.asList(serverConnectionDelayRule)).block(); + + int partitionSize = container.getFeedRanges().block().size(); + container.openConnectionsAndInitCaches().block(); + + if (primaryAddressesOnly) { + assertThat(serverConnectionDelayRule.getHitCount()).isEqualTo(partitionSize); + } else { + assertThat(serverConnectionDelayRule.getHitCount()).isBetween(partitionSize * 3L, partitionSize * 5L); + } + } finally { + serverConnectionDelayRule.disable(); + safeClose(newClient); + } + } + @Test(groups = {"multi-region"}, dataProvider = "faultInjectionServerErrorResponseProvider", timeOut = TIMEOUT) public void faultInjectionServerErrorRuleTests_ServerErrorResponse( FaultInjectionServerErrorType serverErrorType, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index 1b8b4b8f787a..2b41664f1b15 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -33,7 +33,7 @@ public class DocumentServiceRequestContext implements Cloneable { public volatile Boolean usePreferredLocations; public volatile Integer locationIndexToRoute; public volatile URI locationEndpointToRoute; - public volatile URI storePhysicalAddress; // DIRECT: rntbd physical address; GATEWAY: service endpoint + public volatile Uri storePhysicalAddressUri; // DIRECT: rntbd physical address; GATEWAY: service endpoint public volatile boolean performedBackgroundAddressRefresh; public volatile boolean performLocalRefreshOnGoneException; public volatile List storeResponses; @@ -118,7 +118,7 @@ public DocumentServiceRequestContext clone() { context.usePreferredLocations = this.usePreferredLocations; context.locationIndexToRoute = this.locationIndexToRoute; context.locationEndpointToRoute = this.locationEndpointToRoute; - context.storePhysicalAddress = this.storePhysicalAddress; + context.storePhysicalAddressUri = this.storePhysicalAddressUri; context.performLocalRefreshOnGoneException = this.performLocalRefreshOnGoneException; context.effectivePartitionKey = this.effectivePartitionKey; context.performedBackgroundAddressRefresh = this.performedBackgroundAddressRefresh; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IOpenConnectionsHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IOpenConnectionsHandler.java index 419262c30d0f..6f5f1006d2e1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IOpenConnectionsHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IOpenConnectionsHandler.java @@ -10,5 +10,5 @@ import java.util.List; public interface IOpenConnectionsHandler { - Flux openConnections(URI serviceEndpoint, List addresses); + Flux openConnections(String collectionRid, URI serviceEndpoint, List addresses); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResourceType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResourceType.java index 743e6467d9a1..2d69e9cee561 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResourceType.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResourceType.java @@ -64,7 +64,10 @@ public enum ResourceType { ClientEncryptionKey("ClientEncryptionKey", 141), //Adding client telemetry resource type, only meant for client side - ClientTelemetry("ClientTelemetry", 1001); + ClientTelemetry("ClientTelemetry", 1001), + + //Only meant to use on client side during connection open + Connection("Connection", 1002); private final int value; private final String stringValue; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressInformation.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressInformation.java index 7aff378e1e47..ec51b71e78bf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressInformation.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressInformation.java @@ -25,6 +25,7 @@ public AddressInformation(boolean isPublic, boolean isPrimary, String physicalUr this.isPublic = isPublic; this.isPrimary = isPrimary; this.physicalUri = new Uri(normalizePhysicalUri(physicalUri)); + this.physicalUri.setPrimary(this.isPrimary); } public AddressInformation(boolean isPublic, boolean isPrimary, String physicalUri, String protocolScheme) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java index d092fbbaaade..3a0062c81baf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java @@ -67,6 +67,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; public class GatewayAddressCache implements IAddressCache { @@ -666,7 +667,7 @@ private Mono getAddressesForRangeId( } if (this.replicaAddressValidationEnabled) { - this.validateReplicaAddresses(mergedAddresses); + this.validateReplicaAddresses(collectionRid, mergedAddresses); } return Mono.just(mergedAddresses); @@ -855,8 +856,9 @@ private AddressInformation[] mergeAddresses(AddressInformation[] newAddresses, A return mergedAddresses.toArray(new AddressInformation[mergedAddresses.size()]); } - private void validateReplicaAddresses(AddressInformation[] addresses) { + private void validateReplicaAddresses(String collectionRid, AddressInformation[] addresses) { checkNotNull(addresses, "Argument 'addresses' can not be null"); + checkArgument(StringUtils.isNotEmpty(collectionRid), "Argument 'collectionRid' can not be null"); // By theory, when we reach here, the status of the address should be in one of the three status: Unknown, Connected, UnhealthyPending // using open connection to validate addresses in UnhealthyPending status @@ -882,7 +884,7 @@ private void validateReplicaAddresses(AddressInformation[] addresses) { if (addressesNeedToValidation.size() > 0) { this.openConnectionsHandler - .openConnections(this.serviceEndpoint, addressesNeedToValidation) + .openConnections(collectionRid, this.serviceEndpoint, addressesNeedToValidation) .subscribeOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC) .subscribe(); } @@ -972,6 +974,7 @@ public Flux openConnectionsAndInitCaches( if (this.openConnectionsHandler != null) { return this.openConnectionsHandler.openConnections( + collection.getResourceId(), this.serviceEndpoint, Arrays .stream(addressInfo.getRight()) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java index 34c7916e53ef..2f69cc5df8b9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java @@ -237,7 +237,7 @@ public Mono invokeStoreAsync( } @Override - public Mono openConnection(URI serviceEndpoint, Uri addressUri) { + public Mono openConnection(Uri addressUri, RxDocumentServiceRequest openConnectionRequest) { throw new NotImplementedException("openConnection is not supported in httpTransportClient"); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java index 95059c92c68f..81108587e1cf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java @@ -250,11 +250,11 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume this.throwIfClosed(); final URI address = addressUri.getURI(); - request.requestContext.storePhysicalAddress = address; + request.requestContext.storePhysicalAddressUri = addressUri; final RntbdRequestArgs requestArgs = new RntbdRequestArgs(request, addressUri); - final RntbdEndpoint endpoint = this.endpointProvider.createIfAbsent(request.requestContext.locationEndpointToRoute, address); + final RntbdEndpoint endpoint = this.endpointProvider.createIfAbsent(request.requestContext.locationEndpointToRoute, addressUri.getURI()); final RntbdRequestRecord record = endpoint.request(requestArgs); final Context reactorContext = Context.of(KEY_ON_ERROR_DROPPED, onErrorDropHookWithReduceLogLevel); @@ -349,16 +349,19 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume } @Override - public Mono openConnection(URI serviceEndpoint, Uri addressUri) { + public Mono openConnection(Uri addressUri, RxDocumentServiceRequest openConnectionRequest) { + checkNotNull(openConnectionRequest, "Argument 'openConnectionRequest' should not be null"); checkNotNull(addressUri, "Argument 'addressUri' should not be null"); - checkNotNull(serviceEndpoint, "Argument 'serviceEndpoint' should not be null"); this.throwIfClosed(); - final URI address = addressUri.getURI(); + final RntbdRequestArgs requestArgs = new RntbdRequestArgs(openConnectionRequest, addressUri); + final RntbdEndpoint endpoint = + this.endpointProvider.createIfAbsent( + openConnectionRequest.requestContext.locationEndpointToRoute, + addressUri.getURI()); - final RntbdEndpoint endpoint = this.endpointProvider.createIfAbsent(serviceEndpoint, address); - return Mono.fromFuture(endpoint.openConnection(addressUri)); + return Mono.fromFuture(endpoint.openConnection(requestArgs)); } public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClient.java index fd6f1d7afccb..6a52a9cbe32d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClient.java @@ -95,8 +95,8 @@ protected Mono invokeStoreAsync(Uri physicalAddress, RxDocumentSe } @Override - public Mono openConnection(URI serviceEndpoint, Uri addressUri) { - return this.transportClient.openConnection(serviceEndpoint, addressUri); + public Mono openConnection(Uri physicalAddress, RxDocumentServiceRequest openConnectionRequest) { + return this.transportClient.openConnection(physicalAddress, openConnectionRequest); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java index 0df0648da27a..32200b92f13b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java @@ -49,11 +49,11 @@ protected abstract Mono invokeStoreAsync( /*** * Only open new connection if there is no existed established connection. * - * @param addressUri the replica address. - * + * @param physicalAddress the store physical addresses. + * @param openConnectionRequest open connection request. * @return the {@link OpenConnectionResponse}. */ - public abstract Mono openConnection(URI serviceEndpoint, final Uri addressUri); + public abstract Mono openConnection(Uri physicalAddress, RxDocumentServiceRequest openConnectionRequest); public abstract void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/Uri.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/Uri.java index 02179a15454a..8eaadb9fe984 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/Uri.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/Uri.java @@ -10,6 +10,7 @@ import java.net.URI; import java.time.Instant; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; public class Uri { @@ -22,6 +23,7 @@ public class Uri { private volatile Instant lastUnknownTimestamp; private volatile Instant lastUnhealthyPendingTimestamp; private volatile Instant lastUnhealthyTimestamp; + private volatile boolean isPrimary; public static Uri create(String uriAsString) { return new Uri(uriAsString); @@ -51,6 +53,20 @@ public String getURIAsString() { return this.uriAsString; } + /*** + * Attention: This is only used for fault injection to easier decide whether the address is primary address. + * @param primary + */ + public void setPrimary(boolean primary) { + isPrimary = primary; + } + + /*** + * Attention: This is only used for fault injection to easier detect whether the address is primary address. + * @return + */ + public boolean isPrimary() { return this.isPrimary; } + /*** * This method will be called if a connection can be established successfully to the backend. */ diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java index 7911d9ff02f6..48915e4d466f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java @@ -17,7 +17,7 @@ class ChannelPromiseWithExpiryTime implements Promise { private final Promise channelPromise; private final long expiryTimeInNanos; - private final RntbdRequestRecord requestRecord; + private final IRequestRecord requestRecord; public ChannelPromiseWithExpiryTime(Promise channelPromise, long expiryTimeInNanos) { this(channelPromise, expiryTimeInNanos, null); @@ -26,7 +26,7 @@ public ChannelPromiseWithExpiryTime(Promise channelPromise, long expiry public ChannelPromiseWithExpiryTime( Promise channelPromise, long expiryTimeInNanos, - RntbdRequestRecord requestRecord) { + IRequestRecord requestRecord) { checkNotNull(channelPromise, "channelPromise must not be null"); checkNotNull(expiryTimeInNanos, "expiryTimeInNanos must not be null"); @@ -182,7 +182,7 @@ public Promise syncUninterruptibly() { return this.channelPromise.syncUninterruptibly(); } - public RntbdRequestRecord getRntbdRequestRecord() { + public IRequestRecord getRntbdRequestRecord() { return this.requestRecord; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/IRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/IRequestRecord.java new file mode 100644 index 000000000000..da5b8f317692 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/IRequestRecord.java @@ -0,0 +1,10 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity.rntbd; + +public interface IRequestRecord { + RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline(); + RntbdRequestArgs args(); + long getRequestId(); +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/OpenChannelPromise.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/OpenChannelPromise.java index fff40ae8c2d9..caef1687d634 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/OpenChannelPromise.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/OpenChannelPromise.java @@ -7,7 +7,11 @@ import io.netty.util.concurrent.Promise; public class OpenChannelPromise extends ChannelPromiseWithExpiryTime { - public OpenChannelPromise(Promise channelPromise, long expiryTimeInNanos) { - super(channelPromise, expiryTimeInNanos); + public OpenChannelPromise( + Promise channelPromise, + long expiryTimeInNanos, + OpenConnectionRntbdRequestRecord openConnectionRntbdRequestRecord) { + + super(channelPromise, expiryTimeInNanos, openConnectionRntbdRequestRecord); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/OpenConnectionRntbdRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/OpenConnectionRntbdRequestRecord.java index c3ef2985f5e2..a4a51a838fc7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/OpenConnectionRntbdRequestRecord.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/OpenConnectionRntbdRequestRecord.java @@ -3,19 +3,33 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; -import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.OpenConnectionResponse; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; -public class OpenConnectionRntbdRequestRecord extends CompletableFuture { - private final Uri addressUri; +public class OpenConnectionRntbdRequestRecord extends CompletableFuture implements IRequestRecord { + private static final AtomicLong instanceCount = new AtomicLong(); + private final RntbdRequestArgs requestArgs; + private final long openConnectionRequestId; - public OpenConnectionRntbdRequestRecord(Uri addressUri) { - this.addressUri = addressUri; + public OpenConnectionRntbdRequestRecord(RntbdRequestArgs requestArgs) { + this.requestArgs = requestArgs; + this.openConnectionRequestId = instanceCount.incrementAndGet(); } - public Uri getAddressUri() { - return addressUri; + @Override + public RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() { + return null; + } + + @Override + public RntbdRequestArgs args() { + return this.requestArgs; + } + + @Override + public long getRequestId() { + return this.openConnectionRequestId; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java index cad0aa2c1e3b..7a8f94646e16 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -462,7 +462,7 @@ public Future acquire(OpenConnectionRntbdRequestRecord requestRecord) { checkNotNull(requestRecord, "Argument 'requestRecord' should not be null"); OpenChannelPromise openChannelPromise = - new OpenChannelPromise(this.getNewChannelPromise(), this.getNewPromiseExpiryTime()); + new OpenChannelPromise(this.getNewChannelPromise(), this.getNewPromiseExpiryTime(), requestRecord); try { // Compared to the normal request flow diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java index ffe1118a2679..bfd7069ee5d7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java @@ -5,7 +5,6 @@ import com.azure.cosmos.implementation.UserAgentContainer; import com.azure.cosmos.implementation.directconnectivity.IAddressResolver; -import com.azure.cosmos.implementation.directconnectivity.Uri; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import io.micrometer.core.instrument.Tag; @@ -93,7 +92,7 @@ void injectConnectionErrors( RntbdRequestRecord request(RntbdRequestArgs requestArgs); - OpenConnectionRntbdRequestRecord openConnection(Uri addressUri); + OpenConnectionRntbdRequestRecord openConnection(RntbdRequestArgs requestArgs); // endregion diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdOpenConnectionsHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdOpenConnectionsHandler.java index 502eb42c393a..2a82e9b2f49c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdOpenConnectionsHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdOpenConnectionsHandler.java @@ -6,6 +6,9 @@ import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.IOpenConnectionsHandler; import com.azure.cosmos.implementation.OpenConnectionResponse; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.directconnectivity.TransportClient; import com.azure.cosmos.implementation.directconnectivity.Uri; @@ -19,6 +22,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; public class RntbdOpenConnectionsHandler implements IOpenConnectionsHandler { @@ -36,8 +40,9 @@ public RntbdOpenConnectionsHandler(TransportClient transportClient) { } @Override - public Flux openConnections(URI serviceEndpoint, List addresses) { + public Flux openConnections(String collectionRid, URI serviceEndpoint, List addresses) { checkNotNull(addresses, "Argument 'addresses' should not be null"); + checkArgument(StringUtils.isNotEmpty(collectionRid), "Argument 'collectionRid' cannot be null nor empty"); if (logger.isDebugEnabled()) { logger.debug( @@ -49,7 +54,10 @@ public Flux openConnections(URI serviceEndpoint, List { try { if (this.openConnectionsSemaphore.tryAcquire(DEFAULT_CONNECTION_SEMAPHORE_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES)) { - return this.transportClient.openConnection(serviceEndpoint, addressUri) + + RxDocumentServiceRequest openConnectionRequest = + this.getOpenConnectionRequest(collectionRid, serviceEndpoint, addressUri); + return this.transportClient.openConnection(addressUri, openConnectionRequest) .onErrorResume(throwable -> Mono.just(new OpenConnectionResponse(addressUri, false, throwable))) .doOnNext(response -> { if (logger.isDebugEnabled()) { @@ -66,4 +74,15 @@ public Flux openConnections(URI serviceEndpoint, List { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java index af59812f5781..c8f1fb2f45d6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java @@ -34,7 +34,7 @@ import static com.azure.cosmos.implementation.guava27.Strings.lenientFormat; @JsonSerialize(using = RntbdRequestRecord.JsonSerializer.class) -public abstract class RntbdRequestRecord extends CompletableFuture { +public abstract class RntbdRequestRecord extends CompletableFuture implements IRequestRecord { private static final Logger logger = LoggerFactory.getLogger(RntbdRequestRecord.class); @@ -86,6 +86,7 @@ public UUID activityId() { return this.args.activityId(); } + @Override public RntbdRequestArgs args() { return this.args; } @@ -238,10 +239,16 @@ public long transportRequestId() { return this.args.transportRequestId(); } + @Override public RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() { return this.channelAcquisitionTimeline; } + @Override + public long getRequestId() { + return this.args.transportRequestId(); + } + // endregion // region Methods diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java index de909445bdb0..53673ea9fa89 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java @@ -15,7 +15,6 @@ import com.azure.cosmos.implementation.directconnectivity.IAddressResolver; import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient; import com.azure.cosmos.implementation.directconnectivity.TransportException; -import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.faultinjection.RntbdServerErrorInjector; import com.azure.cosmos.implementation.guava25.collect.ImmutableMap; import com.fasterxml.jackson.core.JsonGenerator; @@ -384,16 +383,16 @@ public RntbdRequestRecord request(final RntbdRequestArgs args) { } @Override - public OpenConnectionRntbdRequestRecord openConnection(Uri addressUri) { - checkNotNull(addressUri, "Argument 'addressUri' should not be null"); + public OpenConnectionRntbdRequestRecord openConnection(final RntbdRequestArgs args) { + checkNotNull(args, "Argument 'args' should not be null"); this.throwIfClosed(); if (this.connectionStateListener != null) { - this.connectionStateListener.onBeforeSendRequest(addressUri); + this.connectionStateListener.onBeforeSendRequest(args.physicalAddressUri()); } - OpenConnectionRntbdRequestRecord requestRecord = new OpenConnectionRntbdRequestRecord(addressUri); + OpenConnectionRntbdRequestRecord requestRecord = new OpenConnectionRntbdRequestRecord(args); final Future openChannelFuture = this.channelPool.acquire(requestRecord); if (openChannelFuture.isDone()) { @@ -419,11 +418,16 @@ private OpenConnectionRntbdRequestRecord processWhenConnectionOpened( // Releasing the channel back to the pool so other requests can use it this.releaseToPool(channel); - requestRecord.getAddressUri().setConnected(); + requestRecord.args().physicalAddressUri().setConnected(); - openConnectionResponse = new OpenConnectionResponse(requestRecord.getAddressUri(), true); + openConnectionResponse = + new OpenConnectionResponse(requestRecord.args().physicalAddressUri(), true); } else { - openConnectionResponse = new OpenConnectionResponse(requestRecord.getAddressUri(), false, openChannelFuture.cause()); + openConnectionResponse = + new OpenConnectionResponse( + requestRecord.args().physicalAddressUri(), + false, + openChannelFuture.cause()); } requestRecord.complete(openConnectionResponse); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/IRntbdServerErrorInjector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/IRntbdServerErrorInjector.java index 6ed09df54f64..58aa3638e69e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/IRntbdServerErrorInjector.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/IRntbdServerErrorInjector.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.faultinjection; +import com.azure.cosmos.implementation.directconnectivity.rntbd.IRequestRecord; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestRecord; import java.time.Duration; @@ -40,6 +41,6 @@ boolean injectRntbdServerResponseDelay( * @return flag to indicate whether server connection delay rule is injected. */ boolean injectRntbdServerConnectionDelay( - RntbdRequestRecord requestRecord, + IRequestRecord requestRecord, Consumer openConnectionWithDelayConsumer); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/RntbdServerErrorInjector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/RntbdServerErrorInjector.java index a1243227b5bf..288b1e60b53d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/RntbdServerErrorInjector.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/RntbdServerErrorInjector.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.faultinjection; +import com.azure.cosmos.implementation.directconnectivity.rntbd.IRequestRecord; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestRecord; import java.time.Duration; @@ -48,7 +49,7 @@ public boolean injectRntbdServerResponseError(RntbdRequestRecord requestRecord) @Override public boolean injectRntbdServerConnectionDelay( - RntbdRequestRecord requestRecord, + IRequestRecord requestRecord, Consumer openConnectionWithDelayConsumer) { for (IRntbdServerErrorInjector injector : this.faultInjectors) { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionNotAvailableRetryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionNotAvailableRetryTest.java index 10a7a71e2db3..0c54cb9c655f 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionNotAvailableRetryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionNotAvailableRetryTest.java @@ -518,8 +518,8 @@ protected Mono invokeStoreAsync(Uri physicalAddress, RxDocumentSe return Mono.empty(); } @Override - public Mono openConnection(URI serviceEndpoint, Uri addressUri) { - throw new NotImplementedException("tryOpenConnection is not supported in RntbdTransportClientTest"); + public Mono openConnection(Uri physicalAddress, RxDocumentServiceRequest openConnectionRequest) { + throw new NotImplementedException("openConnection is not supported in RntbdTransportClientTest"); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java index 91945b67b4bf..527c23500818 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java @@ -270,7 +270,7 @@ public void tryGetAddresses_ForDataPartitions_AddressCachedByOpenAsync_NoHttpReq IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; IOpenConnectionsHandler openConnectionsHandler = Mockito.mock(IOpenConnectionsHandler.class); - Mockito.when(openConnectionsHandler.openConnections(Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); + Mockito.when(openConnectionsHandler.openConnections(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); GatewayAddressCache cache = new GatewayAddressCache(mockDiagnosticsClientContext(), serviceEndpoint, @@ -292,7 +292,9 @@ public void tryGetAddresses_ForDataPartitions_AddressCachedByOpenAsync_NoHttpReq assertThat(httpClientWrapper.capturedRequests).asList().hasSize(1); httpClientWrapper.capturedRequests.clear(); - Mockito.verify(openConnectionsHandler, Mockito.times(allPartitionKeyRangeIds.size())).openConnections(Mockito.any(), Mockito.any()); + Mockito + .verify(openConnectionsHandler, Mockito.times(allPartitionKeyRangeIds.size())) + .openConnections(Mockito.any(), Mockito.any(), Mockito.any()); RxDocumentServiceRequest req = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Document, @@ -334,7 +336,7 @@ public void tryGetAddresses_ForDataPartitions_ForceRefresh( IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; IOpenConnectionsHandler openConnectionsHandler = Mockito.mock(IOpenConnectionsHandler.class); - Mockito.when(openConnectionsHandler.openConnections(Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); + Mockito.when(openConnectionsHandler.openConnections(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); GatewayAddressCache cache = new GatewayAddressCache(mockDiagnosticsClientContext(), serviceEndpoint, @@ -356,7 +358,9 @@ public void tryGetAddresses_ForDataPartitions_ForceRefresh( assertThat(httpClientWrapper.capturedRequests).asList().hasSize(1); httpClientWrapper.capturedRequests.clear(); - Mockito.verify(openConnectionsHandler, Mockito.times(allPartitionKeyRangeIds.size())).openConnections(Mockito.any(), Mockito.any()); + Mockito + .verify(openConnectionsHandler, Mockito.times(allPartitionKeyRangeIds.size())) + .openConnections(Mockito.any(), Mockito.any(), Mockito.any()); RxDocumentServiceRequest req = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Document, @@ -398,7 +402,9 @@ public void tryGetAddresses_ForDataPartitions_Suboptimal_Refresh( IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; IOpenConnectionsHandler openConnectionsHandler = Mockito.mock(IOpenConnectionsHandler.class); - Mockito.when(openConnectionsHandler.openConnections(Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); + Mockito + .when(openConnectionsHandler.openConnections(Mockito.any(), Mockito.any(), Mockito.any())) + .thenReturn(Flux.empty()); int suboptimalRefreshTime = 2; @@ -423,7 +429,9 @@ public void tryGetAddresses_ForDataPartitions_Suboptimal_Refresh( assertThat(httpClientWrapper.capturedRequests).asList().hasSize(1); httpClientWrapper.capturedRequests.clear(); - Mockito.verify(openConnectionsHandler, Mockito.times(allPartitionKeyRangeIds.size())).openConnections(Mockito.any(), Mockito.any()); + Mockito + .verify(openConnectionsHandler, Mockito.times(allPartitionKeyRangeIds.size())) + .openConnections(Mockito.any(), Mockito.any(), Mockito.any()); RxDocumentServiceRequest req = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Document, @@ -514,7 +522,7 @@ public void tryGetAddresses_ForMasterPartition(Protocol protocol) throws Excepti IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; IOpenConnectionsHandler openConnectionsHandler = Mockito.mock(IOpenConnectionsHandler.class); - Mockito.when(openConnectionsHandler.openConnections(Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); + Mockito.when(openConnectionsHandler.openConnections(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); GatewayAddressCache cache = new GatewayAddressCache(mockDiagnosticsClientContext(), serviceEndpoint, @@ -875,7 +883,7 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; HttpClientUnderTestWrapper httpClientWrapper = getHttpClientUnderTestWrapper(configs); IOpenConnectionsHandler openConnectionsHandlerMock = Mockito.mock(IOpenConnectionsHandler.class); - Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter + Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter if (replicaValidationEnabled) { System.setProperty("COSMOS.REPLICA_ADDRESS_VALIDATION_ENABLED", "true"); @@ -934,16 +942,16 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable // If openConnectionAndInitCaches is called, then replica validation will also include for unknown status Mockito .verify(openConnectionsHandlerMock, Mockito.times(1)) - .openConnections(serviceEndpointArguments.capture(), openConnectionArguments.capture()); + .openConnections(Mockito.any(), serviceEndpointArguments.capture(), openConnectionArguments.capture()); assertThat(openConnectionArguments.getValue()).hasSize(addressInfosFromCache.size()); } else { // Open connection will only be called for unhealthyPending status address Mockito .verify(openConnectionsHandlerMock, Mockito.times(0)) - .openConnections(serviceEndpointArguments.capture(), openConnectionArguments.capture()); + .openConnections(Mockito.any(), serviceEndpointArguments.capture(), openConnectionArguments.capture()); } } else { - Mockito.verify(openConnectionsHandlerMock, Mockito.never()).openConnections(Mockito.any(), Mockito.any()); + Mockito.verify(openConnectionsHandlerMock, Mockito.never()).openConnections(Mockito.any(), Mockito.any(), Mockito.any()); } httpClientWrapper.capturedRequests.clear(); @@ -994,7 +1002,7 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable Mockito .verify(openConnectionsHandlerMock, Mockito.times(1)) - .openConnections(serviceEndpointArguments.capture(), openConnectionArguments.capture()); + .openConnections(Mockito.any(), serviceEndpointArguments.capture(), openConnectionArguments.capture()); if (openConnectionAndInitCaches) { assertThat(openConnectionArguments.getValue()).containsExactlyElementsOf(Arrays.asList(unhealthyAddressUri, unknownAddressUri)); } else { @@ -1002,7 +1010,7 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable } } else { - Mockito.verify(openConnectionsHandlerMock, Mockito.never()).openConnections(Mockito.any(), Mockito.any()); + Mockito.verify(openConnectionsHandlerMock, Mockito.never()).openConnections(Mockito.any(), Mockito.any(), Mockito.any()); } System.clearProperty("COSMOS.REPLICA_ADDRESS_VALIDATION_ENABLED"); @@ -1015,7 +1023,7 @@ public void tryGetAddress_failedEndpointTests() throws Exception { IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; HttpClientUnderTestWrapper httpClientWrapper = getHttpClientUnderTestWrapper(configs); IOpenConnectionsHandler openConnectionsHandlerMock = Mockito.mock(IOpenConnectionsHandler.class); - Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter + Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter GatewayAddressCache cache = new GatewayAddressCache( mockDiagnosticsClientContext(), @@ -1077,7 +1085,7 @@ public void tryGetAddress_unhealthyStatus_forceRefresh() throws Exception { IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; HttpClientUnderTestWrapper httpClientWrapper = getHttpClientUnderTestWrapper(configs); IOpenConnectionsHandler openConnectionsHandlerMock = Mockito.mock(IOpenConnectionsHandler.class); - Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter + Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter GatewayAddressCache cache = new GatewayAddressCache( mockDiagnosticsClientContext(), @@ -1147,7 +1155,7 @@ public void validateReplicaAddressesTests() throws URISyntaxException, NoSuchMet IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; HttpClientUnderTestWrapper httpClientWrapper = getHttpClientUnderTestWrapper(configs); IOpenConnectionsHandler openConnectionsHandlerMock = Mockito.mock(IOpenConnectionsHandler.class); - Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter + Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter GatewayAddressCache cache = new GatewayAddressCache( mockDiagnosticsClientContext(), @@ -1161,7 +1169,8 @@ public void validateReplicaAddressesTests() throws URISyntaxException, NoSuchMet ConnectionPolicy.getDefaultPolicy(), openConnectionsHandlerMock); - Method validateReplicaAddressesMethod = GatewayAddressCache.class.getDeclaredMethod("validateReplicaAddresses", new Class[] { AddressInformation[].class }); + Method validateReplicaAddressesMethod = + GatewayAddressCache.class.getDeclaredMethod("validateReplicaAddresses", new Class[] { String.class, AddressInformation[].class }); validateReplicaAddressesMethod.setAccessible(true); // connected status @@ -1185,14 +1194,17 @@ public void validateReplicaAddressesTests() throws URISyntaxException, NoSuchMet replicaValidationScopes.add(Unknown); replicaValidationScopes.add(UnhealthyPending); - validateReplicaAddressesMethod.invoke(cache, new Object[]{ new AddressInformation[]{ address1, address2, address3, address4 }}) ; + validateReplicaAddressesMethod + .invoke( + cache, + new Object[]{ createdCollection.getResourceId(), new AddressInformation[]{ address1, address2, address3, address4 }}) ; // Validate openConnection will only be called for address in unhealthyPending status ArgumentCaptor> openConnectionArguments = ArgumentCaptor.forClass(List.class); ArgumentCaptor serviceEndpointArguments = ArgumentCaptor.forClass(URI.class); Mockito .verify(openConnectionsHandlerMock, Mockito.times(1)) - .openConnections(serviceEndpointArguments.capture(), openConnectionArguments.capture()); + .openConnections(Mockito.any(), serviceEndpointArguments.capture(), openConnectionArguments.capture()); assertThat(openConnectionArguments.getValue()).containsExactlyElementsOf( Arrays.asList(address4, address2) @@ -1209,7 +1221,7 @@ public void mergeAddressesTests() throws URISyntaxException, NoSuchMethodExcepti IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; HttpClientUnderTestWrapper httpClientWrapper = getHttpClientUnderTestWrapper(configs); IOpenConnectionsHandler openConnectionsHandlerMock = Mockito.mock(IOpenConnectionsHandler.class); - Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter + Mockito.when(openConnectionsHandlerMock.openConnections(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Flux.empty()); // what returned here does not really matter GatewayAddressCache cache = new GatewayAddressCache( mockDiagnosticsClientContext(), diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java index a728f6e1141f..b4762d2ec601 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java @@ -1131,8 +1131,8 @@ public RntbdRequestRecord request(final RntbdRequestArgs requestArgs) { } @Override - public OpenConnectionRntbdRequestRecord openConnection(Uri addressUri) { - throw new NotImplementedException("tryOpenConnection is not supported in FakeEndpoint."); + public OpenConnectionRntbdRequestRecord openConnection(RntbdRequestArgs openConnectionRequestArgs) { + throw new NotImplementedException("openConnection is not supported in FakeEndpoint."); } // endregion