Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,14 @@ private ImmutablePair<Integer, FormattedFilterInfo> getFiltersForPartitions(
List<PartitionKeyRange> partitionKeyRanges,
List<SortOrder> sortOrders,
Collection<String> orderByExpressions) {

ValueHolder<Map<String, OrderByContinuationToken>> valueHolder = new ValueHolder<>();
valueHolder.v = this.targetRangeToOrderByContinuationTokenMap;
// Find the partition key range we left off on
int startIndex = this.findTargetRangeAndExtractContinuationTokens(partitionKeyRanges,
orderByContinuationToken.getCompositeContinuationToken().getRange());
orderByContinuationToken.getCompositeContinuationToken().getRange(),
valueHolder,
orderByContinuationToken);

// Get the filters.
FormattedFilterInfo formattedFilterInfo = this.getFormattedFilters(orderByExpressions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
*/
public class ParallelDocumentQueryExecutionContext<T extends Resource>
extends ParallelDocumentQueryExecutionContextBase<T> {
private CosmosQueryRequestOptions cosmosQueryRequestOptions;
private final CosmosQueryRequestOptions cosmosQueryRequestOptions;
private final Map<PartitionKeyRange, String> partitionKeyRangeToContinuationTokenMap;

private ParallelDocumentQueryExecutionContext(
DiagnosticsClientContext diagnosticsClientContext,
Expand All @@ -61,6 +62,7 @@ private ParallelDocumentQueryExecutionContext(
super(diagnosticsClientContext, client, partitionKeyRanges, resourceTypeEnum, resourceType, query, cosmosQueryRequestOptions, resourceLink,
rewrittenQuery, isContinuationExpected, getLazyFeedResponse, correlatedActivityId);
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
partitionKeyRangeToContinuationTokenMap = new HashMap<>();
}

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
Expand Down Expand Up @@ -133,7 +135,6 @@ private void initialize(
int initialPageSize,
String continuationToken) {
// Generate the corresponding continuation token map.
Map<PartitionKeyRange, String> partitionKeyRangeToContinuationTokenMap = new HashMap<PartitionKeyRange, String>();
if (continuationToken == null) {
// If the user does not give a continuation token,
// then just start the query from the first partition.
Expand All @@ -152,7 +153,7 @@ private void initialize(
// then we know that partitions 0, 1, 2 are fully drained.

// Check to see if composite continuation token is a valid JSON.
ValueHolder<CompositeContinuationToken> outCompositeContinuationToken = new ValueHolder<CompositeContinuationToken>();
ValueHolder<CompositeContinuationToken> outCompositeContinuationToken = new ValueHolder<>();
if (!CompositeContinuationToken.tryParse(continuationToken,
outCompositeContinuationToken)) {
String message = String.format("INVALID JSON in continuation token %s for Parallel~Context",
Expand All @@ -163,20 +164,17 @@ private void initialize(

CompositeContinuationToken compositeContinuationToken = outCompositeContinuationToken.v;

// Get the right hand side of the query ranges:
// Get the right hand side of the query ranges and set continuation token for relevant ranges in the
// partitionKeyRangeToContinuationTokenMap
List<PartitionKeyRange> filteredPartitionKeyRanges = this.getPartitionKeyRangesForContinuation(
compositeContinuationToken,
targetRanges);

// The first partition is the one we left off on and have a backend continuation
// token for.
partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(0),
compositeContinuationToken.getToken());

// The remaining partitions we have yet to touch / have null continuation tokens
for (int i = 1; i < filteredPartitionKeyRanges.size(); i++) {
partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(i),
null);
if (!partitionKeyRangeToContinuationTokenMap.containsKey(filteredPartitionKeyRanges.get(i))) {
partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(i), null);
}
}
}

Expand All @@ -187,14 +185,21 @@ private void initialize(
}

private List<PartitionKeyRange> getPartitionKeyRangesForContinuation(
CompositeContinuationToken compositeContinuationToken,
List<PartitionKeyRange> partitionKeyRanges) {
// Find the partition key range we left off on
CompositeContinuationToken compositeContinuationToken,
List<PartitionKeyRange> partitionKeyRanges) {
Map<String, String> partitionRangeIdToTokenMap = new HashMap<>();
ValueHolder<Map<String, String>> outPartitionRangeIdToTokenMap = new ValueHolder<>(partitionRangeIdToTokenMap);
// Find the partition key range we left off on and fill the range to continuation token map
int startIndex = this.findTargetRangeAndExtractContinuationTokens(partitionKeyRanges,
compositeContinuationToken.getRange());

compositeContinuationToken.getRange(),
outPartitionRangeIdToTokenMap,
compositeContinuationToken.getToken());
List<PartitionKeyRange> rightHandSideRanges = new ArrayList<PartitionKeyRange>();
for (int i = startIndex; i < partitionKeyRanges.size(); i++) {
PartitionKeyRange range = partitionKeyRanges.get(i);
if (partitionRangeIdToTokenMap.containsKey(range.getId())) {
this.partitionKeyRangeToContinuationTokenMap.put(range, compositeContinuationToken.getToken());
}
rightHandSideRanges.add(partitionKeyRanges.get(i));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
Expand All @@ -23,12 +24,14 @@
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* While this class is public, but it is not part of our published public APIs.
Expand Down Expand Up @@ -103,7 +106,9 @@ protected void initialize(String collectionRid,
}

protected <TContinuationToken> int findTargetRangeAndExtractContinuationTokens(
List<PartitionKeyRange> partitionKeyRanges, Range<String> range) {
List<PartitionKeyRange> partitionKeyRanges, Range<String> range,
Utils.ValueHolder<Map<String, TContinuationToken>> outPartitionRangeToContinuation,
TContinuationToken continuation) {
if (partitionKeyRanges == null) {
throw new IllegalArgumentException("partitionKeyRanges can not be null.");
}
Expand Down Expand Up @@ -132,12 +137,29 @@ protected <TContinuationToken> int findTargetRangeAndExtractContinuationTokens(
String.format("Could not find partition key range for continuation token: {0}", needle));
}

List<PartitionKeyRange> replacementRanges;

// find what ranges make up the supplied continuation token
replacementRanges = partitionKeyRanges.stream()
.filter(p -> range.getMin().compareTo(p.getMinInclusive()) <= 0 &&
range.getMax().compareTo(p.getMaxExclusive()) >= 0)
.sorted(Comparator.comparing(PartitionKeyRange::getId))
.collect(Collectors.toList());

if (replacementRanges.isEmpty()) {
throw BridgeInternal.createCosmosException(HttpConstants.StatusCodes.BADREQUEST,
String.format("Cannot find ranges for continuation {}", continuation));
}

replacementRanges.forEach(r -> outPartitionRangeToContinuation.v.put(r.getId(), continuation));

return minIndex;
}

abstract protected DocumentProducer<T> createDocumentProducer(String collectionRid, PartitionKeyRange targetRange,
String initialContinuationToken
, int initialPageSize, CosmosQueryRequestOptions cosmosQueryRequestOptions, SqlQuerySpec querySpecForInit,
String initialContinuationToken, int initialPageSize,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
SqlQuerySpec querySpecForInit,
Map<String, String> commonRequestHeaders,
TriFunction<PartitionKeyRange, String, Integer, RxDocumentServiceRequest> createRequestFunc,
Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.
package com.azure.cosmos.rx;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
Expand All @@ -11,8 +12,10 @@
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseProperties;
import com.azure.cosmos.models.CosmosPermissionProperties;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
Expand All @@ -26,16 +29,21 @@
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import com.azure.cosmos.models.TriggerOperation;
import com.azure.cosmos.models.TriggerType;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.fasterxml.jackson.databind.JsonNode;
import io.reactivex.subscribers.TestSubscriber;
import org.jetbrains.annotations.NotNull;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -317,6 +325,129 @@ public void queryPlanCacheSinglePartitionParameterizedQueriesCorrectness() {

}

@Test(groups = {"simple"}, timeOut = TIMEOUT * 10)
public void splitQueryContinuationToken() throws Exception {
    // Verifies that a continuation token taken BEFORE a partition split can still be used
    // to resume both a cross-partition query and an ORDER BY query AFTER the split,
    // and that no documents are lost or duplicated.
    String containerId = "splittestcontainer_" + UUID.randomUUID();
    int itemCount = 20;

    // Create container
    CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/mypk");
    createdDatabase.createContainer(containerProperties).block();
    CosmosAsyncContainer container = createdDatabase.getContainer(containerId);
    AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(this.client);

    // Insert some documents
    List<TestObject> testObjects = insertDocuments(itemCount, Arrays.asList("CA", "US"), container);

    List<String> sortedObjects = testObjects.stream()
        .sorted(Comparator.comparing(TestObject::getProp))
        .map(TestObject::getId)
        .collect(Collectors.toList());

    String query = "Select * from c";
    String orderByQuery = "select * from c order by c.prop";

    List<PartitionKeyRange> partitionKeyRanges = getPartitionKeyRanges(containerId, asyncDocumentClient);
    String requestContinuation = null;
    String orderByRequestContinuation = null;
    int preferredPageSize = 15;
    ArrayList<TestObject> resultList = new ArrayList<>();
    ArrayList<TestObject> orderByResultList = new ArrayList<>();

    // Query: drain only the first page so a continuation token remains outstanding.
    FeedResponse<TestObject> jsonNodeFeedResponse = container
        .queryItems(query, new CosmosQueryRequestOptions(), TestObject.class)
        .byPage(preferredPageSize).blockFirst();
    assert jsonNodeFeedResponse != null;
    resultList.addAll(jsonNodeFeedResponse.getResults());
    requestContinuation = jsonNodeFeedResponse.getContinuationToken();

    // Orderby query: same — capture the token after the first page only.
    FeedResponse<TestObject> orderByFeedResponse = container
        .queryItems(orderByQuery, new CosmosQueryRequestOptions(),
            TestObject.class)
        .byPage(preferredPageSize).blockFirst();
    assert orderByFeedResponse != null;
    orderByResultList.addAll(orderByFeedResponse.getResults());
    orderByRequestContinuation = orderByFeedResponse.getContinuationToken();

    // Scale up the throughput to force the service to split the partition.
    logger.info("Scaling up throughput for split");
    ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(16000);
    ThroughputResponse throughputResponse = container.replaceThroughput(throughputProperties).block();
    logger.info("Throughput replace request submitted for {} ",
        throughputResponse.getProperties().getManualThroughput());
    throughputResponse = container.readThroughput().block();

    // Wait for the throughput update to complete so that we get the partition split
    while (true) {
        assert throughputResponse != null;
        if (!throughputResponse.isReplacePending()) {
            break;
        }
        logger.info("Waiting for split to complete");
        Thread.sleep(10 * 1000);
        throughputResponse = container.readThroughput().block();
    }

    logger.info("Resuming query from the continuation");
    // Read number of partitions. Should be greater than one.
    // NOTE: .as() must be called BEFORE the assertion method, otherwise AssertJ
    // silently drops the description.
    List<PartitionKeyRange> partitionKeyRangesAfterSplit = getPartitionKeyRanges(containerId, asyncDocumentClient);
    assertThat(partitionKeyRangesAfterSplit.size())
        .as("Partition ranges should increase after split")
        .isGreaterThan(partitionKeyRanges.size());
    logger.info("After split num partitions = {}", partitionKeyRangesAfterSplit.size());

    // Reading item to refresh cache
    container.readItem(testObjects.get(0).getId(), new PartitionKey(testObjects.get(0).getMypk()),
        JsonNode.class).block();

    // Resume the query with continuation token saved above and make sure you get all the documents
    Flux<FeedResponse<TestObject>> feedResponseFlux = container
        .queryItems(query, new CosmosQueryRequestOptions(),
            TestObject.class)
        .byPage(requestContinuation, preferredPageSize);

    for (FeedResponse<TestObject> nodeFeedResponse : feedResponseFlux.toIterable()) {
        resultList.addAll(nodeFeedResponse.getResults());
    }

    // Resume the orderby query with continuation token saved above and make sure you get all the documents
    Flux<FeedResponse<TestObject>> orderfeedResponseFlux = container
        .queryItems(orderByQuery, new CosmosQueryRequestOptions(),
            TestObject.class)
        .byPage(orderByRequestContinuation, preferredPageSize);

    for (FeedResponse<TestObject> nodeFeedResponse : orderfeedResponseFlux.toIterable()) {
        orderByResultList.addAll(nodeFeedResponse.getResults());
    }

    List<String> sourceIds = testObjects.stream().map(TestObject::getId).collect(Collectors.toList());
    List<String> resultIds = resultList.stream().map(TestObject::getId).collect(Collectors.toList());
    List<String> orderResultIds = orderByResultList.stream().map(TestObject::getId).collect(Collectors.toList());

    // Cross-partition query: order is unspecified, only the document set must match.
    assertThat(resultIds)
        .as("Resuming query from continuation token after split validated")
        .containsExactlyInAnyOrderElementsOf(sourceIds);

    // ORDER BY query: results must come back in sorted order across the split.
    assertThat(orderResultIds)
        .as("Resuming orderby query from continuation token after split validated")
        .containsExactlyElementsOf(sortedObjects);

    container.delete().block();
}

@NotNull
private List<PartitionKeyRange> getPartitionKeyRanges(
    String containerId, AsyncDocumentClient asyncDocumentClient) {
    // Collection self-link in the form /dbs/{databaseId}/colls/{containerId}.
    String collectionLink = "/dbs/" + createdDatabase.getId() + "/colls/" + containerId;

    // Read every feed page of partition key ranges for the collection, then
    // flatten the pages into a single list.
    List<FeedResponse<PartitionKeyRange>> pages = asyncDocumentClient
        .readPartitionKeyRanges(collectionLink, new CosmosQueryRequestOptions())
        .collectList()
        .block();

    List<PartitionKeyRange> allRanges = new ArrayList<>();
    for (FeedResponse<PartitionKeyRange> page : pages) {
        allRanges.addAll(page.getResults());
    }
    return allRanges;
}

private <T> List<T> queryAndGetResults(SqlQuerySpec querySpec, CosmosQueryRequestOptions options, Class<T> type) {
CosmosPagedFlux<T> queryPagedFlux = createdContainer.queryItems(querySpec, options, type);
TestSubscriber<T> testSubscriber = new TestSubscriber<>();
Expand Down