diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 2d8f8c06c851..ff20e7ebd937 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -416,8 +416,12 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT = "ozone.scm.pipeline.destroy.timeout"; + // Containers get 150s to close first + // (OzoneConfigKeys#OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION), and we + // then wait another 150s before deleting the pipeline: + // (150 + 150) = 300s public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT = - "66s"; + "300s"; public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL = "ozone.scm.pipeline.creation.interval"; @@ -427,7 +431,7 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL = "ozone.scm.pipeline.scrub.interval"; public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT = - "5m"; + "150s"; // Allow SCM to auto create factor ONE ratis pipeline. diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 42aff0678a42..486f6781f9b7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -87,10 +87,21 @@ public static Codec<Pipeline> getCodec() { // suggested leader id with high priority private final UUID suggestedLeaderId; + private final Instant stateEnterTime; + /** * The immutable properties of pipeline object is used in * ContainerStateManager#getMatchingContainerByPipeline to take a lock on * the container allocations for a particular pipeline. + *
+ * Since the Pipeline class is immutable, if we want to change the state of + * the Pipeline we should create a new Pipeline object with the new state. + * Make sure that you set the value of creationTimestamp properly while + * creating the new Pipeline object. + *
+ * There is no need to worry about the value of stateEnterTime as it's + * set to Instant.now() when you create the Pipeline object as part of + * a state change. */ private Pipeline(PipelineID id, ReplicationConfig replicationConfig, PipelineState state, @@ -102,6 +113,7 @@ private Pipeline(PipelineID id, this.creationTimestamp = Instant.now(); this.suggestedLeaderId = suggestedLeaderId; this.replicaIndexes = new HashMap<>(); + this.stateEnterTime = Instant.now(); } /** @@ -140,6 +152,10 @@ public Instant getCreationTimestamp() { return creationTimestamp; } + public Instant getStateEnterTime() { + return stateEnterTime; + } + /** * Return the suggested leaderId which has a high priority among DNs of the * pipeline. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index b95998c1da11..f05eb761d91f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -80,8 +80,8 @@ public void onMessage(final DatanodeDetails datanodeDetails, * action. */ LOG.info("A dead datanode is detected. {}", datanodeDetails); - destroyPipelines(datanodeDetails); closeContainers(datanodeDetails, publisher); + destroyPipelines(datanodeDetails); // Remove the container replicas associated with the dead node unless it // is IN_MAINTENANCE @@ -122,8 +122,8 @@ private void destroyPipelines(final DatanodeDetails datanodeDetails) { .ifPresent(pipelines -> pipelines.forEach(id -> { try { - pipelineManager.closePipeline( - pipelineManager.getPipeline(id), false); + pipelineManager.closePipeline(id); + pipelineManager.deletePipeline(id); } catch (PipelineNotFoundException ignore) { // Pipeline is not there in pipeline manager, // should we care?
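The DeadNodeHandler hunk above is the template for every call-site migration in this patch: the single closePipeline(pipeline, onTimeout) call becomes an explicit close followed by an explicit delete. A minimal sketch of the migrated pattern, using only the two PipelineManager methods introduced here (the helper class itself is illustrative, not part of the patch):

  import java.io.IOException;

  import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;

  /** Illustrative helper showing the close-then-delete call pattern. */
  final class PipelineTeardown {

    private PipelineTeardown() {
    }

    static void closeAndDelete(PipelineManager pipelineManager, PipelineID id)
        throws IOException {
      try {
        // Step 1: close the pipeline's containers and move it to CLOSED.
        pipelineManager.closePipeline(id);
        // Step 2: remove the CLOSED pipeline from SCM state right away.
        pipelineManager.deletePipeline(id);
      } catch (PipelineNotFoundException ignore) {
        // Mirrors DeadNodeHandler: a pipeline that is already gone needs
        // no further teardown.
      }
    }
  }

Callers that previously passed onTimeout = true simply stop after step 1 and leave the eventual deletion to the pipeline scrubber.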
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HealthyReadOnlyNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HealthyReadOnlyNodeHandler.java index 3286133009e1..2256829942f4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HealthyReadOnlyNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HealthyReadOnlyNodeHandler.java @@ -90,7 +90,7 @@ public void onMessage(DatanodeDetails datanodeDetails, pipelineID, pipeline.getPipelineState(), HddsProtos.NodeState.HEALTHY_READONLY, datanodeDetails.getUuidString()); - pipelineManager.closePipeline(pipeline, true); + pipelineManager.closePipeline(pipelineID); } catch (IOException ex) { LOG.error("Failed to close pipeline {} which uses HEALTHY READONLY " + "datanode {}: ", pipelineID, datanodeDetails, ex); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index dd8cea366975..c58cd054f9ff 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -59,8 +58,7 @@ public void onMessage(DatanodeDetails datanodeDetails, datanodeDetails, pipelineIds); for (PipelineID pipelineID : pipelineIds) { try { - Pipeline pipeline = pipelineManager.getPipeline(pipelineID); - pipelineManager.closePipeline(pipeline, true); + pipelineManager.closePipeline(pipelineID); } catch (IOException e) { LOG.info("Could not finalize pipeline={} for dn={}", pipelineID, datanodeDetails); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java index 783eb1358e4d..0951ea811443 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -57,8 +56,7 @@ public void onMessage(DatanodeDetails datanodeDetails, datanodeDetails, pipelineIds); for (PipelineID pipelineID : pipelineIds) { try { - Pipeline pipeline = pipelineManager.getPipeline(pipelineID); - pipelineManager.closePipeline(pipeline, false); + pipelineManager.closePipeline(pipelineID); } catch (IOException e) { LOG.info("Could not finalize pipeline={} for dn={}", pipelineID, datanodeDetails); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java index e33f256a4476..2f1785cf1747 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java @@ -84,8 +84,7 @@ private void processPipelineAction(final DatanodeDetails datanode, info.getDetailedReason()); if (action == PipelineAction.Action.CLOSE) { - pipelineManager.closePipeline( - pipelineManager.getPipeline(pid), false); + pipelineManager.closePipeline(pid); } else { LOG.error("unknown pipeline action:{}", action); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 2df7e6db5f94..15b0f408c560 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -115,9 +115,14 @@ NavigableSet<ContainerID> getContainersInPipeline(PipelineID pipelineID) void openPipeline(PipelineID pipelineId) throws IOException; + @Deprecated void closePipeline(Pipeline pipeline, boolean onTimeout) throws IOException; + void closePipeline(PipelineID pipelineID) throws IOException; + + void deletePipeline(PipelineID pipelineID) throws IOException; + void closeStalePipelines(DatanodeDetails datanodeDetails); void scrubPipelines() throws IOException; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index d5cb5504eb7f..5e422b43d3d0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -489,17 +489,25 @@ private void closeContainersForPipeline(final PipelineID pipelineId) * put pipeline in CLOSED state. * @param pipeline - ID of the pipeline. * @param onTimeout - whether to remove pipeline after some time. - * @throws IOException + * @throws IOException in case of failure + * @deprecated Do not use this method; the onTimeout flag is not honored. */ - @Override + @Deprecated public void closePipeline(Pipeline pipeline, boolean onTimeout) - throws IOException { - PipelineID pipelineID = pipeline.getId(); + throws IOException { + closePipeline(pipeline.getId()); + } + + /** + * Move the Pipeline to CLOSED state. + * @param pipelineID ID of the Pipeline to be closed + * @throws IOException In case of exception while closing the Pipeline + */ + public void closePipeline(PipelineID pipelineID) throws IOException { HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf(); // close containers. closeContainersForPipeline(pipelineID); - - if (!pipeline.isClosed()) { + if (!getPipeline(pipelineID).isClosed()) { acquireWriteLock(); try { stateManager.updatePipelineState(pipelineIDProtobuf, @@ -507,15 +515,20 @@ public void closePipeline(Pipeline pipeline, boolean onTimeout) } finally { releaseWriteLock(); } - LOG.info("Pipeline {} moved to CLOSED state", pipeline); + LOG.info("Pipeline {} moved to CLOSED state", pipelineID); } metrics.removePipelineMetrics(pipelineID);
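Because closePipeline(PipelineID) is now on the PipelineManager interface, tests can stub its failure paths directly instead of going through getPipeline, which is exactly what the TestPipelineActionHandler change later in this patch does. A small Mockito sketch of that stubbing style (the class and method names here are illustrative):

  import java.io.IOException;

  import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
  import org.mockito.Mockito;

  final class ClosePipelineStubbing {

    private ClosePipelineStubbing() {
    }

    static PipelineManager managerWithMissingPipeline() throws IOException {
      PipelineManager manager = Mockito.mock(PipelineManager.class);
      // closePipeline(PipelineID) returns void, so doThrow().when() replaces
      // the old when(manager.getPipeline(..)).thenThrow(..) arrangement.
      Mockito.doThrow(new PipelineNotFoundException())
          .when(manager).closePipeline(Mockito.any(PipelineID.class));
      return manager;
    }
  }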
- if (!onTimeout) { - // close pipeline right away. - removePipeline(pipeline); - } + } + + /** + * Deletes the Pipeline for the given PipelineID. + * @param pipelineID ID of the Pipeline to be deleted + * @throws IOException In case of exception while deleting the Pipeline + */ + public void deletePipeline(PipelineID pipelineID) throws IOException { + removePipeline(getPipeline(pipelineID)); } /** close the pipelines whose nodes' IPs are stale. @@ -535,9 +548,10 @@ public void closeStalePipelines(DatanodeDetails datanodeDetails) { pipelinesWithStaleIpOrHostname.size()); pipelinesWithStaleIpOrHostname.forEach(p -> { try { - LOG.info("Closing the stale pipeline: {}", p.getId()); - closePipeline(p, false); - LOG.info("Closed the stale pipeline: {}", p.getId()); + final PipelineID id = p.getId(); + LOG.info("Closing the stale pipeline: {}", id); + closePipeline(id); + deletePipeline(id); } catch (IOException e) { LOG.error("Closing the stale pipeline failed: {}", p, e); } @@ -568,26 +582,34 @@ public void scrubPipelines() throws IOException { ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + long pipelineDeleteTimeoutInMills = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, + ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); List<Pipeline> candidates = stateManager.getPipelines(); for (Pipeline p : candidates) { + final PipelineID id = p.getId(); // scrub pipelines who stay ALLOCATED for too long. if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED && (currentTime.toEpochMilli() - p.getCreationTimestamp() .toEpochMilli() >= pipelineScrubTimeoutInMills)) { + LOG.info("Scrubbing pipeline: id: {} since it stays at ALLOCATED " + - "stage for {} mins.", p.getId(), + "stage for {} mins.", id, Duration.between(currentTime, p.getCreationTimestamp()) .toMinutes()); - closePipeline(p, false); + closePipeline(id); + deletePipeline(id); } // scrub pipelines who stay CLOSED for too long. - if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) { + if (p.getPipelineState() == Pipeline.PipelineState.CLOSED && + (currentTime.toEpochMilli() - p.getStateEnterTime().toEpochMilli()) + >= pipelineDeleteTimeoutInMills) { LOG.info("Scrubbing pipeline: id: {} since it stays at CLOSED stage.", p.getId()); - closeContainersForPipeline(p.getId()); - removePipeline(p); + deletePipeline(id); }
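This scrubber change is what makes the onTimeout flag unnecessary: CLOSED pipelines are no longer removed on the first pass, but only once they have been CLOSED for the destroy timeout, measured from the new Pipeline#stateEnterTime. A minimal sketch of the predicate, using the 300s default this patch sets for ozone.scm.pipeline.destroy.timeout:

  import java.time.Instant;

  final class ClosedPipelineScrub {

    // Default from this patch: 150s close-container wait + 150s = 300s.
    private static final long DESTROY_TIMEOUT_MS = 300_000L;

    private ClosedPipelineScrub() {
    }

    static boolean shouldDelete(Instant now, Instant stateEnterTime) {
      // Delete a CLOSED pipeline only after it has stayed CLOSED for the
      // full destroy timeout, giving container close commands time to land.
      return now.toEpochMilli() - stateEnterTime.toEpochMilli()
          >= DESTROY_TIMEOUT_MS;
    }
  }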
// If a datanode is stopped and then SCM is restarted, a pipeline can get // stuck in an open state. For Ratis, provided some other DNs that were @@ -599,8 +621,7 @@ public void scrubPipelines() throws IOException { if (isOpenWithUnregisteredNodes(p)) { LOG.info("Scrubbing pipeline: id: {} as it has unregistered nodes", p.getId()); - closeContainersForPipeline(p.getId()); - closePipeline(p, true); + closePipeline(id); } } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index f00aa7f6c1a2..f1685486d2b3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -243,6 +243,16 @@ public void closePipeline(final Pipeline pipeline, final boolean onTimeout) HddsProtos.PipelineState.PIPELINE_CLOSED); } + @Override + public void closePipeline(PipelineID pipelineID) throws IOException { + + } + + @Override + public void deletePipeline(PipelineID pipelineID) throws IOException { + + } + @Override public void closeStalePipelines(DatanodeDetails datanodeDetails) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java index 6ac0b5386307..791220f67079 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java @@ -26,10 +26,10 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; -import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import java.io.IOException; import java.util.UUID; /** @@ -39,12 +39,12 @@ public class TestPipelineActionHandler { @Test public void testCloseActionForMissingPipeline() - throws PipelineNotFoundException, NotLeaderException { + throws IOException { final PipelineManager manager = Mockito.mock(PipelineManager.class); final EventQueue queue = Mockito.mock(EventQueue.class); - Mockito.when(manager.getPipeline(Mockito.any(PipelineID.class))) - .thenThrow(new PipelineNotFoundException()); + Mockito.doThrow(new PipelineNotFoundException()) + .when(manager).closePipeline(Mockito.any(PipelineID.class)); final PipelineActionHandler actionHandler = new PipelineActionHandler(manager, SCMContext.emptyContext(), null); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index 33243b650e3d..48f82b5cc958 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -86,6 +86,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT; import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED; import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; @@ -341,7 +342,9 @@ public void testRemovePipeline() throws Exception { } // Destroy pipeline - pipelineManager.closePipeline(pipeline, false); + pipelineManager.closePipeline(pipeline.getId()); + pipelineManager.deletePipeline(pipeline.getId()); + try { pipelineManager.getPipeline(pipeline.getId()); fail("Pipeline should not have been retrieved"); @@ -393,7 +396,8 @@ public void testPipelineReport() throws Exception { pipelineManager.getPipeline(pipeline.getId()).isOpen()); // close the pipeline - pipelineManager.closePipeline(pipeline, false); + pipelineManager.closePipeline(pipeline.getId()); + pipelineManager.deletePipeline(pipeline.getId()); // pipeline report for destroyed pipeline should be ignored nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline, @@ -514,6 +518,8 @@ public void testScrubPipelines() throws Exception { // Allocated pipelines should not be scrubbed for 50 seconds. conf.setTimeDuration( OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 50, TimeUnit.SECONDS); + conf.setTimeDuration( + OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 50, TimeUnit.SECONDS); PipelineManagerImpl pipelineManager = createPipelineManager(true); Pipeline allocatedPipeline = pipelineManager @@ -553,8 +559,9 @@ public void testScrubPipelines() throws Exception { .getInstance(ReplicationFactor.THREE), Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline)); - // The closedPipeline should be scrubbed, as they are scrubbed immediately - Assertions.assertFalse(pipelineManager + // The closedPipeline should not be scrubbed as the interval has not + // yet passed. 
+ Assertions.assertTrue(pipelineManager .getPipelines(RatisReplicationConfig .getInstance(ReplicationFactor.THREE), Pipeline.PipelineState.CLOSED).contains(closedPipeline)); @@ -569,6 +576,12 @@ public void testScrubPipelines() throws Exception { .getInstance(ReplicationFactor.THREE), Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline)); + // The closedPipeline should now be scrubbed as the interval has passed + Assertions.assertFalse(pipelineManager + .getPipelines(RatisReplicationConfig + .getInstance(ReplicationFactor.THREE), + Pipeline.PipelineState.CLOSED).contains(closedPipeline)); + pipelineManager.close(); } @@ -742,11 +755,11 @@ public void testPipelineCloseFlow() throws IOException, TimeoutException { addContainer(containerInfo.getProtobuf()); //Add Container to PipelineStateMap pipelineManager.addContainerToPipeline(pipelineID, containerID); - pipelineManager.closePipeline(pipeline, false); + pipelineManager.closePipeline(pipelineID); String containerExpectedOutput = "Container " + containerID + " closed for pipeline=" + pipelineID; String pipelineExpectedOutput = - "Pipeline " + pipeline + " moved to CLOSED state"; + "Pipeline " + pipelineID + " moved to CLOSED state"; String logOutput = logCapturer.getOutput(); assertTrue(logOutput.contains(containerExpectedOutput)); assertTrue(logOutput.contains(pipelineExpectedOutput)); @@ -847,9 +860,9 @@ public void testCloseStalePipelines() throws IOException, TimeoutException { pipelineManager.closeStalePipelines(datanodeDetails); verify(pipelineManager, times(1)) - .closePipeline(stalePipelines.get(0), false); + .closePipeline(stalePipelines.get(0).getId()); verify(pipelineManager, times(1)) - .closePipeline(stalePipelines.get(1), false); + .closePipeline(stalePipelines.get(1).getId()); } @Test diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index 6604cd652f9a..dd0e5d9f3180 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -116,8 +116,8 @@ public void testPipelineMap() throws IOException, ratisContainer.getPipeline().getId()); Assertions.assertEquals(0, set2.size()); - pipelineManager - .closePipeline(ratisContainer.getPipeline(), false); + pipelineManager.closePipeline(ratisContainer.getPipeline().getId()); + pipelineManager.deletePipeline(ratisContainer.getPipeline().getId()); pipelines = scm.getScmNodeManager() .getPipelines(dns.get(0)); Assertions.assertFalse( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index bc312719cf52..a5ca909b3e78 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -84,6 +85,9 @@ public class TestPipelineClose { @BeforeEach public void init() throws Exception { conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build(); conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000, TimeUnit.MILLISECONDS); @@ -136,8 +140,8 @@ public void testPipelineCloseWithClosedContainer() throws IOException, .getContainersInPipeline(ratisContainer.getPipeline().getId()); Assert.assertEquals(0, setClosed.size()); - pipelineManager - .closePipeline(ratisContainer.getPipeline(), false); + pipelineManager.closePipeline(ratisContainer.getPipeline().getId()); + pipelineManager.deletePipeline(ratisContainer.getPipeline().getId()); for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) { // Assert that the pipeline has been removed from Node2PipelineMap as well Assert.assertFalse(scm.getScmNodeManager().getPipelines(dn) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java index ab36e079c14b..7c0fcd43722d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java @@ -107,6 +107,9 @@ public void setup() throws Exception { conf.setQuietMode(false); OzoneManager.setTestSecureOmFlag(true); conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1); + conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); clientConfig.setStreamBufferFlushDelay(false); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 834669bfcdfb..717304a5d0ae 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; @@ -84,7 +85,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED; import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.apache.ratis.protocol.RaftGroupId; @@ -136,8 +136,9 @@ public static void init() throws Exception { conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); - conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1, - TimeUnit.SECONDS); + conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); RatisClientConfig ratisClientConfig = conf.getObject(RatisClientConfig.class); @@ -480,7 +481,7 @@ public void testApplyTransactionFailure() throws Exception { } // when remove pipeline, group dir including snapshot will be deleted - LambdaTestUtils.await(5000, 500, + LambdaTestUtils.await(10000, 500, () -> (!snapshot.getPath().toFile().exists())); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java index 8be227fd1845..c24f209cdeb1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -111,6 +112,9 @@ public void setup() throws Exception { OzoneManager.setTestSecureOmFlag(true); conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1); // conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString()); + conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1) .setBlockSize(blockSize) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java index 7714f9d226e0..f3a0142f488e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; 
import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; @@ -100,6 +101,9 @@ public void init() throws Exception { conf.setFromObject(config); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3); + conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java index c84ea1c8c178..e9497cd73f52 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptions.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; @@ -107,6 +108,9 @@ public void init() throws Exception { conf.setFromObject(clientConfig); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3); + conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java index b52b10ed53c5..b9fb0d425b81 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.RatisTestHelper; import org.apache.hadoop.ozone.client.ObjectStore; @@ -63,7 +64,6 @@ import org.apache.ozone.test.tag.Flaky; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.junit.Assert; @@ -109,9 +109,10 @@ public void init() throws Exception { OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); clientConfig.setStreamBufferFlushDelay(false); conf.setFromObject(clientConfig); - - conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10, - TimeUnit.SECONDS); + + conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + 
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); conf.setQuietMode(false); conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java index cceae761947b..150e1b9860cd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java @@ -192,7 +192,8 @@ public void testReconRestart() throws Exception { .filter(p -> !p.getId().equals(containerInfo.getPipelineID())) .findFirst(); assertTrue(pipelineToClose.isPresent()); - scmPipelineManager.closePipeline(pipelineToClose.get(), false); + scmPipelineManager.closePipeline(pipelineToClose.get().getId()); + scmPipelineManager.deletePipeline(pipelineToClose.get().getId()); // Start Recon cluster.startRecon(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java index 073e0e3bbd4e..2df0d09db531 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneClusterProvider; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.TestDataUtil; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; @@ -134,6 +135,9 @@ public static void init() { 1, SECONDS); conf.setTimeDuration(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, 0, SECONDS); + conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); ReplicationManagerConfiguration replicationConf = conf.getObject(ReplicationManagerConfiguration.class); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java index 52bd98de3506..53f4ce5e16ae 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java @@ -86,10 +86,12 @@ public void testPipelineDestroy() { Optional<Pipeline> pipeline = pipelineManager .getPipelines().stream().findFirst(); Assertions.assertTrue(pipeline.isPresent()); - Assertions.assertDoesNotThrow(() -> - cluster.getStorageContainerManager() - .getPipelineManager() - .closePipeline(pipeline.get(), false)); + Assertions.assertDoesNotThrow(() -> { + PipelineManager pm = cluster.getStorageContainerManager() + .getPipelineManager(); + pm.closePipeline(pipeline.get().getId()); + pm.deletePipeline(pipeline.get().getId()); + }); MetricsRecordBuilder metrics = getMetrics(
SCMPipelineMetrics.class.getSimpleName()); assertCounter("NumPipelineDestroyed", 1L, metrics); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java index 77dd5486ecfa..940de277ac7c 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java @@ -149,7 +149,8 @@ public void removeInvalidPipelines(List pipelinesFromScm) { } try { LOG.info("Removing invalid pipeline {} from Recon.", pipelineID); - closePipeline(p, false); + closePipeline(p.getId()); + deletePipeline(p.getId()); } catch (IOException e) { LOG.warn("Unable to remove pipeline {}", pipelineID, e); }
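All of the test setups touched above shrink the same three timers while keeping their production shape: containers get a close window no longer than the scrub interval, and the destroy timeout stays comfortably larger than the close window. A self-contained sketch of that configuration, using the exact values the tests set (the helper class itself is illustrative):

  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.hdds.scm.ScmConfigKeys;
  import org.apache.hadoop.ozone.OzoneConfigKeys;

  final class PipelineTimerConfig {

    private PipelineTimerConfig() {
    }

    static OzoneConfiguration forFastTests() {
      OzoneConfiguration conf = new OzoneConfiguration();
      // Containers get 2s to close (production default: 150s).
      conf.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
      // The pipeline scrubber runs every 2s (production default: 150s).
      conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
      // CLOSED pipelines are deleted 5s after entering CLOSED
      // (production default: 300s).
      conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
      return conf;
    }
  }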