diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncMixedBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncMixedBenchmark.java index 8e1f2b527cc5..d5e3eb6c8900 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncMixedBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncMixedBenchmark.java @@ -50,7 +50,6 @@ protected void performWorkload(BaseSubscriber documentBaseSubscriber, FeedOptions options = new FeedOptions(); options.maxItemCount(10); - options.setEnableCrossPartitionQuery(true); String sqlQuery = "Select top 100 * from c order by c._ts"; obs = client.queryDocuments(getCollectionLink(), sqlQuery, options) diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java index bb82bc9e4146..ee0e2484dc70 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncQueryBenchmark.java @@ -78,7 +78,6 @@ protected void performWorkload(BaseSubscriber> baseSubscr if (configuration.getOperationType() == Configuration.Operation.QueryCross) { int index = r.nextInt(1000); - options.setEnableCrossPartitionQuery(true); String sqlQuery = "Select * from c where c._rid = \"" + docsToRead.get(index).getResourceId() + "\""; obs = client.queryDocuments(getCollectionLink(), sqlQuery, options); } else if (configuration.getOperationType() == Configuration.Operation.QuerySingle) { @@ -91,35 +90,29 @@ protected void performWorkload(BaseSubscriber> baseSubscr } else if (configuration.getOperationType() == Configuration.Operation.QueryParallel) { options.maxItemCount(10); - options.setEnableCrossPartitionQuery(true); String sqlQuery = "Select * from c"; obs = client.queryDocuments(getCollectionLink(), sqlQuery, options); } else if (configuration.getOperationType() == Configuration.Operation.QueryOrderby) { options.maxItemCount(10); - options.setEnableCrossPartitionQuery(true); String sqlQuery = "Select * from c order by c._ts"; obs = client.queryDocuments(getCollectionLink(), sqlQuery, options); } else if (configuration.getOperationType() == Configuration.Operation.QueryAggregate) { options.maxItemCount(10); - options.setEnableCrossPartitionQuery(true); String sqlQuery = "Select value max(c._ts) from c"; obs = client.queryDocuments(getCollectionLink(), sqlQuery, options); } else if (configuration.getOperationType() == Configuration.Operation.QueryAggregateTopOrderby) { - options.setEnableCrossPartitionQuery(true); String sqlQuery = "Select top 1 value count(c) from c order by c._ts"; obs = client.queryDocuments(getCollectionLink(), sqlQuery, options); } else if (configuration.getOperationType() == Configuration.Operation.QueryTopOrderby) { - options.setEnableCrossPartitionQuery(true); String sqlQuery = "Select top 1000 * from c order by c._ts"; obs = client.queryDocuments(getCollectionLink(), sqlQuery, options); } else if (configuration.getOperationType() == Configuration.Operation.QueryInClauseParallel) { ReadMyWriteWorkflow.QueryBuilder queryBuilder = new ReadMyWriteWorkflow.QueryBuilder(); - options.setEnableCrossPartitionQuery(true); options.setMaxDegreeOfParallelism(200); List parameters = new ArrayList<>(); int j = 0; diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java index b35649236961..29e192ff3871 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java @@ -232,7 +232,6 @@ private SqlQuerySpec generateRandomQuery() { private Flux xPartitionQuery(SqlQuerySpec query) { FeedOptions options = new FeedOptions(); options.setMaxDegreeOfParallelism(-1); - options.setEnableCrossPartitionQuery(true); return client.queryDocuments(getCollectionLink(), query, options) .flatMap(p -> Flux.fromIterable(p.getResults())); diff --git a/sdk/cosmos/azure-cosmos-examples/src/main/java/com/azure/cosmos/examples/BasicDemo.java b/sdk/cosmos/azure-cosmos-examples/src/main/java/com/azure/cosmos/examples/BasicDemo.java index a7abad6eaf70..a980af696db1 100644 --- a/sdk/cosmos/azure-cosmos-examples/src/main/java/com/azure/cosmos/examples/BasicDemo.java +++ b/sdk/cosmos/azure-cosmos-examples/src/main/java/com/azure/cosmos/examples/BasicDemo.java @@ -107,7 +107,6 @@ private void queryItems() { log("+ Querying the collection "); String query = "SELECT * from root"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); options.setMaxDegreeOfParallelism(2); Flux> queryFlux = container.queryItems(query, options, TestObject.class); @@ -128,7 +127,6 @@ private void queryWithContinuationToken() { log("+ Query with paging using continuation token"); String query = "SELECT * from root r "; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); options.populateQueryMetrics(true); options.maxItemCount(1); String continuation = null; diff --git a/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/DocumentCRUDAsyncAPITest.java b/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/DocumentCRUDAsyncAPITest.java index 0666056a77d2..ce65f2a1edef 100644 --- a/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/DocumentCRUDAsyncAPITest.java +++ b/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/DocumentCRUDAsyncAPITest.java @@ -413,7 +413,6 @@ public void documentDelete_Async() throws Exception { // Assert document is deleted FeedOptions queryOptions = new FeedOptions(); - queryOptions.setEnableCrossPartitionQuery(true); List listOfDocuments = client .queryDocuments(getCollectionLink(), String.format("SELECT * FROM r where r.id = '%s'", createdDocument.getId()), queryOptions) .map(FeedResponse::getResults) // Map page to its list of documents diff --git a/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/DocumentQueryAsyncAPITest.java b/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/DocumentQueryAsyncAPITest.java index fe93d528cb51..4274ea06a3e3 100644 --- a/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/DocumentQueryAsyncAPITest.java +++ b/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/DocumentQueryAsyncAPITest.java @@ -130,7 +130,6 @@ public void queryDocuments_Async() throws Exception { int requestPageSize = 3; FeedOptions options = new FeedOptions(); options.maxItemCount(requestPageSize); - options.setEnableCrossPartitionQuery(true); Flux> documentQueryObservable = client .queryDocuments(getCollectionLink(), "SELECT * FROM root", options); @@ -176,7 +175,6 @@ public void queryDocuments_Async_withoutLambda() throws Exception { int requestPageSize = 3; FeedOptions options = new FeedOptions(); options.maxItemCount(requestPageSize); - options.setEnableCrossPartitionQuery(true); Flux> documentQueryObservable = client .queryDocuments(getCollectionLink(), "SELECT * FROM root", options); @@ -225,7 +223,6 @@ public void queryDocuments_findTotalRequestCharge() throws Exception { int requestPageSize = 3; FeedOptions options = new FeedOptions(); options.maxItemCount(requestPageSize); - options.setEnableCrossPartitionQuery(true); Flux totalChargeObservable = client .queryDocuments(getCollectionLink(), "SELECT * FROM root", options) @@ -250,7 +247,6 @@ public void queryDocuments_unsubscribeAfterFirstPage() throws Exception { int requestPageSize = 3; FeedOptions options = new FeedOptions(); options.maxItemCount(requestPageSize); - options.setEnableCrossPartitionQuery(true); Flux> requestChargeObservable = client .queryDocuments(getCollectionLink(), "SELECT * FROM root", options); @@ -287,7 +283,6 @@ public void queryDocuments_filterFetchedResults() throws Exception { int requestPageSize = 3; FeedOptions options = new FeedOptions(); options.maxItemCount(requestPageSize); - options.setEnableCrossPartitionQuery(true); Predicate isPrimeNumber = new Predicate() { @@ -347,7 +342,6 @@ public void queryDocuments_toBlocking_toIterator() { int requestPageSize = 3; FeedOptions options = new FeedOptions(); options.maxItemCount(requestPageSize); - options.setEnableCrossPartitionQuery(true); Flux> documentQueryObservable = client .queryDocuments(getCollectionLink(), "SELECT * FROM root", options); @@ -396,7 +390,6 @@ public void qrderBy_Async() throws Exception { // Query for the documents order by the prop field SqlQuerySpec query = new SqlQuerySpec("SELECT r.id FROM r ORDER BY r.prop", new SqlParameterList()); FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); options.maxItemCount(5); // Max degree of parallelism determines the number of partitions that @@ -434,7 +427,6 @@ public void transformObservableToCompletableFuture() throws Exception { int requestPageSize = 3; FeedOptions options = new FeedOptions(); options.maxItemCount(requestPageSize); - options.setEnableCrossPartitionQuery(true); Flux> documentQueryObservable = client .queryDocuments(getCollectionLink(), "SELECT * FROM root", options); diff --git a/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/InMemoryGroupbyTest.java b/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/InMemoryGroupbyTest.java index ef2bbbd015f8..e428dad0ab72 100644 --- a/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/InMemoryGroupbyTest.java +++ b/sdk/cosmos/azure-cosmos-examples/src/test/java/com/azure/cosmos/rx/examples/InMemoryGroupbyTest.java @@ -106,7 +106,6 @@ public void groupByInMemory() { int requestPageSize = 3; FeedOptions options = new FeedOptions(); options.maxItemCount(requestPageSize); - options.setEnableCrossPartitionQuery(true); Flux documentsObservable = client .queryDocuments(getCollectionLink(), @@ -138,7 +137,6 @@ public void groupByInMemory_MoreDetail() { int requestPageSize = 3; FeedOptions options = new FeedOptions(); options.maxItemCount(requestPageSize); - options.setEnableCrossPartitionQuery(true); Flux documentsObservable = client .queryDocuments(getCollectionLink(), diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java index 7a95cc847ecb..16117824788b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java @@ -153,11 +153,6 @@ public static Map getFeedHeaders(FeedOptions options) { options.getEmitVerboseTracesInQuery().toString()); } - if (options.getEnableCrossPartitionQuery() != null) { - headers.put(HttpConstants.HttpHeaders.ENABLE_CROSS_PARTITION_QUERY, - options.getEnableCrossPartitionQuery().toString()); - } - if (options.getMaxDegreeOfParallelism() != 0) { headers.put(HttpConstants.HttpHeaders.PARALLELIZE_CROSS_PARTITION_QUERY, Boolean.TRUE.toString()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/FeedOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/FeedOptions.java index 4d624b9db69e..98bef214ba82 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/FeedOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/FeedOptions.java @@ -14,7 +14,6 @@ public final class FeedOptions { private String partitionKeyRangeId; private Boolean enableScanInQuery; private Boolean emitVerboseTracesInQuery; - private Boolean enableCrossPartitionQuery; private int maxDegreeOfParallelism; private int maxBufferedItemCount; private int responseContinuationTokenLimitInKb; @@ -23,6 +22,7 @@ public final class FeedOptions { private PartitionKey partitionkey; private boolean populateQueryMetrics; private Map properties; + private boolean allowEmptyPages; public FeedOptions() { } @@ -32,7 +32,6 @@ public FeedOptions(FeedOptions options) { this.partitionKeyRangeId = options.partitionKeyRangeId; this.enableScanInQuery = options.enableScanInQuery; this.emitVerboseTracesInQuery = options.emitVerboseTracesInQuery; - this.enableCrossPartitionQuery = options.enableCrossPartitionQuery; this.maxDegreeOfParallelism = options.maxDegreeOfParallelism; this.maxBufferedItemCount = options.maxBufferedItemCount; this.responseContinuationTokenLimitInKb = options.responseContinuationTokenLimitInKb; @@ -40,6 +39,7 @@ public FeedOptions(FeedOptions options) { this.requestContinuation = options.requestContinuation; this.partitionkey = options.partitionkey; this.populateQueryMetrics = options.populateQueryMetrics; + this.allowEmptyPages = options.allowEmptyPages; } /** @@ -126,30 +126,6 @@ public FeedOptions setEmitVerboseTracesInQuery(Boolean emitVerboseTracesInQuery) return this; } - /** - * Gets the option to allow queries to run across all partitions of the - * collection. - * - * @return whether to allow queries to run across all partitions of the - * collection. - */ - public Boolean getEnableCrossPartitionQuery() { - return this.enableCrossPartitionQuery; - } - - /** - * Sets the option to allow queries to run across all partitions of the - * collection. - * - * @param enableCrossPartitionQuery whether to allow queries to run across all - * partitions of the collection. - * @return the FeedOptions. - */ - public FeedOptions setEnableCrossPartitionQuery(Boolean enableCrossPartitionQuery) { - this.enableCrossPartitionQuery = enableCrossPartitionQuery; - return this; - } - /** * Gets the number of concurrent operations run client side during parallel * query execution. @@ -338,4 +314,19 @@ public FeedOptions properties(Map properties) { this.properties = properties; return this; } + + /** + * Gets the option to allow empty result pages in feed response. + */ + public boolean getAllowEmptyPages() { + return allowEmptyPages; + } + + /** + * Sets the option to allow empty result pages in feed response. Defaults to false + * @param allowEmptyPages whether to allow empty pages in feed response + */ + public void setAllowEmptyPages(boolean allowEmptyPages) { + this.allowEmptyPages = allowEmptyPages; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/JsonSerializable.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/JsonSerializable.java index f94ce6188b87..3e004b5e55c9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/JsonSerializable.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/JsonSerializable.java @@ -98,7 +98,7 @@ static Object getValue(JsonNode value) { case STRING: return value.asText(); default: - throw new IllegalStateException("Unexpected value: " + value.getNodeType()); + return value; } } return value; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 804ea12585d6..f00847d52e01 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -78,6 +78,9 @@ public static class HttpHeaders { public static final String IS_QUERY = "x-ms-documentdb-isquery"; public static final String ENABLE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-enablecrosspartition"; public static final String PARALLELIZE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-parallelizecrosspartitionquery"; + public static final String IS_QUERY_PLAN_REQUEST = "x-ms-cosmos-is-query-plan-request"; + public static final String SUPPORTED_QUERY_FEATURES = "x-ms-cosmos-supported-query-features"; + public static final String QUERY_VERSION = "x-ms-cosmos-query-version"; // Our custom DocDB headers public static final String CONTINUATION = "x-ms-continuation"; @@ -249,6 +252,7 @@ public static class A_IMHeaderValues { public static class Versions { public static final String CURRENT_VERSION = "2018-12-31"; + public static final String QUERY_VERSION = "1.0"; // TODO: FIXME we can use maven plugin for generating a version file // @see diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/OperationType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/OperationType.java index d7324aaf376e..01bc4e85ec03 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/OperationType.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/OperationType.java @@ -36,6 +36,7 @@ public enum OperationType { Replace, Resume, SqlQuery, + QueryPlan, Stop, Throttle, Update, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index acee4a09c6af..52c1676c2f57 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -111,7 +111,9 @@ private Flux readFeed(RxDocumentServiceRequest reques } private Flux query(RxDocumentServiceRequest request) { - request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true"); + if(request.getOperationType() != OperationType.QueryPlan) { + request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true"); + } switch (this.queryCompatibilityMode) { case SqlQuery: @@ -369,6 +371,7 @@ private Flux invokeAsyncInternal(RxDocumentServiceReq return this.replace(request); case SqlQuery: case Query: + case QueryPlan: return this.query(request); default: throw new IllegalStateException("Unknown operation setType " + request.getOperationType()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionedByIdCollectionRequestOptionsFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionedByIdCollectionRequestOptionsFactory.java index a148f0885320..df316496b105 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionedByIdCollectionRequestOptionsFactory.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionedByIdCollectionRequestOptionsFactory.java @@ -23,7 +23,6 @@ public CosmosItemRequestOptions createRequestOptions(Lease lease) { @Override public FeedOptions createFeedOptions() { FeedOptions feedOptions = new FeedOptions(); - feedOptions.setEnableCrossPartitionQuery(true); return feedOptions; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java index ccfc42a9fc11..bfee9ef37af9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DefaultDocumentQueryExecutionContext.java @@ -29,12 +29,14 @@ import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.Utils.ValueHolder; +import com.azure.cosmos.implementation.routing.RoutingMapProviderHelper; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -112,11 +114,18 @@ public Flux> executeAsync() { .getPaginatedQueryResultAsObservable(newFeedOptions, createRequestFunc, executeFunc, resourceType, maxPageSize); } - public Mono>> getTargetPartitionKeyRanges(String resourceId, List> queryRanges) { - // TODO: FIXME this needs to be revisited + public Mono> getTargetPartitionKeyRanges(String resourceId, List> queryRanges) { + return RoutingMapProviderHelper.getOverlappingRanges(client.getPartitionKeyRangeCache(), resourceId, queryRanges); + } - Range r = new Range<>("", "FF", true, false); - return client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(resourceId, r, false, null); + public Mono> getTargetPartitionKeyRangesById(String resourceId, + String partitionKeyRangeIdInternal) { + return client.getPartitionKeyRangeCache() + .tryGetPartitionKeyRangeByIdAsync(resourceId, + partitionKeyRangeIdInternal, + false, + null) + .flatMap(partitionKeyRange -> Mono.just(Collections.singletonList(partitionKeyRange.v))); } protected Function>> executeInternalAsyncFunc() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java index 46774541392f..1364d68dcc45 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextBase.java @@ -153,21 +153,10 @@ public Map createCommonHeadersAsync(FeedOptions feedOptions) { requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(feedOptions.maxItemCount())); } - if (feedOptions.getEnableCrossPartitionQuery() != null) { - - requestHeaders.put(HttpConstants.HttpHeaders.ENABLE_CROSS_PARTITION_QUERY, - Strings.toString(feedOptions.getEnableCrossPartitionQuery())); - } - if (feedOptions.getMaxDegreeOfParallelism() != 0) { requestHeaders.put(HttpConstants.HttpHeaders.PARALLELIZE_CROSS_PARTITION_QUERY, Strings.toString(true)); } - if (this.feedOptions.getEnableCrossPartitionQuery() != null) { - requestHeaders.put(HttpConstants.HttpHeaders.ENABLE_SCAN_IN_QUERY, - Strings.toString(this.feedOptions.getEnableCrossPartitionQuery())); - } - if (this.feedOptions.setResponseContinuationTokenLimitInKb() > 0) { requestHeaders.put(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB, Strings.toString(feedOptions.setResponseContinuationTokenLimitInKb())); @@ -207,7 +196,10 @@ public void populatePartitionKeyRangeInfo(RxDocumentServiceRequest request, Part } if (this.resourceTypeEnum.isPartitioned()) { - request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId())); + boolean hasPartitionKey = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) != null; + if(!hasPartitionKey){ + request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId())); + } } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java index f83eb39a04de..2e1998096d15 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java @@ -2,6 +2,8 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation.query; +import com.azure.cosmos.CommonsBridgeInternal; +import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.caches.RxCollectionCache; import com.azure.cosmos.BadRequestException; import com.azure.cosmos.BridgeInternal; @@ -15,9 +17,13 @@ import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.routing.PartitionKeyInternal; +import com.azure.cosmos.implementation.routing.Range; +import org.apache.commons.lang3.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -62,29 +68,77 @@ public static Flux> proxyQueryExecutionContext = - collectionObs.flatMap(collectionValueHolder -> { - - if (collectionValueHolder.v != null && feedOptions != null && feedOptions.partitionKey() != null && feedOptions.partitionKey().equals(PartitionKey.NONE)) { - feedOptions.partitionKey(BridgeInternal.getPartitionKey(BridgeInternal.getNonePartitionKey(collectionValueHolder.v.getPartitionKey()))); - } - return ProxyDocumentQueryExecutionContext.createAsync( - client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - collectionValueHolder.v, - isContinuationExpected, - correlatedActivityId); - }); - - return proxyQueryExecutionContext; + DefaultDocumentQueryExecutionContext queryExecutionContext = new DefaultDocumentQueryExecutionContext( + client, + resourceTypeEnum, + resourceType, + query, + feedOptions, + resourceLink, + correlatedActivityId, + isContinuationExpected); + + if (ResourceType.Document != resourceTypeEnum) { + return Flux.just(queryExecutionContext); + } + + Mono queryExecutionInfoMono = + QueryPlanRetriever + .getQueryPlanThroughGatewayAsync(client, query, resourceLink); + + return collectionObs.single().flatMap(collectionValueHolder -> + queryExecutionInfoMono.flatMap(partitionedQueryExecutionInfo -> { + QueryInfo queryInfo = + partitionedQueryExecutionInfo.getQueryInfo(); + + Mono> partitionKeyRanges; + // The partitionKeyRangeIdInternal is no more a public API on + // FeedOptions, but have the below condition + // for handling ParallelDocumentQueryTest#partitionKeyRangeId + if (feedOptions != null && !StringUtils + .isEmpty(CommonsBridgeInternal + .partitionKeyRangeIdInternal(feedOptions))) { + partitionKeyRanges = queryExecutionContext + .getTargetPartitionKeyRangesById(collectionValueHolder.v + .getResourceId(), + CommonsBridgeInternal + .partitionKeyRangeIdInternal(feedOptions)); + } else { + List> queryRanges = + partitionedQueryExecutionInfo.getQueryRanges(); + + if (feedOptions != null + && feedOptions.partitionKey() != null + && feedOptions.partitionKey() != PartitionKey.NONE) { + PartitionKeyInternal internalPartitionKey = + BridgeInternal.getPartitionKeyInternal(feedOptions.partitionKey()); + Range range = Range + .getPointRange(internalPartitionKey + .getEffectivePartitionKeyString(internalPartitionKey, + collectionValueHolder.v + .getPartitionKey())); + queryRanges = Collections.singletonList(range); + } + partitionKeyRanges = queryExecutionContext + .getTargetPartitionKeyRanges(collectionValueHolder.v + .getResourceId(), queryRanges); + } + return partitionKeyRanges + .flatMap(pkranges -> createSpecializedDocumentQueryExecutionContextAsync(client, + resourceTypeEnum, + resourceType, + query, + feedOptions, + resourceLink, + isContinuationExpected, + partitionedQueryExecutionInfo, + pkranges, + collectionValueHolder.v + .getResourceId(), + correlatedActivityId) + .single()); + + })).flux(); } public static Flux> createSpecializedDocumentQueryExecutionContextAsync( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OffsetContinuationToken.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OffsetContinuationToken.java new file mode 100644 index 000000000000..165d15d779b3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OffsetContinuationToken.java @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.JsonSerializable; +import com.azure.cosmos.implementation.Utils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class OffsetContinuationToken extends JsonSerializable { + private static final String TOKEN_PROPERTY_NAME = "sourceToken"; + private static final String OFFSET_PROPERTY_NAME = "offset"; + private static final Logger logger = LoggerFactory.getLogger(CompositeContinuationToken.class); + + public OffsetContinuationToken(int offset, String sourceToken) { + + if (offset < 0) { + throw new IllegalArgumentException("offset should be non negative"); + } + + this.setOffset(offset); + this.setSourceToken(sourceToken); + } + + public OffsetContinuationToken(String serializedCompositeToken) { + super(serializedCompositeToken); + this.getOffset(); + this.getSourceToken(); + } + + public static boolean tryParse(String serializedOffsetContinuationToken, + Utils.ValueHolder outOffsetContinuationToken) { + if (StringUtils.isEmpty(serializedOffsetContinuationToken)) { + return false; + } + + boolean parsed; + try { + outOffsetContinuationToken.v = new OffsetContinuationToken(serializedOffsetContinuationToken); + parsed = true; + } catch (Exception ex) { + logger.debug("Received exception {} when trying to parse: {}", + ex.getMessage(), + serializedOffsetContinuationToken); + parsed = false; + outOffsetContinuationToken.v = null; + } + + return parsed; + } + + public String getSourceToken() { + return super.getString(TOKEN_PROPERTY_NAME); + } + + private void setSourceToken(String sourceToken) { + BridgeInternal.setProperty(this, TOKEN_PROPERTY_NAME, sourceToken); + } + + public int getOffset() { + return super.getInt(OFFSET_PROPERTY_NAME); + } + + private void setOffset(int offset) { + BridgeInternal.setProperty(this, OFFSET_PROPERTY_NAME, offset); + } +} + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java index 36a3c7704695..9b844a2f1e10 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java @@ -36,7 +36,8 @@ */ public class ParallelDocumentQueryExecutionContext extends ParallelDocumentQueryExecutionContextBase { - + private FeedOptions feedOptions; + private ParallelDocumentQueryExecutionContext( IDocumentQueryClient client, List partitionKeyRanges, @@ -52,6 +53,7 @@ private ParallelDocumentQueryExecutionContext( UUID correlatedActivityId) { super(client, partitionKeyRanges, resourceTypeEnum, resourceType, query, feedOptions, resourceLink, rewrittenQuery, isContinuationExpected, getLazyFeedResponse, correlatedActivityId); + this.feedOptions = feedOptions; } public static Flux> createAsync( @@ -171,9 +173,9 @@ private static class EmptyPagesFilterTransformer implements Function.DocumentProducerFeedResponse>, Flux>> { private final RequestChargeTracker tracker; private DocumentProducer.DocumentProducerFeedResponse previousPage; - - public EmptyPagesFilterTransformer( - RequestChargeTracker tracker) { + private final FeedOptions feedOptions; + + public EmptyPagesFilterTransformer(RequestChargeTracker tracker, FeedOptions options) { if (tracker == null) { throw new IllegalArgumentException("Request Charge Tracker must not be null."); @@ -181,6 +183,7 @@ public EmptyPagesFilterTransformer( this.tracker = tracker; this.previousPage = null; + this.feedOptions = options; } private DocumentProducer.DocumentProducerFeedResponse plusCharge( @@ -224,7 +227,8 @@ public Flux> apply(Flux.DocumentProducerFeed // Emit an empty page so the downstream observables know when there are no more // results. return source.filter(documentProducerFeedResponse -> { - if (documentProducerFeedResponse.pageResult.getResults().isEmpty()) { + if (documentProducerFeedResponse.pageResult.getResults().isEmpty() + && !this.feedOptions.getAllowEmptyPages()) { // filter empty pages and accumulate charge tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge()); return false; @@ -313,7 +317,7 @@ public Flux> drainAsync( logger.debug("ParallelQuery: flux mergeSequential" + " concurrency {}, prefetch {}", fluxConcurrency, fluxPrefetch); return Flux.mergeSequential(obs, fluxConcurrency, fluxPrefetch) - .compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker())); + .compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker(), this.feedOptions)); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java index 7eaae1f5a1d9..547a96f97bac 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContextBase.java @@ -2,6 +2,8 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation.query; +import com.azure.cosmos.implementation.routing.Range; +import com.azure.cosmos.PartitionKey; import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosClientException; import com.azure.cosmos.FeedOptions; @@ -72,6 +74,10 @@ protected void initialize(String collectionRid, Map headers = new HashMap<>(commonRequestHeaders); headers.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken); headers.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(pageSize)); + if (feedOptions.partitionKey() != null && feedOptions.partitionKey() != PartitionKey.NONE) { + headers.put(HttpConstants.HttpHeaders.PARTITION_KEY, + BridgeInternal.getPartitionKeyInternal(feedOptions.partitionKey()).toJson()); + } return this.createDocumentServiceRequest(headers, querySpecForInit, partitionKeyRange, collectionRid); }; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java index 071640b423fe..4308262bbef2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java @@ -82,14 +82,41 @@ public static Flux>> createSkipComponentFunction; + if (queryInfo.hasOffset()) { + createSkipComponentFunction = (continuationToken) -> { + return SkipDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction, + queryInfo.getOffset(), + continuationToken); + }; + } else { + createSkipComponentFunction = createAggregateComponentFunction; + } + Function>> createTopComponentFunction; if (queryInfo.hasTop()) { createTopComponentFunction = (continuationToken) -> { - return TopDocumentQueryExecutionContext.createAsync(createAggregateComponentFunction, - queryInfo.getTop(), continuationToken); + return TopDocumentQueryExecutionContext.createAsync(createSkipComponentFunction, + queryInfo.getTop(), queryInfo.getTop(), continuationToken); + }; + } else { + createTopComponentFunction = createSkipComponentFunction; + } + + Function>> createTakeComponentFunction; + if (queryInfo.hasLimit()) { + createTakeComponentFunction = (continuationToken) -> { + int totalLimit = queryInfo.getLimit(); + if (queryInfo.hasOffset()) { + // This is being done to match the limit from rewritten query + totalLimit = queryInfo.getOffset() + queryInfo.getLimit(); + } + return TopDocumentQueryExecutionContext.createAsync(createTopComponentFunction, + queryInfo.getLimit(), totalLimit, + continuationToken); }; } else { - createTopComponentFunction = createAggregateComponentFunction; + createTakeComponentFunction = createTopComponentFunction; } int actualPageSize = Utils.getValueOrDefault(feedOptions.maxItemCount(), @@ -100,7 +127,7 @@ public static Flux new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId)); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ProxyDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ProxyDocumentQueryExecutionContext.java deleted file mode 100644 index a2567ee7caa0..000000000000 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ProxyDocumentQueryExecutionContext.java +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.azure.cosmos.implementation.query; - -import com.azure.cosmos.CosmosClientException; -import com.azure.cosmos.implementation.DocumentCollection; -import com.azure.cosmos.FeedOptions; -import com.azure.cosmos.FeedResponse; -import com.azure.cosmos.Resource; -import com.azure.cosmos.SqlQuerySpec; -import com.azure.cosmos.implementation.Exceptions; -import com.azure.cosmos.implementation.HttpConstants; -import com.azure.cosmos.implementation.PartitionKeyRange; -import com.azure.cosmos.implementation.ResourceType; -import com.azure.cosmos.implementation.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.UUID; -import java.util.function.Function; - -/** - * While this class is public, but it is not part of our published public APIs. - * This is meant to be internally used only by our sdk. - * - * This class is used as a proxy to wrap the - * DefaultDocumentQueryExecutionContext which is needed for sending the query to - * GATEWAY first and then uses PipelinedDocumentQueryExecutionContext after it - * gets the necessary info. - */ -public class ProxyDocumentQueryExecutionContext implements IDocumentQueryExecutionContext { - - private IDocumentQueryExecutionContext innerExecutionContext; - private IDocumentQueryClient client; - private ResourceType resourceTypeEnum; - private Class resourceType; - private FeedOptions feedOptions; - private SqlQuerySpec query; - private String resourceLink; - private DocumentCollection collection; - private UUID correlatedActivityId; - private boolean isContinuationExpected; - private final static Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - public ProxyDocumentQueryExecutionContext( - IDocumentQueryExecutionContext innerExecutionContext, - IDocumentQueryClient client, - ResourceType resourceTypeEnum, - Class resourceType, - SqlQuerySpec query, - FeedOptions feedOptions, - String resourceLink, - DocumentCollection collection, - boolean isContinuationExpected, - UUID correlatedActivityId) { - this.innerExecutionContext = innerExecutionContext; - - this.client = client; - this.resourceTypeEnum = resourceTypeEnum; - this.resourceType = resourceType; - this.query = query; - this.feedOptions = feedOptions; - this.resourceLink = resourceLink; - - this.collection = collection; - this.isContinuationExpected = isContinuationExpected; - this.correlatedActivityId = correlatedActivityId; - } - - @Override - public Flux> executeAsync() { - - Function>> func = t -> { - - Throwable unwrappedException = reactor.core.Exceptions.unwrap(t); - - logger.debug("Received non result message from gateway", unwrappedException); - if (!(unwrappedException instanceof Exception)) { - logger.error("Unexpected failure", unwrappedException); - return Flux.error(unwrappedException); - } - - if (!isCrossPartitionQuery((Exception) unwrappedException)) { - // If this is not a cross partition query then propagate error - logger.debug("Failure from gateway", unwrappedException); - return Flux.error(unwrappedException); - } - - logger.debug("Setting up query pipeline using the query plan received form gateway"); - - // cross partition query construct pipeline - - CosmosClientException dce = (CosmosClientException) unwrappedException; - - PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = new - PartitionedQueryExecutionInfo(dce.getError().getPartitionedQueryExecutionInfo()); - - logger.debug("Query Plan from gateway {}", partitionedQueryExecutionInfo); - - DefaultDocumentQueryExecutionContext queryExecutionContext = - (DefaultDocumentQueryExecutionContext) this.innerExecutionContext; - - Mono>> partitionKeyRanges = queryExecutionContext.getTargetPartitionKeyRanges(collection.getResourceId(), - partitionedQueryExecutionInfo.getQueryRanges()); - - Flux> exContext = partitionKeyRanges.flux() - .flatMap(pkrangesValueHolder -> DocumentQueryExecutionContextFactory.createSpecializedDocumentQueryExecutionContextAsync( - this.client, - this.resourceTypeEnum, - this.resourceType, - this.query, - this.feedOptions, - this.resourceLink, - isContinuationExpected, - partitionedQueryExecutionInfo, - pkrangesValueHolder.v, - this.collection.getResourceId(), - this.correlatedActivityId)); - - return exContext.flatMap(IDocumentQueryExecutionContext::executeAsync); - }; - - return this.innerExecutionContext.executeAsync().onErrorResume(func); - } - - private boolean isCrossPartitionQuery(Exception exception) { - - CosmosClientException clientException = Utils.as(exception, CosmosClientException.class); - - if (clientException == null) { - return false; - } - - return (Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.BADREQUEST) && - Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.CROSS_PARTITION_QUERY_NOT_SERVABLE)); - } - - public static Flux> createAsync(IDocumentQueryClient client, - ResourceType resourceTypeEnum, Class resourceType, SqlQuerySpec query, FeedOptions feedOptions, - String resourceLink, DocumentCollection collection, boolean isContinuationExpected, - UUID correlatedActivityId) { - - IDocumentQueryExecutionContext innerExecutionContext = - new DefaultDocumentQueryExecutionContext( - client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - correlatedActivityId, - isContinuationExpected); - - return Flux.just(new ProxyDocumentQueryExecutionContext(innerExecutionContext, client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - collection, - isContinuationExpected, - correlatedActivityId)); - } -} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java new file mode 100644 index 000000000000..82fd7470b762 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.query; + +public enum QueryFeature { + None, + Aggregate, + CompositeAggregate, + Distinct, + GroupBy, + MultipleAggregates, + MultipleOrderBy, + OffsetAndLimit, + OrderBy, + Top +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java index 40fab3562dca..7de5ea04a96e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java @@ -14,11 +14,14 @@ * Used internally to encapsulates a query's information in the Azure Cosmos DB database service. */ public final class QueryInfo extends JsonSerializable { + private static final String HAS_SELECT_VALUE = "hasSelectValue"; private Integer top; private List orderBy; private Collection aggregates; private Collection orderByExpressions; private String rewrittenQuery; + private Integer offset; + private Integer limit; public QueryInfo() { } @@ -68,4 +71,25 @@ public Collection getOrderByExpressions() { ? this.orderByExpressions : (this.orderByExpressions = super.getCollection("orderByExpressions", String.class)); } + + public boolean hasSelectValue(){ + return super.has(HAS_SELECT_VALUE) && super.getBoolean(HAS_SELECT_VALUE); + } + + public boolean hasOffset() { + return this.getOffset() != null; + } + + public boolean hasLimit() { + return this.getLimit() != null; + } + + public Integer getLimit() { + return this.limit != null ? this.limit : (this.limit = super.getInt("limit")); + } + + public Integer getOffset() { + return this.offset != null ? this.offset : (this.offset = super.getInt("offset")); + } } + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java new file mode 100644 index 000000000000..95f9ab0428eb --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.SqlQuerySpec; +import com.azure.cosmos.implementation.BackoffRetryUtility; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.IDocumentClientRetryPolicy; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RuntimeConstants; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import reactor.core.publisher.Mono; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +class QueryPlanRetriever { + private static final String TRUE = "True"; + private static final String SUPPORTED_QUERY_FEATURES = QueryFeature.Aggregate.name() + ", " + + QueryFeature.CompositeAggregate.name() + ", " + + QueryFeature.MultipleOrderBy.name() + ", " + + QueryFeature.OrderBy.name() + ", " + + QueryFeature.OffsetAndLimit.name() + ", " + + QueryFeature.Top.name(); + + static Mono getQueryPlanThroughGatewayAsync(IDocumentQueryClient queryClient, + SqlQuerySpec sqlQuerySpec, + String resourceLink) { + final Map requestHeaders = new HashMap<>(); + requestHeaders.put(HttpConstants.HttpHeaders.CONTENT_TYPE, RuntimeConstants.MediaTypes.JSON); + requestHeaders.put(HttpConstants.HttpHeaders.IS_QUERY_PLAN_REQUEST, TRUE); + requestHeaders.put(HttpConstants.HttpHeaders.SUPPORTED_QUERY_FEATURES, SUPPORTED_QUERY_FEATURES); + requestHeaders.put(HttpConstants.HttpHeaders.QUERY_VERSION, HttpConstants.Versions.QUERY_VERSION); + + final RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.QueryPlan, + ResourceType.Document, + resourceLink, + requestHeaders); + request.UseGatewayMode = true; + request.setContentBytes(sqlQuerySpec.toJson().getBytes(StandardCharsets.UTF_8)); + + final IDocumentClientRetryPolicy retryPolicyInstance = + queryClient.getResetSessionTokenRetryPolicy().getRequestPolicy(); + + Function> executeFunc = req -> { + return BackoffRetryUtility.executeRetry(() -> { + retryPolicyInstance.onBeforeSendRequest(req); + return queryClient.executeQueryAsync(request).flatMap(rxDocumentServiceResponse -> { + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = + new PartitionedQueryExecutionInfo(rxDocumentServiceResponse.getReponseBodyAsString()); + return Mono.just(partitionedQueryExecutionInfo); + + }); + }, retryPolicyInstance); + }; + + return executeFunc.apply(request); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/SkipDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/SkipDocumentQueryExecutionContext.java new file mode 100644 index 000000000000..710efdc82649 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/SkipDocumentQueryExecutionContext.java @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosClientException; +import com.azure.cosmos.FeedResponse; +import com.azure.cosmos.Resource; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.Utils; +import reactor.core.publisher.Flux; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public final class SkipDocumentQueryExecutionContext implements IDocumentQueryExecutionComponent { + + private final IDocumentQueryExecutionComponent component; + private int skipCount; + + SkipDocumentQueryExecutionContext(IDocumentQueryExecutionComponent component, int skipCount) { + if (component == null) { + throw new IllegalArgumentException("documentQueryExecutionComponent cannot be null"); + } + this.component = component; + this.skipCount = skipCount; + } + + public static Flux> createAsync( + Function>> createSourceComponentFunction, + int skipCount, + String continuationToken) { + OffsetContinuationToken offsetContinuationToken; + Utils.ValueHolder outOffsetContinuationToken = new Utils.ValueHolder<>(); + if (continuationToken != null) { + if (!OffsetContinuationToken.tryParse(continuationToken, outOffsetContinuationToken)) { + String message = String.format("Invalid JSON in continuation token %s for Skip~Context", + continuationToken); + CosmosClientException dce = + BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.BADREQUEST, + message); + return Flux.error(dce); + } + + offsetContinuationToken = outOffsetContinuationToken.v; + } else { + offsetContinuationToken = new OffsetContinuationToken(skipCount, null); + } + + return createSourceComponentFunction.apply(offsetContinuationToken.getSourceToken()) + .map(component -> new SkipDocumentQueryExecutionContext<>(component, + offsetContinuationToken.getOffset())); + } + + @Override + public Flux> drainAsync(int maxPageSize) { + + return this.component.drainAsync(maxPageSize).map(tFeedResponse -> { + + List documentsAfterSkip = + tFeedResponse.getResults().stream().skip(this.skipCount).collect(Collectors.toList()); + + int numberOfDocumentsSkipped = tFeedResponse.getResults().size() - documentsAfterSkip.size(); + this.skipCount -= numberOfDocumentsSkipped; + + Map headers = new HashMap<>(tFeedResponse.getResponseHeaders()); + if (this.skipCount >= 0) { + // Add Offset Continuation Token + String sourceContinuationToken = tFeedResponse.getContinuationToken(); + OffsetContinuationToken offsetContinuationToken = new OffsetContinuationToken(this.skipCount, + sourceContinuationToken); + headers.put(HttpConstants.HttpHeaders.CONTINUATION, offsetContinuationToken.toJson()); + } + + return BridgeInternal.createFeedResponseWithQueryMetrics(documentsAfterSkip, headers, + BridgeInternal.queryMetricsFromFeedResponse(tFeedResponse)); + }); + } + + IDocumentQueryExecutionComponent getComponent() { + return this.component; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/TopDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/TopDocumentQueryExecutionContext.java index 94b7242bb71b..171392777742 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/TopDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/TopDocumentQueryExecutionContext.java @@ -20,15 +20,18 @@ public class TopDocumentQueryExecutionContext implements IDo private final IDocumentQueryExecutionComponent component; private final int top; + // limit from rewritten query + private final int limit; - public TopDocumentQueryExecutionContext(IDocumentQueryExecutionComponent component, int top) { + public TopDocumentQueryExecutionContext(IDocumentQueryExecutionComponent component, int top, int limit) { this.component = component; this.top = top; + this.limit = limit; } public static Flux> createAsync( Function>> createSourceComponentFunction, - int topCount, String topContinuationToken) { + int topCount, int limit, String topContinuationToken) { TakeContinuationToken takeContinuationToken; if (topContinuationToken == null) { @@ -56,7 +59,8 @@ public static Flux> cre return createSourceComponentFunction .apply(takeContinuationToken.getSourceToken()) - .map(component -> new TopDocumentQueryExecutionContext<>(component, takeContinuationToken.getTakeCount())); + .map(component -> new TopDocumentQueryExecutionContext<>(component, + takeContinuationToken.getTakeCount(), limit)); } @Override @@ -64,13 +68,18 @@ public Flux> drainAsync(int maxPageSize) { ParallelDocumentQueryExecutionContextBase context; if (this.component instanceof AggregateDocumentQueryExecutionContext) { - context = (ParallelDocumentQueryExecutionContextBase) ((AggregateDocumentQueryExecutionContext) this.component) - .getComponent(); + context = + (ParallelDocumentQueryExecutionContextBase) ((AggregateDocumentQueryExecutionContext) this.component) + .getComponent(); + } else if (this.component instanceof SkipDocumentQueryExecutionContext) { + context = + (ParallelDocumentQueryExecutionContextBase) ((SkipDocumentQueryExecutionContext) this.component) + .getComponent(); } else { context = (ParallelDocumentQueryExecutionContextBase) this.component; } - context.setTop(this.top); + context.setTop(this.limit); return this.component.drainAsync(maxPageSize).takeUntil(new Predicate>() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/metrics/QueryMetricsTextWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/metrics/QueryMetricsTextWriter.java index 736c36cb3cc5..45c776c3b067 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/metrics/QueryMetricsTextWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/metrics/QueryMetricsTextWriter.java @@ -178,7 +178,7 @@ protected void writeIndexHitRatio(double indexHitRatio) { @Override protected void writeTotalQueryExecutionTime(Duration totalQueryExecutionTime) { - QueryMetricsTextWriter.appendNanosecondsToStringBuilder(stringBuilder, + QueryMetricsTextWriter.appendMillisecondsToStringBuilder(stringBuilder, QueryMetricsTextWriter.TotalQueryExecutionTime, durationToMilliseconds(totalQueryExecutionTime), 0); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternal.java index a0410115c397..c104770a4d6d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternal.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.routing; +import com.azure.cosmos.PartitionKeyDefinition; import com.azure.cosmos.implementation.Undefined; import com.azure.cosmos.implementation.RMResources; import com.azure.cosmos.implementation.Strings; @@ -220,6 +221,10 @@ public List getComponents() { return components; } + public String getEffectivePartitionKeyString(PartitionKeyInternal internalPartitionKey, PartitionKeyDefinition partitionKey) { + return PartitionKeyInternalHelper.getEffectivePartitionKeyString(internalPartitionKey, partitionKey); + } + @SuppressWarnings("serial") static final class PartitionKeyInternalJsonSerializer extends StdSerializer { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelper.java index 143390b187ea..fa5201cc1ee4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelper.java @@ -3,11 +3,16 @@ package com.azure.cosmos.implementation.routing; +import com.azure.cosmos.implementation.IRoutingMapProvider; import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.Utils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.ListIterator; /** * Provide utility functionality to route request in direct connectivity mode in the Azure Cosmos DB database service. @@ -19,7 +24,7 @@ private static String max(String left, String right) { return left.compareTo(right) < 0 ? right : left; } - private static > boolean IsSortedAndNonOverlapping(List> list) { + private static > boolean isSortedAndNonOverlapping(List> list) { for (int i = 1; i < list.size(); i++) { Range previousRange = list.get(i - 1); Range currentRange = list.get(i); @@ -37,7 +42,7 @@ private static > boolean IsSortedAndNonOverlapping(List< public static Collection getOverlappingRanges(RoutingMapProvider routingMapProvider, String collectionSelfLink, List> sortedRanges) { - if (!IsSortedAndNonOverlapping(sortedRanges)) { + if (!isSortedAndNonOverlapping(sortedRanges)) { throw new IllegalArgumentException("sortedRanges"); } @@ -74,4 +79,66 @@ public static Collection getOverlappingRanges(RoutingMapProvi return targetRanges; } + + public static Mono> getOverlappingRanges( + IRoutingMapProvider routingMapProvider, + String resourceId, List> sortedRanges) { + + if (routingMapProvider == null){ + throw new IllegalArgumentException("routingMapProvider"); + } + + if (sortedRanges == null) { + throw new IllegalArgumentException("sortedRanges"); + } + + if (!isSortedAndNonOverlapping(sortedRanges)) { + throw new IllegalArgumentException("sortedRanges"); + } + + List targetRanges = new ArrayList<>(); + final ListIterator> iterator = sortedRanges.listIterator(); + + return Flux.defer(() -> { + if (!iterator.hasNext()) { + return Flux.empty(); + } + + Range queryRange; + Range sortedRange = iterator.next(); + if (!targetRanges.isEmpty()) { + String left = max(targetRanges.get(targetRanges.size() - 1).getMaxExclusive(), + sortedRange.getMin()); + + boolean leftInclusive = left.compareTo(sortedRange.getMin()) == 0 && sortedRange.isMinInclusive(); + + queryRange = new Range(left, sortedRange.getMax(), leftInclusive, + sortedRange.isMaxInclusive()); + } else { + queryRange = sortedRange; + } + + return routingMapProvider.tryGetOverlappingRangesAsync(resourceId, queryRange, false, null) + .map(ranges -> ranges.v) + .map(targetRanges::addAll) + .flatMap(aBoolean -> { + if (!targetRanges.isEmpty()) { + Range lastKnownTargetRange = targetRanges.get(targetRanges.size() - 1).toRange(); + while (iterator.hasNext()) { + Range value = iterator.next(); + if (MAX_COMPARATOR.compare(value, lastKnownTargetRange) > 0) { + // Since we already moved forward on iterator to check above condition, we + // go to previous when it fails so the the value is not skipped on iteration + iterator.previous(); + break; + } + } + } + return Mono.just(targetRanges); + }).flux(); + }).repeat(sortedRanges.size()) + .takeUntil(stringRange -> !iterator.hasNext()) + .last() + .single(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java index aad50a559163..0b99b057f37f 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java @@ -125,7 +125,7 @@ public void readAllItems() throws Exception{ CosmosItemResponse itemResponse = container.createItem(properties); FeedOptions feedOptions = new FeedOptions(); - feedOptions.setEnableCrossPartitionQuery(true); + Iterator> feedResponseIterator3 = container.readAllItems(feedOptions, CosmosItemProperties.class); assertThat(feedResponseIterator3.hasNext()).isTrue(); @@ -138,7 +138,7 @@ public void queryItems() throws Exception{ CosmosItemResponse itemResponse = container.createItem(properties); String query = String.format("SELECT * from c where c.id = '%s'", properties.getId()); - FeedOptions feedOptions = new FeedOptions().setEnableCrossPartitionQuery(true); + FeedOptions feedOptions = new FeedOptions(); Iterator> feedResponseIterator1 = container.queryItems(query, feedOptions, CosmosItemProperties.class); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosSyncStoredProcTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosSyncStoredProcTest.java index a71af5a0d525..35f0afcb8dab 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosSyncStoredProcTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosSyncStoredProcTest.java @@ -162,7 +162,7 @@ private void readAllSprocs() throws Exception { container.getScripts().createStoredProcedure(storedProcedureDef); FeedOptions feedOptions = new FeedOptions(); - feedOptions.setEnableCrossPartitionQuery(true); + Iterator> feedResponseIterator3 = container.getScripts().readAllStoredProcedures(feedOptions); assertThat(feedResponseIterator3.hasNext()).isTrue(); @@ -175,7 +175,7 @@ private void querySprocs() throws Exception { container.getScripts().createStoredProcedure(properties); String query = String.format("SELECT * from c where c.id = '%s'", properties.getId()); - FeedOptions feedOptions = new FeedOptions().setEnableCrossPartitionQuery(true); + FeedOptions feedOptions = new FeedOptions(); Iterator> feedResponseIterator1 = container.getScripts().queryStoredProcedures(query, feedOptions); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosSyncUDFTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosSyncUDFTest.java index 09dd78937963..987d56f84c67 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosSyncUDFTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosSyncUDFTest.java @@ -104,7 +104,7 @@ public void readAllUDFs() throws Exception { container.getScripts().createUserDefinedFunction(udf); FeedOptions feedOptions = new FeedOptions(); - feedOptions.setEnableCrossPartitionQuery(true); + Iterator> feedResponseIterator3 = container.getScripts().readAllUserDefinedFunctions(feedOptions); assertThat(feedResponseIterator3.hasNext()).isTrue(); @@ -117,7 +117,7 @@ public void queryUDFs() throws Exception { container.getScripts().createUserDefinedFunction(properties); String query = String.format("SELECT * from c where c.id = '%s'", properties.getId()); FeedOptions feedOptions = new FeedOptions(); - feedOptions.setEnableCrossPartitionQuery(true); + Iterator> feedResponseIterator1 = container.getScripts().queryUserDefinedFunctions(query, feedOptions); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTriggerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTriggerTest.java index 08566de78ba0..330834cafd01 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTriggerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTriggerTest.java @@ -88,7 +88,7 @@ public void readAllTriggers() throws Exception { container.getScripts().createTrigger(trigger); FeedOptions feedOptions = new FeedOptions(); - feedOptions.setEnableCrossPartitionQuery(true); + Iterator> feedResponseIterator3 = container.getScripts().readAllTriggers(feedOptions); assertThat(feedResponseIterator3.hasNext()).isTrue(); @@ -108,7 +108,7 @@ public void queryTriggers() throws Exception { CosmosTriggerProperties properties = getCosmosTriggerProperties(); container.getScripts().createTrigger(properties); String query = String.format("SELECT * from c where c.id = '%s'", properties.getId()); - FeedOptions feedOptions = new FeedOptions().setEnableCrossPartitionQuery(true); + FeedOptions feedOptions = new FeedOptions(); Iterator> feedResponseIterator1 = container.getScripts().queryTriggers(query, feedOptions); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosUserTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosUserTest.java index abbd37ac8078..da75a14997d4 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosUserTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosUserTest.java @@ -102,7 +102,7 @@ public void queryUsers() throws Exception{ CosmosUserResponse response = createdDatabase.createUser(userProperties); String query = String.format("SELECT * from c where c.id = '%s'", userProperties.getId()); - FeedOptions feedOptions = new FeedOptions().setEnableCrossPartitionQuery(true); + FeedOptions feedOptions = new FeedOptions(); Iterator> feedResponseIterator1 = createdDatabase.queryUsers(query, feedOptions); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ConsistencyTests2.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ConsistencyTests2.java index 4ca445b488d3..c1db1dc4b8e1 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ConsistencyTests2.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ConsistencyTests2.java @@ -237,7 +237,7 @@ public void validateSessionTokenAsync() { Mono task2 = ParallelAsync.forEachAsync(Range.between(0, 1000), 5, index -> { try { FeedOptions feedOptions = new FeedOptions(); - feedOptions.setEnableCrossPartitionQuery(true); + feedOptions.setAllowEmptyPages(true); FeedResponse queryResponse = client.queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM c WHERE c.Id = " + "'foo'", feedOptions) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/DocumentQuerySpyWireContentTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/DocumentQuerySpyWireContentTest.java index 9688c30d0d86..bfcdc910f13d 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/DocumentQuerySpyWireContentTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/DocumentQuerySpyWireContentTest.java @@ -111,17 +111,19 @@ public void queryWithContinuationTokenLimit(FeedOptions options, String query, b private void validateRequestHasContinuationTokenLimit(HttpRequest request, Integer expectedValue) { Map headers = request.headers().toMap(); - if (expectedValue != null && expectedValue > 0) { - assertThat(headers - .containsKey(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) + if (headers.get(HttpConstants.HttpHeaders.IS_QUERY) != null) { + if (expectedValue != null && expectedValue > 0) { + assertThat(headers + .containsKey(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) .isTrue(); - assertThat(headers - .get("x-ms-documentdb-responsecontinuationtokenlimitinkb")) + assertThat(headers + .get("x-ms-documentdb-responsecontinuationtokenlimitinkb")) .isEqualTo(Integer.toString(expectedValue)); - } else { - assertThat(headers - .containsKey(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) + } else { + assertThat(headers + .containsKey(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) .isFalse(); + } } } @@ -158,8 +160,7 @@ public void before_DocumentQuerySpyWireContentTest() throws Exception { TimeUnit.SECONDS.sleep(1); FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); - + // do the query once to ensure the collection is cached. client.queryDocuments(getMultiPartitionCollectionLink(), "select * from root", options) .then().block(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java index e13fb6a63f55..7e189a125314 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/TestSuiteBase.java @@ -170,7 +170,7 @@ protected static void truncateCollection(DocumentCollection collection) { FeedOptions options = new FeedOptions(); options.setMaxDegreeOfParallelism(-1); - options.setEnableCrossPartitionQuery(true); + options.maxItemCount(100); logger.info("Truncating collection {} documents ...", collection.getId()); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/DCDocumentCrudTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/DCDocumentCrudTest.java index d4a3673dc106..f1ab4ebae7e0 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/DCDocumentCrudTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/DCDocumentCrudTest.java @@ -219,9 +219,9 @@ public void crossPartitionQuery() { waitIfNeededForReplicasToCatchUp(clientBuilder()); FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); options.setMaxDegreeOfParallelism(-1); options.maxItemCount(100); + Flux> results = client.queryDocuments(getCollectionLink(), "SELECT * FROM r", options); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() @@ -275,7 +275,8 @@ private void validateNoDocumentQueryOperationThroughGateway() { // validate that all gateway captured requests are non document resources for(RxDocumentServiceRequest request: client.getCapturedRequests()) { - if (request.getOperationType() == OperationType.Query) { + if (request.getOperationType() == OperationType.Query + || request.getOperationType() == OperationType.QueryPlan) { assertThat(request.getPartitionKeyRangeIdentity()).isNull(); } else { validateResourceTypesSentToGateway.validate(request); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelperTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelperTest.java index 76e3d1f140df..b94014719d6e 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelperTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/routing/RoutingMapProviderHelperTest.java @@ -3,16 +3,23 @@ package com.azure.cosmos.implementation.routing; +import com.azure.cosmos.implementation.IRoutingMapProvider; import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.Utils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.mockito.Matchers; +import org.mockito.Mockito; import org.testng.annotations.Test; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -126,4 +133,56 @@ public String apply(PartitionKeyRange range) { assertThat("3,4").isEqualTo(ranges.stream().map(func).collect(Collectors.joining(","))); } + + @Test(groups = {"unit"}) + public void getOverlappingRangesWithList() { + + Function func = new Function() { + @Override + public String apply(PartitionKeyRange range) { + return range.getId(); + } + }; + + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + List rangeList = Arrays.asList(new PartitionKeyRange("0", "", "000A"), + new PartitionKeyRange("1", "000A", "000D"), + new PartitionKeyRange("2", "000D", "0012"), + new PartitionKeyRange("3", "0012", "0015"), + new PartitionKeyRange("4", "0015", "0020"), + new PartitionKeyRange("5", "0020", "0040"), + new PartitionKeyRange("6", "0040", "FF")); + Mono> listSingle = Mono.just(rangeList); + + Map> resultMap = new HashMap<>(); + + resultMap.put(new Range<>("000D", "0012", true, false), + Collections.singletonList(new PartitionKeyRange("2", "000D", "0012"))); + resultMap.put(new Range<>("0012", "0015", true, false), + Collections.singletonList(new PartitionKeyRange("3", "0012", "0015"))); + resultMap.put(new Range<>("0015", "0020", true, false), + Collections.singletonList(new PartitionKeyRange("4", "0015", "00120"))); + + Mockito.doAnswer(invocationOnMock -> { + Range range = invocationOnMock.getArgumentAt(1, Range.class); + return Mono.just(new Utils.ValueHolder<>(resultMap.get(range))); + }).when(routingMapProviderMock).tryGetOverlappingRangesAsync(Matchers.anyString(), + Matchers.any(), + Matchers.anyBoolean(), + Matchers.anyMap()); + + Mono> overlappingRanges; + overlappingRanges = RoutingMapProviderHelper.getOverlappingRanges(routingMapProviderMock, + "coll1", + Arrays.asList(new Range("000D", "0012", true, false), + new Range("0012", "0015", true, false), + new Range<>("0015", "0020", true, false))); + assertThat("2,3,4").isEqualTo(overlappingRanges.block().stream().map(func).collect(Collectors.joining(","))); + + overlappingRanges = RoutingMapProviderHelper.getOverlappingRanges(routingMapProviderMock, + "coll1", + Arrays.asList(new Range("000D", "0012", true, false))); + assertThat("2").isEqualTo(overlappingRanges.block().stream().map(func).collect(Collectors.joining(","))); + + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java index c96596afbf99..06d8ce90dba3 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java @@ -73,7 +73,7 @@ public AggregateQueryTests(CosmosClientBuilder clientBuilder) { public void queryDocumentsWithAggregates(boolean qmEnabled) throws Exception { FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.populateQueryMetrics(qmEnabled); options.setMaxDegreeOfParallelism(2); @@ -163,9 +163,15 @@ public void generateTestConfigs() { String testName = String.format("%s SinglePartition %s", config.operator, "SELECT VALUE"); queryConfigs.add(new QueryConfig(testName, query, config.expected)); - query = String.format(aggregateSinglePartitionQueryFormatSelect, config.operator, field, partitionKey, uniquePartitionKey); + // Should add support for non value aggregates before enabling these. + // https://github.com/Azure/azure-sdk-for-java/issues/6088 + /* + query = String.format(aggregateSinglePartitionQueryFormatSelect, config.operator, field, partitionKey, + uniquePartitionKey); testName = String.format("%s SinglePartition %s", config.operator, "SELECT"); - queryConfigs.add(new QueryConfig(testName, query, new Document("{'$1':" + removeTrailingZerosIfInteger(config.expected) + "}"))); + queryConfigs.add(new QueryConfig(testName, query, new Document("{'$1':" + removeTrailingZerosIfInteger + (config.expected) + "}"))); + */ } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java index 7c47ad721e28..c0b234d1f122 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java @@ -96,7 +96,6 @@ public BackPressureCrossPartitionTest(CosmosClientBuilder clientBuilder) { private void warmUp() { FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); // ensure collection is cached createdCollection.queryItems("SELECT * FROM r", options, CosmosItemProperties.class).blockFirst(); } @@ -117,7 +116,6 @@ public Object[][] queryProvider() { @Test(groups = { "long" }, dataProvider = "queryProvider", timeOut = 2 * TIMEOUT) public void query(String query, int maxItemCount, int maxExpectedBufferedCountForBackPressure, int expectedNumberOfResults) throws Exception { FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); options.maxItemCount(maxItemCount); options.setMaxDegreeOfParallelism(2); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java index 6cd9c184b47f..75c62806fcee 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java @@ -68,7 +68,7 @@ public BackPressureTest(CosmosClientBuilder clientBuilder) { public void readFeed() throws Exception { FeedOptions options = new FeedOptions(); options.maxItemCount(1); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.readAllItems(options, CosmosItemProperties.class); RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest) CosmosBridgeInternal.getAsyncDocumentClient(client); @@ -113,7 +113,7 @@ public void readFeed() throws Exception { public void query() throws Exception { FeedOptions options = new FeedOptions(); options.maxItemCount(1); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems("SELECT * from r", options, CosmosItemProperties.class); RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest)CosmosBridgeInternal.getAsyncDocumentClient(client); @@ -190,7 +190,7 @@ public void before_BackPressureTest() throws Exception { private void warmUp() { // ensure collection is cached FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + createdCollection.queryItems("SELECT * from r", options, CosmosItemProperties.class).blockFirst(); } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java index 60504ac53c24..6620435caf8a 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java @@ -261,7 +261,6 @@ public void staledLeaseAcquiring() { new SqlParameterList(param)); FeedOptions feedOptions = new FeedOptions(); - feedOptions.setEnableCrossPartitionQuery(true); createdLeaseCollection.queryItems(querySpec, feedOptions, CosmosItemProperties.class) .delayElements(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT / 2)) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/CollectionQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/CollectionQueryTest.java index e1437533be9d..731797c35d4f 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/CollectionQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/CollectionQueryTest.java @@ -99,7 +99,6 @@ public void queryCollections_NoResults() throws Exception { String query = "SELECT * from root r where r.id = '2'"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); Flux> queryObservable = createdDatabase.queryContainers(query, options); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DatabaseQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DatabaseQueryTest.java index 27e1c06d02ca..dbf521220701 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DatabaseQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DatabaseQueryTest.java @@ -97,7 +97,6 @@ public void queryDatabases_NoResults() throws Exception { String query = "SELECT * from root r where r.id = '2'"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); Flux> queryObservable = client.queryDatabases(query, options); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java index 16fc179dc10d..dae71f0c395b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java @@ -189,7 +189,7 @@ private CosmosItemProperties generateMultiOrderByDocument() { @Test(groups = { "simple" }, timeOut = TIMEOUT) public void queryDocumentsWithMultiOrder() throws CosmosClientException, InterruptedException { FeedOptions feedOptions = new FeedOptions(); - feedOptions.setEnableCrossPartitionQuery(true); + boolean[] booleanValues = new boolean[] {true, false}; CosmosContainerProperties containerSettings = documentCollection.read().block().getProperties(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OfferQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OfferQueryTest.java index e23c373ddda8..da8e6fe634ce 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OfferQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OfferQueryTest.java @@ -145,7 +145,7 @@ public void before_OfferQueryTest() throws Exception { } } - @AfterClass(groups = { "emulator" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = { "emulator" }, timeOut = 2*SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeDeleteDatabase(client, databaseId); safeClose(client); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java new file mode 100644 index 000000000000..84b6b4eeec81 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java @@ -0,0 +1,217 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.rx; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosItemProperties; +import com.azure.cosmos.FeedOptions; +import com.azure.cosmos.FeedResponse; +import com.azure.cosmos.implementation.FeedResponseListValidator; +import com.azure.cosmos.implementation.FeedResponseValidator; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.query.OffsetContinuationToken; +import io.reactivex.subscribers.TestSubscriber; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class OffsetLimitQueryTests extends TestSuiteBase { + private CosmosAsyncDatabase createdDatabase; + private CosmosAsyncContainer createdCollection; + private ArrayList docs = new ArrayList<>(); + + private String partitionKey = "mypk"; + private int firstPk = 0; + private int secondPk = 1; + private String field = "field"; + + private CosmosAsyncClient client; + + @Factory(dataProvider = "clientBuildersWithDirect") + public OffsetLimitQueryTests(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider") + public void queryDocuments(boolean qmEnabled) { + int skipCount = 4; + int takeCount = 10; + String query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + FeedOptions options = new FeedOptions(); + options.maxItemCount(5); + options.populateQueryMetrics(qmEnabled); + options.setMaxDegreeOfParallelism(2); + Flux> queryObservable = createdCollection.queryItems(query, options, + CosmosItemProperties.class); + + FeedResponseListValidator validator = + new FeedResponseListValidator.Builder() + .totalSize(takeCount) + .allPagesSatisfy(new FeedResponseValidator.Builder() + .requestChargeGreaterThanOrEqualTo(1.0) + .build()) + .hasValidQueryMetrics(qmEnabled) + .build(); + + validateQuerySuccess(queryObservable, validator, TIMEOUT); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void drainAllDocumentsUsingOffsetLimit() { + int skipCount = 0; + int takeCount = 2; + String query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + FeedOptions options = new FeedOptions(); + options.maxItemCount(5); + Flux> queryObservable; + + int totalDocsObtained = 0; + int totalDocs = docs.size(); + int expectedNumCalls = totalDocs / takeCount; + int numCalls = 0; + FeedResponse finalResponse = null; + + while (numCalls < expectedNumCalls) { + query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); + Iterator> iterator = queryObservable.toIterable().iterator(); + while (iterator.hasNext()) { + FeedResponse next = iterator.next(); + totalDocsObtained += next.getResults().size(); + finalResponse = next; + } + numCalls++; + skipCount += takeCount; + } + assertThat(totalDocsObtained).isEqualTo(docs.size()); + assertThat(finalResponse.getContinuationToken()).isNull(); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void offsetContinuationTokenRoundTrips() { + // Positive + OffsetContinuationToken offsetContinuationToken = new OffsetContinuationToken(42, "asdf"); + String serialized = offsetContinuationToken.toString(); + Utils.ValueHolder outOffsetContinuationToken = new Utils.ValueHolder<>(); + + assertThat(OffsetContinuationToken.tryParse(serialized, outOffsetContinuationToken)).isTrue(); + OffsetContinuationToken deserialized = outOffsetContinuationToken.v; + + assertThat(deserialized.getOffset()).isEqualTo(42); + assertThat(deserialized.getSourceToken()).isEqualTo("asdf"); + + // Negative + Utils.ValueHolder outTakeContinuationToken = + new Utils.ValueHolder(); + assertThat( + OffsetContinuationToken.tryParse("{\"property\": \"Not a valid token\"}", outTakeContinuationToken)) + .isFalse(); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT * 10) + public void queryDocumentsWithOffsetContinuationTokens() { + int skipCount = 3; + int takeCount = 10; + String query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount; + this.queryWithContinuationTokensAndPageSizes(query, new int[] {1, 5, 15}, takeCount); + } + + private void queryWithContinuationTokensAndPageSizes(String query, int[] pageSizes, int takeCount) { + for (int pageSize : pageSizes) { + List receivedDocuments = this.queryWithContinuationTokens(query, pageSize); + Set actualIds = new HashSet(); + for (CosmosItemProperties CosmosItemProperties : receivedDocuments) { + actualIds.add(CosmosItemProperties.getResourceId()); + } + + assertThat(actualIds.size()).describedAs("total number of results").isEqualTo(takeCount); + } + } + + private List queryWithContinuationTokens(String query, int pageSize) { + String requestContinuation = null; + List continuationTokens = new ArrayList(); + List receivedDocuments = new ArrayList(); + + do { + FeedOptions options = new FeedOptions(); + options.maxItemCount(pageSize); + options.maxItemCount(5); + options.requestContinuation(requestContinuation); + Flux> queryObservable = + createdCollection.queryItems(query, options, CosmosItemProperties.class); + + TestSubscriber> testSubscriber = new TestSubscriber<>(); + queryObservable.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS); + testSubscriber.assertNoErrors(); + testSubscriber.assertComplete(); + + FeedResponse firstPage = + (FeedResponse) testSubscriber.getEvents().get(0).get(0); + requestContinuation = firstPage.getContinuationToken(); + receivedDocuments.addAll(firstPage.getResults()); + + continuationTokens.add(requestContinuation); + } while (requestContinuation != null); + + return receivedDocuments; + } + + public void bulkInsert() { + generateTestData(); + voidBulkInsertBlocking(createdCollection, docs); + } + + public void generateTestData() { + + for (int i = 0; i < 10; i++) { + CosmosItemProperties d = new CosmosItemProperties(); + d.setId(Integer.toString(i)); + BridgeInternal.setProperty(d, field, i); + BridgeInternal.setProperty(d, partitionKey, firstPk); + docs.add(d); + } + + for (int i = 10; i < 20; i++) { + CosmosItemProperties d = new CosmosItemProperties(); + d.setId(Integer.toString(i)); + BridgeInternal.setProperty(d, field, i); + BridgeInternal.setProperty(d, partitionKey, secondPk); + docs.add(d); + } + } + + @AfterClass(groups = {"simple"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + safeClose(client); + } + + @BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() throws Exception { + client = this.clientBuilder().buildAsyncClient(); + createdCollection = getSharedMultiPartitionCosmosContainer(client); + truncateCollection(createdCollection); + + bulkInsert(); + + waitIfNeededForReplicasToCatchUp(clientBuilder()); + } +} + diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index bfc14d60bf41..11fd6c67249d 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -74,7 +74,6 @@ public void queryDocumentsValidateContent(boolean qmEnabled) throws Exception { , expectedDocument.getString("propStr")); FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); options.populateQueryMetrics(qmEnabled); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); @@ -103,7 +102,7 @@ public void queryDocumentsValidateContent(boolean qmEnabled) throws Exception { public void queryDocuments_NoResults() throws Exception { String query = "SELECT * from root r where r.id = '2' ORDER BY r.propInt"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() @@ -126,7 +125,7 @@ public Object[][] sortOrder() { public void queryOrderBy(String sortOrder) throws Exception { String query = String.format("SELECT * FROM r ORDER BY r.propInt %s", sortOrder); FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + int pageSize = 3; options.maxItemCount(pageSize); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); @@ -154,7 +153,7 @@ public void queryOrderBy(String sortOrder) throws Exception { public void queryOrderByInt() throws Exception { String query = "SELECT * FROM r ORDER BY r.propInt"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + int pageSize = 3; options.maxItemCount(pageSize); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); @@ -178,7 +177,7 @@ public void queryOrderByInt() throws Exception { public void queryOrderByString() throws Exception { String query = "SELECT * FROM r ORDER BY r.propStr"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + int pageSize = 3; options.maxItemCount(pageSize); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); @@ -208,7 +207,7 @@ public Object[][] topValueParameter() { public void queryOrderWithTop(int topValue) throws Exception { String query = String.format("SELECT TOP %d * FROM r ORDER BY r.propInt", topValue); FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + int pageSize = 3; options.maxItemCount(pageSize); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); @@ -239,19 +238,6 @@ private List sortDocumentsAndCollectResourceIds(String propName, Fun .map(Resource::getResourceId).collect(Collectors.toList()); } - @Test(groups = { "simple" }, timeOut = TIMEOUT) - public void crossPartitionQueryNotEnabled() throws Exception { - String query = "SELECT * FROM r ORDER BY r.propInt"; - FeedOptions options = new FeedOptions(); - Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); - - FailureValidator validator = new FailureValidator.Builder() - .instanceOf(CosmosClientException.class) - .statusCode(400) - .build(); - validateQueryFailure(queryObservable, validator); - } - @Test(groups = { "simple" }, timeOut = TIMEOUT) public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exception { String query = "SELECT * FROM r ORDER BY r.propScopedPartitionInt ASC"; @@ -459,7 +445,7 @@ private void assertInvalidContinuationToken(String query, int[] pageSize, List queryWithContinuationTokens(String query, int do { FeedOptions options = new FeedOptions(); options.maxItemCount(pageSize); - options.setEnableCrossPartitionQuery(true); + options.setMaxDegreeOfParallelism(2); options.requestContinuation(requestContinuation); Flux> queryObservable = createdCollection.queryItems(query, diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java index 5c07b37cc0de..26accce8bc5b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java @@ -75,7 +75,7 @@ public void queryDocuments(boolean qmEnabled) { String query = "SELECT * from c where c.prop = 99"; FeedOptions options = new FeedOptions(); options.maxItemCount(5); - options.setEnableCrossPartitionQuery(true); + options.populateQueryMetrics(qmEnabled); options.setMaxDegreeOfParallelism(2); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); @@ -99,7 +99,7 @@ public void queryMetricEquality() throws Exception { String query = "SELECT * from c where c.prop = 99"; FeedOptions options = new FeedOptions(); options.maxItemCount(5); - options.setEnableCrossPartitionQuery(true); + options.populateQueryMetrics(true); options.setMaxDegreeOfParallelism(0); @@ -134,7 +134,7 @@ private void compareQueryMetrics(Map qm1, Map> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() @@ -154,7 +154,7 @@ public void queryDocumentsWithPageSize() { int pageSize = 3; options.maxItemCount(pageSize); options.setMaxDegreeOfParallelism(-1); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); List expectedDocs = createdDocuments; @@ -179,14 +179,14 @@ public void queryDocumentsWithPageSize() { public void invalidQuerySyntax() { String query = "I am an invalid query"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); FailureValidator validator = new FailureValidator.Builder() - .instanceOf(CosmosClientException.class) - .statusCode(400) - .notNullActivityId() - .build(); + .instanceOf(CosmosClientException.class) + .statusCode(400) + .notNullActivityId() + .build(); validateQueryFailure(queryObservable, validator); } @@ -196,11 +196,16 @@ public void crossPartitionQueryNotEnabled() { FeedOptions options = new FeedOptions(); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); - FailureValidator validator = new FailureValidator.Builder() - .instanceOf(CosmosClientException.class) - .statusCode(400) + List expectedDocs = createdDocuments; + FeedResponseListValidator validator = + new FeedResponseListValidator.Builder() + .totalSize(expectedDocs.size()) + .exactlyContainsInAnyOrder(expectedDocs.stream().map(Resource::getResourceId).collect(Collectors.toList())) + .allPagesSatisfy(new FeedResponseValidator.Builder() + .requestChargeGreaterThanOrEqualTo(1.0) + .build()) .build(); - validateQueryFailure(queryObservable, validator); + validateQuerySuccess(queryObservable, validator); } @Test(groups = { "simple" }, timeOut = 2 * TIMEOUT) @@ -273,7 +278,7 @@ public void queryDocumentsWithCompositeContinuationTokens() throws Exception { @Test(groups = { "simple" }) public void queryDocumentsStringValue(){ FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.setMaxDegreeOfParallelism(2); List expectedValues = createdDocuments.stream().map(d -> d.getId()).collect(Collectors.toList()); @@ -291,7 +296,7 @@ public void queryDocumentsStringValue(){ @Test(groups = { "simple" }) public void queryDocumentsArrayValue(){ FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.setMaxDegreeOfParallelism(2); Collection>> expectedValues = new ArrayList<>(); @@ -323,7 +328,7 @@ public void queryDocumentsArrayValue(){ @Test(groups = { "simple" }) public void queryDocumentsIntegerValue(){ FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.setMaxDegreeOfParallelism(2); List expectedValues = createdDocuments.stream().map(d -> d.getInt("prop")).collect(Collectors.toList()); @@ -341,7 +346,7 @@ public void queryDocumentsIntegerValue(){ @Test(groups = { "simple" }) public void queryDocumentsPojo(){ FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.setMaxDegreeOfParallelism(2); String query = "Select * from c"; Flux> queryObservable = createdCollection.queryItems(query, options, TestObject.class); @@ -362,6 +367,7 @@ public void queryDocumentsPojo(){ .containsAll(assertTuples); } + // TODO (DANOBLE) ParallelDocumentQueryTest initialization intermittently fails in CI environments @@ -475,7 +481,7 @@ public void invalidQuerySytax() throws Exception { String query = "I am an invalid query"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); FailureValidator validator = new FailureValidator.Builder().instanceOf(CosmosClientException.class) @@ -514,7 +520,7 @@ private List queryWithContinuationTokens(String query, int do { FeedOptions options = new FeedOptions(); options.maxItemCount(pageSize); - options.setEnableCrossPartitionQuery(true); + options.setMaxDegreeOfParallelism(2); options.requestContinuation(requestContinuation); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/PermissionQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/PermissionQueryTest.java index 1fd637903358..522f7b038fb3 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/PermissionQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/PermissionQueryTest.java @@ -77,7 +77,7 @@ public void query_NoResults() throws Exception { String query = "SELECT * from root r where r.id = '2'"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = client .queryPermissions(getUserLink(), query, options); @@ -96,7 +96,7 @@ public void queryAll() throws Exception { String query = "SELECT * from root"; FeedOptions options = new FeedOptions(); options.maxItemCount(3); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = client .queryPermissions(getUserLink(), query, options); @@ -119,7 +119,7 @@ public void queryAll() throws Exception { public void invalidQuerySytax() throws Exception { String query = "I am an invalid query"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = client .queryPermissions(getUserLink(), query, options); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java index 97a889f460b7..e51ab0914d1b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedDocumentsTest.java @@ -6,17 +6,15 @@ import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosAsyncDatabase; import com.azure.cosmos.CosmosClientBuilder; -import com.azure.cosmos.CosmosClientException; import com.azure.cosmos.CosmosItemProperties; import com.azure.cosmos.FeedOptions; import com.azure.cosmos.FeedResponse; -import com.azure.cosmos.implementation.FailureValidator; +import com.azure.cosmos.Resource; import com.azure.cosmos.implementation.FeedResponseListValidator; import com.azure.cosmos.implementation.FeedResponseValidator; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Factory; -import org.testng.annotations.Ignore; import org.testng.annotations.Test; import reactor.core.publisher.Flux; @@ -41,7 +39,7 @@ public ReadFeedDocumentsTest(CosmosClientBuilder clientBuilder) { @Test(groups = { "simple" }, timeOut = FEED_TIMEOUT) public void readDocuments() { FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.maxItemCount(2); Flux> feedObservable = createdCollection.readAllItems(options, CosmosItemProperties.class); @@ -59,18 +57,27 @@ public void readDocuments() { @Test(groups = { "simple" }, timeOut = FEED_TIMEOUT) public void readDocuments_withoutEnableCrossPartitionQuery() { + // With introduction of queryplan, crosspartition need not be enabled anymore. + FeedOptions options = new FeedOptions(); options.maxItemCount(2); - Flux> feedObservable = createdCollection.readAllItems(options, CosmosItemProperties.class); - FailureValidator validator = FailureValidator.builder().instanceOf(CosmosClientException.class) - .statusCode(400) - .errorMessageContains("Cross partition query is required but disabled." + - " Please set x-ms-documentdb-query-enablecrosspartition to true," + - " specify x-ms-documentdb-partitionkey," + - " or revise your query to avoid this exception.") - .build(); - validateQueryFailure(feedObservable, validator, FEED_TIMEOUT); + FeedResponseListValidator validator = + new FeedResponseListValidator.Builder() + .totalSize(createdDocuments.size()) + .numberOfPagesIsGreaterThanOrEqualTo(1) + .exactlyContainsInAnyOrder(createdDocuments + .stream() + .map(Resource::getResourceId) + .collect(Collectors + .toList())) + .allPagesSatisfy(new FeedResponseValidator.Builder() + .requestChargeGreaterThanOrEqualTo(1.0) + .pageSizeIsLessThanOrEqualTo(options + .maxItemCount()) + .build()) + .build(); + validateQuerySuccess(feedObservable, validator, FEED_TIMEOUT); } // TODO (DANOBLE) ReadFeedDocumentsTest initialization consistently times out in CI environments. diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java index 9fb71bfb25f9..ee7cf3c1e9b2 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java @@ -57,7 +57,7 @@ public void queryDocuments(boolean queryMetricsEnabled) throws Exception { FeedOptions options = new FeedOptions(); options.maxItemCount(5); - options.setEnableCrossPartitionQuery(true); + options.populateQueryMetrics(queryMetricsEnabled); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); @@ -86,7 +86,7 @@ public void queryDocuments_ParameterizedQueryWithInClause() throws Exception { FeedOptions options = new FeedOptions(); options.maxItemCount(5); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems(sqs, options, CosmosItemProperties.class); List expectedDocs = createdDocuments.stream().filter(d -> (3 == d.getInt("prop") || 4 == d.getInt("prop"))).collect(Collectors.toList()); @@ -113,7 +113,7 @@ public void queryDocuments_ParameterizedQuery() throws Exception { FeedOptions options = new FeedOptions(); options.maxItemCount(5); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems(sqs, options, CosmosItemProperties.class); List expectedDocs = createdDocuments.stream().filter(d -> 3 == d.getInt("prop")).collect(Collectors.toList()); @@ -137,7 +137,7 @@ public void queryDocuments_NoResults() throws Exception { String query = "SELECT * from root r where r.id = '2'"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() @@ -155,7 +155,7 @@ public void queryDocumentsWithPageSize() throws Exception { String query = "SELECT * from root"; FeedOptions options = new FeedOptions(); options.maxItemCount(3); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); List expectedDocs = createdDocuments; @@ -180,7 +180,7 @@ public void queryOrderBy() throws Exception { String query = "SELECT * FROM r ORDER BY r.prop ASC"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.maxItemCount(3); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); @@ -203,7 +203,7 @@ public void queryOrderBy() throws Exception { public void continuationToken() throws Exception { String query = "SELECT * FROM r ORDER BY r.prop ASC"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.maxItemCount(3); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); @@ -243,7 +243,7 @@ public void continuationToken() throws Exception { public void invalidQuerySytax() throws Exception { String query = "I am an invalid query"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); FailureValidator validator = new FailureValidator.Builder() diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java index bd380488ae07..2200de5872fd 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionReadFeedDocumentsTest.java @@ -36,7 +36,7 @@ public SinglePartitionReadFeedDocumentsTest(CosmosClientBuilder clientBuilder) { @Test(groups = { "simple" }, timeOut = FEED_TIMEOUT) public void readDocuments() { final FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.maxItemCount(2); final Flux> feedObservable = createdCollection.readAllItems(options, CosmosItemProperties.class); final int expectedPageSize = (createdDocuments.size() + options.maxItemCount() - 1) / options.maxItemCount(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java index 41a5d4e0413e..c3ab1da96977 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/StoredProcedureQueryTest.java @@ -71,7 +71,7 @@ public void query_NoResults() throws Exception { String query = "SELECT * from root r where r.id = '2'"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.getScripts() .queryStoredProcedures(query, options); @@ -89,7 +89,7 @@ public void queryAll() throws Exception { String query = "SELECT * from root"; FeedOptions options = new FeedOptions(); options.maxItemCount(3); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.getScripts() .queryStoredProcedures(query, options); @@ -111,7 +111,7 @@ public void queryAll() throws Exception { public void invalidQuerySytax() throws Exception { String query = "I am an invalid query"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.getScripts() .queryStoredProcedures(query, options); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 70859794e1e1..c0a88c09314b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -217,7 +217,6 @@ protected static void truncateCollection(CosmosAsyncContainer cosmosContainer) { List paths = cosmosContainerProperties.getPartitionKeyDefinition().getPaths(); FeedOptions options = new FeedOptions(); options.setMaxDegreeOfParallelism(-1); - options.setEnableCrossPartitionQuery(true); options.maxItemCount(100); logger.info("Truncating collection {} documents ...", cosmosContainer.getId()); @@ -563,7 +562,7 @@ public static void deleteDocumentIfExists(CosmosAsyncClient client, String datab public static void safeDeleteDocument(CosmosAsyncContainer cosmosContainer, String documentId, Object partitionKey) { if (cosmosContainer != null && documentId != null) { try { - cosmosContainer.getItem(documentId, partitionKey).read().block().getItem().delete().block(); + cosmosContainer.getItem(documentId, partitionKey).delete().block(); } catch (Exception e) { CosmosClientException dce = Utils.as(e, CosmosClientException.class); if (dce == null || dce.getStatusCode() != 404) { @@ -574,7 +573,7 @@ public static void safeDeleteDocument(CosmosAsyncContainer cosmosContainer, Stri } public static void deleteDocument(CosmosAsyncContainer cosmosContainer, String documentId) { - cosmosContainer.getItem(documentId, PartitionKey.NONE).read().block().getItem().delete(); + cosmosContainer.getItem(documentId, PartitionKey.NONE).delete(); } public static void deleteUserIfExists(CosmosAsyncClient client, String databaseId, String userId) { @@ -967,7 +966,7 @@ static protected CosmosClientBuilder createGatewayHouseKeepingDocumentClient() { RetryOptions options = new RetryOptions(); options.setMaxRetryWaitTimeInSeconds(SUITE_SETUP_TIMEOUT); connectionPolicy.setRetryOptions(options); - return new CosmosClientBuilder().setEndpoint(TestConfigurations.HOST) + return CosmosAsyncClient.cosmosClientBuilder().setEndpoint(TestConfigurations.HOST) .setCosmosKeyCredential(cosmosKeyCredential) .setConnectionPolicy(connectionPolicy) .setConsistencyLevel(ConsistencyLevel.SESSION); @@ -978,7 +977,7 @@ static protected CosmosClientBuilder createGatewayRxDocumentClient(ConsistencyLe connectionPolicy.setConnectionMode(ConnectionMode.GATEWAY); connectionPolicy.setUsingMultipleWriteLocations(multiMasterEnabled); connectionPolicy.setPreferredLocations(preferredLocations); - return new CosmosClientBuilder().setEndpoint(TestConfigurations.HOST) + return CosmosAsyncClient.cosmosClientBuilder().setEndpoint(TestConfigurations.HOST) .setCosmosKeyCredential(cosmosKeyCredential) .setConnectionPolicy(connectionPolicy) .setConsistencyLevel(consistencyLevel); @@ -1006,7 +1005,7 @@ static protected CosmosClientBuilder createDirectRxDocumentClient(ConsistencyLev Configs configs = spy(new Configs()); doAnswer((Answer)invocation -> protocol).when(configs).getProtocol(); - CosmosClientBuilder builder = new CosmosClientBuilder().setEndpoint(TestConfigurations.HOST) + CosmosClientBuilder builder = CosmosAsyncClient.cosmosClientBuilder().setEndpoint(TestConfigurations.HOST) .setCosmosKeyCredential(cosmosKeyCredential) .setConnectionPolicy(connectionPolicy) .setConsistencyLevel(consistencyLevel); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java index 29921957675c..120c620d2b08 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java @@ -340,7 +340,7 @@ public void readDocumentsWithAllPermission(ConnectionMode connectionMode) { String query = "SELECT * FROM r WHERE r._rid=\"" + rid1 + "\" or r._rid=\"" + rid2 + "\""; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = asyncClientWithTokenResolver.queryDocuments(createdCollection.getSelfLink(), query, options); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() .totalSize(2) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TopQueryTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TopQueryTests.java index 98085444af2f..d21f6adeee32 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TopQueryTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TopQueryTests.java @@ -50,7 +50,7 @@ public TopQueryTests(CosmosClientBuilder clientBuilder) { public void queryDocumentsWithTop(boolean qmEnabled) throws Exception { FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + options.maxItemCount(9); options.setMaxDegreeOfParallelism(2); options.populateQueryMetrics(qmEnabled); @@ -88,8 +88,6 @@ public void queryDocumentsWithTop(boolean qmEnabled) throws Exception { if (i == 0) { options.partitionKey(new PartitionKey(firstPk)); - options.setEnableCrossPartitionQuery(false); - expectedTotalSize = 10; expectedNumberOfPages = 2; expectedPageLengths = new int[] { 9, 1 }; @@ -148,7 +146,7 @@ private List queryWithContinuationTokens(String query, int do { FeedOptions options = new FeedOptions(); options.maxItemCount(pageSize); - options.setEnableCrossPartitionQuery(true); + options.setMaxDegreeOfParallelism(2); options.requestContinuation(requestContinuation); Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java index 1471a06830ce..244575062aae 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TriggerQueryTest.java @@ -75,7 +75,7 @@ public void query_NoResults() throws Exception { String query = "SELECT * from root r where r.id = '2'"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.getScripts().queryTriggers(query, options); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() @@ -93,7 +93,7 @@ public void queryAll() throws Exception { String query = "SELECT * from root"; FeedOptions options = new FeedOptions(); options.maxItemCount(3); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.getScripts().queryTriggers(query, options); createdTriggers.forEach(cosmosTriggerSettings -> logger.info("Created trigger in method: {}", cosmosTriggerSettings.getResourceId())); @@ -119,7 +119,7 @@ public void queryAll() throws Exception { public void invalidQuerySytax() throws Exception { String query = "I am an invalid query"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.getScripts().queryTriggers(query, options); FailureValidator validator = new FailureValidator.Builder() diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java index 1e32bb90fc3c..56cb5f2b4054 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/UserDefinedFunctionQueryTest.java @@ -76,7 +76,7 @@ public void query_NoResults() throws Exception { String query = "SELECT * from root r where r.id = '2'"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.getScripts().queryUserDefinedFunctions(query, options); FeedResponseListValidator validator = new FeedResponseListValidator.Builder() @@ -94,7 +94,7 @@ public void queryAll() throws Exception { String query = "SELECT * from root"; FeedOptions options = new FeedOptions(); options.maxItemCount(3); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.getScripts().queryUserDefinedFunctions(query, options); List expectedDocs = createdUDF; @@ -118,7 +118,7 @@ public void queryAll() throws Exception { public void invalidQuerySytax() throws Exception { String query = "I am an invalid query"; FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); + Flux> queryObservable = createdCollection.getScripts().queryUserDefinedFunctions(query, options); FailureValidator validator = new FailureValidator.Builder() diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java index f9e70ebc6fe0..528068e301b2 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/VeryLargeDocumentQueryTest.java @@ -43,7 +43,7 @@ public VeryLargeDocumentQueryTest(CosmosClientBuilder clientBuilder) { // TODO (DANOBLE) VeryLargeDocumentQueryTest::queryLargeDocuments intermittently times out // Move this test back into the emulator group after we've addressed query performance on 4.X. // see https://github.com/Azure/azure-sdk-for-java/issues/6377 - @Test(groups = { "emulator" }, timeOut = 2 * TIMEOUT) + @Test(groups = { "simple" }, timeOut = 2 * TIMEOUT) public void queryLargeDocuments() { int cnt = 5; @@ -53,8 +53,7 @@ public void queryLargeDocuments() { } FeedOptions options = new FeedOptions(); - options.setEnableCrossPartitionQuery(true); - + Flux> feedResponseFlux = createdCollection.queryItems("SELECT * FROM r", options, CosmosItemProperties.class); @@ -66,7 +65,7 @@ public void queryLargeDocuments() { return true; }) .expectComplete() - .verify(Duration.ofMillis(subscriberValidationTimeout)); + .verify(Duration.ofMillis(2 * TIMEOUT)); // TODO: Doubling timeout. Remove after increasing perf. } private void createLargeDocument() { @@ -86,14 +85,14 @@ private void createLargeDocument() { // TODO (DANOBLE) beforeClass method intermittently times out within the SETUP_TIMEOUT interval. // see see https://github.com/Azure/azure-sdk-for-java/issues/6377 - @BeforeClass(groups = { "emulator" }, timeOut = 2 * SETUP_TIMEOUT) + @BeforeClass(groups = { "simple" }, timeOut = 2 * SETUP_TIMEOUT) public void before_VeryLargeDocumentQueryTest() { client = clientBuilder().buildAsyncClient(); createdCollection = getSharedMultiPartitionCosmosContainer(client); truncateCollection(createdCollection); } - @AfterClass(groups = { "emulator" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(client); }