diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java index f5cec6f0c554..88bf88cb3ced 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java @@ -3,8 +3,11 @@ package com.azure.cosmos.benchmark; -import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosBridgeInternal; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.Database; import com.azure.cosmos.implementation.Document; import com.azure.cosmos.implementation.DocumentCollection; @@ -47,6 +50,7 @@ abstract class AsyncBenchmark { private Meter failureMeter; final Logger logger; + final CosmosAsyncClient v4Client; final AsyncDocumentClient client; final DocumentCollection collection; final String partitionKey; @@ -56,15 +60,17 @@ abstract class AsyncBenchmark { Timer latency; AsyncBenchmark(Configuration cfg) { - client = new AsyncDocumentClient.Builder() - .withServiceEndpoint(cfg.getServiceEndpoint()) - .withMasterKeyOrResourceToken(cfg.getMasterKey()) - .withConnectionPolicy(cfg.getConnectionPolicy()) - .withConsistencyLevel(cfg.getConsistencyLevel()) - .build(); + v4Client = new CosmosClientBuilder() + .setEndpoint(cfg.getServiceEndpoint()) + .setKey(cfg.getMasterKey()) + .setConnectionPolicy(cfg.getConnectionPolicy()) + .setConsistencyLevel(cfg.getConsistencyLevel()) + .buildAsyncClient(); logger = LoggerFactory.getLogger(this.getClass()); + client = CosmosBridgeInternal.getAsyncDocumentClient(v4Client); + Database database = DocDBUtils.getDatabase(client, cfg.getDatabaseId()); collection = DocDBUtils.getCollection(client, database.getSelfLink(), cfg.getCollectionId()); nameCollectionLink = String.format("dbs/%s/colls/%s", database.getId(), collection.getId()); @@ -133,7 +139,7 @@ protected void init() { } void shutdown() { - client.close(); + v4Client.close(); } protected void onSuccess() { diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java index b6987d43e201..b0242cad37a8 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncReadBenchmark.java @@ -3,24 +3,24 @@ package com.azure.cosmos.benchmark; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncItemResponse; import com.azure.cosmos.implementation.Document; -import com.azure.cosmos.PartitionKey; -import com.azure.cosmos.implementation.RequestOptions; -import com.azure.cosmos.implementation.ResourceResponse; import com.codahale.metrics.Timer; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -class AsyncReadBenchmark extends AsyncBenchmark> { +class AsyncReadBenchmark extends AsyncBenchmark { + private final CosmosAsyncContainer cosmosAsyncContainer; class LatencySubscriber extends BaseSubscriber { Timer.Context context; - BaseSubscriber> baseSubscriber; + BaseSubscriber baseSubscriber; - LatencySubscriber(BaseSubscriber> baseSubscriber) { + LatencySubscriber(BaseSubscriber baseSubscriber) { this.baseSubscriber = baseSubscriber; } @@ -48,24 +48,26 @@ protected void hookOnError(Throwable throwable) { AsyncReadBenchmark(Configuration cfg) { super(cfg); + // TODO: once all benchmarks move to v4 api, we should rely on the container which is read in the parent, and remove this. + cosmosAsyncContainer = v4Client.getDatabase(cfg.getDatabaseId()).getContainer(cfg.getCollectionId()).read().block().getContainer(); } @Override - protected void performWorkload(BaseSubscriber> baseSubscriber, long i) throws InterruptedException { + protected void performWorkload(BaseSubscriber baseSubscriber, long i) throws InterruptedException { int index = (int) (i % docsToRead.size()); - RequestOptions options = new RequestOptions(); - options.setPartitionKey(new PartitionKey(docsToRead.get(index).getId())); + Document doc = docsToRead.get(index); - Flux> obs = client.readDocument(getDocumentLink(docsToRead.get(index)), options); + String partitionKeyValue = doc.getId(); + Mono result = cosmosAsyncContainer.getItem(doc.getId(), partitionKeyValue).read(); concurrencyControlSemaphore.acquire(); if (configuration.getOperationType() == Configuration.Operation.ReadThroughput) { - obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber); + result.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber); } else { - LatencySubscriber> latencySubscriber = new LatencySubscriber<>(baseSubscriber); + LatencySubscriber latencySubscriber = new LatencySubscriber<>(baseSubscriber); latencySubscriber.context = latency.time(); - obs.subscribeOn(Schedulers.parallel()).subscribe(latencySubscriber); + result.subscribeOn(Schedulers.parallel()).subscribe(latencySubscriber); } } }