Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class FeedOptions {
private PartitionKey partitionkey;
private boolean populateQueryMetrics;
private Map<String, Object> properties;
private boolean allowEmptyPages;

public FeedOptions() {
}
Expand All @@ -40,6 +41,7 @@ public FeedOptions(FeedOptions options) {
this.requestContinuation = options.requestContinuation;
this.partitionkey = options.partitionkey;
this.populateQueryMetrics = options.populateQueryMetrics;
this.allowEmptyPages = options.allowEmptyPages;
}

/**
Expand Down Expand Up @@ -338,4 +340,19 @@ public FeedOptions properties(Map<String, Object> properties) {
this.properties = properties;
return this;
}

/**
* Gets the option to allow empty result pages in feed response.
*/
public boolean allowEmptyPages() {
return allowEmptyPages;
}

/**
* Sets the option to allow empty result pages in feed response. Defaults to false
* @param allowEmptyPages whether to allow empty pages in feed response
*/
public void allowEmptyPages(boolean allowEmptyPages) {
this.allowEmptyPages = allowEmptyPages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public static class HttpHeaders {
public static final String IS_QUERY = "x-ms-documentdb-isquery";
public static final String ENABLE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-enablecrosspartition";
public static final String PARALLELIZE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-parallelizecrosspartitionquery";
public static final String IS_QUERY_PLAN_REQUEST = "x-ms-cosmos-is-query-plan-request";
public static final String SUPPORTED_QUERY_FEATURES = "x-ms-cosmos-supported-query-features";
public static final String QUERY_VERSION = "x-ms-cosmos-query-version";

// Our custom DocDB headers
public static final String CONTINUATION = "x-ms-continuation";
Expand Down Expand Up @@ -248,6 +251,7 @@ public static class A_IMHeaderValues {

public static class Versions {
public static final String CURRENT_VERSION = "2018-12-31";
public static final String QUERY_VERSION = "1.0";

// TODO: FIXME we can use maven plugin for generating a version file
// @see
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public enum OperationType {
Replace,
Resume,
SqlQuery,
QueryPlan,
Stop,
Throttle,
Update,
Expand All @@ -50,4 +51,4 @@ public boolean isWriteOperation() {
this == Upsert ||
this == Update;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ private Flux<RxDocumentServiceResponse> readFeed(RxDocumentServiceRequest reques
}

private Flux<RxDocumentServiceResponse> query(RxDocumentServiceRequest request) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
if(request.getOperationType() != OperationType.QueryPlan) {
request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true");
}

switch (this.queryCompatibilityMode) {
case SqlQuery:
Expand Down Expand Up @@ -352,6 +354,7 @@ private Flux<RxDocumentServiceResponse> invokeAsyncInternal(RxDocumentServiceReq
return this.replace(request);
case SqlQuery:
case Query:
case QueryPlan:
return this.query(request);
default:
throw new IllegalStateException("Unknown operation type " + request.getOperationType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,22 +499,22 @@ public static <O, I> O as(I i, Class<O> klass) {
return null;
}
}

@SuppressWarnings("unchecked")
public static <V> List<V> immutableListOf() {
return Collections.EMPTY_LIST;
}

public static <V> List<V> immutableListOf(V v1) {
List<V> list = new ArrayList<>();
list.add(v1);
return Collections.unmodifiableList(list);
}

public static <K, V> Map<K, V>immutableMapOf() {
return Collections.emptyMap();
}

public static <K, V> Map<K, V>immutableMapOf(K k1, V v1) {
Map<K, V> map = new HashMap<K ,V>();
map.put(k1, v1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import com.azure.data.cosmos.internal.routing.PartitionKeyInternal;
import com.azure.data.cosmos.internal.routing.PartitionKeyRangeIdentity;
import com.azure.data.cosmos.internal.routing.Range;
import com.azure.data.cosmos.internal.routing.RoutingMapProviderHelper;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -85,16 +87,16 @@ public Flux<FeedResponse<T>> executeAsync() {
if (feedOptions == null) {
feedOptions = new FeedOptions();
}

FeedOptions newFeedOptions = new FeedOptions(feedOptions);

// We can not go to backend with the composite continuation token,
// but we still need the gateway for the query plan.
// The workaround is to try and parse the continuation token as a composite continuation token.
// If it is, then we send the query to the gateway with max degree of parallelism to force getting back the query plan

String originalContinuation = newFeedOptions.requestContinuation();

if (isClientSideContinuationToken(originalContinuation)) {
// At this point we know we want back a query plan
newFeedOptions.requestContinuation(null);
Expand All @@ -115,8 +117,14 @@ public Flux<FeedResponse<T>> executeAsync() {
public Mono<List<PartitionKeyRange>> getTargetPartitionKeyRanges(String resourceId, List<Range<String>> queryRanges) {
// TODO: FIXME this needs to be revisited

Range<String> r = new Range<>("", "FF", true, false);
return client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(resourceId, r, false, null);
return RoutingMapProviderHelper.getOverlappingRanges(client.getPartitionKeyRangeCache(), resourceId, queryRanges);
}

public Mono<List<PartitionKeyRange>> getTargetPartitionKeyRangesById(String resourceId, String partitionKeyRangeIdInternal) {
return client.getPartitionKeyRangeCache().tryGetPartitionKeyRangeByIdAsync(resourceId,
partitionKeyRangeIdInternal,
false,
null).flatMap(partitionKeyRange -> Mono.just(Collections.singletonList(partitionKeyRange)));
}

protected Function<RxDocumentServiceRequest, Flux<FeedResponse<T>>> executeInternalAsyncFunc() {
Expand Down Expand Up @@ -222,7 +230,7 @@ public RxDocumentServiceRequest createRequestAsync(String continuationToken, Int

return request;
}

private static boolean isClientSideContinuationToken(String continuationToken) {
if (continuationToken != null) {
ValueHolder<CompositeContinuationToken> outCompositeContinuationToken = new ValueHolder<CompositeContinuationToken>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,10 @@ public void populatePartitionKeyRangeInfo(RxDocumentServiceRequest request, Part
}

if (this.resourceTypeEnum.isPartitioned()) {
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.id()));
boolean hasPartitionKey = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) != null;
if(!hasPartitionKey){
request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.id()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@

import com.azure.data.cosmos.BadRequestException;
import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.CommonsBridgeInternal;
import com.azure.data.cosmos.internal.DocumentCollection;
import com.azure.data.cosmos.FeedOptions;
import com.azure.data.cosmos.PartitionKey;
import com.azure.data.cosmos.Resource;
import com.azure.data.cosmos.SqlQuerySpec;
import com.azure.data.cosmos.internal.HttpConstants;
import com.azure.data.cosmos.internal.OperationType;
import com.azure.data.cosmos.internal.PartitionKeyRange;
import com.azure.data.cosmos.internal.ResourceType;
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.Strings;
import com.azure.data.cosmos.internal.Utils;
import com.azure.data.cosmos.internal.caches.RxCollectionCache;
import com.azure.data.cosmos.internal.routing.PartitionKeyInternal;
import com.azure.data.cosmos.internal.routing.Range;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -60,37 +66,82 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext
collectionObs = resolveCollection(client, query, resourceTypeEnum, resourceLink).flux();
}

// We create a ProxyDocumentQueryExecutionContext that will be initialized with DefaultDocumentQueryExecutionContext
// which will be used to send the query to GATEWAY and on getting 400(bad request) with 1004(cross parition query not servable), we initialize it with
// PipelinedDocumentQueryExecutionContext by providing the partition query execution info that's needed(which we get from the exception returned from GATEWAY).

Flux<ProxyDocumentQueryExecutionContext<T>> proxyQueryExecutionContext =
collectionObs.flatMap(collection -> {
if (feedOptions != null && feedOptions.partitionKey() != null && feedOptions.partitionKey().equals(PartitionKey.None)) {
feedOptions.partitionKey(BridgeInternal.getPartitionKey(BridgeInternal.getNonePartitionKey(collection.getPartitionKey())));
}
return ProxyDocumentQueryExecutionContext.createAsync(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
collection,
isContinuationExpected,
correlatedActivityId);
}).switchIfEmpty(ProxyDocumentQueryExecutionContext.createAsync(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
null,
isContinuationExpected,
correlatedActivityId));

return proxyQueryExecutionContext;
DefaultDocumentQueryExecutionContext<T> queryExecutionContext = new DefaultDocumentQueryExecutionContext<T>(
client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
correlatedActivityId,
isContinuationExpected);

if (ResourceType.Document != resourceTypeEnum) {
return Flux.just(queryExecutionContext);
}

Mono<PartitionedQueryExecutionInfo> queryExecutionInfoMono =
com.azure.data.cosmos.internal.query.QueryPlanRetriever.getQueryPlanThroughGatewayAsync(client, query, resourceLink);

return collectionObs.single().flatMap(collection ->
Comment thread
mbhaskar marked this conversation as resolved.
queryExecutionInfoMono.flatMap(partitionedQueryExecutionInfo -> {
QueryInfo queryInfo =
partitionedQueryExecutionInfo.getQueryInfo();
// Non value aggregates must go through
// DefaultDocumentQueryExecutionContext
// Single partition query can serve queries like SELECT AVG(c
// .age) FROM c
// SELECT MIN(c.age) + 5 FROM c
// SELECT MIN(c.age), MAX(c.age) FROM c
// while pipelined queries can only serve
// SELECT VALUE <AGGREGATE>. So we send the query down the old
// pipeline to avoid a breaking change.
// Should be fixed by adding support for nonvalueaggregates
if (queryInfo.hasAggregates() && !queryInfo.hasSelectValue()) {
if (feedOptions != null && feedOptions.enableCrossPartitionQuery()) {
return Mono.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.BADREQUEST,
"Cross partition query only supports 'VALUE " +
"<AggreateFunc>' for aggregates"));
}
return Mono.just(queryExecutionContext);
}

Mono<List<PartitionKeyRange>> partitionKeyRanges;
// The partitionKeyRangeIdInternal is no more a public API on FeedOptions, but have the below condition
// for handling ParallelDocumentQueryTest#partitionKeyRangeId
if (feedOptions != null && !StringUtils.isEmpty(CommonsBridgeInternal.partitionKeyRangeIdInternal(feedOptions))) {
partitionKeyRanges = queryExecutionContext.getTargetPartitionKeyRangesById(collection.resourceId(),
CommonsBridgeInternal.partitionKeyRangeIdInternal(feedOptions));
} else {
List<Range<String>> queryRanges =
partitionedQueryExecutionInfo.getQueryRanges();

if (feedOptions != null && feedOptions.partitionKey() != null) {
PartitionKeyInternal internalPartitionKey =
feedOptions.partitionKey()
.getInternalPartitionKey();
Range<String> range = Range.getPointRange(internalPartitionKey
.getEffectivePartitionKeyString(internalPartitionKey,
collection.getPartitionKey()));
queryRanges = Collections.singletonList(range);
}
partitionKeyRanges = queryExecutionContext
.getTargetPartitionKeyRanges(collection.resourceId(), queryRanges);
}
return partitionKeyRanges
.flatMap(pkranges -> createSpecializedDocumentQueryExecutionContextAsync(client,
resourceTypeEnum,
resourceType,
query,
feedOptions,
resourceLink,
isContinuationExpected,
partitionedQueryExecutionInfo,
pkranges,
collection.resourceId(),
correlatedActivityId).single());

})).flux();
}

public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext<T>> createSpecializedDocumentQueryExecutionContextAsync(
Expand All @@ -106,7 +157,12 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext
String collectionRid,
UUID correlatedActivityId) {

int initialPageSize = Utils.getValueOrDefault(feedOptions.maxItemCount(), ParallelQueryConfig.ClientInternalPageSize);
if (feedOptions == null) {
feedOptions = new FeedOptions();
}

int initialPageSize = Utils.getValueOrDefault(feedOptions.maxItemCount(),
ParallelQueryConfig.ClientInternalPageSize);

BadRequestException validationError = Utils.checkRequestOrReturnException(
initialPageSize > 0 || initialPageSize == -1, "MaxItemCount", "Invalid MaxItemCount %s", initialPageSize);
Expand All @@ -116,6 +172,10 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext

QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo();

if (!Strings.isNullOrEmpty(queryInfo.getRewrittenQuery())) {
query = new SqlQuerySpec(queryInfo.getRewrittenQuery(), query.parameters());
}

boolean getLazyFeedResponse = queryInfo.hasTop();

// We need to compute the optimal initial page size for order-by queries
Expand Down
Loading