diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java index dde4f9d2ac47..60c7fe2db4cc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java @@ -231,9 +231,14 @@ private ImmutablePair getFiltersForPartitions( List partitionKeyRanges, List sortOrders, Collection orderByExpressions) { + + ValueHolder> valueHolder = new ValueHolder<>(); + valueHolder.v = this.targetRangeToOrderByContinuationTokenMap; // Find the partition key range we left off on int startIndex = this.findTargetRangeAndExtractContinuationTokens(partitionKeyRanges, - orderByContinuationToken.getCompositeContinuationToken().getRange()); + orderByContinuationToken.getCompositeContinuationToken().getRange(), + valueHolder, + orderByContinuationToken); // Get the filters. 
FormattedFilterInfo formattedFilterInfo = this.getFormattedFilters(orderByExpressions, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java index ca4f88e2ce64..7555136d18b9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java @@ -42,7 +42,8 @@ */ public class ParallelDocumentQueryExecutionContext extends ParallelDocumentQueryExecutionContextBase { - private CosmosQueryRequestOptions cosmosQueryRequestOptions; + private final CosmosQueryRequestOptions cosmosQueryRequestOptions; + private final Map partitionKeyRangeToContinuationTokenMap; private ParallelDocumentQueryExecutionContext( DiagnosticsClientContext diagnosticsClientContext, @@ -61,6 +62,7 @@ private ParallelDocumentQueryExecutionContext( super(diagnosticsClientContext, client, partitionKeyRanges, resourceTypeEnum, resourceType, query, cosmosQueryRequestOptions, resourceLink, rewrittenQuery, isContinuationExpected, getLazyFeedResponse, correlatedActivityId); this.cosmosQueryRequestOptions = cosmosQueryRequestOptions; + partitionKeyRangeToContinuationTokenMap = new HashMap<>(); } public static Flux> createAsync( @@ -133,7 +135,6 @@ private void initialize( int initialPageSize, String continuationToken) { // Generate the corresponding continuation token map. - Map partitionKeyRangeToContinuationTokenMap = new HashMap(); if (continuationToken == null) { // If the user does not give a continuation token, // then just start the query from the first partition. @@ -152,7 +153,7 @@ private void initialize( // then we know that partitions 0, 1, 2 are fully drained. 
// Check to see if composite continuation token is a valid JSON. - ValueHolder outCompositeContinuationToken = new ValueHolder(); + ValueHolder outCompositeContinuationToken = new ValueHolder<>(); if (!CompositeContinuationToken.tryParse(continuationToken, outCompositeContinuationToken)) { String message = String.format("INVALID JSON in continuation token %s for Parallel~Context", @@ -163,20 +164,17 @@ private void initialize( CompositeContinuationToken compositeContinuationToken = outCompositeContinuationToken.v; - // Get the right hand side of the query ranges: + // Get the right hand side of the query ranges and set continuation token for relevant ranges in the + // partitionKeyRangeToContinuationTokenMap List filteredPartitionKeyRanges = this.getPartitionKeyRangesForContinuation( compositeContinuationToken, targetRanges); - // The first partition is the one we left off on and have a backend continuation - // token for. - partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(0), - compositeContinuationToken.getToken()); - // The remaining partitions we have yet to touch / have null continuation tokens for (int i = 1; i < filteredPartitionKeyRanges.size(); i++) { - partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(i), - null); + if (!partitionKeyRangeToContinuationTokenMap.containsKey(filteredPartitionKeyRanges.get(i))) { + partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(i), null); + } } } @@ -187,14 +185,21 @@ private void initialize( } private List getPartitionKeyRangesForContinuation( - CompositeContinuationToken compositeContinuationToken, - List partitionKeyRanges) { - // Find the partition key range we left off on + CompositeContinuationToken compositeContinuationToken, + List partitionKeyRanges) { + Map partitionRangeIdToTokenMap = new HashMap<>(); + ValueHolder> outPartitionRangeIdToTokenMap = new ValueHolder<>(partitionRangeIdToTokenMap); + // Find the partition key range we left 
off on and fill the range to continuation token map int startIndex = this.findTargetRangeAndExtractContinuationTokens(partitionKeyRanges, - compositeContinuationToken.getRange()); - + compositeContinuationToken.getRange(), + outPartitionRangeIdToTokenMap, + compositeContinuationToken.getToken()); List rightHandSideRanges = new ArrayList(); for (int i = startIndex; i < partitionKeyRanges.size(); i++) { + PartitionKeyRange range = partitionKeyRanges.get(i); + if (partitionRangeIdToTokenMap.containsKey(range.getId())) { + this.partitionKeyRangeToContinuationTokenMap.put(range, compositeContinuationToken.getToken()); + } rightHandSideRanges.add(partitionKeyRanges.get(i)); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java index e15f21aba69d..744abfcd9840 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java @@ -11,6 +11,7 @@ import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.Strings; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.Range; import com.azure.cosmos.models.CosmosQueryRequestOptions; @@ -23,12 +24,14 @@ import reactor.core.publisher.Mono; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; import java.util.function.Function; +import java.util.stream.Collectors; /** * While this class is 
public, but it is not part of our published public APIs.
@@ -103,7 +106,20 @@ protected void initialize(String collectionRid,
     }
 
+    /**
+     * Finds the index of the partition key range to resume from and records the continuation to use for
+     * every partition key range that lies inside {@code range}.
+     *
+     * @param partitionKeyRanges partition key ranges to search; must not be null
+     * @param range range taken from the continuation token being resumed
+     * @param outPartitionRangeToContinuation holder whose map receives one entry per matching range id
+     * @param continuation value stored in the map for every matching range
+     * @return index into {@code partitionKeyRanges} at which to resume
+     * @throws IllegalArgumentException if {@code partitionKeyRanges} is null
+     */
     protected int findTargetRangeAndExtractContinuationTokens(
-        List<PartitionKeyRange> partitionKeyRanges, Range<String> range) {
+        List<PartitionKeyRange> partitionKeyRanges, Range<String> range,
+        Utils.ValueHolder<Map<String, TContinuationToken>> outPartitionRangeToContinuation,
+        TContinuationToken continuation) {
         if (partitionKeyRanges == null) {
             throw new IllegalArgumentException("partitionKeyRanges can not be null.");
         }
@@ -132,12 +148,29 @@ protected int findTargetRangeAndExtractContinuationTokens(
-                String.format("Could not find partition key range for continuation token: {0}", needle));
+                String.format("Could not find partition key range for continuation token: %s", needle));
         }
 
+        List<PartitionKeyRange> replacementRanges;
+
+        // find what ranges make up the supplied continuation token
+        replacementRanges = partitionKeyRanges.stream()
+            .filter(p -> range.getMin().compareTo(p.getMinInclusive()) <= 0 &&
+                range.getMax().compareTo(p.getMaxExclusive()) >= 0)
+            .sorted(Comparator.comparing(PartitionKeyRange::getId))
+            .collect(Collectors.toList());
+
+        if (replacementRanges.isEmpty()) {
+            throw BridgeInternal.createCosmosException(HttpConstants.StatusCodes.BADREQUEST,
+                String.format("Cannot find ranges for continuation %s", continuation));
+        }
+
+        replacementRanges.forEach(r -> outPartitionRangeToContinuation.v.put(r.getId(), continuation));
+
         return minIndex;
     }
 
     abstract protected DocumentProducer createDocumentProducer(String collectionRid, PartitionKeyRange targetRange,
-                                                               String initialContinuationToken
-            , int initialPageSize, CosmosQueryRequestOptions cosmosQueryRequestOptions, SqlQuerySpec querySpecForInit,
+                                                               String initialContinuationToken, int initialPageSize,
+                                                               CosmosQueryRequestOptions cosmosQueryRequestOptions,
+                                                               SqlQuerySpec querySpecForInit,
                                                                Map commonRequestHeaders,
                                                                TriFunction createRequestFunc,
                                                                Function>> executeFunc,
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java
index 
4940c16af0c7..dee610d8d6cc 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos.rx; +import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosAsyncDatabase; @@ -11,8 +12,10 @@ import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.PartitionKeyRange; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; +import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.models.CosmosDatabaseProperties; import com.azure.cosmos.models.CosmosPermissionProperties; import com.azure.cosmos.models.CosmosQueryRequestOptions; @@ -26,16 +29,21 @@ import com.azure.cosmos.models.SqlParameter; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.models.ThroughputProperties; +import com.azure.cosmos.models.ThroughputResponse; import com.azure.cosmos.models.TriggerOperation; import com.azure.cosmos.models.TriggerType; import com.azure.cosmos.util.CosmosPagedFlux; +import com.fasterxml.jackson.databind.JsonNode; import io.reactivex.subscribers.TestSubscriber; +import org.jetbrains.annotations.NotNull; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; +import reactor.core.publisher.Flux; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -317,6 +325,129 @@ public void queryPlanCacheSinglePartitionParameterizedQueriesCorrectness() { } + 
@Test(groups = {"simple"}, timeOut = TIMEOUT * 10)
+    public void splitQueryContinuationToken() throws Exception {
+        // Saves the continuation tokens of a plain and an ORDER BY query, triggers a partition split by
+        // raising throughput, then resumes both queries and checks that no document is lost or reordered.
+        String containerId = "splittestcontainer_" + UUID.randomUUID();
+        int itemCount = 20;
+        int preferredPageSize = 15;
+        String query = "Select * from c";
+        String orderByQuery = "select * from c order by c.prop";
+
+        // Create container
+        CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/mypk");
+        CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties).block();
+        assertThat(containerResponse).as("Container creation response").isNotNull();
+        CosmosAsyncContainer container = createdDatabase.getContainer(containerId);
+        AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(this.client);
+
+        // Delete the container in finally so a failed assertion does not leak the scaled-up container
+        try {
+            // Insert some documents
+            List<TestObject> testObjects = insertDocuments(itemCount, Arrays.asList("CA", "US"), container);
+            List<String> sourceIds = testObjects.stream().map(TestObject::getId).collect(Collectors.toList());
+            List<String> sortedIds = testObjects.stream()
+                .sorted(Comparator.comparing(TestObject::getProp))
+                .map(TestObject::getId)
+                .collect(Collectors.toList());
+
+            List<PartitionKeyRange> rangesBeforeSplit = getPartitionKeyRanges(containerId, asyncDocumentClient);
+            List<TestObject> resultList = new ArrayList<>();
+            List<TestObject> orderByResultList = new ArrayList<>();
+
+            // Query: consume only the first page and remember where to resume
+            FeedResponse<TestObject> firstPage = container
+                .queryItems(query, new CosmosQueryRequestOptions(), TestObject.class)
+                .byPage(preferredPageSize).blockFirst();
+            assertThat(firstPage).as("First page of the query").isNotNull();
+            resultList.addAll(firstPage.getResults());
+            String requestContinuation = firstPage.getContinuationToken();
+
+            // Orderby query: consume only the first page and remember where to resume
+            FeedResponse<TestObject> orderByFirstPage = container
+                .queryItems(orderByQuery, new CosmosQueryRequestOptions(), TestObject.class)
+                .byPage(preferredPageSize).blockFirst();
+            assertThat(orderByFirstPage).as("First page of the orderby query").isNotNull();
+            orderByResultList.addAll(orderByFirstPage.getResults());
+            String orderByRequestContinuation = orderByFirstPage.getContinuationToken();
+
+            // Scale up the throughput for a split
+            logger.info("Scaling up throughput for split");
+            ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(16000);
+            ThroughputResponse throughputResponse = container.replaceThroughput(throughputProperties).block();
+            assertThat(throughputResponse).as("Throughput replace response").isNotNull();
+            logger.info("Throughput replace request submitted for {} ",
+                throughputResponse.getProperties().getManualThroughput());
+
+            // Wait for the throughput update to complete so that we get the partition split
+            throughputResponse = container.readThroughput().block();
+            while (true) {
+                assertThat(throughputResponse).as("Throughput read response").isNotNull();
+                if (!throughputResponse.isReplacePending()) {
+                    break;
+                }
+                logger.info("Waiting for split to complete");
+                Thread.sleep(10 * 1000);
+                throughputResponse = container.readThroughput().block();
+            }
+
+            logger.info("Resuming query from the continuation");
+            // Read number of partitions. Should be greater than before the split.
+            // The description goes before the assertion: AssertJ ignores as() called afterwards.
+            List<PartitionKeyRange> rangesAfterSplit = getPartitionKeyRanges(containerId, asyncDocumentClient);
+            assertThat(rangesAfterSplit.size())
+                .as("Partition ranges should increase after split")
+                .isGreaterThan(rangesBeforeSplit.size());
+            logger.info("After split num partitions = {}", rangesAfterSplit.size());
+
+            // Reading item to refresh cache
+            container.readItem(testObjects.get(0).getId(), new PartitionKey(testObjects.get(0).getMypk()),
+                JsonNode.class).block();
+
+            // Resume the query with continuation token saved above and make sure you get all the documents
+            Flux<FeedResponse<TestObject>> feedResponseFlux = container
+                .queryItems(query, new CosmosQueryRequestOptions(), TestObject.class)
+                .byPage(requestContinuation, preferredPageSize);
+            for (FeedResponse<TestObject> page : feedResponseFlux.toIterable()) {
+                resultList.addAll(page.getResults());
+            }
+
+            // Resume the orderby query with the saved continuation token and make sure you get all the documents
+            Flux<FeedResponse<TestObject>> orderByFeedResponseFlux = container
+                .queryItems(orderByQuery, new CosmosQueryRequestOptions(), TestObject.class)
+                .byPage(orderByRequestContinuation, preferredPageSize);
+            for (FeedResponse<TestObject> page : orderByFeedResponseFlux.toIterable()) {
+                orderByResultList.addAll(page.getResults());
+            }
+
+            List<String> resultIds = resultList.stream().map(TestObject::getId).collect(Collectors.toList());
+            List<String> orderedIds = orderByResultList.stream().map(TestObject::getId).collect(Collectors.toList());
+
+            assertThat(resultIds)
+                .as("Resuming query from continuation token after split validated")
+                .containsExactlyInAnyOrderElementsOf(sourceIds);
+            assertThat(orderedIds)
+                .as("Resuming orderby query from continuation token after split validated")
+                .containsExactlyElementsOf(sortedIds);
+        } finally {
+            container.delete().block();
+        }
+    }
+
+    /** Reads all pages of the partition key range feed for the given container of the test database. */
+    @NotNull
+    private List<PartitionKeyRange> getPartitionKeyRanges(
+        String containerId, AsyncDocumentClient asyncDocumentClient) {
+        List<PartitionKeyRange> partitionKeyRanges = new ArrayList<>();
+        List<FeedResponse<PartitionKeyRange>> partitionFeedResponseList = asyncDocumentClient
+            .readPartitionKeyRanges("/dbs/" + createdDatabase.getId() + "/colls/" + containerId,
+                new CosmosQueryRequestOptions())
+            .collectList().block();
+        partitionFeedResponseList.forEach(f -> partitionKeyRanges.addAll(f.getResults()));
+        return partitionKeyRanges;
+    }
+
     private List queryAndGetResults(SqlQuerySpec querySpec, CosmosQueryRequestOptions options, Class type) {
         CosmosPagedFlux queryPagedFlux = createdContainer.queryItems(querySpec, options, type);
         TestSubscriber testSubscriber = new TestSubscriber<>();