Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
Expand Down Expand Up @@ -60,6 +65,7 @@
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;

Expand Down Expand Up @@ -92,6 +98,18 @@ public class BaseFreonGenerator {
defaultValue = "10")
private int threadNo;

@Option(names = {"--timebase"},
description = "If set, freon will run for the duration of the --runtime"
+ " specified even if the --number-of-tests operation"
+ " has been completed.",
defaultValue = "false")
private boolean timebase;

@Option(names = {"--runtime"},
description = "Tell freon to terminate processing after"
+ "the specified period of time in seconds.")
private long runtime;

@Option(names = {"-f", "--fail-at-end"},
description = "If turned on, all the tasks will be executed even if "
+ "there are failures.")
Expand All @@ -104,6 +122,14 @@ public class BaseFreonGenerator {
defaultValue = "")
private String prefix = "";

@Option(names = {"--verbose"},
description = "More verbose output. "
+ "Show all the command line Option info.")
private boolean verbose;

@CommandLine.Spec
private CommandLine.Model.CommandSpec spec;

private MetricRegistry metrics = new MetricRegistry();

private AtomicLong successCounter;
Expand All @@ -117,6 +143,13 @@ public class BaseFreonGenerator {
private ExecutorService executor;
private ProgressBar progressBar;

// the `threadSequenceId` Starting from 0,
// each thread will be set a self-incrementing sequence number when it starts
private final ThreadLocal<Long> threadSequenceId = new ThreadLocal<>();
private final AtomicLong id = new AtomicLong(0);

private final AtomicBoolean completed = new AtomicBoolean(false);

/**
* The main logic to execute a test generator.
*
Expand Down Expand Up @@ -153,15 +186,24 @@ private void startTaskRunners(TaskProvider provider) {
* concurrently in {@code executor}.
*/
private void taskLoop(TaskProvider provider) {
while (true) {
threadSequenceId.set(id.getAndIncrement());
while (!completed.get()) {
long counter = attemptCounter.getAndIncrement();

//in case of an other failed test, we shouldn't execute more tasks.
if (counter >= testNo || (!failAtEnd && failureCounter.get() > 0)) {
break;
if (timebase) {
if (System.currentTimeMillis()
> startTime + TimeUnit.SECONDS.toMillis(runtime)) {
completed.set(true);
break;
}
} else {
//in case of an other failed test, we shouldn't execute more tasks.
if (counter >= testNo || (!failAtEnd && failureCounter.get() > 0)) {
completed.set(true);
break;
}
}

tryNextTask(provider, counter);
tryNextTask(provider, counter % testNo);
}

taskLoopCompleted();
Expand Down Expand Up @@ -198,8 +240,7 @@ private void tryNextTask(TaskProvider provider, long taskId) {
* thread.
*/
private void waitForCompletion() {
while (successCounter.get() + failureCounter.get() < testNo && (
failureCounter.get() == 0 || failAtEnd)) {
while (!completed.get() && (failureCounter.get() == 0 || failAtEnd)) {
try {
Thread.sleep(CHECK_INTERVAL_MILLIS);
} catch (InterruptedException e) {
Expand All @@ -215,6 +256,7 @@ private void shutdown() {
} else {
progressBar.shutdown();
}
threadSequenceId.remove();
executor.shutdown();
try {
executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -250,6 +292,16 @@ public void init() {
//replace environment variables to support multi-node execution
prefix = resolvePrefix(prefix);
}
if (timebase && runtime <= 0) {
throw new IllegalArgumentException(
"Incomplete command, "
+ "the runtime must be given, and must not be negative");
}
if (testNo <= 0) {
throw new IllegalArgumentException(
"Invalid command, "
+ "the testNo must be a positive integer");
}
LOG.info("Executing test with prefix {} " +
"and number-of-tests {}", prefix, testNo);

Expand All @@ -263,17 +315,33 @@ public void init() {
LOG.error("HTTP server can't be stopped.", ex);
}
printReport();
if (verbose) {
printOption();
}
}, 10);

executor = Executors.newFixedThreadPool(threadNo);

progressBar = new ProgressBar(System.out, testNo, successCounter::get,
freonCommand.isInteractive());
long maxValue;
LongSupplier supplier;
if (timebase) {
maxValue = runtime;
supplier = () -> Duration.between(
Instant.ofEpochMilli(startTime), Instant.now()).getSeconds();
} else {
maxValue = testNo;
supplier = successCounter::get;
}
progressBar = new ProgressBar(System.out, maxValue, supplier,
freonCommand.isInteractive(), realTimeStatusSupplier());
progressBar.start();

startTime = System.currentTimeMillis();
}

public Supplier<String> realTimeStatusSupplier() {
return () -> "";
}

/**
* Resolve environment variables in the prefixes.
*/
Expand Down Expand Up @@ -314,6 +382,23 @@ public void printReport() {
messages.forEach(print);
}

/**
* Print Option info about the executed tests.
*/
public void printOption() {
List<String> messages = new LinkedList<>();
messages.add("\nOption:");
for (CommandLine.Model.OptionSpec option : spec.options()) {
String name = option.longestName();
messages.add(name + "=" + option.getValue());
}

Consumer<String> print = freonCommand.isInteractive()
? System.out::println
: LOG::info;
messages.forEach(print);
}

/**
* Print out reports with the given message.
*/
Expand Down Expand Up @@ -484,6 +569,14 @@ public void setThreadNo(int threadNo) {
this.threadNo = threadNo;
}

/**
* Get current Thread sequence ID.
* @return Current Thread sequence ID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the comment, but I still don't think it's clear right away what it is used for. The doc just repeats the method name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been updated

*/
public long getThreadSequenceId() {
return threadSequenceId.get();
Comment on lines +579 to +580
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add javadoc comment explaining purpose of this new sequence ID.

}

protected OzoneClient createOzoneClient(String omServiceID,
OzoneConfiguration conf) throws Exception {
if (omServiceID != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
OmRPCLoadGenerator.class,
OzoneClientKeyReadWriteOps.class,
RangeKeysGenerator.class,
DatanodeSimulator.class
DatanodeSimulator.class,
OmMetadataGenerator.class
},
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true)
Expand Down
Loading