diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java
index 25a1e44651ea..3c0eb78f3b4f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java
@@ -41,8 +41,7 @@ public byte[] toPersistedFormat(Pipeline object) throws IOException {
   public Pipeline fromPersistedFormat(byte[] rawData) throws IOException {
     HddsProtos.Pipeline.Builder pipelineBuilder = HddsProtos.Pipeline
         .newBuilder(HddsProtos.Pipeline.PARSER.parseFrom(rawData));
-    Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
-        HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
+    Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.build());
     // When SCM is restarted, set Creation time with current time.
     pipeline.setCreationTimestamp(Instant.now());
     Preconditions.checkNotNull(pipeline);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 9b563efce1ad..ac6a4ad32630 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -114,9 +114,11 @@ protected void processPipelineReport(PipelineReport report,
     }
     if (pipeline.isHealthy()) {
       pipelineManager.openPipeline(pipelineID);
-      if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
-        publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
-      }
+    }
+  }
+  if (pipeline.isHealthy()) {
+    if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
+      publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
     }
   }
 }
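The PipelineCodec change above is the crux of the patch: the persisted pipeline state now round-trips through the DB instead of being forced back to PIPELINE_ALLOCATED on every load, while the creation timestamp is still reset at restart. A minimal sketch of that round-trip, assuming a string-encoded state in place of the real protobuf format; all types here are simplified stand-ins, not Ozone classes:

import java.nio.charset.StandardCharsets;
import java.time.Instant;

public class StatePreservingCodecDemo {

  enum PipelineState { ALLOCATED, OPEN, CLOSED }

  static class Pipeline {
    PipelineState state;
    Instant creationTimestamp;
  }

  // The real codec writes a protobuf message; a state name is enough here.
  static byte[] toPersistedFormat(Pipeline pipeline) {
    return pipeline.state.name().getBytes(StandardCharsets.UTF_8);
  }

  static Pipeline fromPersistedFormat(byte[] rawData) {
    Pipeline pipeline = new Pipeline();
    // Keep whatever state was stored. The old codec forced ALLOCATED at
    // this point, which is why every pipeline had to be re-opened after
    // an SCM restart.
    pipeline.state = PipelineState.valueOf(
        new String(rawData, StandardCharsets.UTF_8));
    // The creation time is still reset on load, as in the patched codec.
    pipeline.creationTimestamp = Instant.now();
    return pipeline;
  }
}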
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index db7fcae212aa..e0ea885c49cc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -269,7 +269,9 @@ public Pipeline createPipeline(ReplicationType type,
     lock.writeLock().lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
-      pipelineStore.put(pipeline.getId(), pipeline);
+      if (pipelineStore != null) {
+        pipelineStore.put(pipeline.getId(), pipeline);
+      }
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
       recordMetricsForPipeline(pipeline);
@@ -405,6 +407,23 @@ public void addContainerToPipeline(PipelineID pipelineID,
     }
   }
 
+  private void updatePipelineStateInDb(PipelineID pipelineId,
+      Pipeline.PipelineState state)
+      throws IOException {
+    // null check is here to prevent the case where SCM store
+    // is closed but the staleNode handlers/pipeline creations
+    // still try to access it.
+    if (pipelineStore != null) {
+      try {
+        pipelineStore.put(pipelineId, getPipeline(pipelineId));
+      } catch (IOException ex) {
+        LOG.info("Pipeline {} state update failed", pipelineId);
+        // revert back to old state in memory
+        stateManager.updatePipelineState(pipelineId, state);
+      }
+    }
+  }
+
   @Override
   public void removeContainerFromPipeline(PipelineID pipelineID,
       ContainerID containerID) throws IOException {
@@ -436,7 +455,10 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
   public void openPipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
+      Pipeline.PipelineState state = stateManager.
+          getPipeline(pipelineId).getPipelineState();
       Pipeline pipeline = stateManager.openPipeline(pipelineId);
+      updatePipelineStateInDb(pipelineId, state);
       metrics.incNumPipelineCreated();
       metrics.createPerPipelineMetrics(pipeline);
     } finally {
@@ -535,7 +557,10 @@ public void triggerPipelineCreation() {
   @Override
   public void activatePipeline(PipelineID pipelineID)
       throws IOException {
+    Pipeline.PipelineState state = stateManager.
+        getPipeline(pipelineID).getPipelineState();
     stateManager.activatePipeline(pipelineID);
+    updatePipelineStateInDb(pipelineID, state);
   }
 
   /**
@@ -547,7 +572,10 @@ public void activatePipeline(PipelineID pipelineID)
   @Override
   public void deactivatePipeline(PipelineID pipelineID)
       throws IOException {
+    Pipeline.PipelineState state = stateManager.
+        getPipeline(pipelineID).getPipelineState();
     stateManager.deactivatePipeline(pipelineID);
+    updatePipelineStateInDb(pipelineID, state);
   }
 
   /**
@@ -600,7 +628,10 @@ public void waitPipelineReady(PipelineID pipelineID, long timeout)
   private void finalizePipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
+      Pipeline.PipelineState state = stateManager.
+          getPipeline(pipelineId).getPipelineState();
       stateManager.finalizePipeline(pipelineId);
+      updatePipelineStateInDb(pipelineId, state);
       Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
       for (ContainerID containerID : containerIDs) {
         eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
@@ -669,6 +700,15 @@ public void close() throws IOException {
 
     // shutdown pipeline provider.
     pipelineFactory.shutdown();
+
+    lock.writeLock().lock();
+    try {
+      pipelineStore.close();
+      pipelineStore = null;
+    } catch (Exception ex) {
+      LOG.error("Pipeline store close failed", ex);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   /**
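The new updatePipelineStateInDb helper follows a compensating-update pattern: the in-memory state machine transitions first, the resulting state is persisted, and if the DB write fails the in-memory transition is rolled back to the captured old state. A minimal sketch of the pattern, assuming a simplified key-value store in place of the RocksDB-backed pipeline table; none of these types are Ozone classes:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RevertOnFailureDemo {

  enum State { ALLOCATED, OPEN, CLOSED }

  /** Stand-in for the RocksDB-backed pipeline table. */
  interface KeyValueStore {
    void put(String key, State value) throws IOException;
  }

  private final Map<String, State> inMemory = new HashMap<>();
  private KeyValueStore store; // null once the SCM store has been closed

  void transition(String pipelineId, State newState) {
    State oldState = inMemory.get(pipelineId);
    inMemory.put(pipelineId, newState);     // 1. transition in memory first
    if (store == null) {
      return;                               // 2. store closed: skip the DB
    }
    try {
      store.put(pipelineId, newState);      // 3. persist the new state
    } catch (IOException ex) {
      inMemory.put(pipelineId, oldState);   // 4. revert memory on failure
    }
  }
}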
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 663d8d56e2d0..b6ab0d0d6d09 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -78,7 +78,7 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> {
     // We want to wait for RATIS THREE factor write pipelines
     int pipelineCount = pipelineManager.getPipelines(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
-        Pipeline.PipelineState.ALLOCATED).size();
+        Pipeline.PipelineState.OPEN).size();
 
     // This value will be zero when pipeline count is 0.
     // On a fresh installed cluster, there will be zero pipelines in the SCM
@@ -118,7 +118,6 @@ protected void process(Pipeline pipeline) {
     Preconditions.checkNotNull(pipeline);
     if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
         pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-        pipeline.isHealthy() &&
         !processedPipelineIDs.contains(pipeline.getId())) {
       getSafeModeMetrics().incCurrentHealthyPipelinesCount();
       currentHealthyPipelineCount++;
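With this change the rule counts OPEN RATIS/THREE pipelines rather than ALLOCATED ones; the exit threshold itself is unchanged: a configured percentage of that count, rounded up, as the test comment "90% of 7 with ceil is 7" later in this patch implies. A small sketch of that arithmetic, with illustrative values in place of the real configuration keys:

public class HealthyPipelineThresholdDemo {
  public static void main(String[] args) {
    // Illustrative values; the real percentage comes from configuration.
    double healthyPipelinePercent = 0.90;
    int openRatisThreePipelines = 7;   // OPEN RATIS/THREE pipelines in SCM

    // Zero when there are no pipelines, e.g. on a freshly installed
    // cluster, so the rule passes trivially there.
    int threshold = (int) Math.ceil(
        healthyPipelinePercent * openRatisThreePipelines);

    // ceil(0.9 * 7) = 7: all seven pipelines must report healthy.
    System.out.println("healthy pipelines needed: " + threshold);
  }
}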
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
index 48ae991d684a..e2436226e8b1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
@@ -22,10 +22,13 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
@@ -37,11 +40,11 @@
 /**
  * This rule covers whether we have at least one datanode is reported for each
- * pipeline. This rule is for all open containers, we have at least one
+ * open pipeline. This rule is for all open containers, we have at least one
  * replica available for read when we exit safe mode.
  */
 public class OneReplicaPipelineSafeModeRule extends
-    SafeModeExitRule<Pipeline> {
+    SafeModeExitRule<PipelineReportFromDatanode> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class);
@@ -50,6 +53,7 @@ public class OneReplicaPipelineSafeModeRule extends
   private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
   private Set<PipelineID> oldPipelineIDSet;
   private int currentReportedPipelineCount = 0;
+  private PipelineManager pipelineManager;
 
   public OneReplicaPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
@@ -68,9 +72,10 @@ public OneReplicaPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
         HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT +
             " value should be >= 0.0 and <= 1.0");
 
+    this.pipelineManager = pipelineManager;
     oldPipelineIDSet = pipelineManager.getPipelines(
         HddsProtos.ReplicationType.RATIS,
-        HddsProtos.ReplicationFactor.THREE)
+        HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
         .stream().map(p -> p.getId()).collect(Collectors.toSet());
 
     int totalPipelineCount = oldPipelineIDSet.size();
@@ -85,8 +90,8 @@ public OneReplicaPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
   }
 
   @Override
-  protected TypedEvent<Pipeline> getEventType() {
-    return SCMEvents.OPEN_PIPELINE;
+  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
+    return SCMEvents.PIPELINE_REPORT;
   }
 
   @Override
@@ -95,16 +100,26 @@ protected boolean validate() {
   }
 
   @Override
-  protected void process(Pipeline pipeline) {
-    Preconditions.checkNotNull(pipeline);
-    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
-        pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-        !reportedPipelineIDSet.contains(pipeline.getId())) {
-      if (oldPipelineIDSet.contains(pipeline.getId())) {
-        getSafeModeMetrics()
-            .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
-        currentReportedPipelineCount++;
-        reportedPipelineIDSet.add(pipeline.getId());
+  protected void process(PipelineReportFromDatanode report) {
+    Preconditions.checkNotNull(report);
+    for (PipelineReport report1 : report.getReport().getPipelineReportList()) {
+      Pipeline pipeline;
+      try {
+        pipeline = pipelineManager.getPipeline(
+            PipelineID.getFromProtobuf(report1.getPipelineID()));
+      } catch (PipelineNotFoundException pnfe) {
+        continue;
+      }
+      if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+          pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+          pipeline.isOpen() &&
+          !reportedPipelineIDSet.contains(pipeline.getId())) {
+        if (oldPipelineIDSet.contains(pipeline.getId())) {
+          getSafeModeMetrics().
+              incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
+          currentReportedPipelineCount++;
+          reportedPipelineIDSet.add(pipeline.getId());
+        }
       }
     }
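The rule now subscribes to PIPELINE_REPORT instead of OPEN_PIPELINE, so each event carries a whole datanode report and every referenced pipeline has to be resolved through the PipelineManager, skipping IDs that SCM no longer tracks. A minimal sketch of that resolve-or-skip loop, with Optional standing in for the PipelineNotFoundException control flow and all types simplified rather than taken from Ozone:

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class ReportDrivenRuleDemo {

  static class Pipeline {
    final String id;
    final boolean open;
    Pipeline(String id, boolean open) {
      this.id = id;
      this.open = open;
    }
  }

  /** Stand-in for PipelineManager#getPipeline, minus the checked exception. */
  interface PipelineLookup {
    Optional<Pipeline> find(String pipelineId);
  }

  private final Set<String> reportedPipelineIds = new HashSet<>();
  private int reportedCount = 0;

  void process(List<String> reportedIds, PipelineLookup lookup) {
    for (String id : reportedIds) {
      Optional<Pipeline> pipeline = lookup.find(id);
      if (!pipeline.isPresent()) {
        continue;   // pipeline no longer tracked by SCM: skip the report
      }
      if (pipeline.get().open && reportedPipelineIds.add(id)) {
        reportedCount++;   // count each open pipeline only once
      }
    }
  }
}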
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index d14e4683a4e2..67aa338ca02b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -131,6 +131,7 @@ public void testPipelineReload() throws IOException {
       Pipeline pipeline = pipelineManager
           .createPipeline(HddsProtos.ReplicationType.RATIS,
               HddsProtos.ReplicationFactor.THREE);
+      pipelineManager.openPipeline(pipeline.getId());
       pipelines.add(pipeline);
     }
     pipelineManager.close();
@@ -146,7 +147,8 @@ public void testPipelineReload() throws IOException {
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     for (Pipeline p : pipelines) {
-      pipelineManager.openPipeline(p.getId());
+      // After reload, pipelines should be in open state
+      Assert.assertTrue(pipelineManager.getPipeline(p.getId()).isOpen());
     }
     List<Pipeline> pipelineList =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index 7a40e3e47c9e..e770ba959624 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -128,12 +128,15 @@ public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
     Pipeline pipeline1 =
         pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
+    pipelineManager.openPipeline(pipeline1.getId());
     Pipeline pipeline2 =
         pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
+    pipelineManager.openPipeline(pipeline2.getId());
     Pipeline pipeline3 =
         pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
+    pipelineManager.openPipeline(pipeline3.getId());
 
     SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
         config, containers, pipelineManager, eventQueue);
@@ -204,12 +207,15 @@ public void testHealthyPipelineSafeModeRuleWithMixedPipelines()
     Pipeline pipeline1 =
         pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE);
+    pipelineManager.openPipeline(pipeline1.getId());
     Pipeline pipeline2 =
         pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
+    pipelineManager.openPipeline(pipeline2.getId());
     Pipeline pipeline3 =
         pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
+    pipelineManager.openPipeline(pipeline3.getId());
 
     SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
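Both test classes change for the same reason: createPipeline() now leaves a pipeline ALLOCATED, and only an explicit openPipeline() moves it to OPEN and persists that state, so rules and reload assertions that look for OPEN pipelines would otherwise see nothing. A sketch of the two-step pattern the tests follow, against a minimal stand-in interface rather than the real SCMPipelineManager:

public class CreateThenOpenDemo {

  /** Minimal stand-in for the pipeline manager surface the tests use. */
  interface PipelineManager {
    String createPipeline();        // new pipeline starts out ALLOCATED
    void openPipeline(String id);   // ALLOCATED -> OPEN, persisted to the DB
  }

  static void createOpenPipelines(PipelineManager pipelineManager, int count) {
    for (int i = 0; i < count; i++) {
      String id = pipelineManager.createPipeline();
      // Without this call the pipeline stays ALLOCATED and is invisible to
      // safe mode rules that now count only OPEN pipelines.
      pipelineManager.openPipeline(id);
    }
  }
}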
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index 4e1cf6fcb2d3..6430247b6987 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -18,21 +18,24 @@
 package org.apache.hadoop.hdds.scm.safemode;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.*;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -52,6 +55,7 @@ public class TestOneReplicaPipelineSafeModeRule {
   private OneReplicaPipelineSafeModeRule rule;
   private SCMPipelineManager pipelineManager;
   private EventQueue eventQueue;
+  private MockNodeManager mockNodeManager;
 
   private void setup(int nodes, int pipelineFactorThreeCount,
       int pipelineFactorOneCount) throws Exception {
@@ -65,7 +69,7 @@ private void setup(int nodes, int pipelineFactorThreeCount,
     List<ContainerInfo> containers = new ArrayList<>();
     containers.addAll(HddsTestUtils.getContainerInfo(1));
-    MockNodeManager mockNodeManager = new MockNodeManager(true, nodes);
+    mockNodeManager = new MockNodeManager(true, nodes);
 
     eventQueue = new EventQueue();
@@ -112,9 +116,7 @@ public void testOneReplicaPipelineRule() throws Exception {
         LoggerFactory.getLogger(SCMSafeModeManager.class));
 
     List<Pipeline> pipelines = pipelineManager.getPipelines();
-    for (int i = 0; i < pipelineFactorThreeCount -1; i++) {
-      firePipelineEvent(pipelines.get(i));
-    }
+    firePipelineEvent(pipelines.subList(0, pipelineFactorThreeCount -1));
 
     // As 90% of 7 with ceil is 7, if we send 6 pipeline reports, rule
     // validate should be still false.
@@ -125,7 +127,8 @@ public void testOneReplicaPipelineRule() throws Exception {
     Assert.assertFalse(rule.validate());
 
     //Fire last pipeline event from datanode.
-    firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1));
+    firePipelineEvent(pipelines.subList(pipelineFactorThreeCount -1,
+        pipelineFactorThreeCount));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
   }
@@ -150,10 +153,7 @@ public void testOneReplicaPipelineRuleMixedPipelines() throws Exception {
 
     List<Pipeline> pipelines =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE);
-    for (int i = 0; i < pipelineCountOne; i++) {
-      firePipelineEvent(pipelines.get(i));
-    }
-
+    firePipelineEvent(pipelines);
 
     GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
         "reported count is 0"), 1000, 5000);
@@ -163,15 +163,14 @@ public void testOneReplicaPipelineRuleMixedPipelines() throws Exception {
     pipelines =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
-    for (int i = 0; i < pipelineCountThree - 1; i++) {
-      firePipelineEvent(pipelines.get(i));
-    }
+    firePipelineEvent(pipelines.subList(0, pipelineCountThree -1));
 
     GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
         "reported count is 6"), 1000, 5000);
 
     //Fire last pipeline event from datanode.
-    firePipelineEvent(pipelines.get(pipelineCountThree - 1));
+    firePipelineEvent(pipelines.subList(pipelineCountThree -1,
+        pipelineCountThree));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
   }
@@ -179,12 +178,45 @@ public void testOneReplicaPipelineRuleMixedPipelines() throws Exception {
 
   private void createPipelines(int count,
       HddsProtos.ReplicationFactor factor) throws Exception {
     for (int i = 0; i < count; i++) {
-      pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
-          factor);
+      Pipeline pipeline = pipelineManager.createPipeline(
+          HddsProtos.ReplicationType.RATIS, factor);
+      pipelineManager.openPipeline(pipeline.getId());
+    }
   }
 
-  private void firePipelineEvent(Pipeline pipeline) {
-    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+  private void firePipelineEvent(List<Pipeline> pipelines) {
+    Map<DatanodeDetails, PipelineReportsProto.Builder>
+        reportMap = new HashMap<>();
+    for (Pipeline pipeline : pipelines) {
+      for (DatanodeDetails dn : pipeline.getNodes()) {
+        reportMap.putIfAbsent(dn, PipelineReportsProto.newBuilder());
+      }
+    }
+    for (DatanodeDetails dn : reportMap.keySet()) {
+      List<PipelineReport> reports = new ArrayList<>();
+      for (PipelineID pipeline : mockNodeManager.
+          getNode2PipelineMap().getPipelines(dn.getUuid())) {
+        try {
+          if (!pipelines.contains(pipelineManager.getPipeline(pipeline))) {
+            continue;
+          }
+        } catch (PipelineNotFoundException pnfe) {
+          continue;
+        }
+        HddsProtos.PipelineID pipelineID = pipeline.getProtobuf();
+        reports.add(PipelineReport.newBuilder()
+            .setPipelineID(pipelineID)
+            .setIsLeader(true)
+            .setBytesWritten(0)
+            .build());
+      }
+      PipelineReportsProto.Builder pipelineReportsProto =
+          PipelineReportsProto.newBuilder();
+      pipelineReportsProto.addAllPipelineReport(reports);
+      eventQueue.fireEvent(SCMEvents.PIPELINE_REPORT, new
+          SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(dn,
+          pipelineReportsProto.build()));
+    }
   }
 }
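The new firePipelineEvent helper simulates what real datanodes do: each node reports every pipeline it participates in, so the helper groups the pipelines by member datanode and fires one report event per node. A simplified sketch of that fan-out, with plain strings and lists standing in for the protobuf report builders:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerDatanodeReportDemo {

  static class Pipeline {
    final String id;
    final List<String> datanodes;   // member nodes of the pipeline
    Pipeline(String id, List<String> datanodes) {
      this.id = id;
      this.datanodes = datanodes;
    }
  }

  /** Pipeline ids each datanode would report, keyed by datanode id. */
  static Map<String, List<String>> buildReports(List<Pipeline> pipelines) {
    Map<String, List<String>> reportsPerNode = new HashMap<>();
    for (Pipeline pipeline : pipelines) {
      for (String node : pipeline.datanodes) {
        reportsPerNode
            .computeIfAbsent(node, n -> new ArrayList<>())
            .add(pipeline.id);   // this node reports this pipeline
      }
    }
    // The test would now fire one PIPELINE_REPORT event per map entry.
    return reportsPerNode;
  }
}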
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 418d945f6383..78313070b92b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -30,19 +30,18 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.*;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -364,8 +363,10 @@ public void testSafeModeExitRuleWithPipelineAvailabilityCheck(
     pipelineManager.allowPipelineCreation();
 
     for (int i=0; i < pipelineCount; i++) {
-      pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+      Pipeline pipeline = pipelineManager.
+          createPipeline(HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
+      pipelineManager.openPipeline(pipeline.getId());
     }
 
     for (ContainerInfo container : containers) {
@@ -454,6 +455,27 @@ private void firePipelineEvent(SCMPipelineManager pipelineManager,
     pipelineManager.openPipeline(pipeline.getId());
     queue.fireEvent(SCMEvents.OPEN_PIPELINE,
         pipelineManager.getPipeline(pipeline.getId()));
+
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      List<StorageContainerDatanodeProtocolProtos.PipelineReport> reports =
+          new ArrayList<>();
+      HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
+      reports.add(StorageContainerDatanodeProtocolProtos
+          .PipelineReport.newBuilder()
+          .setPipelineID(pipelineID)
+          .setIsLeader(true)
+          .setBytesWritten(0)
+          .build());
+      StorageContainerDatanodeProtocolProtos
+          .PipelineReportsProto.Builder pipelineReportsProto =
+          StorageContainerDatanodeProtocolProtos
+              .PipelineReportsProto.newBuilder();
+      pipelineReportsProto.addAllPipelineReport(reports);
+      queue.fireEvent(SCMEvents.PIPELINE_REPORT, new
+          SCMDatanodeHeartbeatDispatcher
+          .PipelineReportFromDatanode(dn,
+          pipelineReportsProto.build()));
+    }
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index d3ae09492c21..3f62ec33e321 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -29,7 +29,9 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.protocol.proto
     .HddsProtos.ReplicationFactor.THREE;
@@ -65,6 +67,8 @@ public class TestSCMRestart {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1000,
+        TimeUnit.MILLISECONDS);
     int numOfNodes = 4;
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numOfNodes)
@@ -80,9 +84,11 @@ public static void init() throws Exception {
     ratisPipeline1 = pipelineManager.getPipeline(
         containerManager.allocateContainer(
             RATIS, THREE, "Owner1").getPipelineID());
+    pipelineManager.openPipeline(ratisPipeline1.getId());
     ratisPipeline2 = pipelineManager.getPipeline(
         containerManager.allocateContainer(
             RATIS, ONE, "Owner2").getPipelineID());
+    pipelineManager.openPipeline(ratisPipeline2.getId());
     // At this stage, there should be 2 pipeline one with 1 open container
     // each. Try restarting the SCM and then discover that pipeline are in
     // correct state.