Merged
@@ -93,16 +93,16 @@ public final class ContainerBalancerConfiguration {
"to exclude from balancing. For example \"1, 4, 5\" or \"1,4,5\".")
private String excludeContainers = "";

@Config(key = "move.timeout", type = ConfigType.TIME, defaultValue = "30m",
@Config(key = "move.timeout", type = ConfigType.TIME, defaultValue = "60m",
Contributor:

We can do this in another PR, but there is a timeout hard coded into MoveManager right now. Probably we need to pass this value into MoveManager somehow so it passes a sensible timeout to RM when scheduling the command. That will then set the DN deadline and the pending Ops timeout.

Contributor Author:

Yes, I've reverted any changes in this PR related to timeouts. We can do that in the next PR.
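
The wiring described in the first comment is what the rest of this diff sets up; a minimal sketch using the constructor shape that appears further down (illustrative only, and it assumes the sketch lives alongside MoveManager so no extra import is needed):

```java
import java.time.Duration;

import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;

// The balancer task passes its configured move timeout into MoveManager, which
// then uses that value (rather than a hard-coded constant) when it schedules the
// replicate command and sets the datanode deadline and pending-op timeout.
final class MoveTimeoutWiringSketch {
  static MoveManager create(ReplicationManager replicationManager,
      ContainerManager containerManager, Duration configuredMoveTimeout) {
    return new MoveManager(replicationManager, containerManager,
        configuredMoveTimeout);
  }
}
```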

tags = {ConfigTag.BALANCER}, description =
"The amount of time to allow a single container to move " +
"from source to target.")
private long moveTimeout = Duration.ofMinutes(30).toMillis();
private long moveTimeout = Duration.ofMinutes(60).toMillis();

@Config(key = "balancing.iteration.interval", type = ConfigType.TIME,
defaultValue = "70m", tags = {ConfigTag.BALANCER}, description =
defaultValue = "130m", tags = {ConfigTag.BALANCER}, description =
Contributor:

Should the timeout and interval not be quite close in time? If commands time out after 60 minutes and the interval is 130m, does that mean the balancer will go idle for some time in between?

"The interval period between each iteration of Container Balancer.")
private long balancingInterval = Duration.ofMinutes(70).toMillis();
private long balancingInterval = Duration.ofMinutes(130).toMillis();

@Config(key = "include.datanodes", type = ConfigType.STRING, defaultValue =
"", tags = {ConfigTag.BALANCER}, description = "A list of Datanode " +
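On the interval-versus-timeout question above, a back-of-the-envelope illustration with the two defaults changed in this diff (it ignores whatever else an iteration does besides waiting on moves):

```java
import java.time.Duration;

public final class BalancerTimingSketch {
  public static void main(String[] args) {
    Duration moveTimeout = Duration.ofMinutes(60);         // move.timeout default in this diff
    Duration iterationInterval = Duration.ofMinutes(130);  // balancing.iteration.interval default

    // If every move in an iteration ends up timing out, the iteration's moves
    // settle after roughly moveTimeout, so in the worst case the balancer could
    // sit idle for about iterationInterval - moveTimeout before the next run.
    Duration worstCaseIdle = iterationInterval.minus(moveTimeout);
    System.out.println(worstCaseIdle); // PT1H10M, i.e. about 70 minutes
  }
}
```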
@@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -44,6 +45,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
@@ -68,6 +70,7 @@ public class ContainerBalancerTask implements Runnable {
private NodeManager nodeManager;
private ContainerManager containerManager;
private ReplicationManager replicationManager;
private MoveManager moveManager;
private OzoneConfiguration ozoneConfiguration;
private ContainerBalancer containerBalancer;
private final SCMContext scmContext;
@@ -131,6 +134,8 @@ public ContainerBalancerTask(StorageContainerManager scm,
this.nodeManager = scm.getScmNodeManager();
this.containerManager = scm.getContainerManager();
this.replicationManager = scm.getReplicationManager();
this.moveManager = new MoveManager(replicationManager, containerManager,
config.getMoveTimeout());
this.ozoneConfiguration = scm.getConfiguration();
this.containerBalancer = containerBalancer;
this.config = config;
@@ -592,7 +597,7 @@ private void checkIterationMoveResults() {
LOG.warn("Container balancer is interrupted");
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
long timeoutCounts = cancelAndCountPendingMoves();
long timeoutCounts = cancelMovesThatExceedTimeoutDuration();
LOG.warn("{} Container moves are canceled.", timeoutCounts);
metrics.incrementNumContainerMovesTimeoutInLatestIteration(timeoutCounts);
} catch (ExecutionException e) {
@@ -621,18 +626,42 @@ private void checkIterationMoveResults() {
metrics.getNumContainerMovesCompletedInLatestIteration());
}

private long cancelAndCountPendingMoves() {
return moveSelectionToFutureMap.entrySet().stream()
.filter(entry -> !entry.getValue().isDone())
.peek(entry -> {
LOG.warn("Container move timeout for container {} from source {}" +
" to target {}.",
entry.getKey().getContainerID(),
containerToSourceMap.get(entry.getKey().getContainerID())
.getUuidString(),
entry.getKey().getTargetNode().getUuidString());
entry.getValue().cancel(true);
}).count();
/**
* Cancels container moves that are not yet done. Note that if a move
* command has already been sent out to a Datanode, we don't yet have the
* capability to cancel it. However, those commands in the DN should time out
* if they haven't been processed yet.
*
* @return number of moves that did not complete (timed out) and were
* cancelled.
*/
private long cancelMovesThatExceedTimeoutDuration() {
Set<Map.Entry<ContainerMoveSelection,
CompletableFuture<MoveManager.MoveResult>>>
entries = moveSelectionToFutureMap.entrySet();
Iterator<Map.Entry<ContainerMoveSelection,
CompletableFuture<MoveManager.MoveResult>>>
iterator = entries.iterator();

int numCancelled = 0;
// iterate through all moves and cancel ones that aren't done yet
while (iterator.hasNext()) {
Map.Entry<ContainerMoveSelection,
CompletableFuture<MoveManager.MoveResult>>
entry = iterator.next();
if (!entry.getValue().isDone()) {
LOG.warn("Container move timed out for container {} from source {}" +
" to target {}.", entry.getKey().getContainerID(),
containerToSourceMap.get(entry.getKey().getContainerID())
.getUuidString(),
entry.getKey().getTargetNode().getUuidString());

entry.getValue().cancel(true);
numCancelled += 1;
}
}

return numCancelled;
}

/**
@@ -742,13 +771,13 @@ private boolean adaptOnReachingIterationLimits() {
}

/**
* Asks {@link ReplicationManager} to move the specified container from
* source to target.
* Asks {@link ReplicationManager} or {@link MoveManager} to move the
* specified container from source to target.
*
* @param source the source datanode
* @param moveSelection the selected container to move and target datanode
* @return false if an exception occurred or the move completed with a
* result other than ReplicationManager.MoveResult.COMPLETED. Returns true
* result other than MoveManager.MoveResult.COMPLETED. Returns true
* if the move completed with MoveResult.COMPLETED or move is not yet done
*/
private boolean moveContainer(DatanodeDetails source,
@@ -757,48 +786,62 @@ private boolean moveContainer(DatanodeDetails source,
CompletableFuture<MoveManager.MoveResult> future;
try {
ContainerInfo containerInfo = containerManager.getContainer(containerID);
future = replicationManager
.move(containerID, source, moveSelection.getTargetNode())
.whenComplete((result, ex) -> {

metrics.incrementCurrentIterationContainerMoveMetric(result, 1);
if (ex != null) {
LOG.info("Container move for container {} from source {} to " +
"target {} failed with exceptions {}",
containerID.toString(),
source.getUuidString(),
moveSelection.getTargetNode().getUuidString(), ex);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
} else {
if (result == MoveManager.MoveResult.COMPLETED) {
sizeActuallyMovedInLatestIteration +=
containerInfo.getUsedBytes();
if (LOG.isDebugEnabled()) {
LOG.debug("Container move completed for container {} from " +
"source {} to target {}", containerID,
source.getUuidString(),
moveSelection.getTargetNode().getUuidString());
}
} else {
LOG.warn(
"Container move for container {} from source {} to target" +
" {} failed: {}",
moveSelection.getContainerID(), source.getUuidString(),
moveSelection.getTargetNode().getUuidString(), result);
}
}
});

/*
If LegacyReplicationManager is enabled, ReplicationManager will
redirect to it. Otherwise, use MoveManager.
*/
if (ozoneConfiguration.getBoolean("hdds.scm.replication.enable.legacy",
true)) {
future = replicationManager
.move(containerID, source, moveSelection.getTargetNode());
} else {
future = moveManager.move(containerID, source,
moveSelection.getTargetNode());
}

future = future.whenComplete((result, ex) -> {
metrics.incrementCurrentIterationContainerMoveMetric(result, 1);
if (ex != null) {
LOG.info("Container move for container {} from source {} to " +
"target {} failed with exceptions.",
containerID.toString(),
source.getUuidString(),
moveSelection.getTargetNode().getUuidString(), ex);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
} else {
if (result == MoveManager.MoveResult.COMPLETED) {
sizeActuallyMovedInLatestIteration +=
containerInfo.getUsedBytes();
LOG.debug("Container move completed for container {} from " +
"source {} to target {}", containerID,
source.getUuidString(),
moveSelection.getTargetNode().getUuidString());
} else {
LOG.warn(
"Container move for container {} from source {} to target" +
" {} failed: {}",
moveSelection.getContainerID(), source.getUuidString(),
moveSelection.getTargetNode().getUuidString(), result);
}
}
});
} catch (ContainerNotFoundException e) {
LOG.warn("Could not find Container {} for container move",
containerID, e);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
return false;
} catch (NodeNotFoundException | TimeoutException e) {
} catch (NodeNotFoundException | TimeoutException |
ContainerReplicaNotFoundException e) {
LOG.warn("Container move failed for container {}", containerID, e);
metrics.incrementNumContainerMovesFailedInLatestIteration(1);
return false;
}

/*
If the future hasn't failed yet, put it in moveSelectionToFutureMap for
processing later
*/
if (future.isDone()) {
if (future.isCompletedExceptionally()) {
return false;
@@ -1045,6 +1088,16 @@ void setConfig(ContainerBalancerConfiguration config) {
this.config = config;
}

@VisibleForTesting
void setMoveManager(MoveManager moveManager) {
this.moveManager = moveManager;
}

@VisibleForTesting
void setTaskStatus(Status taskStatus) {
this.taskStatus = taskStatus;
}

public Status getBalancerStatus() {
return taskStatus;
}
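The new @VisibleForTesting setters imply a test can swap in a mocked MoveManager; a hypothetical fragment in the same Mockito style as the test changes below (the helper and its use here are assumptions for illustration, not code from this PR):

```java
import java.util.concurrent.CompletableFuture;

import org.mockito.Mockito;

final class MoveManagerInjectionSketch {
  // Hypothetical: replace the task's MoveManager so a unit test can exercise the
  // non-legacy move path without real datanodes.
  static void injectMockedMoveManager(ContainerBalancerTask task) throws Exception {
    MoveManager mockedMoveManager = Mockito.mock(MoveManager.class);
    // Every move "completes" immediately; COMPLETED is the result the balancer
    // counts as a successful move.
    Mockito.when(mockedMoveManager.move(
            Mockito.any(), Mockito.any(), Mockito.any()))
        .thenReturn(CompletableFuture.completedFuture(
            MoveManager.MoveResult.COMPLETED));
    task.setMoveManager(mockedMoveManager);
  }
}
```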
@@ -39,14 +39,14 @@
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

/**
* A class which schedules, tracks and completes moves scheduled by the
@@ -111,7 +111,7 @@ public enum MoveResult {
// delete after the move, but before anything else can, eg RM?

// TODO - these need to be config defined somewhere, probably in the balancer
private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
private final long moveDeadline;
private static final double MOVE_DEADLINE_FACTOR = 0.95;

private final ReplicationManager replicationManager;
@@ -125,10 +125,11 @@ public enum MoveResult {
private volatile boolean running = false;

public MoveManager(final ReplicationManager replicationManager,
final Clock clock, final ContainerManager containerManager) {
final ContainerManager containerManager, final Duration moveDeadline) {
this.replicationManager = replicationManager;
this.containerManager = containerManager;
this.clock = clock;
this.clock = replicationManager.getClock();
this.moveDeadline = moveDeadline.toMillis();
}

/**
@@ -219,7 +220,7 @@ public void onNotLeader() {
public CompletableFuture<MoveResult> move(
ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
throws ContainerNotFoundException, NodeNotFoundException,
TimeoutException, ContainerReplicaNotFoundException {
ContainerReplicaNotFoundException {
CompletableFuture<MoveResult> ret = new CompletableFuture<>();

if (!running) {
@@ -470,8 +471,8 @@ private void sendReplicateCommand(
containerInfo.containerID(), src);
long now = clock.millis();
replicationManager.sendLowPriorityReplicateContainerCommand(containerInfo,
replicaIndex, src, tgt, now + MOVE_DEADLINE,
now + Math.round(MOVE_DEADLINE * MOVE_DEADLINE_FACTOR));
replicaIndex, src, tgt, now + moveDeadline,
now + Math.round(moveDeadline * MOVE_DEADLINE_FACTOR));
}

/**
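For concreteness, with the balancer's new 60-minute default the two deadlines computed in sendReplicateCommand work out as below (arithmetic illustration only; the 0.95 mirrors MOVE_DEADLINE_FACTOR in this file):

```java
import java.time.Duration;

public final class MoveDeadlineSketch {
  public static void main(String[] args) {
    long moveDeadline = Duration.ofMinutes(60).toMillis(); // configured move timeout
    double moveDeadlineFactor = 0.95;                      // MOVE_DEADLINE_FACTOR

    long now = System.currentTimeMillis();
    long overallDeadline = now + moveDeadline;                                    // now + 60 min
    long datanodeDeadline = now + Math.round(moveDeadline * moveDeadlineFactor);  // now + 57 min

    // The datanode gets a slightly earlier deadline than the overall timeout,
    // so it should give up before SCM counts the move as timed out.
    System.out.println((overallDeadline - datanodeDeadline) / 60_000 + " minutes of slack");
  }
}
```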
@@ -1195,6 +1195,9 @@ public ReplicationManagerConfiguration getConfig() {
return rmConf;
}

public Clock getClock() {
return clock;
}

/**
* following functions will be refactored in a separate jira.
@@ -1230,8 +1233,12 @@ public LegacyReplicationManager getLegacyReplicationManager() {
}

public boolean isContainerReplicatingOrDeleting(ContainerID containerID) {
return legacyReplicationManager
.isContainerReplicatingOrDeleting(containerID);
if (rmConf.isLegacyEnabled()) {
return legacyReplicationManager
.isContainerReplicatingOrDeleting(containerID);
} else {
return !getPendingReplicationOps(containerID).isEmpty();
}
}

private ECContainerReplicaCount getECContainerReplicaCount(
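A hypothetical caller of the updated check, to show the intent of the branch: the same call now reflects in-flight work whether it was scheduled through LegacyReplicationManager or tracked as pending ops (the surrounding helper is an assumption for illustration):

```java
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;

final class ReplicatingOrDeletingCheckSketch {
  // Hypothetical: a selection step that skips containers which already have
  // replication or deletion commands in flight.
  static boolean shouldSkip(ReplicationManager replicationManager,
      ContainerID containerID) {
    // Legacy path: LegacyReplicationManager's in-flight tracking.
    // New path: ReplicationManager's pending replication ops.
    return replicationManager.isContainerReplicatingOrDeleting(containerID);
  }
}
```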
@@ -20,10 +20,13 @@

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
@@ -82,6 +85,12 @@ public void setup() throws IOException, NodeNotFoundException,
when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager);
when(scm.getSCMServiceManager()).thenReturn(mock(SCMServiceManager.class));

ReplicationManager replicationManager =
Mockito.mock(ReplicationManager.class);
when(scm.getReplicationManager()).thenReturn(replicationManager);
when(replicationManager.getClock()).thenReturn(
Clock.system(ZoneId.systemDefault()));

/*
When StatefulServiceStateManager#saveConfiguration is called, save to
in-memory serviceToConfigMap and read from same.