Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private void setConfiguration(ContainerBalancerConfiguration conf) {
* Find a {@link ContainerMoveSelection} consisting of a target and
* container to move for a source datanode. Favours more under-utilized nodes.
* @param source Datanode to find a target for
* @param candidateContainers Set of candidate containers satisfying
* @param container candidate container satisfying
* selection criteria
* {@link ContainerBalancerSelectionCriteria}
* (DatanodeDetails, Long) method returns true if the size specified in the
Expand All @@ -105,29 +105,27 @@ private void setConfiguration(ContainerBalancerConfiguration conf) {
*/
@Override
public ContainerMoveSelection findTargetForContainerMove(
DatanodeDetails source, Set<ContainerID> candidateContainers) {
DatanodeDetails source, ContainerID container) {
sortTargetForSource(source);
for (DatanodeUsageInfo targetInfo : potentialTargets) {
DatanodeDetails target = targetInfo.getDatanodeDetails();
for (ContainerID container : candidateContainers) {
Set<ContainerReplica> replicas;
ContainerInfo containerInfo;
try {
replicas = containerManager.getContainerReplicas(container);
containerInfo = containerManager.getContainer(container);
} catch (ContainerNotFoundException e) {
logger.warn("Could not get Container {} from Container Manager for " +
"obtaining replicas in Container Balancer.", container, e);
continue;
}
Set<ContainerReplica> replicas;
ContainerInfo containerInfo;
try {
replicas = containerManager.getContainerReplicas(container);
containerInfo = containerManager.getContainer(container);
} catch (ContainerNotFoundException e) {
logger.warn("Could not get Container {} from Container Manager for " +
"obtaining replicas in Container Balancer.", container, e);
return null;
}

if (replicas.stream().noneMatch(
replica -> replica.getDatanodeDetails().equals(target)) &&
containerMoveSatisfiesPlacementPolicy(container, replicas, source,
target) &&
canSizeEnterTarget(target, containerInfo.getUsedBytes())) {
return new ContainerMoveSelection(target, container);
}
if (replicas.stream().noneMatch(
replica -> replica.getDatanodeDetails().equals(target)) &&
containerMoveSatisfiesPlacementPolicy(container, replicas, source,
target) &&
canSizeEnterTarget(target, containerInfo.getUsedBytes())) {
return new ContainerMoveSelection(target, container);
}
}
logger.info("Container Balancer could not find a target for " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
Expand All @@ -52,6 +55,7 @@ public class ContainerBalancerSelectionCriteria {
private Set<ContainerID> selectedContainers;
private Set<ContainerID> excludeContainers;
private FindSourceStrategy findSourceStrategy;
private Map<DatanodeDetails, NavigableSet<ContainerID>> setMap;

public ContainerBalancerSelectionCriteria(
ContainerBalancerConfiguration balancerConfiguration,
Expand All @@ -66,6 +70,7 @@ public ContainerBalancerSelectionCriteria(
selectedContainers = new HashSet<>();
excludeContainers = balancerConfiguration.getExcludeContainers();
this.findSourceStrategy = findSourceStrategy;
this.setMap = new HashMap<>();
}

/**
Expand All @@ -79,38 +84,20 @@ private boolean isContainerReplicatingOrDeleting(ContainerID containerID) {
}

/**
* Gets containers that are suitable for moving based on the following
* required criteria:
* 1. Container must not be undergoing replication.
* 2. Container must not already be selected for balancing.
* 3. Container size should be closer to 5GB.
* 4. Container must not be in the configured exclude containers list.
* 5. Container should be closed.
* 6. If the {@link LegacyReplicationManager} is enabled, then the container should not be an EC container.
* @param node DatanodeDetails for which to find candidate containers.
* @return NavigableSet of candidate containers that satisfy the criteria.
* Gets the ContainerID set for the datanode, returned as a NavigableSet.
* Since sorting is time-consuming, the set is cached.
*
* @param node source datanode
* @return cached Navigable ContainerID Set
*/
public NavigableSet<ContainerID> getCandidateContainers(
DatanodeDetails node, long sizeMovedAlready) {
NavigableSet<ContainerID> containerIDSet =
new TreeSet<>(orderContainersByUsedBytes().reversed());
try {
containerIDSet.addAll(nodeManager.getContainers(node));
} catch (NodeNotFoundException e) {
LOG.warn("Could not find Datanode {} while selecting candidate " +
"containers for Container Balancer.", node.toString(), e);
return containerIDSet;
public Set<ContainerID> getContainerIDSet(DatanodeDetails node) {
// Check if the node is registered at the beginning
if (!nodeManager.isNodeRegistered(node)) {
return Collections.emptySet();
}
if (excludeContainers != null) {
containerIDSet.removeAll(excludeContainers);
}
if (selectedContainers != null) {
containerIDSet.removeAll(selectedContainers);
}

containerIDSet.removeIf(
containerID -> shouldBeExcluded(containerID, node, sizeMovedAlready));
return containerIDSet;
Set<ContainerID> containers = setMap.computeIfAbsent(node,
this::getCandidateContainers);
return containers != null ? containers : Collections.emptySet();
}

/**
Expand Down Expand Up @@ -165,7 +152,19 @@ private boolean isECContainerAndLegacyRMEnabled(ContainerInfo container) {
&& replicationManager.getConfig().isLegacyEnabled();
}

private boolean shouldBeExcluded(ContainerID containerID,
/**
* Checks whether a container should be excluded from moving. A container is
* suitable for moving only if it satisfies the following required criteria:
* 1. Container must not be undergoing replication.
* 2. Container must not already be selected for balancing.
* 3. Container size should be closer to 5GB.
* 4. Container must not be in the configured exclude containers list.
* 5. Container should be closed.
* 6. If the {@link LegacyReplicationManager} is enabled, then the container should not be an EC container.
* @param node DatanodeDetails for which to find candidate containers.
* @return true if the container should be excluded, else false
*/
public boolean shouldBeExcluded(ContainerID containerID,
DatanodeDetails node, long sizeMovedAlready) {
ContainerInfo container;
try {
Expand All @@ -175,7 +174,8 @@ private boolean shouldBeExcluded(ContainerID containerID,
"candidate container. Excluding it.", containerID);
return true;
}
return !isContainerClosed(container, node) || isECContainerAndLegacyRMEnabled(container) ||
return excludeContainers.contains(containerID) || selectedContainers.contains(containerID) ||
!isContainerClosed(container, node) || isECContainerAndLegacyRMEnabled(container) ||
isContainerReplicatingOrDeleting(containerID) ||
!findSourceStrategy.canSizeLeaveSource(node, container.getUsedBytes())
|| breaksMaxSizeToMoveLimit(container.containerID(),
Expand Down Expand Up @@ -242,4 +242,24 @@ public void setSelectedContainers(
this.selectedContainers = selectedContainers;
}


/**
 * Builds the sorted set of candidate containers hosted on the given
 * datanode, after dropping the configured exclude containers and the
 * containers that are already selected for balancing.
 *
 * <p>The containers reported by the node manager are copied before
 * filtering, so the collection owned by the node manager is never mutated
 * (and an unmodifiable result cannot cause an
 * {@link UnsupportedOperationException}).
 *
 * @param node datanode whose containers are collected
 * @return containers of {@code node} ordered by the reverse of
 *         {@code orderContainersByUsedBytes()}, or {@code null} if the node
 *         manager does not know the datanode
 */
private NavigableSet<ContainerID> getCandidateContainers(DatanodeDetails node) {
  NavigableSet<ContainerID> newSet =
      new TreeSet<>(orderContainersByUsedBytes().reversed());
  try {
    // Defensive copy: removeAll below must not touch the node manager's set.
    Set<ContainerID> idSet = new HashSet<>(nodeManager.getContainers(node));
    if (excludeContainers != null) {
      idSet.removeAll(excludeContainers);
    }
    if (selectedContainers != null) {
      idSet.removeAll(selectedContainers);
    }
    // Filter before sorting so the comparator only runs on the survivors.
    newSet.addAll(idSet);
    return newSet;
  } catch (NodeNotFoundException e) {
    LOG.warn("Could not find Datanode {} while selecting candidate " +
        "containers for Container Balancer.", node, e);
    // null signals "no candidates"; callers must handle it explicitly.
    return null;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -692,11 +691,10 @@ private long cancelMovesThatExceedTimeoutDuration() {
* @return ContainerMoveSelection containing the selected target and container
*/
private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) {
NavigableSet<ContainerID> candidateContainers =
selectionCriteria.getCandidateContainers(source,
sizeScheduledForMoveInLatestIteration);
Set<ContainerID> sourceContainerIDSet =
selectionCriteria.getContainerIDSet(source);

if (candidateContainers.isEmpty()) {
if (sourceContainerIDSet.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("ContainerBalancer could not find any candidate containers " +
"for datanode {}", source.getUuidString());
Expand All @@ -708,9 +706,23 @@ private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) {
LOG.debug("ContainerBalancer is finding suitable target for source " +
"datanode {}", source.getUuidString());
}
ContainerMoveSelection moveSelection =
findTargetStrategy.findTargetForContainerMove(
source, candidateContainers);

ContainerMoveSelection moveSelection = null;
Set<ContainerID> toRemoveContainerIds = new HashSet<>();
for (ContainerID containerId: sourceContainerIDSet) {
if (selectionCriteria.shouldBeExcluded(containerId, source,
sizeScheduledForMoveInLatestIteration)) {
toRemoveContainerIds.add(containerId);
continue;
}
moveSelection = findTargetStrategy.findTargetForContainerMove(source,
containerId);
if (moveSelection != null) {
break;
}
}
// Update cached containerIDSet in setMap
sourceContainerIDSet.removeAll(toRemoveContainerIds);

if (moveSelection == null) {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/**
* This interface can be used to implement strategies to find a target for a
Expand All @@ -40,7 +39,7 @@ public interface FindTargetStrategy {
* enter a potential target.
*
* @param source Datanode to find a target for
* @param candidateContainers Set of candidate containers satisfying
* @param candidateContainer candidate container satisfying
* selection criteria
* {@link ContainerBalancerSelectionCriteria}
* (DatanodeDetails, Long) method returns true if the size specified in the
Expand All @@ -49,7 +48,7 @@ public interface FindTargetStrategy {
* selected container
*/
ContainerMoveSelection findTargetForContainerMove(
DatanodeDetails source, Set<ContainerID> candidateContainers);
DatanodeDetails source, ContainerID candidateContainer);

/**
* increase the Entering size of a candidate target data node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
@Override
public Boolean isNodeRegistered(
DatanodeDetails datanodeDetails) {
return false;
return healthyNodes.contains(datanodeDetails);
}

@Override
Expand Down