diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 21aee428f0c..5940a630e51 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -57,6 +57,8 @@ public final class Pipeline {
   private UUID leaderId;
   // Timestamp for pipeline upon creation
   private Long creationTimestamp;
+  // Only valid for Ratis THREE pipelines. No need to persist.
+  private int nodeIdsHash;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -72,6 +74,7 @@ private Pipeline(PipelineID id, ReplicationType type,
     this.state = state;
     this.nodeStatus = nodeStatus;
     this.creationTimestamp = System.currentTimeMillis();
+    this.nodeIdsHash = 0;
   }
 
   /**
@@ -128,6 +131,14 @@ void setCreationTimestamp(Long creationTimestamp) {
     this.creationTimestamp = creationTimestamp;
   }
 
+  public int getNodeIdsHash() {
+    return nodeIdsHash;
+  }
+
+  void setNodeIdsHash(int nodeIdsHash) {
+    this.nodeIdsHash = nodeIdsHash;
+  }
+
   /**
    * Return the pipeline leader's UUID.
    *
@@ -328,6 +339,7 @@ public static class Builder {
     private List<DatanodeDetails> nodesInOrder = null;
     private UUID leaderId = null;
     private Long creationTimestamp = null;
+    private int nodeIdsHash = 0;
 
     public Builder() {}
 
@@ -340,6 +352,7 @@ public Builder(Pipeline pipeline) {
       this.nodesInOrder = pipeline.nodesInOrder.get();
       this.leaderId = pipeline.getLeaderId();
       this.creationTimestamp = pipeline.getCreationTimestamp();
+      this.nodeIdsHash = 0;
     }
 
     public Builder setId(PipelineID id1) {
@@ -378,6 +391,11 @@ public Builder setNodesInOrder(List<DatanodeDetails> orders) {
       return this;
     }
 
+    public Builder setNodeIdsHash(int nodeIdsHash1) {
+      this.nodeIdsHash = nodeIdsHash1;
+      return this;
+    }
+
     public Pipeline build() {
       Preconditions.checkNotNull(id);
       Preconditions.checkNotNull(type);
@@ -386,6 +404,7 @@ public Pipeline build() {
       Preconditions.checkNotNull(nodeStatus);
       Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
       pipeline.setLeaderId(leaderId);
+      pipeline.setNodeIdsHash(nodeIdsHash);
       // overwrite with original creationTimestamp
       if (creationTimestamp != null) {
         pipeline.setCreationTimestamp(creationTimestamp);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 23eb5745421..bc65d14d4a5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -162,7 +162,7 @@ List<DatanodeDetails> filterViableNodes(
     // filter nodes that meet the size and pipeline engagement criteria.
     // Pipeline placement doesn't take node space left into account.
     List<DatanodeDetails> healthyList = healthyNodes.stream()
-        .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+        .filter(d -> meetCriteria(d, nodesRequired))
         .collect(Collectors.toList());
 
     if (healthyList.size() < nodesRequired) {
@@ -308,6 +308,7 @@ public DatanodeDetails chooseNode(
     }
     // the pick is decided and it should be removed from candidates.
     healthyNodes.remove(datanodeDetails);
+
     return datanodeDetails;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 180d0bfa388..5bf65c42df4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -133,6 +133,13 @@ Pipeline openPipeline(PipelineID pipelineId) throws IOException {
       pipeline = pipelineStateMap
           .updatePipelineState(pipelineId, PipelineState.OPEN);
     }
+    // Amend nodeIdsHash if needed.
+    if (pipeline.getType() == ReplicationType.RATIS &&
+        pipeline.getFactor() == ReplicationFactor.THREE &&
+        pipeline.getNodeIdsHash() == 0) {
+      pipeline.setNodeIdsHash(RatisPipelineUtils
+          .encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes()));
+    }
     return pipeline;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index ceba620f3a0..d0c29789e29 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -157,6 +157,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
     }
 
     List<DatanodeDetails> dns;
+    int nodeIdHash = 0;
 
     switch(factor) {
     case ONE:
@@ -165,6 +166,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
     case THREE:
       dns = placementPolicy.chooseDatanodes(null, null,
           factor.getNumber(), 0);
+      nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(dns);
       break;
     default:
       throw new IllegalStateException("Unknown factor: " + factor.name());
@@ -176,6 +178,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(dns)
+        .setNodeIdsHash(nodeIdHash)
         .build();
 
     // Send command to datanodes to create pipeline
@@ -197,12 +200,17 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
 
   @Override
   public Pipeline create(ReplicationFactor factor,
       List<DatanodeDetails> nodes) {
+    int nodeIdHash = 0;
+    if (factor == ReplicationFactor.THREE) {
+      nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes);
+    }
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
         .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(nodes)
+        .setNodeIdsHash(nodeIdHash)
         .build();
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 04393a1131a..366eabc42dd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -18,9 +18,12 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
@@ -35,7 +38,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Utility class for Ratis pipelines. Contains methods to create and destroy
  * ratis pipelines.
@@ -100,4 +102,38 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
           true, p.getId());
     }
   }
+
+  static int encodeNodeIdsOfFactorThreePipeline(List<DatanodeDetails> nodes) {
+    if (nodes.size() != HddsProtos.ReplicationFactor.THREE.getNumber()) {
+      return 0;
+    }
+    return nodes.get(0).getUuid().hashCode() ^
+        nodes.get(1).getUuid().hashCode() ^
+        nodes.get(2).getUuid().hashCode();
+  }
+
+  /**
+   * Return the first existing pipeline which shares the same set of
+   * datanodes as the input pipeline.
+   * @param stateManager PipelineStateManager
+   * @param pipeline input pipeline
+   * @return first matched pipeline
+   */
+  static Pipeline checkPipelineContainSameDatanodes(
+      PipelineStateManager stateManager, Pipeline pipeline) {
+    List<Pipeline> matchedPipelines = stateManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE)
+        .stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
+            (// For all OPEN or ALLOCATED pipelines
+            p.getPipelineState() == Pipeline.PipelineState.OPEN ||
+            p.getPipelineState() == Pipeline.PipelineState.ALLOCATED) &&
+            p.getNodeIdsHash() == pipeline.getNodeIdsHash())
+        .collect(Collectors.toList());
+    if (matchedPipelines.size() == 0) {
+      return null;
+    } else {
+      return matchedPipelines.stream().findFirst().get();
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index e89206d57fb..ccb406f4c62 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -128,6 +128,19 @@ public void setPipelineProvider(ReplicationType replicationType,
     pipelineFactory.setProvider(replicationType, provider);
   }
 
+  private int computeNodeIdHash(Pipeline pipeline) {
+    if (pipeline.getType() != ReplicationType.RATIS) {
+      return 0;
+    }
+
+    if (pipeline.getFactor() != ReplicationFactor.THREE) {
+      return 0;
+    }
+
+    return RatisPipelineUtils
+        .encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes());
+  }
+
   private void initializePipelineState() throws IOException {
     if (pipelineStore.isEmpty()) {
       LOG.info("No pipeline exists in current db");
@@ -143,6 +156,7 @@ private void initializePipelineState() throws IOException {
       Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
          HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
       Preconditions.checkNotNull(pipeline);
+      pipeline.setNodeIdsHash(computeNodeIdHash(pipeline));
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
     }
@@ -163,6 +177,18 @@ public synchronized Pipeline createPipeline(ReplicationType type,
         metrics.incNumPipelineCreated();
         metrics.createPerPipelineMetrics(pipeline);
       }
+      Pipeline overlapPipeline = RatisPipelineUtils
+          .checkPipelineContainSameDatanodes(stateManager, pipeline);
+      if (overlapPipeline != null) {
+        metrics.incNumPipelineContainSameDatanodes();
+        // TODO: remove once pipeline allocation is proven evenly distributed.
+ LOG.info("Pipeline: " + pipeline.getId().toString() + + " contains same datanodes as previous pipeline: " + + overlapPipeline.getId().toString() + " nodeIds: " + + pipeline.getNodes().get(0).getUuid().toString() + + ", " + pipeline.getNodes().get(1).getUuid().toString() + + ", " + pipeline.getNodes().get(2).getUuid().toString()); + } return pipeline; } catch (IOException ex) { metrics.incNumPipelineCreationFailed(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java index 8c348ed87cf..1cf8d3a1e52 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java @@ -54,6 +54,7 @@ public final class SCMPipelineMetrics implements MetricsSource { private @Metric MutableCounterLong numPipelineDestroyFailed; private @Metric MutableCounterLong numPipelineReportProcessed; private @Metric MutableCounterLong numPipelineReportProcessingFailed; + private @Metric MutableCounterLong numPipelineContainSameDatanodes; private Map numBlocksAllocated; /** Private constructor. */ @@ -92,6 +93,7 @@ public void getMetrics(MetricsCollector collector, boolean all) { numPipelineDestroyFailed.snapshot(recordBuilder, true); numPipelineReportProcessed.snapshot(recordBuilder, true); numPipelineReportProcessingFailed.snapshot(recordBuilder, true); + numPipelineContainSameDatanodes.snapshot(recordBuilder, true); numBlocksAllocated .forEach((pid, metric) -> metric.snapshot(recordBuilder, true)); } @@ -176,4 +178,11 @@ void incNumPipelineReportProcessed() { void incNumPipelineReportProcessingFailed() { numPipelineReportProcessingFailed.incr(); } + + /** + * Increments number of pipeline who contains same set of datanodes. 
+   */
+  void incNumPipelineContainSameDatanodes() {
+    numPipelineContainSameDatanodes.incr();
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 613146d2c22..9da78172c35 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -16,7 +16,7 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
@@ -93,7 +93,8 @@ public class MockNodeManager implements NodeManager {
   private NetworkTopology clusterMap;
   private ConcurrentMap<String, Set<String>> dnsToUuidMap;
 
-  public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
+  public MockNodeManager(NetworkTopologyImpl clusterMap,
+      boolean initializeFakeNodes, int nodeCount) {
     this.healthyNodes = new LinkedList<>();
     this.staleNodes = new LinkedList<>();
     this.deadNodes = new LinkedList<>();
@@ -101,8 +102,8 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
     this.node2PipelineMap = new Node2PipelineMap();
     this.node2ContainerMap = new Node2ContainerMap();
     this.dnsToUuidMap = new ConcurrentHashMap<>();
-    aggregateStat = new SCMNodeStat();
-    clusterMap = new NetworkTopologyImpl(new Configuration());
+    this.aggregateStat = new SCMNodeStat();
+    this.clusterMap = clusterMap;
     if (initializeFakeNodes) {
       for (int x = 0; x < nodeCount; x++) {
         DatanodeDetails dd = TestUtils.randomDatanodeDetails();
@@ -114,6 +115,11 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
     this.commandMap = new HashMap<>();
   }
 
+  public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
+    this(new NetworkTopologyImpl(new OzoneConfiguration()),
+        initializeFakeNodes, nodeCount);
+  }
+
   /**
    * Invoked from ctor to create some node Metrics.
    *
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 4f503e4f537..2ae18ed5551 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
@@ -67,13 +68,14 @@ public static void setUp() throws Exception {
         .getTestDir(TestCloseContainerEventHandler.class.getSimpleName());
     configuration
         .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    configuration.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, 16);
     nodeManager = new MockNodeManager(true, 10);
     eventQueue = new EventQueue();
     pipelineManager =
         new SCMPipelineManager(configuration, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
-            pipelineManager.getStateManager(), configuration);
+            pipelineManager.getStateManager(), configuration, eventQueue);
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     containerManager = new
@@ -93,6 +95,9 @@ public static void tearDown() throws Exception {
     if (containerManager != null) {
       containerManager.close();
     }
+    if (pipelineManager != null) {
+      pipelineManager.close();
+    }
     FileUtil.fullyDelete(testDir);
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index ff524702725..3eb146a2c9c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -73,6 +73,8 @@ public Pipeline create(HddsProtos.ReplicationFactor factor)
           .setType(initialPipeline.getType())
           .setFactor(factor)
           .setNodes(initialPipeline.getNodes())
+          .setNodeIdsHash(RatisPipelineUtils
+              .encodeNodeIdsOfFactorThreePipeline(initialPipeline.getNodes()))
           .build();
     }
   }
@@ -91,6 +93,8 @@ public Pipeline create(HddsProtos.ReplicationFactor factor,
         .setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(nodes)
+        .setNodeIdsHash(RatisPipelineUtils
+            .encodeNodeIdsOfFactorThreePipeline(nodes))
         .build();
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
new file mode 100644
index 00000000000..45f85eff17d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
+
+/**
+ * Test for pipeline datanodes intersection.
+ */
+@RunWith(Parameterized.class)
+public class TestPipelineDatanodesIntersection {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestPipelineDatanodesIntersection.class.getName());
+
+  private int nodeCount;
+  private int nodeHeaviness;
+  private OzoneConfiguration conf;
+  private boolean end;
+
+  @Before
+  public void initialize() {
+    conf = new OzoneConfiguration();
+    end = false;
+  }
+
+  public TestPipelineDatanodesIntersection(int nodeCount, int nodeHeaviness) {
+    this.nodeCount = nodeCount;
+    this.nodeHeaviness = nodeHeaviness;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> inputParams() {
+    return Arrays.asList(new Object[][] {
+        {4, 5},
+        {10, 5},
+        {20, 5},
+        {50, 5},
+        {100, 5},
+        {100, 10}
+    });
+  }
+
+  @Test
+  public void testPipelineDatanodesIntersection() {
+    NodeManager nodeManager = new MockNodeManager(true, nodeCount);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, nodeHeaviness);
+    conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
+    PipelineStateManager stateManager = new PipelineStateManager(conf);
+    PipelineProvider provider = new MockRatisPipelineProvider(nodeManager,
+        stateManager, conf);
+
+    int healthyNodeCount = nodeManager
+        .getNodeCount(HddsProtos.NodeState.HEALTHY);
+    int intersectionCount = 0;
+    int createdPipelineCount = 0;
+    while (!end && createdPipelineCount <= healthyNodeCount * nodeHeaviness) {
+      try {
+        Pipeline pipeline = provider.create(HddsProtos.ReplicationFactor.THREE);
+        stateManager.addPipeline(pipeline);
+        nodeManager.addPipeline(pipeline);
+        Pipeline overlapPipeline = RatisPipelineUtils
+            .checkPipelineContainSameDatanodes(stateManager, pipeline);
+        if (overlapPipeline != null) {
+          intersectionCount++;
+          LOG.info("This pipeline: " + pipeline.getId().toString() +
+              " overlaps with previous pipeline: " + overlapPipeline.getId() +
+              ". They share the same set of datanodes: " +
+              pipeline.getNodesInOrder().get(0).getUuid() + "/" +
+              pipeline.getNodesInOrder().get(1).getUuid() + "/" +
+              pipeline.getNodesInOrder().get(2).getUuid() + " and " +
+              overlapPipeline.getNodesInOrder().get(0).getUuid() + "/" +
+              overlapPipeline.getNodesInOrder().get(1).getUuid() + "/" +
+              overlapPipeline.getNodesInOrder().get(2).getUuid() + ".");
+        }
+        createdPipelineCount++;
+      } catch (SCMException e) {
+        end = true;
+      } catch (IOException e) {
+        end = true;
+        // Should not throw regular IOException.
+        Assert.fail();
+      }
+    }
+
+    end = false;
+
+    LOG.info("Among total " +
+        stateManager.getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE).size() + " created pipelines" +
+        " with " + healthyNodeCount + " healthy datanodes and " +
+        nodeHeaviness + " as node heaviness, " +
+        intersectionCount + " pipelines have the same set of datanodes.");
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index cf43accda1e..d4c0d292449 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -21,10 +21,10 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,9 +34,14 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.commons.collections.CollectionUtils.intersection;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -50,12 +55,13 @@ public class TestRatisPipelineProvider {
   private NodeManager nodeManager;
   private PipelineProvider provider;
   private PipelineStateManager stateManager;
+  private OzoneConfiguration conf;
 
   @Before
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
+    conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
     stateManager = new PipelineStateManager(conf);
     provider = new MockRatisPipelineProvider(nodeManager,
         stateManager, conf);
@@ -75,8 +81,12 @@ private void createPipelineAndAssertions(
     // New pipeline should not overlap with the previous created pipeline
     assertTrue(
         intersection(pipeline.getNodes(), pipeline1.getNodes())
-            .isEmpty());
+            .size() < factor.getNumber());
+    if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
+      assertNotEquals(pipeline.getNodeIdsHash(), pipeline1.getNodeIdsHash());
+    }
     stateManager.addPipeline(pipeline1);
+    nodeManager.addPipeline(pipeline1);
   }
 
   @Test
@@ -92,10 +102,9 @@ public void testCreatePipelineWithFactor() throws IOException {
     assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.ALLOCATED);
     stateManager.addPipeline(pipeline1);
-    // New pipeline should overlap with the previous created pipeline,
-    // and one datanode should overlap between the two types.
-    assertEquals(1,
-        intersection(pipeline.getNodes(), pipeline1.getNodes()).size());
+    // With enough pipeline quota on the datanodes, they should not share
+    // the same set of datanodes.
+    assertNotEquals(pipeline.getNodeIdsHash(), pipeline1.getNodeIdsHash());
   }
 
   @Test
@@ -130,6 +139,49 @@ public void testCreatePipelineWithNodes() {
         Pipeline.PipelineState.OPEN);
   }
 
+  @Test
+  public void testComputeNodeIdsHash() {
+    int total = HddsProtos.ReplicationFactor.THREE.getNumber();
+    List<DatanodeDetails> nodes1 = new ArrayList<>();
+    for (int i = 0; i < total; i++) {
+      nodes1.add(TestUtils.createDatanodeDetails(
+          UUID.fromString("00000-11000-00000-00000-0000" + (i + 1))));
+    }
+
+    Assert.assertEquals(total, nodes1.size());
+    Assert.assertNotEquals(0,
+        RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes1));
+
+    List<DatanodeDetails> nodes2 = new ArrayList<>();
+    for (int i = 0; i < total; i++) {
+      nodes2.add(TestUtils.createDatanodeDetails(
+          UUID.fromString("00000-11000-00000-00000-0000" + (total - i))));
+    }
+    Assert.assertEquals(total, nodes2.size());
+    Assert.assertNotEquals(0,
+        RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes2));
+
+    Assert.assertEquals(
+        RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes1),
+        RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes2));
+  }
+
+  @Test
+  public void testCreateFactorTHREEPipelineWithSameDatanodes() {
+    List<DatanodeDetails> healthyNodes = nodeManager
+        .getNodes(HddsProtos.NodeState.HEALTHY).stream()
+        .limit(3).collect(Collectors.toList());
+
+    Pipeline pipeline1 = provider.create(
+        HddsProtos.ReplicationFactor.THREE, healthyNodes);
+    Pipeline pipeline2 = provider.create(
+        HddsProtos.ReplicationFactor.THREE, healthyNodes);
+
+    Assert.assertTrue(pipeline1.getNodes().parallelStream()
+        .allMatch(pipeline2.getNodes()::contains));
+    Assert.assertEquals(pipeline1.getNodeIdsHash(),
+        pipeline2.getNodeIdsHash());
+  }
+
   @Test
   public void testCreatePipelinesDnExclude() throws IOException {
     List<DatanodeDetails> healthyNodes =
@@ -141,7 +193,11 @@ public void testCreatePipelinesDnExclude() throws IOException {
 
     // Use up first 3 DNs for an open pipeline.
     List<DatanodeDetails> dns = healthyNodes.subList(0, 3);
-    addPipeline(dns, factor, Pipeline.PipelineState.OPEN, REPLICATION_TYPE);
+    for (int i = 0; i < conf.getInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+        OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); i++) {
+      // Saturate the pipeline count on the first 3 DNs.
+      addPipeline(dns, factor, Pipeline.PipelineState.OPEN, REPLICATION_TYPE);
+    }
     Set<DatanodeDetails> membersOfOpenPipelines = new HashSet<>(dns);
 
     // Use up next 3 DNs for a closed pipeline.
@@ -160,7 +216,7 @@ public void testCreatePipelinesDnExclude() throws IOException {
     List<DatanodeDetails> nodes = pipeline.getNodes();
 
     assertTrue(
-        "nodes of new pipeline cannot be from open pipelines",
+        "nodes of new pipeline cannot all be from open pipelines",
         nodes.stream().noneMatch(membersOfOpenPipelines::contains));
 
     assertTrue(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 491e2893cf3..e6bf7a09d01 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import static org.apache.commons.collections.CollectionUtils.intersection;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
@@ -30,6 +31,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
@@ -89,8 +91,10 @@ public void testPipelineReload() throws IOException {
             pipelineManager.getStateManager(), conf);
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
+    int pipelineNum = 5;
+
     Set<Pipeline> pipelines = new HashSet<>();
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < pipelineNum; i++) {
       Pipeline pipeline = pipelineManager
           .createPipeline(HddsProtos.ReplicationType.RATIS,
               HddsProtos.ReplicationFactor.THREE);
@@ -112,6 +116,15 @@ public void testPipelineReload() throws IOException {
     List<Pipeline> pipelineList =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipelines, new HashSet<>(pipelineList));
+    // All NodeIdsHash values from the original pipeline list
+    List<Integer> originalPipelineHash = pipelineList.stream()
+        .map(Pipeline::getNodeIdsHash).collect(Collectors.toList());
+    // All NodeIdsHash values from the reloaded pipeline list
+    List<Integer> reloadedPipelineHash = pipelines.stream()
+        .map(Pipeline::getNodeIdsHash).collect(Collectors.toList());
+    // The original NodeIdsHash list should contain the same items as the reloaded one.
+    Assert.assertEquals(pipelineNum,
+        intersection(originalPipelineHash, reloadedPipelineHash).size());
 
     // clean up
     for (Pipeline pipeline : pipelines) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 7e8ec52bdf9..069844360a8 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -168,11 +168,11 @@ public Set<PipelineID> getPipelines(DatanodeDetails dnId) {
 
   /**
    * Get the count of pipelines a datanodes is associated with.
-   * @param dnId DatanodeDetails
+   * @param dn DatanodeDetails
    * @return The number of pipelines
    */
   @Override
-  public int getPipelinesCount(DatanodeDetails dnId) {
+  public int getPipelinesCount(DatanodeDetails dn) {
     throw new UnsupportedOperationException("Not yet implemented");
   }
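Reviewer note, not part of the patch: the overlap detection above rests on `encodeNodeIdsOfFactorThreePipeline`, which XORs the `hashCode()` of the three datanode UUIDs. Below is a minimal standalone sketch of why that encoding is order-independent, using plain `java.util.UUID` values in place of `DatanodeDetails`; the class and method names here are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public final class NodeIdsHashSketch {

  // Mirrors the patch's encoding: XOR the UUID hash codes of exactly
  // three nodes; any other size yields 0, meaning "no hash".
  static int encode(List<UUID> nodeIds) {
    if (nodeIds.size() != 3) {
      return 0;
    }
    return nodeIds.get(0).hashCode()
        ^ nodeIds.get(1).hashCode()
        ^ nodeIds.get(2).hashCode();
  }

  public static void main(String[] args) {
    List<UUID> nodes = Arrays.asList(
        UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID());
    List<UUID> reordered = Arrays.asList(
        nodes.get(2), nodes.get(0), nodes.get(1));
    // XOR is commutative and associative, so the same set of node IDs
    // produces the same hash regardless of ordering.
    System.out.println(encode(nodes) == encode(reordered)); // true
  }
}
```

Since XORed hash codes can collide for distinct node sets, the hash is only safe as a heuristic; consistent with that, the patch uses a match in `createPipeline` to bump `numPipelineContainSameDatanodes` and emit a log line, not to reject the new pipeline.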