diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java index 6bb6be7b6e46..a204d1694139 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java @@ -77,6 +77,7 @@ public class ECContainerReplicaCount implements ContainerReplicaCount { private final Map healthyIndexes = new HashMap<>(); private final Map decommissionIndexes = new HashMap<>(); private final Map maintenanceIndexes = new HashMap<>(); + private final Set unhealthyReplicaDNs; private final List replicas; public ECContainerReplicaCount(ContainerInfo containerInfo, @@ -97,7 +98,7 @@ public ECContainerReplicaCount(ContainerInfo containerInfo, this.remainingMaintenanceRedundancy = Math.min(repConfig.getParity(), remainingMaintenanceRedundancy); - Set unhealthyReplicaDNs = new HashSet<>(); + unhealthyReplicaDNs = new HashSet<>(); for (ContainerReplica r : replicas) { if (r.getState() == ContainerReplicaProto.State.UNHEALTHY) { unhealthyReplicaDNs.add(r.getDatanodeDetails()); @@ -105,17 +106,7 @@ public ECContainerReplicaCount(ContainerInfo containerInfo, } for (ContainerReplicaOp op : replicaPendingOps) { - if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) { - pendingAdd.add(op.getReplicaIndex()); - } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) { - if (!unhealthyReplicaDNs.contains(op.getTarget())) { - // We ignore unhealthy replicas later in this method, so we also - // need to ignore pending deletes on those unhealthy replicas, - // otherwise the pending delete will decrement the healthy count and - // make the container appear under-replicated when it is not. 
-          pendingDelete.add(op.getReplicaIndex());
-        }
-      }
+      processPendingOp(op);
     }
 
     for (ContainerReplica replica : replicas) {
@@ -157,20 +148,7 @@ public ECContainerReplicaCount(ContainerInfo containerInfo,
     // will eventually be removed and reduce the count for this replica. If the
     // count goes to zero, remove it from the map.
     for (Integer i : pendingDelete) {
-      ensureIndexWithinBounds(i, "pendingDelete");
-      Integer count = healthyIndexes.get(i);
-      if (count != null) {
-        count = count - 1;
-        if (count < 1) {
-          healthyIndexes.remove(i);
-        } else {
-          healthyIndexes.put(i, count);
-        }
-      }
-    }
-    // Ensure any pending adds are within bounds
-    for (Integer i : pendingAdd) {
-      ensureIndexWithinBounds(i, "pendingAdd");
+      adjustHealthyCountWithPendingDelete(i);
     }
   }
 
@@ -194,6 +172,64 @@ public int getMaintenanceCount() {
     return maintenanceIndexes.size();
   }
 
+  /**
+   * Given a ContainerReplicaOp, check its index is within the expected
+   * bounds and then add it to the relevant pending list. A pending delete
+   * which targets a datanode holding an unhealthy replica is not recorded.
+   * @param op The pending op to validate and record.
+   */
+  private void processPendingOp(ContainerReplicaOp op) {
+    ensureIndexWithinBounds(op.getReplicaIndex(), "pending" + op.getOpType());
+    if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+      pendingAdd.add(op.getReplicaIndex());
+    } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+      if (!unhealthyReplicaDNs.contains(op.getTarget())) {
+        // Unhealthy replicas are ignored when the constructor counts the
+        // replicas, so pending deletes on those unhealthy replicas must be
+        // ignored too, otherwise the pending delete will decrement the
+        // healthy count and make the container appear under-replicated when
+        // it is not.
+        pendingDelete.add(op.getReplicaIndex());
+      }
+    }
+  }
+
+  /**
+   * Remove the pending delete replicas from the healthy set as we assume they
+   * will eventually be removed and reduce the count for this replica. If the
+   * count goes to zero, remove it from the map.
+   * @param index The replica Index which is pending delete.
+   */
+  private void adjustHealthyCountWithPendingDelete(int index) {
+    Integer count = healthyIndexes.get(index);
+    if (count != null) {
+      count = count - 1;
+      if (count < 1) {
+        healthyIndexes.remove(index);
+      } else {
+        healthyIndexes.put(index, count);
+      }
+    }
+  }
+
+  /**
+   * Add a pending op to the object. This allows the same
+   * ECContainerReplicaCount object to be used for multiple processing stages
+   * where commands are created at each stage. The addition of a new pending
+   * op could influence what further replicas are needed in subsequent stages.
+   * @param op The ContainerReplicaOp to add.
+   */
+  public void addPendingOp(ContainerReplicaOp op) {
+    processPendingOp(op);
+    // A delete targeting a datanode which holds an unhealthy replica is not
+    // recorded by processPendingOp, so it must not reduce the healthy count
+    // either.
+    if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE
+        && !unhealthyReplicaDNs.contains(op.getTarget())) {
+      adjustHealthyCountWithPendingDelete(op.getReplicaIndex());
+    }
+  }
+
   /**
    * Get a set containing all decommissioning indexes, or an empty set if none
    * are decommissioning. Note it is possible for an index to be
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 7cddc1e5ce59..872835e44447 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -317,6 +317,12 @@ private int processMissingIndexes(
               repConfig);
           replicationManager.sendDatanodeCommand(reconstructionCommand,
               container, selectedDatanodes.get(0));
+          // For each index that is going to be reconstructed with this command,
+          // adjust the replica count to reflect the pending operation.
+ for (int i = 0; i < missingIndexes.size(); i++) { + adjustPendingOps( + replicaCount, selectedDatanodes.get(i), missingIndexes.get(i)); + } commandsSent++; } } else { @@ -367,7 +373,8 @@ private int processDecommissioningIndexes( selectedDatanodes, excludedNodes, decomIndexes); break; } - createReplicateCommand(container, iterator, sourceReplica); + createReplicateCommand( + container, iterator, sourceReplica, replicaCount); commandsSent++; } } @@ -429,7 +436,7 @@ private int processMaintenanceOnlyIndexes( targets, excludedNodes, maintIndexes); break; } - createReplicateCommand(container, iterator, sourceReplica); + createReplicateCommand(container, iterator, sourceReplica, replicaCount); commandsSent++; additionalMaintenanceCopiesNeeded -= 1; } @@ -438,7 +445,7 @@ private int processMaintenanceOnlyIndexes( private void createReplicateCommand( ContainerInfo container, Iterator iterator, - ContainerReplica replica) + ContainerReplica replica, ECContainerReplicaCount replicaCount) throws AllSourcesOverloadedException, NotLeaderException { final boolean push = replicationManager.getConfig().isPush(); DatanodeDetails source = replica.getDatanodeDetails(); @@ -459,6 +466,14 @@ private void createReplicateCommand( replicationManager.sendDatanodeCommand(replicateCommand, container, target); } + adjustPendingOps(replicaCount, target, replica.getReplicaIndex()); + } + + private void adjustPendingOps(ECContainerReplicaCount replicaCount, + DatanodeDetails target, int replicaIndex) { + replicaCount.addPendingOp(new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, + Long.MAX_VALUE)); } private static byte[] int2byte(List src) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java index 8ea24af0f6e7..883cb1f54259 100644 --- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java @@ -86,6 +86,15 @@ public void testContainerMissingReplica() { Assertions.assertEquals(1, rcnt.unavailableIndexes(true).size()); Assertions.assertEquals(5, rcnt.unavailableIndexes(true).get(0).intValue()); + + // Add a pending add op for the missing replica and ensure it no longer + // appears missing + ContainerReplicaOp op = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + MockDatanodeDetails.randomDatanodeDetails(), 5, Long.MAX_VALUE); + rcnt.addPendingOp(op); + Assertions.assertTrue(rcnt.isSufficientlyReplicated(true)); + Assertions.assertEquals(0, rcnt.unavailableIndexes(true).size()); } @Test @@ -201,6 +210,13 @@ public void testOverReplicatedContainer() { Assertions.assertEquals(1, rcnt.overReplicatedIndexes(true).size()); Assertions.assertTrue(rcnt.isOverReplicated(false)); Assertions.assertEquals(2, rcnt.overReplicatedIndexes(false).size()); + + // Add a pending delete op for the excess replica and ensure it now reports + // as not over replicated. 
+ rcnt.addPendingOp(new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.DELETE, + MockDatanodeDetails.randomDatanodeDetails(), 2, Long.MAX_VALUE)); + Assertions.assertFalse(rcnt.isOverReplicated(true)); } @Test diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java index 0239e9edbbc4..2605469f2115 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java @@ -186,7 +186,8 @@ public void testUnderReplicationWithDecomIndex1() throws IOException { } - // Test used to reproduce the issue reported in HDDS-8171 + // Test used to reproduce the issue reported in HDDS-8171 and then adjusted + // to ensure only a single command is sent for HDDS-8172. @Test public void testUnderReplicationWithDecomIndexAndMaintOnSameIndex() throws IOException { @@ -214,10 +215,13 @@ public NodeStatus getNodeStatus(DatanodeDetails dd) { Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5))); + // Note that maintenanceIndexes is set to zero as we do not expect any + // maintenance commands to be created, as they are solved by the earlier + // decommission command. Set>> cmds = testUnderReplicationWithMissingIndexes( - Lists.emptyList(), availableReplicas, 1, 2, policy); - Assertions.assertEquals(2, cmds.size()); + Lists.emptyList(), availableReplicas, 1, 0, policy); + Assertions.assertEquals(1, cmds.size()); // Check the replicate command has index 1 set for (Pair> c : cmds) { // Ensure neither of the commands are for the dead maintenance node