Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
Expand Down Expand Up @@ -100,8 +100,6 @@ public class BaseFreonGenerator {

private MetricRegistry metrics = new MetricRegistry();

private ExecutorService executor;

private AtomicLong successCounter;

private AtomicLong failureCounter;
Expand All @@ -117,7 +115,7 @@ public class BaseFreonGenerator {
*/
public void runTests(TaskProvider provider) {

executor = Executors.newFixedThreadPool(threadNo);
ExecutorService executor = Executors.newFixedThreadPool(threadNo);

ProgressBar progressBar =
new ProgressBar(System.out, testNo, successCounter::get);
Expand Down Expand Up @@ -345,7 +343,7 @@ public void ensureVolumeExists(
/**
* Calculate checksum of a byte array.
*/
public byte[] getDigest(byte[] content) throws IOException {
public static byte[] getDigest(byte[] content) {
DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM);
dig.getMessageDigest().reset();
return dig.digest(content);
Expand All @@ -354,7 +352,7 @@ public byte[] getDigest(byte[] content) throws IOException {
/**
* Calculate checksum of an Input stream.
*/
public byte[] getDigest(InputStream stream) throws IOException {
public static byte[] getDigest(InputStream stream) throws IOException {
DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM);
dig.getMessageDigest().reset();
return dig.digest(stream);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
Expand Down Expand Up @@ -30,7 +30,7 @@ public class ContentGenerator {
/**
* Size of the destination object (key or file).
*/
private int keySize;
private long keySize;

/**
* Buffer for the pre-allocated content (will be reused if less than the
Expand All @@ -40,7 +40,7 @@ public class ContentGenerator {

private final byte[] buffer;

ContentGenerator(int keySize, int bufferSize) {
ContentGenerator(long keySize, int bufferSize) {
this.keySize = keySize;
this.bufferSize = bufferSize;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
Expand All @@ -18,6 +18,7 @@

import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.hadoop.hdds.cli.HddsVersionProvider;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class OzoneClientKeyGenerator extends BaseFreonGenerator
@Option(names = {"-s", "--size"},
description = "Size of the generated key (in bytes)",
defaultValue = "10240")
private int keySize;
private long keySize;

@Option(names = {"--buffer"},
description = "Size of buffer used to generated the key content.",
Expand All @@ -76,6 +77,7 @@ public class OzoneClientKeyGenerator extends BaseFreonGenerator

private OzoneBucket bucket;
private ContentGenerator contentGenerator;
private Map<String, String> metadata;

@Override
public Void call() throws Exception {
Expand All @@ -87,6 +89,7 @@ public Void call() throws Exception {
ensureVolumeAndBucketExist(ozoneConfiguration, volumeName, bucketName);

contentGenerator = new ContentGenerator(keySize, bufferSize);
metadata = new HashMap<>();

try (OzoneClient rpcClient = OzoneClientFactory
.getRpcClient(ozoneConfiguration)) {
Expand All @@ -104,13 +107,11 @@ public Void call() throws Exception {
}

private void createKey(long counter) throws Exception {
final String key = generateObjectName(counter);

timer.time(() -> {
try (OutputStream stream = bucket
.createKey(generateObjectName(counter), keySize,
ReplicationType.RATIS,
factor,
new HashMap<>())) {
try (OutputStream stream = bucket.createKey(key, keySize,
ReplicationType.RATIS, factor, metadata)) {
contentGenerator.write(stream);
stream.flush();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
Expand All @@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.freon;

import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.util.concurrent.Callable;
Expand All @@ -27,6 +28,9 @@

import com.codahale.metrics.Timer;
import org.apache.commons.io.IOUtils;
import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

Expand All @@ -42,6 +46,9 @@
public class OzoneClientKeyValidator extends BaseFreonGenerator
implements Callable<Void> {

private static final Logger LOG =
LoggerFactory.getLogger(OzoneClientKeyValidator.class);

@Option(names = {"-v", "--volume"},
description = "Name of the bucket which contains the test data. Will be"
+ " created if missing.",
Expand All @@ -53,9 +60,16 @@ public class OzoneClientKeyValidator extends BaseFreonGenerator
defaultValue = "bucket1")
private String bucketName;

@Option(names = {"-s", "--stream"},
description = "Whether to calculate key digest during read from stream,"
+ " or separately after it is completely read.",
defaultValue = "false")
private boolean stream;

private Timer timer;

private byte[] referenceDigest;
private long referenceKeySize;

private OzoneClient rpcClient;

Expand All @@ -68,10 +82,7 @@ public Void call() throws Exception {

rpcClient = OzoneClientFactory.getRpcClient(ozoneConfiguration);

try (InputStream stream = rpcClient.getObjectStore().getVolume(volumeName)
.getBucket(bucketName).readKey(generateObjectName(0))) {
referenceDigest = getDigest(stream);
}
readReference();

timer = getMetrics().timer("key-validate");

Expand All @@ -80,16 +91,74 @@ public Void call() throws Exception {
return null;
}

private void readReference() throws IOException {
String name = generateObjectName(0);

if (!stream) {
// first obtain key size to be able to allocate exact buffer for keys
referenceKeySize = getKeySize(name);

// force stream if key is too large for byte[]
// (limit taken from ByteArrayOutputStream)
if (referenceKeySize > Integer.MAX_VALUE - 8) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can force this much sooner — say at 1 GB, or even 100 MB? Otherwise, the virtual memory reservation would be huge.
And practically, does it matter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any lower limit would be arbitrary. Actual memory requirement depends on the number of threads. One can always add the flag explicitly. I just wanted to preserve existing behavior as a courtesy.

Hope that makes sense :)

LOG.warn("Forcing 'stream' option, as key size is too large: {} bytes",
referenceKeySize);
stream = true;
}
}

if (stream) {
referenceDigest = calculateDigestStreaming(name);
} else {
byte[] data = readKeyToByteArray(name);
referenceDigest = getDigest(data);
}
}

private long getKeySize(String keyName) throws IOException {
return rpcClient.getObjectStore().getVolume(volumeName)
.getBucket(bucketName).getKey(keyName).getDataSize();
}

private void validateKey(long counter) throws Exception {
String objectName = generateObjectName(counter);
byte[] digest = getDigest(objectName);
validateDigest(objectName, digest);
}

byte[] content = timer.time(() -> {
try (InputStream stream = rpcClient.getObjectStore().getVolume(volumeName)
.getBucket(bucketName).readKey(objectName)) {
return IOUtils.toByteArray(stream);
}
});
if (!MessageDigest.isEqual(referenceDigest, getDigest(content))) {
private byte[] getDigest(String objectName) throws Exception {
byte[] digest;
if (stream) {
// Calculating the digest during stream read requires only constant
// memory, but timing results include digest calculation time, too.
digest = timer.time(() ->
calculateDigestStreaming(objectName));
} else {
byte[] data = timer.time(() -> readKeyToByteArray(objectName));
digest = getDigest(data);
}
return digest;
}

private byte[] calculateDigestStreaming(String name) throws IOException {
return readKey(name, BaseFreonGenerator::getDigest);
}

private byte[] readKeyToByteArray(String name) throws IOException {
return readKey(name, in -> IOUtils.toByteArray(in, referenceKeySize));
}

private <T> T readKey(String keyName,
CheckedFunction<InputStream, T, IOException> reader)
throws IOException {
try (InputStream in = rpcClient.getObjectStore().getVolume(volumeName)
.getBucket(bucketName).readKey(keyName)) {
return reader.apply(in);
}
}

private void validateDigest(String objectName, byte[] digest) {
if (!MessageDigest.isEqual(referenceDigest, digest)) {
throw new IllegalStateException(
"Reference (=first) message digest doesn't match with digest of "
+ objectName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
Expand All @@ -16,17 +16,10 @@
*/
package org.apache.hadoop.ozone.freon;

import java.io.InputStream;
import java.security.MessageDigest;
import java.util.concurrent.Callable;

import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;

import com.codahale.metrics.Timer;
import org.apache.commons.io.IOUtils;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

Expand All @@ -39,66 +32,16 @@
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true,
showDefaultValues = true)
public class SameKeyReader extends BaseFreonGenerator
public class SameKeyReader extends OzoneClientKeyValidator
implements Callable<Void> {

@Option(names = {"-v", "--volume"},
description = "Name of the bucket which contains the test data. Will be"
+ " created if missing.",
defaultValue = "vol1")
private String volumeName;

@Option(names = {"-b", "--bucket"},
description = "Name of the bucket which contains the test data. Will be"
+ " created if missing.",
defaultValue = "bucket1")
private String bucketName;

@Option(names = {"-k", "--key"},
required = true,
description = "Name of the key read from multiple threads")
private String keyName;

private Timer timer;

private byte[] referenceDigest;

private OzoneClient rpcClient;

@Override
public Void call() throws Exception {

init();

OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();

rpcClient = OzoneClientFactory.getRpcClient(ozoneConfiguration);

try (InputStream stream = rpcClient.getObjectStore().getVolume(volumeName)
.getBucket(bucketName).readKey(keyName)) {
referenceDigest = getDigest(stream);
}

timer = getMetrics().timer("key-create");

runTests(this::validateKey);

return null;
public String generateObjectName(long counter) {
return keyName;
}

private void validateKey(long counter) throws Exception {

byte[] content = timer.time(() -> {
try (InputStream stream = rpcClient.getObjectStore().getVolume(volumeName)
.getBucket(bucketName).readKey(keyName)) {
return IOUtils.toByteArray(stream);
}
});
if (!MessageDigest.isEqual(referenceDigest, getDigest(content))) {
throw new IllegalStateException(
"Reference message digest doesn't match with the digest of the same"
+ " key." + counter);
}
}

}