Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.Database;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.ResourceResponse;
import com.azure.cosmos.implementation.Utils;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
Expand All @@ -31,10 +27,12 @@
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
Expand All @@ -44,58 +42,57 @@
abstract class AsyncBenchmark<T> {
private final MetricRegistry metricsRegistry = new MetricRegistry();
private final ScheduledReporter reporter;
private final String nameCollectionLink;

private Meter successMeter;
private Meter failureMeter;

final Logger logger;
final CosmosAsyncClient v4Client;
final AsyncDocumentClient client;
final DocumentCollection collection;
final CosmosAsyncClient cosmosClient;
final CosmosAsyncContainer cosmosAsyncContainer;

final String partitionKey;
final Configuration configuration;
final List<Document> docsToRead;
final List<PojoizedJson> docsToRead;
final Semaphore concurrencyControlSemaphore;
Timer latency;

AsyncBenchmark(Configuration cfg) {
v4Client = new CosmosClientBuilder()
cosmosClient = new CosmosClientBuilder()
.setEndpoint(cfg.getServiceEndpoint())
.setKey(cfg.getMasterKey())
.setConnectionPolicy(cfg.getConnectionPolicy())
.setConsistencyLevel(cfg.getConsistencyLevel())
.buildAsyncClient();

logger = LoggerFactory.getLogger(this.getClass());
cosmosAsyncContainer = cosmosClient.getDatabase(cfg.getDatabaseId()).getContainer(cfg.getCollectionId()).read().block().getContainer();

client = CosmosBridgeInternal.getAsyncDocumentClient(v4Client);
logger = LoggerFactory.getLogger(this.getClass());
partitionKey = cosmosAsyncContainer.read().block().getProperties().getPartitionKeyDefinition()
.getPaths().iterator().next().split("/")[1];

Database database = DocDBUtils.getDatabase(client, cfg.getDatabaseId());
collection = DocDBUtils.getCollection(client, database.getSelfLink(), cfg.getCollectionId());
nameCollectionLink = String.format("dbs/%s/colls/%s", database.getId(), collection.getId());
partitionKey = collection.getPartitionKey().getPaths().iterator().next().split("/")[1];
concurrencyControlSemaphore = new Semaphore(cfg.getConcurrency());
configuration = cfg;

ArrayList<Flux<Document>> createDocumentObservables = new ArrayList<>();
ArrayList<Flux<PojoizedJson>> createDocumentObservables = new ArrayList<>();

if (configuration.getOperationType() != Configuration.Operation.WriteLatency
&& configuration.getOperationType() != Configuration.Operation.WriteThroughput
&& configuration.getOperationType() != Configuration.Operation.ReadMyWrites) {
String dataFieldValue = RandomStringUtils.randomAlphabetic(cfg.getDocumentDataFieldSize());
for (int i = 0; i < cfg.getNumberOfPreCreatedDocuments(); i++) {
String uuid = UUID.randomUUID().toString();
Document newDoc = new Document();
newDoc.setId(uuid);
BridgeInternal.setProperty(newDoc, partitionKey, uuid);
BridgeInternal.setProperty(newDoc, "dataField1", dataFieldValue);
BridgeInternal.setProperty(newDoc, "dataField2", dataFieldValue);
BridgeInternal.setProperty(newDoc, "dataField3", dataFieldValue);
BridgeInternal.setProperty(newDoc, "dataField4", dataFieldValue);
BridgeInternal.setProperty(newDoc, "dataField5", dataFieldValue);
Flux<Document> obs = client.createDocument(collection.getSelfLink(), newDoc, null, false)
.map(ResourceResponse::getResource);
PojoizedJson newDoc = generateDocument(uuid, dataFieldValue);

Flux<PojoizedJson> obs = cosmosAsyncContainer.createItem(newDoc).map(resp -> {
try {
PojoizedJson x = Utils.getSimpleObjectMapper().readValue(
resp.getProperties().toJson(), PojoizedJson.class);
return x;
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
).flux();
createDocumentObservables.add(obs);
}
}
Expand Down Expand Up @@ -139,7 +136,7 @@ protected void init() {
}

void shutdown() {
v4Client.close();
cosmosClient.close();
}

protected void onSuccess() {
Expand All @@ -148,22 +145,6 @@ protected void onSuccess() {
protected void onError(Throwable throwable) {
}

protected String getCollectionLink() {
if (configuration.isUseNameLink()) {
return this.nameCollectionLink;
} else {
return collection.getSelfLink();
}
}

protected String getDocumentLink(Document doc) {
if (configuration.isUseNameLink()) {
return this.nameCollectionLink + "/docs/" + doc.getId();
} else {
return doc.getSelfLink();
}
}

protected abstract void performWorkload(BaseSubscriber<T> baseSubscriber, long i) throws Exception;

private boolean shouldContinue(long startTimeMillis, long iterationCount) {
Expand Down Expand Up @@ -191,10 +172,23 @@ void run() throws Exception {
successMeter = metricsRegistry.meter("#Successful Operations");
failureMeter = metricsRegistry.meter("#Unsuccessful Operations");

if (configuration.getOperationType() == Configuration.Operation.ReadLatency
|| configuration.getOperationType() == Configuration.Operation.WriteLatency
|| configuration.getOperationType() == Configuration.Operation.QueryInClauseParallel) {
latency = metricsRegistry.timer("Latency");
switch (configuration.getOperationType()) {
case ReadLatency:
case WriteLatency:
case QueryInClauseParallel:
case QueryCross:
case QuerySingle:
case QuerySingleMany:
case QueryParallel:
case QueryOrderby:
case QueryAggregate:
case QueryAggregateTopOrderby:
case QueryTopOrderby:
case Mixed:
latency = metricsRegistry.timer("Latency");
break;
default:
break;
}

reporter.start(configuration.getPrintingInterval(), TimeUnit.SECONDS);
Expand Down Expand Up @@ -264,4 +258,17 @@ protected void hookOnError(Throwable throwable) {
reporter.report();
reporter.close();
}

public PojoizedJson generateDocument(String idString, String dataFieldValue) {
PojoizedJson instance = new PojoizedJson();
Map<String, String> properties = instance.getInstance();
properties.put("id", idString);
properties.put(partitionKey, idString);
properties.put("dataField1", dataFieldValue);
properties.put("dataField2", dataFieldValue);
properties.put("dataField3", dataFieldValue);
properties.put("dataField4", dataFieldValue);
properties.put("dataField5", dataFieldValue);
return instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.PartitionKey;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.ResourceResponse;
import org.apache.commons.lang3.RandomStringUtils;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
Expand All @@ -17,7 +14,7 @@
import java.util.Random;
import java.util.UUID;

class AsyncMixedBenchmark extends AsyncBenchmark<Document> {
class AsyncMixedBenchmark extends AsyncBenchmark<Object> {

private final String uuid;
private final String dataFieldValue;
Expand All @@ -31,37 +28,30 @@ class AsyncMixedBenchmark extends AsyncBenchmark<Document> {
}

@Override
protected void performWorkload(BaseSubscriber<Document> documentBaseSubscriber, long i) throws InterruptedException {
Flux<Document> obs;
protected void performWorkload(BaseSubscriber<Object> documentBaseSubscriber, long i) throws InterruptedException {
Flux<? extends Object> obs;
if (i % 10 == 0 && i % 100 != 0) {

String idString = uuid + i;
Document newDoc = new Document();
newDoc.setId(idString);
BridgeInternal.setProperty(newDoc, partitionKey, idString);
BridgeInternal.setProperty(newDoc, "dataField1", dataFieldValue);
BridgeInternal.setProperty(newDoc, "dataField2", dataFieldValue);
BridgeInternal.setProperty(newDoc, "dataField3", dataFieldValue);
BridgeInternal.setProperty(newDoc, "dataField4", dataFieldValue);
BridgeInternal.setProperty(newDoc, "dataField5", dataFieldValue);
obs = client.createDocument(getCollectionLink(), newDoc, null, false).map(ResourceResponse::getResource);
PojoizedJson data = generateDocument(uuid + i, dataFieldValue);
obs = cosmosAsyncContainer.createItem(data).flux();

} else if (i % 100 == 0) {

FeedOptions options = new FeedOptions();
options.maxItemCount(10);

String sqlQuery = "Select top 100 * from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options)
.map(frp -> frp.getResults().get(0));
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
} else {

int index = r.nextInt(1000);

RequestOptions options = new RequestOptions();
String partitionKeyValue = docsToRead.get(index).getId();

options.setPartitionKey(new PartitionKey(docsToRead.get(index).getId()));

obs = client.readDocument(getDocumentLink(docsToRead.get(index)), options).map(ResourceResponse::getResource);
obs = cosmosAsyncContainer.getItem(docsToRead.get(index).getId(), partitionKeyValue).read().flux();
}

concurrencyControlSemaphore.acquire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.SqlParameter;
import com.azure.cosmos.SqlQuerySpec;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.PartitionKey;
import com.azure.cosmos.SqlParameter;
import com.azure.cosmos.SqlQuerySpec;
import com.codahale.metrics.Timer;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
Expand All @@ -19,7 +18,7 @@
import java.util.List;
import java.util.Random;

class AsyncQueryBenchmark extends AsyncBenchmark<FeedResponse<Document>> {
class AsyncQueryBenchmark extends AsyncBenchmark<FeedResponse<PojoizedJson>> {

private int pageCount = 0;

Expand Down Expand Up @@ -70,53 +69,53 @@ protected void onSuccess() {
}

@Override
protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscriber, long i) throws InterruptedException {
Flux<FeedResponse<Document>> obs;
protected void performWorkload(BaseSubscriber<FeedResponse<PojoizedJson>> baseSubscriber, long i) throws InterruptedException {
Flux<FeedResponse<PojoizedJson>> obs;
Random r = new Random();
FeedOptions options = new FeedOptions();

if (configuration.getOperationType() == Configuration.Operation.QueryCross) {

int index = r.nextInt(1000);
String sqlQuery = "Select * from c where c._rid = \"" + docsToRead.get(index).getResourceId() + "\"";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
String sqlQuery = "Select * from c where c.id = \"" + docsToRead.get(index).getId() + "\"";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
} else if (configuration.getOperationType() == Configuration.Operation.QuerySingle) {

int index = r.nextInt(1000);
String pk = docsToRead.get(index).getString("pk");
String pk = docsToRead.get(index).getProperty(partitionKey);
options.partitionKey(new PartitionKey(pk));
String sqlQuery = "Select * from c where c.pk = \"" + pk + "\"";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
String sqlQuery = "Select * from c where c." + partitionKey + " = \"" + pk + "\"";
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
} else if (configuration.getOperationType() == Configuration.Operation.QueryParallel) {

options.maxItemCount(10);
String sqlQuery = "Select * from c";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
} else if (configuration.getOperationType() == Configuration.Operation.QueryOrderby) {

options.maxItemCount(10);
String sqlQuery = "Select * from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregate) {

options.maxItemCount(10);
String sqlQuery = "Select value max(c._ts) from c";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregateTopOrderby) {

String sqlQuery = "Select top 1 value count(c) from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
} else if (configuration.getOperationType() == Configuration.Operation.QueryTopOrderby) {

String sqlQuery = "Select top 1000 * from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
obs = cosmosAsyncContainer.queryItems(sqlQuery, options, PojoizedJson.class);
} else if (configuration.getOperationType() == Configuration.Operation.QueryInClauseParallel) {

ReadMyWriteWorkflow.QueryBuilder queryBuilder = new ReadMyWriteWorkflow.QueryBuilder();
options.setMaxDegreeOfParallelism(200);
List<SqlParameter> parameters = new ArrayList<>();
int j = 0;
for(Document doc: docsToRead) {
for(PojoizedJson doc: docsToRead) {
String partitionKeyValue = doc.getId();
parameters.add(new SqlParameter("@param" + j, partitionKeyValue));
j++;
Expand All @@ -126,8 +125,7 @@ protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscr
parameters));

SqlQuerySpec query = queryBuilder.toSqlQuerySpec();
obs = client.queryDocuments(getCollectionLink(), query, options);

obs = cosmosAsyncContainer.queryItems(query, options, PojoizedJson.class);
} else {
throw new IllegalArgumentException("Unsupported Operation: " + configuration.getOperationType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@

package com.azure.cosmos.benchmark;

import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.FeedOptions;
import com.azure.cosmos.FeedResponse;
import com.azure.cosmos.PartitionKey;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class AsyncQuerySinglePartitionMultiple extends AsyncBenchmark<FeedResponse<Document>> {
class AsyncQuerySinglePartitionMultiple extends AsyncBenchmark<FeedResponse<PojoizedJson>> {

private static final String SQL_QUERY = "Select * from c where c.pk = \"pk\"";
private FeedOptions options;
Expand All @@ -36,8 +35,8 @@ protected void onSuccess() {
}

@Override
protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscriber, long i) throws InterruptedException {
Flux<FeedResponse<Document>> obs = client.queryDocuments(getCollectionLink(), SQL_QUERY, options);
protected void performWorkload(BaseSubscriber<FeedResponse<PojoizedJson>> baseSubscriber, long i) throws InterruptedException {
Flux<FeedResponse<PojoizedJson>> obs = cosmosAsyncContainer.queryItems(SQL_QUERY, options, PojoizedJson.class);

concurrencyControlSemaphore.acquire();

Expand Down
Loading