diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index f7f1d52f9ef3..591acbc3b154 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.commons.collections.iterators.LoopingIterator;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -26,6 +27,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,13 +112,15 @@ private void createPipelines() {
         ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
         ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE_DEFAULT);
 
+    List<HddsProtos.ReplicationFactor> list =
+        new ArrayList<>();
     for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
         .values()) {
       if (skipCreation(factor, type, autoCreateFactorOne)) {
         // Skip this iteration for creating pipeline
         continue;
       }
-
+      list.add(factor);
       if (!pipelineManager.getSafeModeStatus()) {
         try {
           pipelineManager.scrubPipeline(type, factor);
@@ -123,21 +128,27 @@ private void createPipelines() {
           LOG.error("Error while scrubbing pipelines {}", e);
         }
       }
+    }
 
-      while (true) {
-        try {
-          if (scheduler.isClosed()) {
-            break;
-          }
-          pipelineManager.createPipeline(type, factor);
-        } catch (IOException ioe) {
-          break;
-        } catch (Throwable t) {
-          LOG.error("Error while creating pipelines", t);
+    LoopingIterator it = new LoopingIterator(list);
+    while (it.hasNext()) {
+      HddsProtos.ReplicationFactor factor =
+          (HddsProtos.ReplicationFactor) it.next();
+
+      try {
+        if (scheduler.isClosed()) {
           break;
         }
+        pipelineManager.createPipeline(type, factor);
+      } catch (IOException ioe) {
+        it.remove();
+      } catch (Throwable t) {
+        LOG.error("Error while creating pipelines", t);
+        it.remove();
       }
     }
+
     isPipelineCreatorRunning.set(false);
+    LOG.debug("BackgroundPipelineCreator createPipelines finished.");
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 6a6d3284465b..f05be767e717 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
 
 import java.io.File;
@@ -162,7 +163,9 @@ public void testOnMessage() throws Exception {
 
     LambdaTestUtils.await(120000, 1000, () -> {
       pipelineManager.triggerPipelineCreation();
-      return pipelineManager.getPipelines(RATIS, THREE).size() == 3;
+      System.out.println(pipelineManager.getPipelines(RATIS, THREE).size());
+      System.out.println(pipelineManager.getPipelines(RATIS, ONE).size());
+      return pipelineManager.getPipelines(RATIS, THREE).size() > 3;
     });
 
     TestUtils.openAllRatisPipelines(pipelineManager);
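
Note on the approach: the patch replaces the per-factor `while (true)` retry loop with a single round-robin pass over all eligible replication factors, driven by commons-collections' LoopingIterator. An IOException from createPipeline is treated as "this factor is exhausted", so the current factor is dropped via it.remove() instead of aborting the whole loop; creation continues for the remaining factors and stops once the backing list is empty or the scheduler closes. The standalone sketch below (not part of the patch; tryCreate() and its budget counter are invented stand-ins for pipelineManager.createPipeline) illustrates the termination behavior this relies on:

// Standalone sketch: LoopingIterator cycles over the remaining elements
// indefinitely, remove() drops the current element from the backing list,
// and hasNext() turns false once the list is empty -- so a creation loop
// round-robins across factors and exits only when every factor is drained.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.collections.iterators.LoopingIterator;

public class LoopingIteratorDemo {

  // Hypothetical stand-in for pipelineManager.createPipeline(type, factor):
  // returns false (instead of throwing IOException) once capacity runs out.
  private static int budget = 3;

  private static boolean tryCreate(String factor) {
    if (budget-- > 0) {
      System.out.println("created pipeline for factor " + factor);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    List<String> factors = new ArrayList<>(Arrays.asList("ONE", "THREE"));

    // LoopingIterator in commons-collections 3.x is raw, hence the cast,
    // just as in the patch.
    LoopingIterator it = new LoopingIterator(factors);
    while (it.hasNext()) {
      String factor = (String) it.next();
      if (!tryCreate(factor)) {
        it.remove();  // factor exhausted: drop it; loop ends when list is empty
      }
    }
  }
}

Run with commons-collections 3.x on the classpath: the output shows three alternating creations, after which both factors are drained and the loop terminates rather than spinning forever on an exhausted factor.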