@@ -203,8 +203,10 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
}

/**
* Processes the ContainerReport, unknown container reported
* that will be deleted by SCM.
* Processes the ContainerReport.
* Any unknown container reported by DN and not present in SCM
* containerSet will either be logged as an error or deleted based on
* unknownContainerHandleAction.
*
* @param datanodeDetails Datanode from which this report was received
* @param container ContainerInfo representing the container
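The behaviour described in the updated javadoc (log or delete an unknown replica depending on unknownContainerHandleAction) can be summarised with a small standalone sketch; the enum values, class and method names below are illustrative assumptions, not the actual Ozone types:

// Minimal sketch of the unknown-container handling described above.
// All names here are assumptions for illustration only.
enum UnknownContainerAction { LOG_ERROR, DELETE }

final class UnknownContainerSketch {
  private final UnknownContainerAction unknownContainerHandleAction;

  UnknownContainerSketch(UnknownContainerAction action) {
    this.unknownContainerHandleAction = action;
  }

  // Decide what SCM should do with a replica it has no record of.
  String onUnknownReplica(long containerId, String datanode) {
    if (unknownContainerHandleAction == UnknownContainerAction.DELETE) {
      // Configured to clean up: the datanode is asked to delete the replica.
      return "delete container " + containerId + " on " + datanode;
    }
    // Otherwise the report is only logged as an error and the data is kept.
    return "log error for container " + containerId + " reported by " + datanode;
  }
}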
@@ -1174,19 +1174,37 @@ private void handleUnderReplicatedContainer(final ContainerInfo container,
.stream()
.map(action -> action.datanode)
.collect(Collectors.toList());
final List<DatanodeDetails> source = replicas.stream()
List<DatanodeDetails> source = replicas.stream()
.filter(r ->
r.getState() == State.QUASI_CLOSED ||
r.getState() == State.CLOSED)
// Exclude stale and dead nodes. This is particularly important for
// maintenance nodes, as the replicas will remain present in the
// container manager, even when they go dead.
.filter(r ->
getNodeStatus(r.getDatanodeDetails()).isHealthy())
.filter(r -> getNodeStatus(r.getDatanodeDetails()).isHealthy())
.filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
.sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());

// If there are no closed / quasi-closed replicas for a container,
// replicate the unhealthy replica. The unhealthy replica could still
// have good data and hence we should replicate it.
if (source.size() == 0) {
source = replicas.stream()
.filter(r -> r.getState() == State.UNHEALTHY)
.filter(r -> getNodeStatus(r.getDatanodeDetails()).isHealthy())
.filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
.sorted((r1, r2) ->
r2.getSequenceId().compareTo(r1.getSequenceId()))
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
if (source.size() > 0) {
LOG.debug("Replicating an unhealthy container replica as there " +
"are no closed or quasi-closed replicas for container {}", id);
}
}

if (source.size() > 0) {
final int replicationFactor = container
.getReplicationConfig().getRequiredNodes();
@@ -1308,15 +1326,19 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
r.getDatanodeDetails().getPersistedOpState() !=
HddsProtos.NodeOperationalState.IN_SERVICE);

// If there are unhealthy replicas, then we should remove them even if it
// makes the container violate the placement policy, as excess unhealthy
// containers are not really useful. It will be corrected later as a
// mis-replicated container will be seen as under-replicated.
// Update - delete only those unhealthy replicas which have bcsId <
// container bcsId

final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
.stream()
.filter(r -> !compareState(container.getState(), r.getState()))
.filter(r -> r.getSequenceId() < container.getSequenceId())
.collect(Collectors.toList());

// If there are unhealthy replicas, then we should remove them even if it
// makes the container violate the placement policy, as excess unhealthy
// containers are not really useful. It will be corrected later as a
// mis-replicated container will be seen as under-replicated.
for (ContainerReplica r : unhealthyReplicas) {
if (excess > 0) {
sendDeleteCommand(container, r.getDatanodeDetails(), true);
@@ -1474,21 +1496,26 @@ private ContainerPlacementStatus getPlacementStatus(
*/
private void handleUnstableContainer(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
// Find unhealthy replicas
// Note - ReplicationManager would reach here only if the
// following conditions are met:
// 1. Container is in either CLOSED or QUASI-CLOSED state
// 2. Container has exactly as many replicas as the replication factor
// (neither under-replicated nor over-replicated)
// 3. At least one replica state does not match the container state

List<ContainerReplica> unhealthyReplicas = replicas.stream()
.filter(r -> !compareState(container.getState(), r.getState()))
.collect(Collectors.toList());

// Close the replicas when possible
Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
while (iterator.hasNext()) {
final ContainerReplica replica = iterator.next();
final State state = replica.getState();
if (state == State.OPEN || state == State.CLOSING) {
sendCloseCommand(container, replica.getDatanodeDetails(), false);
iterator.remove();
}

if (state == State.QUASI_CLOSED) {
} else if (state == State.QUASI_CLOSED) {
// Send force close command if the BCSID matches
if (container.getSequenceId() == replica.getSequenceId()) {
sendCloseCommand(container, replica.getDatanodeDetails(), true);
@@ -1497,8 +1524,54 @@ private void handleUnstableContainer(final ContainerInfo container,
}
}

// Now we are left with the replicas which are either unhealthy or
// the BCSID doesn't match. These replicas should be deleted.
if (unhealthyReplicas.isEmpty()) {
return;
}

// Find the highest bcsId among the healthy replicas. Note that this
// could be different from the container bcsId (in case the replica whose
// bcsId matches the container bcsId is lost; since ContainerReportHandler
// only ever increases the bcsId, the container bcsId is not updated even
// though that replica is lost).
final Long sequenceId = replicas.stream()
.filter(r -> !r.getState().equals(State.UNHEALTHY))
.map(ContainerReplica::getSequenceId)
.max(Long::compare)
.orElse(-1L);

// Now we are left with either unhealthy replicas or quasi-closed
// replicas whose bcsId doesn't match with container bcsId.
// Check the following conditions before deleting a replica:
// 1. Delete the quasi-closed replica only if there is a closed replica
// available. Otherwise there will be a loop of replica deletion in
// handleUnstableContainer and replication again in
// handleUnderReplicatedContainer in the next iteration.
// 2. Delete the unhealthy replica only if its bcsId < any healthy
// replica's bcsId.

if (unhealthyReplicas.size() == replicas.size()) {
// All the replicas are unstable.
// This could happen when a closed replica is lost and all the other
// replicas are in quasi-closed / unhealthy state with bcsId < container
// bcsId. Since we cannot determine which is the most up-to-date
// replica, no replica should be deleted.
LOG.error("Cannot take any action on the unstable container {} as all " +
"the replicas are unstable (either in unhealthy state or have bcsId" +
" less than container bcsId)", container.getContainerID());
return;
}

// Filter out the unhealthy replicas which have higher bcsId than
// healthy (closed / quasi-closed) replicas.
unhealthyReplicas.removeIf(r -> r.getSequenceId() > sequenceId);

if (unhealthyReplicas.isEmpty()) {
return;
}

// There is at least one stable replica - either a closed replica or a
// quasi-closed replica with bcsId == container bcsId.
// Therefore, we can go ahead and delete the unstable replicas.

/*
* If we have unhealthy replicas we go under replicated and then
@@ -89,8 +89,9 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -1461,6 +1462,71 @@ public void testUnderReplicatedNotHealthySource() throws IOException {
assertUnderReplicatedCount(1);
}

/**
* When all the replicas are unstable (unhealthy or quasi-closed with bcsId <
* container bcsId), no replica should be deleted.
*/
@Test
public void testAllUnstableReplicas() throws Exception {
// Default test bcsId for container = 10000L
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), QUASI_CLOSED, 990L);
addReplica(container, NodeStatus.inServiceHealthy(), QUASI_CLOSED, 990L);
addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY, 980L);
Contributor:
Why should SCM not delete the unhealthy container in this case? It seems an unhealthy container with lower BCSID has no advantage over a quasi-closed container with higher BCSID. The PR description currently says

An unhealthy replica should be deleted only if it's bcsId is < than all quasi-closed and closed replicas bcsId.

Contributor Author:
You are right. But with unhealthy replicas, it is hard to determine if the replica can be recovered and if recovered, will the bcsId also change. That's why we thought of keeping the replica around if the closed container is lost.

But with the same argument, do we never delete unhealthy containers? I guess there is no absolutely right answer for this. The only thing we can be certain about is when we have a closed replica, it can be assumed to be the source of truth.

The PR description currently says

An unhealthy replica should be deleted only if it's bcsId is < than all quasi-closed and closed replicas bcsId.

The 2nd proposed fix in the PR description is what handles this case currently:

If all the replicas are unstable (either unhealthy or quasi-closed with lesser bcsId than container), then no replica should be deleted.

cc. @nandakumar131

Contributor:
If we think the unhealthy container's BCSID may be inaccurate then I am okay with keeping it in this scenario.


// If all the replicas are in unstable state then no replica should be
// deleted.
assertDeleteScheduled(0);
}
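To make the rule debated in the review thread above concrete, here is a standalone sketch of the unhealthy-replica deletion check; the class and method names are illustrative assumptions, and the sketch mirrors only the intent of the handleUnstableContainer changes, not the actual code:

import java.util.List;

// Sketch of the rule discussed above: an unhealthy replica is a deletion
// candidate only when at least one stable (closed / quasi-closed) replica
// exists and the unhealthy replica's bcsId does not exceed the highest
// stable bcsId. All names are assumptions for illustration.
final class UnhealthyDeletionRuleSketch {

  static boolean isDeletionCandidate(long unhealthyBcsId,
      List<Long> stableReplicaBcsIds) {
    if (stableReplicaBcsIds.isEmpty()) {
      // All replicas are unstable: keep everything, since we cannot tell
      // which replica holds the most up-to-date data.
      return false;
    }
    long highestStable = stableReplicaBcsIds.stream()
        .max(Long::compare)
        .get();
    // An unhealthy replica with a higher bcsId than every stable replica may
    // still hold newer data, so it is kept; otherwise it can be deleted.
    return unhealthyBcsId <= highestStable;
  }
}

For the replicas in testUnhealthyReplicaWithHigherBcsId below (stable bcsIds 10000, unhealthy bcsId 10010), this check returns false, which matches the expectation that no delete command is scheduled.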

/**
* When an unhealthy replica has a higher bcsId than the stable replicas'
* bcsId, it should not be deleted.
*/
@Test
public void testUnhealthyReplicaWithHigherBcsId() throws Exception {
// Default test bcsId for container = 10000L
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), QUASI_CLOSED, 10000L);
addReplica(container, NodeStatus.inServiceHealthy(), QUASI_CLOSED, 10000L);
addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY, 10010L);

// If an unhealthy replica has bcsId > all other stable replica's bcsId,
// it should not be deleted.
assertDeleteScheduled(0);
}

/**
* Replication Manager should replicate an unhealthy container replica (not
* unhealthy node) if there are no closed or quasi-closed replicas available.
*/
@Test
public void testUnderReplicatedWithOnlyOneUnhealthyReplica()
throws Exception {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY);
// There should be 2 replications scheduled for replicating the unhealthy
// replica.
assertReplicaScheduled(2);
assertUnderReplicatedCount(1);
}

/**
* Replication Manager should replicate an unhealthy container replica (not
* unhealthy node) if there are no closed or quasi-closed replicas available.
*/
@Test
public void testUnderReplicatedWithTwoUnhealthyReplicas()
throws Exception {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY);
addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY);
// There should be 1 replication scheduled for replicating the unhealthy
// replica.
assertReplicaScheduled(1);
assertUnderReplicatedCount(1);
}

/**
* if all the prerequisites are satisfied, move should work as expected.
*/
@@ -1891,16 +1957,29 @@ private ContainerReplica addReplica(ContainerInfo container,
return addReplicaToDn(container, dn, replicaState);
}

private ContainerReplica addReplica(ContainerInfo container,
NodeStatus nodeStatus, State replicaState, long bcsId)
throws ContainerNotFoundException {
DatanodeDetails dn = addNode(nodeStatus);
return addReplicaToDn(container, dn, replicaState, bcsId);
}

private ContainerReplica addReplicaToDn(ContainerInfo container,
DatanodeDetails dn, State replicaState)
throws ContainerNotFoundException {
return addReplicaToDn(container, dn, replicaState, 1000L);
}

private ContainerReplica addReplicaToDn(ContainerInfo container,
DatanodeDetails dn, State replicaState, long bcsId)
throws ContainerNotFoundException {
// Using the same originID for all replicas in the container set. If each
// replica has a unique originID, it causes problems in ReplicationManager
// when processing over-replicated containers.
final UUID originNodeId =
UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
final ContainerReplica replica = getReplicas(
container.containerID(), replicaState, 1000L, originNodeId, dn);
container.containerID(), replicaState, bcsId, originNodeId, dn);
containerStateManager
.updateContainerReplica(container.containerID(), replica);
return replica;