Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected void performWorkload(BaseSubscriber<Object> documentBaseSubscriber, lo
options.maxItemCount(10);

String sqlQuery = "Select top 100 * from c order by c._ts";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else {

int index = r.nextInt(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.PartitionKey;
import com.azure.cosmos.SqlParameter;
import com.azure.cosmos.SqlQuerySpec;
import com.codahale.metrics.Timer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -78,37 +80,37 @@ protected void performWorkload(BaseSubscriber<FeedResponse<PojoizedJson>> baseSu

int index = r.nextInt(1000);
String sqlQuery = "Select * from c where c.id = \"" + docsToRead.get(index).getId() + "\"";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QuerySingle) {

int index = r.nextInt(1000);
String pk = docsToRead.get(index).getProperty(partitionKey);
options.partitionKey(new PartitionKey(pk));
String sqlQuery = "Select * from c where c." + partitionKey + " = \"" + pk + "\"";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryParallel) {

options.maxItemCount(10);
String sqlQuery = "Select * from c";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryOrderby) {

options.maxItemCount(10);
String sqlQuery = "Select * from c order by c._ts";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregate) {

options.maxItemCount(10);
String sqlQuery = "Select value max(c._ts) from c";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregateTopOrderby) {

String sqlQuery = "Select top 1 value count(c) from c order by c._ts";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryTopOrderby) {

String sqlQuery = "Select top 1000 * from c order by c._ts";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryInClauseParallel) {

ReadMyWriteWorkflow.QueryBuilder queryBuilder = new ReadMyWriteWorkflow.QueryBuilder();
Expand All @@ -125,7 +127,7 @@ protected void performWorkload(BaseSubscriber<FeedResponse<PojoizedJson>> baseSu
parameters));

SqlQuerySpec query = queryBuilder.toSqlQuerySpec();
obs = cosmosAsyncContainer.queryItems(query, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(query, options, PojoizedJson.class).byPage();
} else {
throw new IllegalArgumentException("Unsupported Operation: " + configuration.getOperationType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.PartitionKey;
Expand Down Expand Up @@ -36,7 +37,7 @@ protected void onSuccess() {

@Override
protected void performWorkload(BaseSubscriber<FeedResponse<PojoizedJson>> baseSubscriber, long i) throws InterruptedException {
Flux<FeedResponse<PojoizedJson>> obs = cosmosAsyncContainer.queryItems(SQL_QUERY, options, PojoizedJson.class);
Flux<FeedResponse<PojoizedJson>> obs = cosmosAsyncContainer.queryItems(SQL_QUERY, options, PojoizedJson.class).byPage();

concurrencyControlSemaphore.acquire();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.CosmosAsyncItemResponse;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosContainerProperties;
import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
Expand Down Expand Up @@ -115,7 +116,7 @@ private void queryItems() {
String query = "SELECT * from root";
FeedOptions options = new FeedOptions();
options.setMaxDegreeOfParallelism(2);
Flux<FeedResponse<TestObject>> queryFlux = container.queryItems(query, options, TestObject.class);
Flux<FeedResponse<TestObject>> queryFlux = container.queryItems(query, options, TestObject.class).byPage();

queryFlux.publishOn(Schedulers.elastic())
.toIterable()
Expand All @@ -135,7 +136,7 @@ private void queryWithContinuationToken() {
String continuation = null;
do {
options.requestContinuation(continuation);
Flux<FeedResponse<TestObject>> queryFlux = container.queryItems(query, options, TestObject.class);
Flux<FeedResponse<TestObject>> queryFlux = container.queryItems(query, options, TestObject.class).byPage();
FeedResponse<TestObject> page = queryFlux.blockFirst();
assert page != null;
log(page.getResults());
Expand Down
19 changes: 18 additions & 1 deletion sdk/cosmos/azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,35 @@ Licensed under the MIT License.
<id>azure-java-build-docs</id>
<url>${site.url}/site/${project.artifactId}</url>
</site>
<repository>
<id>azure-sdk-for-java</id>
<url>https://pkgs.dev.azure.com/azure-sdk/public/_packaging/azure-sdk-for-java/maven/v1</url>
</repository>
</distributionManagement>

<scm>
<url>https://github.com/Azure/azure-sdk-for-java</url>
</scm>

<repositories>
<repository>
<id>azure-sdk-for-java</id>
<url>https://pkgs.dev.azure.com/azure-sdk/public/_packaging/azure-sdk-for-java/maven/v1</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<!-- https://mvnrepository.com/artifact/com.microsoft.azure/azure-core -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.0.0</version>
<version>1.3.0-beta.1.dev.20200120.1</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,49 +278,49 @@ public <T> Flux<FeedResponse<T>> readAllItems(FeedOptions options, Class<T> klas
/**
* Query for documents in a items in a container
* <p>
* After subscription the operation will be performed. The {@link Flux} will
* After subscription the operation will be performed. The {@link CosmosContinuablePagedFlux} will
* contain one or several feed response of the obtained items. In case of
* failure the {@link Flux} will error.
* failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param <T> the type parameter
* @param query the query.
* @param klass the class type
* @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
* @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
public <T> Flux<FeedResponse<T>> queryItems(String query, Class<T> klass) {
public <T> CosmosContinuablePagedFlux<T> queryItems(String query, Class<T> klass) {
return queryItems(new SqlQuerySpec(query), klass);
}

/**
* Query for documents in a items in a container
* <p>
* After subscription the operation will be performed. The {@link Flux} will
* After subscription the operation will be performed. The {@link CosmosContinuablePagedFlux} will
* contain one or several feed response of the obtained items. In case of
* failure the {@link Flux} will error.
* failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param <T> the type parameter
* @param query the query.
* @param options the feed options.
* @param klass the class type
* @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
* @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
public <T> Flux<FeedResponse<T>> queryItems(String query, FeedOptions options, Class<T> klass) {
public <T> CosmosContinuablePagedFlux<T> queryItems(String query, FeedOptions options, Class<T> klass) {
return queryItems(new SqlQuerySpec(query), options, klass);
}

/**
* Query for documents in a items in a container
* <p>
* After subscription the operation will be performed. The {@link Flux} will
* After subscription the operation will be performed. The {@link CosmosContinuablePagedFlux} will
* contain one or several feed response of the obtained items. In case of
* failure the {@link Flux} will error.
* failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param <T> the type parameter
* @param querySpec the SQL query specification.
* @param klass the class type
* @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
* @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
public <T> Flux<FeedResponse<T>> queryItems(SqlQuerySpec querySpec, Class<T> klass) {
public <T> CosmosContinuablePagedFlux<T> queryItems(SqlQuerySpec querySpec, Class<T> klass) {
return queryItems(querySpec, new FeedOptions(), klass);
}

Expand All @@ -329,22 +329,33 @@ public <T> Flux<FeedResponse<T>> queryItems(SqlQuerySpec querySpec, Class<T> kla
* <p>
* After subscription the operation will be performed. The {@link Flux} will
* contain one or several feed response of the obtained items. In case of
* failure the {@link Flux} will error.
* failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param <T> the type parameter
* @param querySpec the SQL query specification.
* @param options the feed options.
* @param klass the class type
* @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
* @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
public <T> Flux<FeedResponse<T>> queryItems(SqlQuerySpec querySpec, FeedOptions options, Class<T> klass) {
return getDatabase().getDocClientWrapper().queryDocuments(getLink(),
querySpec, options)
.map(response -> BridgeInternal.createFeedResponseWithQueryMetrics(
(CosmosItemProperties
.getTypedResultsFromV2Results((List<Document>) (Object) response.getResults(),
klass)), response.getResponseHeaders(),
response.queryMetrics()));
public <T> CosmosContinuablePagedFlux<T> queryItems(SqlQuerySpec querySpec, FeedOptions options, Class<T> klass) {
Comment thread
kushagraThapar marked this conversation as resolved.
return queryItemsInternal(querySpec, options, klass);
}

private <T> CosmosContinuablePagedFlux<T> queryItemsInternal(SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions, Class<T> klass) {
return new CosmosContinuablePagedFlux<>(pagedFluxOptions -> {
if (pagedFluxOptions.getRequestContinuation() != null) {
feedOptions.requestContinuation(pagedFluxOptions.getRequestContinuation());
}
if (pagedFluxOptions.getMaxItemCount() != null) {
feedOptions.maxItemCount(pagedFluxOptions.getMaxItemCount());
}
return getDatabase().getDocClientWrapper().queryDocuments(CosmosAsyncContainer.this.getLink(), sqlQuerySpec, feedOptions)
.map(response ->
BridgeInternal.createFeedResponseWithQueryMetrics((
CosmosItemProperties.getTypedResultsFromV2Results((List<Document>)(Object)response.getResults(), klass)),
response.getResponseHeaders(),
response.queryMetrics()));
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ public <T> Iterator<FeedResponse<T>> readAllItems(FeedOptions options, Class<T>
* @return the iterator
*/
public <T> Iterator<FeedResponse<T>> queryItems(String query, FeedOptions options, Class<T> klass) {
return getFeedIterator(this.asyncContainer.queryItems(query, options, klass));
// TODO: Temporary change for testing, remove byPage() once all APIs are migrated and change getFeedIterator() method to accept com.azure.cosmos.CosmosContinuablePagedFlux
return getFeedIterator(this.asyncContainer.queryItems(query, options, klass).byPage());
}

/**
Expand All @@ -270,7 +271,7 @@ public <T> Iterator<FeedResponse<T>> queryItems(String query, FeedOptions option
* @return the iterator
*/
public <T> Iterator<FeedResponse<T>> queryItems(SqlQuerySpec querySpec, FeedOptions options, Class<T> klass) {
return getFeedIterator(this.asyncContainer.queryItems(querySpec, options, klass));
return getFeedIterator(this.asyncContainer.queryItems(querySpec, options, klass).byPage());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

import com.azure.core.util.IterableStream;
import com.azure.core.util.paging.ContinuablePagedFlux;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;

import java.util.function.Function;

/**
* Cosmos implementation of {@link ContinuablePagedFlux}.
* <p>
* This type is a Flux that provides the ability to operate on pages of type {@link FeedResponse}
* and individual items in such pages. This type supports {@link String} type continuation tokens,
* allowing for restarting from a previously-retrieved continuation token.
* <p>
* For more information on the base type, refer {@link ContinuablePagedFlux}
*
* @param <T> The type of elements in a {@link com.azure.core.util.paging.ContinuablePage}
* @see com.azure.core.util.paging.ContinuablePage
* @see CosmosPagedFluxOptions
* @see FeedResponse
*/
public class CosmosContinuablePagedFlux<T> extends ContinuablePagedFlux<String, T, FeedResponse<T>> {

private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;

public CosmosContinuablePagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction) {
Comment thread
kushagraThapar marked this conversation as resolved.
Outdated
this.optionsFluxFunction = optionsFluxFunction;
}

@Override
public Flux<FeedResponse<T>> byPage() {
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();

return this.optionsFluxFunction.apply(cosmosPagedFluxOptions);
}

@Override
public Flux<FeedResponse<T>> byPage(String continuationToken) {
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
cosmosPagedFluxOptions.setRequestContinuation(continuationToken);

return this.optionsFluxFunction.apply(cosmosPagedFluxOptions);
}

@Override
public Flux<FeedResponse<T>> byPage(int preferredPageSize) {
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);

return this.optionsFluxFunction.apply(cosmosPagedFluxOptions);
}

@Override
public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageSize) {
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
cosmosPagedFluxOptions.setRequestContinuation(continuationToken);
cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);

return this.optionsFluxFunction.apply(cosmosPagedFluxOptions);
}

/**
* Subscribe to consume all items of type {@code T} in the sequence respectively.
* This is recommended for most common scenarios. This will seamlessly fetch next
* page when required and provide with a {@link Flux} of items.
*
* @param coreSubscriber The subscriber for this {@link CosmosContinuablePagedFlux}
*/
@Override
public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
Flux<FeedResponse<T>> pagedResponse = this.byPage();
pagedResponse.flatMap(tFeedResponse -> {
Comment thread
kushagraThapar marked this conversation as resolved.
IterableStream<T> elements = tFeedResponse.getElements();
if (elements == null) {
return Flux.empty();
}
return Flux.fromIterable(elements);
}).subscribe(coreSubscriber);
}
}
Loading