diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 14ae4c943c06..be83d9b7db15 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -37,8 +37,6 @@ public class DatanodeConfiguration { private static final Logger LOG = LoggerFactory.getLogger(DatanodeConfiguration.class); - static final String REPLICATION_STREAMS_LIMIT_KEY = - "hdds.datanode.replication.streams.limit"; static final String CONTAINER_DELETE_THREADS_MAX_KEY = "hdds.datanode.container.delete.threads.max"; static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY = @@ -57,8 +55,6 @@ public class DatanodeConfiguration { static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false; - static final int REPLICATION_MAX_STREAMS_DEFAULT = 10; - static final long PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT = 60; static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1; @@ -71,19 +67,6 @@ public class DatanodeConfiguration { static final long DISK_CHECK_TIMEOUT_DEFAULT = Duration.ofMinutes(10).toMillis(); - /** - * The maximum number of replication commands a single datanode can execute - * simultaneously. - */ - @Config(key = "replication.streams.limit", - type = ConfigType.INT, - defaultValue = "10", - tags = {DATANODE}, - description = "The maximum number of replication commands a single " + - "datanode can execute simultaneously" - ) - private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; - /** * Number of threads per volume that Datanode will use for chunk read. 
*/ @@ -264,13 +247,6 @@ public void setWaitOnAllFollowers(boolean val) { @PostConstruct public void validate() { - if (replicationMaxStreams < 1) { - LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " + - "and was set to {}. Defaulting to {}", - replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT); - replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; - } - if (containerDeleteThreads < 1) { LOG.warn(CONTAINER_DELETE_THREADS_MAX_KEY + " must be greater than zero" + " and was set to {}. Defaulting to {}", @@ -316,18 +292,10 @@ public void validate() { } } - public void setReplicationMaxStreams(int replicationMaxStreams) { - this.replicationMaxStreams = replicationMaxStreams; - } - public void setContainerDeleteThreads(int containerDeleteThreads) { this.containerDeleteThreads = containerDeleteThreads; } - public int getReplicationMaxStreams() { - return replicationMaxStreams; - } - public int getContainerDeleteThreads() { return containerDeleteThreads; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index ee5e87adcae2..e1fc297406c5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -55,6 +55,7 @@ import org.apache.hadoop.ozone.container.replication.ContainerReplicator; import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator; import org.apache.hadoop.ozone.container.replication.MeasuredReplicator; +import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import 
org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics; import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; @@ -166,9 +167,11 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, replicatorMetrics = new MeasuredReplicator(replicator); + ReplicationConfig replicationConfig = + conf.getObject(ReplicationConfig.class); supervisor = new ReplicationSupervisor(container.getContainerSet(), context, - replicatorMetrics, dnConf.getReplicationMaxStreams()); + replicatorMetrics, replicationConfig); replicationSupervisorMetrics = ReplicationSupervisorMetrics.create(supervisor); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java index dd5f4c428699..bf8d6f102565 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,8 @@ import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; -import org.apache.hadoop.hdds.conf.ConfigTag; +import org.apache.hadoop.hdds.conf.ConfigType; +import org.apache.hadoop.hdds.conf.PostConstruct; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor; @@ -39,6 +40,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE; +import static org.apache.hadoop.hdds.conf.ConfigTag.MANAGEMENT; + /** * Separated network server for server2server container replication. */ @@ -128,12 +132,33 @@ public int getPort() { /** * Replication-related configuration. */ - @ConfigGroup(prefix = "hdds.datanode.replication") + @ConfigGroup(prefix = ReplicationConfig.PREFIX) public static final class ReplicationConfig { - @Config(key = "port", defaultValue = "9886", description = "Port used for" - + " the server2server replication server", tags = { - ConfigTag.MANAGEMENT}) + public static final String PREFIX = "hdds.datanode.replication"; + public static final String STREAMS_LIMIT_KEY = "streams.limit"; + + public static final String REPLICATION_STREAMS_LIMIT_KEY = + PREFIX + "." + STREAMS_LIMIT_KEY; + + public static final int REPLICATION_MAX_STREAMS_DEFAULT = 10; + + /** + * The maximum number of replication commands a single datanode can execute + * simultaneously. 
+ */ + @Config(key = STREAMS_LIMIT_KEY, + type = ConfigType.INT, + defaultValue = "10", + tags = {DATANODE}, + description = "The maximum number of replication commands a single " + + "datanode can execute simultaneously" + ) + private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; + + @Config(key = "port", defaultValue = "9886", + description = "Port used for the server2server replication server", + tags = {DATANODE, MANAGEMENT}) private int port; public int getPort() { @@ -144,6 +169,25 @@ public ReplicationConfig setPort(int portParam) { this.port = portParam; return this; } + + public int getReplicationMaxStreams() { + return replicationMaxStreams; + } + + public void setReplicationMaxStreams(int replicationMaxStreams) { + this.replicationMaxStreams = replicationMaxStreams; + } + + @PostConstruct + public void validate() { + if (replicationMaxStreams < 1) { + LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " + + "and was set to {}. Defaulting to {}", + replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT); + replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; + } + } + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 05a4173eb78e..4cb826c6ec76 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import 
org.apache.hadoop.ozone.container.replication.ReplicationTask.Status; import com.google.common.annotations.VisibleForTesting; @@ -71,6 +72,13 @@ public class ReplicationSupervisor { this.context = context; } + public ReplicationSupervisor( + ContainerSet containerSet, StateContext context, + ContainerReplicator replicator, ReplicationConfig replicationConfig) { + this(containerSet, context, replicator, + replicationConfig.getReplicationMaxStreams()); + } + public ReplicationSupervisor( ContainerSet containerSet, StateContext context, ContainerReplicator replicator, int poolSize) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java index 1b4265476a56..5f1b0a63200f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java @@ -28,8 +28,6 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_MIN_GAP_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_KEY; -import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_MAX_STREAMS_DEFAULT; -import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_STREAMS_LIMIT_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY; import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY; @@ -46,14 +44,12 @@ public class TestDatanodeConfiguration { @Test public void acceptsValidValues() { // GIVEN - int validReplicationLimit = 123; int validDeleteThreads = 42; long validDiskCheckIntervalMinutes = 60; int validFailedVolumesTolerated = 10; long validDiskCheckMinGap = 2; long validDiskCheckTimeout = 1; OzoneConfiguration conf = new OzoneConfiguration(); - conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit); conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, validDeleteThreads); conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY, validDiskCheckIntervalMinutes); @@ -70,7 +66,6 @@ public void acceptsValidValues() { DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); // THEN - assertEquals(validReplicationLimit, subject.getReplicationMaxStreams()); assertEquals(validDeleteThreads, subject.getContainerDeleteThreads()); assertEquals(validDiskCheckIntervalMinutes, subject.getPeriodicDiskCheckIntervalMinutes()); @@ -87,14 +82,12 @@ public void acceptsValidValues() { @Test public void overridesInvalidValues() { // GIVEN - int invalidReplicationLimit = -5; int invalidDeleteThreads = 0; long invalidDiskCheckIntervalMinutes = -1; int invalidFailedVolumesTolerated = -2; long invalidDiskCheckMinGap = -1; long invalidDiskCheckTimeout = -1; OzoneConfiguration conf = new OzoneConfiguration(); - conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit); conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, invalidDeleteThreads); conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY, invalidDiskCheckIntervalMinutes); @@ -111,8 +104,6 @@ public void overridesInvalidValues() { DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); // THEN - 
assertEquals(REPLICATION_MAX_STREAMS_DEFAULT, - subject.getReplicationMaxStreams()); assertEquals(CONTAINER_DELETE_THREADS_DEFAULT, subject.getContainerDeleteThreads()); assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT, @@ -136,8 +127,6 @@ public void isCreatedWitDefaultValues() { DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); // THEN - assertEquals(REPLICATION_MAX_STREAMS_DEFAULT, - subject.getReplicationMaxStreams()); assertEquals(CONTAINER_DELETE_THREADS_DEFAULT, subject.getContainerDeleteThreads()); assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT, diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java new file mode 100644 index 000000000000..6ab32d6cf9b6 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; +import org.junit.Test; + +import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_MAX_STREAMS_DEFAULT; +import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link ReplicationConfig}. + */ +public class TestReplicationConfig { + + @Test + public void acceptsValidValues() { + // GIVEN + int validReplicationLimit = 123; + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit); + + // WHEN + ReplicationConfig subject = conf.getObject(ReplicationConfig.class); + + // THEN + assertEquals(validReplicationLimit, subject.getReplicationMaxStreams()); + } + + @Test + public void overridesInvalidValues() { + // GIVEN + int invalidReplicationLimit = -5; + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit); + + // WHEN + ReplicationConfig subject = conf.getObject(ReplicationConfig.class); + + // THEN + assertEquals(REPLICATION_MAX_STREAMS_DEFAULT, + subject.getReplicationMaxStreams()); + } + + @Test + public void isCreatedWithDefaultValues() { + // GIVEN + OzoneConfiguration conf = new OzoneConfiguration(); + + // WHEN + ReplicationConfig subject = conf.getObject(ReplicationConfig.class); + + // THEN + 
assertEquals(REPLICATION_MAX_STREAMS_DEFAULT, + subject.getReplicationMaxStreams()); + } + +}