From 6565f91284913b040bd11bb905244e4e92e7081c Mon Sep 17 00:00:00 2001
From: S O'Donnell
Date: Tue, 10 Jan 2023 15:06:13 +0000
Subject: [PATCH] HDDS-7761. EC: ReplicationManager - Use
 placementPolicy.replicasToRemoveToFixOverreplication in EC Over replication
 handler

---
 .../AbstractOverReplicationHandler.java       |  14 ++
 .../replication/ECOverReplicationHandler.java | 122 +++++++++++-------
 .../replication/ReplicationTestUtil.java      |  10 ++
 .../TestECOverReplicationHandler.java         | 113 ++++++++++++++--
 4 files changed, 197 insertions(+), 62 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
index 2dac8b2312c9..5d647593effc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/AbstractOverReplicationHandler.java
@@ -80,6 +80,20 @@ public boolean isPlacementStatusActuallyEqualAfterRemove(
     return isPlacementStatusActuallyEqual(currentCPS, newCPS);
   }
 
+  /**
+   * Allow the placement policy to indicate which replicas can be removed from
+   * an over replicated container, so that removing them does not violate the
+   * placement policy.
+   * @param replicas the candidate replicas of the container
+   * @param expectedCountPerUniqueReplica the expected copies of each replica
+   * @return the set of replicas which are safe to remove
+   */
+  protected Set<ContainerReplica> selectReplicasToRemove(
+      Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
+    return placementPolicy.replicasToRemoveToFixOverreplication(
+        replicas, expectedCountPerUniqueReplica);
+  }
+
   /**
    * Given a set of ContainerReplica, transform it to a list of DatanodeDetails
    * and then check if the list meets the container placement policy.
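Note on the hook above: it is a thin delegation that moves the "which replica is
safe to delete" decision out of the individual handlers and into the placement
policy. A minimal sketch of the calling pattern follows; the wrapper class is
hypothetical and only PlacementPolicy.replicasToRemoveToFixOverreplication is
the real API touched by this patch:

    import java.util.Set;

    import org.apache.hadoop.hdds.scm.PlacementPolicy;
    import org.apache.hadoop.hdds.scm.container.ContainerReplica;

    // Hypothetical caller, for illustration only.
    final class SelectToRemoveSketch {
      private final PlacementPolicy placementPolicy;

      SelectToRemoveSketch(PlacementPolicy placementPolicy) {
        this.placementPolicy = placementPolicy;
      }

      // For an EC container each replica index should exist exactly once, so
      // the expected count per unique replica is 1. The policy returns only
      // replicas whose removal keeps the container compliant with placement
      // rules such as rack spread.
      Set<ContainerReplica> excessReplicas(Set<ContainerReplica> candidates) {
        return placementPolicy.replicasToRemoveToFixOverreplication(
            candidates, 1);
      }
    }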
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 85295c974961..00c1a264af23 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -18,11 +18,13 @@
 package org.apache.hadoop.hdds.scm.container.replication;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
@@ -30,12 +32,10 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
 
@@ -74,12 +74,34 @@ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
       ContainerHealthResult result, int remainingMaintenanceRedundancy) {
     ContainerInfo container = result.getContainerInfo();
 
+    // We are going to check for over replication, so we should filter out any
+    // replicas that are not in a HEALTHY state. This is because a replica can
+    // be healthy, stale or dead. If it is dead it will be quickly removed from
+    // SCM. If it is stale, there is a good chance the DN is offline and the
+    // replica will go away soon. So, if we have a container that is over
+    // replicated with a HEALTHY and STALE replica, and we decide to delete the
+    // HEALTHY one, and then the STALE one goes away, we will lose them both.
+    // To avoid this, we will filter out any non-healthy replicas first.
+    // ECContainerReplicaCount will ignore nodes which are not IN_SERVICE for
+    // over replication checks, but we need to filter these out later in this
+    // method anyway, so it makes sense to filter them here too, to avoid a
+    // second lookup of the NodeStatus.
+    Set<ContainerReplica> healthyReplicas = replicas.stream()
+        .filter(r -> {
+          NodeStatus ns = ReplicationManager.getNodeStatus(
+              r.getDatanodeDetails(), nodeManager);
+          return ns.isHealthy() && ns.getOperationalState() ==
+              HddsProtos.NodeOperationalState.IN_SERVICE;
+        })
+        .collect(Collectors.toSet());
+
     final ECContainerReplicaCount replicaCount =
-        new ECContainerReplicaCount(container, replicas, pendingOps,
+        new ECContainerReplicaCount(container, healthyReplicas, pendingOps,
             remainingMaintenanceRedundancy);
     if (!replicaCount.isOverReplicated()) {
-      LOG.info("The container {} state changed and it's not in over"
-          + " replication any more", container.getContainerID());
+      LOG.info("The container {} state changed and it is no longer over"
+          + " replicated. Replica count: {}, healthy replica count: {}",
+          container.getContainerID(), replicas.size(), healthyReplicas.size());
       return emptyMap();
     }
 
@@ -93,10 +115,9 @@ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
         replicaCount.overReplicatedIndexes(true);
     //sanity check
     if (overReplicatedIndexes.size() == 0) {
-      LOG.warn("The container {} with replicas {} is found over replicated "
-          + "by ContainerHealthCheck, but found not over replicated by "
-          + "ECContainerReplicaCount",
-          container.getContainerID(), replicas);
+      LOG.warn("The container {} with replicas {} was found over replicated "
+          + "by ECContainerReplicaCount, but there are no over replicated "
+          + "indexes returned", container.getContainerID(), replicas);
       return emptyMap();
     }
 
@@ -106,48 +127,51 @@ public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
         deletionInFlight.add(op.getTarget());
       }
     }
-    Map<Integer, List<ContainerReplica>> index2replicas = new HashMap<>();
-    replicas.stream()
-        .filter(r -> overReplicatedIndexes.contains(r.getReplicaIndex()))
-        .filter(r -> r
-            .getState() == StorageContainerDatanodeProtocolProtos
-            .ContainerReplicaProto.State.CLOSED)
-        .filter(r -> ReplicationManager
-            .getNodeStatus(r.getDatanodeDetails(), nodeManager).isHealthy())
+
+    Set<ContainerReplica> candidates = healthyReplicas.stream()
         .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
-        .forEach(r -> {
-          int index = r.getReplicaIndex();
-          index2replicas.computeIfAbsent(index, k -> new LinkedList<>());
-          index2replicas.get(index).add(r);
-        });
-
-    if (index2replicas.size() > 0) {
-      final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
-      final int replicationFactor =
-          container.getReplicationConfig().getRequiredNodes();
-      index2replicas.values().forEach(l -> {
-        Iterator<ContainerReplica> it = l.iterator();
-        Set<ContainerReplica> tempReplicaSet = new HashSet<>(replicas);
-        while (it.hasNext() && l.size() > 1) {
-          ContainerReplica r = it.next();
-          if (isPlacementStatusActuallyEqualAfterRemove(
-              tempReplicaSet, r, replicationFactor)) {
-            DeleteContainerCommand deleteCommand =
-                new DeleteContainerCommand(container.getContainerID(), true);
-            deleteCommand.setReplicaIndex(r.getReplicaIndex());
-            commands.put(r.getDatanodeDetails(), deleteCommand);
-            it.remove();
-            tempReplicaSet.remove(r);
-          }
-        }
-      });
-      if (commands.size() == 0) {
-        LOG.info("With the current state of avilable replicas {}, no"
-            + " commands to process due to over replication.", replicas);
+        .filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos
+            .ContainerReplicaProto.State.CLOSED)
+        .collect(Collectors.toSet());
+
+    Set<ContainerReplica> replicasToRemove =
+        selectReplicasToRemove(candidates, 1);
+
+    if (replicasToRemove.size() == 0) {
+      LOG.warn("The container {} is over replicated, but no replicas were "
+          + "selected to remove by the placement policy. Replicas: {}",
+          container, replicas);
+      return emptyMap();
+    }
+
+    final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    // As a sanity check, sum up the current counts of each replica index. When
+    // processing replicasToRemove, ensure that removing the replica would not
+    // drop the count of that index to zero.
+    Map<Integer, Integer> replicaIndexCounts = new HashMap<>();
+    for (ContainerReplica r : candidates) {
+      replicaIndexCounts.put(r.getReplicaIndex(),
+          replicaIndexCounts.getOrDefault(r.getReplicaIndex(), 0) + 1);
+    }
+    for (ContainerReplica r : replicasToRemove) {
+      int currentCount = replicaIndexCounts.getOrDefault(
+          r.getReplicaIndex(), 0);
+      if (currentCount < 2) {
+        LOG.warn("The replica {} selected to remove would reduce the count "
+            + "for that index to zero. Candidate Replicas: {}", r, candidates);
+        continue;
       }
-      return commands;
+      replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
+      DeleteContainerCommand deleteCommand =
+          new DeleteContainerCommand(container.getContainerID(), true);
+      deleteCommand.setReplicaIndex(r.getReplicaIndex());
+      commands.put(r.getDatanodeDetails(), deleteCommand);
     }
-    return emptyMap();
+    if (commands.size() == 0) {
+      LOG.warn("With the current state of available replicas {}, no"
+          + " commands were created to remove excess replicas.", replicas);
+    }
+    return commands;
   }
 }
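The guard in the loop above deserves a concrete illustration: even if the
placement policy returns a bad answer, the handler must never delete the last
copy of an EC replica index. A self-contained sketch of the same arithmetic,
using plain ints in place of ContainerReplica (all names hypothetical):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Count the copies of each EC replica index among the candidates, then
    // refuse any removal that would drop an index to zero copies.
    public final class IndexCountGuardSketch {
      public static void main(String[] args) {
        List<Integer> candidateIndexes = List.of(1, 2, 2, 5, 5);
        List<Integer> suggestedRemovals = List.of(1, 2); // from the policy

        Map<Integer, Integer> counts = new HashMap<>();
        for (int index : candidateIndexes) {
          counts.put(index, counts.getOrDefault(index, 0) + 1);
        }
        for (int index : suggestedRemovals) {
          int current = counts.getOrDefault(index, 0);
          if (current < 2) {
            // Index 1 has only one copy, so removing it is refused.
            System.out.println("Skipping unsafe removal of index " + index);
            continue;
          }
          counts.put(index, current - 1);
          System.out.println("Removing one copy of index " + index);
        }
      }
    }

Running it prints that index 1 is skipped (one copy left) while one of the two
copies of index 2 is removed, mirroring the currentCount < 2 check in the
handler.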
Candidate Replicas: {}", r, candidates); + continue; } - return commands; + replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1); + DeleteContainerCommand deleteCommand = + new DeleteContainerCommand(container.getContainerID(), true); + deleteCommand.setReplicaIndex(r.getReplicaIndex()); + commands.put(r.getDatanodeDetails(), deleteCommand); } - return emptyMap(); + if (commands.size() == 0) { + LOG.warn("With the current state of available replicas {}, no" + + " commands were created to remove excess replicas.", replicas); + } + return commands; } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java index f87d2851586a..4960f89d2fd0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.Node; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -198,6 +199,9 @@ public static Set createReplicas( public static PlacementPolicy getSimpleTestPlacementPolicy( final NodeManager nodeManager, final OzoneConfiguration conf) { + + final Node rackNode = MockDatanodeDetails.randomDatanodeDetails(); + return new SCMCommonPlacementPolicy(nodeManager, conf) { @Override protected List chooseDatanodesInternal( @@ -216,6 +220,12 @@ protected List chooseDatanodesInternal( public DatanodeDetails chooseNode(List healthyNodes) { return null; } + + @Override + protected Node getPlacementGroup(DatanodeDetails dn) { + // Make it look like a single rack cluster + return rackNode; + } }; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java index 71eb164ec0ca..e882374a0940 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java @@ -19,16 +19,17 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.MockNodeManager; -import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault; import org.apache.hadoop.hdds.scm.net.NodeSchema; import 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index 71eb164ec0ca..e882374a0940 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -19,16 +19,17 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
 import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -42,18 +43,20 @@
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyList;
 
 /**
  * Tests the ECOverReplicationHandling functionality.
@@ -64,14 +67,18 @@ public class TestECOverReplicationHandler {
   private NodeManager nodeManager;
   private OzoneConfiguration conf;
   private PlacementPolicy policy;
-  private PlacementPolicy placementPolicy;
+  private DatanodeDetails staleNode;
 
   @BeforeEach
   public void setup() {
+    staleNode = null;
     nodeManager = new MockNodeManager(true, 10) {
       @Override
       public NodeStatus getNodeStatus(DatanodeDetails dd)
          throws NodeNotFoundException {
+        if (staleNode != null && dd.equals(staleNode)) {
+          return NodeStatus.inServiceStale();
+        }
         return NodeStatus.inServiceHealthy();
       }
     };
@@ -84,10 +91,6 @@ public NodeStatus getNodeStatus(DatanodeDetails dd)
     NodeSchema[] schemas =
         new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
     NodeSchemaManager.getInstance().init(schemas, true);
-    placementPolicy = Mockito.mock(PlacementPolicy.class);
-    Mockito.when(placementPolicy.validateContainerPlacement(
-        anyList(), anyInt()))
-        .thenReturn(new ContainerPlacementStatusDefault(2, 2, 3));
   }
 
   @Test
@@ -96,7 +99,89 @@ public void testNoOverReplication() {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
        .createReplicas(Pair.of(IN_SERVICE, 1),
            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
-    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap());
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
+  }
+
+  @Test
+  public void testOverReplicationFixedByPendingDelete() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
+    ContainerReplica excess = ReplicationTestUtil.createContainerReplica(
+        container.containerID(), 5, IN_SERVICE,
+        ContainerReplicaProto.State.CLOSED);
+    availableReplicas.add(excess);
+    List<ContainerReplicaOp> pendingOps = new ArrayList<>();
+    pendingOps.add(ContainerReplicaOp.create(DELETE,
+        excess.getDatanodeDetails(), 5));
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        pendingOps);
+  }
+
+  @Test
+  public void testOverReplicationWithDecommissionIndexes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5),
+            Pair.of(DECOMMISSIONING, 5));
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
+  }
+
+  @Test
+  public void testOverReplicationWithStaleIndexes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
+    ContainerReplica stale = ReplicationTestUtil.createContainerReplica(
+        container.containerID(), 5, IN_SERVICE,
+        ContainerReplicaProto.State.CLOSED);
+    availableReplicas.add(stale);
+    // By setting staleNode, the mocked nodeManager will return a stale
+    // status for this node when checked.
+    staleNode = stale.getDatanodeDetails();
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
+  }
+
+  @Test
+  public void testOverReplicationWithOpenReplica() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
+    ContainerReplica open = ReplicationTestUtil.createContainerReplica(
+        container.containerID(), 5, IN_SERVICE,
+        ContainerReplicaProto.State.OPEN);
+    availableReplicas.add(open);
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
+  }
+
+  /**
+   * This test mocks the placement policy so that it returns invalid results.
+   * This should not happen in practice, but it verifies that commands are not
+   * sent for the wrong replica.
+   */
+  @Test
+  public void testOverReplicationButPolicyReturnsWrongIndexes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5),
+            Pair.of(IN_SERVICE, 5));
+    ContainerReplica toReturn = ReplicationTestUtil.createContainerReplica(
+        container.containerID(), 1, IN_SERVICE,
+        ContainerReplicaProto.State.CLOSED);
+    policy = Mockito.mock(PlacementPolicy.class);
+    Mockito.when(policy.replicasToRemoveToFixOverreplication(
+        Mockito.any(), Mockito.anyInt()))
+        .thenReturn(ImmutableSet.of(toReturn));
+    testOverReplicationWithIndexes(availableReplicas, Collections.emptyMap(),
+        ImmutableList.of());
   }
 
   @Test
@@ -109,7 +194,8 @@ public void testOverReplicationWithOneSameIndexes() {
 
     testOverReplicationWithIndexes(availableReplicas,
         //num of index 1 is 3, but it should be 1, so 2 excess
-        new ImmutableMap.Builder<Integer, Integer>().put(1, 2).build());
+        new ImmutableMap.Builder<Integer, Integer>().put(1, 2).build(),
+        ImmutableList.of());
   }
 
   @Test
@@ -127,7 +213,7 @@ public void testOverReplicationWithMultiSameIndexes() {
         //num of index 1 is 3, but it should be 1, so 2 excess
         new ImmutableMap.Builder<Integer, Integer>()
            .put(1, 2).put(2, 2).put(3, 2).put(4, 1)
-            .put(5, 1).build());
+            .put(5, 1).build(), ImmutableList.of());
   }
 
   /**
@@ -163,7 +249,8 @@ public void testOverReplicationWithUnderReplication() {
 
   private void testOverReplicationWithIndexes(
       Set<ContainerReplica> availableReplicas,
-      Map<Integer, Integer> index2excessNum) {
+      Map<Integer, Integer> index2excessNum,
+      List<ContainerReplicaOp> pendingOps) {
     ECOverReplicationHandler ecORH =
         new ECOverReplicationHandler(policy, nodeManager);
     ContainerHealthResult.OverReplicatedHealthResult result =
         Mockito.mock(ContainerHealthResult.OverReplicatedHealthResult.class);
     Mockito.when(result.getContainerInfo()).thenReturn(container);
 
     Map<DatanodeDetails, SCMCommand<?>> commands = ecORH
-        .processAndCreateCommands(availableReplicas, ImmutableList.of(),
+        .processAndCreateCommands(availableReplicas, pendingOps,
             result, 1);
 
     // total commands send out should be equal to the sum of all