Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
Expand All @@ -44,6 +45,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
Expand All @@ -68,6 +70,7 @@ public class ContainerBalancerTask implements Runnable {
private NodeManager nodeManager;
private ContainerManager containerManager;
private ReplicationManager replicationManager;
private MoveManager moveManager;
private OzoneConfiguration ozoneConfiguration;
private ContainerBalancer containerBalancer;
private final SCMContext scmContext;
Expand Down Expand Up @@ -131,6 +134,7 @@ public ContainerBalancerTask(StorageContainerManager scm,
this.nodeManager = scm.getScmNodeManager();
this.containerManager = scm.getContainerManager();
this.replicationManager = scm.getReplicationManager();
this.moveManager = new MoveManager(replicationManager, containerManager);
this.ozoneConfiguration = scm.getConfiguration();
this.containerBalancer = containerBalancer;
this.config = config;
Expand Down Expand Up @@ -592,7 +596,7 @@ private void checkIterationMoveResults() {
LOG.warn("Container balancer is interrupted");
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
long timeoutCounts = cancelAndCountPendingMoves();
long timeoutCounts = cancelMovesThatExceedTimeoutDuration();
LOG.warn("{} Container moves are canceled.", timeoutCounts);
metrics.incrementNumContainerMovesTimeoutInLatestIteration(timeoutCounts);
} catch (ExecutionException e) {
Expand Down Expand Up @@ -621,18 +625,42 @@ private void checkIterationMoveResults() {
metrics.getNumContainerMovesCompletedInLatestIteration());
}

private long cancelAndCountPendingMoves() {
return moveSelectionToFutureMap.entrySet().stream()
.filter(entry -> !entry.getValue().isDone())
.peek(entry -> {
LOG.warn("Container move timeout for container {} from source {}" +
" to target {}.",
entry.getKey().getContainerID(),
containerToSourceMap.get(entry.getKey().getContainerID())
.getUuidString(),
entry.getKey().getTargetNode().getUuidString());
entry.getValue().cancel(true);
}).count();
/**
* Cancels container moves that are not yet done. Note that if a move
* command has already been sent out to a Datanode, we don't yet have the
* capability to cancel it. However, those commands in the DN should time out
* if they haven't been processed yet.
*
* @return number of moves that did not complete (timed out) and were
* cancelled.
*/
private long cancelMovesThatExceedTimeoutDuration() {
Set<Map.Entry<ContainerMoveSelection,
CompletableFuture<MoveManager.MoveResult>>>
entries = moveSelectionToFutureMap.entrySet();
Iterator<Map.Entry<ContainerMoveSelection,
CompletableFuture<MoveManager.MoveResult>>>
iterator = entries.iterator();

int numCancelled = 0;
// iterate through all moves and cancel ones that aren't done yet
while (iterator.hasNext()) {
Map.Entry<ContainerMoveSelection,
CompletableFuture<MoveManager.MoveResult>>
entry = iterator.next();
if (!entry.getValue().isDone()) {
LOG.warn("Container move timed out for container {} from source {}" +
" to target {}.", entry.getKey().getContainerID(),
containerToSourceMap.get(entry.getKey().getContainerID())
.getUuidString(),
entry.getKey().getTargetNode().getUuidString());

entry.getValue().cancel(true);
numCancelled += 1;
}
}

return numCancelled;
}

/**
Expand Down Expand Up @@ -742,13 +770,13 @@ private boolean adaptOnReachingIterationLimits() {
}

/**
* Asks {@link ReplicationManager} to move the specified container from
* source to target.
* Asks {@link ReplicationManager} or {@link MoveManager} to move the
* specified container from source to target.
*
* @param source the source datanode
* @param moveSelection the selected container to move and target datanode
* @return false if an exception occurred or the move completed with a
* result other than ReplicationManager.MoveResult.COMPLETED. Returns true
* result other than MoveManager.MoveResult.COMPLETED. Returns true
* if the move completed with MoveResult.COMPLETED or move is not yet done
*/
private boolean moveContainer(DatanodeDetails source,
Expand All @@ -757,48 +785,66 @@ private boolean moveContainer(DatanodeDetails source,
CompletableFuture<MoveManager.MoveResult> future;
try {
ContainerInfo containerInfo = containerManager.getContainer(containerID);
future = replicationManager
.move(containerID, source, moveSelection.getTargetNode())
.whenComplete((result, ex) -> {

metrics.incrementCurrentIterationContainerMoveMetric(result, 1);
if (ex != null) {
LOG.info("Container move for container {} from source {} to " +
"target {} failed with exceptions {}",
containerID.toString(),

ReplicationManager.ReplicationManagerConfiguration rmConf =
ozoneConfiguration.getObject(
ReplicationManager.ReplicationManagerConfiguration.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think this should be stored in the constructor.

/*
If LegacyReplicationManager is enabled, ReplicationManager will
redirect to it. Otherwise, use MoveManager.
*/
if (rmConf.isLegacyEnabled()) {
future = replicationManager
.move(containerID, source, moveSelection.getTargetNode());
} else {
future = moveManager.move(containerID, source,
moveSelection.getTargetNode());
}

future = future.whenComplete((result, ex) -> {
metrics.incrementCurrentIterationContainerMoveMetric(result, 1);
if (ex != null) {
LOG.info("Container move for container {} from source {} to " +
"target {} failed with exceptions.",
containerID.toString(),
source.getUuidString(),
moveSelection.getTargetNode().getUuidString(), ex);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
} else {
if (result == MoveManager.MoveResult.COMPLETED) {
sizeActuallyMovedInLatestIteration +=
containerInfo.getUsedBytes();
if (LOG.isDebugEnabled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: We usually don't need to wrap debug calls in if (LOG.isDebugEnabled) if the parameters to the log are simple getters, which I think they are here.

LOG.debug("Container move completed for container {} from " +
"source {} to target {}", containerID,
source.getUuidString(),
moveSelection.getTargetNode().getUuidString(), ex);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
} else {
if (result == MoveManager.MoveResult.COMPLETED) {
sizeActuallyMovedInLatestIteration +=
containerInfo.getUsedBytes();
if (LOG.isDebugEnabled()) {
LOG.debug("Container move completed for container {} from " +
"source {} to target {}", containerID,
source.getUuidString(),
moveSelection.getTargetNode().getUuidString());
}
} else {
LOG.warn(
"Container move for container {} from source {} to target" +
" {} failed: {}",
moveSelection.getContainerID(), source.getUuidString(),
moveSelection.getTargetNode().getUuidString(), result);
}
moveSelection.getTargetNode().getUuidString());
}
});
} else {
LOG.warn(
"Container move for container {} from source {} to target" +
" {} failed: {}",
moveSelection.getContainerID(), source.getUuidString(),
moveSelection.getTargetNode().getUuidString(), result);
}
}
});
} catch (ContainerNotFoundException e) {
LOG.warn("Could not find Container {} for container move",
containerID, e);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
return false;
} catch (NodeNotFoundException | TimeoutException e) {
} catch (NodeNotFoundException | TimeoutException |
ContainerReplicaNotFoundException e) {
LOG.warn("Container move failed for container {}", containerID, e);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
return false;
}

/*
If the future hasn't failed yet, put it in moveSelectionToFutureMap for
processing later
*/
if (future.isDone()) {
if (future.isCompletedExceptionally()) {
return false;
Expand Down Expand Up @@ -1045,6 +1091,16 @@ void setConfig(ContainerBalancerConfiguration config) {
this.config = config;
}

@VisibleForTesting
void setMoveManager(MoveManager moveManager) {
this.moveManager = moveManager;
}

@VisibleForTesting
void setTaskStatus(Status taskStatus) {
this.taskStatus = taskStatus;
}

public Status getBalancerStatus() {
return taskStatus;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

/**
* A class which schedules, tracks and completes moves scheduled by the
Expand Down Expand Up @@ -125,10 +124,10 @@ public enum MoveResult {
private volatile boolean running = false;

public MoveManager(final ReplicationManager replicationManager,
final Clock clock, final ContainerManager containerManager) {
final ContainerManager containerManager) {
this.replicationManager = replicationManager;
this.containerManager = containerManager;
this.clock = clock;
this.clock = replicationManager.getClock();
}

/**
Expand Down Expand Up @@ -219,7 +218,7 @@ public void onNotLeader() {
public CompletableFuture<MoveResult> move(
ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
throws ContainerNotFoundException, NodeNotFoundException,
TimeoutException, ContainerReplicaNotFoundException {
ContainerReplicaNotFoundException {
CompletableFuture<MoveResult> ret = new CompletableFuture<>();

if (!running) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,9 @@ public ReplicationManagerConfiguration getConfig() {
return rmConf;
}

public Clock getClock() {
return clock;
}

/**
* following functions will be refactored in a separate jira.
Expand Down Expand Up @@ -1230,8 +1233,12 @@ public LegacyReplicationManager getLegacyReplicationManager() {
}

public boolean isContainerReplicatingOrDeleting(ContainerID containerID) {
return legacyReplicationManager
.isContainerReplicatingOrDeleting(containerID);
if (rmConf.isLegacyEnabled()) {
return legacyReplicationManager
.isContainerReplicatingOrDeleting(containerID);
} else {
return !getPendingReplicationOps(containerID).isEmpty();
}
}

private ECContainerReplicaCount getECContainerReplicaCount(
Expand Down
Loading