Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ public enum BenchmarkType {
description = "The leader scm host x.x.x.x.")
private String scm;

@CommandLine.Mixin
private FreonReplicationOptions replication;

static final int CHECK_INTERVAL_MILLIS = 5000;

private static final Random RANDOM = new Random();
Expand All @@ -187,10 +190,10 @@ private SCMThroughputBenchmark() {

@Override
public Void call() throws Exception {
ThroughputBenchmark benchmark = createBenchmark();

initCluster();
conf = freon.createOzoneConfiguration();

ThroughputBenchmark benchmark = createBenchmark();
initCluster(benchmark);
benchmark.run();

return null;
Expand Down Expand Up @@ -220,16 +223,16 @@ private ThroughputBenchmark createBenchmark() {
return benchmark;
}

private void initCluster() throws IOException, InterruptedException,
IllegalArgumentException {

this.conf = freon.createOzoneConfiguration();
private void initCluster(ThroughputBenchmark benchmark)
throws IOException, InterruptedException, IllegalArgumentException {

initSCMClients();

registerFakeDatanodes();

activatePipelines();
if (benchmark.requiresPipelines()) {
activatePipelines();
}

exitSafeMode();
}
Expand Down Expand Up @@ -421,13 +424,19 @@ protected int getNumThreads() {
protected void enqueueTask(Runnable task) {
this.taskQueue.add(task);
}

public boolean requiresPipelines() {
return false;
}
}

/**
* Benchmarks throughput of allocate block operation from scm clients.
*/
private class BlockBenchmark extends ThroughputBenchmark {

private final ReplicationConfig replicationConfig;
private final ExcludeList excludeList = new ExcludeList();
private AtomicLong totalBlockCounter;
private AtomicLong succBlockCounter;
private AtomicLong failBlockCounter;
Expand All @@ -441,13 +450,23 @@ private class BlockBenchmark extends ThroughputBenchmark {
this.totalBlockCounter = new AtomicLong();
this.succBlockCounter = new AtomicLong();
this.failBlockCounter = new AtomicLong();
ReplicationConfig rc = replication.fromParamsOrConfig(conf);
if (rc == null) {
rc = RatisReplicationConfig.getInstance(ReplicationFactor.THREE);
}
replicationConfig = rc;
}

@Override
public boolean requiresPipelines() {
return replicationConfig.getReplicationType() == ReplicationType.RATIS;
}

@Override
public void prepare() {
super.prepare();
for (int i = 0; i < getNumThreads(); i++) {
enqueueTask(new BlockTask(this.blockSize));
enqueueTask(new BlockTask(blockSize, replicationConfig));
}
}

Expand Down Expand Up @@ -479,31 +498,30 @@ public void waitForComplete() throws InterruptedException {
}
}

private void doAllocateBlock(long size, int nBlocks,
ReplicationConfig config) {
private void doAllocateBlock(long size, ReplicationConfig config) {
try {
scmBlockClient.allocateBlock(size, nBlocks, config, "STB",
new ExcludeList());
scmBlockClient.allocateBlock(size, 1, config, "STB", excludeList);
succBlockCounter.incrementAndGet();
} catch (IOException e) {
LOG.error("{}", e);
LOG.error("Failed to allocate block", e);
failBlockCounter.incrementAndGet();
}
}

private class BlockTask implements Runnable {

private long blockSize;
private final long blockSize;
private final ReplicationConfig replicationConfig;

BlockTask(long blockSize) {
BlockTask(long blockSize, ReplicationConfig replicationConfig) {
this.blockSize = blockSize;
this.replicationConfig = replicationConfig;
}

@Override
public void run() {
while (totalBlockCounter.getAndIncrement() < totalBlocks) {
doAllocateBlock(blockSize, 1,
RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
doAllocateBlock(blockSize, replicationConfig);
}
}
}
Expand Down