diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java index 267272aff625..570384f4e0ca 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java @@ -72,7 +72,7 @@ public ChangeFeedQueryImpl(RxDocumentClientImpl client, this.options = getChangeFeedOptions(changeFeedOptions, initialNextIfNoneMatch); } - private RxDocumentServiceRequest createDocumentServiceRequest(String continuationToken, int pageSize) { + private Mono createDocumentServiceRequest(String continuationToken, int pageSize) { Map headers = new HashMap<>(); RxDocumentServiceRequest req = RxDocumentServiceRequest.create( OperationType.ReadFeed, @@ -107,7 +107,7 @@ private RxDocumentServiceRequest createDocumentServiceRequest(String continuatio req.routeTo(new PartitionKeyRangeIdentity(partitionKeyRangeIdInternal(this.options))); } - return req; + return Mono.just(req); } private ChangeFeedOptions getChangeFeedOptions(ChangeFeedOptions options, String continuationToken) { @@ -118,7 +118,7 @@ private ChangeFeedOptions getChangeFeedOptions(ChangeFeedOptions options, String public Flux> executeAsync() { - BiFunction createRequestFunc = this::createDocumentServiceRequest; + BiFunction> createRequestFunc = this::createDocumentServiceRequest; Function>> executeFunc = this::executeRequestAsync; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index dcc3762eaf3d..c07c2db1002f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -3,12 +3,14 @@ package com.azure.cosmos.implementation; -import com.azure.cosmos.implementation.routing.LocationCache; -import com.azure.cosmos.implementation.routing.LocationHelper; import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.DatabaseAccount; +import com.azure.cosmos.implementation.caches.AsyncCache; +import com.azure.cosmos.implementation.routing.LocationCache; +import com.azure.cosmos.implementation.routing.LocationHelper; import org.apache.commons.collections4.list.UnmodifiableList; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -16,13 +18,13 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import java.net.URISyntaxException; import java.net.URI; import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +47,8 @@ public class GlobalEndpointManager implements AutoCloseable { private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Scheduler scheduler = Schedulers.fromExecutor(executor); private volatile boolean isClosed; + private final AsyncCache databaseAccountAsyncCache; + private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true); public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) { this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000; @@ -69,6 +73,7 @@ public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPol } catch (Exception e) { throw new IllegalArgumentException(e); } + databaseAccountAsyncCache = new AsyncCache<>(); } public void init() { @@ -87,7 +92,7 @@ public UnmodifiableList getWriteEndpoints() { return this.locationCache.getWriteEndpoints(); } - public static Mono getDatabaseAccountFromAnyLocationsAsync( + public Mono getDatabaseAccountFromAnyLocationsAsync( URI defaultEndpoint, List locations, Function> getDatabaseAccountFn) { return getDatabaseAccountFn.apply(defaultEndpoint).onErrorResume( @@ -159,6 +164,12 @@ public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean }); } + public Mono getDatabaseAccountFromCache(URI defaultEndpoint) { + return this.databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, () -> this.owner.getDatabaseAccountFromEndpoint(defaultEndpoint).single().doOnSuccess(databaseAccount -> { + this.refreshLocationAsync(databaseAccount, false); + })); + } + private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) { return Mono.defer(() -> { logger.debug("refreshLocationPrivateAsync() refreshing locations"); @@ -236,7 +247,7 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { } logger.debug("startRefreshLocationTimerAsync() - Invoking refresh, I was registered on [{}]", now); - Mono databaseAccountObs = GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.defaultEndpoint, new ArrayList<>(this.connectionPolicy.getPreferredLocations()), + Mono databaseAccountObs = this.getDatabaseAccountFromAnyLocationsAsync(this.defaultEndpoint, new ArrayList<>(this.connectionPolicy.getPreferredLocations()), this::getDatabaseAccountAsync); return databaseAccountObs.flatMap(dbAccount -> { @@ -253,8 +264,23 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { } private Mono getDatabaseAccountAsync(URI serviceEndpoint) { - return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint) - .doOnNext(i -> logger.debug("account retrieved: {}", i)).single(); + final GlobalEndpointManager that = this; + Callable> fetchDatabaseAccount = () -> { + return that.owner.getDatabaseAccountFromEndpoint(serviceEndpoint).doOnNext(i -> { + logger.debug("account retrieved: {}", i); + }).single(); + }; + + Mono obsoleteValueMono = databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, fetchDatabaseAccount); + return obsoleteValueMono.flatMap(obsoleteValue -> { + if (firstTimeDatabaseAccountInitialization.compareAndSet(true, false)) { + return Mono.just(obsoleteValue); + } + return databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, obsoleteValue, fetchDatabaseAccount).doOnError(t -> { + //Putting back the old value in cache, this will avoid cache corruption + databaseAccountAsyncCache.set(StringUtils.EMPTY, obsoleteValue); + }); + }); } public boolean isClosed() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index bf054bc0749c..858ab6f5d8b4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -245,29 +245,12 @@ private RxDocumentClientImpl(URI serviceEndpoint, } private void initializeGatewayConfigurationReader() { - String resourceToken; - if(this.tokenResolver != null) { - resourceToken = this.tokenResolver.getAuthorizationToken(RequestVerb.GET, "", CosmosResourceType.System, null); - } else if(!this.hasAuthKeyResourceToken && this.authorizationTokenProvider == null) { - resourceToken = this.firstResourceTokenFromPermissionFeed; - } else { - assert this.masterKeyOrResourceToken != null || this.cosmosKeyCredential != null; - resourceToken = this.masterKeyOrResourceToken; - } - - this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, - this.hasAuthKeyResourceToken, - resourceToken, - this.connectionPolicy, - this.authorizationTokenProvider, - this.reactorHttpClient); - - DatabaseAccount databaseAccount = this.gatewayConfigurationReader.initializeReaderAsync().block(); + this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, this.globalEndpointManager); + DatabaseAccount databaseAccount = this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).block(); this.useMultipleWriteLocations = this.connectionPolicy.getUsingMultipleWriteLocations() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount); // TODO: add support for openAsync // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 - this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block(); } public void init() { @@ -1598,7 +1581,7 @@ public IRetryPolicyFactory getResetSessionTokenRetryPolicy() { } @Override - public ConsistencyLevel getDefaultConsistencyLevelAsync() { + public Mono getDefaultConsistencyLevelAsync() { return RxDocumentClientImpl.this.gatewayConfigurationReader.getDefaultConsistencyLevel(); } @@ -2840,7 +2823,7 @@ private Flux> readFeedCollectionChild(FeedO final FeedOptions finalFeedOptions = options; RequestOptions requestOptions = new RequestOptions(); requestOptions.setPartitionKey(options.partitionKey()); - BiFunction createRequestFunc = (continuationToken, pageSize) -> { + BiFunction> createRequestFunc = (continuationToken, pageSize) -> { Map requestHeaders = new HashMap<>(); if (continuationToken != null) { requestHeaders.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken); @@ -2848,7 +2831,7 @@ private Flux> readFeedCollectionChild(FeedO requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Integer.toString(pageSize)); RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.ReadFeed, resourceType, resourceLink, requestHeaders, finalFeedOptions); - return request; + return Mono.just(request); }; Function>> executeFunc = request -> { @@ -2871,7 +2854,7 @@ private Flux> readFeed(FeedOptions options, int maxPageSize = options.maxItemCount() != null ? options.maxItemCount() : -1; final FeedOptions finalFeedOptions = options; - BiFunction createRequestFunc = (continuationToken, pageSize) -> { + BiFunction> createRequestFunc = (continuationToken, pageSize) -> { Map requestHeaders = new HashMap<>(); if (continuationToken != null) { requestHeaders.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken); @@ -2879,7 +2862,7 @@ private Flux> readFeed(FeedOptions options, requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Integer.toString(pageSize)); RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.ReadFeed, resourceType, resourceLink, requestHeaders, finalFeedOptions); - return request; + return Mono.just(request); }; Function>> executeFunc = request -> { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java index d34b5d7d262a..1c2cf1126379 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java @@ -197,48 +197,45 @@ public Mono readAsync(RxDocumentServiceRequest entity, ValueHolder targetConsistencyLevel = ValueHolder.initialize(null); ValueHolder useSessionToken = ValueHolder.initialize(null); - ReadMode desiredReadMode; - try { - desiredReadMode = this.deduceReadMode(entity, targetConsistencyLevel, useSessionToken); - } catch (CosmosClientException e) { - return Mono.error(e); - } - int maxReplicaCount = this.getMaxReplicaSetSize(entity); - int readQuorumValue = maxReplicaCount - (maxReplicaCount / 2); - - switch (desiredReadMode) { - case Primary: - return this.readPrimaryAsync(entity, useSessionToken.v); - - case Strong: - entity.requestContext.performLocalRefreshOnGoneException = true; - return this.quorumReader.readStrongAsync(entity, readQuorumValue, desiredReadMode); - - case BoundedStaleness: - entity.requestContext.performLocalRefreshOnGoneException = true; - - // for bounded staleness, we are defaulting to read strong for local region reads. - // this can be done since we are always running with majority quorum w = 3 (or 2 during quorum downshift). - // This means that the primary will always be part of the write quorum, and - // therefore can be included for barrier reads. - - // NOTE: this assumes that we are running with SYNC replication (i.e. majority quorum). - // When we run on a minority write quorum(w=2), to ensure monotonic read guarantees - // we always contact two secondary replicas and exclude primary. - // However, this model significantly reduces availability and available throughput for serving reads for bounded staleness during reconfiguration. - // Therefore, to ensure monotonic read guarantee from any replica set we will just use regular quorum read(R=2) since our write quorum is always majority(W=3) - return this.quorumReader.readStrongAsync(entity, readQuorumValue, desiredReadMode); - - case Any: - if (targetConsistencyLevel.v == ConsistencyLevel.SESSION) { - return this.readSessionAsync(entity, desiredReadMode); - } else { - return this.readAnyAsync(entity, desiredReadMode); - } + return this.deduceReadMode(entity, targetConsistencyLevel, useSessionToken).flatMap(desiredReadMode -> { + return this.getMaxReplicaSetSize(entity).flatMap(maxReplicaCount -> { + int readQuorumValue = maxReplicaCount - (maxReplicaCount / 2); + + switch (desiredReadMode) { + case Primary: + return this.readPrimaryAsync(entity, useSessionToken.v); + + case Strong: + entity.requestContext.performLocalRefreshOnGoneException = true; + return this.quorumReader.readStrongAsync(entity, readQuorumValue, desiredReadMode); + + case BoundedStaleness: + entity.requestContext.performLocalRefreshOnGoneException = true; + + // for bounded staleness, we are defaulting to read strong for local region reads. + // this can be done since we are always running with majority quorum w = 3 (or 2 during quorum downshift). + // This means that the primary will always be part of the write quorum, and + // therefore can be included for barrier reads. + + // NOTE: this assumes that we are running with SYNC replication (i.e. majority quorum). + // When we run on a minority write quorum(w=2), to ensure monotonic read guarantees + // we always contact two secondary replicas and exclude primary. + // However, this model significantly reduces availability and available throughput for serving reads for bounded staleness during reconfiguration. + // Therefore, to ensure monotonic read guarantee from any replica set we will just use regular quorum read(R=2) since our write quorum is always majority(W=3) + return this.quorumReader.readStrongAsync(entity, readQuorumValue, desiredReadMode); + + case Any: + if (targetConsistencyLevel.v == ConsistencyLevel.SESSION) { + return this.readSessionAsync(entity, desiredReadMode); + } else { + return this.readAnyAsync(entity, desiredReadMode); + } - default: - throw new IllegalStateException("invalid operation " + desiredReadMode); - } + default: + throw new IllegalStateException("invalid operation " + desiredReadMode); + } + }); + }); } private Mono readPrimaryAsync(RxDocumentServiceRequest entity, @@ -331,55 +328,57 @@ private Mono readSessionAsync(RxDocumentServiceRequest entity, }); } - ReadMode deduceReadMode(RxDocumentServiceRequest request, + Mono deduceReadMode(RxDocumentServiceRequest request, ValueHolder targetConsistencyLevel, ValueHolder useSessionToken) throws CosmosClientException { - targetConsistencyLevel.v = RequestHelper.GetConsistencyLevelToUse(this.serviceConfigReader, request); - useSessionToken.v = (targetConsistencyLevel.v == ConsistencyLevel.SESSION); - - if (request.getDefaultReplicaIndex() != null) { - // Don't use session token - this is used by internal scenarios which technically don't intend session read when they target - // request to specific replica. - useSessionToken.v = false; - return ReadMode.Primary; //Let the addressResolver decides which replica to connect to. - } + return RequestHelper.GetConsistencyLevelToUse(this.serviceConfigReader, request).map(consistencyLevel -> { + targetConsistencyLevel.v = consistencyLevel; + useSessionToken.v = (targetConsistencyLevel.v == ConsistencyLevel.SESSION); + + if (request.getDefaultReplicaIndex() != null) { + // Don't use session token - this is used by internal scenarios which technically don't intend session read when they target + // request to specific replica. + useSessionToken.v = false; + return ReadMode.Primary; //Let the addressResolver decides which replica to connect to. + } - switch (targetConsistencyLevel.v) { - case EVENTUAL: - return ReadMode.Any; + switch (targetConsistencyLevel.v) { + case EVENTUAL: + return ReadMode.Any; - case CONSISTENT_PREFIX: - return ReadMode.Any; + case CONSISTENT_PREFIX: + return ReadMode.Any; - case SESSION: - return ReadMode.Any; + case SESSION: + return ReadMode.Any; - case BOUNDED_STALENESS: - return ReadMode.BoundedStaleness; + case BOUNDED_STALENESS: + return ReadMode.BoundedStaleness; - case STRONG: - return ReadMode.Strong; + case STRONG: + return ReadMode.Strong; - default: - throw new IllegalStateException("INVALID Consistency Level " + targetConsistencyLevel.v); - } + default: + throw new IllegalStateException("INVALID Consistency Level " + targetConsistencyLevel.v); + } + }); } - public int getMaxReplicaSetSize(RxDocumentServiceRequest entity) { + public Mono getMaxReplicaSetSize(RxDocumentServiceRequest entity) { boolean isMasterResource = ReplicatedResourceClient.isReadingFromMaster(entity.getResourceType(), entity.getOperationType()); if (isMasterResource) { - return this.serviceConfigReader.getSystemReplicationPolicy().getMaxReplicaSetSize(); + return this.serviceConfigReader.getSystemReplicationPolicy().map(replicationPolicy -> replicationPolicy.getMaxReplicaSetSize()); } else { - return this.serviceConfigReader.getUserReplicationPolicy().getMaxReplicaSetSize(); + return this.serviceConfigReader.getUserReplicationPolicy().map(replicationPolicy -> replicationPolicy.getMaxReplicaSetSize()); } } - public int getMinReplicaSetSize(RxDocumentServiceRequest entity) { + public Mono getMinReplicaSetSize(RxDocumentServiceRequest entity) { boolean isMasterResource = ReplicatedResourceClient.isReadingFromMaster(entity.getResourceType(), entity.getOperationType()); if (isMasterResource) { - return this.serviceConfigReader.getSystemReplicationPolicy().getMinReplicaSetSize(); + return this.serviceConfigReader.getSystemReplicationPolicy().map(replicationPolicy -> replicationPolicy.getMinReplicaSetSize()); } else { - return this.serviceConfigReader.getUserReplicationPolicy().getMinReplicaSetSize(); + return this.serviceConfigReader.getUserReplicationPolicy().map(replicationPolicy -> replicationPolicy.getMinReplicaSetSize()); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index 41ddd62140fb..34800a44adf8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -144,49 +144,50 @@ Mono writePrivateAsync( return Mono.error(e); } }).flatMap(primaryUri -> { - try { - primaryURI.set(primaryUri); - if (this.useMultipleWriteLocations && - RequestHelper.GetConsistencyLevelToUse(this.serviceConfigReader, request) == ConsistencyLevel.SESSION) { - // Set session token to ensure session consistency for write requests - // when writes can be issued to multiple locations - SessionTokenHelper.setPartitionLocalSessionToken(request, this.sessionContainer); - } else { - // When writes can only go to single location, there is no reason - // to session session token to the server. - SessionTokenHelper.validateAndRemoveSessionToken(request); - } - - } catch (Exception e) { - return Mono.error(e); - } + return RequestHelper.GetConsistencyLevelToUse(this.serviceConfigReader, request).flatMap(consistencyLevel -> { + try { + primaryURI.set(primaryUri); + if (this.useMultipleWriteLocations && + consistencyLevel == ConsistencyLevel.SESSION) { + // Set session token to ensure session consistency for write requests + // when writes can be issued to multiple locations + SessionTokenHelper.setPartitionLocalSessionToken(request, this.sessionContainer); + } else { + // When writes can only go to single location, there is no reason + // to session session token to the server. + SessionTokenHelper.validateAndRemoveSessionToken(request); + } - return this.transportClient.invokeResourceOperationAsync(primaryUri, request) - .doOnError( - t -> { - try { - Throwable unwrappedException = Exceptions.unwrap(t); - CosmosClientException ex = Utils.as(unwrappedException, CosmosClientException.class); - try { - BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request, - storeReader.createStoreResult(null, ex, false, false, primaryUri)); - } catch (Exception e) { - logger.error("Error occurred while recording response", e); - } - String value = ex.getResponseHeaders().get(HttpConstants.HttpHeaders.WRITE_REQUEST_TRIGGER_ADDRESS_REFRESH); - if (!Strings.isNullOrWhiteSpace(value)) { - Integer result = Integers.tryParse(value); - if (result != null && result == 1) { - startBackgroundAddressRefresh(request); - } - } - } catch (Throwable throwable) { - logger.error("Unexpected failure in handling orig [{}]", t.getMessage(), t); - logger.error("Unexpected failure in handling orig [{}] : new [{}]", t.getMessage(), throwable.getMessage(), throwable); - } - } - ); + } catch (Exception e) { + return Mono.error(e); + } + return this.transportClient.invokeResourceOperationAsync(primaryUri, request) + .doOnError( + t -> { + try { + Throwable unwrappedException = Exceptions.unwrap(t); + CosmosClientException ex = Utils.as(unwrappedException, CosmosClientException.class); + try { + BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request, + storeReader.createStoreResult(null, ex, false, false, primaryUri)); + } catch (Exception e) { + logger.error("Error occurred while recording response", e); + } + String value = ex.getResponseHeaders().get(HttpConstants.HttpHeaders.WRITE_REQUEST_TRIGGER_ADDRESS_REFRESH); + if (!Strings.isNullOrWhiteSpace(value)) { + Integer result = Integers.tryParse(value); + if (result != null && result == 1) { + startBackgroundAddressRefresh(request); + } + } + } catch (Throwable throwable) { + logger.error("Unexpected failure in handling orig [{}]", t.getMessage(), t); + logger.error("Unexpected failure in handling orig [{}] : new [{}]", t.getMessage(), throwable.getMessage(), throwable); + } + } + ); + }); }).flatMap(response -> { try { BridgeInternal.recordResponse(request.requestContext.cosmosResponseDiagnostics, request, @@ -212,77 +213,81 @@ Mono writePrivateAsync( } } - boolean isGlobalStrongRequest(RxDocumentServiceRequest request, StoreResponse response) { - if (this.serviceConfigReader.getDefaultConsistencyLevel() == ConsistencyLevel.STRONG) { - int numberOfReadRegions = -1; - String headerValue = null; - if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS)) != null) { - numberOfReadRegions = Integer.parseInt(headerValue); - } + Mono isGlobalStrongRequest(RxDocumentServiceRequest request, StoreResponse response) { + return this.serviceConfigReader.getDefaultConsistencyLevel().map(consistencyLevel -> { + if (consistencyLevel == ConsistencyLevel.STRONG) { + int numberOfReadRegions = -1; + String headerValue = null; + if ((headerValue = response.getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS)) != null) { + numberOfReadRegions = Integer.parseInt(headerValue); + } - if (numberOfReadRegions > 0 && this.serviceConfigReader.getDefaultConsistencyLevel() == ConsistencyLevel.STRONG) { - return true; + if (numberOfReadRegions > 0 && consistencyLevel == ConsistencyLevel.STRONG) { + return true; + } } - } - return false; + return false; + }); } Mono barrierForGlobalStrong(RxDocumentServiceRequest request, StoreResponse response) { - try { - if (ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response)) { - Utils.ValueHolder lsn = Utils.ValueHolder.initialize(-1l); - Utils.ValueHolder globalCommittedLsn = Utils.ValueHolder.initialize(-1l); - - getLsnAndGlobalCommittedLsn(response, lsn, globalCommittedLsn); - if (lsn.v == -1 || globalCommittedLsn.v == -1) { - logger.error("ConsistencyWriter: lsn {} or GlobalCommittedLsn {} is not set for global strong request", - lsn, globalCommittedLsn); - throw new GoneException(RMResources.Gone); - } + return this.isGlobalStrongRequest(request, response).flatMap(isGlobalStrongRequest -> { + try { + if (ReplicatedResourceClient.isGlobalStrongEnabled() && isGlobalStrongRequest) { + Utils.ValueHolder lsn = Utils.ValueHolder.initialize(-1l); + Utils.ValueHolder globalCommittedLsn = Utils.ValueHolder.initialize(-1l); + + getLsnAndGlobalCommittedLsn(response, lsn, globalCommittedLsn); + if (lsn.v == -1 || globalCommittedLsn.v == -1) { + logger.error("ConsistencyWriter: lsn {} or GlobalCommittedLsn {} is not set for global strong request", + lsn, globalCommittedLsn); + throw new GoneException(RMResources.Gone); + } - request.requestContext.globalStrongWriteResponse = response; - request.requestContext.globalCommittedSelectedLSN = lsn.v; + request.requestContext.globalStrongWriteResponse = response; + request.requestContext.globalCommittedSelectedLSN = lsn.v; - //if necessary we would have already refreshed cache by now. - request.requestContext.forceRefreshAddressCache = false; + //if necessary we would have already refreshed cache by now. + request.requestContext.forceRefreshAddressCache = false; - logger.debug("ConsistencyWriter: globalCommittedLsn {}, lsn {}", globalCommittedLsn, lsn); - //barrier only if necessary, i.e. when write region completes write, but read regions have not. + logger.debug("ConsistencyWriter: globalCommittedLsn {}, lsn {}", globalCommittedLsn, lsn); + //barrier only if necessary, i.e. when write region completes write, but read regions have not. - if (globalCommittedLsn.v < lsn.v) { - Mono barrierRequestObs = BarrierRequestHelper.createAsync(request, - this.authorizationTokenProvider, - null, - request.requestContext.globalCommittedSelectedLSN); + if (globalCommittedLsn.v < lsn.v) { + Mono barrierRequestObs = BarrierRequestHelper.createAsync(request, + this.authorizationTokenProvider, + null, + request.requestContext.globalCommittedSelectedLSN); - return barrierRequestObs.flatMap(barrierRequest -> { - Mono barrierWait = this.waitForWriteBarrierAsync(barrierRequest, request.requestContext.globalCommittedSelectedLSN); + return barrierRequestObs.flatMap(barrierRequest -> { + Mono barrierWait = this.waitForWriteBarrierAsync(barrierRequest, request.requestContext.globalCommittedSelectedLSN); - return barrierWait.flatMap(res -> { - if (!res) { - logger.error("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", - request.requestContext.globalCommittedSelectedLSN); - // RxJava1 doesn't allow throwing checked exception - return Mono.error(new GoneException(RMResources.GlobalStrongWriteBarrierNotMet)); - } + return barrierWait.flatMap(res -> { + if (!res) { + logger.error("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", + request.requestContext.globalCommittedSelectedLSN); + // RxJava1 doesn't allow throwing checked exception + return Mono.error(new GoneException(RMResources.GlobalStrongWriteBarrierNotMet)); + } - return Mono.just(request.requestContext.globalStrongWriteResponse); - }); + return Mono.just(request.requestContext.globalStrongWriteResponse); + }); - }); + }); - } else { - return Mono.just(request.requestContext.globalStrongWriteResponse); - } - } else { - return Mono.just(response); - } + } else { + return Mono.just(request.requestContext.globalStrongWriteResponse); + } + } else { + return Mono.just(response); + } - } catch (CosmosClientException e) { - // RxJava1 doesn't allow throwing checked exception from Observable operators - return Mono.error(e); - } + } catch (CosmosClientException e) { + // RxJava1 doesn't allow throwing checked exception from Observable operators + return Mono.error(e); + } + }); } private Mono waitForWriteBarrierAsync(RxDocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java index d9f562885cac..4a3b72019f30 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java @@ -4,29 +4,12 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.BridgeInternal; -import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.RequestVerb; -import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider; -import com.azure.cosmos.implementation.Constants; -import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.GlobalEndpointManager; -import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ReplicationPolicy; -import com.azure.cosmos.implementation.UserAgentContainer; -import com.azure.cosmos.implementation.Utils; -import com.azure.cosmos.implementation.http.HttpClient; -import com.azure.cosmos.implementation.http.HttpHeaders; -import com.azure.cosmos.implementation.http.HttpRequest; -import com.azure.cosmos.implementation.http.HttpResponse; -import io.netty.handler.codec.http.HttpMethod; import reactor.core.publisher.Mono; -import java.net.MalformedURLException; import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; /** @@ -42,118 +25,28 @@ */ public class GatewayServiceConfigurationReader { - public static final String GATEWAY_READER_NOT_INITIALIZED = "GatewayServiceConfigurationReader has not been initialized"; - - public ReplicationPolicy userReplicationPolicy; - private ReplicationPolicy systemReplicationPolicy; - private ConsistencyLevel consistencyLevel; - private volatile boolean initialized; private URI serviceEndpoint; - private final ConnectionPolicy connectionPolicy; - private Map queryEngineConfiguration; - private final BaseAuthorizationTokenProvider baseAuthorizationTokenProvider; - private final boolean hasAuthKeyResourceToken; - private final String authKeyResourceToken; - private HttpClient httpClient; + private GlobalEndpointManager globalEndpointManager; - public GatewayServiceConfigurationReader(URI serviceEndpoint, boolean hasResourceToken, String resourceToken, - ConnectionPolicy connectionPolicy, BaseAuthorizationTokenProvider baseAuthorizationTokenProvider, - HttpClient httpClient) { + public GatewayServiceConfigurationReader(URI serviceEndpoint, GlobalEndpointManager globalEndpointManager) { this.serviceEndpoint = serviceEndpoint; - this.baseAuthorizationTokenProvider = baseAuthorizationTokenProvider; - this.hasAuthKeyResourceToken = hasResourceToken; - this.authKeyResourceToken = resourceToken; - this.connectionPolicy = connectionPolicy; - this.httpClient = httpClient; - } - - public ReplicationPolicy getUserReplicationPolicy() { - this.throwIfNotInitialized(); - return this.userReplicationPolicy; - } - - public ReplicationPolicy getSystemReplicationPolicy() { - this.throwIfNotInitialized(); - return this.systemReplicationPolicy; - } - - public boolean enableAuthorization() { - return true; - } - - public ConsistencyLevel getDefaultConsistencyLevel() { - this.throwIfNotInitialized(); - return this.consistencyLevel; - } - - public void setDefaultConsistencyLevel(ConsistencyLevel value) { - this.throwIfNotInitialized(); - this.consistencyLevel = value; + this.globalEndpointManager = globalEndpointManager; + this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).block(); } - public Map getQueryEngineConfiguration() { - this.throwIfNotInitialized(); - return this.queryEngineConfiguration; + public Mono getUserReplicationPolicy() { + return this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).map(databaseAccount -> BridgeInternal.getReplicationPolicy(databaseAccount)); } - private Mono getDatabaseAccountAsync(URI serviceEndpoint) { - - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.set(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION); - - UserAgentContainer userAgentContainer = new UserAgentContainer(); - String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix(); - if (userAgentSuffix != null && userAgentSuffix.length() > 0) { - userAgentContainer.setSuffix(userAgentSuffix); - } - - httpHeaders.set(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); - httpHeaders.set(HttpConstants.HttpHeaders.API_TYPE, Constants.Properties.SQL_API_TYPE); - - String xDate = Utils.nowAsRFC1123(); - httpHeaders.set(HttpConstants.HttpHeaders.X_DATE, xDate); - - String authorizationToken; - if (this.hasAuthKeyResourceToken || baseAuthorizationTokenProvider == null) { - authorizationToken = HttpUtils.urlEncode(this.authKeyResourceToken); - } else { - // Retrieve the document service properties. - Map header = new HashMap<>(); - header.put(HttpConstants.HttpHeaders.X_DATE, xDate); - authorizationToken = baseAuthorizationTokenProvider - .generateKeyAuthorizationSignature(RequestVerb.GET, serviceEndpoint, header); - } - httpHeaders.set(HttpConstants.HttpHeaders.AUTHORIZATION, authorizationToken); - - HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, serviceEndpoint, serviceEndpoint.getPort(), httpHeaders); - Mono httpResponse = httpClient.send(httpRequest); - return toDatabaseAccountObservable(httpResponse, httpRequest); + public Mono getSystemReplicationPolicy() { + return this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).map(databaseAccount -> BridgeInternal.getSystemReplicationPolicy(databaseAccount)); } - public Mono initializeReaderAsync() { - return GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.serviceEndpoint, - - new ArrayList<>(this.connectionPolicy.getPreferredLocations()), url -> { - return getDatabaseAccountAsync(url); - - }).doOnSuccess(databaseAccount -> { - userReplicationPolicy = BridgeInternal.getReplicationPolicy(databaseAccount); - systemReplicationPolicy = BridgeInternal.getSystemReplicationPolicy(databaseAccount); - queryEngineConfiguration = BridgeInternal.getQueryEngineConfiuration(databaseAccount); - consistencyLevel = BridgeInternal.getConsistencyPolicy(databaseAccount).getDefaultConsistencyLevel(); - initialized = true; - }); - } - - private Mono toDatabaseAccountObservable(Mono httpResponse, HttpRequest httpRequest) { - - return HttpClientUtils.parseResponseAsync(httpResponse, httpRequest) - .map(rxDocumentServiceResponse -> rxDocumentServiceResponse.getResource(DatabaseAccount.class)); + public Mono getDefaultConsistencyLevel() { + return this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).map(databaseAccount -> BridgeInternal.getConsistencyPolicy(databaseAccount).getDefaultConsistencyLevel()); } - private void throwIfNotInitialized() { - if (!this.initialized) { - throw new IllegalArgumentException(GATEWAY_READER_NOT_INITIALIZED); - } + public Mono> getQueryEngineConfiguration() { + return this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).map(databaseAccount -> BridgeInternal.getQueryEngineConfiuration(databaseAccount)); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index 3c3efab0ea86..e7fa61391170 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -315,38 +315,40 @@ private Mono readLsn = new ValueHolder(-1); - ValueHolder globalCommittedLSN = new ValueHolder(-1); - ValueHolder storeResult = new ValueHolder(null); - - if (this.isQuorumMet( - responseResult, - readQuorum, - false, - isGlobalStrongReadCandidate, - readLsn, - globalCommittedLSN, - storeResult)) { - return Mono.just(Pair.of(new ReadQuorumResult( - entity.requestContext.requestChargeTracker, - ReadQuorumResultKind.QuorumMet, - readLsn.v, - globalCommittedLSN.v, - storeResult.v, - storeResponses), null)); - } + return this.serviceConfigReader.getDefaultConsistencyLevel().flatMap(consistencyLevel -> { + //either request overrides consistency level with strong, or request does not override and account default consistency level is strong + boolean isGlobalStrongReadCandidate = + (ReplicatedResourceClient.isGlobalStrongEnabled() && consistencyLevel == ConsistencyLevel.STRONG) && + (entity.requestContext.originalRequestConsistencyLevel == null || entity.requestContext.originalRequestConsistencyLevel == ConsistencyLevel.STRONG); + + ValueHolder readLsn = new ValueHolder(-1); + ValueHolder globalCommittedLSN = new ValueHolder(-1); + ValueHolder storeResult = new ValueHolder(null); + + if (this.isQuorumMet( + responseResult, + readQuorum, + false, + isGlobalStrongReadCandidate, + readLsn, + globalCommittedLSN, + storeResult)) { + return Mono.just(Pair.of(new ReadQuorumResult( + entity.requestContext.requestChargeTracker, + ReadQuorumResultKind.QuorumMet, + readLsn.v, + globalCommittedLSN.v, + storeResult.v, + storeResponses), null)); + } - // at this point, if refresh were necessary, we would have refreshed it in ReadMultipleReplicaAsync - // so set to false here to avoid further refrehses for this request. - entity.requestContext.forceRefreshAddressCache = false; + // at this point, if refresh were necessary, we would have refreshed it in ReadMultipleReplicaAsync + // so set to false here to avoid further refrehses for this request. + entity.requestContext.forceRefreshAddressCache = false; - Quadruple> state = Quadruple.with(readLsn.v, globalCommittedLSN.v, storeResult.v, storeResponses); - return Mono.just(Pair.of(null, state)); + Quadruple> state = Quadruple.with(readLsn.v, globalCommittedLSN.v, storeResult.v, storeResponses); + return Mono.just(Pair.of(null, state)); + }); } ); } else { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClient.java index 1398a124839c..df160e88ceb8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClient.java @@ -146,13 +146,15 @@ public Mono invokeAsync(RxDocumentServiceRequest request, }; } - int retryTimeout = this.serviceConfigReader.getDefaultConsistencyLevel() == ConsistencyLevel.STRONG ? + Function, Mono> finalInBackoffFuncDelegate = inBackoffFuncDelegate; + return this.serviceConfigReader.getDefaultConsistencyLevel().flatMap(consistencyLevel -> { + int retryTimeout = consistencyLevel == ConsistencyLevel.STRONG ? ReplicatedResourceClient.STRONG_GONE_AND_RETRY_WITH_RETRY_TIMEOUT_SECONDS : ReplicatedResourceClient.GONE_AND_RETRY_WITH_TIMEOUT_IN_SECONDS; - - return BackoffRetryUtility.executeAsync(funcDelegate, new GoneAndRetryWithRetryPolicy(request, retryTimeout), - inBackoffFuncDelegate, Duration.ofSeconds( - ReplicatedResourceClient.MIN_BACKOFF_FOR_FAILLING_BACK_TO_OTHER_REGIONS_FOR_READ_REQUESTS_IN_SECONDS), request); + return BackoffRetryUtility.executeAsync(funcDelegate, new GoneAndRetryWithRetryPolicy(request, retryTimeout), + finalInBackoffFuncDelegate, Duration.ofSeconds( + ReplicatedResourceClient.MIN_BACKOFF_FOR_FAILLING_BACK_TO_OTHER_REGIONS_FOR_READ_REQUESTS_IN_SECONDS), request); + }); } private Mono invokeAsync(RxDocumentServiceRequest request, TimeoutHelper timeout, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RequestHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RequestHelper.java index 3b0217198fdb..e0de71faabbc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RequestHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RequestHelper.java @@ -10,27 +10,29 @@ import com.azure.cosmos.implementation.RMResources; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.Strings; +import reactor.core.publisher.Mono; public class RequestHelper { - public static ConsistencyLevel GetConsistencyLevelToUse(GatewayServiceConfigurationReader serviceConfigReader, - RxDocumentServiceRequest request) throws CosmosClientException { - ConsistencyLevel consistencyLevelToUse = serviceConfigReader.getDefaultConsistencyLevel(); + public static Mono GetConsistencyLevelToUse(GatewayServiceConfigurationReader serviceConfigReader, + RxDocumentServiceRequest request) throws CosmosClientException { + return serviceConfigReader.getDefaultConsistencyLevel().map(consistencyLevelToUse -> { - String requestConsistencyLevelHeaderValue = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); + String requestConsistencyLevelHeaderValue = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); if (!Strings.isNullOrEmpty(requestConsistencyLevelHeaderValue)) { ConsistencyLevel requestConsistencyLevel = ConsistencyLevel.fromServiceSerializedFormat(requestConsistencyLevelHeaderValue); if (requestConsistencyLevel == null) { throw new BadRequestException( String.format( - RMResources.InvalidHeaderValue, - requestConsistencyLevelHeaderValue, - HttpConstants.HttpHeaders.CONSISTENCY_LEVEL)); - } + RMResources.InvalidHeaderValue, + requestConsistencyLevelHeaderValue, + HttpConstants.HttpHeaders.CONSISTENCY_LEVEL)); + } - consistencyLevelToUse = requestConsistencyLevel; - } + consistencyLevelToUse = requestConsistencyLevel; + } - return consistencyLevelToUse; + return consistencyLevelToUse; + }); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClient.java index f2cced7ad666..c1e660e621a8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClient.java @@ -86,46 +86,45 @@ public Mono processMessageAsync(RxDocumentServiceRequ return Mono.error(e); } - storeResponse = storeResponse.doOnError(e -> { - try { - Throwable unwrappedException = reactor.core.Exceptions.unwrap(e); - CosmosClientException exception = Utils.as(unwrappedException, CosmosClientException.class); - - if (exception == null) { - return; - } - - BridgeInternal.recordRetryContext(request.requestContext.cosmosResponseDiagnostics, request); - exception = BridgeInternal.setCosmosResponseDiagnostics(exception, request.requestContext.cosmosResponseDiagnostics); - - handleUnsuccessfulStoreResponse(request, exception); - } catch (Throwable throwable) { - logger.error("Unexpected failure in handling orig [{}]", e.getMessage(), e); - logger.error("Unexpected failure in handling orig [{}] : new [{}]", e.getMessage(), throwable.getMessage(), throwable); + storeResponse = storeResponse.onErrorResume(e -> { + try { + Throwable unwrappedException = reactor.core.Exceptions.unwrap(e); + CosmosClientException exception = Utils.as(unwrappedException, CosmosClientException.class); + + if (exception == null) { + return Mono.empty(); } + + exception = BridgeInternal.setCosmosResponseDiagnostics(exception, request.requestContext.cosmosResponseDiagnostics); + return handleUnsuccessfulStoreResponse(request, exception).then(Mono.error(e)); + } catch (Throwable throwable) { + logger.error("Unexpected failure in handling orig [{}]", e.getMessage(), e); + logger.error("Unexpected failure in handling orig [{}] : new [{}]", e.getMessage(), throwable.getMessage(), throwable); } - ); + return Mono.error(e); + }); return storeResponse.flatMap(sr -> { try { - return Mono.just(this.completeResponse(sr, request)); + return this.completeResponse(sr, request); } catch (Exception e) { return Mono.error(e); } }); } - private void handleUnsuccessfulStoreResponse(RxDocumentServiceRequest request, CosmosClientException exception) { - this.updateResponseHeader(request, exception.getResponseHeaders()); - if ((!ReplicatedResourceClient.isMasterResource(request.getResourceType())) && + private Mono handleUnsuccessfulStoreResponse(RxDocumentServiceRequest request, CosmosClientException exception) { + return this.updateResponseHeader(request, exception.getResponseHeaders()).doOnSuccess(Void -> { + if ((!ReplicatedResourceClient.isMasterResource(request.getResourceType())) && (Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.PRECONDITION_FAILED) || Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.CONFLICT) || - (Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.NOTFOUND) && - !Exceptions.isSubStatusCode(exception, HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE)))) { - this.captureSessionToken(request, exception.getResponseHeaders()); - } + (Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.NOTFOUND) && + !Exceptions.isSubStatusCode(exception, HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE)))) { + this.captureSessionToken(request, exception.getResponseHeaders()); + } + }); } - private RxDocumentServiceResponse completeResponse( + private Mono completeResponse( StoreResponse storeResponse, RxDocumentServiceRequest request) throws InternalServerErrorException { if (storeResponse.getResponseHeaderNames().length != storeResponse.getResponseHeaderValues().length) { @@ -140,11 +139,11 @@ private RxDocumentServiceResponse completeResponse( headers.put(name, value); } - this.updateResponseHeader(request, headers); - this.captureSessionToken(request, headers); + return this.updateResponseHeader(request, headers).doOnSuccess(aVoid -> { + this.captureSessionToken(request, headers); BridgeInternal.recordRetryContext(request.requestContext.cosmosResponseDiagnostics, request); storeResponse.setCosmosResponseDiagnostics(request.requestContext.cosmosResponseDiagnostics); - return new RxDocumentServiceResponse(storeResponse); + }).then(Mono.just(new RxDocumentServiceResponse(storeResponse))); } private long getLSN(Map headers) { @@ -159,44 +158,46 @@ private long getLSN(Map headers) { return defaultValue; } - private void updateResponseHeader(RxDocumentServiceRequest request, Map headers) { - String requestConsistencyLevel = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); + private Mono updateResponseHeader(RxDocumentServiceRequest request, Map headers) { + return this.serviceConfigurationReader.getDefaultConsistencyLevel().doOnSuccess(consistencyLevel -> { + String requestConsistencyLevel = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); - boolean sessionConsistency = - this.serviceConfigurationReader.getDefaultConsistencyLevel() == ConsistencyLevel.SESSION || - (!Strings.isNullOrEmpty(requestConsistencyLevel) - && Strings.areEqualIgnoreCase(requestConsistencyLevel, ConsistencyLevel.SESSION.toString())); + boolean sessionConsistency = + consistencyLevel == ConsistencyLevel.SESSION || + (!Strings.isNullOrEmpty(requestConsistencyLevel) + && Strings.areEqualIgnoreCase(requestConsistencyLevel, ConsistencyLevel.SESSION.toString())); - long storeLSN = this.getLSN(headers); - if (storeLSN == -1) { - return; - } + long storeLSN = this.getLSN(headers); + if (storeLSN == -1) { + return; + } - String partitionKeyRangeId = headers.get(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); + String partitionKeyRangeId = headers.get(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); - if (Strings.isNullOrEmpty(partitionKeyRangeId)) { - String inputSession = request.getHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN); - if (!Strings.isNullOrEmpty(inputSession) + if (Strings.isNullOrEmpty(partitionKeyRangeId)) { + String inputSession = request.getHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN); + if (!Strings.isNullOrEmpty(inputSession) && inputSession.indexOf(ISessionToken.PARTITION_KEY_RANGE_SESSION_SEPARATOR) >= 1) { - partitionKeyRangeId = inputSession.substring(0, + partitionKeyRangeId = inputSession.substring(0, inputSession.indexOf(ISessionToken.PARTITION_KEY_RANGE_SESSION_SEPARATOR)); - } else { - partitionKeyRangeId = ZERO_PARTITION_KEY_RANGE; + } else { + partitionKeyRangeId = ZERO_PARTITION_KEY_RANGE; + } } - } - ISessionToken sessionToken = null; - String sessionTokenResponseHeader = headers.get(HttpConstants.HttpHeaders.SESSION_TOKEN); - if (!Strings.isNullOrEmpty(sessionTokenResponseHeader)) { - sessionToken = SessionTokenHelper.parse(sessionTokenResponseHeader); - } + ISessionToken sessionToken = null; + String sessionTokenResponseHeader = headers.get(HttpConstants.HttpHeaders.SESSION_TOKEN); + if (!Strings.isNullOrEmpty(sessionTokenResponseHeader)) { + sessionToken = SessionTokenHelper.parse(sessionTokenResponseHeader); + } - if (sessionToken != null) { - headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, - SessionTokenHelper.concatPartitionKeyRangeIdWithSessionToken(partitionKeyRangeId, sessionToken.convertToString())); - } + if (sessionToken != null) { + headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, + SessionTokenHelper.concatPartitionKeyRangeIdWithSessionToken(partitionKeyRangeId, sessionToken.convertToString())); + } - headers.remove(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); + headers.remove(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); + }).then(); } private void captureSessionToken(RxDocumentServiceRequest request, Map headers) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java index 9131bba96149..5a22471cfd49 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java @@ -105,7 +105,7 @@ public Flux> executeAsync() { int maxPageSize = newFeedOptions.maxItemCount() != null ? newFeedOptions.maxItemCount() : Constants.Properties.DEFAULT_MAX_PAGE_SIZE; - BiFunction createRequestFunc = (continuationToken, pageSize) -> this.createRequestAsync(continuationToken, pageSize); + BiFunction> createRequestFunc = (continuationToken, pageSize) -> this.createRequestAsync(continuationToken, pageSize); // TODO: clean up if we want to use single vs observable. Function>> executeFunc = executeInternalAsyncFunc(); @@ -181,55 +181,55 @@ private Mono> executeOnceAsync(DocumentClientRetryPolicy retryPo // The code leaves some temporary garbage in request (in RequestContext etc.), // which shold be erased during retries. - RxDocumentServiceRequest request = this.createRequestAsync(continuationToken, this.feedOptions.maxItemCount()); - if (retryPolicyInstance != null) { - retryPolicyInstance.onBeforeSendRequest(request); - } + return this.createRequestAsync(continuationToken, this.feedOptions.maxItemCount()).flatMap(request -> { + if (retryPolicyInstance != null) { + retryPolicyInstance.onBeforeSendRequest(request); + } - if (!Strings.isNullOrEmpty(request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY)) + if (!Strings.isNullOrEmpty(request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY)) || !request.getResourceType().isPartitioned()) { - return this.executeRequestAsync(request); - } + return this.executeRequestAsync(request); + } - // TODO: remove this as partition key range id is not relevant - // TODO; has to be rx async - //CollectionCache collectionCache = this.client.getCollectionCache(); + // TODO: remove this as partition key range id is not relevant + // TODO; has to be rx async + //CollectionCache collectionCache = this.client.getCollectionCache(); - // TODO: has to be rx async - //DocumentCollection collection = - // collectionCache.resolveCollection(request); + // TODO: has to be rx async + //DocumentCollection collection = + // collectionCache.resolveCollection(request); - // TODO: this code is not relevant because partition key range id should not be exposed - // if (!Strings.isNullOrEmpty(super.getPartitionKeyId())) - // { - // request.RouteTo(new PartitionKeyRangeIdentity(collection.ResourceId, base.PartitionKeyRangeId)); - // return await this.ExecuteRequestAsync(request); - // } + // TODO: this code is not relevant because partition key range id should not be exposed + // if (!Strings.isNullOrEmpty(super.getPartitionKeyId())) + // { + // request.RouteTo(new PartitionKeyRangeIdentity(collection.ResourceId, base.PartitionKeyRangeId)); + // return await this.ExecuteRequestAsync(request); + // } - request.UseGatewayMode = true; - return this.executeRequestAsync(request); + request.UseGatewayMode = true; + return this.executeRequestAsync(request); + }); } - public RxDocumentServiceRequest createRequestAsync(String continuationToken, Integer maxPageSize) { - - // TODO this should be async - Map requestHeaders = this.createCommonHeadersAsync( - this.getFeedOptions(continuationToken, maxPageSize)); + public Mono createRequestAsync(String continuationToken, Integer maxPageSize) { - // TODO: add support for simple continuation for single partition query - //requestHeaders.put(keyHttpConstants.HttpHeaders.IsContinuationExpected, isContinuationExpected.ToString()) + return this.createCommonHeadersAsync( + this.getFeedOptions(continuationToken, maxPageSize)).map(requestHeaders -> { + // TODO: add support for simple continuation for single partition query + //requestHeaders.put(keyHttpConstants.HttpHeaders.IsContinuationExpected, isContinuationExpected.ToString()) - RxDocumentServiceRequest request = this.createDocumentServiceRequest( + RxDocumentServiceRequest request = this.createDocumentServiceRequest( requestHeaders, this.query, this.getPartitionKeyInternal()); - if (!StringUtils.isEmpty(partitionKeyRangeIdInternal(feedOptions))) { - request.routeTo(new PartitionKeyRangeIdentity(partitionKeyRangeIdInternal(feedOptions))); - } + if (!StringUtils.isEmpty(partitionKeyRangeIdInternal(feedOptions))) { + request.routeTo(new PartitionKeyRangeIdentity(partitionKeyRangeIdInternal(feedOptions))); + } - return request; + return request; + }); } private static boolean isClientSideContinuationToken(String continuationToken) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java index 2396e91fae7e..f7d9142a3b86 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentProducer.java @@ -155,8 +155,8 @@ public DocumentProducer( } public Flux produceAsync() { - BiFunction sourcePartitionCreateRequestFunc = - (token, maxItemCount) -> createRequestFunc.apply(targetRange, token, maxItemCount); + BiFunction> sourcePartitionCreateRequestFunc = + (token, maxItemCount) -> Mono.just(createRequestFunc.apply(targetRange, token, maxItemCount)); Flux> obs = Paginator .getPaginatedQueryResultAsObservable( feedOptions.requestContinuation(), diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java index 02c216706ee5..fb68f832a3ac 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java @@ -123,59 +123,62 @@ private Mono executeQueryRequestInternalAsync(RxDocum return this.client.executeQueryAsync(request); } - public Map createCommonHeadersAsync(FeedOptions feedOptions) { + public Mono> createCommonHeadersAsync(FeedOptions feedOptions) { Map requestHeaders = new HashMap<>(); - ConsistencyLevel defaultConsistencyLevel = this.client.getDefaultConsistencyLevelAsync(); - ConsistencyLevel desiredConsistencyLevel = this.client.getDesiredConsistencyLevelAsync(); - if (!Strings.isNullOrEmpty(feedOptions.getSessionToken()) - && !ReplicatedResourceClientUtils.isReadingFromMaster(this.resourceTypeEnum, OperationType.ReadFeed)) { - if (defaultConsistencyLevel == ConsistencyLevel.SESSION - || (desiredConsistencyLevel == ConsistencyLevel.SESSION)) { - // Query across partitions is not supported today. Master resources (for e.g., - // database) - // can span across partitions, whereas server resources (viz: collection, - // document and attachment) - // don't span across partitions. Hence, session token returned by one partition - // should not be used - // when quering resources from another partition. - // Since master resources can span across partitions, don't send session token - // to the backend. - // As master resources are sync replicated, we should always get consistent - // query result for master resources, - // irrespective of the chosen replica. - // For server resources, which don't span partitions, specify the session token - // for correct replica to be chosen for servicing the query result. - requestHeaders.put(HttpConstants.HttpHeaders.SESSION_TOKEN, feedOptions.getSessionToken()); + return this.client.getDefaultConsistencyLevelAsync().map(defaultConsistencyLevel -> { + ConsistencyLevel desiredConsistencyLevel = this.client.getDesiredConsistencyLevelAsync(); + if (!Strings.isNullOrEmpty(feedOptions.getSessionToken()) + && !ReplicatedResourceClientUtils.isReadingFromMaster(this.resourceTypeEnum, OperationType.ReadFeed)) { + if (defaultConsistencyLevel == ConsistencyLevel.SESSION + || (desiredConsistencyLevel == ConsistencyLevel.SESSION)) { + // Query across partitions is not supported today. Master resources (for e.g., + // database) + // can span across partitions, whereas server resources (viz: collection, + // document and attachment) + // don't span across partitions. Hence, session token returned by one partition + // should not be used + // when quering resources from another partition. + // Since master resources can span across partitions, don't send session token + // to the backend. + // As master resources are sync replicated, we should always get consistent + // query result for master resources, + // irrespective of the chosen replica. + // For server resources, which don't span partitions, specify the session token + // for correct replica to be chosen for servicing the query result. + requestHeaders.put(HttpConstants.HttpHeaders.SESSION_TOKEN, feedOptions.getSessionToken()); + } + } + + requestHeaders.put(HttpConstants.HttpHeaders.CONTINUATION, feedOptions.requestContinuation()); + requestHeaders.put(HttpConstants.HttpHeaders.IS_QUERY, Strings.toString(true)); + + // Flow the pageSize only when we are not doing client eval + if (feedOptions.maxItemCount() != null && feedOptions.maxItemCount() > 0) { + requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(feedOptions.maxItemCount())); + } + + if (feedOptions.getMaxDegreeOfParallelism() != 0) { + requestHeaders.put(HttpConstants.HttpHeaders.PARALLELIZE_CROSS_PARTITION_QUERY, Strings.toString(true)); + } + + if (this.feedOptions.setResponseContinuationTokenLimitInKb() > 0) { + requestHeaders.put(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB, + Strings.toString(feedOptions.setResponseContinuationTokenLimitInKb())); + } + + if (desiredConsistencyLevel != null) { + requestHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, desiredConsistencyLevel.toString()); + } + + if(feedOptions.populateQueryMetrics()){ + requestHeaders.put(HttpConstants.HttpHeaders.POPULATE_QUERY_METRICS, String.valueOf(feedOptions.populateQueryMetrics())); + } + + return requestHeaders; } - } - - requestHeaders.put(HttpConstants.HttpHeaders.CONTINUATION, feedOptions.requestContinuation()); - requestHeaders.put(HttpConstants.HttpHeaders.IS_QUERY, Strings.toString(true)); - - // Flow the pageSize only when we are not doing client eval - if (feedOptions.maxItemCount() != null && feedOptions.maxItemCount() > 0) { - requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(feedOptions.maxItemCount())); - } - - if (feedOptions.getMaxDegreeOfParallelism() != 0) { - requestHeaders.put(HttpConstants.HttpHeaders.PARALLELIZE_CROSS_PARTITION_QUERY, Strings.toString(true)); - } - - if (this.feedOptions.setResponseContinuationTokenLimitInKb() > 0) { - requestHeaders.put(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB, - Strings.toString(feedOptions.setResponseContinuationTokenLimitInKb())); - } - - if (desiredConsistencyLevel != null) { - requestHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, desiredConsistencyLevel.toString()); - } - - if(feedOptions.populateQueryMetrics()){ - requestHeaders.put(HttpConstants.HttpHeaders.POPULATE_QUERY_METRICS, String.valueOf(feedOptions.populateQueryMetrics())); - } + ); - return requestHeaders; } private void populatePartitionKeyInfo(RxDocumentServiceRequest request, PartitionKeyInternal partitionKey) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java index 3791a644f4b4..b8fe16ce9556 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java @@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.function.BiFunction; @@ -18,7 +19,7 @@ class Fetcher { private final static Logger logger = LoggerFactory.getLogger(Fetcher.class); - private final BiFunction createRequestFunc; + private final BiFunction> createRequestFunc; private final Function>> executeFunc; private final boolean isChangeFeed; @@ -27,7 +28,7 @@ class Fetcher { private volatile int top; private volatile String continuationToken; - public Fetcher(BiFunction createRequestFunc, + public Fetcher(BiFunction> createRequestFunc, Function>> executeFunc, String continuationToken, boolean isChangeFeed, @@ -54,8 +55,7 @@ public boolean shouldFetchMore() { } public Mono> nextPage() { - RxDocumentServiceRequest request = createRequest(); - return nextPage(request); + return createRequest().flatMap(request -> nextPage(request)); } private void updateState(FeedResponse response) { @@ -82,7 +82,7 @@ private void updateState(FeedResponse response) { isChangeFeed, continuationToken, maxItemCount, shouldFetchMore); } - private RxDocumentServiceRequest createRequest() { + private Mono createRequest() { if (!shouldFetchMore) { // this should never happen logger.error("invalid state, trying to fetch more after completion"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java index a6e5b592b113..e734ef9c8c5e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/IDocumentQueryClient.java @@ -35,13 +35,13 @@ public interface IDocumentQueryClient { /** * TODO: this should be async returning observable - * @return + * @return */ - ConsistencyLevel getDefaultConsistencyLevelAsync(); + Mono getDefaultConsistencyLevelAsync(); /** * TODO: this should be async returning observable - * @return + * @return */ ConsistencyLevel getDesiredConsistencyLevelAsync(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java index 2869f5f0bfbb..a5c06930ce0f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java @@ -104,24 +104,24 @@ public static Flux> cre correlatedActivityId); try { - context.initialize(partitionKeyRanges, + Mono> monoContext = context.initialize(partitionKeyRanges, partitionedQueryExecutionInfo.getQueryInfo().getOrderBy(), partitionedQueryExecutionInfo.getQueryInfo().getOrderByExpressions(), initialPageSize, - feedOptions.requestContinuation()); - - return Flux.just(context); + feedOptions.requestContinuation()).then(Mono.just(context)); + return monoContext.flux(); } catch (CosmosClientException dce) { return Flux.error(dce); } } - private void initialize( + private Mono initialize( List partitionKeyRanges, List sortOrders, Collection orderByExpressions, int initialPageSize, String continuationToken) throws CosmosClientException { + Mono init; if (continuationToken == null) { // First iteration so use null continuation tokens and "true" filters Map partitionKeyRangeToContinuationToken = new HashMap(); @@ -130,7 +130,7 @@ private void initialize( null); } - super.initialize(collectionRid, + init = super.initialize(collectionRid, partitionKeyRangeToContinuationToken, initialPageSize, new SqlQuerySpec(querySpec.getQueryText().replace(FormatPlaceHolder, @@ -173,41 +173,46 @@ private void initialize( // Left String filterForRangesLeftOfTheTargetRange = formattedFilterInfo.getFilterForRangesLeftOfTheTargetRange(); - this.initializeRangeWithContinuationTokenAndFilter(partitionKeyRanges, + init = this.initializeRangeWithContinuationTokenAndFilter(partitionKeyRanges, /* startInclusive */ 0, /* endExclusive */ targetIndex, /* continuationToken */ null, filterForRangesLeftOfTheTargetRange, - initialPageSize); - - // Target - String filterForTargetRange = formattedFilterInfo.getFilterForTargetRange(); - this.initializeRangeWithContinuationTokenAndFilter(partitionKeyRanges, + initialPageSize).then( // Target + this.initializeRangeWithContinuationTokenAndFilter(partitionKeyRanges, /* startInclusive */ targetIndex, /* endExclusive */ targetIndex + 1, null, - filterForTargetRange, - initialPageSize); - - // Right - String filterForRangesRightOfTheTargetRange = formattedFilterInfo.getFilterForRangesRightOfTheTargetRange(); - this.initializeRangeWithContinuationTokenAndFilter(partitionKeyRanges, + formattedFilterInfo.getFilterForTargetRange(), + initialPageSize)).then( // Right + // String filterForRangesRightOfTheTargetRange = formattedFilterInfo.getFilterForRangesRightOfTheTargetRange(); + this.initializeRangeWithContinuationTokenAndFilter(partitionKeyRanges, /* startInclusive */ targetIndex + 1, /* endExclusive */ partitionKeyRanges.size(), /* continuationToken */ null, - filterForRangesRightOfTheTargetRange, - initialPageSize); + formattedFilterInfo.getFilterForRangesRightOfTheTargetRange(), + initialPageSize) + ).doOnSuccess(aVoid -> { + orderByObservable = OrderByUtils.orderedMerge(resourceType, + consumeComparer, + tracker, + documentProducers, + queryMetricMap, + targetRangeToOrderByContinuationTokenMap); + }); } - orderByObservable = OrderByUtils.orderedMerge(resourceType, + return init.doOnSuccess(aVoid -> { + orderByObservable = OrderByUtils.orderedMerge(resourceType, consumeComparer, tracker, documentProducers, queryMetricMap, targetRangeToOrderByContinuationTokenMap); + }); } - private void initializeRangeWithContinuationTokenAndFilter( + private Mono initializeRangeWithContinuationTokenAndFilter( List partitionKeyRanges, int startInclusive, int endExclusive, @@ -221,7 +226,7 @@ private void initializeRangeWithContinuationTokenAndFilter( continuationToken); } - super.initialize(collectionRid, + return super.initialize(collectionRid, partitionKeyRangeToContinuationToken, initialPageSize, new SqlQuerySpec(querySpec.getQueryText().replace(FormatPlaceHolder, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java index d23d29c3a42c..a622b04ce50f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java @@ -24,7 +24,7 @@ public class Paginator { private final static Logger logger = LoggerFactory.getLogger(Paginator.class); public static Flux> getPaginatedChangeFeedQueryResultAsObservable( - ChangeFeedOptions feedOptions, BiFunction createRequestFunc, + ChangeFeedOptions feedOptions, BiFunction> createRequestFunc, Function>> executeFunc, Class resourceType, int maxPageSize) { return getPaginatedQueryResultAsObservable(feedOptions.getRequestContinuation(), createRequestFunc, executeFunc, resourceType, @@ -33,27 +33,27 @@ public static Flux> getPaginatedChangeFeedQ public static Flux> getPaginatedQueryResultAsObservable( FeedOptions feedOptions, - BiFunction createRequestFunc, + BiFunction> createRequestFunc, Function>> executeFunc, Class resourceType, int maxPageSize) { return getPaginatedQueryResultAsObservable(feedOptions.requestContinuation(), createRequestFunc, executeFunc, resourceType, - -1, maxPageSize); + -1, maxPageSize); } public static Flux> getPaginatedQueryResultAsObservable( - String continuationToken, - BiFunction createRequestFunc, - Function>> executeFunc, Class resourceType, - int top, int maxPageSize) { + String continuationToken, + BiFunction> createRequestFunc, + Function>> executeFunc, Class resourceType, + int top, int maxPageSize) { return getPaginatedQueryResultAsObservable(continuationToken, createRequestFunc, executeFunc, resourceType, - top, maxPageSize, false); + top, maxPageSize, false); } private static Flux> getPaginatedQueryResultAsObservable( - String continuationToken, - BiFunction createRequestFunc, - Function>> executeFunc, Class resourceType, - int top, int maxPageSize, boolean isChangeFeed) { + String continuationToken, + BiFunction> createRequestFunc, + Function>> executeFunc, Class resourceType, + int top, int maxPageSize, boolean isChangeFeed) { return Flux.defer(() -> { Flux>> generate = Flux.generate(() -> diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java index 1009b27a6a51..902459798d4d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java @@ -90,11 +90,11 @@ public static Flux> cre correlatedActivityId); try { - context.initialize(collectionRid, + Mono> monoContext = context.initialize(collectionRid, targetRanges, initialPageSize, - feedOptions.requestContinuation()); - return Flux.just(context); + feedOptions.requestContinuation()).then(Mono.just(context)); + return monoContext.flux(); } catch (CosmosClientException dce) { return Flux.error(dce); } @@ -106,7 +106,7 @@ public static Flux> cre Map rangeQueryMap, FeedOptions feedOptions, String collectionRid, String collectionLink, UUID activityId, Class klass, ResourceType resourceTypeEnum) { - + List ranges = new ArrayList<>(); ranges.addAll(rangeQueryMap.keySet()); @@ -123,14 +123,14 @@ public static Flux> cre false, activityId); - context + Mono> monoContext = context .initializeReadMany(queryClient, collectionResourceId, sqlQuery, rangeQueryMap, feedOptions, - activityId, collectionRid); - return Flux.just(context); + activityId, collectionRid).then(Mono.just(context)); + return monoContext.flux(); } - - private void initialize( + + private Mono initialize( String collectionRid, List targetRanges, int initialPageSize, @@ -183,7 +183,7 @@ private void initialize( } } - super.initialize(collectionRid, + return super.initialize(collectionRid, partitionKeyRangeToContinuationTokenMap, initialPageSize, this.querySpec); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java index a8b463fdb7fb..f0cf75ba4a34 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java @@ -63,40 +63,41 @@ protected ParallelDocumentQueryExecutionContextBase(IDocumentQueryClient client, } } - protected void initialize(String collectionRid, - Map partitionKeyRangeToContinuationTokenMap, int initialPageSize, - SqlQuerySpec querySpecForInit) { - this.pageSize = initialPageSize; - Map commonRequestHeaders = createCommonHeadersAsync(this.getFeedOptions(null, null)); - - for (PartitionKeyRange targetRange : partitionKeyRangeToContinuationTokenMap.keySet()) { - TriFunction createRequestFunc = (partitionKeyRange, - continuationToken, pageSize) -> { - Map headers = new HashMap<>(commonRequestHeaders); - headers.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken); - headers.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(pageSize)); - - PartitionKeyInternal partitionKeyInternal = null; - if (feedOptions.partitionKey() != null && feedOptions.partitionKey() != PartitionKey.NONE) { - partitionKeyInternal = BridgeInternal.getPartitionKeyInternal(feedOptions.partitionKey()); - headers.put(HttpConstants.HttpHeaders.PARTITION_KEY, - partitionKeyInternal.toJson()); - - } - return this.createDocumentServiceRequest(headers, querySpecForInit, partitionKeyInternal, partitionKeyRange, collectionRid); - }; - - Function>> executeFunc = (request) -> { - return this.executeRequestAsync(request); - }; - - DocumentProducer dp = createDocumentProducer(collectionRid, targetRange, + protected Mono initialize(String collectionRid, + Map partitionKeyRangeToContinuationTokenMap, int initialPageSize, + SqlQuerySpec querySpecForInit) { + return createCommonHeadersAsync(this.getFeedOptions(null, null)).doOnSuccess(commonRequestHeaders -> { + this.pageSize = initialPageSize; + + for (PartitionKeyRange targetRange : partitionKeyRangeToContinuationTokenMap.keySet()) { + TriFunction createRequestFunc = (partitionKeyRange, + continuationToken, pageSize) -> { + Map headers = new HashMap<>(commonRequestHeaders); + headers.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken); + headers.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(pageSize)); + + PartitionKeyInternal partitionKeyInternal = null; + if (feedOptions.partitionKey() != null && feedOptions.partitionKey() != PartitionKey.NONE) { + partitionKeyInternal = BridgeInternal.getPartitionKeyInternal(feedOptions.partitionKey()); + headers.put(HttpConstants.HttpHeaders.PARTITION_KEY, + partitionKeyInternal.toJson()); + + } + return this.createDocumentServiceRequest(headers, querySpecForInit, partitionKeyInternal, partitionKeyRange, collectionRid); + }; + + Function>> executeFunc = (request) -> { + return this.executeRequestAsync(request); + }; + + DocumentProducer dp = createDocumentProducer(collectionRid, targetRange, partitionKeyRangeToContinuationTokenMap.get(targetRange), initialPageSize, feedOptions, querySpecForInit, commonRequestHeaders, createRequestFunc, executeFunc, () -> client.getResetSessionTokenRetryPolicy().getRequestPolicy()); - documentProducers.add(dp); + documentProducers.add(dp); } + }).then(); } protected int FindTargetRangeAndExtractContinuationTokens( @@ -150,43 +151,46 @@ public void setTop(int newTop) { } } - protected void initializeReadMany( + protected Mono initializeReadMany( IDocumentQueryClient queryClient, String collectionResourceId, SqlQuerySpec sqlQuerySpec, Map rangeQueryMap, FeedOptions feedOptions, UUID activityId, String collectionRid) { - Map commonRequestHeaders = createCommonHeadersAsync(this.getFeedOptions(null, null)); - - for (PartitionKeyRange targetRange : rangeQueryMap.keySet()) { - TriFunction createRequestFunc = ( - partitionKeyRange, - continuationToken, pageSize) -> { - Map headers = new HashMap<>(commonRequestHeaders); - headers.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken); - headers.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(pageSize)); - - PartitionKeyInternal partitionKeyInternal = null; - return this.createDocumentServiceRequest(headers, - rangeQueryMap.get(targetRange), - partitionKeyInternal, - partitionKeyRange, - collectionRid); - }; - - Function>> executeFunc = (request) -> { - return this.executeRequestAsync(request); - }; - - // TODO: Review pagesize -1 - DocumentProducer dp = createDocumentProducer(collectionRid, targetRange, - null, -1, feedOptions, - rangeQueryMap.get(targetRange), - commonRequestHeaders, createRequestFunc, executeFunc, - () -> client.getResetSessionTokenRetryPolicy() - .getRequestPolicy()); - - documentProducers.add(dp); - } + Mono> commonRequestHeadersMono = createCommonHeadersAsync(this.getFeedOptions(null, null)); + + return commonRequestHeadersMono.doOnSuccess(commonRequestHeaders -> { + for (PartitionKeyRange targetRange : rangeQueryMap.keySet()) { + TriFunction createRequestFunc = ( + partitionKeyRange, + continuationToken, pageSize) -> { + Map headers = new HashMap<>(commonRequestHeaders); + headers.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken); + headers.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(pageSize)); + + PartitionKeyInternal partitionKeyInternal = null; + return this.createDocumentServiceRequest(headers, + rangeQueryMap.get(targetRange), + partitionKeyInternal, + partitionKeyRange, + collectionRid); + }; + + Function>> executeFunc = (request) -> { + return this.executeRequestAsync(request); + }; + + // TODO: Review pagesize -1 + DocumentProducer dp = createDocumentProducer(collectionRid, targetRange, + null, -1, feedOptions, + rangeQueryMap.get(targetRange), + commonRequestHeaders, createRequestFunc, executeFunc, + () -> client.getResetSessionTokenRetryPolicy() + .getRequestPolicy()); + + documentProducers.add(dp); + } + }).then(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java index 0aa980af999d..0315df8a89e4 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java @@ -80,7 +80,7 @@ public void deduceReadMode(ConsistencyLevel accountConsistencyLevel, Consistency ValueHolder consistencyLevel = ValueHolder.initialize(null); ValueHolder useSession = ValueHolder.initialize(null); - ReadMode readMode = consistencyReader.deduceReadMode(request, consistencyLevel, useSession); + ReadMode readMode = consistencyReader.deduceReadMode(request, consistencyLevel, useSession).block(); assertThat(readMode).isEqualTo(expectedReadMode); assertThat(consistencyLevel.v).isEqualTo(expectedConsistencyToUse); @@ -130,8 +130,8 @@ public void replicaSizes(int systemMaxReplicaCount, OperationType.Read, "/dbs/db/colls/col/docs/docId", ResourceType.Document); } - assertThat(consistencyReader.getMaxReplicaSetSize(request)).isEqualTo(isReadingFromMasterOperation? systemMaxReplicaCount : userMaxReplicaCount); - assertThat(consistencyReader.getMinReplicaSetSize(request)).isEqualTo(isReadingFromMasterOperation? systemMinReplicaCount : userMinReplicaCount); + assertThat(consistencyReader.getMaxReplicaSetSize(request).block()).isEqualTo(isReadingFromMasterOperation? systemMaxReplicaCount : userMaxReplicaCount); + assertThat(consistencyReader.getMinReplicaSetSize(request).block()).isEqualTo(isReadingFromMasterOperation? systemMinReplicaCount : userMinReplicaCount); } @Test(groups = "unit") @@ -679,6 +679,7 @@ public void basicReadStrong_AllReplicasSameLSN(int replicaCountToRead, ReadMode StoreReader storeReader = new StoreReader(transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, sessionContainer); GatewayServiceConfigurationReader serviceConfigurator = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.doReturn(Mono.just(ConsistencyLevel.SESSION)).when(serviceConfigurator).getDefaultConsistencyLevel(); IAuthorizationTokenProvider authTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); QuorumReader quorumReader = new QuorumReader(configs, transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, storeReader, serviceConfigurator, authTokenProvider); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java index ce3d1c6537dc..acf6857d3d83 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java @@ -77,6 +77,7 @@ public void exception(Exception ex, Class klass, int expectedStatusCo sessionContainer = Mockito.mock(ISessionContainer.class); IAuthorizationTokenProvider authorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.doReturn(Mono.just(ConsistencyLevel.SESSION)).when(serviceConfigReader).getDefaultConsistencyLevel(); consistencyWriter = new ConsistencyWriter( addressSelectorWrapper.addressSelector, @@ -242,10 +243,10 @@ public Object[][] globalStrongArgProvider() { @Test(groups = "unit", dataProvider = "globalStrongArgProvider") public void isGlobalStrongRequest(ConsistencyLevel defaultConsistencyLevel, RxDocumentServiceRequest req, StoreResponse storeResponse, boolean isGlobalStrongExpected) { initializeConsistencyWriter(false); - Mockito.doReturn(defaultConsistencyLevel).when(this.serviceConfigReader).getDefaultConsistencyLevel(); + Mockito.doReturn(Mono.just(defaultConsistencyLevel)).when(this.serviceConfigReader).getDefaultConsistencyLevel(); - assertThat(consistencyWriter.isGlobalStrongRequest(req, storeResponse)).isEqualTo(isGlobalStrongExpected); + assertThat(consistencyWriter.isGlobalStrongRequest(req, storeResponse).block()).isEqualTo(isGlobalStrongExpected); } private void initializeConsistencyWriter(boolean useMultipleWriteLocation) { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java index 3eee10ca6cfc..9afaff67868c 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java @@ -5,30 +5,27 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; -import com.azure.cosmos.CosmosKeyCredential; +import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.AsyncDocumentClient.Builder; -import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider; -import com.azure.cosmos.implementation.SpyClientUnderTestFactory; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.DatabaseAccountManagerInternal; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.TestSuiteBase; -import com.azure.cosmos.implementation.http.HttpClient; -import com.azure.cosmos.implementation.http.HttpHeaders; -import com.azure.cosmos.implementation.http.HttpRequest; -import com.azure.cosmos.implementation.http.HttpResponse; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufUtil; import io.reactivex.subscribers.TestSubscriber; -import org.apache.commons.io.IOUtils; +import org.mockito.Matchers; import org.mockito.Mockito; import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Factory; import org.testng.annotations.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.lang.reflect.Field; import java.net.URI; import java.util.concurrent.TimeUnit; @@ -37,71 +34,69 @@ public class GatewayServiceConfigurationReaderTest extends TestSuiteBase { private static final int TIMEOUT = 8000; - private HttpClient mockHttpClient; - private BaseAuthorizationTokenProvider baseAuthorizationTokenProvider; - private ConnectionPolicy connectionPolicy; - private GatewayServiceConfigurationReader mockGatewayServiceConfigurationReader; - private GatewayServiceConfigurationReader gatewayServiceConfigurationReader; private AsyncDocumentClient client; - private String databaseAccountJson; - private DatabaseAccount expectedDatabaseAccount; @Factory(dataProvider = "clientBuilders") public GatewayServiceConfigurationReaderTest(Builder clientBuilder) { super(clientBuilder); } - @BeforeClass(groups = "simple") - public void before_GatewayServiceConfigurationReaderTest() throws Exception { - client = clientBuilder().build(); - SpyClientUnderTestFactory.ClientUnderTest clientUnderTest = SpyClientUnderTestFactory.createClientUnderTest(this.clientBuilder()); - HttpClient httpClient = clientUnderTest.getSpyHttpClient(); - baseAuthorizationTokenProvider = new BaseAuthorizationTokenProvider(new CosmosKeyCredential(TestConfigurations.MASTER_KEY)); - connectionPolicy = ConnectionPolicy.getDefaultPolicy(); - mockHttpClient = Mockito.mock(HttpClient.class); - mockGatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), - false, TestConfigurations.MASTER_KEY, connectionPolicy, baseAuthorizationTokenProvider, mockHttpClient); - - gatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), - false, - TestConfigurations.MASTER_KEY, - connectionPolicy, - baseAuthorizationTokenProvider, - httpClient); - databaseAccountJson = IOUtils - .toString(getClass().getClassLoader().getResourceAsStream("databaseAccount.json"), "UTF-8"); - expectedDatabaseAccount = new DatabaseAccount(databaseAccountJson); - HttpResponse mockResponse = getMockResponse(databaseAccountJson); - Mockito.when(mockHttpClient.send(Mockito.any(HttpRequest.class))).thenReturn(Mono.just(mockResponse)); - } - @AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(client); } @Test(groups = "simple") - public void mockInitializeReaderAsync() { - Mono databaseAccount = mockGatewayServiceConfigurationReader.initializeReaderAsync(); - validateSuccess(databaseAccount, expectedDatabaseAccount); - } - - @Test(groups = "simple") - public void mockInitializeReaderAsyncWithResourceToken() throws Exception { - HttpResponse mockResponse = getMockResponse(databaseAccountJson); - Mockito.when(mockHttpClient.send(Mockito.any(HttpRequest.class))).thenReturn(Mono.just(mockResponse)); - - mockGatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), - true, "SampleResourceToken", connectionPolicy, baseAuthorizationTokenProvider, mockHttpClient); - - Mono databaseAccount = mockGatewayServiceConfigurationReader.initializeReaderAsync(); - validateSuccess(databaseAccount, expectedDatabaseAccount); + public void clientInitialization() throws Exception { + client = this.clientBuilder().build(); + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) client; + GatewayServiceConfigurationReader serviceConfigurationReader = ReflectionUtils.getServiceConfigurationReader(rxDocumentClient); + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(serviceConfigurationReader); + Mono databaseAccountMono = globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)); + validateSuccess(databaseAccountMono); + assertThat(serviceConfigurationReader.getDefaultConsistencyLevel().block()).isNotNull(); + assertThat(serviceConfigurationReader.getQueryEngineConfiguration().block()).isNotNull(); + assertThat(serviceConfigurationReader.getSystemReplicationPolicy().block()).isNotNull(); + assertThat(serviceConfigurationReader.getSystemReplicationPolicy().block()).isNotNull(); } @Test(groups = "simple") - public void initializeReaderAsync() { - Mono databaseAccount = gatewayServiceConfigurationReader.initializeReaderAsync(); - validateSuccess(databaseAccount); + public void configurationPropertyReads() throws Exception { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.setUsingMultipleWriteLocations(true); + DatabaseAccountManagerInternal databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson1))); + GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs()); + ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000); + globalEndpointManager.init(); + + GatewayServiceConfigurationReader configurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), globalEndpointManager); + assertThat(configurationReader.getDefaultConsistencyLevel().block()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().block().get("enableSpatialIndexing")).isTrue(); + assertThat(configurationReader.getSystemReplicationPolicy().block().getMaxReplicaSetSize()).isEqualTo(4); + assertThat(configurationReader.getSystemReplicationPolicy().block().getMaxReplicaSetSize()).isEqualTo(4); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson2))); + Thread.sleep(2000); + assertThat(configurationReader.getDefaultConsistencyLevel().block()).isEqualTo(ConsistencyLevel.EVENTUAL); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().block().get("enableSpatialIndexing")).isFalse(); + assertThat(configurationReader.getSystemReplicationPolicy().block().getMaxReplicaSetSize()).isEqualTo(5); + assertThat(configurationReader.getSystemReplicationPolicy().block().getMaxReplicaSetSize()).isEqualTo(5); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson3))); + Thread.sleep(2000); + assertThat(configurationReader.getDefaultConsistencyLevel().block()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().block().get("enableSpatialIndexing")).isTrue(); + assertThat(configurationReader.getSystemReplicationPolicy().block().getMaxReplicaSetSize()).isEqualTo(4); + assertThat(configurationReader.getSystemReplicationPolicy().block().getMaxReplicaSetSize()).isEqualTo(4); + + //Testing scenario of scheduled cache refresh with error + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenThrow(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.FORBIDDEN)); + Thread.sleep(2000); + assertThat(configurationReader.getDefaultConsistencyLevel().block()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().block().get("enableSpatialIndexing")).isTrue(); + assertThat(configurationReader.getSystemReplicationPolicy().block().getMaxReplicaSetSize()).isEqualTo(4); + assertThat(configurationReader.getSystemReplicationPolicy().block().getMaxReplicaSetSize()).isEqualTo(4); } public static void validateSuccess(Mono observable) { @@ -117,38 +112,4 @@ public static void validateSuccess(Mono observable) { assertThat(BridgeInternal.getReplicationPolicy(databaseAccount)).isNotNull(); assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount)).isNotNull(); } - - public static void validateSuccess(Mono observable, DatabaseAccount expectedDatabaseAccount) { - TestSubscriber testSubscriber = new TestSubscriber(); - - observable.subscribe(testSubscriber); - testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS); - testSubscriber.assertNoErrors(); - testSubscriber.assertComplete(); - testSubscriber.assertValueCount(1); - DatabaseAccount databaseAccount = testSubscriber.values().get(0); - assertThat(databaseAccount.getId()).isEqualTo(expectedDatabaseAccount.getId()); - assertThat(BridgeInternal.getAddressesLink(databaseAccount)) - .isEqualTo(BridgeInternal.getAddressesLink(expectedDatabaseAccount)); - assertThat(databaseAccount.getWritableLocations().iterator().next().getEndpoint()) - .isEqualTo(expectedDatabaseAccount.getWritableLocations().iterator().next().getEndpoint()); - assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount).getMaxReplicaSetSize()) - .isEqualTo(BridgeInternal.getSystemReplicationPolicy(expectedDatabaseAccount).getMaxReplicaSetSize()); - assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount).getMaxReplicaSetSize()) - .isEqualTo(BridgeInternal.getSystemReplicationPolicy(expectedDatabaseAccount).getMaxReplicaSetSize()); - assertThat(BridgeInternal.getQueryEngineConfiuration(databaseAccount)) - .isEqualTo(BridgeInternal.getQueryEngineConfiuration(expectedDatabaseAccount)); - } - - private HttpResponse getMockResponse(String databaseAccountJson) { - HttpResponse httpResponse = Mockito.mock(HttpResponse.class); - Mockito.doReturn(200).when(httpResponse).statusCode(); - Mockito.doReturn(Flux.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, databaseAccountJson))) - .when(httpResponse).body(); - Mockito.doReturn(Mono.just(databaseAccountJson)) - .when(httpResponse).bodyAsString(); - - Mockito.doReturn(new HttpHeaders()).when(httpResponse).headers(); - return httpResponse; - } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java index d7859b4a1b40..b2be3cdb5c21 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java @@ -47,9 +47,8 @@ public GatewayServiceConfiguratorReaderMock(ReplicationPolicy userReplicationPol ConsistencyLevel defaultConsistencyLevel) { this.gatewayServiceConfigurationReader = Mockito.mock(GatewayServiceConfigurationReader.class); - Mockito.doReturn(Mono.just(Mockito.mock(DatabaseAccount.class))).when(this.gatewayServiceConfigurationReader).initializeReaderAsync(); - Mockito.doReturn(defaultConsistencyLevel).when(this.gatewayServiceConfigurationReader).getDefaultConsistencyLevel(); - Mockito.doReturn(systemReplicationPolicy).when(this.gatewayServiceConfigurationReader).getSystemReplicationPolicy(); - Mockito.doReturn(userReplicationPolicy).when(this.gatewayServiceConfigurationReader).getUserReplicationPolicy(); + Mockito.doReturn(Mono.just(defaultConsistencyLevel)).when(this.gatewayServiceConfigurationReader).getDefaultConsistencyLevel(); + Mockito.doReturn(Mono.just(systemReplicationPolicy)).when(this.gatewayServiceConfigurationReader).getSystemReplicationPolicy(); + Mockito.doReturn(Mono.just(userReplicationPolicy)).when(this.gatewayServiceConfigurationReader).getUserReplicationPolicy(); } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java index 92ef88cf54b8..7fab9e5fd1d0 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java @@ -32,7 +32,7 @@ public class GlobalEndPointManagerTest { protected static final int TIMEOUT = 6000000; DatabaseAccountManagerInternal databaseAccountManagerInternal; - private String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + static String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"}]," + "\"readableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false," + @@ -44,18 +44,18 @@ public class GlobalEndPointManagerTest { "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n"; - private String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + static String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure" + ".com:443/\"}],\"readableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}]," + - "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + - "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":5}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Eventual\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":5}," + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + - "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":false}\"}"; - private String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + static String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure" + ".com:443/\"}],\"readableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure.com:443/\"}]," + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + @@ -66,7 +66,7 @@ public class GlobalEndPointManagerTest { "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; - private String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + static String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"readableLocations\":[{\"name\":\"East US\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents" + diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/QuorumReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/QuorumReaderTest.java index e39006feb13f..cb400adea620 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/QuorumReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/QuorumReaderTest.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.directconnectivity; +import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.DocumentServiceRequestContext; import com.azure.cosmos.implementation.DocumentServiceRequestContextValidator; @@ -107,6 +108,7 @@ public void basicReadStrong_AllReplicasSameLSN(int replicaCountToRead, ReadMode StoreReader storeReader = new StoreReader(transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, sessionContainer); GatewayServiceConfigurationReader serviceConfigurator = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.when(serviceConfigurator.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.SESSION)); IAuthorizationTokenProvider authTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); QuorumReader quorumReader = new QuorumReader(configs, transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, storeReader, serviceConfigurator, authTokenProvider); @@ -225,6 +227,7 @@ public void readStrong_OnlySecondary_RequestBarrier_Success(int numberOfBarrierR StoreReader storeReader = new StoreReader(transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, sessionContainer); GatewayServiceConfigurationReader serviceConfigurator = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.when(serviceConfigurator.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.SESSION)); IAuthorizationTokenProvider authTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); QuorumReader quorumReader = new QuorumReader(configs, transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, storeReader, serviceConfigurator, authTokenProvider); @@ -374,6 +377,7 @@ public void readStrong_SecondaryReadBarrierExhausted_ReadBarrierOnPrimary_Succes StoreReader storeReader = new StoreReader(transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, sessionContainer); GatewayServiceConfigurationReader serviceConfigurator = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.when(serviceConfigurator.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.SESSION)); IAuthorizationTokenProvider authTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); QuorumReader quorumReader = new QuorumReader(configs, transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, storeReader, serviceConfigurator, authTokenProvider); @@ -499,6 +503,7 @@ public void readStrong_QuorumNotSelected_ReadPrimary() { StoreReader storeReader = new StoreReader(transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, sessionContainer); GatewayServiceConfigurationReader serviceConfigurator = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.when(serviceConfigurator.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.SESSION)); IAuthorizationTokenProvider authTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); QuorumReader quorumReader = new QuorumReader(configs, transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, storeReader, serviceConfigurator, authTokenProvider); @@ -593,6 +598,7 @@ public void readPrimary(EndpointMock endpointMock, ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class); GatewayServiceConfigurationReader serviceConfigurator = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.when(serviceConfigurator.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.SESSION)); IAuthorizationTokenProvider authTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); QuorumReader quorumReader = new QuorumReader(configs, endpointMock.transportClientWrapper.transportClient, diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index cca205185add..52a22002a54e 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -5,10 +5,13 @@ import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.http.HttpClient; import org.apache.commons.lang3.reflect.FieldUtils; +import java.lang.reflect.Field; + /** * * TransportClient transportClient = ReflectionUtils.getDirectHttpsHttpClient(documentClient); @@ -61,12 +64,24 @@ public static void setDirectHttpsHttpClient(RxDocumentClientImpl client, HttpCli assert transportClient instanceof HttpTransportClient; set(transportClient, newHttpClient, "httpClient"); } - + public static AsyncDocumentClient getAsyncDocumentClient(CosmosAsyncClient client) { return get(AsyncDocumentClient.class, client, "asyncDocumentClient"); } - + public static void setAsyncDocumentClient(CosmosAsyncClient client, RxDocumentClientImpl rxClient) { set(client, rxClient, "asyncDocumentClient"); } + + public static GatewayServiceConfigurationReader getServiceConfigurationReader(RxDocumentClientImpl rxDocumentClient){ + return get(GatewayServiceConfigurationReader.class, rxDocumentClient, "gatewayConfigurationReader"); + } + + public static GlobalEndpointManager getGlobalEndpointManager(GatewayServiceConfigurationReader serviceConfigurationReader){ + return get(GlobalEndpointManager.class, serviceConfigurationReader, "globalEndpointManager"); + } + + public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ + set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientTest.java index 31cff004ac3c..f0daa00be341 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReplicatedResourceClientTest.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.directconnectivity; +import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosClientException; import com.azure.cosmos.GoneException; import com.azure.cosmos.implementation.Configs; @@ -37,6 +38,7 @@ public void before_ReplicatedResourceClientTest() throws Exception { transportClient = Mockito.mock(TransportClient.class); serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); authorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); + Mockito.when(serviceConfigReader.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.SESSION)); } /** diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderDotNetTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderDotNetTest.java index 13eeffc09570..814326a96f9e 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderDotNetTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderDotNetTest.java @@ -622,7 +622,7 @@ public void storeClient() throws URISyntaxException { // setup max replica set size on the config reader ReplicationPolicy replicationPolicy = new ReplicationPolicy(); GatewayServiceConfigurationReader mockServiceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); - Mockito.when(mockServiceConfigReader.getUserReplicationPolicy()).thenReturn(replicationPolicy); + Mockito.when(mockServiceConfigReader.getUserReplicationPolicy()).thenReturn(Mono.just(replicationPolicy)); try { StoreClient storeClient = new StoreClient(new Configs(),mockAddressCache, sessionContainer, mockServiceConfigReader, mockAuthorizationTokenProvider, mockTransportClient, false); @@ -687,6 +687,7 @@ public void globalStrongConsistentWrite() { // create a real session container - we don't need session for this test anyway SessionContainer sessionContainer = new SessionContainer(StringUtils.EMPTY); GatewayServiceConfigurationReader serviceConfigurationReader = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.when(serviceConfigurationReader.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.SESSION)); IAuthorizationTokenProvider mockAuthorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); Mockito.when(mockAuthorizationTokenProvider.getUserAuthorizationToken(Matchers.anyString(), Matchers.any(), Matchers.any(RequestVerb.class), Matchers.anyMap(), @@ -788,8 +789,8 @@ public void globalStrongConsistency() { // setup max replica set size on the config reader ReplicationPolicy replicationPolicy = new ReplicationPolicy(); GatewayServiceConfigurationReader mockServiceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); - Mockito.when(mockServiceConfigReader.getUserReplicationPolicy()).thenReturn(replicationPolicy); - + Mockito.when(mockServiceConfigReader.getUserReplicationPolicy()).thenReturn(Mono.just(replicationPolicy)); + Mockito.when(mockServiceConfigReader.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.SESSION)); QuorumReader reader = new QuorumReader(new Configs(),mockTransportClient, addressSelector, storeReader, mockServiceConfigReader, mockAuthorizationTokenProvider); entity.requestContext.originalRequestConsistencyLevel = ConsistencyLevel.STRONG; @@ -823,8 +824,8 @@ public void globalStrongConsistency() { BridgeInternal.setMaxReplicaSetSize(replicationPolicy,4); GatewayServiceConfigurationReader mockServiceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); - Mockito.when(mockServiceConfigReader.getUserReplicationPolicy()).thenReturn(replicationPolicy); - Mockito.when(mockServiceConfigReader.getDefaultConsistencyLevel()).thenReturn(ConsistencyLevel.STRONG); + Mockito.when(mockServiceConfigReader.getUserReplicationPolicy()).thenReturn(Mono.just(replicationPolicy)); + Mockito.when(mockServiceConfigReader.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.STRONG)); QuorumReader reader = new QuorumReader(new Configs(), mockTransportClient, addressSelector, storeReader, mockServiceConfigReader, mockAuthorizationTokenProvider); entity.requestContext.originalRequestConsistencyLevel = ConsistencyLevel.STRONG; @@ -865,8 +866,8 @@ public void globalStrongConsistency() { BridgeInternal.setMaxReplicaSetSize(replicationPolicy,4); GatewayServiceConfigurationReader mockServiceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); - Mockito.when(mockServiceConfigReader.getUserReplicationPolicy()).thenReturn(replicationPolicy); - Mockito.when(mockServiceConfigReader.getDefaultConsistencyLevel()).thenReturn(ConsistencyLevel.STRONG); + Mockito.when(mockServiceConfigReader.getUserReplicationPolicy()).thenReturn(Mono.just(replicationPolicy)); + Mockito.when(mockServiceConfigReader.getDefaultConsistencyLevel()).thenReturn(Mono.just(ConsistencyLevel.STRONG)); QuorumReader reader = new QuorumReader(new Configs(), mockTransportClient, addressSelector, storeReader, mockServiceConfigReader, mockAuthorizationTokenProvider); entity.requestContext.originalRequestConsistencyLevel = ConsistencyLevel.STRONG; diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/FetcherTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/FetcherTest.java index 83677cfb296e..f911ee837b1f 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/FetcherTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/query/FetcherTest.java @@ -74,14 +74,14 @@ public void query(FeedOptions options, int top) { AtomicInteger requestIndex = new AtomicInteger(0); - BiFunction createRequestFunc = (token, maxItemCount) -> { + BiFunction> createRequestFunc = (token, maxItemCount) -> { assertThat(maxItemCount).describedAs("max item count").isEqualTo( getExpectedMaxItemCountInRequest(options, top, feedResponseList, requestIndex.get())); assertThat(token).describedAs("continuation token").isEqualTo( getExpectedContinuationTokenInRequest(options.requestContinuation(), feedResponseList, requestIndex.get())); requestIndex.getAndIncrement(); - return mock(RxDocumentServiceRequest.class); + return Mono.just(mock(RxDocumentServiceRequest.class)); }; AtomicInteger executeIndex = new AtomicInteger(0); @@ -142,12 +142,12 @@ public void changeFeed() { AtomicInteger requestIndex = new AtomicInteger(0); - BiFunction createRequestFunc = (token, maxItemCount) -> { + BiFunction> createRequestFunc = (token, maxItemCount) -> { assertThat(maxItemCount).describedAs("max getItem count").isEqualTo(options.getMaxItemCount()); assertThat(token).describedAs("continuation token").isEqualTo( getExpectedContinuationTokenInRequest(options.getRequestContinuation(), feedResponseList, requestIndex.getAndIncrement())); - return mock(RxDocumentServiceRequest.class); + return Mono.just(mock(RxDocumentServiceRequest.class)); }; AtomicInteger executeIndex = new AtomicInteger(0); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java index 9911b3cde18a..8aedb0fdd570 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java @@ -507,7 +507,9 @@ private Permission getPermission(Resource resource, String permissionId, Permiss private TokenResolver getTokenResolver(PermissionMode permissionMode) { return (RequestVerb requestVerb, String resourceIdOrFullName, CosmosResourceType resourceType, Map properties) -> { - if (permissionMode == null) { + if(resourceType.equals(CosmosResourceType.System)) { + return readPermission.getToken(); + } if (permissionMode == null) { return "invalid"; } else if (permissionMode.equals(PermissionMode.READ)) { return readPermission.getToken();