From 69466746f856ce04b8b18dffad4bc205b19bc713 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 27 Jun 2022 22:38:14 +0800 Subject: [PATCH 1/2] Add stream option in ozone sh key --- .../ozone/shell/keys/PutKeyHandler.java | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java index 1f7c1ef7f49a..1eee5cf431ff 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; @@ -42,13 +43,13 @@ import org.apache.hadoop.ozone.shell.OzoneAddress; import org.apache.commons.codec.digest.DigestUtils; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY; import org.apache.hadoop.ozone.shell.ShellReplicationOptions; import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; +import picocli.CommandLine.Option; import picocli.CommandLine.Parameters; /** @@ -61,6 +62,9 @@ public class PutKeyHandler extends KeyHandler { @Parameters(index = "1", arity = "1..1", description = "File to upload") private String fileName; + @Option(names = "--stream") + private boolean stream; + @Mixin private ShellReplicationOptions replication; @@ -96,26 +100,21 @@ protected void execute(OzoneClient client, OzoneAddress address) int chunkSize = (int) 
getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES); - Boolean useAsync = false; - if (dataFile.length() <= chunkSize || - (replicationConfig != null && - replicationConfig.getReplicationType() == EC) || - bucket.getReplicationConfig() instanceof ECReplicationConfig) { - useAsync = true; - } - if (useAsync) { + if (stream) { if (isVerbose()) { - out().println("API: async"); + out().println("API: streaming"); } - try (InputStream input = new FileInputStream(dataFile); - OutputStream output = bucket.createKey(keyName, dataFile.length(), - replicationConfig, keyMetadata)) { - IOUtils.copyBytes(input, output, chunkSize); + // In streaming mode, always resolve replication config at client side, + // because streaming is not compatible for writing EC keys. + if (replicationConfig == null) { + replicationConfig = bucket.getReplicationConfig(); } - } else { - if (isVerbose()) { - out().println("API: streaming"); + if (replicationConfig == null) { + replicationConfig = ReplicationConfig.parse(null, null, getConf()); } + Preconditions.checkArgument( + !(replicationConfig instanceof ECReplicationConfig), + "Can not put EC key by streaming"); try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r"); OzoneDataStreamOutput out = bucket.createStreamKey(keyName, dataFile.length(), replicationConfig, keyMetadata)) { @@ -130,6 +129,15 @@ protected void execute(OzoneClient client, OzoneAddress address) len -= writeLen; } } + } else { + if (isVerbose()) { + out().println("API: async"); + } + try (InputStream input = new FileInputStream(dataFile); + OutputStream output = bucket.createKey(keyName, dataFile.length(), + replicationConfig, keyMetadata)) { + IOUtils.copyBytes(input, output, chunkSize); + } } } From dcd4dfb5890961d0cafcc6b3cd2019605e04f96c Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 1 Jul 2022 10:09:25 +0800 Subject: [PATCH 2/2] Move code to separated methods --- 
.../hadoop/hdds/client/ReplicationConfig.java | 11 +++ .../ozone/shell/keys/PutKeyHandler.java | 86 +++++++++++-------- 2 files changed, 61 insertions(+), 36 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java index 6135883158a3..2b88f943de26 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java @@ -76,6 +76,17 @@ static ReplicationConfig getDefault(ConfigurationSource config) { return parse(null, replication, config); } + static ReplicationConfig resolve(ReplicationConfig replicationConfig, + ReplicationConfig bucketReplicationConfig, ConfigurationSource conf) { + if (replicationConfig == null) { + replicationConfig = bucketReplicationConfig; + } + if (replicationConfig == null) { + replicationConfig = getDefault(conf); + } + return replicationConfig; + } + /** * Helper method to serialize from proto. * <p>
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java index 1eee5cf431ff..16af2c5fd1aa 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java @@ -101,44 +101,58 @@ protected void execute(OzoneClient client, OzoneAddress address) OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES); if (stream) { - if (isVerbose()) { - out().println("API: streaming"); - } - // In streaming mode, always resolve replication config at client side, - // because streaming is not compatible for writing EC keys. - if (replicationConfig == null) { - replicationConfig = bucket.getReplicationConfig(); - } - if (replicationConfig == null) { - replicationConfig = ReplicationConfig.parse(null, null, getConf()); - } - Preconditions.checkArgument( - !(replicationConfig instanceof ECReplicationConfig), - "Can not put EC key by streaming"); - try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r"); - OzoneDataStreamOutput out = bucket.createStreamKey(keyName, - dataFile.length(), replicationConfig, keyMetadata)) { - FileChannel ch = raf.getChannel(); - long len = raf.length(); - long off = 0; - while (len > 0) { - long writeLen = Math.min(len, chunkSize); - ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen); - out.write(bb); - off += writeLen; - len -= writeLen; - } - } + stream(dataFile, bucket, keyName, keyMetadata, + replicationConfig, chunkSize); } else { - if (isVerbose()) { - out().println("API: async"); - } - try (InputStream input = new FileInputStream(dataFile); - OutputStream output = bucket.createKey(keyName, dataFile.length(), - replicationConfig, keyMetadata)) { - IOUtils.copyBytes(input, output, chunkSize); - } + async(dataFile, bucket, keyName, keyMetadata, + replicationConfig, chunkSize); + } + 
} + + void async( + File dataFile, OzoneBucket bucket, + String keyName, Map<String, String> keyMetadata, + ReplicationConfig replicationConfig, int chunkSize) + throws IOException { + if (isVerbose()) { + out().println("API: async"); + } + try (InputStream input = new FileInputStream(dataFile); + OutputStream output = bucket.createKey(keyName, dataFile.length(), + replicationConfig, keyMetadata)) { + IOUtils.copyBytes(input, output, chunkSize); } } + void stream( + File dataFile, OzoneBucket bucket, + String keyName, Map<String, String> keyMetadata, + ReplicationConfig replicationConfig, int chunkSize) + throws IOException { + if (isVerbose()) { + out().println("API: streaming"); + } + // In streaming mode, always resolve replication config at client side, + // because streaming is not compatible for writing EC keys. + replicationConfig = ReplicationConfig.resolve(replicationConfig, + bucket.getReplicationConfig(), getConf()); + Preconditions.checkArgument( + !(replicationConfig instanceof ECReplicationConfig), + "Can not put EC key by streaming"); + + try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r"); + OzoneDataStreamOutput out = bucket.createStreamKey(keyName, + dataFile.length(), replicationConfig, keyMetadata)) { + FileChannel ch = raf.getChannel(); + long len = raf.length(); + long off = 0; + while (len > 0) { + long writeLen = Math.min(len, chunkSize); + ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen); + out.write(bb); + off += writeLen; + len -= writeLen; + } + } + } }