diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/RatisReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/RatisReplicationConfig.java index 8f299059cd56..e9cbe5b0a034 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/RatisReplicationConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/RatisReplicationConfig.java @@ -35,6 +35,21 @@ public RatisReplicationConfig(ReplicationFactor replicationFactor) { this.replicationFactor = replicationFactor; } + public RatisReplicationConfig(String factorString) { + ReplicationFactor factor = null; + try { + factor = ReplicationFactor.valueOf(Integer.parseInt(factorString)); + } catch (NumberFormatException ex) { + try { + factor = ReplicationFactor.valueOf(factorString); + } catch (IllegalArgumentException x) { + throw new IllegalArgumentException("Invalid RatisReplicationFactor '" + + factorString + "'. Please use ONE or THREE!"); + } + } + this.replicationFactor = factor; + } + public static boolean hasFactor(ReplicationConfig replicationConfig, ReplicationFactor factor) { if (replicationConfig instanceof RatisReplicationConfig) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java index 7818661e25ae..bc7f759af7f7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.client; +import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; /** @@ -47,6 +48,24 @@ static ReplicationConfig fromTypeAndFactor( } } + /** + * Helper method to create proper replication method from old-style + * factor+type definition. + *

+ * Note: it's never used for EC replication where config is created. + */ + static ReplicationConfig fromTypeAndFactor( + org.apache.hadoop.hdds.client.ReplicationType type, + org.apache.hadoop.hdds.client.ReplicationFactor factor + ) { + return fromTypeAndFactor(HddsProtos.ReplicationType.valueOf(type.name()), + HddsProtos.ReplicationFactor.valueOf(factor.name())); + } + + static ReplicationConfig getDefault(ConfigurationSource config) { + return new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE); + } + /** * Helper method to serialize from proto. *

@@ -85,6 +104,42 @@ static HddsProtos.ReplicationFactor getLegacyFactor( .getReplicationType()); } + /** + * Create new replication config with adjusted replication factor. + *

+ * Used by hadoop file system. Some replication scheme (like EC) may not + * support changing the replication. + */ + static ReplicationConfig adjustReplication( + ReplicationConfig replicationConfig, short replication) { + switch (replicationConfig.getReplicationType()) { + case RATIS: + return new RatisReplicationConfig( + org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor + .valueOf(replication)); + case STAND_ALONE: + return new StandaloneReplicationConfig( + org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor + .valueOf(replication)); + default: + return replicationConfig; + } + } + + static ReplicationConfig fromTypeAndString(ReplicationType replicationType, + String replication) { + switch (replicationType) { + case RATIS: + return new RatisReplicationConfig(replication); + case STAND_ALONE: + return new StandaloneReplicationConfig(replication); + default: + throw new UnsupportedOperationException( + "String based replication config initialization is not supported for " + + replicationType); + } + } + /** * Replication type supported by the replication config. */ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StandaloneReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StandaloneReplicationConfig.java index 49f8ebd8ebb0..115291145225 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StandaloneReplicationConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StandaloneReplicationConfig.java @@ -34,6 +34,15 @@ public StandaloneReplicationConfig(ReplicationFactor replicationFactor) { this.replicationFactor = replicationFactor; } + public StandaloneReplicationConfig(String factorString) { + ReplicationFactor factor = null; + try { + factor = ReplicationFactor.valueOf(Integer.parseInt(factorString)); + } catch (NumberFormatException ex) { + factor = ReplicationFactor.valueOf(factorString); + } + this.replicationFactor = factor; + } public ReplicationFactor getReplicationFactor() { return replicationFactor; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 7e587174f9f5..370292f28e4d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -150,8 +150,8 @@ public final class OzoneConfigKeys { public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT = 5000; public static final String OZONE_REPLICATION = "ozone.replication"; - public static final int OZONE_REPLICATION_DEFAULT = - ReplicationFactor.THREE.getValue(); + public static final String OZONE_REPLICATION_DEFAULT = + ReplicationFactor.THREE.toString(); public static final String OZONE_REPLICATION_TYPE = "ozone.replication.type"; public static final String OZONE_REPLICATION_TYPE_DEFAULT = diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java new file mode 100644 index 000000000000..b30e895ba3ba --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.client; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test replicationConfig. + */ +public class TestReplicationConfig { + + @Test + public void deserializeRatis() { + final ReplicationConfig replicationConfig = ReplicationConfig + .fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.THREE); + + Assert + .assertEquals(RatisReplicationConfig.class, + replicationConfig.getClass()); + + RatisReplicationConfig ratisReplicationConfig = + (RatisReplicationConfig) replicationConfig; + Assert.assertEquals(ReplicationType.RATIS, + ratisReplicationConfig.getReplicationType()); + Assert.assertEquals(ReplicationFactor.THREE, + ratisReplicationConfig.getReplicationFactor()); + } + + @Test + public void deserializeStandalone() { + final ReplicationConfig replicationConfig = ReplicationConfig + .fromTypeAndFactor(ReplicationType.STAND_ALONE, ReplicationFactor.ONE); + + Assert + .assertEquals(StandaloneReplicationConfig.class, + replicationConfig.getClass()); + + StandaloneReplicationConfig standalone = + (StandaloneReplicationConfig) replicationConfig; + Assert.assertEquals(ReplicationType.STAND_ALONE, + standalone.getReplicationType()); + Assert.assertEquals(ReplicationFactor.ONE, + standalone.getReplicationFactor()); + } + + @Test + public void fromJavaObjects() { + + final ReplicationConfig replicationConfig = ReplicationConfig + .fromTypeAndFactor(org.apache.hadoop.hdds.client.ReplicationType.RATIS, + org.apache.hadoop.hdds.client.ReplicationFactor.THREE); + + Assert.assertEquals(replicationConfig.getReplicationType(), + ReplicationType.RATIS); + Assert.assertEquals( + ((RatisReplicationConfig) replicationConfig).getReplicationFactor(), + ReplicationFactor.THREE); + + } + + @Test + public void fromTypeAndStringName() { + + ReplicationConfig replicationConfig = null; + + //RATIS-THREE + replicationConfig = ReplicationConfig.fromTypeAndString( + org.apache.hadoop.hdds.client.ReplicationType.RATIS, "THREE"); + + Assert.assertEquals(replicationConfig.getReplicationType(), + ReplicationType.RATIS); + Assert.assertEquals( + ((RatisReplicationConfig) replicationConfig).getReplicationFactor(), + ReplicationFactor.THREE); + + //RATIS-ONE + replicationConfig = ReplicationConfig.fromTypeAndString( + org.apache.hadoop.hdds.client.ReplicationType.RATIS, "ONE"); + + Assert.assertEquals(replicationConfig.getReplicationType(), + ReplicationType.RATIS); + Assert.assertEquals( + ((RatisReplicationConfig) replicationConfig).getReplicationFactor(), + ReplicationFactor.ONE); + + //STANDALONE-ONE + replicationConfig = ReplicationConfig.fromTypeAndString( + org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE, "ONE"); + + Assert.assertEquals(replicationConfig.getReplicationType(), + ReplicationType.STAND_ALONE); + Assert.assertEquals( + ((StandaloneReplicationConfig) replicationConfig) + .getReplicationFactor(), + ReplicationFactor.ONE); + + } + + + @Test + public void fromTypeAndStringInteger() { + //RATIS-THREE + ReplicationConfig replicationConfig = ReplicationConfig.fromTypeAndString( + org.apache.hadoop.hdds.client.ReplicationType.RATIS, "3"); + + Assert.assertEquals(replicationConfig.getReplicationType(), + ReplicationType.RATIS); + Assert.assertEquals( + ((RatisReplicationConfig) replicationConfig).getReplicationFactor(), + ReplicationFactor.THREE); + } + + @Test + public void adjustReplication() { + ReplicationConfig config = + new RatisReplicationConfig(ReplicationFactor.ONE); + + final ReplicationConfig replicationConfig = + ReplicationConfig.adjustReplication(config, (short) 1); + + Assert.assertEquals(replicationConfig.getReplicationType(), + ReplicationType.RATIS); + Assert.assertEquals( + ((RatisReplicationConfig) replicationConfig) + .getReplicationFactor(), + ReplicationFactor.ONE); + + } +} \ No newline at end of file diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/protocol/TestReplicationConfig.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/protocol/TestReplicationConfig.java deleted file mode 100644 index 4ab60bc9cc8e..000000000000 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/scm/protocol/TestReplicationConfig.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.protocol; - -import org.apache.hadoop.hdds.client.RatisReplicationConfig; -import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test replicationConfig. - */ -public class TestReplicationConfig { - - @Test - public void deserializeRatis() { - final ReplicationConfig replicationConfig = ReplicationConfig - .fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.THREE); - - Assert - .assertEquals(RatisReplicationConfig.class, - replicationConfig.getClass()); - - RatisReplicationConfig ratisReplicationConfig = - (RatisReplicationConfig) replicationConfig; - Assert.assertEquals(ReplicationType.RATIS, - ratisReplicationConfig.getReplicationType()); - Assert.assertEquals(ReplicationFactor.THREE, - ratisReplicationConfig.getReplicationFactor()); - } - - @Test - public void deserializeStandalone() { - final ReplicationConfig replicationConfig = ReplicationConfig - .fromTypeAndFactor(ReplicationType.STAND_ALONE, ReplicationFactor.ONE); - - Assert - .assertEquals(StandaloneReplicationConfig.class, - replicationConfig.getClass()); - - StandaloneReplicationConfig standalone = - (StandaloneReplicationConfig) replicationConfig; - Assert.assertEquals(ReplicationType.STAND_ALONE, - standalone.getReplicationType()); - Assert.assertEquals(ReplicationFactor.ONE, - standalone.getReplicationFactor()); - } -} \ No newline at end of file diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java index edcaec047476..d715c82dd4b2 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java @@ -22,12 +22,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.client.OzoneQuota; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.StorageType; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; @@ -72,12 +72,7 @@ public class OzoneBucket extends WithMetadata { /** * Default replication factor to be used while creating keys. */ - private final ReplicationFactor defaultReplication; - - /** - * Default replication type to be used while creating keys. - */ - private final ReplicationType defaultReplicationType; + private final ReplicationConfig defaultReplication; /** * Type of storage to be used for this bucket. @@ -135,26 +130,13 @@ public class OzoneBucket extends WithMetadata { private long quotaInNamespace; private OzoneBucket(ConfigurationSource conf, String volumeName, - String bucketName, ReplicationFactor defaultReplication, - ReplicationType defaultReplicationType, ClientProtocol proxy) { + String bucketName, ClientProtocol proxy) { Preconditions.checkNotNull(proxy, "Client proxy is not set."); this.volumeName = volumeName; this.name = bucketName; - if (defaultReplication == null) { - this.defaultReplication = ReplicationFactor.valueOf(conf.getInt( - OzoneConfigKeys.OZONE_REPLICATION, - OzoneConfigKeys.OZONE_REPLICATION_DEFAULT)); - } else { - this.defaultReplication = defaultReplication; - } - if (defaultReplicationType == null) { - this.defaultReplicationType = ReplicationType.valueOf(conf.get( - OzoneConfigKeys.OZONE_REPLICATION_TYPE, - OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT)); - } else { - this.defaultReplicationType = defaultReplicationType; - } + this.defaultReplication = ReplicationConfig.getDefault(conf); + this.proxy = proxy; this.ozoneObj = OzoneObjInfo.Builder.newBuilder() .setBucketName(bucketName) @@ -169,7 +151,7 @@ public OzoneBucket(ConfigurationSource conf, ClientProtocol proxy, Boolean versioning, long creationTime, Map metadata, String encryptionKeyName, String sourceVolume, String sourceBucket) { - this(conf, volumeName, bucketName, null, null, proxy); + this(conf, volumeName, bucketName, proxy); this.storageType = storageType; this.versioning = versioning; this.listCacheSize = HddsClientUtils.getListCacheSize(conf); @@ -226,7 +208,7 @@ public OzoneBucket(ConfigurationSource conf, ClientProtocol proxy, public OzoneBucket(ConfigurationSource conf, ClientProtocol proxy, String volumeName, String bucketName, StorageType storageType, Boolean versioning, long creationTime, Map metadata) { - this(conf, volumeName, bucketName, null, null, proxy); + this(conf, volumeName, bucketName, proxy); this.storageType = storageType; this.versioning = versioning; this.listCacheSize = HddsClientUtils.getListCacheSize(conf); @@ -255,14 +237,13 @@ public OzoneBucket(ConfigurationSource conf, ClientProtocol proxy, @VisibleForTesting @SuppressWarnings("parameternumber") OzoneBucket(String volumeName, String name, - ReplicationFactor defaultReplication, - ReplicationType defaultReplicationType, StorageType storageType, + ReplicationConfig defaultReplication, + StorageType storageType, Boolean versioning, long creationTime) { this.proxy = null; this.volumeName = volumeName; this.name = name; this.defaultReplication = defaultReplication; - this.defaultReplicationType = defaultReplicationType; this.storageType = storageType; this.versioning = versioning; this.creationTime = Instant.ofEpochMilli(creationTime); @@ -480,7 +461,7 @@ public void setQuota(OzoneQuota quota) throws IOException { */ public OzoneOutputStream createKey(String key, long size) throws IOException { - return createKey(key, size, defaultReplicationType, defaultReplication, + return createKey(key, size, defaultReplication, new HashMap<>()); } @@ -493,17 +474,36 @@ public OzoneOutputStream createKey(String key, long size) * @return OzoneOutputStream to which the data has to be written. * @throws IOException */ + @Deprecated public OzoneOutputStream createKey(String key, long size, - ReplicationType type, - ReplicationFactor factor, - Map keyMetadata) + ReplicationType type, + ReplicationFactor factor, + Map keyMetadata) throws IOException { return proxy .createKey(volumeName, name, key, size, type, factor, keyMetadata); } + /** + * Creates a new key in the bucket. + * + * @param key Name of the key to be created. + * @param size Size of the data the key will point to. + * @param replicationConfig Replication configuration. + * @return OzoneOutputStream to which the data has to be written. + * @throws IOException + */ + public OzoneOutputStream createKey(String key, long size, + ReplicationConfig replicationConfig, + Map keyMetadata) + throws IOException { + return proxy + .createKey(volumeName, name, key, size, replicationConfig, keyMetadata); + } + /** * Reads an existing key from the bucket. + * * @param key Name of the key to be read. * @return OzoneInputStream the stream using which the data can be read. * @throws IOException @@ -605,25 +605,35 @@ public void renameKeys(Map keyMap) * @return OmMultipartInfo * @throws IOException */ + @Deprecated public OmMultipartInfo initiateMultipartUpload(String keyName, - ReplicationType type, - ReplicationFactor factor) + ReplicationType type, + ReplicationFactor factor) throws IOException { - return proxy.initiateMultipartUpload(volumeName, name, keyName, type, + return proxy.initiateMultipartUpload(volumeName, name, keyName, type, factor); } + /** + * Initiate multipart upload for a specified key. + */ + public OmMultipartInfo initiateMultipartUpload(String keyName, + ReplicationConfig config) + throws IOException { + return proxy.initiateMultipartUpload(volumeName, name, keyName, config); + } + /** * Initiate multipart upload for a specified key, with default replication * type RATIS and with replication factor THREE. + * * @param key Name of the key to be created. * @return OmMultipartInfo. * @throws IOException */ public OmMultipartInfo initiateMultipartUpload(String key) throws IOException { - return initiateMultipartUpload(key, defaultReplicationType, - defaultReplication); + return initiateMultipartUpload(key, defaultReplication); } /** @@ -749,6 +759,28 @@ public OzoneOutputStream createFile(String keyName, long size, recursive); } + /** + * OzoneFS api to creates an output stream for a file. + * + * @param keyName Key name + * @param overWrite if true existing file at the location will be overwritten + * @param recursive if true file would be created even if parent directories + * do not exist + * @throws OMException if given key is a directory + * if file exists and isOverwrite flag is false + * if an ancestor exists as a file + * if bucket does not exist + * @throws IOException if there is error in the db + * invalid arguments + */ + public OzoneOutputStream createFile(String keyName, long size, + ReplicationConfig replicationConfig, boolean overWrite, + boolean recursive) throws IOException { + return proxy + .createFile(volumeName, name, keyName, size, replicationConfig, + overWrite, recursive); + } + /** * List the status for a file or a directory and its contents. * diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index 4e0d7e185a8b..40aa68430d29 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.protocol.StorageType; @@ -268,12 +269,29 @@ List listBuckets(String volumeName, String bucketPrefix, * @return {@link OzoneOutputStream} * */ + @Deprecated OzoneOutputStream createKey(String volumeName, String bucketName, String keyName, long size, ReplicationType type, ReplicationFactor factor, Map metadata) throws IOException; + /** + * Writes a key in an existing bucket. + * @param volumeName Name of the Volume + * @param bucketName Name of the Bucket + * @param keyName Name of the Key + * @param size Size of the data + * @param metadata custom key value metadata + * @return {@link OzoneOutputStream} + * + */ + OzoneOutputStream createKey(String volumeName, String bucketName, + String keyName, long size, ReplicationConfig replicationConfig, + Map metadata) + throws IOException; + + /** * Reads a key from an existing bucket. * @param volumeName Name of the Volume @@ -404,10 +422,24 @@ OzoneKeyDetails getKeyDetails(String volumeName, String bucketName, * @return {@link OmMultipartInfo} * @throws IOException */ + @Deprecated OmMultipartInfo initiateMultipartUpload(String volumeName, String bucketName, String keyName, ReplicationType type, ReplicationFactor factor) throws IOException; + /** + * Initiate Multipart upload. + * @param volumeName + * @param bucketName + * @param keyName + * @param replicationConfig + * @return {@link OmMultipartInfo} + * @throws IOException + */ + OmMultipartInfo initiateMultipartUpload(String volumeName, String + bucketName, String keyName, ReplicationConfig replicationConfig) + throws IOException; + /** * Create a part key for a multipart upload key. * @param volumeName @@ -604,6 +636,32 @@ OzoneOutputStream createFile(String volumeName, String bucketName, String keyName, long size, ReplicationType type, ReplicationFactor factor, boolean overWrite, boolean recursive) throws IOException; + + /** + * Creates an output stream for writing to a file. + * + * @param volumeName Volume name + * @param bucketName Bucket name + * @param keyName Absolute path of the file to be written + * @param size Size of data to be written + * @param replicationConfig Replication config + * @param overWrite if true existing file at the location will be overwritten + * @param recursive if true file would be created even if parent directories + * do not exist + * @return Output stream for writing to the file + * @throws OMException if given key is a directory + * if file exists and isOverwrite flag is false + * if an ancestor exists as a file + * if bucket does not exist + * @throws IOException if there is error in the db + * invalid arguments + */ + @SuppressWarnings("checkstyle:parameternumber") + OzoneOutputStream createFile(String volumeName, String bucketName, + String keyName, long size, ReplicationConfig replicationConfig, + boolean overWrite, boolean recursive) throws IOException; + + /** * List the status for a file or a directory and its contents. * diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 9c16a8517e6c..0d6f3c81bf9f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -42,6 +42,7 @@ import org.apache.hadoop.crypto.CryptoOutputStream; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -741,10 +742,20 @@ public List listBuckets(String volumeName, String bucketPrefix, .collect(Collectors.toList()); } + @Override + @Deprecated + public OzoneOutputStream createKey(String volumeName, String bucketName, + String keyName, long size, ReplicationType type, ReplicationFactor factor, + Map metadata) throws IOException { + + return createKey(volumeName, bucketName, keyName, size, + ReplicationConfig.fromTypeAndFactor(type, factor), metadata); + } + @Override public OzoneOutputStream createKey( String volumeName, String bucketName, String keyName, long size, - ReplicationType type, ReplicationFactor factor, + ReplicationConfig replicationConfig, Map metadata) throws IOException { verifyVolumeName(volumeName); @@ -752,7 +763,7 @@ public OzoneOutputStream createKey( if (checkKeyNameEnabled) { HddsClientUtils.verifyKeyName(keyName); } - HddsClientUtils.checkNotNull(keyName, type, factor); + HddsClientUtils.checkNotNull(keyName, replicationConfig); String requestId = UUID.randomUUID().toString(); OmKeyArgs.Builder builder = new OmKeyArgs.Builder() @@ -760,8 +771,9 @@ public OzoneOutputStream createKey( .setBucketName(bucketName) .setKeyName(keyName) .setDataSize(size) - .setType(HddsProtos.ReplicationType.valueOf(type.toString())) - .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) + .setType(HddsProtos.ReplicationType + .valueOf(replicationConfig.getReplicationType().toString())) + .setFactor(ReplicationConfig.getLegacyFactor(replicationConfig)) .addAllMetadata(metadata) .setAcls(getAclList()); @@ -781,7 +793,7 @@ public OzoneOutputStream createKey( } OpenKeySession openKey = ozoneManagerClient.openKey(builder.build()); - return createOutputStream(openKey, requestId, type, factor); + return createOutputStream(openKey, requestId, replicationConfig); } private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo) @@ -948,20 +960,27 @@ public void close() throws IOException { @Override public OmMultipartInfo initiateMultipartUpload(String volumeName, - String bucketName, - String keyName, - ReplicationType type, - ReplicationFactor factor) + String bucketName, String keyName, ReplicationType type, + ReplicationFactor factor) throws IOException { + return initiateMultipartUpload(volumeName, bucketName, keyName, + ReplicationConfig.fromTypeAndFactor(type, factor)); + } + + @Override + public OmMultipartInfo initiateMultipartUpload(String volumeName, + String bucketName, + String keyName, + ReplicationConfig replicationConfig) throws IOException { verifyVolumeName(volumeName); verifyBucketName(bucketName); - HddsClientUtils.checkNotNull(keyName, type, factor); + HddsClientUtils.checkNotNull(keyName, replicationConfig); OmKeyArgs keyArgs = new OmKeyArgs.Builder() .setVolumeName(volumeName) .setBucketName(bucketName) .setKeyName(keyName) - .setType(HddsProtos.ReplicationType.valueOf(type.toString())) - .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) + .setType(replicationConfig.getReplicationType()) + .setFactor(ReplicationConfig.getLegacyFactor(replicationConfig)) .setAcls(getAclList()) .build(); OmMultipartInfo multipartInfo = ozoneManagerClient @@ -1164,9 +1183,19 @@ public OzoneInputStream readFile(String volumeName, String bucketName, return getInputStreamWithRetryFunction(keyInfo); } + @Override + public OzoneOutputStream createFile(String volumeName, String bucketName, + String keyName, long size, ReplicationType type, ReplicationFactor factor, + boolean overWrite, boolean recursive) throws IOException { + return createFile(volumeName, bucketName, keyName, size, + ReplicationConfig.fromTypeAndFactor(type, factor), overWrite, + recursive); + } + /** * Create InputStream with Retry function to refresh pipeline information * if reads fail. + * * @param keyInfo * @return * @throws IOException @@ -1192,21 +1221,21 @@ private OzoneInputStream getInputStreamWithRetryFunction( @Override public OzoneOutputStream createFile(String volumeName, String bucketName, - String keyName, long size, ReplicationType type, ReplicationFactor factor, + String keyName, long size, ReplicationConfig replicationConfig, boolean overWrite, boolean recursive) throws IOException { OmKeyArgs keyArgs = new OmKeyArgs.Builder() .setVolumeName(volumeName) .setBucketName(bucketName) .setKeyName(keyName) .setDataSize(size) - .setType(HddsProtos.ReplicationType.valueOf(type.name())) - .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) + .setType(replicationConfig.getReplicationType()) + .setFactor(ReplicationConfig.getLegacyFactor(replicationConfig)) .setAcls(getAclList()) .build(); OpenKeySession keySession = ozoneManagerClient.createFile(keyArgs, overWrite, recursive); - return createOutputStream(keySession, UUID.randomUUID().toString(), type, - factor); + return createOutputStream(keySession, UUID.randomUUID().toString(), + replicationConfig); } @Override @@ -1332,7 +1361,7 @@ private OzoneInputStream createInputStream( } private OzoneOutputStream createOutputStream(OpenKeySession openKey, - String requestId, ReplicationType type, ReplicationFactor factor) + String requestId, ReplicationConfig replicationConfig) throws IOException { KeyOutputStream keyOutputStream = new KeyOutputStream.Builder() @@ -1340,8 +1369,8 @@ private OzoneOutputStream createOutputStream(OpenKeySession openKey, .setXceiverClientManager(xceiverClientManager) .setOmClient(ozoneManagerClient) .setRequestID(requestId) - .setType(HddsProtos.ReplicationType.valueOf(type.toString())) - .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) + .setType(replicationConfig.getReplicationType()) + .setFactor(ReplicationConfig.getLegacyFactor(replicationConfig)) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(clientConfig) .build(); diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java index 6784dea2b476..1327a367b565 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java @@ -31,8 +31,8 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationFactor; -import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -78,8 +78,7 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter { private ObjectStore objectStore; private OzoneVolume volume; private OzoneBucket bucket; - private ReplicationType replicationType; - private ReplicationFactor replicationFactor; + private ReplicationConfig replicationConfig; private boolean securityEnabled; private int configuredDnPort; @@ -135,12 +134,7 @@ public BasicOzoneClientAdapterImpl(String omHost, int omPort, this.securityEnabled = true; } - String replicationTypeConf = - conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE, - OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT); - - int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION, - OzoneConfigKeys.OZONE_REPLICATION_DEFAULT); + replicationConfig = ReplicationConfig.getDefault(conf); if (OmUtils.isOmHAServiceId(conf, omHost)) { // omHost is listed as one of the service ids in the config, @@ -157,8 +151,6 @@ public BasicOzoneClientAdapterImpl(String omHost, int omPort, objectStore = ozoneClient.getObjectStore(); this.volume = objectStore.getVolume(volumeStr); this.bucket = volume.getBucket(bucketStr); - this.replicationType = ReplicationType.valueOf(replicationTypeConf); - this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf); this.configuredDnPort = conf.getInt( OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); @@ -166,7 +158,7 @@ public BasicOzoneClientAdapterImpl(String omHost, int omPort, @Override public short getDefaultReplication() { - return (short) replicationFactor.getValue(); + return (short) replicationConfig.getRequiredNodes(); } @Override @@ -202,13 +194,17 @@ public OzoneFSOutputStream createFile(String key, short replication, OzoneOutputStream ozoneOutputStream = null; if (replication == ReplicationFactor.ONE.getValue() || replication == ReplicationFactor.THREE.getValue()) { - ReplicationFactor clientReplication = ReplicationFactor - .valueOf(replication); - ozoneOutputStream = bucket.createFile(key, 0, replicationType, - clientReplication, overWrite, recursive); + + ReplicationConfig customReplicationConfig = + ReplicationConfig.adjustReplication( + replicationConfig, replication + ); + ozoneOutputStream = + bucket.createFile(key, 0, customReplicationConfig, overWrite, + recursive); } else { - ozoneOutputStream = bucket.createFile(key, 0, replicationType, - replicationFactor, overWrite, recursive); + ozoneOutputStream = + bucket.createFile(key, 0, replicationConfig, overWrite, recursive); } return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream()); } catch (OMException ex) { diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java index eff52c07d9b1..3bf3b76f5691 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java @@ -40,8 +40,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationFactor; -import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -98,8 +98,7 @@ public class BasicRootedOzoneClientAdapterImpl private OzoneClient ozoneClient; private ObjectStore objectStore; private ClientProtocol proxy; - private ReplicationType replicationType; - private ReplicationFactor replicationFactor; + private ReplicationConfig replicationConfig; private boolean securityEnabled; private int configuredDnPort; @@ -167,12 +166,7 @@ public BasicRootedOzoneClientAdapterImpl(String omHost, int omPort, this.securityEnabled = true; } - String replicationTypeConf = - conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE, - OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT); - - int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION, - OzoneConfigKeys.OZONE_REPLICATION_DEFAULT); + replicationConfig = ReplicationConfig.getDefault(conf); if (OmUtils.isOmHAServiceId(conf, omHost)) { // omHost is listed as one of the service ids in the config, @@ -188,8 +182,7 @@ public BasicRootedOzoneClientAdapterImpl(String omHost, int omPort, } objectStore = ozoneClient.getObjectStore(); proxy = objectStore.getClientProxy(); - this.replicationType = ReplicationType.valueOf(replicationTypeConf); - this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf); + this.configuredDnPort = conf.getInt( OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); @@ -276,7 +269,7 @@ private OzoneBucket getBucket(String volumeStr, String bucketStr, @Override public short getDefaultReplication() { - return (short) replicationFactor.getValue(); + return (short) replicationConfig.getRequiredNodes(); } @Override @@ -322,13 +315,13 @@ public OzoneFSOutputStream createFile(String pathStr, short replication, OzoneOutputStream ozoneOutputStream = null; if (replication == ReplicationFactor.ONE.getValue() || replication == ReplicationFactor.THREE.getValue()) { - ReplicationFactor clientReplication = ReplicationFactor - .valueOf(replication); - ozoneOutputStream = bucket.createFile(key, 0, replicationType, - clientReplication, overWrite, recursive); + + ozoneOutputStream = bucket.createFile(key, 0, + ReplicationConfig.adjustReplication(replicationConfig, replication), + overWrite, recursive); } else { - ozoneOutputStream = bucket.createFile(key, 0, replicationType, - replicationFactor, overWrite, recursive); + ozoneOutputStream = + bucket.createFile(key, 0, replicationConfig, overWrite, recursive); } return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream()); } catch (OMException ex) { diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index 30cd412d343b..b22199924be7 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ -34,7 +34,9 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.protocol.StorageType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; @@ -76,8 +78,7 @@ public OzoneBucketStub( long creationTime) { super(volumeName, bucketName, - ReplicationFactor.ONE, - ReplicationType.STAND_ALONE, + new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE), storageType, versioning, creationTime); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/CopyKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/CopyKeyHandler.java index 1437f9691db0..18b72038e7fe 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/CopyKeyHandler.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/CopyKeyHandler.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.OzoneConsts; @@ -63,9 +63,11 @@ public class CopyKeyHandler extends BucketHandler { private String toKey; @Option(names = {"-r", "--replication"}, - description = "Replication factor of the new key. (use ONE or THREE) " - + "Default is specified in the cluster-wide config.") - private ReplicationFactor replicationFactor; + description = + "Replication configuration of the new key. (this is replication " + + "specific. for RATIS/STANDALONE you can use ONE or THREE) " + + "Default is specified in the cluster-wide config.") + private String replication; @Option(names = {"-t", "--type"}, description = "Replication type of the new key. (use RATIS or " + @@ -82,17 +84,19 @@ protected void execute(OzoneClient client, OzoneAddress address) OzoneVolume vol = client.getObjectStore().getVolume(volumeName); OzoneBucket bucket = vol.getBucket(bucketName); - if (replicationFactor == null) { - replicationFactor = ReplicationFactor.valueOf( - getConf().getInt(OZONE_REPLICATION, OZONE_REPLICATION_DEFAULT)); - } - if (replicationType == null) { replicationType = ReplicationType.valueOf( - getConf().get(OZONE_REPLICATION_TYPE, - OZONE_REPLICATION_TYPE_DEFAULT)); + getConf() + .get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)); } + if (replication == null) { + replication = getConf().get(OZONE_REPLICATION, OZONE_REPLICATION_DEFAULT); + } + + ReplicationConfig replicationConfig = + ReplicationConfig.fromTypeAndString(replicationType, replication); + OzoneKeyDetails keyDetail = bucket.getKey(fromKey); Map keyMetadata = new HashMap<>(keyDetail.getMetadata()); keyMetadata.remove(OzoneConsts.GDPR_SECRET); @@ -107,7 +111,7 @@ protected void execute(OzoneClient client, OzoneAddress address) OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES); try (InputStream input = bucket.readKey(fromKey); OutputStream output = bucket.createKey(toKey, input.available(), - replicationType, replicationFactor, keyMetadata)) { + replicationConfig, keyMetadata)) { IOUtils.copyBytes(input, output, chunkSize); } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java index c01e22c5b0f9..608d58619d17 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java @@ -27,7 +27,7 @@ import java.util.Map; import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.OzoneConsts; @@ -59,9 +59,11 @@ public class PutKeyHandler extends KeyHandler { private String fileName; @Option(names = {"-r", "--replication"}, - description = "Replication factor of the new key. (use ONE or THREE) " - + "Default is specified in the cluster-wide config.") - private ReplicationFactor replicationFactor; + description = + "Replication configuration of the new key. (this is replication " + + "specific. for RATIS/STANDALONE you can use ONE or THREE) " + + "Default is specified in the cluster-wide config.") + private String replication; @Option(names = {"-t", "--type"}, description = "Replication type of the new key. (use RATIS or " + @@ -85,16 +87,19 @@ protected void execute(OzoneClient client, OzoneAddress address) } } - if (replicationFactor == null) { - replicationFactor = ReplicationFactor.valueOf( - getConf().getInt(OZONE_REPLICATION, OZONE_REPLICATION_DEFAULT)); - } - if (replicationType == null) { replicationType = ReplicationType.valueOf( getConf() .get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)); } + + if (replication == null) { + replication = getConf().get(OZONE_REPLICATION, OZONE_REPLICATION_DEFAULT); + } + + ReplicationConfig replicationConfig = + ReplicationConfig.fromTypeAndString(replicationType, replication); + OzoneVolume vol = client.getObjectStore().getVolume(volumeName); OzoneBucket bucket = vol.getBucket(bucketName); @@ -107,8 +112,8 @@ protected void execute(OzoneClient client, OzoneAddress address) int chunkSize = (int) getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES); try (InputStream input = new FileInputStream(dataFile); - OutputStream output = bucket.createKey(keyName, dataFile.length(), - replicationType, replicationFactor, keyMetadata)) { + OutputStream output = bucket.createKey(keyName, dataFile.length(), + replicationConfig, keyMetadata)) { IOUtils.copyBytes(input, output, chunkSize); } }