diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java index bc83b8da6b17..1b190a22da1b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java @@ -26,6 +26,10 @@ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.slf4j.Logger; import java.io.IOException; @@ -68,7 +72,7 @@ public class AbstractContainerReportHandler { * @throws IOException In case of any Exception while processing the report */ protected void processContainerReplica(final DatanodeDetails datanodeDetails, - final ContainerReplicaProto replicaProto) + final ContainerReplicaProto replicaProto, final EventPublisher publisher) throws IOException { final ContainerID containerId = ContainerID .valueof(replicaProto.getContainerID()); @@ -81,8 +85,10 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails, // once we have introduced lock inside ContainerInfo. 
synchronized (containerManager.getContainer(containerId)) { updateContainerStats(datanodeDetails, containerId, replicaProto); - updateContainerState(datanodeDetails, containerId, replicaProto); - updateContainerReplica(datanodeDetails, containerId, replicaProto); + if (!updateContainerState(datanodeDetails, containerId, replicaProto, + publisher)) { + updateContainerReplica(datanodeDetails, containerId, replicaProto); + } } } @@ -98,11 +104,10 @@ private void updateContainerStats(final DatanodeDetails datanodeDetails, final ContainerID containerId, final ContainerReplicaProto replicaProto) throws ContainerNotFoundException { + final ContainerInfo containerInfo = containerManager + .getContainer(containerId); if (isHealthy(replicaProto::getState)) { - final ContainerInfo containerInfo = containerManager - .getContainer(containerId); - if (containerInfo.getSequenceId() < replicaProto.getBlockCommitSequenceId()) { containerInfo.updateSequenceId( @@ -154,15 +159,18 @@ private List getOtherReplicas(ContainerID containerId, * @param datanode Datanode from which the report is received * @param containerId ID of the container * @param replica ContainerReplica + * @return true if the replica should be ignored in the next process * @throws IOException In case of Exception */ - private void updateContainerState(final DatanodeDetails datanode, + private boolean updateContainerState(final DatanodeDetails datanode, final ContainerID containerId, - final ContainerReplicaProto replica) + final ContainerReplicaProto replica, + final EventPublisher publisher) throws IOException { final ContainerInfo container = containerManager .getContainer(containerId); + boolean ignored = false; switch (container.getState()) { case OPEN: @@ -244,20 +252,29 @@ private void updateContainerState(final DatanodeDetails datanode, */ break; case DELETING: - throw new UnsupportedOperationException( - "Unsupported container state 'DELETING'."); + /* + * The container is under deleting. do nothing. 
+ */ + break; case DELETED: - throw new UnsupportedOperationException( - "Unsupported container state 'DELETED'."); + /* + * The container is deleted. delete the replica. + */ + deleteReplica(containerId, datanode, publisher, "DELETED"); + ignored = true; + break; default: break; } + + return ignored; } private void updateContainerReplica(final DatanodeDetails datanodeDetails, final ContainerID containerId, final ContainerReplicaProto replicaProto) throws ContainerNotFoundException, ContainerReplicaNotFoundException { + final ContainerReplica replica = ContainerReplica.newBuilder() .setContainerID(containerId) .setContainerState(replicaProto.getState()) @@ -297,4 +314,14 @@ protected ContainerManager getContainerManager() { return containerManager; } + protected void deleteReplica(ContainerID containerID, DatanodeDetails dn, + EventPublisher publisher, String reason) { + final DeleteContainerCommand deleteCommand = + new DeleteContainerCommand(containerID.getId(), true); + final CommandForDatanode datanodeCommand = new CommandForDatanode<>( + dn.getUuid(), deleteCommand); + publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); + logger.info("Sending delete container command for " + reason + + " container {} to datanode {}", containerID.getId(), dn); + } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 8432e29ddbb8..3fc1479006ea 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -33,8 +33,6 @@ .ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; -import 
org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; -import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,7 +157,7 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails, final EventPublisher publisher) { for (ContainerReplicaProto replicaProto : replicas) { try { - processContainerReplica(datanodeDetails, replicaProto); + processContainerReplica(datanodeDetails, replicaProto, publisher); } catch (ContainerNotFoundException e) { if(unknownContainerHandleAction.equals( UNKNOWN_CONTAINER_ACTION_WARN)) { @@ -170,13 +168,7 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails, UNKNOWN_CONTAINER_ACTION_DELETE)) { final ContainerID containerId = ContainerID .valueof(replicaProto.getContainerID()); - final DeleteContainerCommand deleteCommand = - new DeleteContainerCommand(containerId.getId(), true); - final CommandForDatanode datanodeCommand = new CommandForDatanode<>( - datanodeDetails.getUuid(), deleteCommand); - publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); - LOG.info("Sending delete container command for unknown container {}" - + " to datanode {}", containerId.getId(), datanodeDetails); + deleteReplica(containerId, datanodeDetails, publisher, "unknown"); } } catch (IOException e) { LOG.error("Exception while processing container report for container" + diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index c2148df17dc2..ed8756521b0e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -77,7 +77,7 @@ public void onMessage(final 
IncrementalContainerReportFromDatanode report, ContainerReplicaProto.State.DELETED)) { nodeManager.addContainer(dd, id); } - processContainerReplica(dd, replicaProto); + processContainerReplica(dd, replicaProto, publisher); } catch (ContainerNotFoundException e) { success = false; LOG.warn("Container {} not found!", replicaProto.getContainerID()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 3a9ad1bc8acb..984fff410d84 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigType; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; @@ -79,7 +80,7 @@ public class ReplicationManager implements MetricsSource, EventHandler { - private static final Logger LOG = + public static final Logger LOG = LoggerFactory.getLogger(ReplicationManager.class); public static final String METRICS_SOURCE_NAME = "SCMReplicationManager"; @@ -312,6 +313,16 @@ private void processContainer(ContainerID id) { action -> replicas.stream() .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode))); + /* + * If container is under deleting and all it's replicas are deleted, then + * make the container as CLEANED, or resend the delete replica command if + * needed. 
+ */ + if (state == LifeCycleState.DELETING) { + handleContainerUnderDelete(container, replicas); + return; + } + /* * We don't have to take any action if the container is healthy. * @@ -320,6 +331,12 @@ private void processContainer(ContainerID id) { * exact number of replicas in the same state. */ if (isContainerHealthy(container, replicas)) { + /* + * If container is empty, schedule task to delete the container. + */ + if (isContainerEmpty(container, replicas)) { + deleteContainerReplicas(container, replicas); + } return; } @@ -400,6 +417,21 @@ private boolean isContainerHealthy(final ContainerInfo container, r -> compareState(container.getState(), r.getState())); } + /** + * Returns true if the container is empty and CLOSED. + * + * @param container Container to check + * @param replicas Set of ContainerReplicas + * @return true if the container is empty, false otherwise + */ + private boolean isContainerEmpty(final ContainerInfo container, + final Set replicas) { + return container.getState() == LifeCycleState.CLOSED && + (container.getUsedBytes() == 0 && container.getNumberOfKeys() == 0) && + replicas.stream().allMatch(r -> r.getState() == State.CLOSED && + r.getBytesUsed() == 0 && r.getKeyCount() == 0); + } + /** * Checks if the container is under replicated or not. * @@ -409,6 +441,10 @@ private boolean isContainerHealthy(final ContainerInfo container, */ private boolean isContainerUnderReplicated(final ContainerInfo container, final Set replicas) { + if (container.getState() != LifeCycleState.CLOSED && + container.getState() != LifeCycleState.QUASI_CLOSED) { + return false; + } boolean misReplicated = !getPlacementStatus( replicas, container.getReplicationFactor().getNumber()) .isPolicySatisfied(); @@ -465,6 +501,64 @@ private boolean canForceCloseContainer(final ContainerInfo container, return uniqueQuasiClosedReplicaCount > (replicationFactor / 2); } + /** + * Delete the container and its replicas. 
+ * + * @param container ContainerInfo + * @param replicas Set of ContainerReplicas + */ + private void deleteContainerReplicas(final ContainerInfo container, + final Set replicas) throws IOException { + Preconditions.assertTrue(container.getState() == + LifeCycleState.CLOSED); + Preconditions.assertTrue(container.getNumberOfKeys() == 0); + Preconditions.assertTrue(container.getUsedBytes() == 0); + + replicas.stream().forEach(rp -> { + Preconditions.assertTrue(rp.getState() == State.CLOSED); + Preconditions.assertTrue(rp.getBytesUsed() == 0); + Preconditions.assertTrue(rp.getKeyCount() == 0); + sendDeleteCommand(container, rp.getDatanodeDetails(), false); + }); + containerManager.updateContainerState(container.containerID(), + HddsProtos.LifeCycleEvent.DELETE); + LOG.debug("Deleting empty container {} replicas,", container.containerID()); + } + + /** + * Handle the container which is under delete. + * + * @param container ContainerInfo + * @param replicas Set of ContainerReplicas + */ + private void handleContainerUnderDelete(final ContainerInfo container, + final Set replicas) throws IOException { + if (replicas.size() == 0) { + containerManager.updateContainerState(container.containerID(), + HddsProtos.LifeCycleEvent.CLEANUP); + LOG.debug("Container {} state changes to DELETED", + container.containerID()); + } else { + // Check whether to resend the delete replica command + final List deletionInFlight = inflightDeletion + .getOrDefault(container.containerID(), Collections.emptyList()) + .stream() + .map(action -> action.datanode) + .collect(Collectors.toList()); + Set filteredReplicas = replicas.stream().filter( + r -> !deletionInFlight.contains(r.getDatanodeDetails())) + .collect(Collectors.toSet()); + // Resend the delete command + if (filteredReplicas.size() > 0) { + filteredReplicas.stream().forEach(rp -> { + sendDeleteCommand(container, rp.getDatanodeDetails(), false); + }); + LOG.debug("Resend delete Container {} command", + container.containerID()); + } + 
} + } + /** * Force close the container replica(s) with highest sequence Id. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index 257667465e18..e3ad85f05450 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -353,7 +353,11 @@ private HddsProtos.LifeCycleState updateContainerState( containerID); } } - containerStore.put(containerID, container); + if (newState == LifeCycleState.DELETED) { + containerStore.delete(containerID); + } else { + containerStore.put(containerID, container); + } return newState; } catch (ContainerNotFoundException cnfe) { throw new SCMException( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/ContainerSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/ContainerSafeModeRule.java index 04106088a266..dbb4eb60ef6c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/ContainerSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/ContainerSafeModeRule.java @@ -70,8 +70,8 @@ public ContainerSafeModeRule(String ruleName, EventQueue eventQueue, // now. These containers can be handled by tracking pipelines. 
Optional.ofNullable(container.getState()) - .filter(state -> state != HddsProtos.LifeCycleState.OPEN) - .filter(state -> state != HddsProtos.LifeCycleState.CLOSING) + .filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED || + state == HddsProtos.LifeCycleState.CLOSED)) .ifPresent(s -> containerMap.put(container.getContainerID(), container)); }); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index f4f17598ed0d..42640f32a9d3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -551,6 +551,7 @@ public static ContainerReplica getReplicas( .setDatanodeDetails(datanodeDetails) .setOriginNodeId(originNodeId) .setSequenceId(sequenceId) + .setBytesUsed(100) .build(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 9f308fa9738e..205fea826119 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -627,6 +627,39 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas() assertEquals(11L, containerOne.getNumberOfKeys()); } + @Test + public void testStaleReplicaOfDeletedContainer() throws NodeNotFoundException, + SCMException, ContainerNotFoundException { + + final ContainerReportHandler reportHandler = new ContainerReportHandler( + nodeManager, containerManager); + + final Iterator nodeIterator = nodeManager.getNodes( + NodeState.HEALTHY).iterator(); + final DatanodeDetails datanodeOne = nodeIterator.next(); + final ContainerInfo 
containerOne = getContainer(LifeCycleState.DELETED); + + final Set containerIDSet = Stream.of( + containerOne.containerID()) + .collect(Collectors.toSet()); + + nodeManager.setContainers(datanodeOne, containerIDSet); + containerStateManager.loadContainer(containerOne); + + // Expects the replica will be deleted. + final ContainerReportsProto containerReport = getContainerReportsProto( + containerOne.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeOne.getUuidString()); + final ContainerReportFromDatanode containerReportFromDatanode = + new ContainerReportFromDatanode(datanodeOne, containerReport); + reportHandler.onMessage(containerReportFromDatanode, publisher); + + Mockito.verify(publisher, Mockito.times(1)).fireEvent(Mockito.any(), Mockito.any()); + + Assert.assertEquals(0, containerManager.getContainerReplicas( + containerOne.containerID()).size()); + } + private ContainerReportFromDatanode getContainerReportFromDatanode( ContainerID containerId, ContainerReplicaProto.State state, DatanodeDetails dn, long bytesUsed, long keyCount) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index aeb5bc7fbd5f..58c3b9de29e6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ReplicationManager; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import 
org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; @@ -99,6 +100,7 @@ public static void init() throws Exception { conf = new OzoneConfiguration(); GenericTestUtils.setLogLevel(DeletedBlockLogImpl.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(SCMBlockDeletingService.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(ReplicationManager.LOG, Level.DEBUG); String path = GenericTestUtils.getTempPath(TestBlockDeletion.class.getSimpleName()); @@ -119,6 +121,8 @@ public static void init() throws Exception { conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1); conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1); conf.setQuietMode(false); + conf.setTimeDuration("hdds.scm.replication.event.timeout", 100, + TimeUnit.MILLISECONDS); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) .setHbInterval(200) @@ -222,7 +226,7 @@ public void testBlockDeletion() throws Exception { } @Test - public void testContainerStatistics() throws Exception { + public void testContainerStatisticsAfterDelete() throws Exception { String volumeName = UUID.randomUUID().toString(); String bucketName = UUID.randomUUID().toString(); @@ -277,6 +281,41 @@ public void testContainerStatistics() throws Exception { Assert.assertEquals(0, container.getUsedBytes()); Assert.assertEquals(0, container.getNumberOfKeys()); }); + + cluster.shutdownHddsDatanode(0); + scm.getReplicationManager().processContainersNow(); + // Wait for container state change to DELETING + Thread.sleep(100); + containerInfos = scm.getContainerManager().getContainers(); + containerInfos.stream().forEach(container -> + Assert.assertEquals(HddsProtos.LifeCycleState.DELETING, + container.getState())); + LogCapturer logCapturer = + LogCapturer.captureLogs(ReplicationManager.LOG); + logCapturer.clearOutput(); + + scm.getReplicationManager().processContainersNow(); + Thread.sleep(100); + // Wait for delete replica command resend + GenericTestUtils.waitFor(() -> 
logCapturer.getOutput() + .contains("Resend delete Container"), 500, 5000); + cluster.restartHddsDatanode(0, true); + Thread.sleep(100); + + scm.getReplicationManager().processContainersNow(); + // Wait for container state change to DELETED + Thread.sleep(100); + containerInfos = scm.getContainerManager().getContainers(); + containerInfos.stream().forEach(container -> { + Assert.assertEquals(HddsProtos.LifeCycleState.DELETED, + container.getState()); + try { + Assert.assertNull(scm.getScmMetadataStore().getContainerTable() + .get(container.containerID())); + } catch (IOException e) { + Assert.fail("Getting container from SCM DB should not fail"); + } + }); } private void waitForDatanodeBlockDeletionStart() diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java index 9e8887213f7c..bc07feca8c9f 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java @@ -80,7 +80,7 @@ public void onMessage(final IncrementalContainerReportFromDatanode report, return; } getNodeManager().addContainer(dd, id); - processContainerReplica(dd, replicaProto); + processContainerReplica(dd, replicaProto, publisher); } catch (ContainerNotFoundException e) { success = false; LOG.warn("Container {} not found!", replicaProto.getContainerID());