diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java index ccc9928ddd5..90700326986 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with this * work for additional information regarding copyright ownership. The ASF @@ -100,8 +100,6 @@ public class BaseFreonGenerator { private MetricRegistry metrics = new MetricRegistry(); - private ExecutorService executor; - private AtomicLong successCounter; private AtomicLong failureCounter; @@ -117,7 +115,7 @@ public class BaseFreonGenerator { */ public void runTests(TaskProvider provider) { - executor = Executors.newFixedThreadPool(threadNo); + ExecutorService executor = Executors.newFixedThreadPool(threadNo); ProgressBar progressBar = new ProgressBar(System.out, testNo, successCounter::get); @@ -345,7 +343,7 @@ public void ensureVolumeExists( /** * Calculate checksum of a byte array. */ - public byte[] getDigest(byte[] content) throws IOException { + public static byte[] getDigest(byte[] content) { DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM); dig.getMessageDigest().reset(); return dig.digest(content); @@ -354,7 +352,7 @@ public byte[] getDigest(byte[] content) throws IOException { /** * Calculate checksum of an Input stream. */ - public byte[] getDigest(InputStream stream) throws IOException { + public static byte[] getDigest(InputStream stream) throws IOException { DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM); dig.getMessageDigest().reset(); return dig.digest(stream); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ContentGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ContentGenerator.java index e31c7094cac..c6ec60e59a2 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ContentGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ContentGenerator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with this * work for additional information regarding copyright ownership. The ASF @@ -30,7 +30,7 @@ public class ContentGenerator { /** * Size of the destination object (key or file). */ - private int keySize; + private long keySize; /** * Buffer for the pre-allocated content (will be reused if less than the @@ -40,7 +40,7 @@ public class ContentGenerator { private final byte[] buffer; - ContentGenerator(int keySize, int bufferSize) { + ContentGenerator(long keySize, int bufferSize) { this.keySize = keySize; this.bufferSize = bufferSize; diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java index 0b2c531b663..bc8ce66f522 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with this * work for additional information regarding copyright ownership. The ASF @@ -18,6 +18,7 @@ import java.io.OutputStream; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Callable; import org.apache.hadoop.hdds.cli.HddsVersionProvider; @@ -59,7 +60,7 @@ public class OzoneClientKeyGenerator extends BaseFreonGenerator @Option(names = {"-s", "--size"}, description = "Size of the generated key (in bytes)", defaultValue = "10240") - private int keySize; + private long keySize; @Option(names = {"--buffer"}, description = "Size of buffer used to generated the key content.", @@ -76,6 +77,7 @@ public class OzoneClientKeyGenerator extends BaseFreonGenerator private OzoneBucket bucket; private ContentGenerator contentGenerator; + private Map metadata; @Override public Void call() throws Exception { @@ -87,6 +89,7 @@ public Void call() throws Exception { ensureVolumeAndBucketExist(ozoneConfiguration, volumeName, bucketName); contentGenerator = new ContentGenerator(keySize, bufferSize); + metadata = new HashMap<>(); try (OzoneClient rpcClient = OzoneClientFactory .getRpcClient(ozoneConfiguration)) { @@ -104,13 +107,11 @@ public Void call() throws Exception { } private void createKey(long counter) throws Exception { + final String key = generateObjectName(counter); timer.time(() -> { - try (OutputStream stream = bucket - .createKey(generateObjectName(counter), keySize, - ReplicationType.RATIS, - factor, - new HashMap<>())) { + try (OutputStream stream = bucket.createKey(key, keySize, + ReplicationType.RATIS, factor, metadata)) { contentGenerator.write(stream); stream.flush(); } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java index f247b333635..a72cbd6848d 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with this * work for additional information regarding copyright ownership. The ASF @@ -16,6 +16,7 @@ */ package org.apache.hadoop.ozone.freon; +import java.io.IOException; import java.io.InputStream; import java.security.MessageDigest; import java.util.concurrent.Callable; @@ -27,6 +28,9 @@ import com.codahale.metrics.Timer; import org.apache.commons.io.IOUtils; +import org.apache.ratis.util.function.CheckedFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import picocli.CommandLine.Command; import picocli.CommandLine.Option; @@ -42,6 +46,9 @@ public class OzoneClientKeyValidator extends BaseFreonGenerator implements Callable { + private static final Logger LOG = + LoggerFactory.getLogger(OzoneClientKeyValidator.class); + @Option(names = {"-v", "--volume"}, description = "Name of the bucket which contains the test data. Will be" + " created if missing.", @@ -53,9 +60,16 @@ public class OzoneClientKeyValidator extends BaseFreonGenerator defaultValue = "bucket1") private String bucketName; + @Option(names = {"-s", "--stream"}, + description = "Whether to calculate key digest during read from stream," + + " or separately after it is completely read.", + defaultValue = "false") + private boolean stream; + private Timer timer; private byte[] referenceDigest; + private long referenceKeySize; private OzoneClient rpcClient; @@ -68,10 +82,7 @@ public Void call() throws Exception { rpcClient = OzoneClientFactory.getRpcClient(ozoneConfiguration); - try (InputStream stream = rpcClient.getObjectStore().getVolume(volumeName) - .getBucket(bucketName).readKey(generateObjectName(0))) { - referenceDigest = getDigest(stream); - } + readReference(); timer = getMetrics().timer("key-validate"); @@ -80,16 +91,74 @@ public Void call() throws Exception { return null; } + private void readReference() throws IOException { + String name = generateObjectName(0); + + if (!stream) { + // first obtain key size to be able to allocate exact buffer for keys + referenceKeySize = getKeySize(name); + + // force stream if key is too large for byte[] + // (limit taken from ByteArrayOutputStream) + if (referenceKeySize > Integer.MAX_VALUE - 8) { + LOG.warn("Forcing 'stream' option, as key size is too large: {} bytes", + referenceKeySize); + stream = true; + } + } + + if (stream) { + referenceDigest = calculateDigestStreaming(name); + } else { + byte[] data = readKeyToByteArray(name); + referenceDigest = getDigest(data); + } + } + + private long getKeySize(String keyName) throws IOException { + return rpcClient.getObjectStore().getVolume(volumeName) + .getBucket(bucketName).getKey(keyName).getDataSize(); + } + private void validateKey(long counter) throws Exception { String objectName = generateObjectName(counter); + byte[] digest = getDigest(objectName); + validateDigest(objectName, digest); + } - byte[] content = timer.time(() -> { - try (InputStream stream = rpcClient.getObjectStore().getVolume(volumeName) - .getBucket(bucketName).readKey(objectName)) { - return IOUtils.toByteArray(stream); - } - }); - if (!MessageDigest.isEqual(referenceDigest, getDigest(content))) { + private byte[] getDigest(String objectName) throws Exception { + byte[] digest; + if (stream) { + // Calculating the digest during stream read requires only constant + // memory, but timing results include digest calculation time, too. + digest = timer.time(() -> + calculateDigestStreaming(objectName)); + } else { + byte[] data = timer.time(() -> readKeyToByteArray(objectName)); + digest = getDigest(data); + } + return digest; + } + + private byte[] calculateDigestStreaming(String name) throws IOException { + return readKey(name, BaseFreonGenerator::getDigest); + } + + private byte[] readKeyToByteArray(String name) throws IOException { + return readKey(name, in -> IOUtils.toByteArray(in, referenceKeySize)); + } + + private T readKey(String keyName, + CheckedFunction reader) + throws IOException { + try (InputStream in = rpcClient.getObjectStore().getVolume(volumeName) + .getBucket(bucketName).readKey(keyName)) { + return reader.apply(in); + } + } + + private void validateDigest(String objectName, byte[] digest) { + if (!MessageDigest.isEqual(referenceDigest, digest)) { throw new IllegalStateException( "Reference (=first) message digest doesn't match with digest of " + objectName); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SameKeyReader.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SameKeyReader.java index bafd3ecf6f1..bbd83d64dec 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SameKeyReader.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SameKeyReader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with this * work for additional information regarding copyright ownership. The ASF @@ -16,17 +16,10 @@ */ package org.apache.hadoop.ozone.freon; -import java.io.InputStream; -import java.security.MessageDigest; import java.util.concurrent.Callable; import org.apache.hadoop.hdds.cli.HddsVersionProvider; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneClientFactory; -import com.codahale.metrics.Timer; -import org.apache.commons.io.IOUtils; import picocli.CommandLine.Command; import picocli.CommandLine.Option; @@ -39,66 +32,16 @@ versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true, showDefaultValues = true) -public class SameKeyReader extends BaseFreonGenerator +public class SameKeyReader extends OzoneClientKeyValidator implements Callable { - @Option(names = {"-v", "--volume"}, - description = "Name of the bucket which contains the test data. Will be" - + " created if missing.", - defaultValue = "vol1") - private String volumeName; - - @Option(names = {"-b", "--bucket"}, - description = "Name of the bucket which contains the test data. Will be" - + " created if missing.", - defaultValue = "bucket1") - private String bucketName; - @Option(names = {"-k", "--key"}, required = true, description = "Name of the key read from multiple threads") private String keyName; - private Timer timer; - - private byte[] referenceDigest; - - private OzoneClient rpcClient; - @Override - public Void call() throws Exception { - - init(); - - OzoneConfiguration ozoneConfiguration = createOzoneConfiguration(); - - rpcClient = OzoneClientFactory.getRpcClient(ozoneConfiguration); - - try (InputStream stream = rpcClient.getObjectStore().getVolume(volumeName) - .getBucket(bucketName).readKey(keyName)) { - referenceDigest = getDigest(stream); - } - - timer = getMetrics().timer("key-create"); - - runTests(this::validateKey); - - return null; + public String generateObjectName(long counter) { + return keyName; } - - private void validateKey(long counter) throws Exception { - - byte[] content = timer.time(() -> { - try (InputStream stream = rpcClient.getObjectStore().getVolume(volumeName) - .getBucket(bucketName).readKey(keyName)) { - return IOUtils.toByteArray(stream); - } - }); - if (!MessageDigest.isEqual(referenceDigest, getDigest(content))) { - throw new IllegalStateException( - "Reference message digest doesn't match with the digest of the same" - + " key." + counter); - } - } - }