diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 3a9ad1bc8acb..6b1e2afc52bc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -660,9 +660,8 @@ private void handleOverReplicatedContainer(final ContainerInfo container, if (excess > 0) { eligibleReplicas.removeAll(unhealthyReplicas); Set replicaSet = new HashSet<>(eligibleReplicas); - boolean misReplicated = - getPlacementStatus(replicaSet, replicationFactor) - .isPolicySatisfied(); + ContainerPlacementStatus ps = + getPlacementStatus(replicaSet, replicationFactor); for (ContainerReplica r : eligibleReplicas) { if (excess <= 0) { break; @@ -670,11 +669,14 @@ private void handleOverReplicatedContainer(final ContainerInfo container, // First remove the replica we are working on from the set, and then // check if the set is now mis-replicated. replicaSet.remove(r); - boolean nowMisRep = getPlacementStatus(replicaSet, replicationFactor) - .isPolicySatisfied(); - if (misReplicated || !nowMisRep) { - // Remove the replica if the container was already mis-replicated - // OR if losing this replica does not make it become mis-replicated + ContainerPlacementStatus nowPS = + getPlacementStatus(replicaSet, replicationFactor); + if ((!ps.isPolicySatisfied() + && nowPS.actualPlacementCount() == ps.actualPlacementCount()) + || (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) { + // Remove the replica if the container was already unsatisfied + // and losing this replica keep actual placement count unchanged. + // OR if losing this replica still keep satisfied sendDeleteCommand(container, r.getDatanodeDetails(), true); excess -= 1; continue; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index 51692c3cde8e..b1e27c0816e8 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -54,6 +54,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -764,6 +765,84 @@ public void overReplicatedButRemovingMakesMisReplicated() replicaFive.getDatanodeDetails())); } + @Test + public void testOverReplicatedAndPolicySatisfied() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = getContainer(LifeCycleState.CLOSED); + final ContainerID id = container.containerID(); + final UUID originNodeId = UUID.randomUUID(); + final ContainerReplica replicaOne = getReplicas( + id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaTwo = getReplicas( + id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaThree = getReplicas( + id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaFour = getReplicas( + id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + + containerStateManager.loadContainer(container); + containerStateManager.updateContainerReplica(id, replicaOne); + containerStateManager.updateContainerReplica(id, replicaTwo); + containerStateManager.updateContainerReplica(id, replicaThree); + containerStateManager.updateContainerReplica(id, replicaFour); + + Mockito.when(containerPlacementPolicy.validateContainerPlacement( + Mockito.argThat(new ListOfNElements(3)), + Mockito.anyInt() + )).thenAnswer( + invocation -> new ContainerPlacementStatusDefault(2, 2, 3)); + + final int currentDeleteCommandCount = datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand); + + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand)); + } + + @Test + public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = getContainer(LifeCycleState.CLOSED); + final ContainerID id = container.containerID(); + final UUID originNodeId = UUID.randomUUID(); + final ContainerReplica replicaOne = getReplicas( + id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaTwo = getReplicas( + id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaThree = getReplicas( + id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaFour = getReplicas( + id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaFive = getReplicas( + id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + + containerStateManager.loadContainer(container); + containerStateManager.updateContainerReplica(id, replicaOne); + containerStateManager.updateContainerReplica(id, replicaTwo); + containerStateManager.updateContainerReplica(id, replicaThree); + containerStateManager.updateContainerReplica(id, replicaFour); + containerStateManager.updateContainerReplica(id, replicaFive); + + Mockito.when(containerPlacementPolicy.validateContainerPlacement( + Mockito.argThat(new FunctionMatcher(list -> + list != null && ((List) list).size() <= 4)), + Mockito.anyInt() + )).thenAnswer( + invocation -> new ContainerPlacementStatusDefault(1, 2, 3)); + + final int currentDeleteCommandCount = datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand); + + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand)); + } + @After public void teardown() throws IOException { containerStateManager.close(); @@ -831,4 +910,17 @@ public boolean matches(Object argument) { } } + class FunctionMatcher extends ArgumentMatcher { + + private Function function; + + FunctionMatcher(Function function) { + this.function = function; + } + + @Override + public boolean matches(Object argument) { + return function.apply(argument); + } + } } \ No newline at end of file