@@ -73,8 +73,17 @@ public class ScmConfig extends ReconfigurableConfig {
+ "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+ "The class decides which pipeline will be used to find or "
+ "allocate Ratis containers. If not set, "
+ "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. "
+ "RandomPipelineChoosePolicy will be used as default value."
+ "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ "RandomPipelineChoosePolicy will be used as default value. "
+ "The following values can be used, "
+ "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ "RandomPipelineChoosePolicy : random choose one pipeline. "
+ "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ "HealthyPipelineChoosePolicy : random choose one healthy pipeline. "
+ "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ "CapacityPipelineChoosePolicy : choose the pipeline with lower "
+ "utilization from the two pipelines. Note that random choose "
+ "method will be executed twice in this policy."
)
private String pipelineChoosePolicyName;

@@ -85,11 +94,20 @@ public class ScmConfig extends ReconfigurableConfig {
tags = { ConfigTag.SCM, ConfigTag.PIPELINE },
description =
"The full name of class which implements "
+ "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+ "The class decides which pipeline will be used when "
+ "selecting an EC Pipeline. If not set, "
+ "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. "
+ "RandomPipelineChoosePolicy will be used as default value."
+ "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+ "The class decides which pipeline will be used when "
+ "selecting an EC Pipeline. If not set, "
+ "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ "RandomPipelineChoosePolicy will be used as default value. "
+ "The following values can be used, "
+ "(1) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ "RandomPipelineChoosePolicy : random choose one pipeline. "
+ "(2) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ "HealthyPipelineChoosePolicy : random choose one healthy pipeline. "
+ "(3) org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
+ "CapacityPipelineChoosePolicy : choose the pipeline with lower "
+ "utilization from the two pipelines. Note that random choose "
+ "method will be executed twice in this policy."
)
private String ecPipelineChoosePolicyName;

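For illustration, a minimal sketch of how an operator might select these policies programmatically. The property keys below are assumptions inferred from the field names pipelineChoosePolicyName and ecPipelineChoosePolicyName; the authoritative keys are in the @Config annotations, which are collapsed out of this diff.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public final class ChoosePolicyConfigSketch {
  private ChoosePolicyConfigSketch() { }

  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Assumed key: prefer less-utilized pipelines for Ratis containers.
    conf.set("ozone.scm.pipeline.choose.policy.impl",
        "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
            + "CapacityPipelineChoosePolicy");
    // Assumed key: keep random selection for EC pipelines.
    conf.set("ozone.scm.ec.pipeline.choose.policy.impl",
        "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms."
            + "RandomPipelineChoosePolicy");
    System.out.println(conf.get("ozone.scm.pipeline.choose.policy.impl"));
  }
}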
@@ -17,6 +17,7 @@

package org.apache.hadoop.hdds.scm;

+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

import java.util.List;
@@ -26,6 +27,15 @@
*/
public interface PipelineChoosePolicy {

+  /**
+   * Updates the policy with NodeManager.
+   * @return updated policy.
+   */
+  default PipelineChoosePolicy init(final NodeManager nodeManager) {
+    // override if the policy requires nodeManager
+    return this;
+  }
Comment on lines +34 to +37
Contributor: Nice!
/**
* Given an initial list of pipelines, return one of the pipelines.
*
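The new default init hook means existing policies need no change, while node-aware policies can capture the NodeManager before use. A hypothetical third-party policy (not part of this PR) might look like the following sketch:

import java.util.List;

import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

// Hypothetical example of a policy that opts in to node statistics by
// overriding init(), mirroring CapacityPipelineChoosePolicy below.
public class NodeAwarePipelineChoosePolicy implements PipelineChoosePolicy {

  private NodeManager nodeManager;

  @Override
  public PipelineChoosePolicy init(final NodeManager scmNodeManager) {
    this.nodeManager = scmNodeManager;  // kept for later stat lookups
    return this;
  }

  @Override
  public Pipeline choosePipeline(List<Pipeline> pipelineList,
      PipelineRequestInformation pri) {
    // Placeholder logic only; a real policy would consult nodeManager here.
    return pipelineList.get(0);
  }

  @Override
  public int choosePipelineIndex(List<Pipeline> pipelineList,
      PipelineRequestInformation pri) {
    return pipelineList.isEmpty() ? -1 : 0;  // placeholder
  }
}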
@@ -23,7 +23,8 @@
/**
* SCM Node Metric that is used in the placement classes.
*/
-public class SCMNodeMetric implements DatanodeMetric<SCMNodeStat, Long> {
+public class SCMNodeMetric implements DatanodeMetric<SCMNodeStat, Long>,
+    Comparable<SCMNodeMetric> {
private SCMNodeStat stat;

/**
@@ -195,12 +196,12 @@ public void subtract(SCMNodeStat value) {
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
-  //@Override
-  public int compareTo(SCMNodeStat o) {
-    if (isEqual(o)) {
+  @Override
+  public int compareTo(SCMNodeMetric o) {
+    if (isEqual(o.get())) {
       return 0;
     }
-    if (isGreater(o)) {
+    if (isGreater(o.get())) {
return 1;
} else {
return -1;
@@ -225,4 +226,9 @@ public boolean equals(Object o) {
public int hashCode() {
return stat != null ? stat.hashCode() : 0;
}

+  @Override
+  public String toString() {
+    return "SCMNodeMetric{" + stat.toString() + '}';
+  }
}
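Implementing Comparable pays off wherever node metrics need ordering. A small self-contained sketch of the pattern (the same one CapacityPipelineChoosePolicy uses below) that sorts a pipeline's datanode metrics by utilization:

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

final class NodeMetricSortSketch {
  private NodeMetricSortSketch() { }

  // Ascending by utilization, relying on the compareTo added in this PR.
  static List<SCMNodeMetric> sortedByUsage(NodeManager nodeManager,
      Pipeline pipeline) {
    return pipeline.getNodes().stream()
        .map(nodeManager::getNodeStat)   // one SCMNodeMetric per datanode
        .filter(Objects::nonNull)
        .sorted()
        .collect(Collectors.toList());
  }
}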
@@ -174,4 +174,13 @@ public int hashCode() {
return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get() ^
committed.get() ^ freeSpaceToSpare.get());
}

+  @Override
+  public String toString() {
+    return "SCMNodeStat{" +
+        "capacity=" + capacity.get() +
+        ", scmUsed=" + scmUsed.get() +
+        ", remaining=" + remaining.get() +
+        '}';
+  }
}
@@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;

import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Objects;

/**
 * Pipeline choose policy that randomly chooses a pipeline with relatively
 * lower utilization.
 * <p>
 * The algorithm is as follows: pick two random pipelines from a given pool
 * of pipelines, then pick the one with the lower utilization. This gives
 * pipelines with lower utilization a higher probability of being picked.
 * <p>
 * For those wondering why we choose two pipelines randomly and keep the one
 * with lower utilization: links to the original papers are in HDFS-11564.
 * The same algorithm is also used by SCMContainerPlacementCapacity.
 */
public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {

  private static final Logger LOG =
      LoggerFactory.getLogger(CapacityPipelineChoosePolicy.class);

private NodeManager nodeManager;

private final PipelineChoosePolicy healthPolicy;

public CapacityPipelineChoosePolicy() {
healthPolicy = new HealthyPipelineChoosePolicy();
}

@Override
public PipelineChoosePolicy init(final NodeManager scmNodeManager) {
this.nodeManager = scmNodeManager;
return this;
}

@Override
public Pipeline choosePipeline(List<Pipeline> pipelineList,
PipelineRequestInformation pri) {
Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
Contributor:
In some clusters there may be close to a hundred pipelines, yet we only compare two pipelines here. Doesn't this make the probability of the largest (in capacity) pipeline being selected low?

Perhaps a possible solution is to add a configuration that determines how many pipelines are compared at a time, taking a value in [0, 1]:

  • When it is 0, only one pipeline is selected at a time, which is basically equivalent to the RandomPipelineChoosePolicy.
  • When it is 1, it compares all pipelines and strictly chooses the largest pipeline across the whole cluster.

PS: Even if this feature is worth implementing, I think it can be done in another PR; once this PR is merged, the current solution will work in a small cluster.

Contributor Author:
@xichen01 Thanks for the review! About the selection logic, there are links to the original papers in HDFS-11564. The algorithm of choosing two random nodes and then placing the container on the lower-utilization node is discussed in great depth in this survey paper:
https://pdfs.semanticscholar.org/3597/66cb47572028eb70c797115e987ff203e83f.pdf
In addition, SCMContainerPlacementCapacity#chooseNode also uses this algorithm. So I wonder whether it is really necessary to find the pipeline with minimum storage every time.

Contributor:
Do we have any test results for this algorithm compared with the random healthy pipeline policy? Just to see the effectiveness of the algorithm.

Contributor:
@whbing Understood. For a fairly balanced cluster, such as a new one, this strategy can work very well, providing similar loads to all DataNodes. However, for a significantly unbalanced cluster, such as after adding new nodes, this strategy might be limited, especially in larger clusters. But for the latter case (adding new nodes), we can also balance it using the balancer.
Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);

int result = new CapacityPipelineComparator(this)
.compare(pipeline1, pipeline2);

LOG.debug("Chosen the {} pipeline", result <= 0 ? "first" : "second");
return result <= 0 ? pipeline1 : pipeline2;
}

@Override
public int choosePipelineIndex(List<Pipeline> pipelineList,
PipelineRequestInformation pri) {
List<Pipeline> mutableList = new ArrayList<>(pipelineList);
Pipeline pipeline = choosePipeline(mutableList, pri);
return pipelineList.indexOf(pipeline);
}

/**
* Return a list of SCMNodeMetrics corresponding to the DataNodes in the
* pipeline, sorted in descending order based on scm used storage.
* @param pipeline pipeline
   * @return sorted SCMNodeMetrics corresponding to the pipeline
*/
private Deque<SCMNodeMetric> getSortedNodeFromPipeline(Pipeline pipeline) {
Deque<SCMNodeMetric> sortedNodeStack = new ArrayDeque<>();
pipeline.getNodes().stream()
.map(nodeManager::getNodeStat)
.filter(Objects::nonNull)
.sorted()
.forEach(sortedNodeStack::push);
return sortedNodeStack;
}

static class CapacityPipelineComparator implements Comparator<Pipeline> {
private final CapacityPipelineChoosePolicy policy;

CapacityPipelineComparator(CapacityPipelineChoosePolicy policy) {
this.policy = policy;
}
@Override
public int compare(Pipeline p1, Pipeline p2) {
if (p1.getId().equals(p2.getId())) {
LOG.debug("Compare the same pipeline {}", p1);
return 0;
}
Deque<SCMNodeMetric> sortedNodes1 = policy.getSortedNodeFromPipeline(p1);
Deque<SCMNodeMetric> sortedNodes2 = policy.getSortedNodeFromPipeline(p2);

// Compare the scmUsed weight of the node in the two sorted node stacks
LOG.debug("Compare scmUsed weight in pipelines, first : {}, second : {}",
sortedNodes1, sortedNodes2);
int result = 0;
int count = 0;
while (result == 0 &&
!sortedNodes1.isEmpty() && !sortedNodes2.isEmpty()) {
count++;
LOG.debug("Compare {} round", count);
result = sortedNodes1.pop().compareTo(sortedNodes2.pop());
}
return result;
}
}

}
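To make the effectiveness question from the review thread concrete, here is a toy, self-contained simulation (not part of the PR, and no substitute for a cluster test) of the pick-two-keep-the-emptier rule described in the class javadoc, compared against uniform random selection:

import java.util.Random;

// Toy simulation: allocate a million containers across 100 pipelines and
// compare the load spread of uniform random choice vs. two-choices.
public final class TwoChoicesSimulation {
  public static void main(String[] args) {
    final int pipelines = 100;
    final int allocations = 1_000_000;
    long[] uniform = new long[pipelines];
    long[] twoChoices = new long[pipelines];
    Random rnd = new Random(42);

    for (int i = 0; i < allocations; i++) {
      // Uniform random placement, as in RandomPipelineChoosePolicy.
      uniform[rnd.nextInt(pipelines)]++;

      // Pick two pipelines at random; place on the less-loaded one.
      int a = rnd.nextInt(pipelines);
      int b = rnd.nextInt(pipelines);
      twoChoices[twoChoices[a] <= twoChoices[b] ? a : b]++;
    }

    System.out.println("uniform random spread: " + spread(uniform));
    System.out.println("two-choices spread:    " + spread(twoChoices));
  }

  // Difference between the most- and least-loaded pipeline.
  private static long spread(long[] load) {
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    for (long l : load) {
      min = Math.min(min, l);
      max = Math.max(max, l);
    }
    return max - min;
  }
}

Under these assumptions the two-choices spread should come out markedly smaller, in line with the survey paper cited in the thread; the toy model does not, of course, capture pre-existing imbalance such as newly added nodes.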
@@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -48,14 +49,14 @@ public final class PipelineChoosePolicyFactory {
private PipelineChoosePolicyFactory() {
}

-  public static PipelineChoosePolicy getPolicy(
+  public static PipelineChoosePolicy getPolicy(final NodeManager nodeManager,
ScmConfig scmConfig, boolean forEC) throws SCMException {
Class<? extends PipelineChoosePolicy> policyClass = null;
String policyName = forEC ? scmConfig.getECPipelineChoosePolicyName() :
scmConfig.getPipelineChoosePolicyName();
try {
policyClass = getClass(policyName, PipelineChoosePolicy.class);
-      return createPipelineChoosePolicyFromClass(policyClass);
+      return createPipelineChoosePolicyFromClass(nodeManager, policyClass);
} catch (Exception e) {
Class<? extends PipelineChoosePolicy> defaultPolicy = forEC ?
OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT :
@@ -64,13 +65,14 @@ public static PipelineChoosePolicy getPolicy(
LOG.error("Met an exception while create pipeline choose policy "
+ "for the given class {}. Fallback to the default pipeline "
+ " choose policy {}", policyName, defaultPolicy, e);
-        return createPipelineChoosePolicyFromClass(defaultPolicy);
+        return createPipelineChoosePolicyFromClass(nodeManager, defaultPolicy);
}
throw e;
}
}

private static PipelineChoosePolicy createPipelineChoosePolicyFromClass(
+      final NodeManager nodeManager,
Class<? extends PipelineChoosePolicy> policyClass) throws SCMException {
Constructor<? extends PipelineChoosePolicy> constructor;
try {
Expand All @@ -86,7 +88,7 @@ private static PipelineChoosePolicy createPipelineChoosePolicyFromClass(
}

try {
-      return constructor.newInstance();
+      return constructor.newInstance().init(nodeManager);
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate class " +
policyClass.getCanonicalName() + " for " + e.getMessage());
@@ -804,9 +804,9 @@ private void initializeSystemManagers(OzoneConfiguration conf,

ScmConfig scmConfig = conf.getObject(ScmConfig.class);
pipelineChoosePolicy = PipelineChoosePolicyFactory
-        .getPolicy(scmConfig, false);
+        .getPolicy(scmNodeManager, scmConfig, false);
ecPipelineChoosePolicy = PipelineChoosePolicyFactory
-        .getPolicy(scmConfig, true);
+        .getPolicy(scmNodeManager, scmConfig, true);
if (configurator.getWritableContainerFactory() != null) {
writableContainerFactory = configurator.getWritableContainerFactory();
} else {
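Putting the pieces together, the flow this PR establishes is: read ScmConfig, let the factory instantiate the configured class, hand it the NodeManager via init(), then serve placement requests. A condensed sketch; the method signatures are taken from this diff, while the factory's import path and the surrounding setup are assumptions:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
// Assumed import path for the factory; it is not visible in this diff.
import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.PipelineChoosePolicyFactory;

final class PolicyWiringSketch {
  private PolicyWiringSketch() { }

  // Mirrors initializeSystemManagers() above: the NodeManager is threaded
  // through the factory so capacity-aware policies can see node stats.
  static PipelineChoosePolicy wire(OzoneConfiguration conf,
      NodeManager scmNodeManager) throws SCMException {
    ScmConfig scmConfig = conf.getObject(ScmConfig.class);
    return PipelineChoosePolicyFactory.getPolicy(scmNodeManager, scmConfig,
        false /* forEC */);
  }
}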
@@ -34,7 +34,11 @@
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.net.NodeSchema;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.CapacityPipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.RandomPipelineChoosePolicy;
import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -54,8 +58,13 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;

import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -84,7 +93,7 @@ public class TestWritableECContainerProvider {
private OzoneConfiguration conf;
private DBStore dbStore;
private SCMHAManager scmhaManager;
-  private MockNodeManager nodeManager;
+  private static MockNodeManager nodeManager;
private WritableContainerProvider<ECReplicationConfig> provider;
private ECReplicationConfig repConfig;

@@ -93,8 +102,20 @@

public static Collection<PipelineChoosePolicy> policies() {
Collection<PipelineChoosePolicy> policies = new ArrayList<>();
+    // init nodeManager
+    NodeSchemaManager.getInstance().init(new NodeSchema[]
+        {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}, true);
+    NetworkTopologyImpl cluster =
+        new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+    int count = 10;
+    List<DatanodeDetails> datanodes = IntStream.range(0, count)
+        .mapToObj(i -> MockDatanodeDetails.randomDatanodeDetails())
+        .collect(Collectors.toList());
+    nodeManager = new MockNodeManager(cluster, datanodes, false, count);

policies.add(new RandomPipelineChoosePolicy());
policies.add(new HealthyPipelineChoosePolicy());
+    policies.add(new CapacityPipelineChoosePolicy().init(nodeManager));
return policies;
}

@@ -110,7 +131,6 @@ void setup(@TempDir File testDir) throws IOException {
dbStore = DBStoreBuilder.createDBStore(
conf, new SCMDBDefinition());
scmhaManager = SCMHAManagerStub.getInstance(true);
-    nodeManager = new MockNodeManager(true, 10);
pipelineManager =
new MockPipelineManager(dbStore, scmhaManager, nodeManager);
