diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index 46816a63d349..2fc04e00f23b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -73,8 +73,17 @@ public class ScmConfig extends ReconfigurableConfig { + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. " + "The class decides which pipeline will be used to find or " + "allocate Ratis containers. If not set, " - + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. " - + "RandomPipelineChoosePolicy will be used as default value." + + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms." + + "RandomPipelineChoosePolicy will be used as default value. " + + "The following values can be used, " + + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms." + + "RandomPipelineChoosePolicy : random choose one pipeline. " + + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms." + + "HealthyPipelineChoosePolicy : random choose one healthy pipeline. " + + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms." + + "CapacityPipelineChoosePolicy : choose the pipeline with lower " + + "utilization from the two pipelines. Note that random choose " + + "method will be executed twice in this policy." ) private String pipelineChoosePolicyName; @@ -85,11 +94,20 @@ public class ScmConfig extends ReconfigurableConfig { tags = { ConfigTag.SCM, ConfigTag.PIPELINE }, description = "The full name of class which implements " - + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. " - + "The class decides which pipeline will be used when " - + "selecting an EC Pipeline. If not set, " - + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. " - + "RandomPipelineChoosePolicy will be used as default value." + + "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. " + + "The class decides which pipeline will be used when " + + "selecting an EC Pipeline. If not set, " + + "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms." + + "RandomPipelineChoosePolicy will be used as default value. " + + "The following values can be used, " + + "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms." + + "RandomPipelineChoosePolicy : random choose one pipeline. " + + "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms." + + "HealthyPipelineChoosePolicy : random choose one healthy pipeline. " + + "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms." + + "CapacityPipelineChoosePolicy : choose the pipeline with lower " + + "utilization from the two pipelines. Note that random choose " + + "method will be executed twice in this policy." ) private String ecPipelineChoosePolicyName; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java similarity index 86% rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java index 76439a78464f..e1d0fdd35aa1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import java.util.List; @@ -26,6 +27,15 @@ */ public interface PipelineChoosePolicy { + /** + * Updates the policy with NodeManager. + * @return updated policy. + */ + default PipelineChoosePolicy init(final NodeManager nodeManager) { + // override if the policy requires nodeManager + return this; + } + /** * Given an initial list of pipelines, return one of the pipelines. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java index 330bf67416ae..094e535dcbd9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java @@ -23,7 +23,8 @@ /** * SCM Node Metric that is used in the placement classes. */ -public class SCMNodeMetric implements DatanodeMetric { +public class SCMNodeMetric implements DatanodeMetric, + Comparable { private SCMNodeStat stat; /** @@ -195,12 +196,12 @@ public void subtract(SCMNodeStat value) { * @throws ClassCastException if the specified object's type prevents it * from being compared to this object. */ - //@Override - public int compareTo(SCMNodeStat o) { - if (isEqual(o)) { + @Override + public int compareTo(SCMNodeMetric o) { + if (isEqual(o.get())) { return 0; } - if (isGreater(o)) { + if (isGreater(o.get())) { return 1; } else { return -1; @@ -225,4 +226,9 @@ public boolean equals(Object o) { public int hashCode() { return stat != null ? stat.hashCode() : 0; } + + @Override + public String toString() { + return "SCMNodeMetric{" + stat.toString() + '}'; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java index 2a848a04eff5..5456e6ee5273 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java @@ -174,4 +174,13 @@ public int hashCode() { return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get() ^ committed.get() ^ freeSpaceToSpare.get()); } + + @Override + public String toString() { + return "SCMNodeStat{" + + "capacity=" + capacity.get() + + ", scmUsed=" + scmUsed.get() + + ", remaining=" + remaining.get() + + '}'; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java new file mode 100644 index 000000000000..a95a473de6d4 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms; + +import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.PipelineRequestInformation; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Deque; +import java.util.List; +import java.util.Objects; + +/** + * Pipeline choose policy that randomly choose pipeline with relatively + * lower utilization. + *

+ * The Algorithm is as follows, Pick 2 random pipelines from a given pool of + * pipelines and then pick the pipeline which has lower utilization. + * This leads to a higher probability of pipelines with lower utilization + * to be picked. + *

+ * For those wondering why we choose two pipelines randomly and choose the + * pipeline with lower utilization. There are links to this original papers in + * HDFS-11564. + * Also, the same algorithm applies to SCMContainerPlacementCapacity. + *

+ */ +public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy { + + private static final Logger LOG = + LoggerFactory.getLogger(PipelineChoosePolicy.class); + + private NodeManager nodeManager; + + private final PipelineChoosePolicy healthPolicy; + + public CapacityPipelineChoosePolicy() { + healthPolicy = new HealthyPipelineChoosePolicy(); + } + + @Override + public PipelineChoosePolicy init(final NodeManager scmNodeManager) { + this.nodeManager = scmNodeManager; + return this; + } + + @Override + public Pipeline choosePipeline(List pipelineList, + PipelineRequestInformation pri) { + Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri); + Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri); + + int result = new CapacityPipelineComparator(this) + .compare(pipeline1, pipeline2); + + LOG.debug("Chosen the {} pipeline", result <= 0 ? "first" : "second"); + return result <= 0 ? pipeline1 : pipeline2; + } + + @Override + public int choosePipelineIndex(List pipelineList, + PipelineRequestInformation pri) { + List mutableList = new ArrayList<>(pipelineList); + Pipeline pipeline = choosePipeline(mutableList, pri); + return pipelineList.indexOf(pipeline); + } + + /** + * Return a list of SCMNodeMetrics corresponding to the DataNodes in the + * pipeline, sorted in descending order based on scm used storage. + * @param pipeline pipeline + * @return sorted SCMNodeMetrics corresponding the pipeline + */ + private Deque getSortedNodeFromPipeline(Pipeline pipeline) { + Deque sortedNodeStack = new ArrayDeque<>(); + pipeline.getNodes().stream() + .map(nodeManager::getNodeStat) + .filter(Objects::nonNull) + .sorted() + .forEach(sortedNodeStack::push); + return sortedNodeStack; + } + + static class CapacityPipelineComparator implements Comparator { + private final CapacityPipelineChoosePolicy policy; + + CapacityPipelineComparator(CapacityPipelineChoosePolicy policy) { + this.policy = policy; + } + @Override + public int compare(Pipeline p1, Pipeline p2) { + if (p1.getId().equals(p2.getId())) { + LOG.debug("Compare the same pipeline {}", p1); + return 0; + } + Deque sortedNodes1 = policy.getSortedNodeFromPipeline(p1); + Deque sortedNodes2 = policy.getSortedNodeFromPipeline(p2); + + // Compare the scmUsed weight of the node in the two sorted node stacks + LOG.debug("Compare scmUsed weight in pipelines, first : {}, second : {}", + sortedNodes1, sortedNodes2); + int result = 0; + int count = 0; + while (result == 0 && + !sortedNodes1.isEmpty() && !sortedNodes2.isEmpty()) { + count++; + LOG.debug("Compare {} round", count); + result = sortedNodes1.pop().compareTo(sortedNodes2.pop()); + } + return result; + } + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java index d040dbe2bcaf..90736a018132 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,14 +49,14 @@ public final class PipelineChoosePolicyFactory { private PipelineChoosePolicyFactory() { } - public static PipelineChoosePolicy getPolicy( + public static PipelineChoosePolicy getPolicy(final NodeManager nodeManager, ScmConfig scmConfig, boolean forEC) throws SCMException { Class policyClass = null; String policyName = forEC ? scmConfig.getECPipelineChoosePolicyName() : scmConfig.getPipelineChoosePolicyName(); try { policyClass = getClass(policyName, PipelineChoosePolicy.class); - return createPipelineChoosePolicyFromClass(policyClass); + return createPipelineChoosePolicyFromClass(nodeManager, policyClass); } catch (Exception e) { Class defaultPolicy = forEC ? OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT : @@ -64,13 +65,14 @@ public static PipelineChoosePolicy getPolicy( LOG.error("Met an exception while create pipeline choose policy " + "for the given class {}. Fallback to the default pipeline " + " choose policy {}", policyName, defaultPolicy, e); - return createPipelineChoosePolicyFromClass(defaultPolicy); + return createPipelineChoosePolicyFromClass(nodeManager, defaultPolicy); } throw e; } } private static PipelineChoosePolicy createPipelineChoosePolicyFromClass( + final NodeManager nodeManager, Class policyClass) throws SCMException { Constructor constructor; try { @@ -86,7 +88,7 @@ private static PipelineChoosePolicy createPipelineChoosePolicyFromClass( } try { - return constructor.newInstance(); + return constructor.newInstance().init(nodeManager); } catch (Exception e) { throw new RuntimeException("Failed to instantiate class " + policyClass.getCanonicalName() + " for " + e.getMessage()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 1a3ea2515f2d..046be68760c6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -804,9 +804,9 @@ private void initializeSystemManagers(OzoneConfiguration conf, ScmConfig scmConfig = conf.getObject(ScmConfig.class); pipelineChoosePolicy = PipelineChoosePolicyFactory - .getPolicy(scmConfig, false); + .getPolicy(scmNodeManager, scmConfig, false); ecPipelineChoosePolicy = PipelineChoosePolicyFactory - .getPolicy(scmConfig, true); + .getPolicy(scmNodeManager, scmConfig, true); if (configurator.getWritableContainerFactory() != null) { writableContainerFactory = configurator.getWritableContainerFactory(); } else { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java index 54d2ffed8284..4f86450d03e7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java @@ -34,7 +34,11 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub; import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; +import org.apache.hadoop.hdds.scm.net.NodeSchema; +import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig; +import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.CapacityPipelineChoosePolicy; import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy; import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy; import org.apache.hadoop.hdds.utils.db.DBStore; @@ -54,8 +58,13 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES; +import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -84,7 +93,7 @@ public class TestWritableECContainerProvider { private OzoneConfiguration conf; private DBStore dbStore; private SCMHAManager scmhaManager; - private MockNodeManager nodeManager; + private static MockNodeManager nodeManager; private WritableContainerProvider provider; private ECReplicationConfig repConfig; @@ -93,8 +102,20 @@ public class TestWritableECContainerProvider { public static Collection policies() { Collection policies = new ArrayList<>(); + // init nodeManager + NodeSchemaManager.getInstance().init(new NodeSchema[] + {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}, true); + NetworkTopologyImpl cluster = + new NetworkTopologyImpl(NodeSchemaManager.getInstance()); + int count = 10; + List datanodes = IntStream.range(0, count) + .mapToObj(i -> MockDatanodeDetails.randomDatanodeDetails()) + .collect(Collectors.toList()); + nodeManager = new MockNodeManager(cluster, datanodes, false, count); + policies.add(new RandomPipelineChoosePolicy()); policies.add(new HealthyPipelineChoosePolicy()); + policies.add(new CapacityPipelineChoosePolicy().init(nodeManager)); return policies; } @@ -110,7 +131,6 @@ void setup(@TempDir File testDir) throws IOException { dbStore = DBStoreBuilder.createDBStore( conf, new SCMDBDefinition()); scmhaManager = SCMHAManagerStub.getInstance(true); - nodeManager = new MockNodeManager(true, 10); pipelineManager = new MockPipelineManager(dbStore, scmhaManager, nodeManager); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java new file mode 100644 index 000000000000..421d2396bfaf --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; +import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test for the capacity pipeline choose policy. + */ +public class TestCapacityPipelineChoosePolicy { + + @Test + public void testChoosePipeline() throws Exception { + + // given 4 datanode + List datanodes = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + datanodes.add(MockDatanodeDetails.randomDatanodeDetails()); + } + // dn0 dn1 dn2 dn3 + // used 0 10 20 30 + NodeManager mockNodeManager = mock(NodeManager.class); + when(mockNodeManager.getNodeStat(datanodes.get(0))) + .thenReturn(new SCMNodeMetric(100L, 0, 100L, 0, 0)); + when(mockNodeManager.getNodeStat(datanodes.get(1))) + .thenReturn(new SCMNodeMetric(100L, 10L, 90L, 0, 0)); + when(mockNodeManager.getNodeStat(datanodes.get(2))) + .thenReturn(new SCMNodeMetric(100L, 20L, 80L, 0, 0)); + when(mockNodeManager.getNodeStat(datanodes.get(3))) + .thenReturn(new SCMNodeMetric(100L, 30L, 70L, 0, 0)); + + PipelineChoosePolicy policy = new CapacityPipelineChoosePolicy().init(mockNodeManager); + + // generate 4 pipelines, and every pipeline has 3 datanodes + // + // pipeline0 dn1 dn2 dn3 + // pipeline1 dn0 dn2 dn3 + // pipeline2 dn0 dn1 dn3 + // pipeline3 dn0 dn1 dn2 + // + // In the above scenario, pipeline0 vs pipeline1 runs through three rounds + // of comparisons, (dn3 <-> dn3) -> (dn2 <-> dn2 ) -> (dn1 <-> dn0), + // finally comparing dn0 and dn1, and dn0 wins, so pipeline1 is selected. + // + List pipelines = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + List dns = new ArrayList<>(); + for (int j = 0; j < datanodes.size(); j++) { + if (i != j) { + dns.add(datanodes.get(j)); + } + } + Pipeline pipeline = MockPipeline.createPipeline(dns); + MockRatisPipelineProvider.markPipelineHealthy(pipeline); + pipelines.add(pipeline); + } + + Map selectedCount = new HashMap<>(); + for (Pipeline pipeline : pipelines) { + selectedCount.put(pipeline, 0); + } + for (int i = 0; i < 1000; i++) { + // choosePipeline + Pipeline pipeline = policy.choosePipeline(pipelines, null); + assertNotNull(pipeline); + selectedCount.put(pipeline, selectedCount.get(pipeline) + 1); + } + + // The selected count from most to least should be : + // pipeline3 > pipeline2 > pipeline1 > pipeline0 + assertThat(selectedCount.get(pipelines.get(3))).isGreaterThan(selectedCount.get(pipelines.get(2))); + assertThat(selectedCount.get(pipelines.get(2))).isGreaterThan(selectedCount.get(pipelines.get(1))); + assertThat(selectedCount.get(pipelines.get(1))).isGreaterThan(selectedCount.get(pipelines.get(0))); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java index 7d0a72ed2fb8..82fed5953aa8 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestPipelineChoosePolicyFactory.java @@ -21,7 +21,9 @@ import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; import org.apache.hadoop.hdds.scm.PipelineRequestInformation; import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -42,17 +44,20 @@ public class TestPipelineChoosePolicyFactory { private ScmConfig scmConfig; + private NodeManager nodeManager; + @BeforeEach public void setup() { //initialize network topology instance conf = new OzoneConfiguration(); scmConfig = conf.getObject(ScmConfig.class); + nodeManager = new MockNodeManager(true, 5); } @Test public void testDefaultPolicy() throws IOException { PipelineChoosePolicy policy = PipelineChoosePolicyFactory - .getPolicy(scmConfig, false); + .getPolicy(nodeManager, scmConfig, false); assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT, policy.getClass()); } @@ -60,7 +65,7 @@ public void testDefaultPolicy() throws IOException { @Test public void testDefaultPolicyEC() throws IOException { PipelineChoosePolicy policy = PipelineChoosePolicyFactory - .getPolicy(scmConfig, true); + .getPolicy(nodeManager, scmConfig, true); assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT, policy.getClass()); } @@ -69,7 +74,7 @@ public void testDefaultPolicyEC() throws IOException { public void testNonDefaultPolicyEC() throws IOException { scmConfig.setECPipelineChoosePolicyName(DummyGoodImpl.class.getName()); PipelineChoosePolicy policy = PipelineChoosePolicyFactory - .getPolicy(scmConfig, true); + .getPolicy(nodeManager, scmConfig, true); assertSame(DummyGoodImpl.class, policy.getClass()); } @@ -121,10 +126,10 @@ public void testConstructorNotFound() throws SCMException { scmConfig.setPipelineChoosePolicyName(DummyImpl.class.getName()); scmConfig.setECPipelineChoosePolicyName(DummyImpl.class.getName()); PipelineChoosePolicy policy = - PipelineChoosePolicyFactory.getPolicy(scmConfig, false); + PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, false); assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT, policy.getClass()); - policy = PipelineChoosePolicyFactory.getPolicy(scmConfig, true); + policy = PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, true); assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT, policy.getClass()); } @@ -137,10 +142,10 @@ public void testClassNotImplemented() throws SCMException { scmConfig.setECPipelineChoosePolicyName( "org.apache.hadoop.hdds.scm.pipeline.choose.policy.HelloWorld"); PipelineChoosePolicy policy = - PipelineChoosePolicyFactory.getPolicy(scmConfig, false); + PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, false); assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT, policy.getClass()); - policy = PipelineChoosePolicyFactory.getPolicy(scmConfig, true); + policy = PipelineChoosePolicyFactory.getPolicy(nodeManager, scmConfig, true); assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT, policy.getClass()); }