Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hdds.scm.container.balancer;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
Expand All @@ -29,70 +30,66 @@
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;

/**
* Find a target giving preference to more under-utilized nodes.
* Find a target for a source datanode with greedy strategy.
*/
public class FindTargetGreedy implements FindTargetStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(FindTargetGreedy.class);

public abstract class AbstractFindTargetGreedy implements FindTargetStrategy {
private Logger logger;
private ContainerManager containerManager;
private PlacementPolicy placementPolicy;
private Map<DatanodeDetails, Long> sizeEnteringNode;
private NodeManager nodeManager;
private ContainerBalancerConfiguration config;
private Double upperLimit;
private TreeSet<DatanodeUsageInfo> potentialTargets;
private Collection<DatanodeUsageInfo> potentialTargets;

public FindTargetGreedy(
protected AbstractFindTargetGreedy(
ContainerManager containerManager,
PlacementPolicy placementPolicy,
NodeManager nodeManager) {
sizeEnteringNode = new HashMap<>();
this.containerManager = containerManager;
this.placementPolicy = placementPolicy;
this.nodeManager = nodeManager;
}

potentialTargets = new TreeSet<>((a, b) -> {
double currentUsageOfA = a.calculateUtilization(
sizeEnteringNode.get(a.getDatanodeDetails()));
double currentUsageOfB = b.calculateUtilization(
sizeEnteringNode.get(b.getDatanodeDetails()));
int ret = Double.compare(currentUsageOfA, currentUsageOfB);
if (ret != 0) {
return ret;
}
UUID uuidA = a.getDatanodeDetails().getUuid();
UUID uuidB = b.getDatanodeDetails().getUuid();
return uuidA.compareTo(uuidB);
});
protected void setLogger(Logger log) {
logger = log;
}

protected void setPotentialTargets(Collection<DatanodeUsageInfo> pt) {
potentialTargets = pt;
}

private void setUpperLimit(Double upperLimit){
this.upperLimit = upperLimit;
}

private void setPotentialTargets(
List<DatanodeUsageInfo> potentialTargetDataNodes) {
sizeEnteringNode.clear();
potentialTargetDataNodes.forEach(
p -> sizeEnteringNode.put(p.getDatanodeDetails(), 0L));
potentialTargets.clear();
potentialTargets.addAll(potentialTargetDataNodes);
protected int compareByUsage(DatanodeUsageInfo a, DatanodeUsageInfo b) {
double currentUsageOfA = a.calculateUtilization(
sizeEnteringNode.get(a.getDatanodeDetails()));
double currentUsageOfB = b.calculateUtilization(
sizeEnteringNode.get(b.getDatanodeDetails()));
int ret = Double.compare(currentUsageOfA, currentUsageOfB);
if (ret != 0) {
return ret;
}
UUID uuidA = a.getDatanodeDetails().getUuid();
UUID uuidB = b.getDatanodeDetails().getUuid();
return uuidA.compareTo(uuidB);
}

private void setConfiguration(ContainerBalancerConfiguration conf) {
this.config = conf;
config = conf;
}

/**
Expand All @@ -109,6 +106,7 @@ private void setConfiguration(ContainerBalancerConfiguration conf) {
@Override
public ContainerMoveSelection findTargetForContainerMove(
DatanodeDetails source, Set<ContainerID> candidateContainers) {
sortTargetForSource(source);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorting would happen every time function is called. I think we can optimise for same source.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, will do this optimization

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after a deep thought, i think we have to sort potentialTargets every time , even for same source. after a certain target is selected and a move option is scheduled to it, sizeEntering of it will increase, and thus the utilization will increase. so when choosing a target for even the same source, if two candidate target has the same network topology distance to the source , the priority may change according to current usageinfo.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the target and re-add it I think. But we can do this in a separate PR.

for (DatanodeUsageInfo targetInfo : potentialTargets) {
DatanodeDetails target = targetInfo.getDatanodeDetails();
for (ContainerID container : candidateContainers) {
Expand All @@ -118,7 +116,7 @@ public ContainerMoveSelection findTargetForContainerMove(
replicas = containerManager.getContainerReplicas(container);
containerInfo = containerManager.getContainer(container);
} catch (ContainerNotFoundException e) {
LOG.warn("Could not get Container {} from Container Manager for " +
logger.warn("Could not get Container {} from Container Manager for " +
"obtaining replicas in Container Balancer.", container, e);
continue;
}
Expand All @@ -132,8 +130,8 @@ public ContainerMoveSelection findTargetForContainerMove(
}
}
}
LOG.info("Container Balancer could not find a target for source datanode " +
"{}", source.getUuidString());
logger.info("Container Balancer could not find a target for " +
"source datanode {}", source.getUuidString());
return null;
}

Expand All @@ -153,7 +151,7 @@ private boolean containerMoveSatisfiesPlacementPolicy(
try {
containerInfo = containerManager.getContainer(containerID);
} catch (ContainerNotFoundException e) {
LOG.warn("Could not get Container {} from Container Manager while " +
logger.warn("Could not get Container {} from Container Manager while " +
"checking if container move satisfies placement policy in " +
"Container Balancer.", containerID.toString(), e);
return false;
Expand Down Expand Up @@ -212,7 +210,7 @@ public void increaseSizeEntering(DatanodeDetails target, long size) {
}
return;
}
LOG.warn("Cannot find {} in the candidates target nodes",
logger.warn("Cannot find {} in the candidates target nodes",
target.getUuid());
}

Expand All @@ -225,6 +223,23 @@ public void reInitialize(List<DatanodeUsageInfo> potentialDataNodes,
Double upLimit) {
setConfiguration(conf);
setUpperLimit(upLimit);
setPotentialTargets(potentialDataNodes);
sizeEnteringNode.clear();
potentialDataNodes.forEach(
p -> sizeEnteringNode.put(p.getDatanodeDetails(), 0L));
potentialTargets.clear();
potentialTargets.addAll(potentialDataNodes);
}

@VisibleForTesting
public Collection<DatanodeUsageInfo> getPotentialTargets() {
return potentialTargets;
}

/**
* sort potentialTargets for specified source datanode according to
* network topology if enabled.
* @param source the specified source datanode
*/
@VisibleForTesting
public abstract void sortTargetForSource(DatanodeDetails source);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
Expand Down Expand Up @@ -85,6 +86,8 @@ public class ContainerBalancer {
private long clusterUsed;
private long clusterRemaining;
private double clusterAvgUtilisation;
private PlacementPolicy placementPolicy;
private NetworkTopology networkTopology;
private double upperLimit;
private double lowerLimit;
private volatile boolean balancerRunning;
Expand Down Expand Up @@ -115,6 +118,7 @@ public ContainerBalancer(
ReplicationManager replicationManager,
OzoneConfiguration ozoneConfiguration,
final SCMContext scmContext,
NetworkTopology networkTopology,
PlacementPolicy placementPolicy) {
this.nodeManager = nodeManager;
this.containerManager = containerManager;
Expand All @@ -129,10 +133,10 @@ public ContainerBalancer(
this.underUtilizedNodes = new ArrayList<>();
this.withinThresholdUtilizedNodes = new ArrayList<>();
this.unBalancedNodes = new ArrayList<>();
this.placementPolicy = placementPolicy;
this.networkTopology = networkTopology;

this.lock = new ReentrantLock();
findTargetStrategy = new FindTargetGreedy(
containerManager, placementPolicy, nodeManager);
findSourceStrategy = new FindSourceGreedy(nodeManager);
}

Expand Down Expand Up @@ -179,6 +183,13 @@ private void balance() {
this.maxDatanodesRatioToInvolvePerIteration =
config.getMaxDatanodesRatioToInvolvePerIteration();
this.maxSizeToMovePerIteration = config.getMaxSizeToMovePerIteration();
if (config.getNetworkTopologyEnable()) {
findTargetStrategy = new FindTargetGreedyByNetworkTopology(
containerManager, placementPolicy, nodeManager, networkTopology);
} else {
findTargetStrategy = new FindTargetGreedyByUsageInfo(containerManager,
placementPolicy, nodeManager);
}
for (int i = 0; i < idleIteration && balancerRunning; i++) {
// stop balancing if iteration is not initialized
if (!initializeIteration()) {
Expand Down Expand Up @@ -367,11 +378,6 @@ private IterationResult doIteration() {
try {
// match each overUtilized node with a target
while (true) {
DatanodeDetails source =
findSourceStrategy.getNextCandidateSourceDataNode();
if (source == null) {
break;
}
if (!isBalancerRunning()) {
return IterationResult.ITERATION_INTERRUPTED;
}
Expand All @@ -381,6 +387,12 @@ private IterationResult doIteration() {
return result;
}

DatanodeDetails source =
findSourceStrategy.getNextCandidateSourceDataNode();
if (source == null) {
break;
}

ContainerMoveSelection moveSelection = matchSourceWithTarget(source);
if (moveSelection != null) {
isMoveGenerated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,19 @@ public final class ContainerBalancerConfiguration {
private String includeNodes = "";

@Config(key = "exclude.datanodes", type = ConfigType.STRING, defaultValue =
"", tags = ConfigTag.BALANCER, description = "A list of Datanode " +
"", tags = {ConfigTag.BALANCER}, description = "A list of Datanode " +
"hostnames or ip addresses separated by commas. The Datanodes specified" +
" in this list are excluded from balancing. This configuration is empty" +
" by default.")
private String excludeNodes = "";

@Config(key = "move.networkTopology.enable", type = ConfigType.BOOLEAN,
defaultValue = "false", tags = {ConfigTag.BALANCER},
description = "whether to take network topology into account when " +
"selecting a target for a source. " +
"This configuration is false by default.")
private boolean networkTopologyEnable = false;

private DUFactory.Conf duConf;

/**
Expand Down Expand Up @@ -196,6 +203,24 @@ public void setIdleIteration(int count) {
this.idleIterations = count;
}

/**
* Get the NetworkTopologyEnable value for Container Balancer.
*
* @return the boolean value of networkTopologyEnable
*/
public Boolean getNetworkTopologyEnable() {
return networkTopologyEnable;
}

/**
* Set the NetworkTopologyEnable value for Container Balancer.
*
* @param enable the boolean value to be set to networkTopologyEnable
*/
public void setNetworkTopologyEnable(Boolean enable) {
networkTopologyEnable = enable;
}

/**
* Gets the ratio of maximum number of datanodes that will be involved in
* balancing by Container Balancer in one iteration to the total number of
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.container.balancer;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.LoggerFactory;


import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

/**
* an implementation of FindTargetGreedy, which will always select the
* target with the shortest distance according to network topology
* distance to the give source datanode.
*/
public class FindTargetGreedyByNetworkTopology
extends AbstractFindTargetGreedy {

private NetworkTopology networkTopology;
private List potentialTargets;

public FindTargetGreedyByNetworkTopology(
ContainerManager containerManager,
PlacementPolicy placementPolicy,
NodeManager nodeManager,
NetworkTopology networkTopology) {
super(containerManager, placementPolicy, nodeManager);
setLogger(LoggerFactory.getLogger(FindTargetGreedyByNetworkTopology.class));
potentialTargets = new LinkedList<>();
setPotentialTargets(potentialTargets);
this.networkTopology = networkTopology;
}

/**
* sort potentialTargets for specified source datanode according to
* network topology.
* @param source the specified source datanode
*/
@VisibleForTesting
public void sortTargetForSource(DatanodeDetails source) {
Collections.sort(potentialTargets,
(DatanodeUsageInfo da, DatanodeUsageInfo db) -> {
DatanodeDetails a = da.getDatanodeDetails();
DatanodeDetails b = db.getDatanodeDetails();
// sort by network topology first
int distanceToA = networkTopology.getDistanceCost(source, a);
int distanceToB = networkTopology.getDistanceCost(source, b);
if (distanceToA != distanceToB) {
return distanceToA - distanceToB;
}
// if distance to source is equal , sort by usage
return compareByUsage(da, db);
});
}
}
Loading