diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SCMThroughputBenchmark.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SCMThroughputBenchmark.java index 673b5272c8e1..d62ad1d79c1e 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SCMThroughputBenchmark.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SCMThroughputBenchmark.java @@ -168,6 +168,9 @@ public enum BenchmarkType { description = "The leader scm host x.x.x.x.") private String scm; + @CommandLine.Mixin + private FreonReplicationOptions replication; + static final int CHECK_INTERVAL_MILLIS = 5000; private static final Random RANDOM = new Random(); @@ -187,10 +190,10 @@ private SCMThroughputBenchmark() { @Override public Void call() throws Exception { - ThroughputBenchmark benchmark = createBenchmark(); - - initCluster(); + conf = freon.createOzoneConfiguration(); + ThroughputBenchmark benchmark = createBenchmark(); + initCluster(benchmark); benchmark.run(); return null; @@ -220,16 +223,16 @@ private ThroughputBenchmark createBenchmark() { return benchmark; } - private void initCluster() throws IOException, InterruptedException, - IllegalArgumentException { - - this.conf = freon.createOzoneConfiguration(); + private void initCluster(ThroughputBenchmark benchmark) + throws IOException, InterruptedException, IllegalArgumentException { initSCMClients(); registerFakeDatanodes(); - activatePipelines(); + if (benchmark.requiresPipelines()) { + activatePipelines(); + } exitSafeMode(); } @@ -421,6 +424,10 @@ protected int getNumThreads() { protected void enqueueTask(Runnable task) { this.taskQueue.add(task); } + + public boolean requiresPipelines() { + return false; + } } /** @@ -428,6 +435,8 @@ protected void enqueueTask(Runnable task) { */ private class BlockBenchmark extends ThroughputBenchmark { + private final ReplicationConfig replicationConfig; + private final ExcludeList excludeList = new ExcludeList(); private AtomicLong totalBlockCounter; private AtomicLong succBlockCounter; private AtomicLong failBlockCounter; @@ -441,13 +450,23 @@ private class BlockBenchmark extends ThroughputBenchmark { this.totalBlockCounter = new AtomicLong(); this.succBlockCounter = new AtomicLong(); this.failBlockCounter = new AtomicLong(); + ReplicationConfig rc = replication.fromParamsOrConfig(conf); + if (rc == null) { + rc = RatisReplicationConfig.getInstance(ReplicationFactor.THREE); + } + replicationConfig = rc; + } + + @Override + public boolean requiresPipelines() { + return replicationConfig.getReplicationType() == ReplicationType.RATIS; } @Override public void prepare() { super.prepare(); for (int i = 0; i < getNumThreads(); i++) { - enqueueTask(new BlockTask(this.blockSize)); + enqueueTask(new BlockTask(blockSize, replicationConfig)); } } @@ -479,31 +498,30 @@ public void waitForComplete() throws InterruptedException { } } - private void doAllocateBlock(long size, int nBlocks, - ReplicationConfig config) { + private void doAllocateBlock(long size, ReplicationConfig config) { try { - scmBlockClient.allocateBlock(size, nBlocks, config, "STB", - new ExcludeList()); + scmBlockClient.allocateBlock(size, 1, config, "STB", excludeList); succBlockCounter.incrementAndGet(); } catch (IOException e) { - LOG.error("{}", e); + LOG.error("Failed to allocate block", e); failBlockCounter.incrementAndGet(); } } private class BlockTask implements Runnable { - private long blockSize; + private final long blockSize; + private final ReplicationConfig replicationConfig; - BlockTask(long blockSize) { + BlockTask(long blockSize, ReplicationConfig replicationConfig) { this.blockSize = blockSize; + this.replicationConfig = replicationConfig; } @Override public void run() { while (totalBlockCounter.getAndIncrement() < totalBlocks) { - doAllocateBlock(blockSize, 1, - RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); + doAllocateBlock(blockSize, replicationConfig); } } }