Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions sdk/cosmos/azure-cosmos-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ Licensed under the MIT License.
<url>https://github.com/Azure/azure-sdk-for-java</url>
</scm>

<repositories>
<repository>
<id>azure-sdk-for-java</id>
<url>https://pkgs.dev.azure.com/azure-sdk/public/_packaging/azure-sdk-for-java/maven/v1</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>

<dependency>
Expand Down Expand Up @@ -101,6 +114,11 @@ Licensed under the MIT License.
<artifactId>slf4j-log4j12</artifactId>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ protected void performWorkload(BaseSubscriber<Object> documentBaseSubscriber, lo
} else if (i % 100 == 0) {

FeedOptions options = new FeedOptions();
options.maxItemCount(10);

String sqlQuery = "Select top 100 * from c order by c._ts";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage(10);
} else {

int index = r.nextInt(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.PartitionKey;
import com.azure.cosmos.SqlParameter;
import com.azure.cosmos.SqlQuerySpec;
import com.codahale.metrics.Timer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -78,37 +80,37 @@ protected void performWorkload(BaseSubscriber<FeedResponse<PojoizedJson>> baseSu

int index = r.nextInt(1000);
String sqlQuery = "Select * from c where c.id = \"" + docsToRead.get(index).getId() + "\"";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QuerySingle) {

int index = r.nextInt(1000);
String pk = docsToRead.get(index).getProperty(partitionKey);
options.partitionKey(new PartitionKey(pk));
String sqlQuery = "Select * from c where c." + partitionKey + " = \"" + pk + "\"";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryParallel) {

options.maxItemCount(10);
String sqlQuery = "Select * from c";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryOrderby) {

options.maxItemCount(10);
String sqlQuery = "Select * from c order by c._ts";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregate) {

options.maxItemCount(10);
String sqlQuery = "Select value max(c._ts) from c";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregateTopOrderby) {

String sqlQuery = "Select top 1 value count(c) from c order by c._ts";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryTopOrderby) {

String sqlQuery = "Select top 1000 * from c order by c._ts";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class).byPage();
} else if (configuration.getOperationType() == Configuration.Operation.QueryInClauseParallel) {

ReadMyWriteWorkflow.QueryBuilder queryBuilder = new ReadMyWriteWorkflow.QueryBuilder();
Expand All @@ -125,7 +127,7 @@ protected void performWorkload(BaseSubscriber<FeedResponse<PojoizedJson>> baseSu
parameters));

SqlQuerySpec query = queryBuilder.toSqlQuerySpec();
obs = cosmosAsyncContainer.queryItems(query, options, PojoizedJson.class);
obs = cosmosAsyncContainer.queryItems(query, options, PojoizedJson.class).byPage();
} else {
throw new IllegalArgumentException("Unsupported Operation: " + configuration.getOperationType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.PartitionKey;
Expand Down Expand Up @@ -36,7 +37,7 @@ protected void onSuccess() {

@Override
protected void performWorkload(BaseSubscriber<FeedResponse<PojoizedJson>> baseSubscriber, long i) throws InterruptedException {
Flux<FeedResponse<PojoizedJson>> obs = cosmosAsyncContainer.queryItems(SQL_QUERY, options, PojoizedJson.class);
Flux<FeedResponse<PojoizedJson>> obs = cosmosAsyncContainer.queryItems(SQL_QUERY, options, PojoizedJson.class).byPage();

concurrencyControlSemaphore.acquire();

Expand Down
18 changes: 18 additions & 0 deletions sdk/cosmos/azure-cosmos-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ Licensed under the MIT License.
<url>https://github.com/Azure/azure-sdk-for-java</url>
</scm>

<repositories>
<repository>
<id>azure-sdk-for-java</id>
<url>https://pkgs.dev.azure.com/azure-sdk/public/_packaging/azure-sdk-for-java/maven/v1</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<build>
<plugins>
<plugin>
Expand Down Expand Up @@ -118,6 +131,11 @@ Licensed under the MIT License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.CosmosAsyncItemResponse;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosContainerProperties;
import com.azure.cosmos.CosmosContinuablePagedFlux;
import com.azure.cosmos.CosmosItemProperties;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
Expand Down Expand Up @@ -115,7 +116,7 @@ private void queryItems() {
String query = "SELECT * from root";
FeedOptions options = new FeedOptions();
options.setMaxDegreeOfParallelism(2);
Flux<FeedResponse<TestObject>> queryFlux = container.queryItems(query, options, TestObject.class);
Flux<FeedResponse<TestObject>> queryFlux = container.queryItems(query, options, TestObject.class).byPage();

queryFlux.publishOn(Schedulers.elastic())
.toIterable()
Expand All @@ -135,7 +136,7 @@ private void queryWithContinuationToken() {
String continuation = null;
do {
options.requestContinuation(continuation);
Flux<FeedResponse<TestObject>> queryFlux = container.queryItems(query, options, TestObject.class);
Flux<FeedResponse<TestObject>> queryFlux = container.queryItems(query, options, TestObject.class).byPage();
FeedResponse<TestObject> page = queryFlux.blockFirst();
assert page != null;
log(page.getResults());
Expand Down
24 changes: 13 additions & 11 deletions sdk/cosmos/azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ Licensed under the MIT License.
<url>https://github.com/Azure/azure-sdk-for-java</url>
</scm>

<repositories>
<repository>
<id>azure-sdk-for-java</id>
<url>https://pkgs.dev.azure.com/azure-sdk/public/_packaging/azure-sdk-for-java/maven/v1</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>

<dependency>
Expand Down Expand Up @@ -66,17 +79,6 @@ Licensed under the MIT License.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.2.0</version>
<exclusions>
<exclusion>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,4 +465,17 @@ public static ConcurrentMap<String, QueryMetrics> queryMetricsFromFeedResponse(F
public static PartitionKeyInternal getPartitionKeyInternal(PartitionKey partitionKey) {
return partitionKey.getInternalPartitionKey();
}

public static void setFeedOptionsContinuationTokenAndMaxItemCount(FeedOptions feedOptions, String continuationToken, Integer maxItemCount) {
feedOptions.requestContinuation(continuationToken);
feedOptions.maxItemCount(maxItemCount);
}

public static void setFeedOptionsContinuationToken(FeedOptions feedOptions, String continuationToken) {
feedOptions.requestContinuation(continuationToken);
}

public static void setFeedOptionsMaxItemCount(FeedOptions feedOptions, Integer maxItemCount) {
feedOptions.maxItemCount(maxItemCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,49 +278,49 @@ public <T> Flux<FeedResponse<T>> readAllItems(FeedOptions options, Class<T> klas
/**
* Query for documents in a items in a container
* <p>
* After subscription the operation will be performed. The {@link Flux} will
* After subscription the operation will be performed. The {@link CosmosContinuablePagedFlux} will
* contain one or several feed response of the obtained items. In case of
* failure the {@link Flux} will error.
* failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param <T> the type parameter
* @param query the query.
* @param klass the class type
* @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
* @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
public <T> Flux<FeedResponse<T>> queryItems(String query, Class<T> klass) {
public <T> CosmosContinuablePagedFlux<T> queryItems(String query, Class<T> klass) {
return queryItems(new SqlQuerySpec(query), klass);
}

/**
* Query for documents in a items in a container
* <p>
* After subscription the operation will be performed. The {@link Flux} will
* After subscription the operation will be performed. The {@link CosmosContinuablePagedFlux} will
* contain one or several feed response of the obtained items. In case of
* failure the {@link Flux} will error.
* failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param <T> the type parameter
* @param query the query.
* @param options the feed options.
* @param klass the class type
* @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
* @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
public <T> Flux<FeedResponse<T>> queryItems(String query, FeedOptions options, Class<T> klass) {
public <T> CosmosContinuablePagedFlux<T> queryItems(String query, FeedOptions options, Class<T> klass) {
return queryItems(new SqlQuerySpec(query), options, klass);
}

/**
* Query for documents in a items in a container
* <p>
* After subscription the operation will be performed. The {@link Flux} will
* After subscription the operation will be performed. The {@link CosmosContinuablePagedFlux} will
* contain one or several feed response of the obtained items. In case of
* failure the {@link Flux} will error.
* failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param <T> the type parameter
* @param querySpec the SQL query specification.
* @param klass the class type
* @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
* @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
public <T> Flux<FeedResponse<T>> queryItems(SqlQuerySpec querySpec, Class<T> klass) {
public <T> CosmosContinuablePagedFlux<T> queryItems(SqlQuerySpec querySpec, Class<T> klass) {
return queryItems(querySpec, new FeedOptions(), klass);
}

Expand All @@ -329,22 +329,33 @@ public <T> Flux<FeedResponse<T>> queryItems(SqlQuerySpec querySpec, Class<T> kla
* <p>
* After subscription the operation will be performed. The {@link Flux} will
* contain one or several feed response of the obtained items. In case of
* failure the {@link Flux} will error.
* failure the {@link CosmosContinuablePagedFlux} will error.
*
* @param <T> the type parameter
* @param querySpec the SQL query specification.
* @param options the feed options.
* @param klass the class type
* @return a {@link Flux} containing one or several feed response pages of the obtained items or an error.
* @return a {@link CosmosContinuablePagedFlux} containing one or several feed response pages of the obtained items or an error.
*/
public <T> Flux<FeedResponse<T>> queryItems(SqlQuerySpec querySpec, FeedOptions options, Class<T> klass) {
return getDatabase().getDocClientWrapper().queryDocuments(getLink(),
querySpec, options)
.map(response -> BridgeInternal.createFeedResponseWithQueryMetrics(
(CosmosItemProperties
.getTypedResultsFromV2Results((List<Document>) (Object) response.getResults(),
klass)), response.getResponseHeaders(),
response.queryMetrics()));
public <T> CosmosContinuablePagedFlux<T> queryItems(SqlQuerySpec querySpec, FeedOptions options, Class<T> klass) {
return queryItemsInternal(querySpec, options, klass);
}

private <T> CosmosContinuablePagedFlux<T> queryItemsInternal(SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions, Class<T> klass) {
return new CosmosContinuablePagedFlux<>(pagedFluxOptions -> {
if (pagedFluxOptions.getRequestContinuation() != null) {
feedOptions.requestContinuation(pagedFluxOptions.getRequestContinuation());
}
if (pagedFluxOptions.getMaxItemCount() != null) {
feedOptions.maxItemCount(pagedFluxOptions.getMaxItemCount());
}
return getDatabase().getDocClientWrapper().queryDocuments(CosmosAsyncContainer.this.getLink(), sqlQuerySpec, feedOptions)
.map(response ->
BridgeInternal.createFeedResponseWithQueryMetrics((
CosmosItemProperties.getTypedResultsFromV2Results((List<Document>)(Object)response.getResults(), klass)),
response.getResponseHeaders(),
response.queryMetrics()));
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos;

import com.azure.core.util.IterableStream;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -254,10 +255,10 @@ public <T> Iterator<FeedResponse<T>> readAllItems(FeedOptions options, Class<T>
* @param query the query
* @param options the options
* @param klass the class type
* @return the iterator
* @return the {@link IterableStream}
*/
public <T> Iterator<FeedResponse<T>> queryItems(String query, FeedOptions options, Class<T> klass) {
return getFeedIterator(this.asyncContainer.queryItems(query, options, klass));
public <T> IterableStream<FeedResponse<T>> queryItems(String query, FeedOptions options, Class<T> klass) {
return getFeedIterableStream(this.asyncContainer.queryItems(query, options, klass));
}

/**
Expand All @@ -270,7 +271,7 @@ public <T> Iterator<FeedResponse<T>> queryItems(String query, FeedOptions option
* @return the iterator
*/
public <T> Iterator<FeedResponse<T>> queryItems(SqlQuerySpec querySpec, FeedOptions options, Class<T> klass) {
return getFeedIterator(this.asyncContainer.queryItems(querySpec, options, klass));
return getFeedIterator(this.asyncContainer.queryItems(querySpec, options, klass).byPage());
}

/**
Expand Down Expand Up @@ -368,4 +369,8 @@ private <T> Iterator<FeedResponse<T>> getFeedIterator(Flux<FeedResponse<T>> item
return itemFlux.toIterable(1).iterator();
}

private <T> IterableStream<FeedResponse<T>> getFeedIterableStream(CosmosContinuablePagedFlux<T> cosmosContinuablePagedFlux) {
return IterableStream.of(cosmosContinuablePagedFlux.byPage().toIterable(1));
}

}
Loading