diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
index 6135883158a3..2b88f943de26 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java
@@ -76,6 +76,17 @@ static ReplicationConfig getDefault(ConfigurationSource config) {
     return parse(null, replication, config);
   }
 
+  static ReplicationConfig resolve(ReplicationConfig replicationConfig,
+      ReplicationConfig bucketReplicationConfig, ConfigurationSource conf) {
+    if (replicationConfig == null) {
+      replicationConfig = bucketReplicationConfig;
+    }
+    if (replicationConfig == null) {
+      replicationConfig = getDefault(conf);
+    }
+    return replicationConfig;
+  }
+
   /**
    * Helper method to serialize from proto.
    *
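Reviewer note on the new helper: `resolve` encodes a three-level precedence for choosing a key's replication — an explicit per-key config wins, otherwise the bucket's config, otherwise the cluster-wide default from `getDefault(conf)`. A minimal sketch of that precedence, assuming an in-memory `OzoneConfiguration` and `RatisReplicationConfig.getInstance` values purely as illustrative inputs (neither is part of this patch):

```java
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;

public class ResolveSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    ReplicationConfig keyLevel =
        RatisReplicationConfig.getInstance(ReplicationFactor.ONE);
    ReplicationConfig bucketLevel =
        RatisReplicationConfig.getInstance(ReplicationFactor.THREE);

    // 1. An explicit key-level config takes precedence.
    assert ReplicationConfig.resolve(keyLevel, bucketLevel, conf) == keyLevel;
    // 2. With no key-level config, the bucket's config is used.
    assert ReplicationConfig.resolve(null, bucketLevel, conf) == bucketLevel;
    // 3. With neither, the cluster default from configuration applies.
    System.out.println(ReplicationConfig.resolve(null, null, conf));
  }
}
```

Since `resolve` never returns null (the `getDefault(conf)` fallback always produces a value), callers such as the new streaming path below can follow it with type checks instead of null checks.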

diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index 1f7c1ef7f49a..16af2c5fd1aa 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -29,6 +29,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -42,13 +43,13 @@
 import org.apache.hadoop.ozone.shell.OzoneAddress;
 
 import org.apache.commons.codec.digest.DigestUtils;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
 
 import org.apache.hadoop.ozone.shell.ShellReplicationOptions;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
+import picocli.CommandLine.Option;
 import picocli.CommandLine.Parameters;
 
 /**
@@ -61,6 +62,9 @@ public class PutKeyHandler extends KeyHandler {
   @Parameters(index = "1", arity = "1..1", description = "File to upload")
   private String fileName;
 
+  @Option(names = "--stream")
+  private boolean stream;
+
   @Mixin
   private ShellReplicationOptions replication;
 
@@ -96,41 +100,59 @@ protected void execute(OzoneClient client, OzoneAddress address)
     int chunkSize = (int) getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
         OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
 
-    Boolean useAsync = false;
-    if (dataFile.length() <= chunkSize ||
-        (replicationConfig != null &&
-            replicationConfig.getReplicationType() == EC) ||
-        bucket.getReplicationConfig() instanceof ECReplicationConfig) {
-      useAsync = true;
-    }
-    if (useAsync) {
-      if (isVerbose()) {
-        out().println("API: async");
-      }
-      try (InputStream input = new FileInputStream(dataFile);
-          OutputStream output = bucket.createKey(keyName, dataFile.length(),
-              replicationConfig, keyMetadata)) {
-        IOUtils.copyBytes(input, output, chunkSize);
-      }
+    if (stream) {
+      stream(dataFile, bucket, keyName, keyMetadata,
+          replicationConfig, chunkSize);
     } else {
-      if (isVerbose()) {
-        out().println("API: streaming");
-      }
-      try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r");
-          OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
-              dataFile.length(), replicationConfig, keyMetadata)) {
-        FileChannel ch = raf.getChannel();
-        long len = raf.length();
-        long off = 0;
-        while (len > 0) {
-          long writeLen = Math.min(len, chunkSize);
-          ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
-          out.write(bb);
-          off += writeLen;
-          len -= writeLen;
-        }
-      }
+      async(dataFile, bucket, keyName, keyMetadata,
+          replicationConfig, chunkSize);
+    }
+  }
+
+  void async(
+      File dataFile, OzoneBucket bucket,
+      String keyName, Map<String, String> keyMetadata,
+      ReplicationConfig replicationConfig, int chunkSize)
+      throws IOException {
+    if (isVerbose()) {
+      out().println("API: async");
+    }
+    try (InputStream input = new FileInputStream(dataFile);
+        OutputStream output = bucket.createKey(keyName, dataFile.length(),
+            replicationConfig, keyMetadata)) {
+      IOUtils.copyBytes(input, output, chunkSize);
     }
   }
 
+  void stream(
+      File dataFile, OzoneBucket bucket,
+      String keyName, Map<String, String> keyMetadata,
+      ReplicationConfig replicationConfig, int chunkSize)
+      throws IOException {
+    if (isVerbose()) {
+      out().println("API: streaming");
+    }
+    // In streaming mode, always resolve the replication config on the client
+    // side, because streaming is not compatible with EC keys.
+    replicationConfig = ReplicationConfig.resolve(replicationConfig,
+        bucket.getReplicationConfig(), getConf());
+    Preconditions.checkArgument(
+        !(replicationConfig instanceof ECReplicationConfig),
+        "Cannot put EC key by streaming");
+
+    try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r");
+        OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
+            dataFile.length(), replicationConfig, keyMetadata)) {
+      FileChannel ch = raf.getChannel();
+      long len = raf.length();
+      long off = 0;
+      while (len > 0) {
+        long writeLen = Math.min(len, chunkSize);
+        ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
+        out.write(bb);
+        off += writeLen;
+        len -= writeLen;
+      }
+    }
+  }
 }
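The extracted `stream()` keeps the original mmap copy loop: the source file is mapped read-only in windows of at most `chunkSize` bytes and each mapped buffer is handed to the `OzoneDataStreamOutput`. A self-contained sketch of the same technique against a generic `WritableByteChannel` (the `MappedCopy` class and `copy` method are hypothetical names for illustration, not part of the patch):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

/** Sketch of the chunked mmap copy loop used by PutKeyHandler#stream. */
public final class MappedCopy {
  static void copy(RandomAccessFile src, WritableByteChannel dst,
      int chunkSize) throws IOException {
    FileChannel ch = src.getChannel();
    long len = src.length();
    long off = 0;
    while (len > 0) {
      long writeLen = Math.min(len, chunkSize);
      // Map one window at a time instead of reading the whole file onto the heap.
      ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
      while (bb.hasRemaining()) {
        dst.write(bb); // a generic channel may write partially; drain the buffer
      }
      off += writeLen;
      len -= writeLen;
    }
  }
}
```

With this change, streaming is opt-in (`ozone sh key put --stream ...`) rather than auto-selected by file size and replication type, and EC keys are rejected up front because the streaming write path does not support EC replication.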