klas
/**
* Query for documents in a items in a container
*
- * After subscription the operation will be performed. The {@link Flux} will
+ * After subscription the operation will be performed. The {@link CosmosContinuablePagedFlux} will
* contain one or several feed response of the obtained items. In case of
- * failure the {@link Flux} will error.
+ * failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param the type parameter
* @param query the query.
* @param klass the class type
- * @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
+ * @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
- public Flux> queryItems(String query, Class klass) {
+ public CosmosContinuablePagedFlux queryItems(String query, Class klass) {
return queryItems(new SqlQuerySpec(query), klass);
}
/**
* Query for documents in a items in a container
*
- * After subscription the operation will be performed. The {@link Flux} will
+ * After subscription the operation will be performed. The {@link CosmosContinuablePagedFlux} will
* contain one or several feed response of the obtained items. In case of
- * failure the {@link Flux} will error.
+ * failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param the type parameter
* @param query the query.
* @param options the feed options.
* @param klass the class type
- * @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
+ * @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
- public Flux> queryItems(String query, FeedOptions options, Class klass) {
+ public CosmosContinuablePagedFlux queryItems(String query, FeedOptions options, Class klass) {
return queryItems(new SqlQuerySpec(query), options, klass);
}
/**
* Query for documents in a items in a container
*
- * After subscription the operation will be performed. The {@link Flux} will
+ * After subscription the operation will be performed. The {@link CosmosContinuablePagedFlux} will
* contain one or several feed response of the obtained items. In case of
- * failure the {@link Flux} will error.
+ * failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param the type parameter
* @param querySpec the SQL query specification.
* @param klass the class type
- * @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
+ * @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
- public Flux> queryItems(SqlQuerySpec querySpec, Class klass) {
+ public CosmosContinuablePagedFlux queryItems(SqlQuerySpec querySpec, Class klass) {
return queryItems(querySpec, new FeedOptions(), klass);
}
@@ -329,22 +329,33 @@ public Flux> queryItems(SqlQuerySpec querySpec, Class kla
*
* After subscription the operation will be performed. The {@link Flux} will
* contain one or several feed response of the obtained items. In case of
- * failure the {@link Flux} will error.
+ * failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param the type parameter
* @param querySpec the SQL query specification.
* @param options the feed options.
* @param klass the class type
- * @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
+ * @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
- public Flux> queryItems(SqlQuerySpec querySpec, FeedOptions options, Class klass) {
- return getDatabase().getDocClientWrapper().queryDocuments(getLink(),
- querySpec, options)
- .map(response -> BridgeInternal.createFeedResponseWithQueryMetrics(
- (CosmosItemProperties
- .getTypedResultsFromV2Results((List) (Object) response.getResults(),
- klass)), response.getResponseHeaders(),
- response.queryMetrics()));
+ public CosmosContinuablePagedFlux queryItems(SqlQuerySpec querySpec, FeedOptions options, Class klass) {
+ return queryItemsInternal(querySpec, options, klass);
+ }
+
+ private CosmosContinuablePagedFlux queryItemsInternal(SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions, Class klass) {
+ return new CosmosContinuablePagedFlux<>(pagedFluxOptions -> {
+ if (pagedFluxOptions.getRequestContinuation() != null) {
+ feedOptions.requestContinuation(pagedFluxOptions.getRequestContinuation());
+ }
+ if (pagedFluxOptions.getMaxItemCount() != null) {
+ feedOptions.maxItemCount(pagedFluxOptions.getMaxItemCount());
+ }
+ return getDatabase().getDocClientWrapper().queryDocuments(CosmosAsyncContainer.this.getLink(), sqlQuerySpec, feedOptions)
+ .map(response ->
+ BridgeInternal.createFeedResponseWithQueryMetrics((
+ CosmosItemProperties.getTypedResultsFromV2Results((List)(Object)response.getResults(), klass)),
+ response.getResponseHeaders(),
+ response.queryMetrics()));
+ });
}
/**
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java
index 93f4cb09e974..22c605b9fd69 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java
@@ -3,6 +3,7 @@
package com.azure.cosmos;
+import com.azure.core.util.IterableStream;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -254,10 +255,10 @@ public Iterator> readAllItems(FeedOptions options, Class
* @param query the query
* @param options the options
* @param klass the class type
- * @return the iterator
+ * @return the {@link IterableStream}
*/
- public Iterator> queryItems(String query, FeedOptions options, Class klass) {
- return getFeedIterator(this.asyncContainer.queryItems(query, options, klass));
+ public IterableStream> queryItems(String query, FeedOptions options, Class klass) {
+ return getFeedIterableStream(this.asyncContainer.queryItems(query, options, klass));
}
/**
@@ -270,7 +271,7 @@ public Iterator> queryItems(String query, FeedOptions option
* @return the iterator
*/
public Iterator> queryItems(SqlQuerySpec querySpec, FeedOptions options, Class klass) {
- return getFeedIterator(this.asyncContainer.queryItems(querySpec, options, klass));
+ return getFeedIterator(this.asyncContainer.queryItems(querySpec, options, klass).byPage());
}
/**
@@ -368,4 +369,8 @@ private Iterator> getFeedIterator(Flux> item
return itemFlux.toIterable(1).iterator();
}
+ private IterableStream> getFeedIterableStream(CosmosContinuablePagedFlux cosmosContinuablePagedFlux) {
+ return IterableStream.of(cosmosContinuablePagedFlux.byPage().toIterable(1));
+ }
+
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContinuablePagedFlux.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContinuablePagedFlux.java
new file mode 100644
index 000000000000..4272f4d26c03
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContinuablePagedFlux.java
@@ -0,0 +1,86 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos;
+
+import com.azure.core.util.IterableStream;
+import com.azure.core.util.paging.ContinuablePagedFlux;
+import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
+import reactor.core.CoreSubscriber;
+import reactor.core.publisher.Flux;
+
+import java.util.function.Function;
+
+/**
+ * Cosmos implementation of {@link ContinuablePagedFlux}.
+ *
+ * This type is a Flux that provides the ability to operate on pages of type {@link FeedResponse}
+ * and individual items in such pages. This type supports {@link String} type continuation tokens,
+ * allowing for restarting from a previously-retrieved continuation token.
+ *
+ * For more information on the base type, refer {@link ContinuablePagedFlux}
+ *
+ * @param The type of elements in a {@link com.azure.core.util.paging.ContinuablePage}
+ * @see com.azure.core.util.paging.ContinuablePage
+ * @see CosmosPagedFluxOptions
+ * @see FeedResponse
+ */
+public class CosmosContinuablePagedFlux extends ContinuablePagedFlux> {
+
+ private final Function>> optionsFluxFunction;
+
+ CosmosContinuablePagedFlux(Function>> optionsFluxFunction) {
+ this.optionsFluxFunction = optionsFluxFunction;
+ }
+
+ @Override
+ public Flux> byPage() {
+ CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
+
+ return this.optionsFluxFunction.apply(cosmosPagedFluxOptions);
+ }
+
+ @Override
+ public Flux> byPage(String continuationToken) {
+ CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
+ cosmosPagedFluxOptions.setRequestContinuation(continuationToken);
+
+ return this.optionsFluxFunction.apply(cosmosPagedFluxOptions);
+ }
+
+ @Override
+ public Flux> byPage(int preferredPageSize) {
+ CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
+ cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);
+
+ return this.optionsFluxFunction.apply(cosmosPagedFluxOptions);
+ }
+
+ @Override
+ public Flux> byPage(String continuationToken, int preferredPageSize) {
+ CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
+ cosmosPagedFluxOptions.setRequestContinuation(continuationToken);
+ cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);
+
+ return this.optionsFluxFunction.apply(cosmosPagedFluxOptions);
+ }
+
+ /**
+ * Subscribe to consume all items of type {@code T} in the sequence respectively.
+ * This is recommended for most common scenarios. This will seamlessly fetch next
+ * page when required and provide with a {@link Flux} of items.
+ *
+ * @param coreSubscriber The subscriber for this {@link CosmosContinuablePagedFlux}
+ */
+ @Override
+ public void subscribe(CoreSubscriber super T> coreSubscriber) {
+ Flux> pagedResponse = this.byPage();
+ pagedResponse.flatMap(tFeedResponse -> {
+ IterableStream elements = tFeedResponse.getElements();
+ if (elements == null) {
+ return Flux.empty();
+ }
+ return Flux.fromIterable(elements);
+ }).subscribe(coreSubscriber);
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/FeedResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/FeedResponse.java
index 669cb3b4f014..986647931f9d 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/FeedResponse.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/FeedResponse.java
@@ -3,6 +3,8 @@
package com.azure.cosmos;
+import com.azure.core.util.IterableStream;
+import com.azure.core.util.paging.ContinuablePage;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.QueryMetrics;
@@ -15,7 +17,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-public class FeedResponse {
+public class FeedResponse implements ContinuablePage {
private final List results;
private final Map header;
@@ -255,6 +257,11 @@ public String getActivityId() {
return getValueOrNull(header, HttpConstants.HttpHeaders.ACTIVITY_ID);
}
+ @Override
+ public IterableStream getElements() {
+ return IterableStream.of(this.results);
+ }
+
/**
* Gets the continuation token to be used for continuing the enumeration.
*
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java
new file mode 100644
index 000000000000..b23cf504a5f9
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java
@@ -0,0 +1,60 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation;
+
+import com.azure.cosmos.CosmosContinuablePagedFlux;
+
+/**
+ * Specifies paging options for Cosmos Paged Flux implementation.
+ * @see CosmosContinuablePagedFlux
+ */
+public class CosmosPagedFluxOptions {
+
+ private String requestContinuation;
+ private Integer maxItemCount;
+
+ public CosmosPagedFluxOptions() {}
+
+ /**
+ * Gets the request continuation token.
+ *
+ * @return the request continuation.
+ */
+ public String getRequestContinuation() {
+ return requestContinuation;
+ }
+
+ /**
+ * Sets the request continuation token.
+ *
+ * @param requestContinuation the request continuation.
+ * @return the {@link CosmosPagedFluxOptions}.
+ */
+ public CosmosPagedFluxOptions setRequestContinuation(String requestContinuation) {
+ this.requestContinuation = requestContinuation;
+ return this;
+ }
+
+ /**
+ * Gets the maximum number of items to be returned in the enumeration
+ * operation.
+ *
+ * @return the max number of items.
+ */
+ public Integer getMaxItemCount() {
+ return this.maxItemCount;
+ }
+
+ /**
+ * Sets the maximum number of items to be returned in the enumeration
+ * operation.
+ *
+ * @param maxItemCount the max number of items.
+ * @return the {@link CosmosPagedFluxOptions}.
+ */
+ public CosmosPagedFluxOptions setMaxItemCount(Integer maxItemCount) {
+ this.maxItemCount = maxItemCount;
+ return this;
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedContextClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedContextClientImpl.java
index 7fd7c0032c43..25b5d633a1a7 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedContextClientImpl.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedContextClientImpl.java
@@ -132,7 +132,8 @@ public Mono> readItem(String itemId, PartitionKey
public Flux> queryItems(CosmosAsyncContainer containerLink, SqlQuerySpec querySpec,
FeedOptions options, Class klass) {
return containerLink.queryItems(querySpec, options, klass)
- .publishOn(this.rxScheduler);
+ .byPage()
+ .publishOn(this.rxScheduler);
}
@Override
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java
index dbf6083c0848..64e96c93662d 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java
@@ -6,6 +6,7 @@
package com.azure.cosmos;
+import com.azure.core.util.IterableStream;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.implementation.HttpConstants;
import org.testng.annotations.AfterClass;
@@ -141,10 +142,10 @@ public void queryItems() throws Exception{
String query = String.format("SELECT * from c where c.id = '%s'", properties.getId());
FeedOptions feedOptions = new FeedOptions();
- Iterator> feedResponseIterator1 =
+ IterableStream> feedResponseIterator1 =
container.queryItems(query, feedOptions, CosmosItemProperties.class);
// Very basic validation
- assertThat(feedResponseIterator1.hasNext()).isTrue();
+ assertThat(feedResponseIterator1.iterator().hasNext()).isTrue();
SqlQuerySpec querySpec = new SqlQuerySpec(query);
Iterator> feedResponseIterator3 =
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosPartitionKeyTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosPartitionKeyTests.java
index 57720eced44c..e0dd4836d833 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosPartitionKeyTests.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosPartitionKeyTests.java
@@ -174,7 +174,7 @@ public void nonPartitionedCollectionOperations() throws Exception {
expectedIds.add(NON_PARTITIONED_CONTAINER_DOCUEMNT_ID);
expectedIds.add(replacedItemId);
expectedIds.add(upsertedItemId);
- Flux> queryFlux = createdContainer.queryItems("SELECT * from c", feedOptions, CosmosItemProperties.class);
+ Flux> queryFlux = createdContainer.queryItems("SELECT * from c", feedOptions, CosmosItemProperties.class).byPage();
FeedResponseListValidator queryValidator = new FeedResponseListValidator.Builder()
.totalSize(3)
.numberOfPages(1)
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java
index 06d8ce90dba3..4704c32370fa 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/AggregateQueryTests.java
@@ -7,6 +7,7 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosClientException;
+import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
@@ -79,7 +80,7 @@ public void queryDocumentsWithAggregates(boolean qmEnabled) throws Exception {
for (QueryConfig queryConfig : queryConfigs) {
- Flux> queryObservable = createdCollection.queryItems(queryConfig.query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(queryConfig.query, options, CosmosItemProperties.class);
FeedResponseListValidator validator = new FeedResponseListValidator.Builder()
.withAggregateValue(queryConfig.expected)
@@ -87,7 +88,7 @@ public void queryDocumentsWithAggregates(boolean qmEnabled) throws Exception {
.hasValidQueryMetrics(qmEnabled)
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
}
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java
index c0b234d1f122..87fccdfb3bfd 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureCrossPartitionTest.java
@@ -11,6 +11,7 @@
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainerProperties;
import com.azure.cosmos.CosmosContainerRequestOptions;
+import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.DataType;
import com.azure.cosmos.FeedOptions;
@@ -97,7 +98,7 @@ public BackPressureCrossPartitionTest(CosmosClientBuilder clientBuilder) {
private void warmUp() {
FeedOptions options = new FeedOptions();
// ensure collection is cached
- createdCollection.queryItems("SELECT * FROM r", options, CosmosItemProperties.class).blockFirst();
+ createdCollection.queryItems("SELECT * FROM r", options, CosmosItemProperties.class).byPage().blockFirst();
}
@DataProvider(name = "queryProvider")
@@ -114,18 +115,17 @@ public Object[][] queryProvider() {
}
@Test(groups = { "long" }, dataProvider = "queryProvider", timeOut = 2 * TIMEOUT)
- public void query(String query, int maxItemCount, int maxExpectedBufferedCountForBackPressure, int expectedNumberOfResults) throws Exception {
+ public void queryPages(String query, int maxItemCount, int maxExpectedBufferedCountForBackPressure, int expectedNumberOfResults) throws Exception {
FeedOptions options = new FeedOptions();
- options.maxItemCount(maxItemCount);
options.setMaxDegreeOfParallelism(2);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest) CosmosBridgeInternal.getAsyncDocumentClient(client);
rxClient.httpRequests.clear();
log.info("instantiating subscriber ...");
TestSubscriber> subscriber = new TestSubscriber<>(1);
- queryObservable.publishOn(Schedulers.elastic(), 1).subscribe(subscriber);
+ queryObservable.byPage(maxItemCount).publishOn(Schedulers.elastic(), 1).subscribe(subscriber);
int sleepTimeInMillis = 10000;
int i = 0;
@@ -158,6 +158,50 @@ public void query(String query, int maxItemCount, int maxExpectedBufferedCountFo
assertThat(subscriber.values().stream().mapToInt(p -> p.getResults().size()).sum()).isEqualTo(expectedNumberOfResults);
}
+ @Test(groups = { "long" }, dataProvider = "queryProvider", timeOut = 2 * TIMEOUT)
+ public void queryItems(String query, int maxItemCount, int maxExpectedBufferedCountForBackPressure, int expectedNumberOfResults) throws Exception {
+ FeedOptions options = new FeedOptions();
+ options.setMaxDegreeOfParallelism(2);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+
+ RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest) CosmosBridgeInternal.getAsyncDocumentClient(client);
+ rxClient.httpRequests.clear();
+
+ log.info("instantiating subscriber ...");
+ TestSubscriber subscriber = new TestSubscriber<>(1);
+ queryObservable.publishOn(Schedulers.elastic(), 1).subscribe(subscriber);
+ int sleepTimeInMillis = 10000;
+ int i = 0;
+
+ // use a test subscriber and request for more result and sleep in between
+ while (subscriber.completions() == 0 && subscriber.errorCount() == 0) {
+ log.debug("loop " + i);
+
+ TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
+ sleepTimeInMillis /= 2;
+
+ if (sleepTimeInMillis > 4000) {
+ // validate that only one item is returned to subscriber in each iteration
+ assertThat(subscriber.valueCount() - i).isEqualTo(1);
+ }
+
+ log.debug("subscriber.getValueCount(): " + subscriber.valueCount());
+ log.debug("client.httpRequests.size(): " + rxClient.httpRequests.size());
+ // validate that the difference between the number of requests to backend
+ // and the number of returned results is always less than a fixed threshold
+ assertThat(rxClient.httpRequests.size() - subscriber.valueCount())
+ .isLessThanOrEqualTo(maxExpectedBufferedCountForBackPressure);
+
+ log.debug("requesting more");
+ subscriber.requestMore(1);
+ i++;
+ }
+
+ subscriber.assertNoErrors();
+ subscriber.assertComplete();
+ assertThat(Integer.valueOf(subscriber.values().size())).isEqualTo(expectedNumberOfResults);
+ }
+
@BeforeClass(groups = { "long" }, timeOut = SETUP_TIMEOUT)
public void before_BackPressureCrossPartitionTest() {
CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java
index 75c62806fcee..abdbd323fe08 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/BackPressureTest.java
@@ -10,6 +10,7 @@
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainerProperties;
import com.azure.cosmos.CosmosContainerRequestOptions;
+import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
@@ -110,11 +111,10 @@ public void readFeed() throws Exception {
}
@Test(groups = { "long" }, timeOut = 3 * TIMEOUT)
- public void query() throws Exception {
+ public void queryPages() throws Exception {
FeedOptions options = new FeedOptions();
- options.maxItemCount(1);
- Flux> queryObservable = createdCollection.queryItems("SELECT * from r", options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems("SELECT * from r", options, CosmosItemProperties.class);
RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest)CosmosBridgeInternal.getAsyncDocumentClient(client);
rxClient.httpRequests.clear();
@@ -122,7 +122,7 @@ public void query() throws Exception {
TestSubscriber> subscriber = new TestSubscriber>(1);
AtomicInteger valueCount = new AtomicInteger();
- queryObservable.doOnNext(cosmosItemPropertiesFeedResponse -> {
+ queryObservable.byPage(1).doOnNext(cosmosItemPropertiesFeedResponse -> {
if (!cosmosItemPropertiesFeedResponse.getResults().isEmpty()) {
valueCount.incrementAndGet();
}
@@ -155,6 +155,50 @@ public void query() throws Exception {
assertThat(valueCount.get()).isEqualTo(createdDocuments.size());
}
+ @Test(groups = { "long" }, timeOut = 3 * TIMEOUT)
+ public void queryItems() throws Exception {
+ FeedOptions options = new FeedOptions();
+
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems("SELECT * from r", options, CosmosItemProperties.class);
+
+ RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest)CosmosBridgeInternal.getAsyncDocumentClient(client);
+ rxClient.httpRequests.clear();
+
+ TestSubscriber subscriber = new TestSubscriber<>(1);
+ AtomicInteger valueCount = new AtomicInteger();
+
+ queryObservable.doOnNext(cosmosItemProperties -> {
+ valueCount.incrementAndGet();
+ }).publishOn(Schedulers.elastic(), 1).subscribe(subscriber);
+
+ int sleepTimeInMillis = 10000;
+
+ int i = 0;
+ // use a test subscriber and request for more result and sleep in between
+ while(subscriber.completions() == 0 && subscriber.getEvents().get(1).isEmpty()) {
+ TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
+ sleepTimeInMillis /= 2;
+
+ if (sleepTimeInMillis > 1000) {
+ // validate that only one item is returned to subscriber in each iteration
+ assertThat(subscriber.valueCount() - i).isEqualTo(1);
+ }
+ // validate that the difference between the number of requests to backend
+ // and the number of returned results is always less than a fixed threshold
+ assertThat(rxClient.httpRequests.size() - subscriber.valueCount())
+ .isLessThanOrEqualTo(Queues.SMALL_BUFFER_SIZE);
+
+ subscriber.requestMore(1);
+ i++;
+ }
+
+ subscriber.assertNoErrors();
+ subscriber.assertComplete();
+
+ logger.debug("final value count {}", valueCount);
+ assertThat(valueCount.get()).isEqualTo(createdDocuments.size());
+ }
+
@BeforeClass(groups = { "long" }, timeOut = 2 * SETUP_TIMEOUT)
public void before_BackPressureTest() throws Exception {
@@ -191,7 +235,7 @@ private void warmUp() {
// ensure collection is cached
FeedOptions options = new FeedOptions();
- createdCollection.queryItems("SELECT * from r", options, CosmosItemProperties.class).blockFirst();
+ createdCollection.queryItems("SELECT * from r", options, CosmosItemProperties.class).byPage().blockFirst();
}
@AfterClass(groups = { "long" }, timeOut = 2 * SHUTDOWN_TIMEOUT, alwaysRun = true)
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java
index 578dcb00ff7b..e54e6eb94d96 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java
@@ -262,7 +262,7 @@ public void staledLeaseAcquiring() {
FeedOptions feedOptions = new FeedOptions();
- createdLeaseCollection.queryItems(querySpec, feedOptions, CosmosItemProperties.class)
+ createdLeaseCollection.queryItems(querySpec, feedOptions, CosmosItemProperties.class).byPage()
.delayElements(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT / 2))
.flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults()))
.flatMap(doc -> {
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java
index dae71f0c395b..a2000100fefb 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/MultiOrderByQueryTests.java
@@ -11,6 +11,7 @@
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosContainerProperties;
+import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.CosmosItemRequestOptions;
import com.azure.cosmos.FeedOptions;
@@ -245,14 +246,14 @@ public void queryDocumentsWithMultiOrder() throws CosmosClientException, Interru
List expectedOrderedList = top(sort(filter(this.documents, hasFilter), compositeIndex, invert), hasTop, topCount) ;
- Flux> queryObservable = documentCollection.queryItems(query, feedOptions, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = documentCollection.queryItems(query, feedOptions, CosmosItemProperties.class);
FeedResponseListValidator validator = new FeedResponseListValidator
.Builder()
.withOrderedResults(expectedOrderedList, compositeIndex)
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
}
}
@@ -264,13 +265,13 @@ public void queryDocumentsWithMultiOrder() throws CosmosClientException, Interru
BridgeInternal.remove(documentWithEmptyField, NUMBER_FIELD);
documentCollection.createItem(documentWithEmptyField, new CosmosItemRequestOptions()).block();
String query = "SELECT [root." + NUMBER_FIELD + ",root." + STRING_FIELD + "] FROM root ORDER BY root." + NUMBER_FIELD + " ASC ,root." + STRING_FIELD + " DESC";
- Flux> queryObservable = documentCollection.queryItems(query, feedOptions, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = documentCollection.queryItems(query, feedOptions, CosmosItemProperties.class);
FailureValidator validator = new FailureValidator.Builder()
.instanceOf(UnsupportedOperationException.class)
.build();
- validateQueryFailure(queryObservable, validator);
+ validateQueryFailure(queryObservable.byPage(), validator);
}
private List top(List arrayList, boolean hasTop, int topCount) {
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java
index 84b6b4eeec81..9ccbee1f6c37 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OffsetLimitQueryTests.java
@@ -8,6 +8,7 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
@@ -57,7 +58,7 @@ public void queryDocuments(boolean qmEnabled) {
options.maxItemCount(5);
options.populateQueryMetrics(qmEnabled);
options.setMaxDegreeOfParallelism(2);
- Flux> queryObservable = createdCollection.queryItems(query, options,
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options,
CosmosItemProperties.class);
FeedResponseListValidator validator =
@@ -69,7 +70,7 @@ public void queryDocuments(boolean qmEnabled) {
.hasValidQueryMetrics(qmEnabled)
.build();
- validateQuerySuccess(queryObservable, validator, TIMEOUT);
+ validateQuerySuccess(queryObservable.byPage(), validator, TIMEOUT);
}
@Test(groups = {"simple"}, timeOut = TIMEOUT)
@@ -79,7 +80,7 @@ public void drainAllDocumentsUsingOffsetLimit() {
String query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount;
FeedOptions options = new FeedOptions();
options.maxItemCount(5);
- Flux> queryObservable;
+ CosmosContinuablePagedFlux queryObservable;
int totalDocsObtained = 0;
int totalDocs = docs.size();
@@ -90,7 +91,7 @@ public void drainAllDocumentsUsingOffsetLimit() {
while (numCalls < expectedNumCalls) {
query = "SELECT * from c OFFSET " + skipCount + " LIMIT " + takeCount;
queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
- Iterator> iterator = queryObservable.toIterable().iterator();
+ Iterator> iterator = queryObservable.byPage().toIterable().iterator();
while (iterator.hasNext()) {
FeedResponse next = iterator.next();
totalDocsObtained += next.getResults().size();
@@ -154,11 +155,11 @@ private List queryWithContinuationTokens(String query, int
options.maxItemCount(pageSize);
options.maxItemCount(5);
options.requestContinuation(requestContinuation);
- Flux> queryObservable =
+ CosmosContinuablePagedFlux queryObservable =
createdCollection.queryItems(query, options, CosmosItemProperties.class);
TestSubscriber> testSubscriber = new TestSubscriber<>();
- queryObservable.subscribe(testSubscriber);
+ queryObservable.byPage().subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertComplete();
@@ -203,7 +204,7 @@ public void afterClass() {
safeClose(client);
}
- @BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT)
+ @BeforeClass(groups = {"simple"}, timeOut = 3 * SETUP_TIMEOUT)
public void beforeClass() throws Exception {
client = this.clientBuilder().buildAsyncClient();
createdCollection = getSharedMultiPartitionCosmosContainer(client);
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java
index d47b9f201e8b..54c1496fa7a8 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java
@@ -8,6 +8,7 @@
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosClientException;
+import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.CosmosItemRequestOptions;
import com.azure.cosmos.FeedOptions;
@@ -76,7 +77,7 @@ public void queryDocumentsValidateContent(boolean qmEnabled) throws Exception {
FeedOptions options = new FeedOptions();
options.populateQueryMetrics(qmEnabled);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
List expectedResourceIds = new ArrayList<>();
expectedResourceIds.add(expectedDocument.getResourceId());
@@ -95,7 +96,7 @@ public void queryDocumentsValidateContent(boolean qmEnabled) throws Exception {
.hasValidQueryMetrics(qmEnabled)
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -103,7 +104,7 @@ public void queryDocuments_NoResults() throws Exception {
String query = "SELECT * from root r where r.id = '2' ORDER BY r.propInt";
FeedOptions options = new FeedOptions();
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
FeedResponseListValidator validator = new FeedResponseListValidator.Builder()
.containsExactly(new ArrayList<>())
@@ -113,7 +114,7 @@ public void queryDocuments_NoResults() throws Exception {
.hasRequestChargeHeader().build())
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@DataProvider(name = "sortOrder")
@@ -128,7 +129,7 @@ public void queryOrderBy(String sortOrder) throws Exception {
int pageSize = 3;
options.maxItemCount(pageSize);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
Comparator validatorComparator = Comparator.nullsFirst(Comparator.naturalOrder());
List expectedResourceIds = sortDocumentsAndCollectResourceIds("propInt", d -> d.getInt("propInt"), validatorComparator);
@@ -146,7 +147,7 @@ public void queryOrderBy(String sortOrder) throws Exception {
.totalRequestChargeIsAtLeast(numberOfPartitions * minQueryRequestChargePerPartition)
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -156,7 +157,7 @@ public void queryOrderByInt() throws Exception {
int pageSize = 3;
options.maxItemCount(pageSize);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
Comparator validatorComparator = Comparator.nullsFirst(Comparator.naturalOrder());
List expectedResourceIds = sortDocumentsAndCollectResourceIds("propInt", d -> d.getInt("propInt"), validatorComparator);
@@ -170,7 +171,7 @@ public void queryOrderByInt() throws Exception {
.totalRequestChargeIsAtLeast(numberOfPartitions * minQueryRequestChargePerPartition)
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -180,7 +181,7 @@ public void queryOrderByString() throws Exception {
int pageSize = 3;
options.maxItemCount(pageSize);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
Comparator validatorComparator = Comparator.nullsFirst(Comparator.naturalOrder());
List expectedResourceIds = sortDocumentsAndCollectResourceIds("propStr", d -> d.getString("propStr"), validatorComparator);
@@ -194,7 +195,7 @@ public void queryOrderByString() throws Exception {
.totalRequestChargeIsAtLeast(numberOfPartitions * minQueryRequestChargePerPartition)
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@DataProvider(name = "topValue")
@@ -210,7 +211,7 @@ public void queryOrderWithTop(int topValue) throws Exception {
int pageSize = 3;
options.maxItemCount(pageSize);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
Comparator validatorComparator = Comparator.nullsFirst(Comparator.naturalOrder());
@@ -228,7 +229,7 @@ public void queryOrderWithTop(int topValue) throws Exception {
.totalRequestChargeIsAtLeast(numberOfPartitions * (topValue > 0 ? minQueryRequestChargePerPartition : 1))
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
private List sortDocumentsAndCollectResourceIds(String propName, Function extractProp, Comparator comparer) {
@@ -244,10 +245,10 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc
FeedOptions options = new FeedOptions();
options.partitionKey(new PartitionKey("duplicateParitionKeyValue"));
options.maxItemCount(3);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
TestSubscriber> subscriber = new TestSubscriber<>();
- queryObservable.take(1).subscribe(subscriber);
+ queryObservable.byPage().take(1).subscribe(subscriber);
subscriber.awaitTerminalEvent();
subscriber.assertComplete();
@@ -280,7 +281,7 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -454,12 +455,12 @@ private void assertInvalidContinuationToken(String query, int[] pageSize, List> queryObservable = createdCollection.queryItems(query,
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query,
options, CosmosItemProperties.class);
//Observable> firstPageObservable = queryObservable.first();
TestSubscriber> testSubscriber = new TestSubscriber<>();
- queryObservable.subscribe(testSubscriber);
+ queryObservable.byPage().subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
testSubscriber.assertError(CosmosClientException.class);
} while (requestContinuation != null);
@@ -487,12 +488,12 @@ private List queryWithContinuationTokens(String query, int
options.setMaxDegreeOfParallelism(2);
options.requestContinuation(requestContinuation);
- Flux> queryObservable = createdCollection.queryItems(query,
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query,
options, CosmosItemProperties.class);
- //Observable> firstPageObservable = queryObservable.first();
+ //Observable> firstPageObservable = queryObservable.byPage().first();
TestSubscriber> testSubscriber = new TestSubscriber<>();
- queryObservable.subscribe(testSubscriber);
+ queryObservable.byPage().subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertComplete();
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java
index 26accce8bc5b..81be32147b28 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java
@@ -9,6 +9,7 @@
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosClientException;
+import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
@@ -78,7 +79,7 @@ public void queryDocuments(boolean qmEnabled) {
options.populateQueryMetrics(qmEnabled);
options.setMaxDegreeOfParallelism(2);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
List expectedDocs = createdDocuments.stream().filter(d -> 99 == d.getInt("prop") ).collect(Collectors.toList());
assertThat(expectedDocs).isNotEmpty();
@@ -91,7 +92,7 @@ public void queryDocuments(boolean qmEnabled) {
.hasValidQueryMetrics(qmEnabled)
.build();
- validateQuerySuccess(queryObservable, validator, TIMEOUT);
+ validateQuerySuccess(queryObservable.byPage(), validator, TIMEOUT);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -103,12 +104,12 @@ public void queryMetricEquality() throws Exception {
options.populateQueryMetrics(true);
options.setMaxDegreeOfParallelism(0);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
- List> resultList1 = queryObservable.collectList().block();
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ List> resultList1 = queryObservable.byPage().collectList().block();
options.setMaxDegreeOfParallelism(4);
- Flux> threadedQueryObs = createdCollection.queryItems(query, options, CosmosItemProperties.class);
- List> resultList2 = threadedQueryObs.collectList().block();
+ CosmosContinuablePagedFlux threadedQueryObs = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ List> resultList2 = threadedQueryObs.byPage().collectList().block();
assertThat(resultList1.size()).isEqualTo(resultList2.size());
for(int i = 0; i < resultList1.size(); i++){
@@ -135,7 +136,7 @@ public void queryDocuments_NoResults() {
String query = "SELECT * from root r where r.id = '2'";
FeedOptions options = new FeedOptions();
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
FeedResponseListValidator validator = new FeedResponseListValidator.Builder()
.containsExactly(new ArrayList<>())
@@ -144,7 +145,7 @@ public void queryDocuments_NoResults() {
.pageSizeIsLessThanOrEqualTo(0)
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = 2 * TIMEOUT)
@@ -155,7 +156,7 @@ public void queryDocumentsWithPageSize() {
options.maxItemCount(pageSize);
options.setMaxDegreeOfParallelism(-1);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
List expectedDocs = createdDocuments;
assertThat(expectedDocs).isNotEmpty();
@@ -172,7 +173,7 @@ public void queryDocumentsWithPageSize() {
.pageSizeIsLessThanOrEqualTo(pageSize)
.build())
.build();
- validateQuerySuccess(queryObservable, validator, 2 * subscriberValidationTimeout);
+ validateQuerySuccess(queryObservable.byPage(), validator, 2 * subscriberValidationTimeout);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -180,21 +181,21 @@ public void invalidQuerySyntax() {
String query = "I am an invalid query";
FeedOptions options = new FeedOptions();
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
FailureValidator validator = new FailureValidator.Builder()
.instanceOf(CosmosClientException.class)
.statusCode(400)
.notNullActivityId()
.build();
- validateQueryFailure(queryObservable, validator);
+ validateQueryFailure(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void crossPartitionQueryNotEnabled() {
String query = "SELECT * from root";
FeedOptions options = new FeedOptions();
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
List expectedDocs = createdDocuments;
FeedResponseListValidator validator =
@@ -205,7 +206,7 @@ public void crossPartitionQueryNotEnabled() {
.requestChargeGreaterThanOrEqualTo(1.0)
.build())
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = 2 * TIMEOUT)
@@ -220,6 +221,7 @@ public void partitionKeyRangeId() {
FeedOptions options = new FeedOptions();
partitionKeyRangeIdInternal(options, partitionKeyRangeId);
int queryResultCount = createdCollection.queryItems(query, options, CosmosItemProperties.class)
+ .byPage()
.flatMap(p -> Flux.fromIterable(p.getResults()))
.collectList().block().size();
@@ -285,10 +287,10 @@ public void queryDocumentsStringValue(){
String query = "Select value c.id from c";
- Flux> queryObservable = createdCollection.queryItems(query, options, String.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, String.class);
List fetchedResults = new ArrayList<>();
- queryObservable.map(stringFeedResponse -> fetchedResults.addAll(stringFeedResponse.getResults())).blockLast();
+ queryObservable.byPage().map(stringFeedResponse -> fetchedResults.addAll(stringFeedResponse.getResults())).blockLast();
assertThat(fetchedResults).containsAll(expectedValues);
}
@@ -318,10 +320,10 @@ public void queryDocumentsArrayValue(){
String query = "Select top 2 value c.sgmts from c";
- Flux> queryObservable = createdCollection.queryItems(query, options, List.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, List.class);
List fetchedResults = new ArrayList<>();
- queryObservable.map(feedResponse -> fetchedResults.addAll(feedResponse.getResults())).blockLast();
+ queryObservable.byPage().map(feedResponse -> fetchedResults.addAll(feedResponse.getResults())).blockLast();
assertThat(fetchedResults).containsAll(expectedValues);
}
@@ -335,10 +337,10 @@ public void queryDocumentsIntegerValue(){
String query = "Select value c.prop from c";
- Flux> queryObservable = createdCollection.queryItems(query, options, Integer.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, Integer.class);
List fetchedResults = new ArrayList<>();
- queryObservable.map(feedResponse -> fetchedResults.addAll(feedResponse.getResults())).blockLast();
+ queryObservable.byPage().map(feedResponse -> fetchedResults.addAll(feedResponse.getResults())).blockLast();
assertThat(fetchedResults).containsAll(expectedValues);
}
@@ -349,9 +351,9 @@ public void queryDocumentsPojo(){
options.setMaxDegreeOfParallelism(2);
String query = "Select * from c";
- Flux> queryObservable = createdCollection.queryItems(query, options, TestObject.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, TestObject.class);
List fetchedResults = new ArrayList<>();
- queryObservable.map(feedResponse -> fetchedResults.addAll(feedResponse.getResults())).blockLast();
+ queryObservable.byPage().map(feedResponse -> fetchedResults.addAll(feedResponse.getResults())).blockLast();
List assertTuples = createdDocuments.stream()
.map(cosmosItemProperties -> tuple(cosmosItemProperties.getId(),
@@ -482,11 +484,11 @@ public void invalidQuerySytax() throws Exception {
String query = "I am an invalid query";
FeedOptions options = new FeedOptions();
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
FailureValidator validator = new FailureValidator.Builder().instanceOf(CosmosClientException.class)
.statusCode(400).notNullActivityId().build();
- validateQueryFailure(queryObservable, validator);
+ validateQueryFailure(queryObservable.byPage(), validator);
}
public CosmosItemProperties createDocument(CosmosAsyncContainer cosmosContainer, int cnt) throws CosmosClientException {
@@ -523,10 +525,10 @@ private List queryWithContinuationTokens(String query, int
options.setMaxDegreeOfParallelism(2);
options.requestContinuation(requestContinuation);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
TestSubscriber> testSubscriber = new TestSubscriber<>();
- queryObservable.subscribe(testSubscriber);
+ queryObservable.byPage().subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertComplete();
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java
index ee7cf3c1e9b2..6fca8b4f25c2 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java
@@ -6,6 +6,7 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosClientException;
+import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.CosmosItemRequestOptions;
import com.azure.cosmos.FeedOptions;
@@ -59,7 +60,7 @@ public void queryDocuments(boolean queryMetricsEnabled) throws Exception {
options.maxItemCount(5);
options.populateQueryMetrics(queryMetricsEnabled);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
List expectedDocs = createdDocuments.stream().filter(d -> 99 == d.getInt("prop") ).collect(Collectors.toList());
assertThat(expectedDocs).isNotEmpty();
@@ -75,7 +76,7 @@ public void queryDocuments(boolean queryMetricsEnabled) throws Exception {
.hasValidQueryMetrics(queryMetricsEnabled)
.build();
- validateQuerySuccess(queryObservable, validator, 10000);
+ validateQuerySuccess(queryObservable.byPage(), validator, 10000);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -87,7 +88,7 @@ public void queryDocuments_ParameterizedQueryWithInClause() throws Exception {
FeedOptions options = new FeedOptions();
options.maxItemCount(5);
- Flux> queryObservable = createdCollection.queryItems(sqs, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(sqs, options, CosmosItemProperties.class);
List expectedDocs = createdDocuments.stream().filter(d -> (3 == d.getInt("prop") || 4 == d.getInt("prop"))).collect(Collectors.toList());
assertThat(expectedDocs).isNotEmpty();
@@ -102,7 +103,7 @@ public void queryDocuments_ParameterizedQueryWithInClause() throws Exception {
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();
- validateQuerySuccess(queryObservable, validator, 10000);
+ validateQuerySuccess(queryObservable.byPage(), validator, 10000);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -114,7 +115,7 @@ public void queryDocuments_ParameterizedQuery() throws Exception {
FeedOptions options = new FeedOptions();
options.maxItemCount(5);
- Flux> queryObservable = createdCollection.queryItems(sqs, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(sqs, options, CosmosItemProperties.class);
List expectedDocs = createdDocuments.stream().filter(d -> 3 == d.getInt("prop")).collect(Collectors.toList());
assertThat(expectedDocs).isNotEmpty();
@@ -129,7 +130,7 @@ public void queryDocuments_ParameterizedQuery() throws Exception {
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();
- validateQuerySuccess(queryObservable, validator, 10000);
+ validateQuerySuccess(queryObservable.byPage(), validator, 10000);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -138,7 +139,7 @@ public void queryDocuments_NoResults() throws Exception {
String query = "SELECT * from root r where r.id = '2'";
FeedOptions options = new FeedOptions();
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
FeedResponseListValidator validator = new FeedResponseListValidator.Builder()
.containsExactly(new ArrayList<>())
@@ -146,7 +147,7 @@ public void queryDocuments_NoResults() throws Exception {
.pageSatisfy(0, new FeedResponseValidator.Builder()
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -156,7 +157,7 @@ public void queryDocumentsWithPageSize() throws Exception {
FeedOptions options = new FeedOptions();
options.maxItemCount(3);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
List expectedDocs = createdDocuments;
int expectedPageSize = (expectedDocs.size() + options.maxItemCount() - 1) / options.maxItemCount();
@@ -172,7 +173,7 @@ public void queryDocumentsWithPageSize() throws Exception {
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -182,7 +183,7 @@ public void queryOrderBy() throws Exception {
FeedOptions options = new FeedOptions();
options.maxItemCount(3);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
List expectedDocs = createdDocuments;
int expectedPageSize = (expectedDocs.size() + options.maxItemCount() - 1) / options.maxItemCount();
@@ -196,7 +197,7 @@ public void queryOrderBy() throws Exception {
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT * 1000)
@@ -205,10 +206,10 @@ public void continuationToken() throws Exception {
FeedOptions options = new FeedOptions();
options.maxItemCount(3);
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
TestSubscriber> subscriber = new TestSubscriber<>();
- queryObservable.take(1).subscribe(subscriber);
+ queryObservable.byPage().take(1).subscribe(subscriber);
subscriber.awaitTerminalEvent();
subscriber.assertComplete();
@@ -236,7 +237,7 @@ public void continuationToken() throws Exception {
.allPagesSatisfy(new FeedResponseValidator.Builder()
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();
- validateQuerySuccess(queryObservable, validator);
+ validateQuerySuccess(queryObservable.byPage(), validator);
}
@Test(groups = { "simple" }, timeOut = TIMEOUT)
@@ -244,14 +245,14 @@ public void invalidQuerySytax() throws Exception {
String query = "I am an invalid query";
FeedOptions options = new FeedOptions();
- Flux> queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
+ CosmosContinuablePagedFlux queryObservable = createdCollection.queryItems(query, options, CosmosItemProperties.class);
FailureValidator validator = new FailureValidator.Builder()
.instanceOf(CosmosClientException.class)
.statusCode(400)
.notNullActivityId()
.build();
- validateQueryFailure(queryObservable, validator);
+ validateQueryFailure(queryObservable.byPage(), validator);
}
public CosmosItemProperties createDocument(CosmosAsyncContainer cosmosContainer, int cnt) {
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
index e29e644a3ecd..a2e6bc72d84b 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
@@ -221,6 +221,7 @@ protected static void truncateCollection(CosmosAsyncContainer cosmosContainer) {
logger.info("Truncating collection {} documents ...", cosmosContainer.getId());
cosmosContainer.queryItems("SELECT * FROM root", options, CosmosItemProperties.class)
+ .byPage(100)
.publishOn(Schedulers.parallel())
.flatMap(page -> Flux.fromIterable(page.getResults()))
.flatMap(doc -> {
@@ -558,6 +559,7 @@ public static void deleteDocumentIfExists(CosmosAsyncClient client, String datab
List res = cosmosContainer
.queryItems(String.format("SELECT * FROM root r where r.id = '%s'", docId), options, CosmosItemProperties.class)
+ .byPage()
.flatMap(page -> Flux.fromIterable(page.getResults()))
.collectList().block();
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TopQueryTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TopQueryTests.java
index d21f6adeee32..6a24e94803cc 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TopQueryTests.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TopQueryTests.java
@@ -6,6 +6,7 @@
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
@@ -60,31 +61,31 @@ public void queryDocumentsWithTop(boolean qmEnabled) throws Exception {
int[] expectedPageLengths = new int[] { 9, 9, 2 };
for (int i = 0; i < 2; i++) {
- Flux> queryObservable1 = createdCollection.queryItems("SELECT TOP 0 value AVG(c.field) from c",
+ CosmosContinuablePagedFlux