Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ void run() throws Exception {
failureMeter = metricsRegistry.meter("#Unsuccessful Operations");

if (configuration.getOperationType() == Configuration.Operation.ReadLatency
|| configuration.getOperationType() == Configuration.Operation.WriteLatency) {
|| configuration.getOperationType() == Configuration.Operation.WriteLatency
|| configuration.getOperationType() == Configuration.Operation.QueryInClauseParallel) {
latency = metricsRegistry.timer("Latency");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,57 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.SqlParameter;
import com.azure.cosmos.SqlQuerySpec;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.PartitionKey;
import com.codahale.metrics.Timer;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class AsyncQueryBenchmark extends AsyncBenchmark<FeedResponse<Document>> {

private int pageCount = 0;

class LatencySubscriber<T> extends BaseSubscriber<T> {

Timer.Context context;
BaseSubscriber<T> baseSubscriber;

LatencySubscriber(BaseSubscriber<T> baseSubscriber) {
this.baseSubscriber = baseSubscriber;
}

@Override
protected void hookOnSubscribe(Subscription subscription) {
super.hookOnSubscribe(subscription);
}

@Override
protected void hookOnNext(T value) {
}

@Override
protected void hookOnComplete() {
context.stop();
baseSubscriber.onComplete();
}

@Override
protected void hookOnError(Throwable throwable) {
context.stop();
baseSubscriber.onError(throwable);
}
}

AsyncQueryBenchmark(Configuration cfg) {
super(cfg);
}
Expand All @@ -34,7 +71,6 @@ protected void onSuccess() {

@Override
protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscriber, long i) throws InterruptedException {

Flux<FeedResponse<Document>> obs;
Random r = new Random();
FeedOptions options = new FeedOptions();
Expand Down Expand Up @@ -80,11 +116,32 @@ protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscr
options.setEnableCrossPartitionQuery(true);
String sqlQuery = "Select top 1000 * from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Configuration.Operation.QueryInClauseParallel) {

ReadMyWriteWorkflow.QueryBuilder queryBuilder = new ReadMyWriteWorkflow.QueryBuilder();
options.setEnableCrossPartitionQuery(true);
options.setMaxDegreeOfParallelism(200);
List<SqlParameter> parameters = new ArrayList<>();
int j = 0;
for(Document doc: docsToRead) {
String partitionKeyValue = doc.getId();
parameters.add(new SqlParameter("@param" + j, partitionKeyValue));
j++;
}

queryBuilder.whereClause(new ReadMyWriteWorkflow.QueryBuilder.WhereClause.InWhereClause(partitionKey,
parameters));

SqlQuerySpec query = queryBuilder.toSqlQuerySpec();
obs = client.queryDocuments(getCollectionLink(), query, options);

} else {
throw new IllegalArgumentException("Unsupported Operation: " + configuration.getOperationType());
}

concurrencyControlSemaphore.acquire();
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
LatencySubscriber<FeedResponse> latencySubscriber = new LatencySubscriber(baseSubscriber);
latencySubscriber.context = latency.time();
obs.subscribeOn(Schedulers.parallel()).subscribe(latencySubscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Configuration {
+ "\tWriteThroughput - run a Write workload that prints only throughput\n"
+ "\tReadLatency - run a READ workload that prints both throughput and latency *\n"
+ "\tWriteLatency - run a Write workload that prints both throughput and latency\n"
+ "\tQueryInClauseParallel - run a 'Select * from c where c.pk in (....)' workload that prints latency\n"
+ "\tQueryCross - run a 'Select * from c where c._rid = SOME_RID' workload that prints throughput\n"
+ "\tQuerySingle - run a 'Select * from c where c.pk = SOME_PK' workload that prints throughput\n"
+ "\tQuerySingleMany - run a 'Select * from c where c.pk = \"pk\"' workload that prints throughput\n"
Expand Down Expand Up @@ -122,6 +123,7 @@ enum Operation {
WriteThroughput,
ReadLatency,
WriteLatency,
QueryInClauseParallel,
QueryCross,
QuerySingle,
QuerySingleMany,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static void main(String[] args) throws Exception {
case QueryAggregate:
case QueryTopOrderby:
case QueryAggregateTopOrderby:
case QueryInClauseParallel:
benchmark = new AsyncQueryBenchmark(cfg);
break;

Expand Down