diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java index d23934184eb5..fe771fac6a4a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java @@ -470,10 +470,10 @@ && getReplicas().stream() /** * QUASI_CLOSED containers that have a mix of healthy and UNHEALTHY - * replicas require special treatment. If the healthy replicas don't have - * the same BCSID as the container, but the UNHEALTHY ones do, then we need - * to save at least one copy of each such UNHEALTHY replica. This method - * finds such UNHEALTHY replicas. + * replicas require special treatment. If the UNHEALTHY replicas have the + * correct sequence ID and have unique origins, then we need to save at least + * one copy of each such UNHEALTHY replicas. This method finds such UNHEALTHY + * replicas. * * @param nodeStatusFn a function used to check the {@link NodeStatus} of a node, * accepting a {@link DatanodeDetails} and returning {@link NodeStatus} @@ -498,11 +498,6 @@ public List getVulnerableUnhealthyReplicas(Function getVulnerableUnhealthyReplicas(Function { + NodeStatus status = nodeStatusFn.apply(replica.getDatanodeDetails()); + return status == null || !status.isHealthy(); + }); /* At this point, the list of unhealthyReplicas contains all UNHEALTHY non-empty replicas with the greatest Sequence ID that are on healthy Datanodes. @@ -531,9 +531,9 @@ public List getVulnerableUnhealthyReplicas(Function originsOfInServiceReplicas = new HashSet<>(); - for (ContainerReplica replica : unhealthyReplicas) { + for (ContainerReplica replica : replicas) { if (replica.getDatanodeDetails().getPersistedOpState() - .equals(IN_SERVICE)) { + .equals(IN_SERVICE) && replica.getSequenceId().equals(container.getSequenceId())) { originsOfInServiceReplicas.add(replica.getOriginDatanodeId()); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java index 21b2d8151d20..d8a4edf31e93 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/VulnerableUnhealthyReplicasHandler.java @@ -37,9 +37,8 @@ /** * A QUASI_CLOSED container may have some UNHEALTHY replicas with the - * same Sequence ID as the container. RM should try to maintain one - * copy of such replicas when there are no healthy replicas that - * match the container's Sequence ID. + * same Sequence ID as the container and on unique origins. RM should try to maintain one + * copy of such replicas. */ public class VulnerableUnhealthyReplicasHandler extends AbstractCheck { public static final Logger LOG = LoggerFactory.getLogger(VulnerableUnhealthyReplicasHandler.class); @@ -52,8 +51,8 @@ public VulnerableUnhealthyReplicasHandler(ReplicationManager replicationManager) /** * Checks if the container is QUASI_CLOSED has some vulnerable UNHEALTHY replicas that need to replicated to * other Datanodes. These replicas have the same sequence ID as the container while other healthy replicas don't. - * If the node hosting such a replica is being taken offline, then the replica may have to be replicated to another - * node. + * Or, these replicas have unique origin Datanodes. If the node hosting such a replica is being taken offline, then + * the replica may have to be replicated to another node. * @param request ContainerCheckRequest object representing the container * @return true if some vulnerable UNHEALTHY replicas were found, else false */ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java index 02de63ee51ca..ca86cb689fb0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java @@ -39,9 +39,11 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.ratis.protocol.exceptions.NotLeaderException; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.io.IOException; import java.util.ArrayList; @@ -581,6 +583,35 @@ public void testUnderReplicationWithVulnerableReplicas() throws IOException { assertEquals(unhealthyReplica.getDatanodeDetails(), commands.iterator().next().getKey()); } + /** + * A QUASI_CLOSED container may have UNHEALTHY replicas with the correct sequence ID which have unique + * origin Datanodes. If any of these UNHEALTHY replicas is being taken offline, then it needs to be replicated to + * another DN for decommission to progress. This test asserts that a replicate command is sent for one such replica. + */ + @Test + public void testUnderReplicationWithVulnerableReplicasOnUniqueOrigins() throws IOException { + final long sequenceID = 20; + container = ReplicationTestUtil.createContainerInfo(RATIS_REPLICATION_CONFIG, 1, + HddsProtos.LifeCycleState.QUASI_CLOSED, sequenceID); + + final Set replicas = new HashSet<>(4); + for (int i = 0; i < 3; i++) { + replicas.add(createContainerReplica(container.containerID(), 0, IN_SERVICE, State.QUASI_CLOSED, + sequenceID)); + } + + // create an UNHEALTHY replica with a unique origin + final ContainerReplica unhealthyReplica = createContainerReplica(container.containerID(), 0, + DECOMMISSIONING, State.UNHEALTHY, sequenceID); + replicas.add(unhealthyReplica); + UnderReplicatedHealthResult result = getUnderReplicatedHealthResult(); + Mockito.when(result.hasVulnerableUnhealthy()).thenReturn(true); + + final Set>> commands = testProcessing(replicas, Collections.emptyList(), + result, 2, 1); + Assertions.assertEquals(unhealthyReplica.getDatanodeDetails(), commands.iterator().next().getKey()); + } + /** * In the push replication model, a replicate command is sent to the DN hosting the replica, and that DN is * expected to "push" the replica to another DN. If the DN hosting the replica has too many commands already, an diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index b3cb96282024..fe1cdcc06957 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -56,6 +56,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.io.IOException; import java.time.Instant; @@ -508,6 +509,67 @@ public void testQuasiClosedContainerWithVulnerableUnhealthyReplica() assertEquals(decommissioning.getDatanodeDetails().getUuid(), command.getKey()); } + + /** + * There is a QUASI_CLOSED container with some UNHEALTHY replicas on unique origin nodes. If the datanode hosting + * one such replica is being taken offline, then the UNHEALTHY replica needs to be replicated to another node. + */ + @Test + public void testQuasiClosedContainerWithUnhealthyReplicaOnDecommissioningNodeWithUniqueOrigin() + throws IOException, NodeNotFoundException { + RatisReplicationConfig ratisRepConfig = + RatisReplicationConfig.getInstance(THREE); + // create a QUASI_CLOSED container with 3 QUASI_CLOSED replicas on same origin, and 1 UNHEALTHY on unique origin + ContainerInfo container = createContainerInfo(ratisRepConfig, 1, + HddsProtos.LifeCycleState.QUASI_CLOSED); + Set replicas = + createReplicasWithSameOrigin(container.containerID(), + ContainerReplicaProto.State.QUASI_CLOSED, 0, 0, 0); + ContainerReplica unhealthy = + createContainerReplica(container.containerID(), 0, DECOMMISSIONING, + ContainerReplicaProto.State.UNHEALTHY); + replicas.add(unhealthy); + storeContainerAndReplicas(container, replicas); + Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class))) + .thenAnswer(invocation -> { + DatanodeDetails dn = invocation.getArgument(0); + if (dn.equals(unhealthy.getDatanodeDetails())) { + return new NodeStatus(DECOMMISSIONING, HddsProtos.NodeState.HEALTHY); + } + + return NodeStatus.inServiceHealthy(); + }); + + // the container should be under replicated and queued to under replication queue + replicationManager.processContainer(container, repQueue, repReport); + assertEquals(1, repReport.getStat( + ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + assertEquals(0, repReport.getStat( + ReplicationManagerReport.HealthState.OVER_REPLICATED)); + assertEquals(1, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + + // next, this test sets up some mocks to test if RatisUnderReplicationHandler will handle this container correctly + Mockito.when(ratisPlacementPolicy.chooseDatanodes(anyList(), anyList(), eq(null), eq(1), anyLong(), + anyLong())).thenAnswer(invocation -> ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails())); + Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(DatanodeDetails.class), any(), any())) + .thenAnswer(invocation -> { + Map map = new HashMap<>(); + map.put(SCMCommandProto.Type.replicateContainerCommand, 0); + map.put(SCMCommandProto.Type.reconstructECContainersCommand, 0); + return map; + }); + RatisUnderReplicationHandler handler = + new RatisUnderReplicationHandler(ratisPlacementPolicy, configuration, replicationManager); + + handler.processAndSendCommands(replicas, Collections.emptyList(), repQueue.dequeueUnderReplicatedContainer(), 2); + assertEquals(1, commandsSent.size()); + Pair> command = commandsSent.iterator().next(); + // a replicate command should have been sent for the UNHEALTHY replica + assertEquals(SCMCommandProto.Type.replicateContainerCommand, command.getValue().getType()); + assertEquals(unhealthy.getDatanodeDetails().getUuid(), command.getKey()); + } + /** * When there is Quasi Closed Replica with incorrect sequence id * for a Closed container, it's treated as unhealthy and deleted. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java index 1c0a29d3eb3f..8fa4c974e1ba 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestVulnerableUnhealthyReplicasHandler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.util.Collections; import java.util.HashSet; @@ -128,6 +129,11 @@ public void testReturnsFalseForQuasiClosedContainerWithNoVulnerableReplicas() { assertEquals(0, repQueue.overReplicatedQueueSize()); } + /** + * A QUASI_CLOSED container with 3 QUASI_CLOSED replicas with incorrect sequence id. They're on unique origin nodes. + * There's an UNHEALTHY replica on a Decommissioning node, which has the correct sequence ID and unique origin. + * It's expected that the UNHEALTHY replica is queued for under replication. + */ @Test public void testReturnsTrueForQuasiClosedContainerWithVulnerableReplica() throws NodeNotFoundException { long sequenceId = 10; @@ -156,6 +162,49 @@ public void testReturnsTrueForQuasiClosedContainerWithVulnerableReplica() throws assertEquals(0, repQueue.overReplicatedQueueSize()); } + /** + * A QUASI_CLOSED container with 3 QUASI_CLOSED replicas with correct sequence id. They're on unique origin nodes. + * There's an UNHEALTHY replica on a Decommissioning node, which also has the correct sequence ID and unique origin. + * It's expected that the UNHEALTHY replica is queued for under replication. This is a variation of the situation + * where the healthy replicas have incorrect sequence id, and the unhealthy ones have the correct sequence id. + * Here, all the replicas have the correct sequence id but the unhealthy still need to be saved because they're on + * unique origin nodes. + *

+ * Why do we need to save the UNHEALTHY replicas if we have enough unique QUASI_CLOSED replicas to form a quorum? + * Simply because we're ensuring redundancy of replicas having unique origin node IDs. When HDDS has the ability to + * restore UNHEALTHY replicas to a healthy state, they can also be used to create a quorum. In any case, when the + * container transitions to CLOSED, any UNHEALTHY replicas will be deleted. + *

+ */ + @Test + public void testReturnsTrueForQuasiClosedContainerWithVulnerableReplicaWhenAllReplicasHaveCorrectSequence() + throws NodeNotFoundException { + long sequenceId = 10; + ContainerInfo container = createContainerInfo(repConfig, 1, LifeCycleState.QUASI_CLOSED, sequenceId); + Set replicas = new HashSet<>(4); + for (int i = 0; i < 3; i++) { + replicas.add(createContainerReplica(container.containerID(), 0, IN_SERVICE, State.QUASI_CLOSED, + container.getSequenceId())); + } + // create UNHEALTHY replica with unique origin id on a DECOMMISSIONING node + ContainerReplica unhealthy = + createContainerReplica(container.containerID(), 0, DECOMMISSIONING, State.UNHEALTHY, sequenceId); + replicas.add(unhealthy); + Mockito.when(replicationManager.getNodeStatus(Mockito.any(DatanodeDetails.class))) + .thenAnswer(invocation -> { + DatanodeDetails dn = invocation.getArgument(0); + if (dn.equals(unhealthy.getDatanodeDetails())) { + return new NodeStatus(DECOMMISSIONING, HEALTHY); + } + return NodeStatus.inServiceHealthy(); + }); + requestBuilder.setContainerReplicas(replicas).setContainerInfo(container); + + assertTrue(handler.handle(requestBuilder.build())); + assertEquals(1, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + } + @Test public void testReturnsFalseForVulnerableReplicaWithAnotherCopy() throws NodeNotFoundException { long sequenceId = 10; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java index d4e2ce4b7f04..523d4226cb43 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java @@ -41,6 +41,8 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + import java.io.IOException; import java.util.Collections; import java.util.HashSet; @@ -374,6 +376,70 @@ public void testDecommissionWaitsForUnhealthyReplicaToReplicateNewRM() nodeManager.getNodeStatus(dn1).getOperationalState()); } + /** + * Situation: A QUASI_CLOSED container has an UNHEALTHY replica with a unique origin and three QUASI_CLOSED replicas. + * All the replicas have the correct Sequence ID. UNHEALTHY container is on a decommissioning node, and there are + * no other copies of this replica, that is, replicas with the same Origin ID as this replica. + * + * Expectation: Decommissioning should not complete until the UNHEALTHY replica has been replicated to another node. + */ + @Test + public void testDecommissionWaitsForUnhealthyReplicaWithUniqueOriginToReplicateNewRM() + throws NodeNotFoundException, ContainerNotFoundException { + DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails(); + nodeManager.register(dn1, + new NodeStatus(HddsProtos.NodeOperationalState.DECOMMISSIONING, HddsProtos.NodeState.HEALTHY)); + + // create a container and 3 QUASI_CLOSED replicas with containerID 1 and same origin ID + ContainerID containerID = ContainerID.valueOf(1); + ContainerInfo container = ReplicationTestUtil.createContainerInfo(RatisReplicationConfig.getInstance( + HddsProtos.ReplicationFactor.THREE), containerID.getId(), HddsProtos.LifeCycleState.QUASI_CLOSED); + Set replicas = + ReplicationTestUtil.createReplicasWithSameOrigin(containerID, State.QUASI_CLOSED, 0, 0, 0); + + // UNHEALTHY replica is on a unique origin and has same sequence id as the container + ContainerReplica unhealthy = + ReplicationTestUtil.createContainerReplica(containerID, 0, + dn1.getPersistedOpState(), State.UNHEALTHY, + container.getNumberOfKeys(), container.getUsedBytes(), dn1, + dn1.getUuid(), container.getSequenceId()); + replicas.add(unhealthy); + nodeManager.setContainers(dn1, ImmutableSet.of(containerID)); + + Mockito.when(repManager.getContainerReplicaCount(Mockito.eq(containerID))) + .thenReturn(new RatisContainerReplicaCount(container, replicas, + Collections.emptyList(), 2, false)); + DatanodeAdminMonitorTestUtil.mockCheckContainerState(repManager, true); + + // start monitoring dn1 + monitor.startMonitoring(dn1); + monitor.run(); + assertEquals(1, monitor.getTrackedNodeCount()); + assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING, + nodeManager.getNodeStatus(dn1).getOperationalState()); + + // Running the monitor again causes it to remain DECOMMISSIONING + // as nothing has changed. + monitor.run(); + assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING, + nodeManager.getNodeStatus(dn1).getOperationalState()); + + // add a copy of the UNHEALTHY replica on a new node, dn1 should get + // decommissioned now + ContainerReplica copyOfUnhealthyOnNewNode = unhealthy.toBuilder() + .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails()) + .build(); + replicas.add(copyOfUnhealthyOnNewNode); + Mockito.when(repManager.getContainerReplicaCount(Mockito.eq(containerID))) + .thenReturn(new RatisContainerReplicaCount(container, replicas, + Collections.emptyList(), 2, false)); + DatanodeAdminMonitorTestUtil.mockCheckContainerState(repManager, false); + monitor.run(); + assertEquals(0, monitor.getTrackedNodeCount()); + assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONED, + nodeManager.getNodeStatus(dn1).getOperationalState()); + } + /** * Consider a QUASI_CLOSED container with only UNHEALTHY replicas. If one * of its nodes is decommissioned, the decommissioning should succeed.