Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public static class HttpHeaders {

// Address related headers.
public static final String FORCE_REFRESH = "x-ms-force-refresh";
public static final String FORCE_COLLECTION_ROUTING_MAP_REFRESH = "x-ms-collectionroutingmap-refresh";
public static final String ITEM_COUNT = "x-ms-item-count";
public static final String NEW_RESOURCE_ID = "x-ms-new-resource-id";
public static final String USE_MASTER_COLLECTION_RESOLVER = "x-ms-use-master-collection-resolver";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque

if (suboptimalServerPartitionTimestamp != null) {
logger.debug("suboptimalServerPartitionTimestamp is {}", suboptimalServerPartitionTimestamp);

boolean forceRefreshDueToSuboptimalPartitionReplicaSet = Duration.between(suboptimalServerPartitionTimestamp, Instant.now()).getSeconds()
> this.suboptimalPartitionForceRefreshIntervalInSeconds;

Expand Down Expand Up @@ -235,7 +235,7 @@ public Mono<AddressInformation[]> tryGetAddresses(RxDocumentServiceRequest reque
}
return Mono.error(unwrappedException);
}

});
}

Expand All @@ -258,6 +258,11 @@ public Mono<List<Address>> getServerAddressesViaGatewayAsync(
headers.put(HttpConstants.HttpHeaders.FORCE_REFRESH, Boolean.TRUE.toString());
}

if(request.forceCollectionRoutingMapRefresh)
{
headers.put(HttpConstants.HttpHeaders.FORCE_COLLECTION_ROUTING_MAP_REFRESH, Boolean.TRUE.toString());
}

addressQuery.put(HttpConstants.QueryStrings.FILTER, HttpUtils.urlEncode(this.protocolFilter));

addressQuery.put(HttpConstants.QueryStrings.PARTITION_KEY_RANGE_IDS, String.join(",", partitionKeyRangeIds));
Expand Down Expand Up @@ -452,6 +457,11 @@ public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
headers.put(HttpConstants.HttpHeaders.USE_MASTER_COLLECTION_RESOLVER, Boolean.TRUE.toString());
}

if(request.forceCollectionRoutingMapRefresh)
{
headers.put(HttpConstants.HttpHeaders.FORCE_COLLECTION_ROUTING_MAP_REFRESH, Boolean.TRUE.toString());
}

queryParameters.put(HttpConstants.QueryStrings.FILTER, HttpUtils.urlEncode(this.protocolFilter));
headers.put(HttpConstants.HttpHeaders.X_DATE, Utils.nowAsRFC1123());
String token = this.tokenProvider.getUserAuthorizationToken(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,24 @@ public void getServerAddressesViaGateway(List<String> partitionKeyRangeIds,
authorizationTokenProvider,
null,
getHttpClient(configs));

RxDocumentServiceRequest req =
for (int i = 0; i < 2; i++) {
RxDocumentServiceRequest req =
RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document,
collectionLink + "/docs/",
getDocumentDefinition(), new HashMap<>());

Mono<List<Address>> addresses = cache.getServerAddressesViaGatewayAsync(
collectionLink + "/docs/",
getDocumentDefinition(), new HashMap<>());
if (i == 1) {
req.forceCollectionRoutingMapRefresh = true; //testing address api with x-ms-collectionroutingmap-refresh true
}
Mono<List<Address>> addresses = cache.getServerAddressesViaGatewayAsync(
req, createdCollection.getResourceId(), partitionKeyRangeIds, false);

PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder()
PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder()
.withProtocol(protocol)
.replicasOfPartitions(partitionKeyRangeIds)
.build();

validateSuccess(addresses, validator, TIMEOUT);
validateSuccess(addresses, validator, TIMEOUT);
}
}

@Test(groups = { "direct" }, dataProvider = "protocolProvider", timeOut = TIMEOUT)
Expand All @@ -129,21 +132,24 @@ public void getMasterAddressesViaGatewayAsync(Protocol protocol) throws Exceptio
authorizationTokenProvider,
null,
getHttpClient(configs));

RxDocumentServiceRequest req =
for (int i = 0; i < 2; i++) {
RxDocumentServiceRequest req =
RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Database,
"/dbs",
new Database(), new HashMap<>());

Mono<List<Address>> addresses = cache.getMasterAddressesViaGatewayAsync(req, ResourceType.Database,
"/dbs",
new Database(), new HashMap<>());
if (i == 1) {
req.forceCollectionRoutingMapRefresh = true; //testing address api with x-ms-collectionroutingmap-refresh true
}
Mono<List<Address>> addresses = cache.getMasterAddressesViaGatewayAsync(req, ResourceType.Database,
null, "/dbs/", false, false, null);

PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder()
PartitionReplicasAddressesValidator validator = new PartitionReplicasAddressesValidator.Builder()
.withProtocol(protocol)
.replicasOfSamePartition()
.build();

validateSuccess(addresses, validator, TIMEOUT);
validateSuccess(addresses, validator, TIMEOUT);
}
}

@DataProvider(name = "targetPartitionsKeyRangeAndCollectionLinkParams")
Expand Down Expand Up @@ -802,7 +808,7 @@ public static void validateSuccess(Mono<List<Address>> observable,
testSubscriber.assertValueCount(1);
validator.validate(testSubscriber.values().get(0));
}

@BeforeClass(groups = { "direct" }, timeOut = SETUP_TIMEOUT)
public void beforeClass() {
client = clientBuilder().build();
Expand Down