Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -161,8 +162,10 @@ protected String getDocumentLink(Document doc) {
protected abstract void performWorkload(BaseSubscriber<T> baseSubscriber, long i) throws Exception;

private boolean shouldContinue(long startTimeMillis, long iterationCount) {

Duration maxDurationTime = configuration.getMaxRunningTimeDuration();
int maxNumberOfOperations = configuration.getNumberOfOperations();

if (maxDurationTime == null) {
return iterationCount < maxNumberOfOperations;
}
Expand All @@ -182,17 +185,19 @@ void run() throws Exception {

successMeter = metricsRegistry.meter("#Successful Operations");
failureMeter = metricsRegistry.meter("#Unsuccessful Operations");

if (configuration.getOperationType() == Operation.ReadLatency
|| configuration.getOperationType() == Operation.WriteLatency)
|| configuration.getOperationType() == Operation.WriteLatency) {
latency = metricsRegistry.timer("Latency");
}

reporter.start(configuration.getPrintingInterval(), TimeUnit.SECONDS);

long startTime = System.currentTimeMillis();

AtomicLong count = new AtomicLong(0);
long i;
for ( i = 0; shouldContinue(startTime, i); i++) {

for (i = 0; shouldContinue(startTime, i); i++) {

BaseSubscriber<T> baseSubscriber = new BaseSubscriber<T>() {
@Override
Expand All @@ -202,7 +207,12 @@ protected void hookOnSubscribe(Subscription subscription) {

@Override
protected void hookOnNext(T value) {
logger.debug("hookOnNext: {}, count:{}", value, count.get());
}

@Override
protected void hookOnCancel() {
this.hookOnError(new CancellationException());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscr
} else {
throw new IllegalArgumentException("Unsupported Operation: " + configuration.getOperationType());
}
concurrencyControlSemaphore.acquire();

concurrencyControlSemaphore.acquire();
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static void main(String[] args) throws Exception {
throw new RuntimeException(cfg.getOperationType() + " is not supported");
}

LOGGER.info("Starting {}", cfg.getOperationType());
benchmark.run();
benchmark.shutdown();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,19 @@ protected void init() {

@Override
protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i) throws Exception {

Flux<Document> obs;
boolean readyMyWrite = RandomUtils.nextBoolean();

if (readyMyWrite) {

// will do a write and immediately upon success will either
// do a point read
// or single partition query
// or cross partition query to find the write.

int j = Math.toIntExact(Math.floorMod(i, 3));

switch (j) {
case 0:
// write a random document to cosmodb and update the cache.
Expand All @@ -78,7 +83,7 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)
"couldn't find my write in a single partition query!"))));
break;
case 2:
// write a random document to cosmodb and update the cache.
// write a random document to cosmosdb and update the cache.
// then try to query for the document which just was written
obs = writeDocument()
.flatMap(d -> xPartitionQuery(generateQuery(d))
Expand All @@ -90,12 +95,15 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)
throw new IllegalStateException();
}
} else {

// will either do
// a write
// a point read for a in memory cached document
// or single partition query for a in memory cached document
// or cross partition query for a in memory cached document

int j = Math.toIntExact(Math.floorMod(i, 4));

switch (j) {
case 0:
// write a random document to cosmosdb and update the cache
Expand Down Expand Up @@ -125,6 +133,7 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)

concurrencyControlSemaphore.acquire();

logger.debug("concurrencyControlSemaphore: {}", concurrencyControlSemaphore);
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
# this is the log4j configuration for tests
# This is the log4j configuration for benchmarks

# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=INFO, A1
log4j.rootLogger=INFO, Console

log4j.category.com.azure.data.cosmos.internal.directconnectivity.rntbd=WARN
log4j.category.io.netty=INFO
log4j.category.io.reactivex=INFO
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.category.com.azure.cosmos=INFO
log4j.category.com.azure.cosmos.benchmark=INFO
log4j.category.com.azure.cosmos.internal=INFO
log4j.category.com.azure.cosmos.internal.caches=INFO
log4j.category.com.azure.cosmos.internal.changefeed=INFO
log4j.category.com.azure.cosmos.internal.directconnectivity=INFO
log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd=INFO
log4j.category.com.azure.cosmos.internal.http=INFO
log4j.category.com.azure.cosmos.internal.query=INFO
log4j.category.com.azure.cosmos.internal.query.aggregation=INFO
log4j.category.com.azure.cosmos.internal.query.metrics=INFO
log4j.category.com.azure.cosmos.internal.query.orderbyquery=INFO
log4j.category.com.azure.cosmos.internal.routing=INFO

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n
log4j.category.com.azure.cosmos.internal.directconnectivity.RntbdTransportClient=INFO
log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd.RntbdRequestManager=INFO

log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n

log4j.appender.LogFile=org.apache.log4j.FileAppender
log4j.appender.LogFile.File=${azure.cosmos.logger.directory}/azure-cosmos-benchmark.log
log4j.appender.LogFile.layout=org.apache.log4j.PatternLayout
log4j.appender.LogFile.layout.ConversionPattern=[%d][%p][${azure.cosmos.hostname}][thread:%t][logger:%c] %m%n
Loading