From 1c9b44447491c443988223f7d0eac00ea3e0cd46 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Thu, 30 Jun 2022 21:05:38 +0100 Subject: [PATCH 1/2] HDDS-6957. EC: ReplicationManager - priortise under replicated containers --- .../replication/ContainerHealthResult.java | 51 +++++++- .../replication/ReplicationManager.java | 90 ++++++++++++- .../replication/TestReplicationManager.java | 122 +++++++++++++++++- 3 files changed, 254 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java index 554e7260d1a7..801d3b2a90fb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java @@ -77,10 +77,24 @@ public static class HealthyResult extends ContainerHealthResult { public static class UnderReplicatedHealthResult extends ContainerHealthResult { + // For under replicated containers, the best remaining redundancy we can + // have is 3 for EC-10-4, 2 for EC-6-3, 1 for EC-3-2 and 2 for Ratis. + // A container which is under-replicated due to decommission will have one + // more, ie 4, 3, 2, 3 respectively. Ideally we want to sort decommission + // only under-replication after all other under-replicated containers. + // It may also make sense to allow under-replicated containers a chance to + // retry once before processing the decommission only under replication. + // Therefore we should adjust the weighted remaining redundancy of + // decommission only under-replicated containers to a floor of 5 so they + // sort after an under-replicated container with 3 remaining replicas ( + // EC-10-4) and plus one retry. + private static final int DECOMMISSION_REDUNDANCY = 5; + private final int remainingRedundancy; private final boolean dueToDecommission; private final boolean sufficientlyReplicatedAfterPending; private final boolean unrecoverable; + private int requeueCount = 0; UnderReplicatedHealthResult(ContainerInfo containerInfo, int remainingRedundancy, boolean dueToDecommission, @@ -93,7 +107,7 @@ public static class UnderReplicatedHealthResult } /** - * How many more replicas can be lost before the the container is + * How many more replicas can be lost before the container is * unreadable. For containers which are under-replicated due to decommission * or maintenance only, the remaining redundancy will include those * decommissioning or maintenance replicas, as they are technically still @@ -104,6 +118,41 @@ public int getRemainingRedundancy() { return remainingRedundancy; } + /** + * The weightedRedundancy, is the remaining redundancy + the requeue count. + * When this value is used for ordering in a priority queue it ensures the + * priority is reduced each time it is requeued, to prevent it from blocking + * other containers from being processed. + * Additionally, so that decommission and maintenance replicas are not + * ordered ahead of under-replicated replicas, a redundancy of + * DECOMMISSION_REDUNDANCY is used for the decommission redundancy rather + * than its real redundancy. + * @return The weightedRedundancy of this result. 
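+     *         For illustration, using the redundancy values from the class
+     *         comment above: an EC-6-3 container that has lost one replica
+     *         has a remaining redundancy of 2, so after one failed attempt
+     *         and requeue its weighted redundancy is {@code 2 + 1 = 3}. A
+     *         container that is under-replicated only due to decommission
+     *         is weighted using {@code DECOMMISSION_REDUNDANCY}, giving
+     *         {@code 5 + 0 = 5} before any retries, so it sorts after the
+     *         genuinely under-replicated containers.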
+ */ + public int getWeightedRedundancy() { + int result = requeueCount; + if (dueToDecommission) { + result += DECOMMISSION_REDUNDANCY; + } else { + result += remainingRedundancy; + } + return result; + } + + /** + * If there is an attempt to process this under-replicated result, and it + * fails and has to be requeued, this method should be called to increment + * the requeue count to ensure the result is not placed back at the head + * of the queue. + */ + public void incrementRequeueCount() { + ++requeueCount; + } + + public int getRequeueCount() { + return requeueCount; + } + /** * Indicates whether the under-replication is caused only by replicas * being decommissioned or entering maintenance. Ie, there are not replicas diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index fc643af51baa..d5d2076f1b95 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -50,6 +50,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -130,6 +132,9 @@ public class ReplicationManager implements SCMService { private final ContainerReplicaPendingOps containerReplicaPendingOps; private final ContainerHealthCheck ecContainerHealthCheck; private final EventPublisher eventPublisher; + private final ReentrantLock lock = new ReentrantLock(); + private Queue + underRepQueue; /** * Constructs ReplicationManager instance with the given configuration. @@ -166,6 +171,7 @@ public ReplicationManager(final ConfigurationSource conf, this.legacyReplicationManager = legacyReplicationManager; this.ecContainerHealthCheck = new ECContainerHealthCheck(); this.nodeManager = nodeManager; + this.underRepQueue = createUnderReplicatedQueue(); start(); } @@ -232,8 +238,8 @@ public synchronized void processAll() { final List containers = containerManager.getContainers(); ReplicationManagerReport report = new ReplicationManagerReport(); - List underReplicated = - new ArrayList<>(); + Queue + underReplicated = createUnderReplicatedQueue(); List overReplicated = new ArrayList<>(); @@ -254,16 +260,61 @@ public synchronized void processAll() { } } report.setComplete(); - // TODO - Sort the pending lists by priority and assign to the main queue, - // which is yet to be defined. + lock.lock(); + try { + underRepQueue = underReplicated; + } finally { + lock.unlock(); + } this.containerReport = report; LOG.info("Replication Monitor Thread took {} milliseconds for" + " processing {} containers.", clock.millis() - start, containers.size()); } + /** + * Retrieve the new highest priority container to be replicated from the + * under replicated queue. + * @return The new underReplicated container to be processed, or null if the + * queue is empty. + */ + public ContainerHealthResult.UnderReplicatedHealthResult + dequeueUnderReplicatedContainer() { + lock.lock(); + try { + return underRepQueue.poll(); + } finally { + lock.unlock(); + } + } + + /** + * Add an under replicated container back to the queue if it was unable to + * be processed. 
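+   * For example, a consumer is expected to call
+   * {@code dequeueUnderReplicatedContainer()}, attempt to schedule the
+   * required replication work, and on failure hand the result back via
+   * this method.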
Its retry count will be incremented before it is re-queued, + * reducing its priority. + * Note that the queue could have been rebuilt and replaced after this + * message was removed but before it is added back. This will result in a + * duplicate entry on the queue. However, when it is processed again, the + * result of the processing will end up with pending replicas scheduled. If + * instance 1 is processed and creates the pending replicas, when instance 2 + * is processed, it will find the pending containers and know it has no work + * to do, and be discarded. Additionally, the queue will be refreshed + * periodically removing any duplicates. + * @param underReplicatedHealthResult + */ + public void requeueUnderReplicatedContainer(ContainerHealthResult + .UnderReplicatedHealthResult underReplicatedHealthResult) { + underReplicatedHealthResult.incrementRequeueCount(); + lock.lock(); + try { + underRepQueue.add(underReplicatedHealthResult); + } finally { + lock.unlock(); + } + } + protected ContainerHealthResult processContainer(ContainerInfo containerInfo, - List underRep, + Queue underRep, List overRep, ReplicationManagerReport report) throws ContainerNotFoundException { Set replicas = containerManager.getContainerReplicas( @@ -302,6 +353,35 @@ protected ContainerHealthResult processContainer(ContainerInfo containerInfo, return health; } + /** + * Creates a priority queue of UnderReplicatedHealthResult, where the elements + * are ordered by the weighted redundancy of the container. This means that + * containers with the least remaining redundancy are at the front of the + * queue, and will be processed first. + * @return An empty instance of a PriorityQueue. + */ + protected PriorityQueue + createUnderReplicatedQueue() { + return new PriorityQueue<>( + (o1, o2) -> { + if (o1.getWeightedRedundancy() < o2.getWeightedRedundancy()) { + return -1; + } else if (o1.getWeightedRedundancy() > o2.getWeightedRedundancy()) { + return 1; + } else { + // Equal weighted redundancy. 
Put the one with the lesser retries + // first + if (o1.getRequeueCount() < o2.getRequeueCount()) { + return -1; + } else if (o1.getRequeueCount() > o2.getRequeueCount()) { + return 1; + } else { + return 0; + } + } + }); + } + public ReplicationManagerReport getContainerReport() { return containerReport; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index fdce87f8bac1..4ea08d7779d9 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm.container.replication; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.ozone.test.TestClock; @@ -43,11 +45,14 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo; import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas; @@ -68,9 +73,10 @@ public class TestReplicationManager { private ContainerReplicaPendingOps containerReplicaPendingOps; private Map> containerReplicaMap; + private Set containerInfoSet; private ReplicationConfig repConfig; private ReplicationManagerReport repReport; - private List underRep; + private Queue underRep; private List overRep; @Before @@ -94,6 +100,9 @@ public void setup() throws IOException { return containerReplicaMap.get(cid); }); + Mockito.when(containerManager.getContainers()).thenAnswer( + invocation -> new ArrayList<>(containerInfoSet)); + replicationManager = new ReplicationManager( configuration, containerManager, @@ -105,10 +114,18 @@ public void setup() throws IOException { legacyReplicationManager, containerReplicaPendingOps); containerReplicaMap = new HashMap<>(); + containerInfoSet = new HashSet<>(); repConfig = new ECReplicationConfig(3, 2); repReport = new ReplicationManagerReport(); - underRep = new ArrayList<>(); + underRep = replicationManager.createUnderReplicatedQueue(); overRep = new ArrayList<>(); + + // Ensure that RM will run when asked. 
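+    // The stubs below make the SCMContext report that this SCM is the
+    // leader and out of safe mode, and registering with an SCMServiceManager
+    // plus notifyStatusChanged lets the ReplicationManager pick up that
+    // state.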
+ Mockito.when(scmContext.isLeaderReady()).thenReturn(true); + Mockito.when(scmContext.isInSafeMode()).thenReturn(false); + SCMServiceManager serviceManager = new SCMServiceManager(); + serviceManager.register(replicationManager); + serviceManager.notifyStatusChanged(); } @Test @@ -245,12 +262,111 @@ public void testOverReplicatedFixByPending() ReplicationManagerReport.HealthState.OVER_REPLICATED)); } + @Test + public void testUnderReplicatedOrderingInQueue() + throws ContainerNotFoundException { + ContainerInfo decomContainer = createContainerInfo(repConfig, 1, + HddsProtos.LifeCycleState.CLOSED); + addReplicas(decomContainer, Pair.of(DECOMMISSIONING, 1), + Pair.of(DECOMMISSIONING, 2), Pair.of(DECOMMISSIONING, 3), + Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5)); + + ContainerInfo underRep1 = createContainerInfo(repConfig, 2, + HddsProtos.LifeCycleState.CLOSED); + addReplicas(underRep1, 1, 2, 3, 4); + ContainerInfo underRep0 = createContainerInfo(repConfig, 3, + HddsProtos.LifeCycleState.CLOSED); + addReplicas(underRep0, 1, 2, 3); + + replicationManager.processContainer(decomContainer, underRep, overRep, + repReport); + replicationManager.processContainer(underRep1, underRep, overRep, + repReport); + replicationManager.processContainer(underRep0, underRep, overRep, + repReport); + // We expect 3 messages on the queue. They should be in the reverse order + // to which they were added, putting the lowest remaining redundancy first. + Assert.assertEquals(3, underRep.size()); + ContainerHealthResult.UnderReplicatedHealthResult res = underRep.poll(); + Assert.assertEquals(underRep0, res.getContainerInfo()); + res = underRep.poll(); + Assert.assertEquals(underRep1, res.getContainerInfo()); + res = underRep.poll(); + Assert.assertEquals(decomContainer, res.getContainerInfo()); + } + + @Test + public void testUnderReplicationQueuePopulated() { + ContainerInfo decomContainer = createContainerInfo(repConfig, 1, + HddsProtos.LifeCycleState.CLOSED); + addReplicas(decomContainer, Pair.of(DECOMMISSIONING, 1), + Pair.of(DECOMMISSIONING, 2), Pair.of(DECOMMISSIONING, 3), + Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5)); + + ContainerInfo underRep1 = createContainerInfo(repConfig, 2, + HddsProtos.LifeCycleState.CLOSED); + addReplicas(underRep1, 1, 2, 3, 4); + ContainerInfo underRep0 = createContainerInfo(repConfig, 3, + HddsProtos.LifeCycleState.CLOSED); + addReplicas(underRep0, 1, 2, 3); + + replicationManager.processAll(); + + // Get the first message off the queue - it should be underRep0. + ContainerHealthResult.UnderReplicatedHealthResult res + = replicationManager.dequeueUnderReplicatedContainer(); + Assert.assertEquals(underRep0, res.getContainerInfo()); + + // Now requeue it + replicationManager.requeueUnderReplicatedContainer(res); + + // Now get the next message. It should be underRep1, as it has remaining + // redundancy 1 + zero retries. UnderRep1 will have remaining redundancy 0 + // and 1 retry. They will have the same weighted redundancy so lesser + // retries should come first + res = replicationManager.dequeueUnderReplicatedContainer(); + Assert.assertEquals(underRep1, res.getContainerInfo()); + + // Next message is underRep0. It starts with a weighted redundancy of 0 + 1 + // retry. The other message on the queue is a decommission only with a + // weighted redundancy of 5 + 0. So lets dequeue and requeue the message 4 + // times. Then the weighted redundancy will be equal and the decommission + // one will be next due to having less retries. 
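+    // (Each iteration below increments underRep0's requeue count, so after
+    // four more requeues it is weighted 0 + 5 = 5, the same as the
+    // decommission container's 5 + 0; the tie is then broken in favour of
+    // the lower requeue count.)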
+ for (int i = 0; i < 4; i++) { + res = replicationManager.dequeueUnderReplicatedContainer(); + Assert.assertEquals(underRep0, res.getContainerInfo()); + replicationManager.requeueUnderReplicatedContainer(res); + } + res = replicationManager.dequeueUnderReplicatedContainer(); + Assert.assertEquals(decomContainer, res.getContainerInfo()); + + res = replicationManager.dequeueUnderReplicatedContainer(); + Assert.assertEquals(underRep0, res.getContainerInfo()); + + res = replicationManager.dequeueUnderReplicatedContainer(); + Assert.assertNull(res); + } + + private Set addReplicas(ContainerInfo container, + Pair... nodes) { + final Set replicas = + createReplicas(container.containerID(), nodes); + storeContainerAndReplicas(container, replicas); + return replicas; + } + private Set addReplicas(ContainerInfo container, int... indexes) { final Set replicas = createReplicas(container.containerID(), indexes); - containerReplicaMap.put(container.containerID(), replicas); + storeContainerAndReplicas(container, replicas); return replicas; } + private void storeContainerAndReplicas(ContainerInfo container, + Set replicas) { + containerReplicaMap.put(container.containerID(), replicas); + containerInfoSet.add(container); + } + } From 670e94e574bcfaf3a45fc11ec14df6bec29031a6 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Fri, 1 Jul 2022 16:33:39 +0100 Subject: [PATCH 2/2] Fix review comments --- .../replication/ReplicationManager.java | 23 +++--------- .../replication/TestReplicationManager.java | 35 +------------------ 2 files changed, 6 insertions(+), 52 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index d5d2076f1b95..0760e724ca38 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -48,6 +48,7 @@ import java.time.Clock; import java.time.Duration; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; @@ -362,24 +363,10 @@ protected ContainerHealthResult processContainer(ContainerInfo containerInfo, */ protected PriorityQueue createUnderReplicatedQueue() { - return new PriorityQueue<>( - (o1, o2) -> { - if (o1.getWeightedRedundancy() < o2.getWeightedRedundancy()) { - return -1; - } else if (o1.getWeightedRedundancy() > o2.getWeightedRedundancy()) { - return 1; - } else { - // Equal weighted redundancy. 
Put the one with the lesser retries - // first - if (o1.getRequeueCount() < o2.getRequeueCount()) { - return -1; - } else if (o1.getRequeueCount() > o2.getRequeueCount()) { - return 1; - } else { - return 0; - } - } - }); + return new PriorityQueue<>(Comparator.comparing(ContainerHealthResult + .UnderReplicatedHealthResult::getWeightedRedundancy) + .thenComparing(ContainerHealthResult + .UnderReplicatedHealthResult::getRequeueCount)); } public ReplicationManagerReport getContainerReport() { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 4ea08d7779d9..74a1955dd79c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -262,39 +262,6 @@ public void testOverReplicatedFixByPending() ReplicationManagerReport.HealthState.OVER_REPLICATED)); } - @Test - public void testUnderReplicatedOrderingInQueue() - throws ContainerNotFoundException { - ContainerInfo decomContainer = createContainerInfo(repConfig, 1, - HddsProtos.LifeCycleState.CLOSED); - addReplicas(decomContainer, Pair.of(DECOMMISSIONING, 1), - Pair.of(DECOMMISSIONING, 2), Pair.of(DECOMMISSIONING, 3), - Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5)); - - ContainerInfo underRep1 = createContainerInfo(repConfig, 2, - HddsProtos.LifeCycleState.CLOSED); - addReplicas(underRep1, 1, 2, 3, 4); - ContainerInfo underRep0 = createContainerInfo(repConfig, 3, - HddsProtos.LifeCycleState.CLOSED); - addReplicas(underRep0, 1, 2, 3); - - replicationManager.processContainer(decomContainer, underRep, overRep, - repReport); - replicationManager.processContainer(underRep1, underRep, overRep, - repReport); - replicationManager.processContainer(underRep0, underRep, overRep, - repReport); - // We expect 3 messages on the queue. They should be in the reverse order - // to which they were added, putting the lowest remaining redundancy first. - Assert.assertEquals(3, underRep.size()); - ContainerHealthResult.UnderReplicatedHealthResult res = underRep.poll(); - Assert.assertEquals(underRep0, res.getContainerInfo()); - res = underRep.poll(); - Assert.assertEquals(underRep1, res.getContainerInfo()); - res = underRep.poll(); - Assert.assertEquals(decomContainer, res.getContainerInfo()); - } - @Test public void testUnderReplicationQueuePopulated() { ContainerInfo decomContainer = createContainerInfo(repConfig, 1, @@ -321,7 +288,7 @@ public void testUnderReplicationQueuePopulated() { replicationManager.requeueUnderReplicatedContainer(res); // Now get the next message. It should be underRep1, as it has remaining - // redundancy 1 + zero retries. UnderRep1 will have remaining redundancy 0 + // redundancy 1 + zero retries. UnderRep0 will have remaining redundancy 0 // and 1 retry. They will have the same weighted redundancy so lesser // retries should come first res = replicationManager.dequeueUnderReplicatedContainer();
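
To make the queue ordering concrete, the following is a minimal, self-contained sketch of the scheme the two commits arrive at. It is illustrative only: the Result class is a simplified stand-in for UnderReplicatedHealthResult, the redundancy values are taken from the comments above, and the comparator mirrors the Comparator.comparing(...).thenComparing(...) form introduced in the second commit. Lower weighted redundancy is polled first, a requeue pushes a result back, and decommission-only under-replication sorts last.

import java.util.Comparator;
import java.util.PriorityQueue;

/** Illustrative stand-in for the under-replicated queue; not the patch's classes. */
public class UnderReplicationQueueSketch {

  static final int DECOMMISSION_REDUNDANCY = 5;

  /** Simplified stand-in for UnderReplicatedHealthResult. */
  static class Result {
    final String name;
    final int remainingRedundancy;
    final boolean dueToDecommission;
    int requeueCount = 0;

    Result(String name, int remainingRedundancy, boolean dueToDecommission) {
      this.name = name;
      this.remainingRedundancy = remainingRedundancy;
      this.dueToDecommission = dueToDecommission;
    }

    // Same weighting rule as the patch: requeues, and decommission-only
    // under-replication, both push a result towards the back of the queue.
    int getWeightedRedundancy() {
      return requeueCount
          + (dueToDecommission ? DECOMMISSION_REDUNDANCY : remainingRedundancy);
    }

    int getRequeueCount() {
      return requeueCount;
    }
  }

  public static void main(String[] args) {
    // Same ordering as the reworked comparator: weighted redundancy first,
    // requeue count as the tie breaker.
    PriorityQueue<Result> queue = new PriorityQueue<>(
        Comparator.comparingInt(Result::getWeightedRedundancy)
            .thenComparingInt(Result::getRequeueCount));

    queue.add(new Result("EC-6-3, decommission only", 3, true));        // weight 5
    queue.add(new Result("EC-6-3, one replica lost", 2, false));        // weight 2
    Result twoLost = new Result("EC-6-3, two replicas lost", 1, false); // weight 1
    queue.add(twoLost);

    // Lowest remaining redundancy is processed first.
    System.out.println(queue.poll().name);  // EC-6-3, two replicas lost

    // A failed attempt is requeued with an extra retry, so it now ties with
    // the other under-replicated container and loses the tie break.
    twoLost.requeueCount++;
    queue.add(twoLost);
    System.out.println(queue.poll().name);  // EC-6-3, one replica lost
    System.out.println(queue.poll().name);  // EC-6-3, two replicas lost
    System.out.println(queue.poll().name);  // EC-6-3, decommission only
  }
}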