diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/PipelinedDocumentQueryExecutionContext.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/PipelinedDocumentQueryExecutionContext.java index 591b3579e..f7e598a6f 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/PipelinedDocumentQueryExecutionContext.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/PipelinedDocumentQueryExecutionContext.java @@ -121,7 +121,7 @@ public static Observable { return TopDocumentQueryExecutionContext.createAsync(createSkipComponentFunction, - queryInfo.getTop(), continuationToken); + queryInfo.getTop(), queryInfo.getTop(), continuationToken); }; } else { createTopComponentFunction = createSkipComponentFunction; @@ -130,8 +130,13 @@ public static Observable>> createTakeComponentFunction; if (queryInfo.hasLimit()) { createTakeComponentFunction = (continuationToken) -> { + int totalLimit = queryInfo.getLimit(); + if (queryInfo.hasOffset()){ + // This is being done to match the limit from rewritten query + totalLimit = queryInfo.getOffset() + queryInfo.getLimit(); + } return TopDocumentQueryExecutionContext.createAsync(createTopComponentFunction, - queryInfo.getLimit(), + queryInfo.getLimit(), totalLimit, continuationToken); }; } else { diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/TopDocumentQueryExecutionContext.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/TopDocumentQueryExecutionContext.java index 79904836c..0d60036d6 100644 --- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/TopDocumentQueryExecutionContext.java +++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/TopDocumentQueryExecutionContext.java @@ -41,15 +41,18 @@ public class TopDocumentQueryExecutionContext implements IDo private final IDocumentQueryExecutionComponent component; private final int top; + // limit from rewritten query + private final int limit; - public TopDocumentQueryExecutionContext(IDocumentQueryExecutionComponent component, int top) { + public TopDocumentQueryExecutionContext(IDocumentQueryExecutionComponent component, int top, int limit) { this.component = component; this.top = top; + this.limit = limit; } public static Observable> createAsync( Function>> createSourceComponentFunction, - int topCount, String topContinuationToken) { + int topCount, int limit, String topContinuationToken) { TakeContinuationToken takeContinuationToken; if (topContinuationToken == null) { @@ -76,7 +79,7 @@ public static Observable { - return new TopDocumentQueryExecutionContext(component, takeContinuationToken.getTakeCount()); + return new TopDocumentQueryExecutionContext(component, takeContinuationToken.getTakeCount(), limit); }); } @@ -94,7 +97,8 @@ public Observable> drainAsync(int maxPageSize) { context = (ParallelDocumentQueryExecutionContextBase) this.component; } - context.setTop(this.top); + // we are setting the new limit from rewritten query + context.setTop(this.limit); return this.component.drainAsync(maxPageSize).takeUntil(new Func1, Boolean>() {