diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
index eb5272b26bb9..248e7d715b22 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
@@ -77,4 +77,14 @@ ContainerPlacementStatus validateContainerPlacement(
    */
   Set<ContainerReplica> replicasToCopyToFixMisreplication(
           Map<ContainerReplica, Boolean> replicas);
+
+  /**
+   * Given a set of replicas of a container which are overreplicated,
+   * return a set of replicas to delete to fix overreplication.
+   * @param replicas: Set of existing replicas of the container
+   * @param expectedCountPerUniqueReplica: Replication factor of each
+   *                                       unique replica
+   */
+  Set<ContainerReplica> replicasToRemoveToFixOverreplication(
+          Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica);
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
index ec062b032e89..8d02d7fc3b80 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
@@ -20,6 +20,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -38,9 +39,13 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -503,4 +508,89 @@ public Set<ContainerReplica> replicasToCopyToFixMisreplication(
   protected Node getPlacementGroup(DatanodeDetails dn) {
     return nodeManager.getClusterNetworkTopologyMap().getAncestor(dn, 1);
   }
+
+  /**
+   * Given a set of replicas and the expected count for each unique
+   * replica, computes the set of replicas to delete to fix
+   * overreplication.
+   * The algorithm first builds a replicaIdMap, which groups the
+   * replicas by replica index. A placement group map is built which
+   * groups replicas by rack, with the replicas within each rack
+   * further grouped by replica index. A placement group count map
+   * keeps track of the number of replicas on each rack.
+   * The overreplicated replica indexes are then iterated in descending
+   * order of their current replication count. For each replica index,
+   * a replica is removed from the rack holding the most replicas; to
+   * achieve this, the racks are kept in a priority queue ordered by
+   * the number of replicas they hold. A replica is removed from the
+   * rack at the head of the queue, removed from the maps built above,
+   * and the count for that rack is decremented.
+   * The accumulated set of replicas is then returned.
+   * @param replicas: Set of existing replicas of the container
+   * @param expectedCountPerUniqueReplica: Replication factor of each
+   *                                       unique replica
+   * @return Set of replicas to be removed.
+   */
+  @Override
+  public Set<ContainerReplica> replicasToRemoveToFixOverreplication(
+          Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
+    Map<Integer, Set<ContainerReplica>> replicaIdMap = new HashMap<>();
+    Map<Node, Map<Integer, Set<ContainerReplica>>> placementGroupReplicaIdMap
+            = new HashMap<>();
+    Map<Node, Integer> placementGroupCntMap = new HashMap<>();
+    for (ContainerReplica replica : replicas) {
+      Integer replicaId = replica.getReplicaIndex();
+      Node placementGroup = getPlacementGroup(replica.getDatanodeDetails());
+      replicaIdMap.computeIfAbsent(replicaId, (rid) -> Sets.newHashSet())
+              .add(replica);
+      placementGroupCntMap.compute(placementGroup,
+              (group, cnt) -> (cnt == null ? 0 : cnt) + 1);
+      placementGroupReplicaIdMap.computeIfAbsent(placementGroup,
+              (pg) -> Maps.newHashMap()).computeIfAbsent(replicaId, (rid) ->
+              Sets.newHashSet()).add(replica);
+    }
+
+    Set<ContainerReplica> replicasToRemove = new HashSet<>();
+    List<Integer> sortedRIDList = replicaIdMap.keySet().stream()
+            .filter(rid -> replicaIdMap.get(rid).size() >
+                    expectedCountPerUniqueReplica)
+            .sorted((o1, o2) -> Integer.compare(replicaIdMap.get(o2).size(),
+                    replicaIdMap.get(o1).size()))
+            .collect(Collectors.toList());
+    for (Integer rid : sortedRIDList) {
+      if (replicaIdMap.get(rid).size() <= expectedCountPerUniqueReplica) {
+        break;
+      }
+      Queue<Node> pq = new PriorityQueue<>((o1, o2) ->
+              Integer.compare(placementGroupCntMap.get(o2),
+                      placementGroupCntMap.get(o1)));
+      pq.addAll(placementGroupReplicaIdMap.entrySet()
+              .stream()
+              .filter(nodeMapEntry -> nodeMapEntry.getValue().containsKey(rid))
+              .map(Map.Entry::getKey)
+              .collect(Collectors.toList()));
+
+      while (replicaIdMap.get(rid).size() > expectedCountPerUniqueReplica) {
+        Node rack = pq.poll();
+        Set<ContainerReplica> replicaSet =
+                placementGroupReplicaIdMap.get(rack).get(rid);
+        if (replicaSet.size() > 0) {
+          ContainerReplica r = replicaSet.stream().findFirst().get();
+          replicasToRemove.add(r);
+          replicaSet.remove(r);
+          replicaIdMap.get(rid).remove(r);
+          placementGroupCntMap.compute(rack,
+                  (group, cnt) -> (cnt == null ? 0 : cnt) - 1);
+          if (replicaSet.size() == 0) {
+            placementGroupReplicaIdMap.get(rack).remove(rid);
+          } else {
+            pq.add(rack);
+          }
+        }
+      }
+    }
+    return replicasToRemove;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
index a1ae137594d9..a64cc73b7840 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestSCMCommonPlacementPolicy.java
@@ -35,17 +35,20 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+
 import org.mockito.Mockito;
+
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import java.util.function.Function;
+import java.util.stream.Stream;
 
 /**
  * Test functions of SCMCommonPlacementPolicy.
@@ -286,6 +289,147 @@ public void testReplicasWithoutMisreplication() {
     Assertions.assertEquals(0, replicasToCopy.size());
   }
 
+  @Test
+  public void testReplicasToRemoveWithOneOverreplication() {
+    DummyPlacementPolicy dummyPlacementPolicy =
+        new DummyPlacementPolicy(nodeManager, conf, 5);
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    Set<ContainerReplica> replicas = Sets.newHashSet(
+        HddsTestUtils.getReplicasWithReplicaIndex(
+            new ContainerID(1), CLOSED, 0, 0, 0, list.subList(1, 6)));
+    ContainerReplica replica = ContainerReplica.newBuilder()
+        .setContainerID(new ContainerID(1))
+        .setContainerState(CLOSED)
+        .setReplicaIndex(1)
+        .setDatanodeDetails(list.get(7)).build();
+    replicas.add(replica);
+
+    Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
+        .replicasToRemoveToFixOverreplication(replicas, 1);
+    Assertions.assertEquals(replicasToRemove.size(), 1);
+    Assertions.assertEquals(replicasToRemove.toArray()[0], replica);
+  }
+
+  @Test
+  public void testReplicasToRemoveWithTwoOverreplication() {
+    DummyPlacementPolicy dummyPlacementPolicy =
+        new DummyPlacementPolicy(nodeManager, conf, 5);
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+
+    Set<ContainerReplica> replicas = Sets.newHashSet(
+        HddsTestUtils.getReplicasWithReplicaIndex(
+            new ContainerID(1), CLOSED, 0, 0, 0, list.subList(1, 6)));
+
+    Set<ContainerReplica> replicasToBeRemoved = Sets.newHashSet(
+        HddsTestUtils.getReplicasWithReplicaIndex(
+            new ContainerID(1), CLOSED, 0, 0, 0, list.subList(7, 9)));
+    replicas.addAll(replicasToBeRemoved);
+
+    Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
+        .replicasToRemoveToFixOverreplication(replicas, 1);
+    Assertions.assertEquals(replicasToRemove.size(), 2);
+    Assertions.assertEquals(replicasToRemove, replicasToBeRemoved);
+  }
+
+  @Test
+  public void testReplicasToRemoveWith2CountPerUniqueReplica() {
+    DummyPlacementPolicy dummyPlacementPolicy =
+        new DummyPlacementPolicy(nodeManager, conf, 3);
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+
+    Set<ContainerReplica> replicas = Sets.newHashSet(
+        HddsTestUtils.getReplicasWithReplicaIndex(
+            new ContainerID(1), CLOSED, 0, 0, 0, list.subList(0, 3)));
+    replicas.addAll(HddsTestUtils.getReplicasWithReplicaIndex(
+        new ContainerID(1), CLOSED, 0, 0, 0, list.subList(3, 6)));
+    Set<ContainerReplica> replicasToBeRemoved = Sets.newHashSet(
+        HddsTestUtils.getReplicaBuilder(new ContainerID(1), CLOSED, 0, 0, 0,
+            list.get(7).getUuid(), list.get(7))
+            .setReplicaIndex(1).build(),
+        HddsTestUtils.getReplicaBuilder(new ContainerID(1), CLOSED, 0, 0, 0,
+            list.get(8).getUuid(), list.get(8)).setReplicaIndex(1)
+            .build());
+    replicas.addAll(replicasToBeRemoved);
+
+    Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
+        .replicasToRemoveToFixOverreplication(replicas, 2);
+    Assertions.assertEquals(replicasToRemove.size(), 2);
+    Assertions.assertEquals(replicasToRemove, replicasToBeRemoved);
+  }
+
+  @Test
+  public void testReplicasToRemoveWithoutReplicaIndex() {
+    DummyPlacementPolicy dummyPlacementPolicy =
+        new DummyPlacementPolicy(nodeManager, conf, 3);
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+
+    Set<ContainerReplica> replicas = Sets.newHashSet(HddsTestUtils.getReplicas(
+        new ContainerID(1), CLOSED, 0, list.subList(0, 5)));
+
+    Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
+        .replicasToRemoveToFixOverreplication(replicas, 3);
+    Assertions.assertEquals(replicasToRemove.size(), 2);
+    Set<Node> racksToBeRemoved = Arrays.asList(0, 1).stream()
+        .map(dummyPlacementPolicy.racks::get).collect(Collectors.toSet());
+    Assertions.assertEquals(replicasToRemove.stream()
+        .map(ContainerReplica::getDatanodeDetails)
+        .map(dummyPlacementPolicy::getPlacementGroup)
+        .collect(Collectors.toSet()), racksToBeRemoved);
+  }
+
+  @Test
+  public void testReplicasToRemoveWithOverreplicationWithinSameRack() {
+    DummyPlacementPolicy dummyPlacementPolicy =
+        new DummyPlacementPolicy(nodeManager, conf, 3);
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+
+    Set<ContainerReplica> replicas = Sets.newHashSet(
+        HddsTestUtils.getReplicasWithReplicaIndex(
+            new ContainerID(1), CLOSED, 0, 0, 0, list.subList(1, 6)));
+
+    ContainerReplica replica1 = ContainerReplica.newBuilder()
+        .setContainerID(new ContainerID(1))
+        .setContainerState(CLOSED)
+        .setReplicaIndex(1)
+        .setDatanodeDetails(list.get(6)).build();
+    replicas.add(replica1);
+    ContainerReplica replica2 = ContainerReplica.newBuilder()
+        .setContainerID(new ContainerID(1))
+        .setContainerState(CLOSED)
+        .setReplicaIndex(1)
+        .setDatanodeDetails(list.get(0)).build();
+    replicas.add(replica2);
+
+    Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
+        .replicasToRemoveToFixOverreplication(replicas, 1);
+    Map<Node, Long> removedReplicasRackCntMap = replicasToRemove.stream()
+        .map(ContainerReplica::getDatanodeDetails)
+        .map(dummyPlacementPolicy::getPlacementGroup)
+        .collect(Collectors.groupingBy(Function.identity(),
+            Collectors.counting()));
+    Assertions.assertEquals(replicasToRemove.size(), 2);
+    Assertions.assertTrue(Sets.newHashSet(1L, 2L).contains(
+        removedReplicasRackCntMap.get(dummyPlacementPolicy.racks.get(0))));
+    Assertions.assertEquals(
+        removedReplicasRackCntMap.get(dummyPlacementPolicy.racks.get(1)),
+        removedReplicasRackCntMap.get(dummyPlacementPolicy.racks.get(0))
+            == 2 ? 0 : 1);
+  }
+
+  @Test
+  public void testReplicasToRemoveWithNoOverreplication() {
+    DummyPlacementPolicy dummyPlacementPolicy =
+        new DummyPlacementPolicy(nodeManager, conf, 5);
+    List<DatanodeDetails> list = nodeManager.getAllNodes();
+    Set<ContainerReplica> replicas = Sets.newHashSet(
+        HddsTestUtils.getReplicasWithReplicaIndex(
+            new ContainerID(1), CLOSED, 0, 0, 0, list.subList(1, 6)));
+
+    Set<ContainerReplica> replicasToRemove = dummyPlacementPolicy
+        .replicasToRemoveToFixOverreplication(replicas, 1);
+    Assertions.assertEquals(replicasToRemove.size(), 0);
+  }
+
   private static class DummyPlacementPolicy extends SCMCommonPlacementPolicy {
@@ -293,6 +437,14 @@ private static class DummyPlacementPolicy extends SCMCommonPlacementPolicy {
     private List<Node> racks;
     private int rackCnt;
+
+    /**
+     * Creates Dummy Placement Policy with dn index to rack mapping
+     * in round robin fashion (rack index = dn index % total number of racks).
+     * @param nodeManager
+     * @param conf
+     * @param rackCnt
+     */
     DummyPlacementPolicy(NodeManager nodeManager, ConfigurationSource conf,
         int rackCnt) {
       this(nodeManager, conf,
@@ -301,6 +453,12 @@ private static class DummyPlacementPolicy extends SCMCommonPlacementPolicy {
           idx -> idx % rackCnt)), rackCnt);
     }
 
+    /**
+     * Creates Dummy Placement Policy with dn index -> rack index mapping.
+     * @param nodeManager
+     * @param conf
+     * @param rackCnt
+     */
     DummyPlacementPolicy(NodeManager nodeManager, ConfigurationSource conf,
         Map<Integer, Integer> datanodeRackMap, int rackCnt) {
       super(nodeManager, conf);
@@ -322,7 +480,7 @@ public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
     }
 
     @Override
-    public Node getPlacementGroup(DatanodeDetails dn) {
+    protected Node getPlacementGroup(DatanodeDetails dn) {
       return rackMap.get(dn);
     }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index 3c2a1d3ddb16..732cf208b97d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -202,6 +202,13 @@ public Set<ContainerReplica> replicasToCopyToFixMisreplication(
         Map<ContainerReplica, Boolean> replicas) {
       return Collections.emptySet();
     }
+
+    @Override
+    public Set<ContainerReplica> replicasToRemoveToFixOverreplication(
+        Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
+      return null;
+    }
+
   }
 
   @Test
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index 254436e52a0e..51e6f4a2052d 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -379,6 +379,12 @@ public Set<ContainerReplica> replicasToCopyToFixMisreplication(
 
     }
 
+    @Override
+    public Set<ContainerReplica> replicasToRemoveToFixOverreplication(
+        Set<ContainerReplica> replicas, int expectedCountPerUniqueReplica) {
+      return null;
+    }
+
     private boolean isDnPresent(List<DatanodeDetails> dns) {
       for (DatanodeDetails dn : dns) {
         if (misRepWhenDnPresent != null
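
Reviewer note (not part of the patch): below is a minimal sketch of how a caller might drive the new API. The OverReplicationSketch class, its method name, and the way policy and replicas are obtained are hypothetical; only the replicasToRemoveToFixOverreplication contract comes from this patch. As the tests above illustrate, replicas of a Ratis container carry no distinct replica index, so they all fall under one index and expectedCountPerUniqueReplica is the replication factor (e.g. 3); for an EC container each index should appear once, so the expected count is 1.

// Hypothetical usage sketch; class and method names are illustrative only.
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;

import java.util.Set;

public final class OverReplicationSketch {
  private OverReplicationSketch() { }

  // Returns the replicas an over-replication handler could schedule for
  // deletion. The placement policy picks victims from the racks that
  // currently hold the most copies of each over-replicated replica index.
  public static Set<ContainerReplica> excessReplicas(
      PlacementPolicy policy, Set<ContainerReplica> replicas,
      int expectedCountPerUniqueReplica) {
    return policy.replicasToRemoveToFixOverreplication(
        replicas, expectedCountPerUniqueReplica);
  }
}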