Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -52,7 +51,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
* Container balancer is a service in SCM to move containers between over- and
Expand Down Expand Up @@ -86,15 +84,15 @@ public class ContainerBalancer {
private long clusterRemaining;
private double clusterAvgUtilisation;
private double upperLimit;
private double lowerLimit;
private volatile boolean balancerRunning;
private volatile Thread currentBalancingThread;
private Lock lock;
private ContainerBalancerSelectionCriteria selectionCriteria;
private Map<DatanodeDetails, ContainerMoveSelection> sourceToTargetMap;
private Map<DatanodeDetails, Long> sizeLeavingNode;
private Map<DatanodeDetails, Long> sizeEnteringNode;
private Set<ContainerID> selectedContainers;
private FindTargetStrategy findTargetStrategy;
private FindSourceStrategy findSourceStrategy;
private Map<ContainerMoveSelection,
CompletableFuture<ReplicationManager.MoveResult>>
moveSelectionToFutureMap;
Expand Down Expand Up @@ -131,8 +129,9 @@ public ContainerBalancer(
this.unBalancedNodes = new ArrayList<>();

this.lock = new ReentrantLock();
findTargetStrategy =
new FindTargetGreedy(containerManager, placementPolicy);
findTargetStrategy = new FindTargetGreedy(
containerManager, placementPolicy, nodeManager);
findSourceStrategy = new FindSourceGreedy(nodeManager);
}

/**
Expand Down Expand Up @@ -251,7 +250,6 @@ private boolean initializeIteration() {
this.selectedContainers.clear();
this.overUtilizedNodes.clear();
this.underUtilizedNodes.clear();
this.withinThresholdUtilizedNodes.clear();
this.unBalancedNodes.clear();
this.countDatanodesInvolvedPerIteration = 0;
this.sizeMovedPerIteration = 0;
Expand All @@ -262,20 +260,19 @@ private boolean initializeIteration() {
clusterAvgUtilisation);
}

// under utilized nodes have utilization(that is, used / capacity) less
// than lower limit
double lowerLimit = clusterAvgUtilisation - threshold;

// over utilized nodes have utilization(that is, used / capacity) greater
// than upper limit
this.upperLimit = clusterAvgUtilisation + threshold;
// under utilized nodes have utilization(that is, used / capacity) less
// than lower limit
this.lowerLimit = clusterAvgUtilisation - threshold;

if (LOG.isDebugEnabled()) {
LOG.debug("Lower limit for utilization is {} and Upper limit for " +
"utilization is {}", lowerLimit, upperLimit);
}

long overUtilizedBytes = 0L, underUtilizedBytes = 0L;
long totalOverUtilizedBytes = 0L, totalUnderUtilizedBytes = 0L;
// find over and under utilized nodes
for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) {
if (!isBalancerRunning()) {
Expand All @@ -291,7 +288,7 @@ private boolean initializeIteration() {
datanodeUsageInfo.getScmNodeStat().getRemaining().get(),
utilization);
}
if (utilization > upperLimit) {
if (Double.compare(utilization, upperLimit) > 0) {
overUtilizedNodes.add(datanodeUsageInfo);
metrics.incrementDatanodesNumToBalance(1);

Expand All @@ -300,27 +297,30 @@ private boolean initializeIteration() {
ratioToPercent(utilization)));

// amount of bytes greater than upper limit in this node
overUtilizedBytes += ratioToBytes(
Long overUtilizedBytes = ratioToBytes(
datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
utilization) - ratioToBytes(
datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
upperLimit);
} else if (utilization < lowerLimit) {
totalOverUtilizedBytes += overUtilizedBytes;
} else if (Double.compare(utilization, lowerLimit) < 0) {
underUtilizedNodes.add(datanodeUsageInfo);
metrics.incrementDatanodesNumToBalance(1);

// amount of bytes lesser than lower limit in this node
underUtilizedBytes += ratioToBytes(
Long underUtilizedBytes = ratioToBytes(
datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
lowerLimit) - ratioToBytes(
datanodeUsageInfo.getScmNodeStat().getCapacity().get(),
utilization);
totalUnderUtilizedBytes += underUtilizedBytes;
} else {
withinThresholdUtilizedNodes.add(datanodeUsageInfo);
}
}
metrics.setDataSizeToBalanceGB(
Math.max(overUtilizedBytes, underUtilizedBytes) / OzoneConsts.GB);
Math.max(totalOverUtilizedBytes, totalUnderUtilizedBytes) /
OzoneConsts.GB);
Collections.reverse(underUtilizedNodes);

unBalancedNodes = new ArrayList<>(
Expand All @@ -338,102 +338,62 @@ private boolean initializeIteration() {
overUtilizedNodes.size(), underUtilizedNodes.size());

selectionCriteria = new ContainerBalancerSelectionCriteria(config,
nodeManager, replicationManager, containerManager);
nodeManager, replicationManager, containerManager, findSourceStrategy);
sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() +
withinThresholdUtilizedNodes.size());

// initialize maps to track how much size is leaving and entering datanodes
sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() +
withinThresholdUtilizedNodes.size());
overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
.put(datanodeUsageInfo.getDatanodeDetails(), 0L));
withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode
.put(datanodeUsageInfo.getDatanodeDetails(), 0L));

sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() +
withinThresholdUtilizedNodes.size());
underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
.put(datanodeUsageInfo.getDatanodeDetails(), 0L));
withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode
.put(datanodeUsageInfo.getDatanodeDetails(), 0L));

return true;
}

private IterationResult doIteration() {
// note that potential and selected targets are updated in the following
// loop
List<DatanodeDetails> potentialTargets = getPotentialTargets();
//TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
// source and target
findSourceStrategy.reInitialize(getPotentialSources(), config, lowerLimit);
List<DatanodeUsageInfo> potentialTargets = getPotentialTargets();
findTargetStrategy.reInitialize(potentialTargets, config, upperLimit);

Set<DatanodeDetails> selectedTargets =
new HashSet<>(potentialTargets.size());
moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size());
boolean isMoveGenerated = false;

try {
// match each overUtilized node with a target
for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) {
while (true) {
DatanodeDetails source =
findSourceStrategy.getNextCandidateSourceDataNode();
if (source == null) {
break;
}
if (!isBalancerRunning()) {
return IterationResult.ITERATION_INTERRUPTED;
}
DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();

IterationResult result = checkConditionsForBalancing();
if (result != null) {
return result;
}

ContainerMoveSelection moveSelection =
matchSourceWithTarget(source, potentialTargets);
ContainerMoveSelection moveSelection = matchSourceWithTarget(source);
if (moveSelection != null) {
isMoveGenerated = true;
LOG.info("ContainerBalancer is trying to move container {} from " +
"source datanode {} to target datanode {}",
moveSelection.getContainerID().toString(), source.getUuidString(),
moveSelection.getContainerID().toString(),
source.getUuidString(),
moveSelection.getTargetNode().getUuidString());

if (moveContainer(source, moveSelection)) {
// consider move successful for now, and update selection criteria
potentialTargets = updateTargetsAndSelectionCriteria(
potentialTargets, selectedTargets, moveSelection, source);
updateTargetsAndSelectionCriteria(
selectedTargets, moveSelection, source);
}
} else {
// can not find any target for this source
findSourceStrategy.removeCandidateSourceDataNode(source);
}
}

// if not all underUtilized nodes have been selected, try to match
// withinThresholdUtilized nodes with underUtilized nodes
if (selectedTargets.size() < underUtilizedNodes.size()) {
potentialTargets.removeAll(selectedTargets);
Collections.reverse(withinThresholdUtilizedNodes);

for (DatanodeUsageInfo datanodeUsageInfo :
withinThresholdUtilizedNodes) {
if (!balancerRunning) {
return IterationResult.ITERATION_INTERRUPTED;
}
DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails();
IterationResult result = checkConditionsForBalancing();
if (result != null) {
return result;
}

ContainerMoveSelection moveSelection =
matchSourceWithTarget(source, potentialTargets);
if (moveSelection != null) {
isMoveGenerated = true;
LOG.info("ContainerBalancer is trying to move container {} from " +
"source datanode {} to target datanode {}",
moveSelection.getContainerID().toString(),
source.getUuidString(),
moveSelection.getTargetNode().getUuidString());

if (moveContainer(source, moveSelection)) {
// consider move successful for now, and update selection criteria
potentialTargets =
updateTargetsAndSelectionCriteria(potentialTargets,
selectedTargets, moveSelection, source);
}
}
}
}
if (!isMoveGenerated) {
//no move option is generated, so the cluster can not be
//balanced any more, just stop iteration and exit
Expand Down Expand Up @@ -502,12 +462,9 @@ private void checkIterationMoveResults(Set<DatanodeDetails> selectedTargets) {
* Match a source datanode with a target datanode and identify the container
* to move.
*
* @param potentialTargets Collection of potential targets to move
* container to
* @return ContainerMoveSelection containing the selected target and container
*/
private ContainerMoveSelection matchSourceWithTarget(
DatanodeDetails source, Collection<DatanodeDetails> potentialTargets) {
private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) {
NavigableSet<ContainerID> candidateContainers =
selectionCriteria.getCandidateContainers(source);

Expand All @@ -518,14 +475,14 @@ private ContainerMoveSelection matchSourceWithTarget(
}
return null;
}

if (LOG.isDebugEnabled()) {
LOG.debug("ContainerBalancer is finding suitable target for source " +
"datanode {}", source.getUuidString());
}
ContainerMoveSelection moveSelection =
findTargetStrategy.findTargetForContainerMove(
source, potentialTargets, candidateContainers,
this::canSizeEnterTarget);
source, candidateContainers);

