Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private void setConfiguration(ContainerBalancerConfiguration conf) {
* Find a {@link ContainerMoveSelection} consisting of a target and
* container to move for a source datanode. Favours more under-utilized nodes.
* @param source Datanode to find a target for
* @param candidateContainers Set of candidate containers satisfying
* @param container candidate container satisfying
* selection criteria
* {@link ContainerBalancerSelectionCriteria}
* (DatanodeDetails, Long) method returns true if the size specified in the
Expand All @@ -105,29 +105,27 @@ private void setConfiguration(ContainerBalancerConfiguration conf) {
*/
@Override
public ContainerMoveSelection findTargetForContainerMove(
DatanodeDetails source, Set<ContainerID> candidateContainers) {
DatanodeDetails source, ContainerID container) {
sortTargetForSource(source);
for (DatanodeUsageInfo targetInfo : potentialTargets) {
DatanodeDetails target = targetInfo.getDatanodeDetails();
for (ContainerID container : candidateContainers) {
Set<ContainerReplica> replicas;
ContainerInfo containerInfo;
try {
replicas = containerManager.getContainerReplicas(container);
containerInfo = containerManager.getContainer(container);
} catch (ContainerNotFoundException e) {
logger.warn("Could not get Container {} from Container Manager for " +
"obtaining replicas in Container Balancer.", container, e);
continue;
}
Set<ContainerReplica> replicas;
ContainerInfo containerInfo;
try {
replicas = containerManager.getContainerReplicas(container);
containerInfo = containerManager.getContainer(container);
} catch (ContainerNotFoundException e) {
logger.warn("Could not get Container {} from Container Manager for " +
"obtaining replicas in Container Balancer.", container, e);
return null;
}

if (replicas.stream().noneMatch(
replica -> replica.getDatanodeDetails().equals(target)) &&
containerMoveSatisfiesPlacementPolicy(container, replicas, source,
target) &&
canSizeEnterTarget(target, containerInfo.getUsedBytes())) {
return new ContainerMoveSelection(target, container);
}
if (replicas.stream().noneMatch(
replica -> replica.getDatanodeDetails().equals(target)) &&
containerMoveSatisfiesPlacementPolicy(container, replicas, source,
target) &&
canSizeEnterTarget(target, containerInfo.getUsedBytes())) {
return new ContainerMoveSelection(target, container);
}
}
logger.info("Container Balancer could not find a target for " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
Expand All @@ -52,6 +55,7 @@ public class ContainerBalancerSelectionCriteria {
private Set<ContainerID> selectedContainers;
private Set<ContainerID> excludeContainers;
private FindSourceStrategy findSourceStrategy;
private Map<DatanodeDetails, NavigableSet<ContainerID>> setMap;

public ContainerBalancerSelectionCriteria(
ContainerBalancerConfiguration balancerConfiguration,
Expand All @@ -66,6 +70,7 @@ public ContainerBalancerSelectionCriteria(
selectedContainers = new HashSet<>();
excludeContainers = balancerConfiguration.getExcludeContainers();
this.findSourceStrategy = findSourceStrategy;
this.setMap = new HashMap<>();
}

/**
Expand All @@ -79,38 +84,28 @@ private boolean isContainerReplicatingOrDeleting(ContainerID containerID) {
}

/**
* Gets containers that are suitable for moving based on the following
* required criteria:
* 1. Container must not be undergoing replication.
* 2. Container must not already be selected for balancing.
* 3. Container size should be closer to 5GB.
* 4. Container must not be in the configured exclude containers list.
* 5. Container should be closed.
* 6. If the {@link LegacyReplicationManager} is enabled, then the container should not be an EC container.
* @param node DatanodeDetails for which to find candidate containers.
* @return NavigableSet of candidate containers that satisfy the criteria.
* Get ContainerID Set for the Datanode, it will be returned as NavigableSet
* Since sorting will be time-consuming, the Set will be cached.
*
* @param node source datanode
* @return cached Navigable ContainerID Set
*/
public NavigableSet<ContainerID> getCandidateContainers(
DatanodeDetails node, long sizeMovedAlready) {
NavigableSet<ContainerID> containerIDSet =
new TreeSet<>(orderContainersByUsedBytes().reversed());
public Set<ContainerID> getContainerIDSet(DatanodeDetails node) {
try {
containerIDSet.addAll(nodeManager.getContainers(node));
// Initialize containerSet for node
if (!setMap.containsKey(node)) {
addNodeToSetMap(node);
}
// In case the node is removed
nodeManager.getContainers(node);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we making this call just to see if an exception gets thrown? In that case this is a bit awkward and confusing. Does node manager provide an API that we can use first to check if SCM knows the node, and then get its containers (or remove them from the cache is the node isn't there anymore)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, since currently there is no explicit method to check if a node exists, this is used for this check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe getNodeStatus would be a better one to check the node is still there? getcontainers creates a new HashSet of all the containers on the node, so it is somewhat expensive if we don't use those returned contaienrs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @sodonnel. Would isNodeRegistered be even better?

@Override
public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
try {
nodeStateManager.getNode(datanodeDetails);
return true;
} catch (NodeNotFoundException e) {
return false;
}

} catch (NodeNotFoundException e) {
LOG.warn("Could not find Datanode {} while selecting candidate " +
"containers for Container Balancer.", node.toString(), e);
return containerIDSet;
}
if (excludeContainers != null) {
containerIDSet.removeAll(excludeContainers);
}
if (selectedContainers != null) {
containerIDSet.removeAll(selectedContainers);
setMap.remove(node);
return Collections.emptySet();
}

containerIDSet.removeIf(
containerID -> shouldBeExcluded(containerID, node, sizeMovedAlready));
return containerIDSet;
return setMap.get(node);
}

/**
Expand Down Expand Up @@ -165,7 +160,21 @@ private boolean isECContainerAndLegacyRMEnabled(ContainerInfo container) {
&& replicationManager.getConfig().isLegacyEnabled();
}

private boolean shouldBeExcluded(ContainerID containerID,
/**
* Gets containers that are suitable for moving based on the following
* required criteria:
* 1. Container must not be in ExcludedContainers.
* 2. Container must not be in SelectedContainers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Points 1 and 2 duplicate points 6 and 4, respectively.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@siddhantsangwan Thanks for the detailed review. Updated, PTAL.

* 3. Container must not be undergoing replication.
* 4. Container must not already be selected for balancing.
* 5. Container size should be closer to 5GB.
* 6. Container must not be in the configured exclude containers list.
* 7. Container should be closed.
* 8. If the {@link LegacyReplicationManager} is enabled, then the container should not be an EC container.
* @param node DatanodeDetails for which to find candidate containers.
* @return Set of candidate containers that satisfy the criteria.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@return should be updated.

*/
public boolean shouldBeExcluded(ContainerID containerID,
DatanodeDetails node, long sizeMovedAlready) {
ContainerInfo container;
try {
Expand All @@ -175,7 +184,8 @@ private boolean shouldBeExcluded(ContainerID containerID,
"candidate container. Excluding it.", containerID);
return true;
}
return !isContainerClosed(container, node) || isECContainerAndLegacyRMEnabled(container) ||
return excludeContainers.contains(containerID) || selectedContainers.contains(containerID) ||
!isContainerClosed(container, node) || isECContainerAndLegacyRMEnabled(container) ||
isContainerReplicatingOrDeleting(containerID) ||
!findSourceStrategy.canSizeLeaveSource(node, container.getUsedBytes())
|| breaksMaxSizeToMoveLimit(container.containerID(),
Expand Down Expand Up @@ -242,4 +252,19 @@ public void setSelectedContainers(
this.selectedContainers = selectedContainers;
}


private void addNodeToSetMap(DatanodeDetails node)
throws NodeNotFoundException {
NavigableSet<ContainerID> newSet =
new TreeSet<>(orderContainersByUsedBytes().reversed());
Set<ContainerID> idSet = nodeManager.getContainers(node);
if (excludeContainers != null) {
idSet.removeAll(excludeContainers);
}
if (selectedContainers != null) {
idSet.removeAll(selectedContainers);
}
newSet.addAll(idSet);
setMap.put(node, newSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -692,11 +691,10 @@ private long cancelMovesThatExceedTimeoutDuration() {
* @return ContainerMoveSelection containing the selected target and container
*/
private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) {
NavigableSet<ContainerID> candidateContainers =
selectionCriteria.getCandidateContainers(source,
sizeScheduledForMoveInLatestIteration);
Set<ContainerID> sourceContainerIDSet =
selectionCriteria.getContainerIDSet(source);

if (candidateContainers.isEmpty()) {
if (sourceContainerIDSet.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("ContainerBalancer could not find any candidate containers " +
"for datanode {}", source.getUuidString());
Expand All @@ -708,9 +706,22 @@ private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) {
LOG.debug("ContainerBalancer is finding suitable target for source " +
"datanode {}", source.getUuidString());
}
ContainerMoveSelection moveSelection =
findTargetStrategy.findTargetForContainerMove(
source, candidateContainers);

ContainerMoveSelection moveSelection = null;
Set<ContainerID> toRemoveContainerIds = new HashSet<>();
for (ContainerID containerId: sourceContainerIDSet) {
if (selectionCriteria.shouldBeExcluded(containerId, source,
sizeScheduledForMoveInLatestIteration)) {
toRemoveContainerIds.add(containerId);
continue;
}
moveSelection = findTargetStrategy.findTargetForContainerMove(source,
containerId);
if (moveSelection != null) {
break;
}
}
sourceContainerIDSet.removeAll(toRemoveContainerIds);

if (moveSelection == null) {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/**
* This interface can be used to implement strategies to find a target for a
Expand All @@ -40,7 +39,7 @@ public interface FindTargetStrategy {
* enter a potential target.
*
* @param source Datanode to find a target for
* @param candidateContainers Set of candidate containers satisfying
* @param candidateContainer candidate containers satisfying
* selection criteria
* {@link ContainerBalancerSelectionCriteria}
* (DatanodeDetails, Long) method returns true if the size specified in the
Expand All @@ -49,7 +48,7 @@ public interface FindTargetStrategy {
* selected container
*/
ContainerMoveSelection findTargetForContainerMove(
DatanodeDetails source, Set<ContainerID> candidateContainers);
DatanodeDetails source, ContainerID candidateContainer);

/**
* increase the Entering size of a candidate target data node.
Expand Down