diff --git a/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/FeedResponseListValidator.java b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/FeedResponseListValidator.java index 6356f1363..13ce9656d 100644 --- a/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/FeedResponseListValidator.java +++ b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/FeedResponseListValidator.java @@ -285,10 +285,10 @@ public void validate(List> feedList) { assertThat(queryMetrics.getTotalQueryExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getOutputDocumentCount()).isGreaterThan(0); assertThat(queryMetrics.getRetrievedDocumentCount()).isGreaterThan(0); - assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThan(0); + assertThat(queryMetrics.getDocumentLoadTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getDocumentWriteTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getVMExecutionTime().compareTo(Duration.ZERO)).isGreaterThan(0); - assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThan(0); + assertThat(queryMetrics.getQueryPreparationTimes().getLogicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getQueryPreparationTimes().getPhysicalPlanBuildTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); assertThat(queryMetrics.getQueryPreparationTimes().getQueryCompilationTime().compareTo(Duration.ZERO)).isGreaterThan(0); assertThat(queryMetrics.getRuntimeExecutionTimes().getQueryEngineExecutionTime().compareTo(Duration.ZERO)).isGreaterThanOrEqualTo(0); diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/FeedOptions.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/FeedOptions.java index d0fd538ad..e70028e5e 100644 --- a/commons/src/main/java/com/microsoft/azure/cosmosdb/FeedOptions.java +++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/FeedOptions.java @@ -35,6 +35,7 @@ public final class FeedOptions extends FeedOptionsBase { private int maxDegreeOfParallelism; private int maxBufferedItemCount; private int responseContinuationTokenLimitInKb; + private boolean allowEmptyPages; public FeedOptions() {} @@ -48,6 +49,7 @@ public FeedOptions(FeedOptions options) { this.maxDegreeOfParallelism = options.maxDegreeOfParallelism; this.maxBufferedItemCount = options.maxBufferedItemCount; this.responseContinuationTokenLimitInKb = options.responseContinuationTokenLimitInKb; + this.allowEmptyPages = options.allowEmptyPages; } /** @@ -229,4 +231,19 @@ public void setResponseContinuationTokenLimitInKb(int limitInKb) { public int getResponseContinuationTokenLimitInKb() { return responseContinuationTokenLimitInKb; } + + /** + * Gets the option to allow empty result pages in feed response. + */ + public boolean getAllowEmptyPages() { + return allowEmptyPages; + } + + /** + * Sets the option to allow empty result pages in feed response. Defaults to false + * @param allowEmptyPages whether to allow empty pages in feed response + */ + public void setAllowEmptyPages(boolean allowEmptyPages) { + this.allowEmptyPages = allowEmptyPages; + } } diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java index a49b4f1be..913a164aa 100644 --- a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java +++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java @@ -98,6 +98,9 @@ public static class HttpHeaders { public static final String IS_QUERY = "x-ms-documentdb-isquery"; public static final String ENABLE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-enablecrosspartition"; public static final String PARALLELIZE_CROSS_PARTITION_QUERY = "x-ms-documentdb-query-parallelizecrosspartitionquery"; + public static final String IS_QUERY_PLAN_REQUEST = "x-ms-cosmos-is-query-plan-request"; + public static final String SUPPORTED_QUERY_FEATURES = "x-ms-cosmos-supported-query-features"; + public static final String QUERY_VERSION = "x-ms-cosmos-query-version"; // Our custom DocDB headers public static final String CONTINUATION = "x-ms-continuation"; @@ -274,6 +277,7 @@ public static class Versions { // https://stackoverflow.com/questions/2469922/generate-a-version-java-file-in-maven public static final String SDK_VERSION = "2.5.1"; public static final String SDK_NAME = "cosmosdb-java-sdk"; + public static final String QUERY_VERSION = "1.0"; } public static class StatusCodes { diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/OperationType.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/OperationType.java index 05b1a4845..cedcd06af 100644 --- a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/OperationType.java +++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/OperationType.java @@ -56,6 +56,7 @@ public enum OperationType { Replace, Resume, SqlQuery, + QueryPlan, Stop, Throttle, Update, diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/Utils.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/Utils.java index 821e25392..84405cedd 100644 --- a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/Utils.java +++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/Utils.java @@ -348,6 +348,7 @@ public static boolean isFeedRequest(OperationType requestOperationType) { requestOperationType == OperationType.ReadFeed || requestOperationType == OperationType.Query || requestOperationType == OperationType.SqlQuery || + requestOperationType == OperationType.QueryPlan || requestOperationType == OperationType.HeadFeed; } diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/query/metrics/QueryMetricsTextWriter.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/query/metrics/QueryMetricsTextWriter.java index 72c6383b0..742585c49 100644 --- a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/query/metrics/QueryMetricsTextWriter.java +++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/query/metrics/QueryMetricsTextWriter.java @@ -175,7 +175,7 @@ protected void writeIndexHitRatio(double indexHitRatio) { @Override protected void writeTotalQueryExecutionTime(Duration totalQueryExecutionTime) { - QueryMetricsTextWriter.appendNanosecondsToStringBuilder(stringBuilder, + QueryMetricsTextWriter.appendMillisecondsToStringBuilder(stringBuilder, QueryMetricsTextWriter.TotalQueryExecutionTime, durationToMilliseconds(totalQueryExecutionTime), 0); } diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/Int128.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/Int128.java similarity index 100% rename from direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/Int128.java rename to commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/Int128.java diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/MurmurHash3_128.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/MurmurHash3_128.java similarity index 100% rename from direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/MurmurHash3_128.java rename to commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/MurmurHash3_128.java diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/MurmurHash3_32.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/MurmurHash3_32.java similarity index 100% rename from direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/MurmurHash3_32.java rename to commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/MurmurHash3_32.java diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternal.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternal.java index 105119358..7cf9a742b 100644 --- a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternal.java +++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternal.java @@ -35,6 +35,7 @@ import com.fasterxml.jackson.databind.node.NullNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import com.microsoft.azure.cosmosdb.PartitionKeyDefinition; import com.microsoft.azure.cosmosdb.Undefined; import com.microsoft.azure.cosmosdb.internal.Utils; import com.microsoft.azure.cosmosdb.rx.internal.RMResources; @@ -222,6 +223,10 @@ public List getComponents() { return components; } + public String getEffectivePartitionKeyString(PartitionKeyInternal internalPartitionKey, PartitionKeyDefinition partitionKey) { + return PartitionKeyInternalHelper.getEffectivePartitionKeyString(internalPartitionKey, partitionKey); + } + @SuppressWarnings("serial") static final class PartitionKeyInternalJsonSerializer extends StdSerializer { diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalHelper.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalHelper.java similarity index 100% rename from direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalHelper.java rename to commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalHelper.java diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/UInt128.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/UInt128.java similarity index 100% rename from direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/UInt128.java rename to commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/UInt128.java diff --git a/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalUtils.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalUtils.java similarity index 100% rename from direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalUtils.java rename to direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalUtils.java diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/query/QueryInfo.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/query/QueryInfo.java index 666011b83..af315f924 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/query/QueryInfo.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/query/QueryInfo.java @@ -35,6 +35,7 @@ * Used internally to encapsulates a query's information in the Azure Cosmos DB database service. */ public final class QueryInfo extends JsonSerializable { + private static final String HAS_SELECT_VALUE = "hasSelectValue"; private Integer top; private List orderBy; private Collection aggregates; @@ -89,4 +90,8 @@ public Collection getOrderByExpressions() { ? this.orderByExpressions : (this.orderByExpressions = super.getCollection("orderByExpressions", String.class)); } + + public boolean hasSelectValue(){ + return super.has(HAS_SELECT_VALUE) && super.getBoolean(HAS_SELECT_VALUE); + } } \ No newline at end of file diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/RoutingMapProviderHelper.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/RoutingMapProviderHelper.java index ef4311699..836f162e4 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/RoutingMapProviderHelper.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/RoutingMapProviderHelper.java @@ -1,17 +1,17 @@ /* * The MIT License (MIT) * Copyright (c) 2018 Microsoft Corporation - * + * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: - * + * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. - * + * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -26,8 +26,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.ListIterator; import com.microsoft.azure.cosmosdb.PartitionKeyRange; +import com.microsoft.azure.cosmosdb.rx.internal.IRoutingMapProvider; +import rx.Observable; +import rx.Single; /** * Provide utility functionality to route request in direct connectivity mode in the Azure Cosmos DB database service. @@ -39,7 +43,7 @@ private static String max(String left, String right) { return left.compareTo(right) < 0 ? right : left; } - private static > boolean IsSortedAndNonOverlapping(List> list) { + private static > boolean isSortedAndNonOverlapping(List> list) { for (int i = 1; i < list.size(); i++) { Range previousRange = list.get(i - 1); Range currentRange = list.get(i); @@ -57,7 +61,7 @@ private static > boolean IsSortedAndNonOverlapping(List< public static Collection getOverlappingRanges(RoutingMapProvider routingMapProvider, String collectionSelfLink, List> sortedRanges) { - if (!IsSortedAndNonOverlapping(sortedRanges)) { + if (!isSortedAndNonOverlapping(sortedRanges)) { throw new IllegalArgumentException("sortedRanges"); } @@ -94,4 +98,65 @@ public static Collection getOverlappingRanges(RoutingMapProvi return targetRanges; } + + public static Single> getOverlappingRanges(IRoutingMapProvider routingMapProvider, + String resourceId, List> sortedRanges) { + + if (routingMapProvider == null){ + throw new IllegalArgumentException("routingMapProvider"); + } + + if (sortedRanges == null) { + throw new IllegalArgumentException("sortedRanges"); + } + + if (!isSortedAndNonOverlapping(sortedRanges)) { + throw new IllegalArgumentException("sortedRanges"); + } + + List targetRanges = new ArrayList<>(); + final ListIterator> iterator = sortedRanges.listIterator(); + + return Observable.defer(() -> { + if (!iterator.hasNext()) { + return Observable.empty(); + } + + Range queryRange; + Range sortedRange = iterator.next(); + if (!targetRanges.isEmpty()) { + String left = max(targetRanges.get(targetRanges.size() - 1).getMaxExclusive(), + sortedRange.getMin()); + + boolean leftInclusive = left.compareTo(sortedRange.getMin()) == 0 && sortedRange.isMinInclusive(); + + queryRange = new Range(left, sortedRange.getMax(), leftInclusive, + sortedRange.isMaxInclusive()); + } else { + queryRange = sortedRange; + } + + return routingMapProvider.tryGetOverlappingRangesAsync(resourceId, queryRange, false, null) + .map(targetRanges::addAll) + .flatMap(aBoolean -> { + if (!targetRanges.isEmpty()) { + Range lastKnownTargetRange = targetRanges.get(targetRanges.size() - 1).toRange(); + while (iterator.hasNext()) { + Range value = iterator.next(); + if (MAX_COMPARATOR.compare(value, lastKnownTargetRange) > 0) { + // Since we already moved forward on iterator to check above condition, we + // go to previous when it fails so the the value is not skipped on iteration + iterator.previous(); + break; + } + } + } + return Single.just(targetRanges); + }).toObservable(); + }).repeat(sortedRanges.size()) + .takeUntil(stringRange -> !iterator.hasNext()) + .last() + .toSingle(); + } + } diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxGatewayStoreModel.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxGatewayStoreModel.java index 7de0b323f..79ed63f31 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxGatewayStoreModel.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxGatewayStoreModel.java @@ -144,7 +144,9 @@ private Observable readFeed(RxDocumentServiceRequest } private Observable query(RxDocumentServiceRequest request) { - request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true"); + if(request.getOperationType() != OperationType.QueryPlan) { + request.getHeaders().put(HttpConstants.HttpHeaders.IS_QUERY, "true"); + } switch (this.queryCompatibilityMode) { case SqlQuery: @@ -462,7 +464,8 @@ private Observable invokeAsyncInternal(RxDocumentServ case Replace: return this.replace(request); case SqlQuery: - case Query: + case Query: + case QueryPlan: return this.query(request); default: throw new IllegalStateException("Unknown operation type " + request.getOperationType()); diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DefaultDocumentQueryExecutionContext.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DefaultDocumentQueryExecutionContext.java index c38e5d40f..4b06944c2 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DefaultDocumentQueryExecutionContext.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DefaultDocumentQueryExecutionContext.java @@ -23,6 +23,7 @@ package com.microsoft.azure.cosmosdb.rx.internal.query; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -40,6 +41,7 @@ import com.microsoft.azure.cosmosdb.internal.routing.PartitionKeyInternal; import com.microsoft.azure.cosmosdb.internal.routing.PartitionKeyRangeIdentity; import com.microsoft.azure.cosmosdb.internal.routing.Range; +import com.microsoft.azure.cosmosdb.internal.routing.RoutingMapProviderHelper; import com.microsoft.azure.cosmosdb.rx.internal.BackoffRetryUtility; import com.microsoft.azure.cosmosdb.rx.internal.IDocumentClientRetryPolicy; import com.microsoft.azure.cosmosdb.rx.internal.InvalidPartitionExceptionRetryPolicy; @@ -132,10 +134,14 @@ public Observable> executeAsync() { } public Single> getTargetPartitionKeyRanges(String resourceId, List> queryRanges) { - // TODO: FIXME this needs to be revisited - - Range r = new Range<>("", "FF", true, false); - return client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(resourceId, r, false, null); + return RoutingMapProviderHelper.getOverlappingRanges(client.getPartitionKeyRangeCache(), resourceId, queryRanges); + } + + public Single> getTargetPartitionKeyRangesById(String resourceId, String partitionKeyRangeIdInternal) { + return client.getPartitionKeyRangeCache().tryGetPartitionKeyRangeByIdAsync(resourceId, + partitionKeyRangeIdInternal, + false, + null).flatMap(partitionKeyRange -> Single.just(Collections.singletonList(partitionKeyRange))); } protected Func1>> executeInternalAsyncFunc() { diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextBase.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextBase.java index 3692ee46e..f0ac6e042 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextBase.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextBase.java @@ -233,7 +233,10 @@ public void populatePartitionKeyRangeInfo(RxDocumentServiceRequest request, Part } if (this.resourceTypeEnum.isPartitioned()) { - request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId())); + boolean hasPartitionKey = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) != null; + if(!hasPartitionKey){ + request.routeTo(new PartitionKeyRangeIdentity(collectionRid, range.getId())); + } } } diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextFactory.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextFactory.java index 4b4b2891d..c003d0d82 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextFactory.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextFactory.java @@ -22,23 +22,29 @@ */ package com.microsoft.azure.cosmosdb.rx.internal.query; +import java.util.Collections; import java.util.List; import java.util.UUID; +import com.microsoft.azure.cosmosdb.DocumentClientException; import com.microsoft.azure.cosmosdb.DocumentCollection; import com.microsoft.azure.cosmosdb.FeedOptions; import com.microsoft.azure.cosmosdb.PartitionKeyRange; import com.microsoft.azure.cosmosdb.Resource; import com.microsoft.azure.cosmosdb.SqlQuerySpec; +import com.microsoft.azure.cosmosdb.internal.HttpConstants; import com.microsoft.azure.cosmosdb.internal.OperationType; import com.microsoft.azure.cosmosdb.internal.ResourceType; import com.microsoft.azure.cosmosdb.internal.query.PartitionedQueryExecutionInfo; import com.microsoft.azure.cosmosdb.internal.query.QueryInfo; +import com.microsoft.azure.cosmosdb.internal.routing.Range; import com.microsoft.azure.cosmosdb.rx.internal.BadRequestException; import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest; +import com.microsoft.azure.cosmosdb.rx.internal.Strings; import com.microsoft.azure.cosmosdb.rx.internal.Utils; import com.microsoft.azure.cosmosdb.rx.internal.caches.RxCollectionCache; +import org.apache.commons.lang3.StringUtils; import rx.Observable; import rx.Single; @@ -74,31 +80,84 @@ public static Observable collectionObs = Observable.just(null); if (resourceTypeEnum.isCollectionChild()) { collectionObs = resolveCollection(client, query, resourceTypeEnum, resourceLink).toObservable(); } - // We create a ProxyDocumentQueryExecutionContext that will be initialized with DefaultDocumentQueryExecutionContext - // which will be used to send the query to Gateway and on getting 400(bad request) with 1004(cross parition query not servable), we initialize it with - // PipelinedDocumentQueryExecutionContext by providing the partition query execution info that's needed(which we get from the exception returned from Gateway). - - Observable> proxyQueryExecutionContext = - collectionObs.flatMap(collection -> - ProxyDocumentQueryExecutionContext.createAsync( - client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - collection, - isContinuationExpected, - correlatedActivityId)); - - return proxyQueryExecutionContext; + DefaultDocumentQueryExecutionContext queryExecutionContext = new DefaultDocumentQueryExecutionContext( + client, + resourceTypeEnum, + resourceType, + query, + feedOptions, + resourceLink, + correlatedActivityId, + isContinuationExpected); + + if (ResourceType.Document != resourceTypeEnum + || (feedOptions != null && feedOptions.getPartitionKeyRangeIdInternal() != null)) { + return Observable.just(queryExecutionContext); + } + + Single queryExecutionInfoSingle = + QueryPlanRetriever.getQueryPlanThroughGatewayAsync(client, query, resourceLink); + + return collectionObs.flatMap(collection -> queryExecutionInfoSingle.toObservable() + .flatMap(partitionedQueryExecutionInfo -> { + QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo(); + + // Non value aggregates must go through DefaultDocumentQueryExecutionContext + // Single partition query can serve queries like SELECT AVG(c.age) FROM c + // SELECT MIN(c.age) + 5 FROM c + // SELECT MIN(c.age), MAX(c.age) FROM c + // while pipelined queries can only serve + // SELECT VALUE . So we send the query down the old pipeline to avoid a breaking change. + // We will skip this in V3 SDK + if(queryInfo.hasAggregates() && !queryInfo.hasSelectValue()){ + if(feedOptions != null && feedOptions.getEnableCrossPartitionQuery()){ + return Observable.error(new DocumentClientException(HttpConstants.StatusCodes.BADREQUEST, + "Cross partition query only supports 'VALUE ' for aggregates")); + } + return Observable.just( queryExecutionContext); + } + + Single> partitionKeyRanges; + + if (feedOptions != null && !StringUtils.isEmpty(feedOptions.getPartitionKeyRangeIdInternal())) { + partitionKeyRanges = queryExecutionContext.getTargetPartitionKeyRangesById(collection.getResourceId(), + feedOptions.getPartitionKeyRangeIdInternal()); + } else { + List> queryRanges = partitionedQueryExecutionInfo.getQueryRanges(); + + if (feedOptions != null && feedOptions.getPartitionKey() != null) { + Range range = Range.getPointRange(feedOptions.getPartitionKey() + .getInternalPartitionKey() + .getEffectivePartitionKeyString(feedOptions.getPartitionKey() + .getInternalPartitionKey(), collection.getPartitionKey())); + queryRanges = Collections.singletonList(range); + } + partitionKeyRanges = queryExecutionContext + .getTargetPartitionKeyRanges(collection.getResourceId(), queryRanges); + } + + Observable> exContext = partitionKeyRanges + .toObservable() + .flatMap(pkranges -> createSpecializedDocumentQueryExecutionContextAsync(client, + resourceTypeEnum, + resourceType, + query, + feedOptions, + resourceLink, + isContinuationExpected, + partitionedQueryExecutionInfo, + pkranges, + collection.getResourceId(), + correlatedActivityId)); + + return exContext; + })); } public static Observable> createSpecializedDocumentQueryExecutionContextAsync( @@ -114,7 +173,12 @@ public static Observable 0, "MaxItemCount", "Invalid MaxItemCount %s", initialPageSize); @@ -123,6 +187,10 @@ public static Observable extends ParallelDocumentQueryExecutionContextBase { - + FeedOptions feedOptions; + private ParallelDocumentQueryExecutionContext( IDocumentQueryClient client, List partitionKeyRanges, @@ -75,6 +76,7 @@ private ParallelDocumentQueryExecutionContext( UUID correlatedActivityId) { super(client, partitionKeyRanges, resourceTypeEnum, resourceType, query, feedOptions, resourceLink, rewrittenQuery, isContinuationExpected, getLazyFeedResponse, correlatedActivityId); + this.feedOptions = feedOptions; } public static Observable> createAsync( @@ -194,9 +196,9 @@ private static class EmptyPagesFilterTransformer implements Transformer.DocumentProducerFeedResponse, FeedResponse> { private final RequestChargeTracker tracker; private DocumentProducer.DocumentProducerFeedResponse previousPage; + private final FeedOptions feedOptions; - public EmptyPagesFilterTransformer( - RequestChargeTracker tracker) { + public EmptyPagesFilterTransformer(RequestChargeTracker tracker, FeedOptions feedOptions) { if (tracker == null) { throw new IllegalArgumentException("Request Charge Tracker must not be null."); @@ -204,6 +206,7 @@ public EmptyPagesFilterTransformer( this.tracker = tracker; this.previousPage = null; + this.feedOptions = feedOptions; } private DocumentProducer.DocumentProducerFeedResponse plusCharge( @@ -246,7 +249,7 @@ private static Map headerResponse( public Observable> call( Observable.DocumentProducerFeedResponse> source) { return source.filter(documentProducerFeedResponse -> { - if (documentProducerFeedResponse.pageResult.getResults().isEmpty()) { + if (documentProducerFeedResponse.pageResult.getResults().isEmpty() && !this.feedOptions.getAllowEmptyPages()) { // filter empty pages and accumulate charge tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge()); return false; @@ -306,7 +309,6 @@ public Observable> call( page = current; page = this.addCompositeContinuationToken(page, compositeContinuationToken); - return page; }).map(documentProducerFeedResponse -> { // Unwrap the documentProducerFeedResponse and get back the feedResponse @@ -333,7 +335,8 @@ public Observable> drainAsync( .map(dp -> dp.produceAsync()) // Merge results from all partitions. .collect(Collectors.toList()); - return Observable.concat(obs).compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker())); + return Observable.concat(obs).compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker(), + this.feedOptions)); } @Override diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ParallelDocumentQueryExecutionContextBase.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ParallelDocumentQueryExecutionContextBase.java index 8ed4f7138..b451b1f76 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ParallelDocumentQueryExecutionContextBase.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ParallelDocumentQueryExecutionContextBase.java @@ -84,11 +84,18 @@ protected void initialize(String collectionRid, Map commonRequestHeaders = createCommonHeadersAsync(this.getFeedOptions(null, null)); for (PartitionKeyRange targetRange : partitionKeyRangeToContinuationTokenMap.keySet()) { + Func3 createRequestFunc = (partitionKeyRange, continuationToken, pageSize) -> { Map headers = new HashMap<>(commonRequestHeaders); headers.put(HttpConstants.HttpHeaders.CONTINUATION, continuationToken); headers.put(HttpConstants.HttpHeaders.PAGE_SIZE, Strings.toString(pageSize)); + if(feedOptions.getPartitionKey() != null){ + headers.put(HttpConstants.HttpHeaders.PARTITION_KEY, feedOptions + .getPartitionKey() + .getInternalPartitionKey() + .toJson()); + } return this.createDocumentServiceRequest(headers, querySpecForInit, partitionKeyRange, collectionRid); }; diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ProxyDocumentQueryExecutionContext.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ProxyDocumentQueryExecutionContext.java deleted file mode 100644 index 607b84152..000000000 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ProxyDocumentQueryExecutionContext.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * The MIT License (MIT) - * Copyright (c) 2018 Microsoft Corporation - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.microsoft.azure.cosmosdb.rx.internal.query; - -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.microsoft.azure.cosmosdb.DocumentClientException; -import com.microsoft.azure.cosmosdb.DocumentCollection; -import com.microsoft.azure.cosmosdb.FeedOptions; -import com.microsoft.azure.cosmosdb.FeedResponse; -import com.microsoft.azure.cosmosdb.PartitionKeyRange; -import com.microsoft.azure.cosmosdb.Resource; -import com.microsoft.azure.cosmosdb.SqlQuerySpec; -import com.microsoft.azure.cosmosdb.internal.HttpConstants; -import com.microsoft.azure.cosmosdb.internal.ResourceType; -import com.microsoft.azure.cosmosdb.internal.query.PartitionedQueryExecutionInfo; -import com.microsoft.azure.cosmosdb.rx.internal.Exceptions; -import com.microsoft.azure.cosmosdb.rx.internal.Utils; - -import rx.Observable; -import rx.Single; -import rx.functions.Func1; - -/** - * While this class is public, but it is not part of our published public APIs. - * This is meant to be internally used only by our sdk. - * - * This class is used as a proxy to wrap the - * DefaultDocumentQueryExecutionContext which is needed for sending the query to - * Gateway first and then uses PipelinedDocumentQueryExecutionContext after it - * gets the necessary info. - */ -public class ProxyDocumentQueryExecutionContext implements IDocumentQueryExecutionContext { - - private IDocumentQueryExecutionContext innerExecutionContext; - private IDocumentQueryClient client; - private ResourceType resourceTypeEnum; - private Class resourceType; - private FeedOptions feedOptions; - private SqlQuerySpec query; - private String resourceLink; - private DocumentCollection collection; - private UUID correlatedActivityId; - private boolean isContinuationExpected; - private final static Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - public ProxyDocumentQueryExecutionContext( - IDocumentQueryExecutionContext innerExecutionContext, - IDocumentQueryClient client, - ResourceType resourceTypeEnum, - Class resourceType, - SqlQuerySpec query, - FeedOptions feedOptions, - String resourceLink, - DocumentCollection collection, - boolean isContinuationExpected, - UUID correlatedActivityId) { - this.innerExecutionContext = innerExecutionContext; - - this.client = client; - this.resourceTypeEnum = resourceTypeEnum; - this.resourceType = resourceType; - this.query = query; - this.feedOptions = feedOptions; - this.resourceLink = resourceLink; - - this.collection = collection; - this.isContinuationExpected = isContinuationExpected; - this.correlatedActivityId = correlatedActivityId; - } - - @Override - public Observable> executeAsync() { - - Func1>> func = t -> { - - logger.debug("Received non result message from gateway", t); - if (!(t instanceof Exception)) { - logger.error("Unexpected failure", t); - return Observable.error(t); - } - - if (!isCrossPartitionQuery((Exception) t)) { - // If this is not a cross partition query then propagate error - logger.debug("Failure from gateway", t); - return Observable.error(t); - } - - logger.debug("Setting up query pipeline using the query plan received form gateway"); - - // cross partition query construct pipeline - - DocumentClientException dce = (DocumentClientException) t; - - PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = new - PartitionedQueryExecutionInfo(dce.getError().getPartitionedQueryExecutionInfo()); - - logger.debug("Query Plan from gateway {}", partitionedQueryExecutionInfo); - - DefaultDocumentQueryExecutionContext queryExecutionContext = - (DefaultDocumentQueryExecutionContext) this.innerExecutionContext; - - Single> partitionKeyRanges = queryExecutionContext.getTargetPartitionKeyRanges(collection.getResourceId(), - partitionedQueryExecutionInfo.getQueryRanges()); - - Observable> exContext = partitionKeyRanges.toObservable() - .flatMap(pkranges -> { - return DocumentQueryExecutionContextFactory.createSpecializedDocumentQueryExecutionContextAsync( - this.client, - this.resourceTypeEnum, - this.resourceType, - this.query, - this.feedOptions, - this.resourceLink, - isContinuationExpected, - partitionedQueryExecutionInfo, - pkranges, - this.collection.getResourceId(), - this.correlatedActivityId); - }); - - return exContext.flatMap(context -> context.executeAsync()); - }; - - return this.innerExecutionContext.executeAsync().onErrorResumeNext(func); - } - - private boolean isCrossPartitionQuery(Exception exception) { - - DocumentClientException clientException = Utils.as(exception, DocumentClientException.class); - - if (clientException == null) { - return false; - } - - return (Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.BADREQUEST) && - Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.CROSS_PARTITION_QUERY_NOT_SERVABLE)); - } - - public static Observable> createAsync(IDocumentQueryClient client, - ResourceType resourceTypeEnum, Class resourceType, SqlQuerySpec query, FeedOptions feedOptions, - String resourceLink, DocumentCollection collection, boolean isContinuationExpected, - UUID correlatedActivityId) { - - IDocumentQueryExecutionContext innerExecutionContext = - new DefaultDocumentQueryExecutionContext( - client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - correlatedActivityId, - isContinuationExpected); - - return Observable.just(new ProxyDocumentQueryExecutionContext(innerExecutionContext, client, - resourceTypeEnum, - resourceType, - query, - feedOptions, - resourceLink, - collection, - isContinuationExpected, - correlatedActivityId)); - } -} diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/QueryFeature.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/QueryFeature.java new file mode 100644 index 000000000..c4ee425d8 --- /dev/null +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/QueryFeature.java @@ -0,0 +1,14 @@ +package com.microsoft.azure.cosmosdb.rx.internal.query; + +enum QueryFeature { + None, + Aggregate, + CompositeAggregate, + Distinct, + GroupBy, + MultipleAggregates, + MultipleOrderBy, + OffsetAndLimit, + OrderBy, + Top +} diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/QueryPlanRetriever.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/QueryPlanRetriever.java new file mode 100644 index 000000000..d405d34cd --- /dev/null +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/QueryPlanRetriever.java @@ -0,0 +1,70 @@ +/* + * The MIT License (MIT) + * Copyright (c) 2018 Microsoft Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package com.microsoft.azure.cosmosdb.rx.internal.query; + +import com.microsoft.azure.cosmosdb.SqlQuerySpec; +import com.microsoft.azure.cosmosdb.internal.HttpConstants; +import com.microsoft.azure.cosmosdb.internal.OperationType; +import com.microsoft.azure.cosmosdb.internal.ResourceType; +import com.microsoft.azure.cosmosdb.internal.RuntimeConstants; +import com.microsoft.azure.cosmosdb.internal.query.PartitionedQueryExecutionInfo; +import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest; +import rx.Single; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +class QueryPlanRetriever { + private static final String TRUE = "True"; + private static final String SUPPORTED_QUERY_FEATURES = QueryFeature.Aggregate.name() + ", " + + QueryFeature.CompositeAggregate.name() + ", " + + QueryFeature.Distinct.name() + ", " + + QueryFeature.MultipleOrderBy.name() + ", " + + QueryFeature.OffsetAndLimit.name() + ", " + + QueryFeature.OrderBy.name() + ", " + + QueryFeature.Top.name(); + + static Single getQueryPlanThroughGatewayAsync(IDocumentQueryClient queryClient, + SqlQuerySpec sqlQuerySpec, String resourceLink) { + Map requestHeaders = new HashMap<>(); + requestHeaders.put(HttpConstants.HttpHeaders.CONTENT_TYPE, RuntimeConstants.MediaTypes.JSON); + requestHeaders.put(HttpConstants.HttpHeaders.IS_QUERY_PLAN_REQUEST, TRUE); + requestHeaders.put(HttpConstants.HttpHeaders.SUPPORTED_QUERY_FEATURES, SUPPORTED_QUERY_FEATURES); + requestHeaders.put(HttpConstants.HttpHeaders.QUERY_VERSION, HttpConstants.Versions.QUERY_VERSION); + + RxDocumentServiceRequest request; + request = RxDocumentServiceRequest.create(OperationType.QueryPlan, + ResourceType.Document, + resourceLink, + requestHeaders); + request.UseGatewayMode = true; + request.setContentBytes(sqlQuerySpec.toJson().getBytes(StandardCharsets.UTF_8)); + + return queryClient.executeQueryAsync(request).flatMap(rxDocumentServiceResponse -> { + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = + new PartitionedQueryExecutionInfo(rxDocumentServiceResponse.getReponseBodyAsString()); + return Single.just(partitionedQueryExecutionInfo); + }); + } +} diff --git a/gateway/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/RoutingMapProviderHelperTest.java b/gateway/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/RoutingMapProviderHelperTest.java index c0b105b5b..151c7cfd9 100644 --- a/gateway/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/RoutingMapProviderHelperTest.java +++ b/gateway/src/test/java/com/microsoft/azure/cosmosdb/internal/routing/RoutingMapProviderHelperTest.java @@ -29,15 +29,21 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import com.microsoft.azure.cosmosdb.rx.internal.IRoutingMapProvider; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.mockito.Matchers; +import org.mockito.Mockito; import org.testng.annotations.Test; import com.microsoft.azure.cosmosdb.PartitionKeyRange; +import rx.Single; public class RoutingMapProviderHelperTest { private static final MockRoutingMapProvider ROUTING_MAP_PROVIDER = new MockRoutingMapProvider( @@ -147,4 +153,56 @@ public String apply(PartitionKeyRange range) { assertThat("3,4").isEqualTo(ranges.stream().map(func).collect(Collectors.joining(","))); } + + @Test(groups = {"unit"}) + public void getOverlappingRangesWithList() { + + Function func = new Function() { + @Override + public String apply(PartitionKeyRange range) { + return range.getId(); + } + }; + + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + List rangeList = Arrays.asList(new PartitionKeyRange("0", "", "000A"), + new PartitionKeyRange("1", "000A", "000D"), + new PartitionKeyRange("2", "000D", "0012"), + new PartitionKeyRange("3", "0012", "0015"), + new PartitionKeyRange("4", "0015", "0020"), + new PartitionKeyRange("5", "0020", "0040"), + new PartitionKeyRange("6", "0040", "FF")); + Single> listSingle = Single.just(rangeList); + + Map> resultMap = new HashMap<>(); + + resultMap.put(new Range<>("000D", "0012", true, false), + Collections.singletonList(new PartitionKeyRange("2", "000D", "0012"))); + resultMap.put(new Range<>("0012", "0015", true, false), + Collections.singletonList(new PartitionKeyRange("3", "0012", "0015"))); + resultMap.put(new Range<>("0015", "0020", true, false), + Collections.singletonList(new PartitionKeyRange("4", "0015", "00120"))); + + Mockito.doAnswer(invocationOnMock -> { + Range range = invocationOnMock.getArgumentAt(1, Range.class); + return Single.just(resultMap.get(range)); + }).when(routingMapProviderMock).tryGetOverlappingRangesAsync(Matchers.anyString(), + Matchers.any(), + Matchers.anyBoolean(), + Matchers.anyMap()); + + Single> overlappingRanges; + overlappingRanges = RoutingMapProviderHelper.getOverlappingRanges(routingMapProviderMock, + "/dbs/db1/colls/coll1", + Arrays.asList(new Range("000D", "0012", true, false), + new Range("0012", "0015", true, false), + new Range<>("0015", "0020", true, false))); + assertThat("2,3,4").isEqualTo(overlappingRanges.toBlocking().value().stream().map(func).collect(Collectors.joining(","))); + + overlappingRanges = RoutingMapProviderHelper.getOverlappingRanges(routingMapProviderMock, + "/dbs/db1/colls/coll1", + Arrays.asList(new Range("000D", "0012", true, false))); + assertThat("2").isEqualTo(overlappingRanges.toBlocking().value().stream().map(func).collect(Collectors.joining(","))); + + } } diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AggregateQueryTests.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AggregateQueryTests.java index 345536cc0..ca77d1d1c 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AggregateQueryTests.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AggregateQueryTests.java @@ -94,8 +94,14 @@ public void queryDocumentsWithAggregates(boolean qmEnabled) throws Exception { options.setEnableCrossPartitionQuery(true); options.setPopulateQueryMetrics(qmEnabled); options.setMaxDegreeOfParallelism(2); - - for (QueryConfig queryConfig : queryConfigs) { + + for (QueryConfig queryConfig : queryConfigs) { + // Cross partition Non value aggregates are not supported + if(queryConfig.query.contains("VALUE")){ + options.setEnableCrossPartitionQuery(true); + }else{ + options.setEnableCrossPartitionQuery(false); + } Observable> queryObservable = client .queryDocuments(createdCollection.getSelfLink(), queryConfig.query, options); diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/BackPressureTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/BackPressureTest.java index 2070f2618..322b27307 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/BackPressureTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/BackPressureTest.java @@ -52,6 +52,7 @@ public class BackPressureTest extends TestSuiteBase { private static final int TIMEOUT = 200000; private static final int SETUP_TIMEOUT = 60000; + private static final int THRESHOLD = 5; private Database createdDatabase; private DocumentCollection createdCollection; @@ -101,7 +102,7 @@ public void readFeed() throws Exception { // validate that the difference between the number of requests to backend // and the number of returned results is always less than a fixed threshold assertThat(client.httpRequests.size() - subscriber.getOnNextEvents().size()) - .isLessThanOrEqualTo(RxRingBuffer.SIZE); + .isLessThanOrEqualTo(RxRingBuffer.SIZE + THRESHOLD); subscriber.requestMore(1); i++; @@ -138,7 +139,7 @@ public void query() throws Exception { // validate that the difference between the number of requests to backend // and the number of returned results is always less than a fixed threshold assertThat(client.httpRequests.size() - subscriber.getValueCount()) - .isLessThanOrEqualTo(RxRingBuffer.SIZE); + .isLessThanOrEqualTo(RxRingBuffer.SIZE + THRESHOLD); subscriber.requestMore(1); i++; diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OrderbyDocumentQueryTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OrderbyDocumentQueryTest.java index 6834b7b64..6876294de 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OrderbyDocumentQueryTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OrderbyDocumentQueryTest.java @@ -266,20 +266,6 @@ private List sortDocumentsAndCollectResourceIds(String propName, Fun .map(d -> d.getResourceId()).collect(Collectors.toList()); } - @Test(groups = { "simple" }, timeOut = TIMEOUT) - public void crossPartitionQueryNotEnabled() throws Exception { - String query = "SELECT * FROM r ORDER BY r.propInt"; - FeedOptions options = new FeedOptions(); - Observable> queryObservable = client - .queryDocuments(getCollectionLink(), query, options); - - FailureValidator validator = new FailureValidator.Builder() - .instanceOf(DocumentClientException.class) - .statusCode(400) - .build(); - validateQueryFailure(queryObservable, validator); - } - @Test(groups = { "simple" }, timeOut = TIMEOUT) public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exception { String query = "SELECT * FROM r ORDER BY r.propScopedPartitionInt ASC"; diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ParallelDocumentQueryTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ParallelDocumentQueryTest.java index 0683964ea..313a14026 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ParallelDocumentQueryTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ParallelDocumentQueryTest.java @@ -26,6 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -230,12 +231,16 @@ public void crossPartitionQueryNotEnabled() { FeedOptions options = new FeedOptions(); Observable> queryObservable = client .queryDocuments(getCollectionLink(), query, options); + List expectedDocs = createdDocuments; - FailureValidator validator = new FailureValidator.Builder() - .instanceOf(DocumentClientException.class) - .statusCode(400) + FeedResponseListValidator validator = new FeedResponseListValidator.Builder() + .totalSize(expectedDocs.size()) + .exactlyContainsInAnyOrder(expectedDocs.stream().map(d -> d.getResourceId()).collect(Collectors.toList())) + .allPagesSatisfy(new FeedResponseValidator.Builder() + .requestChargeGreaterThanOrEqualTo(1.0).build()) .build(); - validateQueryFailure(queryObservable, validator); + + validateQuerySuccess(queryObservable, validator); } @Test(groups = { "simple" }, timeOut = 2 * TIMEOUT) @@ -419,4 +424,31 @@ private List queryWithContinuationTokens(String query, int pageSize) { return receivedDocuments; } + + @Test(groups = { "simple" }, timeOut = TIMEOUT) + public void unsupportedQueries() { + String aggregateWithoutValue = "SELECT COUNT(1) FROM c"; + String compositeAggregate = "SELECT COUNT(1) + 5 FROM c"; + String multipleAggregates = "SELECT COUNT(1) + SUM(c) FROM c"; + List unsupportedQueries = Arrays.asList(aggregateWithoutValue, + compositeAggregate, + multipleAggregates); + + unsupportedQueries.forEach(this::runUnsupportedQueryForFailures); + } + + private void runUnsupportedQueryForFailures(String query){ + FeedOptions options = new FeedOptions(); + options.setEnableCrossPartitionQuery(true); + options.setMaxDegreeOfParallelism(2); + Observable> queryObservable = client.queryDocuments(getCollectionLink(), + query, + options); + FailureValidator validator = new FailureValidator.Builder() + .instanceOf(DocumentClientException.class) + .statusCode(400) + .build(); + validateQueryFailure(queryObservable, validator); + } + } diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ReadFeedDocumentsTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ReadFeedDocumentsTest.java index 4a89b94b7..a483799ad 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ReadFeedDocumentsTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ReadFeedDocumentsTest.java @@ -71,22 +71,6 @@ public void readDocuments() { validateQuerySuccess(feedObservable, validator, FEED_TIMEOUT); } - @Test(groups = { "simple" }, timeOut = FEED_TIMEOUT) - public void readDocuments_withoutEnableCrossPartitionQuery() { - FeedOptions options = new FeedOptions(); - options.setMaxItemCount(2); - - Observable> feedObservable = client.readDocuments(getCollectionLink(), options); - FailureValidator validator = FailureValidator.builder().instanceOf(DocumentClientException.class) - .statusCode(400) - .errorMessageContains("Cross partition query is required but disabled." + - " Please set x-ms-documentdb-query-enablecrosspartition to true," + - " specify x-ms-documentdb-partitionkey," + - " or revise your query to avoid this exception.") - .build(); - validateQueryFailure(feedObservable, validator, FEED_TIMEOUT); - } - @BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT, alwaysRun = true) public void beforeClass() { client = this.clientBuilder().build(); diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ConsistencyTests2.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ConsistencyTests2.java index 7240708f8..c1eb8d6ca 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ConsistencyTests2.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/ConsistencyTests2.java @@ -265,6 +265,7 @@ public void call(Integer index) { try { FeedOptions feedOptions = new FeedOptions(); feedOptions.setEnableCrossPartitionQuery(true); + feedOptions.setAllowEmptyPages(true); FeedResponse queryResponse = client.queryDocuments(createdCollection.getSelfLink(), "SELECT * FROM c WHERE c.Id = 'foo'", feedOptions).toBlocking().first(); String lsnHeaderValue = queryResponse.getResponseHeaders().get(WFConstants.BackendHeaders.LSN); long lsn = Long.valueOf(lsnHeaderValue); diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/DocumentQuerySpyWireContentTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/DocumentQuerySpyWireContentTest.java index ecdf72296..a5d26d0dc 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/DocumentQuerySpyWireContentTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/DocumentQuerySpyWireContentTest.java @@ -147,17 +147,21 @@ public void queryWithContinuationTokenLimit(FeedOptions options, String query, b } private void validateRequestHasContinuationTokenLimit(HttpClientRequest request, Integer expectedValue) { - if (expectedValue != null && expectedValue > 0) { - assertThat(request.getHeaders() - .contains(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) - .isTrue(); - assertThat(request.getHeaders() - .get("x-ms-documentdb-responsecontinuationtokenlimitinkb")) - .isEqualTo(Integer.toString(expectedValue)); - } else { - assertThat(request.getHeaders() - .contains(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) - .isFalse(); + // query plan request does'nt have this header so checking only for queries + if(request.getHeaders().get(HttpConstants.HttpHeaders.IS_QUERY) != null ){ + + if (expectedValue != null && expectedValue > 0) { + assertThat(request.getHeaders() + .contains(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) + .isTrue(); + assertThat(request.getHeaders() + .get("x-ms-documentdb-responsecontinuationtokenlimitinkb")) + .isEqualTo(Integer.toString(expectedValue)); + } else { + assertThat(request.getHeaders() + .contains(HttpConstants.HttpHeaders.RESPONSE_CONTINUATION_TOKEN_LIMIT_IN_KB)) + .isFalse(); + } } }