Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -660,21 +660,23 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
if (excess > 0) {
eligibleReplicas.removeAll(unhealthyReplicas);
Set<ContainerReplica> replicaSet = new HashSet<>(eligibleReplicas);
boolean misReplicated =
getPlacementStatus(replicaSet, replicationFactor)
.isPolicySatisfied();
ContainerPlacementStatus ps =
getPlacementStatus(replicaSet, replicationFactor);
for (ContainerReplica r : eligibleReplicas) {
if (excess <= 0) {
break;
}
// First remove the replica we are working on from the set, and then
// check if the set is now mis-replicated.
replicaSet.remove(r);
boolean nowMisRep = getPlacementStatus(replicaSet, replicationFactor)
.isPolicySatisfied();
if (misReplicated || !nowMisRep) {
// Remove the replica if the container was already mis-replicated
// OR if losing this replica does not make it become mis-replicated
ContainerPlacementStatus nowPS =
getPlacementStatus(replicaSet, replicationFactor);
if ((!ps.isPolicySatisfied()
&& nowPS.actualPlacementCount() == ps.actualPlacementCount())
|| (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) {
Comment on lines -675 to +676
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need such a complex condition check?
Won't fixing the previous check solve the problem?
if (misReplicated || !nowMisRep) to if (!misReplicated || nowMisRep)

// Remove the replica if the container was already unsatisfied
// and losing this replica keep actual placement count unchanged.
// OR if losing this replica still keep satisfied
sendDeleteCommand(container, r.getDatanodeDetails(), true);
excess -= 1;
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -764,6 +765,84 @@ public void overReplicatedButRemovingMakesMisReplicated()
replicaFive.getDatanodeDetails()));
}

@Test
public void testOverReplicatedAndPolicySatisfied() throws
SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFour = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());

containerStateManager.loadContainer(container);
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(id, replicaThree);
containerStateManager.updateContainerReplica(id, replicaFour);

Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.argThat(new ListOfNElements(3)),
Mockito.anyInt()
)).thenAnswer(
invocation -> new ContainerPlacementStatusDefault(2, 2, 3));

final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);

replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
}

@Test
public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFour = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFive = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());

containerStateManager.loadContainer(container);
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(id, replicaThree);
containerStateManager.updateContainerReplica(id, replicaFour);
containerStateManager.updateContainerReplica(id, replicaFive);

Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.argThat(new FunctionMatcher(list ->
list != null && ((List) list).size() <= 4)),
Mockito.anyInt()
)).thenAnswer(
invocation -> new ContainerPlacementStatusDefault(1, 2, 3));

final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);

replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
}

@After
public void teardown() throws IOException {
containerStateManager.close();
Expand Down Expand Up @@ -831,4 +910,17 @@ public boolean matches(Object argument) {
}
}

class FunctionMatcher extends ArgumentMatcher<List> {

private Function<Object, Boolean> function;

FunctionMatcher(Function<Object, Boolean> function) {
this.function = function;
}

@Override
public boolean matches(Object argument) {
return function.apply(argument);
}
}
}