Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class ContainerBalancerSelectionCriteria {
private ContainerManager containerManager;
private Set<ContainerID> selectedContainers;
private Set<ContainerID> excludeContainers;
private Set<ContainerID> excludeContainersDueToFailure;
private FindSourceStrategy findSourceStrategy;
private Map<DatanodeDetails, NavigableSet<ContainerID>> setMap;

Expand All @@ -68,6 +69,7 @@ public ContainerBalancerSelectionCriteria(
this.replicationManager = replicationManager;
this.containerManager = containerManager;
selectedContainers = new HashSet<>();
excludeContainersDueToFailure = new HashSet<>();
excludeContainers = balancerConfiguration.getExcludeContainers();
this.findSourceStrategy = findSourceStrategy;
this.setMap = new HashMap<>();
Expand Down Expand Up @@ -174,7 +176,8 @@ public boolean shouldBeExcluded(ContainerID containerID,
"candidate container. Excluding it.", containerID);
return true;
}
return excludeContainers.contains(containerID) || selectedContainers.contains(containerID) ||
return excludeContainers.contains(containerID) || excludeContainersDueToFailure.contains(containerID) ||
selectedContainers.contains(containerID) ||
!isContainerClosed(container, node) || isECContainerAndLegacyRMEnabled(container) ||
isContainerReplicatingOrDeleting(containerID) ||
!findSourceStrategy.canSizeLeaveSource(node, container.getUsedBytes())
Expand Down Expand Up @@ -242,6 +245,10 @@ public void setSelectedContainers(
this.selectedContainers = selectedContainers;
}

public void addToExcludeDueToFailContainers(ContainerID container) {
this.excludeContainersDueToFailure.add(container);
}


private NavigableSet<ContainerID> getCandidateContainers(DatanodeDetails node) {
NavigableSet<ContainerID> newSet =
Expand All @@ -251,6 +258,9 @@ private NavigableSet<ContainerID> getCandidateContainers(DatanodeDetails node) {
if (excludeContainers != null) {
idSet.removeAll(excludeContainers);
}
if (excludeContainersDueToFailure != null) {
idSet.removeAll(excludeContainersDueToFailure);
}
if (selectedContainers != null) {
idSet.removeAll(selectedContainers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,10 @@ private boolean processMoveSelection(DatanodeDetails source,
containerID,
containerToSourceMap.get(containerID),
containerToTargetMap.get(containerID));
// add source back to queue as a different container can be selected in next run.
findSourceStrategy.addBackSourceDataNode(source);
// exclude the container which caused failure of move to avoid error in next run.
selectionCriteria.addToExcludeDueToFailContainers(moveSelection.getContainerID());
return false;
}

Expand All @@ -563,6 +567,10 @@ private boolean processMoveSelection(DatanodeDetails source,
} catch (ContainerNotFoundException e) {
LOG.warn("Could not get container {} from Container Manager before " +
"starting a container move", containerID, e);
// add source back to queue as a different container can be selected in next run.
findSourceStrategy.addBackSourceDataNode(source);
// exclude the container which caused failure of move to avoid error in next run.
selectionCriteria.addToExcludeDueToFailContainers(moveSelection.getContainerID());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about same container in another source? do that also needs keep excluding? I think we need reconsider this container exclusion to be at source level only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed this comment - @sumitagrawl do you want to take another look? I think @Tejaskriya has updated the PR now.

Copy link
Contributor Author

@Tejaskriya Tejaskriya Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! I have created a Map of DN and containers to be excluded for that DN. Could you please take a look at the recent changes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need reconsider this container exclusion to be at source level only.

What's your reasoning behind this? If container manager can't find this container I think we should avoid this container for any DN.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If 2 different source have same container choosen to be moved to target to reduce usages at source,
s1: c5.--> t5
s2: c5 --> t6
Now s1 for c5 had some failure, so this will avoid choosing c5 container for s1 (as expected). But old logic where comment is given, it will also make c5 container not be choosed in another source s2.

Copy link
Contributor

@siddhantsangwan siddhantsangwan Apr 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a container is not found in the SCM, the problem is likely outside of balancer and will affect any other sources trying to move the same container later. So I think we should exclude the container for all sources when it's a ContainerInfo related error like in the above case.

return false;
}
LOG.info("ContainerBalancer is trying to move container {} with size " +
Expand Down Expand Up @@ -862,13 +870,23 @@ private boolean moveContainer(DatanodeDetails source,
} catch (ContainerNotFoundException e) {
LOG.warn("Could not find Container {} for container move",
containerID, e);
// add source back to queue as a different container can be selected in next run.
findSourceStrategy.addBackSourceDataNode(source);
// exclude the container which caused failure of move to avoid error in next run.
selectionCriteria.addToExcludeDueToFailContainers(moveSelection.getContainerID());
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
return false;
} catch (NodeNotFoundException | TimeoutException |
ContainerReplicaNotFoundException e) {
} catch (NodeNotFoundException | TimeoutException e) {
LOG.warn("Container move failed for container {}", containerID, e);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
return false;
} catch (ContainerReplicaNotFoundException e) {
LOG.warn("Container move failed for container {}", containerID, e);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
// add source back to queue for replica not found only
// the container is not excluded as it is a replica related failure
findSourceStrategy.addBackSourceDataNode(source);
return false;
}

/*
Expand All @@ -881,6 +899,16 @@ private boolean moveContainer(DatanodeDetails source,
} else {
MoveManager.MoveResult result = future.join();
moveSelectionToFutureMap.put(moveSelection, future);
if (result == MoveManager.MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE ||
result == MoveManager.MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET ||
result == MoveManager.MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED ||
result == MoveManager.MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION ||
result == MoveManager.MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION) {
// add source back to queue as a different container can be selected in next run.
// the container which caused failure of move is not excluded
// as it is an intermittent failure or a replica related failure
findSourceStrategy.addBackSourceDataNode(source);
}
return result == MoveManager.MoveResult.COMPLETED;
}
} else {
Expand Down Expand Up @@ -1098,6 +1126,11 @@ Set<DatanodeDetails> getSelectedTargets() {
return selectedTargets;
}

@VisibleForTesting
Set<DatanodeDetails> getSelectedSources() {
return selectedSources;
}

@VisibleForTesting
int getCountDatanodesInvolvedPerIteration() {
return countDatanodesInvolvedPerIteration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void increaseSizeLeaving(DatanodeDetails dui, long size) {
if (currentSize != null) {
sizeLeavingNode.put(dui, currentSize + size);
//reorder according to the latest sizeLeavingNode
potentialSources.add(nodeManager.getUsageInfo(dui));
addBackSourceDataNode(dui);
return;
}
LOG.warn("Cannot find datanode {} in candidate source datanodes",
Expand Down Expand Up @@ -138,6 +138,12 @@ public void removeCandidateSourceDataNode(DatanodeDetails dui) {
potentialSources.removeIf(a -> a.getDatanodeDetails().equals(dui));
}

@Override
public void addBackSourceDataNode(DatanodeDetails dn) {
DatanodeUsageInfo dui = nodeManager.getUsageInfo(dn);
potentialSources.add(dui);
}

/**
* Checks if specified size can leave a specified target datanode
* according to {@link ContainerBalancerConfiguration}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public interface FindSourceStrategy {
*/
void removeCandidateSourceDataNode(DatanodeDetails dui);

/**
* add the specified data node to the candidate source
* data nodes.
* This method does not check whether the specified Datanode is already present in the Collection.
* Callers must take the responsibility of checking and removing the Datanode before adding, if required.
*
* @param dn datanode to be added to potentialSources
*/
void addBackSourceDataNode(DatanodeDetails dn);

/**
* increase the Leaving size of a candidate source data node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,43 @@ public void balancerShouldExcludeECContainersWhenLegacyRmIsEnabled()
}

/**
* Tests if balancer is adding the polled source datanode back to potentialSources queue
* if a move has failed due to a container related failure, like REPLICATION_FAIL_NOT_EXIST_IN_SOURCE.
*/
@Test
public void testSourceDatanodeAddedBack()
throws NodeNotFoundException, IOException, IllegalContainerBalancerStateException,
InvalidContainerBalancerConfigurationException, TimeoutException, InterruptedException {

when(moveManager.move(any(ContainerID.class),
any(DatanodeDetails.class),
any(DatanodeDetails.class)))
.thenReturn(CompletableFuture.completedFuture(MoveManager.MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE))
.thenReturn(CompletableFuture.completedFuture(MoveManager.MoveResult.COMPLETED));
balancerConfiguration.setThreshold(10);
balancerConfiguration.setIterations(1);
balancerConfiguration.setMaxSizeEnteringTarget(10 * STORAGE_UNIT);
balancerConfiguration.setMaxSizeToMovePerIteration(100 * STORAGE_UNIT);
balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
String includeNodes = nodesInCluster.get(0).getDatanodeDetails().getHostName() + "," +
nodesInCluster.get(nodesInCluster.size() - 1).getDatanodeDetails().getHostName();
balancerConfiguration.setIncludeNodes(includeNodes);

startBalancer(balancerConfiguration);
GenericTestUtils.waitFor(() -> ContainerBalancerTask.IterationResult.ITERATION_COMPLETED ==
containerBalancerTask.getIterationResult(), 10, 50);

assertEquals(2, containerBalancerTask.getCountDatanodesInvolvedPerIteration());
assertTrue(containerBalancerTask.getMetrics().getNumContainerMovesCompletedInLatestIteration() >= 1);
assertThat(containerBalancerTask.getMetrics().getNumContainerMovesFailed()).isEqualTo(1);
assertTrue(containerBalancerTask.getSelectedTargets().contains(nodesInCluster.get(0)
.getDatanodeDetails()));
assertTrue(containerBalancerTask.getSelectedSources().contains(nodesInCluster.get(nodesInCluster.size() - 1)
.getDatanodeDetails()));
stopBalancer();
}

/**
* Test to check if balancer picks up only positive size
* containers to move from source to destination.
*/
Expand Down