From 099f5941451fd11a2d980d21e0ec90a49bdfa309 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Wed, 22 Dec 2021 20:23:05 +0100 Subject: [PATCH 1/4] HDDS-6134. Move replication-specific config to ReplicationServer --- .../statemachine/DatanodeConfiguration.java | 33 +------- .../statemachine/DatanodeStateMachine.java | 5 +- .../replication/ReplicationServer.java | 40 ++++++++++ .../replication/ReplicationSupervisor.java | 11 ++- .../TestDatanodeConfiguration.java | 8 +- .../ReplicationSupervisorScheduling.java | 6 +- .../replication/TestReplicationConfig.java | 75 +++++++++++++++++++ .../freon/ClosedContainerReplicator.java | 4 +- 8 files changed, 136 insertions(+), 46 deletions(-) create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 14ae4c943c06..22f00715d073 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.ConfigTag; import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,8 +38,6 @@ public class DatanodeConfiguration { private static final Logger LOG = LoggerFactory.getLogger(DatanodeConfiguration.class); - static final String REPLICATION_STREAMS_LIMIT_KEY = - "hdds.datanode.replication.streams.limit"; static final String CONTAINER_DELETE_THREADS_MAX_KEY = "hdds.datanode.container.delete.threads.max"; static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY = @@ 
-57,8 +56,6 @@ public class DatanodeConfiguration { static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false; - static final int REPLICATION_MAX_STREAMS_DEFAULT = 10; - static final long PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT = 60; static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1; @@ -71,19 +68,6 @@ public class DatanodeConfiguration { static final long DISK_CHECK_TIMEOUT_DEFAULT = Duration.ofMinutes(10).toMillis(); - /** - * The maximum number of replication commands a single datanode can execute - * simultaneously. - */ - @Config(key = "replication.streams.limit", - type = ConfigType.INT, - defaultValue = "10", - tags = {DATANODE}, - description = "The maximum number of replication commands a single " + - "datanode can execute simultaneously" - ) - private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; - /** * Number of threads per volume that Datanode will use for chunk read. */ @@ -264,13 +248,6 @@ public void setWaitOnAllFollowers(boolean val) { @PostConstruct public void validate() { - if (replicationMaxStreams < 1) { - LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " + - "and was set to {}. Defaulting to {}", - replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT); - replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; - } - if (containerDeleteThreads < 1) { LOG.warn(CONTAINER_DELETE_THREADS_MAX_KEY + " must be greater than zero" + " and was set to {}. 
Defaulting to {}", @@ -316,18 +293,10 @@ public void validate() { } } - public void setReplicationMaxStreams(int replicationMaxStreams) { - this.replicationMaxStreams = replicationMaxStreams; - } - public void setContainerDeleteThreads(int containerDeleteThreads) { this.containerDeleteThreads = containerDeleteThreads; } - public int getReplicationMaxStreams() { - return replicationMaxStreams; - } - public int getContainerDeleteThreads() { return containerDeleteThreads; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index ee5e87adcae2..e1fc297406c5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -55,6 +55,7 @@ import org.apache.hadoop.ozone.container.replication.ContainerReplicator; import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator; import org.apache.hadoop.ozone.container.replication.MeasuredReplicator; +import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics; import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; @@ -166,9 +167,11 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, replicatorMetrics = new MeasuredReplicator(replicator); + ReplicationConfig replicationConfig = + conf.getObject(ReplicationConfig.class); supervisor = new ReplicationSupervisor(container.getContainerSet(), context, - replicatorMetrics, dnConf.getReplicationMaxStreams()); + replicatorMetrics, 
replicationConfig); replicationSupervisorMetrics = ReplicationSupervisorMetrics.create(supervisor); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java index dd5f4c428699..63465f8c0fcc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; +import org.apache.hadoop.hdds.conf.ConfigType; +import org.apache.hadoop.hdds.conf.PostConstruct; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor; @@ -39,6 +41,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE; + /** * Separated network server for server2server container replication. */ @@ -131,6 +135,23 @@ public int getPort() { @ConfigGroup(prefix = "hdds.datanode.replication") public static final class ReplicationConfig { + public static final String REPLICATION_STREAMS_LIMIT_KEY = + "hdds.datanode.replication.streams.limit"; + public static final int REPLICATION_MAX_STREAMS_DEFAULT = 10; + + /** + * The maximum number of replication commands a single datanode can execute + * simultaneously. 
+ */ + @Config(key = "streams.limit", + type = ConfigType.INT, + defaultValue = "10", + tags = {DATANODE}, + description = "The maximum number of replication commands a single " + + "datanode can execute simultaneously" + ) + private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; + @Config(key = "port", defaultValue = "9886", description = "Port used for" + " the server2server replication server", tags = { ConfigTag.MANAGEMENT}) @@ -144,6 +165,25 @@ public ReplicationConfig setPort(int portParam) { this.port = portParam; return this; } + + public int getReplicationMaxStreams() { + return replicationMaxStreams; + } + + public void setReplicationMaxStreams(int replicationMaxStreams) { + this.replicationMaxStreams = replicationMaxStreams; + } + + @PostConstruct + public void validate() { + if (replicationMaxStreams < 1) { + LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " + + "and was set to {}. Defaulting to {}", + replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT); + replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; + } + } + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 05a4173eb78e..e0a3a0995500 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import 
org.apache.hadoop.ozone.container.replication.ReplicationTask.Status; import com.google.common.annotations.VisibleForTesting; @@ -73,9 +74,11 @@ public class ReplicationSupervisor { public ReplicationSupervisor( ContainerSet containerSet, StateContext context, - ContainerReplicator replicator, int poolSize) { + ContainerReplicator replicator, ReplicationConfig replicationConfig) { this(containerSet, context, replicator, new ThreadPoolExecutor( - poolSize, poolSize, 60, TimeUnit.SECONDS, + replicationConfig.getReplicationMaxStreams(), + replicationConfig.getReplicationMaxStreams(), + 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("ContainerReplicationThread-%d") @@ -83,8 +86,8 @@ public ReplicationSupervisor( } public ReplicationSupervisor(ContainerSet containerSet, - ContainerReplicator replicator, int poolSize) { - this(containerSet, null, replicator, poolSize); + ContainerReplicator replicator, ReplicationConfig replicationConfig) { + this(containerSet, null, replicator, replicationConfig); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java index 1b4265476a56..f724d87723ec 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java @@ -28,8 +28,7 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_MIN_GAP_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_DEFAULT; import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_KEY; -import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_MAX_STREAMS_DEFAULT; -import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_STREAMS_LIMIT_KEY; +import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY; @@ -70,7 +69,6 @@ public void acceptsValidValues() { DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); // THEN - assertEquals(validReplicationLimit, subject.getReplicationMaxStreams()); assertEquals(validDeleteThreads, subject.getContainerDeleteThreads()); assertEquals(validDiskCheckIntervalMinutes, subject.getPeriodicDiskCheckIntervalMinutes()); @@ -111,8 +109,6 @@ public void overridesInvalidValues() { DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); // THEN - assertEquals(REPLICATION_MAX_STREAMS_DEFAULT, - subject.getReplicationMaxStreams()); assertEquals(CONTAINER_DELETE_THREADS_DEFAULT, subject.getContainerDeleteThreads()); assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT, @@ -136,8 +132,6 @@ public void isCreatedWitDefaultValues() { DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); // THEN - assertEquals(REPLICATION_MAX_STREAMS_DEFAULT, - subject.getReplicationMaxStreams()); assertEquals(CONTAINER_DELETE_THREADS_DEFAULT, subject.getContainerDeleteThreads()); assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT, 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java index ebdfad636023..ab3d257f7a06 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java @@ -24,10 +24,12 @@ import java.util.Random; import java.util.UUID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import org.junit.Assert; import org.junit.Test; @@ -69,6 +71,8 @@ public void test() throws InterruptedException { ContainerSet cs = new ContainerSet(); + ReplicationConfig replicationConfig = new OzoneConfiguration() + .getObject(ReplicationConfig.class); ReplicationSupervisor rs = new ReplicationSupervisor(cs, //simplified executor emulating the current sequential download + @@ -107,7 +111,7 @@ public void test() throws InterruptedException { } } - }, 10); + }, replicationConfig); final long start = System.currentTimeMillis(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java new file mode 100644 index 000000000000..6ab32d6cf9b6 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; +import org.junit.Test; + +import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_MAX_STREAMS_DEFAULT; +import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link ReplicationConfig}. + */ +public class TestReplicationConfig { + + @Test + public void acceptsValidValues() { + // GIVEN + int validReplicationLimit = 123; + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit); + + // WHEN + ReplicationConfig subject = conf.getObject(ReplicationConfig.class); + + // THEN + assertEquals(validReplicationLimit, subject.getReplicationMaxStreams()); + } + + @Test + public void overridesInvalidValues() { + // GIVEN + int invalidReplicationLimit = -5; + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit); + + // WHEN + ReplicationConfig subject = conf.getObject(ReplicationConfig.class); + + // THEN + assertEquals(REPLICATION_MAX_STREAMS_DEFAULT, + subject.getReplicationMaxStreams()); + } + + @Test + public void isCreatedWithDefaultValues() { + // GIVEN + OzoneConfiguration conf = new OzoneConfiguration(); + + // WHEN + ReplicationConfig subject = conf.getObject(ReplicationConfig.class); + + // THEN + 
assertEquals(REPLICATION_MAX_STREAMS_DEFAULT, + subject.getReplicationMaxStreams()); + } + +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java index 6797203d81b1..68286236255b 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java @@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.ContainerReplicator; import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator; +import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.container.replication.ReplicationTask; import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; @@ -204,7 +205,8 @@ private void initializeReplicationSupervisor(ConfigurationSource conf) new SimpleContainerDownloader(conf, null), new TarContainerPacker()); - supervisor = new ReplicationSupervisor(containerSet, replicator, 10); + supervisor = new ReplicationSupervisor(containerSet, replicator, + conf.getObject(ReplicationConfig.class)); } private void replicateContainer(long counter) throws Exception { From d438e35ba84d0074dac1d8378508946766ab757b Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Wed, 22 Dec 2021 21:41:17 +0100 Subject: [PATCH 2/4] Extract constants for keys --- .../replication/ReplicationServer.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java index 63465f8c0fcc..bf8d6f102565 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; -import org.apache.hadoop.hdds.conf.ConfigTag; import org.apache.hadoop.hdds.conf.ConfigType; import org.apache.hadoop.hdds.conf.PostConstruct; import org.apache.hadoop.hdds.security.x509.SecurityConfig; @@ -42,6 +41,7 @@ import org.slf4j.LoggerFactory; import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE; +import static org.apache.hadoop.hdds.conf.ConfigTag.MANAGEMENT; /** * Separated network server for server2server container replication. @@ -132,18 +132,22 @@ public int getPort() { /** * Replication-related configuration. */ - @ConfigGroup(prefix = "hdds.datanode.replication") + @ConfigGroup(prefix = ReplicationConfig.PREFIX) public static final class ReplicationConfig { + public static final String PREFIX = "hdds.datanode.replication"; + public static final String STREAMS_LIMIT_KEY = "streams.limit"; + public static final String REPLICATION_STREAMS_LIMIT_KEY = - "hdds.datanode.replication.streams.limit"; + PREFIX + "." + STREAMS_LIMIT_KEY; + public static final int REPLICATION_MAX_STREAMS_DEFAULT = 10; /** * The maximum number of replication commands a single datanode can execute * simultaneously. 
*/ - @Config(key = "streams.limit", + @Config(key = STREAMS_LIMIT_KEY, type = ConfigType.INT, defaultValue = "10", tags = {DATANODE}, @@ -152,9 +156,9 @@ public static final class ReplicationConfig { ) private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; - @Config(key = "port", defaultValue = "9886", description = "Port used for" - + " the server2server replication server", tags = { - ConfigTag.MANAGEMENT}) + @Config(key = "port", defaultValue = "9886", + description = "Port used for the server2server replication server", + tags = {DATANODE, MANAGEMENT}) private int port; public int getPort() { From de689769f7ec7395a83f6a58431f187e97c564f8 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Thu, 23 Dec 2021 08:39:51 +0100 Subject: [PATCH 3/4] Remove leftover replication setting from TestDatanodeConfiguration --- .../common/statemachine/TestDatanodeConfiguration.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java index f724d87723ec..5f1b0a63200f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java @@ -28,7 +28,6 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_MIN_GAP_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_KEY; -import static 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY; @@ -45,14 +44,12 @@ public class TestDatanodeConfiguration { @Test public void acceptsValidValues() { // GIVEN - int validReplicationLimit = 123; int validDeleteThreads = 42; long validDiskCheckIntervalMinutes = 60; int validFailedVolumesTolerated = 10; long validDiskCheckMinGap = 2; long validDiskCheckTimeout = 1; OzoneConfiguration conf = new OzoneConfiguration(); - conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit); conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, validDeleteThreads); conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY, validDiskCheckIntervalMinutes); @@ -85,14 +82,12 @@ public void acceptsValidValues() { @Test public void overridesInvalidValues() { // GIVEN - int invalidReplicationLimit = -5; int invalidDeleteThreads = 0; long invalidDiskCheckIntervalMinutes = -1; int invalidFailedVolumesTolerated = -2; long invalidDiskCheckMinGap = -1; long invalidDiskCheckTimeout = -1; OzoneConfiguration conf = new OzoneConfiguration(); - conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit); conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, invalidDeleteThreads); conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY, invalidDiskCheckIntervalMinutes); From 1a718a7944ac585ea8530436878ce03226ede42e Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Thu, 23 Dec 2021 09:09:51 +0100 Subject: [PATCH 4/4] Restore ReplicationSupervisor constructor with poolSize --- .../statemachine/DatanodeConfiguration.java | 1 - 
.../replication/ReplicationSupervisor.java | 15 ++++++++++----- .../ReplicationSupervisorScheduling.java | 6 +----- .../ozone/freon/ClosedContainerReplicator.java | 4 +--- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 22f00715d073..be83d9b7db15 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hdds.conf.ConfigTag; import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index e0a3a0995500..4cb826c6ec76 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -75,10 +75,15 @@ public class ReplicationSupervisor { public ReplicationSupervisor( ContainerSet containerSet, StateContext context, ContainerReplicator replicator, ReplicationConfig replicationConfig) { + this(containerSet, context, replicator, + replicationConfig.getReplicationMaxStreams()); + } + + public ReplicationSupervisor( + ContainerSet containerSet, StateContext context, + ContainerReplicator replicator, int poolSize) { this(containerSet, context, replicator, new 
ThreadPoolExecutor( - replicationConfig.getReplicationMaxStreams(), - replicationConfig.getReplicationMaxStreams(), - 60, TimeUnit.SECONDS, + poolSize, poolSize, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("ContainerReplicationThread-%d") @@ -86,8 +91,8 @@ public ReplicationSupervisor( } public ReplicationSupervisor(ContainerSet containerSet, - ContainerReplicator replicator, ReplicationConfig replicationConfig) { - this(containerSet, null, replicator, replicationConfig); + ContainerReplicator replicator, int poolSize) { + this(containerSet, null, replicator, poolSize); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java index ab3d257f7a06..ebdfad636023 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java @@ -24,12 +24,10 @@ import java.util.Random; import java.util.UUID; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import org.junit.Assert; import org.junit.Test; @@ -71,8 +69,6 @@ public void test() throws InterruptedException { ContainerSet cs = new ContainerSet(); - ReplicationConfig replicationConfig = new OzoneConfiguration() - .getObject(ReplicationConfig.class); ReplicationSupervisor rs = new ReplicationSupervisor(cs, //simplified executor emulating the current sequential download 
+ @@ -111,7 +107,7 @@ public void test() throws InterruptedException { } } - }, replicationConfig); + }, 10); final long start = System.currentTimeMillis(); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java index 68286236255b..6797203d81b1 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java @@ -50,7 +50,6 @@ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.ContainerReplicator; import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator; -import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.container.replication.ReplicationTask; import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; @@ -205,8 +204,7 @@ private void initializeReplicationSupervisor(ConfigurationSource conf) new SimpleContainerDownloader(conf, null), new TarContainerPacker()); - supervisor = new ReplicationSupervisor(containerSet, replicator, - conf.getObject(ReplicationConfig.class)); + supervisor = new ReplicationSupervisor(containerSet, replicator, 10); } private void replicateContainer(long counter) throws Exception {