diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHsyncGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHsyncGenerator.java index 7026f32d8b31..66714e58bad1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHsyncGenerator.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHsyncGenerator.java @@ -94,14 +94,13 @@ public void test() throws IOException { OzoneVolume volume = store.getVolume(volumeName); volume.createBucket(bucketName); - String rootPath = String.format("%s://%s/%s/%s/", - OZONE_OFS_URI_SCHEME, cluster.getConf().get(OZONE_OM_ADDRESS_KEY), - volumeName, bucketName); + String rootPath = String.format("%s://%s/%s/%s/", OZONE_OFS_URI_SCHEME, + cluster.getConf().get(OZONE_OM_ADDRESS_KEY), volumeName, bucketName); int exitCode = cmd.execute( "--path", rootPath, - "--bytes-per-write", "1024", - "--number-of-files", "2", + "--bytes-per-write", "8", + "--writes-per-transaction", "64", "-t", "5", "-n", "100"); assertEquals(0, exitCode); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/HsyncGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/HsyncGenerator.java index 8de2ee032d0a..687030ab3257 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/HsyncGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/HsyncGenerator.java @@ -16,17 +16,13 @@ */ package org.apache.hadoop.ozone.freon; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; - +import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.conf.OzoneConfiguration; - -import com.codahale.metrics.Timer; import org.apache.hadoop.ozone.util.PayloadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,17 +30,25 @@ import picocli.CommandLine.Command; import picocli.CommandLine.Option; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + /** * Data generator tool test hsync/write synchronization performance. + * This tool simulates the way HBase writes transaction logs (WAL) to a file in Ozone: + * - Transactions are written to the file's OutputStream by a single thread, each transaction is numbered by an + * increasing counter. Every transaction can be serialized to the OutputStream via multiple write calls. + * - Multiple threads checks and sync (hsync) the OutputStream to make it persistent. * * Example usage: * - * To generate 1000 hsync calls with 10 threads on a single file: - * ozone freon hsync-generator -t 10 --bytes-per-write=1024 -n 1000 - * - * To generate 1000 hsync calls with 10 threads on 3 files simultaneously: + * To simulate hlog that generates 1M hsync calls with 5 threads: * - * ozone freon hsync-generator -t 10 --bytes-per-write=1024 --number-of-files=3 -n 1000 + * ozone freon hsync-generator -t 5 --writes-per-transaction=32 --bytes-per-write=8 -n 1000000 * */ @Command(name = "hg", @@ -53,32 +57,38 @@ versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true, showDefaultValues = true) -public class HsyncGenerator extends HadoopNestedDirGenerator implements Callable { +public class HsyncGenerator extends BaseFreonGenerator implements Callable { private static final Logger LOG = LoggerFactory.getLogger(HsyncGenerator.class); @CommandLine.ParentCommand private Freon freon; + @Option(names = {"--path"}, + description = "Hadoop FS file system path. Use full path.", + defaultValue = "o3fs://bucket1.vol1") + private String rootPath; + @Option(names = {"--bytes-per-write"}, description = "Size of each write", - defaultValue = "1024") + defaultValue = "8") private int writeSize; - @Option(names = {"--number-of-files"}, - description = "Number of files to run test.", - defaultValue = "1") - private int numberOfFiles; + @Option(names = {"--writes-per-transaction"}, + description = "Size of each write", + defaultValue = "32") + private int writesPerTransaction; private Timer timer; private OzoneConfiguration configuration; - private FSDataOutputStream[] outputStreams; - private Path[] files; - private AtomicInteger[] callsPerFile; + private FSDataOutputStream outputStream; + private byte[] data; + private final BlockingQueue writtenTransactions = new ArrayBlockingQueue<>(10_000); + private final AtomicInteger lastSyncedTransaction = new AtomicInteger(); public HsyncGenerator() { } - private byte[] data; + @VisibleForTesting HsyncGenerator(OzoneConfiguration ozoneConfiguration) { @@ -87,55 +97,75 @@ public HsyncGenerator() { @Override public Void call() throws Exception { - super.init(); + init(); if (configuration == null) { configuration = freon.createOzoneConfiguration(); } + URI uri = URI.create(rootPath); - outputStreams = new FSDataOutputStream[numberOfFiles]; - files = new Path[numberOfFiles]; - callsPerFile = new AtomicInteger[numberOfFiles]; - FileSystem fileSystem = getFileSystem(); - for (int i = 0; i < numberOfFiles; i++) { - Path file = new Path(getRootPath() + "/" + generateObjectName(i)); - fileSystem.mkdirs(file.getParent()); - outputStreams[i] = fileSystem.create(file); - files[i] = file; - callsPerFile[i] = new AtomicInteger(); - - LOG.info("Created file for testing: {}", file); - } + FileSystem fileSystem = FileSystem.get(uri, configuration); + Path file = new Path(rootPath + "/" + generateObjectName(0)); + fileSystem.mkdirs(file.getParent()); + outputStream = fileSystem.create(file); + + LOG.info("Created file for testing: {}", file); timer = getMetrics().timer("hsync-generator"); data = PayloadUtils.generatePayload(writeSize); + startTransactionWriter(); + try { runTests(this::sendHsync); } finally { - for (FSDataOutputStream outputStream : outputStreams) { - outputStream.close(); - } + outputStream.close(); + fileSystem.close(); } - StringBuilder distributionReport = new StringBuilder(); - for (int i = 0; i < numberOfFiles; i++) { - distributionReport.append("\t").append(files[i]).append(": ").append(callsPerFile[i]).append("\n"); - } + return null; + } - LOG.info("Hsync generator finished, calls distribution: \n {}", distributionReport); + private void startTransactionWriter() { + Thread transactionWriter = new Thread(this::generateTransactions); + transactionWriter.setDaemon(true); + transactionWriter.start(); + } - return null; + private void generateTransactions() { + int transaction = 0; + while (true) { + for (int i = 0; i < writesPerTransaction; i++) { + try { + if (writeSize > 1) { + outputStream.write(data); + } else { + outputStream.write(i); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + try { + writtenTransactions.put(transaction++); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } private void sendHsync(long counter) throws Exception { timer.time(() -> { - int i = ((int) counter) % numberOfFiles; - FSDataOutputStream outputStream = outputStreams[i]; - outputStream.write(data); - outputStream.hsync(); - callsPerFile[i].incrementAndGet(); - return null; + while (true) { + int transaction = writtenTransactions.take(); + int lastSynced = lastSyncedTransaction.get(); + if (transaction > lastSynced) { + outputStream.hsync(); + lastSyncedTransaction.compareAndSet(lastSynced, transaction); + return null; + } + } }); } }