diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
new file mode 100644
index 000000000000..e5b2fe8bda75
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class handles Ratis containers that are under replicated. It should
+ * be used to obtain SCMCommands that can be sent to datanodes to solve
+ * under replication.
+ */
+public class RatisUnderReplicationHandler
+ implements UnhealthyReplicationHandler {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(RatisUnderReplicationHandler.class);
+ private final PlacementPolicy placementPolicy;
+ private final NodeManager nodeManager;
+ private final long currentContainerSize;
+
+ public RatisUnderReplicationHandler(final PlacementPolicy placementPolicy,
+ final ConfigurationSource conf, final NodeManager nodeManager) {
+ this.placementPolicy = placementPolicy;
+ this.currentContainerSize = (long) conf
+ .getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+ this.nodeManager = nodeManager;
+ }
+
+ /**
+ * Identifies a new set of datanodes as targets for container replication.
+ * Forms the SCMCommands to be sent to these datanodes.
+ *
+ * @param replicas Set of container replicas.
+ * @param pendingOps Pending ContainerReplicaOp including adds and deletes
+ * for this container.
+ * @param result Health check result indicating under replication.
+ * @param minHealthyForMaintenance Number of healthy replicas that must be
+ * available for a DN to enter maintenance
+ *
+ * @return A map of the destination datanode to the command it should
+ * execute. If an empty map is returned, it indicates the container is no
+ * longer unhealthy and can be removed from the unhealthy queue. Any
+ * exception indicates that the container is still unhealthy and should be
+ * retried later.
+ */
+ @Override
+ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
+ Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
+ ContainerHealthResult result, int minHealthyForMaintenance)
+ throws IOException {
+ ContainerInfo containerInfo = result.getContainerInfo();
+ LOG.debug("Handling under replicated Ratis container {}", containerInfo);
+
+ // count pending adds and deletes
+ int pendingAdd = 0, pendingDelete = 0;
+ for (ContainerReplicaOp op : pendingOps) {
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+ pendingAdd++;
+ } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+ pendingDelete++;
+ }
+ }
+ RatisContainerReplicaCount replicaCount =
+ new RatisContainerReplicaCount(containerInfo, replicas, pendingAdd,
+ pendingDelete, containerInfo.getReplicationFactor().getNumber(),
+ minHealthyForMaintenance);
+
+ // verify that this container is still under replicated and we don't have
+ // sufficient replication after considering pending adds
+ if (!verifyUnderReplication(replicaCount)) {
+ return Collections.emptyMap();
+ }
+
+ // find sources that can provide replicas
+ List<DatanodeDetails> sourceDatanodes =
+ getSources(replicaCount, pendingOps);
+ if (sourceDatanodes.isEmpty()) {
+ LOG.warn("Cannot replicate container {} because no healthy replicas " +
+ "were found.", containerInfo);
+ return Collections.emptyMap();
+ }
+
+ // find targets to send replicas to
+ List<DatanodeDetails> targetDatanodes =
+ getTargets(replicaCount, pendingOps);
+ if (targetDatanodes.isEmpty()) {
+ LOG.warn("Cannot replicate container {} because no eligible targets " +
+ "were found.", containerInfo);
+ return Collections.emptyMap();
+ }
+
+ return createReplicationCommands(containerInfo.getContainerID(),
+ sourceDatanodes, targetDatanodes);
+ }
+
+ /**
+ * Verify that this container is under replicated, even after considering
+ * pending adds. Note that the container might be under replicated but
+ * unrecoverable (no replicas), in which case this returns false.
+ *
+ * @param replicaCount RatisContainerReplicaCount object to check
+ * @return true if the container is under replicated, false if the
+ * container is sufficiently replicated or unrecoverable.
+ */
+ private boolean verifyUnderReplication(
+ RatisContainerReplicaCount replicaCount) {
+ if (replicaCount.isSufficientlyReplicated()) {
+ LOG.info("The container {} state changed and it's not under " +
+ "replicated any more.", replicaCount.getContainer().containerID());
+ return false;
+ }
+ if (replicaCount.isSufficientlyReplicated(true)) {
+ LOG.info("Container {} with replicas {} will be sufficiently " +
+ "replicated after pending replicas are created.",
+ replicaCount.getContainer().getContainerID(),
+ replicaCount.getReplicas());
+ return false;
+ }
+ if (replicaCount.getReplicas().isEmpty()) {
+ LOG.warn("Container {} does not have any replicas and is unrecoverable" +
+ ".", replicaCount.getContainer());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Returns a list of datanodes that can be used as sources for replication
+ * for the container specified in replicaCount.
+ *
+ * @param replicaCount RatisContainerReplicaCount object for this container
+ * @param pendingOps List of pending ContainerReplicaOp
+ * @return List of healthy datanodes that have closed/quasi-closed replicas
+ * and are not pending replica deletion. Sorted in descending order of
+ * sequence id.
+ */
+ private List<DatanodeDetails> getSources(
+ RatisContainerReplicaCount replicaCount,
+ List<ContainerReplicaOp> pendingOps) {
+ Set<DatanodeDetails> pendingDeletion = new HashSet<>();
+ // collect the DNs that are going to have their container replica deleted
+ for (ContainerReplicaOp op : pendingOps) {
+ if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+ pendingDeletion.add(op.getTarget());
+ }
+ }
+
+ /*
+ * Return healthy datanodes that have closed/quasi-closed replicas and
+ * are not pending replica deletion. Sorted in descending order of
+ * sequence id.
+ */
+ return replicaCount.getReplicas().stream()
+ .filter(r -> r.getState() == State.QUASI_CLOSED ||
+ r.getState() == State.CLOSED)
+ .filter(r -> ReplicationManager.getNodeStatus(r.getDatanodeDetails(),
+ nodeManager).isHealthy())
+ .filter(r -> !pendingDeletion.contains(r.getDatanodeDetails()))
+ .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+ }
+
+ private List<DatanodeDetails> getTargets(
+ RatisContainerReplicaCount replicaCount,
+ List<ContainerReplicaOp> pendingOps) throws IOException {
+ // DNs that already have replicas cannot be targets and should be excluded
+ final List<DatanodeDetails> excludeList =
+ replicaCount.getReplicas().stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+
+ // DNs that are already waiting to receive replicas cannot be targets
+ final List<DatanodeDetails> pendingReplication =
+ pendingOps.stream()
+ .filter(containerReplicaOp -> containerReplicaOp.getOpType() ==
+ ContainerReplicaOp.PendingOpType.ADD)
+ .map(ContainerReplicaOp::getTarget)
+ .collect(Collectors.toList());
+ excludeList.addAll(pendingReplication);
+
+ /*
+ Ensure that target datanodes have enough space to hold a complete
+ container.
+ */
+ final long dataSizeRequired =
+ Math.max(replicaCount.getContainer().getUsedBytes(),
+ currentContainerSize);
+ return placementPolicy.chooseDatanodes(excludeList, null,
+ replicaCount.additionalReplicaNeeded(), 0, dataSizeRequired);
+ }
+
+ private Map<DatanodeDetails, SCMCommand<?>> createReplicationCommands(
+ long containerID, List<DatanodeDetails> sources,
+ List<DatanodeDetails> targets) {
+ Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+ for (DatanodeDetails target : targets) {
+ ReplicateContainerCommand command =
+ new ReplicateContainerCommand(containerID, sources);
+ commands.put(target, command);
+ }
+
+ return commands;
+ }
+}
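A minimal usage sketch of the new handler, assuming the caller has already looked up the container's replicas, pending ops and health result (as ReplicationManager does in the hunk below). The class and method names in this sketch are illustrative only and are not part of the patch:

package org.apache.hadoop.hdds.scm.container.replication;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

// Illustrative helper only; RatisUnderReplicationExample and heal() are
// hypothetical names, not part of the patch.
public final class RatisUnderReplicationExample {

  private RatisUnderReplicationExample() {
  }

  static Map<DatanodeDetails, SCMCommand<?>> heal(
      PlacementPolicy policy, ConfigurationSource conf,
      NodeManager nodeManager, Set<ContainerReplica> replicas,
      List<ContainerReplicaOp> pendingOps, ContainerHealthResult result,
      int minHealthyForMaintenance) throws IOException {
    RatisUnderReplicationHandler handler =
        new RatisUnderReplicationHandler(policy, conf, nodeManager);

    // An empty map means the container is already (or will be, once pending
    // adds complete) sufficiently replicated, or has no usable source
    // replicas; otherwise each entry maps a chosen target datanode to the
    // ReplicateContainerCommand it should execute.
    return handler.processAndCreateCommands(replicas, pendingOps, result,
        minHealthyForMaintenance);
  }
}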
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 2c206224f189..f78536912ebc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -158,6 +158,7 @@ public class ReplicationManager implements SCMService {
private ReplicationQueue replicationQueue;
private final ECUnderReplicationHandler ecUnderReplicationHandler;
private final ECOverReplicationHandler ecOverReplicationHandler;
+ private final RatisUnderReplicationHandler ratisUnderReplicationHandler;
private final int maintenanceRedundancy;
private final int ratisMaintenanceMinReplicas;
private Thread underReplicatedProcessorThread;
@@ -220,6 +221,8 @@ public ReplicationManager(final ConfigurationSource conf,
ecContainerPlacement, conf, nodeManager, this);
ecOverReplicationHandler =
new ECOverReplicationHandler(ecContainerPlacement, nodeManager);
+ ratisUnderReplicationHandler = new RatisUnderReplicationHandler(
+ ratisContainerPlacement, conf, nodeManager);
underReplicatedProcessor =
new UnderReplicatedProcessor(this,
rmConf.getUnderReplicatedInterval());
@@ -513,8 +516,12 @@ public Map<DatanodeDetails, SCMCommand<?>> processUnderReplicatedContainer(
containerID);
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerID);
- return ecUnderReplicationHandler.processAndCreateCommands(replicas,
- pendingOps, result, maintenanceRedundancy);
+ if (result.getContainerInfo().getReplicationType() == EC) {
+ return ecUnderReplicationHandler.processAndCreateCommands(replicas,
+ pendingOps, result, maintenanceRedundancy);
+ }
+ return ratisUnderReplicationHandler.processAndCreateCommands(replicas,
+ pendingOps, result, ratisMaintenanceMinReplicas);
}

public Map<DatanodeDetails, SCMCommand<?>> processOverReplicatedContainer(
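Condensed, the dispatch introduced above is a two-way branch on the container's replication type. A sketch of that selection, assuming the UnhealthyReplicationHandler interface both handlers implement and the statically imported EC constant (HddsProtos.ReplicationType.EC); selectUnderReplicationHandler is a hypothetical name, not a method in the patch:

// Sketch only: the handler fields are those initialized in the
// ReplicationManager constructor hunk above.
private UnhealthyReplicationHandler selectUnderReplicationHandler(
    ContainerHealthResult result) {
  if (result.getContainerInfo().getReplicationType() == EC) {
    return ecUnderReplicationHandler;
  }
  // Everything that is not EC is Ratis-replicated in this code path.
  return ratisUnderReplicationHandler;
}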
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
new file mode 100644
index 000000000000..b2ccba14554b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+
+/**
+ * Tests for {@link RatisUnderReplicationHandler}.
+ */
+public class TestRatisUnderReplicationHandler {
+ private ContainerInfo container;
+ private NodeManager nodeManager;
+ private OzoneConfiguration conf;
+ private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG =
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+ private PlacementPolicy policy;
+
+ @Before
+ public void setup() throws NodeNotFoundException {
+ container = ReplicationTestUtil.createContainer(
+ HddsProtos.LifeCycleState.CLOSED, RATIS_REPLICATION_CONFIG);
+
+ nodeManager = Mockito.mock(NodeManager.class);
+ conf = SCMTestUtils.getConf();
+ policy = ReplicationTestUtil
+ .getSimpleTestPlacementPolicy(nodeManager, conf);
+
+ /*
+ Return NodeStatus with NodeOperationalState as specified in
+ DatanodeDetails, and NodeState as HEALTHY.
+ */
+ Mockito.when(nodeManager.getNodeStatus(Mockito.any(DatanodeDetails.class)))
+ .thenAnswer(invocationOnMock -> {
+ DatanodeDetails dn = invocationOnMock.getArgument(0);
+ return new NodeStatus(dn.getPersistedOpState(),
+ HddsProtos.NodeState.HEALTHY);
+ });
+ }
+
+ /**
+ * When the container is under replicated even though there's a pending
+ * add, the handler should create replication commands.
+ */
+ @Test
+ public void testUnderReplicatedWithMissingReplicasAndPendingAdd()
+ throws IOException {
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), State.CLOSED, 0);
+ List<ContainerReplicaOp> pendingOps = ImmutableList.of(
+ ContainerReplicaOp.create(ContainerReplicaOp.PendingOpType.ADD,
+ MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+ testProcessing(replicas, pendingOps, getUnderReplicatedHealthResult(), 2,
+ 1);
+ }
+
+ /**
+ * When the container is under replicated and unrecoverable (no replicas
+ * exist), the handler will not create any commands.
+ */
+ @Test
+ public void testUnderReplicatedAndUnrecoverable() throws IOException {
+ testProcessing(Collections.emptySet(), Collections.emptyList(),
+ getUnderReplicatedHealthResult(), 2, 0);
+ }
+
+ /**
+ * The container is currently under replicated, but there's a pending add
+ * that will make it sufficiently replicated. The handler should not create
+ * any commands.
+ */
+ @Test
+ public void testUnderReplicatedFixedByPendingAdd() throws IOException {
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), State.CLOSED, 0, 0);
+ List<ContainerReplicaOp> pendingOps = ImmutableList.of(
+ ContainerReplicaOp.create(ContainerReplicaOp.PendingOpType.ADD,
+ MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+ testProcessing(replicas, pendingOps, getUnderReplicatedHealthResult(), 2,
+ 0);
+ }
+
+ /**
+ * The container is under-replicated because a DN is decommissioning. The
+ * handler should create a replication command.
+ */
+ @Test
+ public void testUnderReplicatedBecauseOfDecommissioningReplica()
+ throws IOException {
+ Set<ContainerReplica> replicas = ReplicationTestUtil
+ .createReplicas(Pair.of(DECOMMISSIONING, 0), Pair.of(IN_SERVICE, 0),
+ Pair.of(IN_SERVICE, 0));
+
+ testProcessing(replicas, Collections.emptyList(),
+ getUnderReplicatedHealthResult(), 2, 1);
+ }
+
+ /**
+ * The container is under-replicated because a DN is entering maintenance
+ * and the remaining number of replicas (CLOSED or QUASI_CLOSED replicas on
+ * HEALTHY datanodes) is less than the minimum healthy required.
+ */
+ @Test
+ public void testUnderReplicatedBecauseOfMaintenanceReplica()
+ throws IOException {
+ Set<ContainerReplica> replicas = ReplicationTestUtil
+ .createReplicas(Pair.of(ENTERING_MAINTENANCE, 0),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0));
+
+ testProcessing(replicas, Collections.emptyList(),
+ getUnderReplicatedHealthResult(), 3, 1);
+ }
+
+ /**
+ * The container is sufficiently replicated because we have the minimum
+ * healthy replicas required for a DN to enter maintenance.
+ */
+ @Test
+ public void testSufficientlyReplicatedDespiteMaintenanceReplica()
+ throws IOException {
+ Set<ContainerReplica> replicas = ReplicationTestUtil
+ .createReplicas(Pair.of(ENTERING_MAINTENANCE, 0),
+ Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0));
+
+ testProcessing(replicas, Collections.emptyList(),
+ getUnderReplicatedHealthResult(), 2, 0);
+ }
+
+ /**
+ * The handler should throw an exception when the placement policy is unable
+ * to choose new targets for replication.
+ */
+ @Test
+ public void testNoTargetsFoundBecauseOfPlacementPolicy() {
+ policy = ReplicationTestUtil.getNoNodesTestPlacementPolicy(nodeManager,
+ conf);
+ RatisUnderReplicationHandler handler =
+ new RatisUnderReplicationHandler(policy, conf, nodeManager);
+
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), State.CLOSED, 0, 0);
+
+ Assert.assertThrows(IOException.class,
+ () -> handler.processAndCreateCommands(replicas,
+ Collections.emptyList(), getUnderReplicatedHealthResult(), 2));
+ }
+
+ /**
+ * Tests that the handler creates the expected number of commands
+ * (expectNumCommands).
+ * @param replicas All replicas of the container
+ * @param pendingOps Collection of pending ops
+ * @param healthResult ContainerHealthResult that should be passed to the
+ * handler
+ * @param minHealthyForMaintenance the minimum number of healthy replicas
+ * required for a datanode to enter
+ * maintenance
+ * @param expectNumCommands number of commands expected to be created by
+ * the handler
+ */
+ private void testProcessing(
+ Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
+ ContainerHealthResult healthResult,
+ int minHealthyForMaintenance, int expectNumCommands) throws IOException {
+ RatisUnderReplicationHandler handler =
+ new RatisUnderReplicationHandler(policy, conf, nodeManager);
+
+ Map<DatanodeDetails, SCMCommand<?>> commands =
+ handler.processAndCreateCommands(replicas, pendingOps,
+ healthResult, minHealthyForMaintenance);
+ Assert.assertEquals(expectNumCommands, commands.size());
+ }
+
+ private UnderReplicatedHealthResult getUnderReplicatedHealthResult() {
+ UnderReplicatedHealthResult healthResult =
+ Mockito.mock(UnderReplicatedHealthResult.class);
+ Mockito.when(healthResult.getContainerInfo()).thenReturn(container);
+ return healthResult;
+ }
+}
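To further illustrate the getSources() filter (only CLOSED or QUASI_CLOSED replicas on healthy datanodes qualify as sources), a sketch of an additional test that could be added to the class above; it assumes ReplicationTestUtil.createReplicas accepts State.UNHEALTHY the same way it accepts State.CLOSED:

  /**
   * Sketch only: every replica is UNHEALTHY, so no eligible source exists
   * and the handler is expected to create zero commands.
   */
  @Test
  public void testNoSourcesBecauseAllReplicasAreUnhealthy()
      throws IOException {
    Set<ContainerReplica> replicas
        = createReplicas(container.containerID(), State.UNHEALTHY, 0, 0);

    testProcessing(replicas, Collections.emptyList(),
        getUnderReplicatedHealthResult(), 2, 0);
  }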