diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 5cdefcf38c2e..103472c4b1d9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -203,8 +203,10 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
   }
 
   /**
-   * Processes the ContainerReport, unknown container reported
-   * that will be deleted by SCM.
+   * Processes the ContainerReport.
+   * Any unknown container reported by DN and not present in SCM
+   * containerSet will either be logged as an error or deleted based on
+   * unknownContainerHandleAction.
    *
    * @param datanodeDetails Datanode from which this report was received
    * @param container ContainerInfo representing the container
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 6e49db9ae2b7..2b10a1ded1eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -1174,19 +1174,37 @@ private void handleUnderReplicatedContainer(final ContainerInfo container,
         .stream()
         .map(action -> action.datanode)
         .collect(Collectors.toList());
-    final List<DatanodeDetails> source = replicas.stream()
+    List<DatanodeDetails> source = replicas.stream()
         .filter(r ->
             r.getState() == State.QUASI_CLOSED ||
             r.getState() == State.CLOSED)
         // Exclude stale and dead nodes. This is particularly important for
         // maintenance nodes, as the replicas will remain present in the
         // container manager, even when they go dead.
-        .filter(r ->
-            getNodeStatus(r.getDatanodeDetails()).isHealthy())
+        .filter(r -> getNodeStatus(r.getDatanodeDetails()).isHealthy())
         .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
         .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
         .map(ContainerReplica::getDatanodeDetails)
         .collect(Collectors.toList());
+
+    // If there are no closed / quasi-closed replicas for a container,
+    // replicate the unhealthy replica. The unhealthy replica could still
+    // have good data and hence we should replicate it.
+    if (source.size() == 0) {
+      source = replicas.stream()
+          .filter(r -> r.getState() == State.UNHEALTHY)
+          .filter(r -> getNodeStatus(r.getDatanodeDetails()).isHealthy())
+          .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
+          .sorted((r1, r2) ->
+              r2.getSequenceId().compareTo(r1.getSequenceId()))
+          .map(ContainerReplica::getDatanodeDetails)
+          .collect(Collectors.toList());
+      if (source.size() > 0) {
+        LOG.debug("Replicating an unhealthy container replica as there " +
+            "are no closed or quasi-closed replicas for container {}", id);
+      }
+    }
+
     if (source.size() > 0) {
       final int replicationFactor = container
           .getReplicationConfig().getRequiredNodes();
@@ -1308,15 +1326,19 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
               r.getDatanodeDetails().getPersistedOpState() !=
                   HddsProtos.NodeOperationalState.IN_SERVICE);
 
+      // If there are unhealthy replicas, then we should remove them even if
+      // it makes the container violate the placement policy, as excess
+      // unhealthy containers are not really useful. It will be corrected
+      // later as a mis-replicated container will be seen as
+      // under-replicated.
+      // Update - delete only those unhealthy replicas which have bcsId <
+      // container bcsId.
       final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
           .stream()
           .filter(r -> !compareState(container.getState(), r.getState()))
+          .filter(r -> r.getSequenceId() < container.getSequenceId())
          .collect(Collectors.toList());
-      // If there are unhealthy replicas, then we should remove them even if it
-      // makes the container violate the placement policy, as excess unhealthy
-      // containers are not really useful. It will be corrected later as a
-      // mis-replicated container will be seen as under-replicated.
 
       for (ContainerReplica r : unhealthyReplicas) {
         if (excess > 0) {
           sendDeleteCommand(container, r.getDatanodeDetails(), true);
@@ -1474,11 +1496,18 @@ private ContainerPlacementStatus getPlacementStatus(
    */
   private void handleUnstableContainer(final ContainerInfo container,
       final Set<ContainerReplica> replicas) {
-    // Find unhealthy replicas
+    // Note - ReplicationManager would reach here only if the
+    // following conditions are met:
+    //   1. Container is in either CLOSED or QUASI-CLOSED state
+    //   2. Container has exactly as many replicas as the replication factor
+    //      (neither under-replicated nor over-replicated)
+    //   3. At least one replica state does not match the container state
+
     List<ContainerReplica> unhealthyReplicas = replicas.stream()
         .filter(r -> !compareState(container.getState(), r.getState()))
         .collect(Collectors.toList());
 
+    // Close the replicas when possible
     Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
     while (iterator.hasNext()) {
       final ContainerReplica replica = iterator.next();
@@ -1486,9 +1515,7 @@ private void handleUnstableContainer(final ContainerInfo container,
       if (state == State.OPEN || state == State.CLOSING) {
         sendCloseCommand(container, replica.getDatanodeDetails(), false);
         iterator.remove();
-      }
-
-      if (state == State.QUASI_CLOSED) {
+      } else if (state == State.QUASI_CLOSED) {
         // Send force close command if the BCSID matches
         if (container.getSequenceId() == replica.getSequenceId()) {
           sendCloseCommand(container, replica.getDatanodeDetails(), true);
@@ -1497,8 +1524,54 @@ private void handleUnstableContainer(final ContainerInfo container,
       }
     }
 
-    // Now we are left with the replicas which are either unhealthy or
-    // the BCSID doesn't match. These replicas should be deleted.
+    if (unhealthyReplicas.isEmpty()) {
+      return;
+    }
+
+    // Find the highest bcsId among the healthy replicas. Note that this
+    // could be different from the container bcsId (in case the replica
+    // with the highest bcsId == container bcsId is lost; since
+    // ContainerReportHandler only increments the bcsId, the container
+    // bcsId is not updated even though the replica is lost).
+    final Long sequenceId = replicas.stream()
+        .filter(r -> !r.getState().equals(State.UNHEALTHY))
+        .map(ContainerReplica::getSequenceId)
+        .max(Long::compare)
+        .orElse(-1L);
+
+    // Now we are left with either unhealthy replicas or quasi-closed
+    // replicas whose bcsId doesn't match the container bcsId.
+    // Check the following conditions before deleting a replica:
+    //   1. Delete a quasi-closed replica only if there is a closed replica
+    //      available. Otherwise there will be a loop of replica deletion in
+    //      handleUnstableContainer and replication again in
+    //      handleUnderReplicatedContainers in the next iteration.
+    //   2. Delete an unhealthy replica only if its bcsId is not higher
+    //      than the highest healthy replica's bcsId.
+
+    if (unhealthyReplicas.size() == replicas.size()) {
+      // All the replicas are unstable.
+      // This could happen when a closed replica is lost and all the other
+      // replicas are in quasi-closed / unhealthy state with bcsId <
+      // container bcsId. Since we cannot determine which is the most
+      // up-to-date replica, no replica should be deleted.
+      LOG.error("Cannot take any action on the unstable container {} as all "
+          + "the replicas are unstable (either in unhealthy state or have "
+          + "bcsId less than the container bcsId).",
+          container.getContainerID());
+      return;
+    }
+
+    // Filter out the unhealthy replicas which have a higher bcsId than the
+    // healthy (closed / quasi-closed) replicas.
+    unhealthyReplicas.removeIf(r -> r.getSequenceId() > sequenceId);
+
+    if (unhealthyReplicas.isEmpty()) {
+      return;
+    }
+
+    // There is at least one stable replica - either a closed replica or a
+    // quasi-closed replica with bcsId == container bcsId.
+    // Therefore, we can go ahead and delete the unstable replicas.
 
     /*
      * If we have unhealthy replicas we go under replicated and then
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 9e266d1c6ff3..db1eb864b88e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -89,8 +89,9 @@
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
 import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
 import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -1461,6 +1462,71 @@ public void testUnderReplicatedNotHealthySource() throws IOException {
     assertUnderReplicatedCount(1);
   }
 
+  /**
+   * When all the replicas are unstable (unhealthy or quasi-closed with
+   * bcsId < container bcsId), no replica should be deleted.
+   */
+  @Test
+  public void testAllUnstableReplicas() throws Exception {
+    // Default test bcsId for container = 10000L
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    addReplica(container, NodeStatus.inServiceHealthy(), QUASI_CLOSED, 990L);
+    addReplica(container, NodeStatus.inServiceHealthy(), QUASI_CLOSED, 990L);
+    addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY, 980L);
+
+    // If all the replicas are in unstable state then no replica should be
+    // deleted.
+    assertDeleteScheduled(0);
+  }
+
+  /**
+   * When an unhealthy replica has a higher bcsId than the stable replicas'
+   * bcsId, it should not be deleted.
+   */
+  @Test
+  public void testUnhealthyReplicaWithHigherBcsId() throws Exception {
+    // Default test bcsId for container = 10000L
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    addReplica(container, NodeStatus.inServiceHealthy(), QUASI_CLOSED, 10000L);
+    addReplica(container, NodeStatus.inServiceHealthy(), QUASI_CLOSED, 10000L);
+    addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY, 10010L);
+
+    // If an unhealthy replica has bcsId > all other stable replicas' bcsId,
+    // it should not be deleted.
+    assertDeleteScheduled(0);
+  }
+
+  /**
+   * Replication Manager should replicate an unhealthy container replica
+   * (not an unhealthy node) if there are no closed or quasi-closed
+   * replicas available.
+   */
+  @Test
+  public void testUnderReplicatedWithOnlyOneUnhealthyReplica()
+      throws Exception {
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY);
+    // There should be 2 replications scheduled for replicating the unhealthy
+    // replica.
+    assertReplicaScheduled(2);
+    assertUnderReplicatedCount(1);
+  }
+
+  /**
+   * Replication Manager should replicate an unhealthy container replica
+   * (not an unhealthy node) if there are no closed or quasi-closed
+   * replicas available.
+   */
+  @Test
+  public void testUnderReplicatedWithTwoUnhealthyReplicas()
+      throws Exception {
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY);
+    addReplica(container, NodeStatus.inServiceHealthy(), UNHEALTHY);
+    // There should be 1 replication scheduled for replicating the unhealthy
+    // replica.
+    assertReplicaScheduled(1);
+    assertUnderReplicatedCount(1);
+  }
+
   /**
    * if all the prerequisites are satisfied, move should work as expected.
    */
@@ -1891,16 +1957,29 @@ private ContainerReplica addReplica(ContainerInfo container,
     return addReplicaToDn(container, dn, replicaState);
   }
 
+  private ContainerReplica addReplica(ContainerInfo container,
+      NodeStatus nodeStatus, State replicaState, long bcsId)
+      throws ContainerNotFoundException {
+    DatanodeDetails dn = addNode(nodeStatus);
+    return addReplicaToDn(container, dn, replicaState, bcsId);
+  }
+
   private ContainerReplica addReplicaToDn(ContainerInfo container,
       DatanodeDetails dn, State replicaState)
       throws ContainerNotFoundException {
+    return addReplicaToDn(container, dn, replicaState, 1000L);
+  }
+
+  private ContainerReplica addReplicaToDn(ContainerInfo container,
+      DatanodeDetails dn, State replicaState, long bcsId)
+      throws ContainerNotFoundException {
     // Using the same originID for all replica in the container set. If each
     // replica has a unique originID, it causes problems in ReplicationManager
     // when processing over-replicated containers.
     final UUID originNodeId =
         UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
     final ContainerReplica replica = getReplicas(
-        container.containerID(), replicaState, 1000L, originNodeId, dn);
+        container.containerID(), replicaState, bcsId, originNodeId, dn);
     containerStateManager
         .updateContainerReplica(container.containerID(), replica);
     return replica;
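
The retention rules that this patch adds to handleUnstableContainer() are subtle, so the following is a minimal, self-contained sketch of the same decision. It is not part of the patch: the Replica type, the class and method names, and the assumption that the container itself is CLOSED are all illustrative stand-ins for the real SCM types, and it returns the delete set instead of sending delete commands.

import java.util.ArrayList;
import java.util.List;

public final class UnstableReplicaSketch {

  enum State { OPEN, CLOSING, QUASI_CLOSED, CLOSED, UNHEALTHY }

  static final class Replica {
    final State state;
    final long bcsId;

    Replica(State state, long bcsId) {
      this.state = state;
      this.bcsId = bcsId;
    }
  }

  /** Returns the replicas that are safe to delete for a CLOSED container. */
  static List<Replica> replicasToDelete(long containerBcsId,
      List<Replica> replicas) {
    // Deletion candidates: replicas whose state does not match the
    // (assumed CLOSED) container state.
    List<Replica> unstable = new ArrayList<>();
    for (Replica r : replicas) {
      if (r.state != State.CLOSED) {
        unstable.add(r);
      }
    }
    // Open / closing replicas receive close commands instead, and a
    // quasi-closed replica with a matching bcsId can be force closed,
    // so none of these are deletion candidates.
    unstable.removeIf(r -> r.state == State.OPEN || r.state == State.CLOSING
        || (r.state == State.QUASI_CLOSED && r.bcsId == containerBcsId));

    if (unstable.isEmpty() || unstable.size() == replicas.size()) {
      // Nothing left to delete, or every replica is unstable and we cannot
      // tell which one is the most up to date: delete nothing.
      return new ArrayList<>();
    }

    // Highest bcsId among the non-unhealthy replicas.
    long maxStableBcsId = -1L;
    for (Replica r : replicas) {
      if (r.state != State.UNHEALTHY) {
        maxStableBcsId = Math.max(maxStableBcsId, r.bcsId);
      }
    }

    // Keep (do not delete) unstable replicas that are ahead of every
    // stable replica, since they may hold the most recent data.
    final long limit = maxStableBcsId;
    unstable.removeIf(r -> r.bcsId > limit);
    return unstable;
  }

  public static void main(String[] args) {
    List<Replica> replicas = new ArrayList<>();
    replicas.add(new Replica(State.CLOSED, 10000L));
    replicas.add(new Replica(State.QUASI_CLOSED, 990L));
    replicas.add(new Replica(State.UNHEALTHY, 10010L));
    // The lagging quasi-closed replica is deletable; the unhealthy replica
    // is retained because its bcsId exceeds the closed replica's bcsId.
    System.out.println(replicasToDelete(10000L, replicas).size()); // 1
  }
}

Run against the scenarios in the new tests above, this sketch deletes nothing when all replicas are unstable, and keeps an unhealthy replica whose bcsId exceeds every stable replica's bcsId, which is exactly the behavior testAllUnstableReplicas and testUnhealthyReplicaWithHigherBcsId assert.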