Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public ChangeFeedQueryImpl(RxDocumentClientImpl client,
this.options = getChangeFeedOptions(changeFeedOptions, initialNextIfNoneMatch);
}

private RxDocumentServiceRequest createDocumentServiceRequest(String continuationToken, int pageSize) {
private Mono<RxDocumentServiceRequest> createDocumentServiceRequest(String continuationToken, int pageSize) {
Map<String, String> headers = new HashMap<>();
RxDocumentServiceRequest req = RxDocumentServiceRequest.create(
OperationType.ReadFeed,
Expand Down Expand Up @@ -107,7 +107,7 @@ private RxDocumentServiceRequest createDocumentServiceRequest(String continuatio
req.routeTo(new PartitionKeyRangeIdentity(partitionKeyRangeIdInternal(this.options)));
}

return req;
return Mono.just(req);
}

private ChangeFeedOptions getChangeFeedOptions(ChangeFeedOptions options, String continuationToken) {
Expand All @@ -118,7 +118,7 @@ private ChangeFeedOptions getChangeFeedOptions(ChangeFeedOptions options, String

public Flux<FeedResponse<T>> executeAsync() {

BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc = this::createDocumentServiceRequest;
BiFunction<String, Integer, Mono<RxDocumentServiceRequest>> createRequestFunc = this::createDocumentServiceRequest;

Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc = this::executeRequestAsync;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,28 @@

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.LocationHelper;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionPolicy;
import com.azure.cosmos.DatabaseAccount;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.LocationHelper;
import org.apache.commons.collections4.list.UnmodifiableList;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URISyntaxException;
import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -45,6 +47,8 @@ public class GlobalEndpointManager implements AutoCloseable {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Scheduler scheduler = Schedulers.fromExecutor(executor);
private volatile boolean isClosed;
private final AsyncCache<String, DatabaseAccount> databaseAccountAsyncCache;
private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true);

public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) {
this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000;
Expand All @@ -69,6 +73,7 @@ public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPol
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
databaseAccountAsyncCache = new AsyncCache<>();
}

public void init() {
Expand All @@ -87,7 +92,7 @@ public UnmodifiableList<URI> getWriteEndpoints() {
return this.locationCache.getWriteEndpoints();
}

public static Mono<DatabaseAccount> getDatabaseAccountFromAnyLocationsAsync(
public Mono<DatabaseAccount> getDatabaseAccountFromAnyLocationsAsync(
URI defaultEndpoint, List<String> locations, Function<URI, Mono<DatabaseAccount>> getDatabaseAccountFn) {

return getDatabaseAccountFn.apply(defaultEndpoint).onErrorResume(
Expand Down Expand Up @@ -159,6 +164,12 @@ public Mono<Void> refreshLocationAsync(DatabaseAccount databaseAccount, boolean
});
}

public Mono<DatabaseAccount> getDatabaseAccountFromCache(URI defaultEndpoint) {
return this.databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, () -> this.owner.getDatabaseAccountFromEndpoint(defaultEndpoint).single().doOnSuccess(databaseAccount -> {
this.refreshLocationAsync(databaseAccount, false);
}));
}

private Mono<Void> refreshLocationPrivateAsync(DatabaseAccount databaseAccount) {
return Mono.defer(() -> {
logger.debug("refreshLocationPrivateAsync() refreshing locations");
Expand Down Expand Up @@ -236,7 +247,7 @@ private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {
}

logger.debug("startRefreshLocationTimerAsync() - Invoking refresh, I was registered on [{}]", now);
Mono<DatabaseAccount> databaseAccountObs = GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.defaultEndpoint, new ArrayList<>(this.connectionPolicy.getPreferredLocations()),
Mono<DatabaseAccount> databaseAccountObs = this.getDatabaseAccountFromAnyLocationsAsync(this.defaultEndpoint, new ArrayList<>(this.connectionPolicy.getPreferredLocations()),
this::getDatabaseAccountAsync);

return databaseAccountObs.flatMap(dbAccount -> {
Expand All @@ -253,8 +264,23 @@ private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {
}

private Mono<DatabaseAccount> getDatabaseAccountAsync(URI serviceEndpoint) {
return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint)
.doOnNext(i -> logger.debug("account retrieved: {}", i)).single();
final GlobalEndpointManager that = this;
Callable<Mono<DatabaseAccount>> fetchDatabaseAccount = () -> {
return that.owner.getDatabaseAccountFromEndpoint(serviceEndpoint).doOnNext(i -> {
logger.debug("account retrieved: {}", i);
}).single();
};

Mono<DatabaseAccount> obsoleteValueMono = databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, fetchDatabaseAccount);
return obsoleteValueMono.flatMap(obsoleteValue -> {
if (firstTimeDatabaseAccountInitialization.compareAndSet(true, false)) {
return Mono.just(obsoleteValue);
}
return databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, obsoleteValue, fetchDatabaseAccount).doOnError(t -> {
//Putting back the old value in cache, this will avoid cache corruption
databaseAccountAsyncCache.set(StringUtils.EMPTY, obsoleteValue);
});
});
}

public boolean isClosed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,29 +245,12 @@ private RxDocumentClientImpl(URI serviceEndpoint,
}

private void initializeGatewayConfigurationReader() {
String resourceToken;
if(this.tokenResolver != null) {
resourceToken = this.tokenResolver.getAuthorizationToken(RequestVerb.GET, "", CosmosResourceType.System, null);
} else if(!this.hasAuthKeyResourceToken && this.authorizationTokenProvider == null) {
resourceToken = this.firstResourceTokenFromPermissionFeed;
} else {
assert this.masterKeyOrResourceToken != null || this.cosmosKeyCredential != null;
resourceToken = this.masterKeyOrResourceToken;
}

this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint,
this.hasAuthKeyResourceToken,
resourceToken,
this.connectionPolicy,
this.authorizationTokenProvider,
this.reactorHttpClient);

DatabaseAccount databaseAccount = this.gatewayConfigurationReader.initializeReaderAsync().block();
this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, this.globalEndpointManager);
DatabaseAccount databaseAccount = this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).block();
this.useMultipleWriteLocations = this.connectionPolicy.getUsingMultipleWriteLocations() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount);

// TODO: add support for openAsync
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589
this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block();
}

public void init() {
Expand Down Expand Up @@ -1598,7 +1581,7 @@ public IRetryPolicyFactory getResetSessionTokenRetryPolicy() {
}

@Override
public ConsistencyLevel getDefaultConsistencyLevelAsync() {
public Mono<ConsistencyLevel> getDefaultConsistencyLevelAsync() {
return RxDocumentClientImpl.this.gatewayConfigurationReader.getDefaultConsistencyLevel();
}

Expand Down Expand Up @@ -2840,15 +2823,15 @@ private <T extends Resource> Flux<FeedResponse<T>> readFeedCollectionChild(FeedO
final FeedOptions finalFeedOptions = options;
RequestOptions requestOptions = new RequestOptions();
requestOptions.setPartitionKey(options.partitionKey());
BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc = (continuationToken, pageSize) -> {
BiFunction<String, Integer, Mono<RxDocumentServiceRequest>> createRequestFunc = (continuationToken, pageSize) -> {
Map<String, String> requestHeaders = new HashMap<>();
if (continuationToken != null) {
requestHeaders.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken);
}
requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Integer.toString(pageSize));
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.ReadFeed,
resourceType, resourceLink, requestHeaders, finalFeedOptions);
return request;
return Mono.just(request);
};

Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc = request -> {
Expand All @@ -2871,15 +2854,15 @@ private <T extends Resource> Flux<FeedResponse<T>> readFeed(FeedOptions options,

int maxPageSize = options.maxItemCount() != null ? options.maxItemCount() : -1;
final FeedOptions finalFeedOptions = options;
BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc = (continuationToken, pageSize) -> {
BiFunction<String, Integer, Mono<RxDocumentServiceRequest>> createRequestFunc = (continuationToken, pageSize) -> {
Map<String, String> requestHeaders = new HashMap<>();
if (continuationToken != null) {
requestHeaders.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken);
}
requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Integer.toString(pageSize));
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.ReadFeed,
resourceType, resourceLink, requestHeaders, finalFeedOptions);
return request;
return Mono.just(request);
};

Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc = request -> {
Expand Down
Loading