diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java index de50d2afe9ea..b969a68a92ab 100644 --- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java +++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java @@ -40,5 +40,6 @@ public enum ConfigTag { STORAGE, PIPELINE, STANDALONE, - S3GATEWAY + S3GATEWAY, + DATANODE } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java new file mode 100644 index 000000000000..dd0fa6722ea6 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.hdds.conf.Config; +import org.apache.hadoop.hdds.conf.ConfigGroup; +import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE; +import org.apache.hadoop.hdds.conf.ConfigType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Configuration class used for high level datanode configuration parameters. + */ +@ConfigGroup(prefix = "hdds.datanode") +public class DatanodeConfiguration { + static final Logger LOG = + LoggerFactory.getLogger(DatanodeConfiguration.class); + + /** + * The maximum number of replication commands a single datanode can execute + * simultaneously. + */ + private int replicationMaxStreams = 10; + + @Config(key = "replication.streams.limit", + type = ConfigType.INT, + defaultValue = "10", + tags = {DATANODE}, + description = "The maximum number of replication commands a single " + + "datanode can execute simultaneously" + ) + public void setReplicationMaxStreams(int val) { + if (val < 1) { + LOG.warn("hdds.datanode.replication.streams.limit must be greater than" + + "zero and was set to {}. Defaulting to {}", + val, replicationMaxStreams); + } else { + this.replicationMaxStreams = val; + } + } + + public int getReplicationMaxStreams() { + return replicationMaxStreams; + } + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index c9eb7024eaf1..2763278b0d2b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -97,6 +97,10 @@ public class DatanodeStateMachine implements Closeable { public DatanodeStateMachine(DatanodeDetails datanodeDetails, Configuration conf, CertificateClient certClient, HddsDatanodeStopService hddsDatanodeStopService) throws IOException { + OzoneConfiguration ozoneConf = new OzoneConfiguration(conf); + DatanodeConfiguration dnConf = + ozoneConf.getObject(DatanodeConfiguration.class); + this.hddsDatanodeStopService = hddsDatanodeStopService; this.conf = conf; this.datanodeDetails = datanodeDetails; @@ -106,7 +110,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, connectionManager = new SCMConnectionManager(conf); context = new StateContext(this.conf, DatanodeStates.getInitState(), this); container = new OzoneContainer(this.datanodeDetails, - new OzoneConfiguration(conf), context, certClient); + ozoneConf, context, certClient); dnCertClient = certClient; nextHB = new AtomicLong(Time.monotonicNow()); @@ -116,7 +120,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, new SimpleContainerDownloader(conf), new TarContainerPacker()); supervisor = - new ReplicationSupervisor(container.getContainerSet(), replicator, 10); + new ReplicationSupervisor(container.getContainerSet(), replicator, + dnConf.getReplicationMaxStreams()); // When we add new handlers just adding a new handler here should do the // trick.