if (moveSelection == null) {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -622,15 +579,13 @@ private boolean moveContainer(DatanodeDetails source,
/**
* Update targets and selection criteria after a move.
*
* @param potentialTargets potential target datanodes
* @param selectedTargets selected target datanodes
* @param moveSelection the target datanode and container that has been
* just selected
* @param source the source datanode
* @return List of updated potential targets
*/
private List<DatanodeDetails> updateTargetsAndSelectionCriteria(
Collection<DatanodeDetails> potentialTargets,
private void updateTargetsAndSelectionCriteria(
Set<DatanodeDetails> selectedTargets,
ContainerMoveSelection moveSelection, DatanodeDetails source) {
// count source if it has not been involved in move earlier
Expand All @@ -647,10 +602,6 @@ private List<DatanodeDetails> updateTargetsAndSelectionCriteria(
selectedTargets.add(moveSelection.getTargetNode());
selectedContainers.add(moveSelection.getContainerID());
selectionCriteria.setSelectedContainers(selectedContainers);

return potentialTargets.stream()
.filter(node -> sizeEnteringNode.get(node) <
config.getMaxSizeEnteringTarget()).collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -689,46 +640,31 @@ private long ratioToBytes(Long nodeCapacity, double utilizationRatio) {
return (clusterCapacity - clusterRemaining) / (double) clusterCapacity;
}




/**
* Checks if specified size can enter specified target datanode
* according to {@link ContainerBalancerConfiguration}
* "size.entering.target.max".
* Get potential targets for container move. Potential targets are under
* utilized and within threshold utilized nodes.
*
* @param target target datanode in which size is entering
* @param size size in bytes
* @return true if size can enter target, else false
* @return A list of potential target DatanodeUsageInfo.
*/
boolean canSizeEnterTarget(DatanodeDetails target, long size) {
if (sizeEnteringNode.containsKey(target)) {
long sizeEnteringAfterMove = sizeEnteringNode.get(target) + size;
//size can be moved into target datanode only when the following
//two conditions are met.
//1 sizeEnteringAfterMove does not exceed the configured
// MaxSizeEnteringTarget
//2 current usage of target datanode plus sizeEnteringAfterMove
// is smaller than or equal to upperLimit
return sizeEnteringAfterMove <= config.getMaxSizeEnteringTarget() &&
nodeManager.getUsageInfo(target)
.calculateUtilization(sizeEnteringAfterMove) <= upperLimit;
}
return false;
private List<DatanodeUsageInfo> getPotentialTargets() {
// Currently only the under utilized nodes are offered as targets. The
// underUtilizedNodes field itself is returned (no defensive copy), so any
// change a caller makes to the returned list is visible to this balancer.
//TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
// source and target
return underUtilizedNodes;
}

/**
* Get potential targets for container move. Potential targets are under
* Get potential sources for container move. Potential sources are over
* utilized and within threshold utilized nodes.
*
* @return A list of potential target DatanodeDetails.
* @return A list of potential source DatanodeUsageInfo.
*/
private List<DatanodeDetails> getPotentialTargets() {
List<DatanodeDetails> potentialTargets = new ArrayList<>(
underUtilizedNodes.size() + withinThresholdUtilizedNodes.size());

underUtilizedNodes
.forEach(node -> potentialTargets.add(node.getDatanodeDetails()));
withinThresholdUtilizedNodes
.forEach(node -> potentialTargets.add(node.getDatanodeDetails()));
return potentialTargets;
private List<DatanodeUsageInfo> getPotentialSources() {
// Currently only the over utilized nodes are offered as sources. The
// overUtilizedNodes field itself is returned (no defensive copy), so any
// change a caller makes to the returned list is visible to this balancer.
//TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both
// source and target
return overUtilizedNodes;
}

/**
Expand Down Expand Up @@ -756,10 +692,10 @@ private void incSizeSelectedForMoving(DatanodeDetails source,
sizeMovedPerIteration += size;

// update sizeLeavingNode map with the recent moveSelection
sizeLeavingNode.put(source, sizeLeavingNode.get(source) + size);
findSourceStrategy.increaseSizeLeaving(source, size);

// update sizeEnteringNode map with the recent moveSelection
sizeEnteringNode.put(target, sizeEnteringNode.get(target) + size);
findTargetStrategy.increaseSizeEntering(target, size);
}

/**
Expand Down
Loading