Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ protected void performWorkload(BaseSubscriber<Document> documentBaseSubscriber,

FeedOptions options = new FeedOptions();
options.maxItemCount(10);
options.setEnableCrossPartitionQuery(true);

String sqlQuery = "Select top 100 * from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscr
if (configuration.getOperationType() == Configuration.Operation.QueryCross) {

int index = r.nextInt(1000);
options.setEnableCrossPartitionQuery(true);
String sqlQuery = "Select * from c where c._rid = \"" + docsToRead.get(index).getResourceId() + "\"";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Configuration.Operation.QuerySingle) {
Expand All @@ -91,35 +90,29 @@ protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscr
} else if (configuration.getOperationType() == Configuration.Operation.QueryParallel) {

options.maxItemCount(10);
options.setEnableCrossPartitionQuery(true);
String sqlQuery = "Select * from c";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Configuration.Operation.QueryOrderby) {

options.maxItemCount(10);
options.setEnableCrossPartitionQuery(true);
String sqlQuery = "Select * from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregate) {

options.maxItemCount(10);
options.setEnableCrossPartitionQuery(true);
String sqlQuery = "Select value max(c._ts) from c";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregateTopOrderby) {

options.setEnableCrossPartitionQuery(true);
String sqlQuery = "Select top 1 value count(c) from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Configuration.Operation.QueryTopOrderby) {

options.setEnableCrossPartitionQuery(true);
String sqlQuery = "Select top 1000 * from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Configuration.Operation.QueryInClauseParallel) {

ReadMyWriteWorkflow.QueryBuilder queryBuilder = new ReadMyWriteWorkflow.QueryBuilder();
options.setEnableCrossPartitionQuery(true);
options.setMaxDegreeOfParallelism(200);
List<SqlParameter> parameters = new ArrayList<>();
int j = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ private SqlQuerySpec generateRandomQuery() {
private Flux<Document> xPartitionQuery(SqlQuerySpec query) {
FeedOptions options = new FeedOptions();
options.setMaxDegreeOfParallelism(-1);
options.setEnableCrossPartitionQuery(true);

return client.<Document>queryDocuments(getCollectionLink(), query, options)
.flatMap(p -> Flux.fromIterable(p.getResults()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ private void queryItems() {
log("+ Querying the collection ");
String query = "SELECT * from root";
FeedOptions options = new FeedOptions();
options.setEnableCrossPartitionQuery(true);
options.setMaxDegreeOfParallelism(2);
Flux<FeedResponse<TestObject>> queryFlux = container.queryItems(query, options, TestObject.class);

Expand All @@ -128,7 +127,6 @@ private void queryWithContinuationToken() {
log("+ Query with paging using continuation token");
String query = "SELECT * from root r ";
FeedOptions options = new FeedOptions();
options.setEnableCrossPartitionQuery(true);
options.populateQueryMetrics(true);
options.maxItemCount(1);
String continuation = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ public void documentDelete_Async() throws Exception {

// Assert document is deleted
FeedOptions queryOptions = new FeedOptions();
queryOptions.setEnableCrossPartitionQuery(true);
List<Document> listOfDocuments = client
.queryDocuments(getCollectionLink(), String.format("SELECT * FROM r where r.id = '%s'", createdDocument.getId()), queryOptions)
.map(FeedResponse::getResults) // Map page to its list of documents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ public void queryDocuments_Async() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.maxItemCount(requestPageSize);
options.setEnableCrossPartitionQuery(true);

Flux<FeedResponse<Document>> documentQueryObservable = client
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
Expand Down Expand Up @@ -176,7 +175,6 @@ public void queryDocuments_Async_withoutLambda() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.maxItemCount(requestPageSize);
options.setEnableCrossPartitionQuery(true);

Flux<FeedResponse<Document>> documentQueryObservable = client
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
Expand Down Expand Up @@ -225,7 +223,6 @@ public void queryDocuments_findTotalRequestCharge() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.maxItemCount(requestPageSize);
options.setEnableCrossPartitionQuery(true);

Flux<Double> totalChargeObservable = client
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options)
Expand All @@ -250,7 +247,6 @@ public void queryDocuments_unsubscribeAfterFirstPage() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.maxItemCount(requestPageSize);
options.setEnableCrossPartitionQuery(true);

Flux<FeedResponse<Document>> requestChargeObservable = client
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
Expand Down Expand Up @@ -287,7 +283,6 @@ public void queryDocuments_filterFetchedResults() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.maxItemCount(requestPageSize);
options.setEnableCrossPartitionQuery(true);

Predicate<Document> isPrimeNumber = new Predicate<Document>() {

Expand Down Expand Up @@ -347,7 +342,6 @@ public void queryDocuments_toBlocking_toIterator() {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.maxItemCount(requestPageSize);
options.setEnableCrossPartitionQuery(true);

Flux<FeedResponse<Document>> documentQueryObservable = client
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
Expand Down Expand Up @@ -396,7 +390,6 @@ public void qrderBy_Async() throws Exception {
// Query for the documents order by the prop field
SqlQuerySpec query = new SqlQuerySpec("SELECT r.id FROM r ORDER BY r.prop", new SqlParameterList());
FeedOptions options = new FeedOptions();
options.setEnableCrossPartitionQuery(true);
options.maxItemCount(5);

// Max degree of parallelism determines the number of partitions that
Expand Down Expand Up @@ -434,7 +427,6 @@ public void transformObservableToCompletableFuture() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.maxItemCount(requestPageSize);
options.setEnableCrossPartitionQuery(true);

Flux<FeedResponse<Document>> documentQueryObservable = client
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ public void groupByInMemory() {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.maxItemCount(requestPageSize);
options.setEnableCrossPartitionQuery(true);

Flux<Document> documentsObservable = client
.<Document>queryDocuments(getCollectionLink(),
Expand Down Expand Up @@ -138,7 +137,6 @@ public void groupByInMemory_MoreDetail() {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.maxItemCount(requestPageSize);
options.setEnableCrossPartitionQuery(true);

Flux<Document> documentsObservable = client
.<Document>queryDocuments(getCollectionLink(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ public static Map<String, String> getFeedHeaders(FeedOptions options) {
options.getEmitVerboseTracesInQuery().toString());
}

if (options.getEnableCrossPartitionQuery() != null) {
headers.put(HttpConstants.HttpHeaders.ENABLE_CROSS_PARTITION_QUERY,
options.getEnableCrossPartitionQuery().toString());
}

if (options.getMaxDegreeOfParallelism() != 0) {
headers.put(HttpConstants.HttpHeaders.PARALLELIZE_CROSS_PARTITION_QUERY, Boolean.TRUE.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public final class FeedOptions {
private String partitionKeyRangeId;
private Boolean enableScanInQuery;
private Boolean emitVerboseTracesInQuery;
private Boolean enableCrossPartitionQuery;
private int maxDegreeOfParallelism;
private int maxBufferedItemCount;
private int responseContinuationTokenLimitInKb;
Expand All @@ -23,6 +22,7 @@ public final class FeedOptions {
private PartitionKey partitionkey;
private boolean populateQueryMetrics;
private Map<String, Object> properties;
private boolean allowEmptyPages;

public FeedOptions() {
}
Expand All @@ -32,14 +32,14 @@ public FeedOptions(FeedOptions options) {
this.partitionKeyRangeId = options.partitionKeyRangeId;
this.enableScanInQuery = options.enableScanInQuery;
this.emitVerboseTracesInQuery = options.emitVerboseTracesInQuery;
this.enableCrossPartitionQuery = options.enableCrossPartitionQuery;
this.maxDegreeOfParallelism = options.maxDegreeOfParallelism;
this.maxBufferedItemCount = options.maxBufferedItemCount;
this.responseContinuationTokenLimitInKb = options.responseContinuationTokenLimitInKb;
this.maxItemCount = options.maxItemCount;
this.requestContinuation = options.requestContinuation;
this.partitionkey = options.partitionkey;
this.populateQueryMetrics = options.populateQueryMetrics;
this.allowEmptyPages = options.allowEmptyPages;
}

/**
Expand Down Expand Up @@ -126,30 +126,6 @@ public FeedOptions setEmitVerboseTracesInQuery(Boolean emitVerboseTracesInQuery)
return this;
}

/**
* Gets the option to allow queries to run across all partitions of the
* collection.
*
* @return whether to allow queries to run across all partitions of the
* collection.
*/
public Boolean getEnableCrossPartitionQuery() {
return this.enableCrossPartitionQuery;
}

/**
* Sets the option to allow queries to run across all partitions of the
* collection.
*
* @param enableCrossPartitionQuery whether to allow queries to run across all
* partitions of the collection.
* @return the FeedOptions.
*/
public FeedOptions setEnableCrossPartitionQuery(Boolean enableCrossPartitionQuery) {
this.enableCrossPartitionQuery = enableCrossPartitionQuery;
return this;
}

/**
* Gets the number of concurrent operations run client side during parallel
* query execution.
Expand Down Expand Up @@ -338,4 +314,19 @@ public FeedOptions properties(Map<String, Object> properties) {
this.properties = properties;
return this;
}

/**
* Gets the option to allow empty result pages in feed response.
*/
public boolean getAllowEmptyPages() {
return allowEmptyPages;
}

/**
* Sets the option to allow empty result pages in feed response. Defaults to false
* @param allowEmptyPages whether to allow empty pages in feed response
*/
public void setAllowEmptyPages(boolean allowEmptyPages) {
this.allowEmptyPages = allowEmptyPages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ static Object getValue(JsonNode value) {
case STRING:
return value.asText();
default:
throw new IllegalStateException("Unexpected value: " + value.getNodeType());
return value;
}
}
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public static class HttpHeaders {
public static final String IS_QUERY = "x-ms-documentdb-isquery";
public static final String ENABLE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-enablecrosspartition";
public static final String PARALLELIZE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-parallelizecrosspartitionquery";
public static final String IS_QUERY_PLAN_REQUEST = "x-ms-cosmos-is-query-plan-request";
public static final String SUPPORTED_QUERY_FEATURES = "x-ms-cosmos-supported-query-features";
public static final String QUERY_VERSION = "x-ms-cosmos-query-version";

// Our custom DocDB headers
public static final String CONTINUATION = "x-ms-continuation";
Expand Down Expand Up @@ -249,6 +252,7 @@ public static class A_IMHeaderValues {

public static class Versions {
public static final String CURRENT_VERSION = "2018-12-31";
public static final String QUERY_VERSION = "1.0";

// TODO: FIXME we can use maven plugin for generating a version file
// @see
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public enum OperationType {
Replace,
Resume,
SqlQuery,
QueryPlan,
Stop,
Throttle,
Update,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ private Flux<RxDocumentServiceResponse> readFeed(RxDocumentServiceRequest reques
}

private Flux<RxDocumentServiceResponse> query(RxDocumentServiceRequest request) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
if(request.getOperationType() != OperationType.QueryPlan) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
}

switch (this.queryCompatibilityMode) {
case SqlQuery:
Expand Down Expand Up @@ -369,6 +371,7 @@ private Flux<RxDocumentServiceResponse> invokeAsyncInternal(RxDocumentServiceReq
return this.replace(request);
case SqlQuery:
case Query:
case QueryPlan:
return this.query(request);
default:
throw new IllegalStateException("Unknown operation setType " + request.getOperationType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public CosmosItemRequestOptions createRequestOptions(Lease lease) {
@Override
public FeedOptions createFeedOptions() {
FeedOptions feedOptions = new FeedOptions();
feedOptions.setEnableCrossPartitionQuery(true);

return feedOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils.ValueHolder;
import com.azure.cosmos.implementation.routing.RoutingMapProviderHelper;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -112,11 +114,18 @@ public Flux<FeedResponse<T>> executeAsync() {
.getPaginatedQueryResultAsObservable(newFeedOptions, createRequestFunc, executeFunc, resourceType, maxPageSize);
}

public Mono<ValueHolder<List<PartitionKeyRange>>> getTargetPartitionKeyRanges(String resourceId, List<Range<String>> queryRanges) {
// TODO: FIXME this needs to be revisited
public Mono<List<PartitionKeyRange>> getTargetPartitionKeyRanges(String resourceId, List<Range<String>> queryRanges) {
return RoutingMapProviderHelper.getOverlappingRanges(client.getPartitionKeyRangeCache(), resourceId, queryRanges);
}

Range<String> r = new Range<>("", "FF", true, false);
return client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(resourceId, r, false, null);
public Mono<List<PartitionKeyRange>> getTargetPartitionKeyRangesById(String resourceId,
String partitionKeyRangeIdInternal) {
return client.getPartitionKeyRangeCache()
.tryGetPartitionKeyRangeByIdAsync(resourceId,
partitionKeyRangeIdInternal,
false,
null)
.flatMap(partitionKeyRange -> Mono.just(Collections.singletonList(partitionKeyRange.v)));
}

protected Function<RxDocumentServiceRequest, Flux<FeedResponse<T>>> executeInternalAsyncFunc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,10 @@ public Map<String, String> createCommonHeadersAsync(FeedOptions feedOptions) {
requestHeaders.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(feedOptions.maxItemCount()));
}

if (feedOptions.getEnableCrossPartitionQuery() != null) {

requestHeaders.put(HttpConstants.HttpHeaders.ENABLE_CROSS_PARTITION_QUERY,
Strings.toString(feedOptions.getEnableCrossPartitionQuery()));
}

if (feedOptions.getMaxDegreeOfParallelism() != 0) {
requestHeaders.put(HttpConstants.HttpHeaders.PARALLELIZE_CROSS_PARTITION_QUERY, Strings.toString(true));
}

if (this.feedOptions.getEnableCrossPartitionQuery() != null) {
requestHeaders.put(HttpConstants.HttpHeaders.ENABLE_SCAN_IN_QUERY,
Strings.toString(this.feedOptions.getEnableCrossPartitionQuery()));
}

if (this.feedOptions.setResponseContinuationTokenLimitInKb() > 0) {
requestHeaders.put(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB,
Strings.toString(feedOptions.setResponseContinuationTokenLimitInKb()));
Expand Down Expand Up @@ -207,7 +196,10 @@ public void populatePartitionKeyRangeInfo(RxDocumentServiceRequest request, Part
}

if (this.resourceTypeEnum.isPartitioned()) {
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId()));
boolean hasPartitionKey = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) != null;
if(!hasPartitionKey){
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId()));
}
}
}

Expand Down
Loading