diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 008683dbc92e..a7c09238b4cf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -101,32 +101,32 @@ public RatisPipelineProvider(NodeManager nodeManager,
     }
   }

-  private boolean exceedPipelineNumberLimit(
-      RatisReplicationConfig replicationConfig) {
+  private boolean exceedPipelineNumberLimit(RatisReplicationConfig replicationConfig) {
+    // Apply limits only for replication factor THREE
     if (replicationConfig.getReplicationFactor() != ReplicationFactor.THREE) {
-      // Only put limits for Factor THREE pipelines.
       return false;
     }
-    // Per datanode limit
+
+    PipelineStateManager pipelineStateManager = getPipelineStateManager();
+    int totalActivePipelines = pipelineStateManager.getPipelines(replicationConfig).size();
+    int closedPipelines = pipelineStateManager.getPipelines(replicationConfig, PipelineState.CLOSED).size();
+    int openPipelines = totalActivePipelines - closedPipelines;
+    // Check per-datanode pipeline limit
     if (maxPipelinePerDatanode > 0) {
-      return (getPipelineStateManager().getPipelines(replicationConfig).size() -
-          getPipelineStateManager().getPipelines(replicationConfig,
-              PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
-          getNodeManager().getNodeCount(NodeStatus.inServiceHealthy()) /
-          replicationConfig.getRequiredNodes();
+      int healthyNodeCount = getNodeManager()
+          .getNodeCount(NodeStatus.inServiceHealthy());
+      int allowedOpenPipelines = (maxPipelinePerDatanode * healthyNodeCount)
+          / replicationConfig.getRequiredNodes();
+      return openPipelines >= allowedOpenPipelines;
     }
-
-    // Global limit
+    // Check global pipeline limit
     if (pipelineNumberLimit > 0) {
-      return (getPipelineStateManager().getPipelines(replicationConfig).size() -
-          getPipelineStateManager().getPipelines(
-              replicationConfig, PipelineState.CLOSED).size()) >
-          (pipelineNumberLimit - getPipelineStateManager()
-              .getPipelines(RatisReplicationConfig
-                  .getInstance(ReplicationFactor.ONE))
-              .size());
+      int factorOnePipelineCount = pipelineStateManager
+          .getPipelines(RatisReplicationConfig.getInstance(ReplicationFactor.ONE)).size();
+      int allowedOpenPipelines = pipelineNumberLimit - factorOnePipelineCount;
+      return openPipelines >= allowedOpenPipelines;
     }
-
+    // No limits are set
     return false;
   }

@@ -147,10 +147,15 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
       List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
       throws IOException {
     if (exceedPipelineNumberLimit(replicationConfig)) {
-      throw new SCMException("Ratis pipeline number meets the limit: " +
-          pipelineNumberLimit + " replicationConfig : " +
-          replicationConfig,
-          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+      String limitInfo = (maxPipelinePerDatanode > 0)
+          ? String.format("per datanode: %d", maxPipelinePerDatanode)
+          : String.format(": %d", pipelineNumberLimit);
+
+      throw new SCMException(
+          String.format("Cannot create pipeline as it would exceed the limit %s replicationConfig: %s",
+              limitInfo, replicationConfig),
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE
+      );
     }

     List<DatanodeDetails> dns;
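
For reference, a minimal standalone sketch of the quota arithmetic behind the per-datanode branch above. The class name, the 8-node count, and the parameter values are illustrative assumptions, not taken from the patch:

public final class PipelineLimitSketch {

  // Mirrors the patched check: reject once open pipelines reach the quota.
  static boolean exceedsPerDatanodeLimit(int openPipelines, int maxPipelinePerDatanode,
      int healthyNodeCount, int requiredNodes) {
    int allowedOpenPipelines = (maxPipelinePerDatanode * healthyNodeCount) / requiredNodes;
    return openPipelines >= allowedOpenPipelines;
  }

  public static void main(String[] args) {
    // With 8 healthy nodes, factor THREE, and one slot per datanode:
    // allowed = (1 * 8) / 3 = 2, so the third creation attempt is rejected.
    for (int open = 0; open <= 2; open++) {
      System.out.printf("open=%d -> rejected=%b%n",
          open, exceedsPerDatanodeLimit(open, 1, 8, 3));
    }
    // The previous '>' comparison rejected only at open = 3, one pipeline
    // past the computed quota; the patch tightens the boundary to '>='.
  }
}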
String.format("per datanode: %d", maxPipelinePerDatanode) + : String.format(": %d", pipelineNumberLimit); + + throw new SCMException( + String.format("Cannot create pipeline as it would exceed the limit %s replicationConfig: %s", + limitInfo, replicationConfig), + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE + ); } List dns; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 87ace151e7d6..bd10fb3da0b0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -436,7 +436,7 @@ public void testScmLayoutOnRegister() // creation, they will fail with not enough healthy nodes for ratis 3 // pipeline. Therefore we do not have to worry about this create call // failing due to datanodes reaching their maximum pipeline limit. - assertPipelineCreationFailsWithNotEnoughNodes(1); + assertPipelineCreationFailsWithExceedingLimit(2); // Heartbeat bad MLV nodes back to healthy. nodeManager.processLayoutVersionReport(badMlvNode1, CORRECT_LAYOUT_PROTO); @@ -468,6 +468,19 @@ private void assertPipelineCreationFailsWithNotEnoughNodes( actualNodeCount); } + private void assertPipelineCreationFailsWithExceedingLimit(int limit) { + // Build once, outside the assertion + ReplicationConfig config = ReplicationConfig.fromProtoTypeAndFactor( + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + SCMException ex = assertThrows( + SCMException.class, + () -> scm.getPipelineManager().createPipeline(config), + "3 nodes should not have been found for a pipeline."); + assertThat(ex.getMessage()) + .contains("Cannot create pipeline as it would exceed the limit per datanode: " + limit); + } + private void assertPipelines(HddsProtos.ReplicationFactor factor, Predicate countCheck, Collection allowedDNs) throws Exception { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java index 96ba75507088..3b40d6c6077c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -64,6 +64,8 @@ import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; /** * Test for {@link RatisPipelineProvider}. @@ -361,6 +363,31 @@ public void testCreatePipelinesWhenNotEnoughSpace(@TempDir File tempDir) throws } } + @ParameterizedTest + @CsvSource({ "1, 2", "2, 5" }) + public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelineCount) throws Exception { + init(limit); + + // Create pipelines up to the limit (2 for limit=1, 5 for limit=2). + for (int i = 0; i < pipelineCount; i++) { + stateManager.addPipeline( + provider.create(RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + new ArrayList<>(), new ArrayList<>()).getProtobufMessage(ClientVersion.CURRENT_VERSION) + ); + } + + // Verify that creating an additional pipeline throws an exception. 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index cad08856281b..699c03c6c941 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -145,7 +145,7 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
             pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
                 ReplicationFactor.THREE)),
         "pipeline creation should fail after shutting down pipeline");
-    assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES, ioe.getResult());
+    assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE, ioe.getResult());

     // make sure pipelines is destroyed
     waitForPipelines(0);