Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Database;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.DocumentCollection;
Expand Down Expand Up @@ -47,6 +50,7 @@ abstract class AsyncBenchmark<T> {
private Meter failureMeter;

final Logger logger;
final CosmosAsyncClient v4Client;
final AsyncDocumentClient client;
final DocumentCollection collection;
final String partitionKey;
Expand All @@ -56,15 +60,17 @@ abstract class AsyncBenchmark<T> {
Timer latency;

AsyncBenchmark(Configuration cfg) {
client = new AsyncDocumentClient.Builder()
.withServiceEndpoint(cfg.getServiceEndpoint())
.withMasterKeyOrResourceToken(cfg.getMasterKey())
.withConnectionPolicy(cfg.getConnectionPolicy())
.withConsistencyLevel(cfg.getConsistencyLevel())
.build();
v4Client = new CosmosClientBuilder()
.setEndpoint(cfg.getServiceEndpoint())
.setKey(cfg.getMasterKey())
.setConnectionPolicy(cfg.getConnectionPolicy())
.setConsistencyLevel(cfg.getConsistencyLevel())
.buildAsyncClient();

logger = LoggerFactory.getLogger(this.getClass());

client = CosmosBridgeInternal.getAsyncDocumentClient(v4Client);

Database database = DocDBUtils.getDatabase(client, cfg.getDatabaseId());
collection = DocDBUtils.getCollection(client, database.getSelfLink(), cfg.getCollectionId());
nameCollectionLink = String.format("dbs/%s/colls/%s", database.getId(), collection.getId());
Expand Down Expand Up @@ -133,7 +139,7 @@ protected void init() {
}

void shutdown() {
client.close();
v4Client.close();
}

protected void onSuccess() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncItemResponse;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.PartitionKey;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.ResourceResponse;
import com.codahale.metrics.Timer;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class AsyncReadBenchmark extends AsyncBenchmark<ResourceResponse<Document>> {
class AsyncReadBenchmark extends AsyncBenchmark<CosmosAsyncItemResponse> {
private final CosmosAsyncContainer cosmosAsyncContainer;

class LatencySubscriber<T> extends BaseSubscriber<T> {

Timer.Context context;
BaseSubscriber<ResourceResponse<Document>> baseSubscriber;
BaseSubscriber<CosmosAsyncItemResponse> baseSubscriber;

LatencySubscriber(BaseSubscriber<ResourceResponse<Document>> baseSubscriber) {
LatencySubscriber(BaseSubscriber<CosmosAsyncItemResponse> baseSubscriber) {
this.baseSubscriber = baseSubscriber;
}

Expand Down Expand Up @@ -48,24 +48,26 @@ protected void hookOnError(Throwable throwable) {

AsyncReadBenchmark(Configuration cfg) {
super(cfg);
// TODO: once all benchmarks move to v4 api, we should rely on the container which is read in the parent, and remove this.
cosmosAsyncContainer = v4Client.getDatabase(cfg.getDatabaseId()).getContainer(cfg.getCollectionId()).read().block().getContainer();
}

@Override
protected void performWorkload(BaseSubscriber<ResourceResponse<Document>> baseSubscriber, long i) throws InterruptedException {
protected void performWorkload(BaseSubscriber<CosmosAsyncItemResponse> baseSubscriber, long i) throws InterruptedException {
int index = (int) (i % docsToRead.size());
RequestOptions options = new RequestOptions();
options.setPartitionKey(new PartitionKey(docsToRead.get(index).getId()));
Document doc = docsToRead.get(index);

Flux<ResourceResponse<Document>> obs = client.readDocument(getDocumentLink(docsToRead.get(index)), options);
String partitionKeyValue = doc.getId();
Mono<CosmosAsyncItemResponse> result = cosmosAsyncContainer.getItem(doc.getId(), partitionKeyValue).read();

concurrencyControlSemaphore.acquire();

if (configuration.getOperationType() == Configuration.Operation.ReadThroughput) {
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
result.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
} else {
LatencySubscriber<ResourceResponse<Document>> latencySubscriber = new LatencySubscriber<>(baseSubscriber);
LatencySubscriber<CosmosAsyncItemResponse> latencySubscriber = new LatencySubscriber<>(baseSubscriber);
latencySubscriber.context = latency.time();
obs.subscribeOn(Schedulers.parallel()).subscribe(latencySubscriber);
result.subscribeOn(Schedulers.parallel()).subscribe(latencySubscriber);
}
}
}