Merged
1 change: 1 addition & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -451,6 +451,7 @@ message ContainerBalancerConfigurationProto {

required bool shouldRun = 18;
optional int32 nextIterationIndex = 19;
optional int64 moveReplicationTimeout = 20;
}

message TransferLeadershipRequestProto {
@@ -409,6 +409,18 @@ private void validateConfiguration(ContainerBalancerConfiguration conf)
"should be greater than hdds.datanode.du.refresh.period {}",
conf.getBalancingInterval(), refreshPeriod);
}

// "move.replication.timeout" should be lesser than "move.timeout"
if (conf.getMoveReplicationTimeout().toMillis() >=
conf.getMoveTimeout().toMillis()) {
LOG.warn("hdds.container.balancer.move.replication.timeout {} should " +
"be less than hdds.container.balancer.move.timeout {}.",
conf.getMoveReplicationTimeout().toMinutes(),
conf.getMoveTimeout().toMinutes());
throw new InvalidContainerBalancerConfigurationException(
"hdds.container.balancer.move.replication.timeout should " +
"be less than hdds.container.balancer.move.timeout.");
}
}

public ContainerBalancerMetrics getMetrics() {
@@ -93,11 +93,19 @@ public final class ContainerBalancerConfiguration {
"to exclude from balancing. For example \"1, 4, 5\" or \"1,4,5\".")
private String excludeContainers = "";

@Config(key = "move.timeout", type = ConfigType.TIME, defaultValue = "30m",
@Config(key = "move.timeout", type = ConfigType.TIME, defaultValue = "65m",
tags = {ConfigTag.BALANCER}, description =
"The amount of time to allow a single container to move " +
"from source to target.")
private long moveTimeout = Duration.ofMinutes(30).toMillis();
private long moveTimeout = Duration.ofMinutes(65).toMillis();

@Config(key = "move.replication.timeout", type = ConfigType.TIME,
defaultValue = "50m", tags = {ConfigTag.BALANCER}, description = "The " +
"amount of time to allow a single container's replication from source " +
"to target as part of container move. For example, if \"hdds.container" +
".balancer.move.timeout\" is 65 minutes, then out of those 65 minutes " +
"50 minutes will be the deadline for replication to complete.")
private long moveReplicationTimeout = Duration.ofMinutes(50).toMillis();
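
For illustration only (not part of this patch): with these defaults the move budget splits into a replication window and a delete window, since MoveManager uses moveTimeout minus replicationTimeout as the delete deadline. A minimal sketch, assuming the usual OzoneConfiguration and TimeUnit imports:

    OzoneConfiguration conf = new OzoneConfiguration();
    // 65 minutes for the whole move...
    conf.setTimeDuration("hdds.container.balancer.move.timeout",
        65, TimeUnit.MINUTES);
    // ...of which 50 minutes are allowed for replication to the target.
    conf.setTimeDuration("hdds.container.balancer.move.replication.timeout",
        50, TimeUnit.MINUTES);

    ContainerBalancerConfiguration cbConf =
        conf.getObject(ContainerBalancerConfiguration.class);
    // The remaining 65 - 50 = 15 minutes form the window for deleting the
    // source replica once replication has completed.
    long deleteWindowMillis = cbConf.getMoveTimeout().toMillis()
        - cbConf.getMoveReplicationTimeout().toMillis();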

@Config(key = "balancing.iteration.interval", type = ConfigType.TIME,
defaultValue = "70m", tags = {ConfigTag.BALANCER}, description =
@@ -326,6 +334,14 @@ public void setMoveTimeout(long millis) {
this.moveTimeout = millis;
}

public Duration getMoveReplicationTimeout() {
return Duration.ofMillis(moveReplicationTimeout);
}

public void setMoveReplicationTimeout(long millis) {
this.moveReplicationTimeout = millis;
}

public Duration getBalancingInterval() {
return Duration.ofMillis(balancingInterval);
}
@@ -423,7 +439,8 @@ ContainerBalancerConfigurationProto.Builder toProtobufBuilder() {
.setIncludeDatanodes(includeNodes)
.setExcludeDatanodes(excludeNodes)
.setMoveNetworkTopologyEnable(networkTopologyEnable)
.setTriggerDuBeforeMoveEnable(triggerDuEnable);
.setTriggerDuBeforeMoveEnable(triggerDuEnable)
.setMoveReplicationTimeout(moveReplicationTimeout);
return builder;
}

@@ -472,6 +489,9 @@ static ContainerBalancerConfiguration fromProtobuf(
if (proto.hasTriggerDuBeforeMoveEnable()) {
config.setTriggerDuEnable(proto.getTriggerDuBeforeMoveEnable());
}
if (proto.hasMoveReplicationTimeout()) {
config.setMoveReplicationTimeout(proto.getMoveReplicationTimeout());
}
return config;
}
}
@@ -135,6 +135,9 @@ public ContainerBalancerTask(StorageContainerManager scm,
this.containerManager = scm.getContainerManager();
this.replicationManager = scm.getReplicationManager();
this.moveManager = scm.getMoveManager();
this.moveManager.setMoveTimeout(config.getMoveTimeout().toMillis());
this.moveManager.setReplicationTimeout(
config.getMoveReplicationTimeout().toMillis());
this.ozoneConfiguration = scm.getConfiguration();
this.containerBalancer = containerBalancer;
this.config = config;
@@ -109,8 +109,11 @@ public enum MoveResult {
// TODO - Should pending ops notify under lock to allow MM to schedule a
// delete after the move, but before anything else can, eg RM?

// TODO - these need to be config defined somewhere, probably in the balancer
private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour
/*
moveTimeout and replicationTimeout are set by ContainerBalancer.
*/
private long moveTimeout = 1000 * 65 * 60;
private long replicationTimeout = 1000 * 50 * 60;
private static final double MOVE_DEADLINE_FACTOR = 0.95;
Contributor:
I wonder if using a factor like this makes sense for these longer duration timeouts.

The idea I had is that the datanode should time out before SCM does, so that when SCM abandons the command, we know the DN has given up on it too.

If we have a replication timeout of 50 mins, then 95% of that is 47.5, so the DN will give up 2.5 mins before SCM does. Feels like the DN is then giving up too early. If we have a 10 minute timeout or a 60 minute timeout, the DN doesn't need to give up earlier for the longer timeout.

Perhaps we could take the factor away from here completely and let RM decide what the DN timeout should be. That would slightly simplify the API into RM, as we would only need to pass a single timeout.

Perhaps use 30 seconds less than the SCM timeout for all values, rather than a percentage like we have now. It could be a single configuration in RM, rather than having the factor defined here and also in RM.

Contributor:
We could have another small PR to change the factor if you think it will complicate this PR too much. I am happy either way.

Contributor Author:

> If we have a replication timeout of 50 mins, then 95% of that is 47.5, so the DN will give up 2.5 mins before SCM does. Feels like the DN is then giving up too early. If we have a 10 minute timeout or a 60 minute timeout, the DN doesn't need to give up earlier for the longer timeout.

That's a good point. Created HDDS-8230 for fixing this.
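
To make the discussion concrete, here is a side-by-side sketch. The factor-based calculation is what sendReplicateCommand does in this patch; the fixed-offset variant only illustrates the suggestion above (the 30-second offset is an example value, nothing in this PR defines it):

    long now = clock.millis();

    // Factor-based (current): for a 50-minute replication timeout the
    // datanode deadline is 47.5 minutes, so the DN gives up 2.5 minutes
    // before SCM does.
    long scmDeadline = now + replicationTimeout;
    long datanodeDeadline =
        now + Math.round(replicationTimeout * MOVE_DEADLINE_FACTOR);

    // Fixed-offset alternative (see HDDS-8230): the DN always gives up a
    // small, constant margin before SCM, regardless of how long the
    // timeout is.
    long datanodeDeadlineAlt = scmDeadline - 30 * 1000L;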


private final ReplicationManager replicationManager;
@@ -320,8 +323,11 @@ private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp,
try {
handleSuccessfulAdd(containerID);
} catch (ContainerNotFoundException | NodeNotFoundException |
ContainerReplicaNotFoundException e) {
LOG.warn("Can not handle successful Add for move", e);
ContainerReplicaNotFoundException | NotLeaderException e) {
LOG.warn("Failed to handle successful Add for container {} being " +
"moved from source {} to target {}.", containerID,
mdnp.getSrc(), mdnp.getTgt(), e);
pair.getLeft().complete(MoveResult.FAIL_UNEXPECTED_ERROR);
}
} else if (
opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) {
@@ -355,8 +361,8 @@ private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp,

private void handleSuccessfulAdd(final ContainerID cid)
throws ContainerNotFoundException,
ContainerReplicaNotFoundException, NodeNotFoundException {

ContainerReplicaNotFoundException, NodeNotFoundException,
NotLeaderException {
Pair<CompletableFuture<MoveResult>, MoveDataNodePair> pair =
pendingMoves.get(cid);
if (pair == null) {
@@ -442,29 +448,28 @@ private void sendReplicateCommand(
containerInfo.containerID(), src);
long now = clock.millis();
replicationManager.sendLowPriorityReplicateContainerCommand(containerInfo,
replicaIndex, src, tgt, now + MOVE_DEADLINE,
now + Math.round(MOVE_DEADLINE * MOVE_DEADLINE_FACTOR));
replicaIndex, src, tgt, now + replicationTimeout,
now + Math.round(replicationTimeout * MOVE_DEADLINE_FACTOR));
}

/**
* Sends delete container command for the given container to the given
* datanode.
*
* @param containerInfo Container to be deleted
* @param datanode The datanode on which the replica should be deleted
* @param datanode The datanode on which the replica should be deleted
*/
private void sendDeleteCommand(
final ContainerInfo containerInfo, final DatanodeDetails datanode)
throws ContainerReplicaNotFoundException, ContainerNotFoundException {
throws ContainerReplicaNotFoundException, ContainerNotFoundException,
NotLeaderException {
int replicaIndex = getContainerReplicaIndex(
containerInfo.containerID(), datanode);
try {
replicationManager.sendDeleteCommand(
containerInfo, replicaIndex, datanode, true);
} catch (NotLeaderException nle) {
LOG.warn("Skipped deleting the container as this SCM is not the leader.",
nle);
}
long deleteTimeout = moveTimeout - replicationTimeout;
long now = clock.millis();
replicationManager.sendDeleteCommand(
containerInfo, replicaIndex, datanode, true, now + deleteTimeout,
now + Math.round(deleteTimeout * MOVE_DEADLINE_FACTOR));
}

private int getContainerReplicaIndex(
@@ -488,4 +493,12 @@ public void opCompleted(ContainerReplicaOp op, ContainerID containerID,
notifyContainerOpCompleted(op, containerID);
}
}

void setMoveTimeout(long moveTimeout) {
this.moveTimeout = moveTimeout;
}

void setReplicationTimeout(long replicationTimeout) {
this.replicationTimeout = replicationTimeout;
}
}
@@ -437,6 +437,35 @@ public void sendDeleteCommand(final ContainerInfo container, int replicaIndex,
sendDatanodeCommand(deleteCommand, container, datanode);
}

/**
* Send a delete command with a deadline for the specified container.
* @param container container to be deleted
* @param replicaIndex index of the replica to be deleted
* @param datanode datanode that hosts the replica to be deleted
* @param force true to force delete a container that is open or not empty
* @param scmDeadlineEpochMs The epoch time in ms, after which the command
* will be discarded from the SCMPendingOps table.
* @param datanodeDeadlineEpochMs The epoch time in ms, after which the
* command will be discarded on the datanode if
* it has not been processed.
* @throws NotLeaderException when this SCM is not the leader
*/
public void sendDeleteCommand(final ContainerInfo container,
int replicaIndex, final DatanodeDetails datanode, boolean force,
long scmDeadlineEpochMs, long datanodeDeadlineEpochMs)
throws NotLeaderException {
LOG.debug("Sending delete command for container {} and index {} on {} " +
"with SCM deadline {} and Datanode deadline {}.",
container, replicaIndex, datanode, scmDeadlineEpochMs,
datanodeDeadlineEpochMs);

final DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(container.containerID(), force);
deleteCommand.setReplicaIndex(replicaIndex);
sendDatanodeCommand(deleteCommand, container, datanode,
scmDeadlineEpochMs, datanodeDeadlineEpochMs);
}
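
For reference, the MoveManager caller added in this patch exercises the overload roughly as follows (variable names as in that class); the datanode deadline falls slightly before the SCM deadline, so the datanode gives up on the command first:

    long deleteTimeout = moveTimeout - replicationTimeout;
    long now = clock.millis();
    // scmDeadlineEpochMs is later than datanodeDeadlineEpochMs.
    replicationManager.sendDeleteCommand(
        containerInfo, replicaIndex, datanode, true,
        now + deleteTimeout,
        now + Math.round(deleteTimeout * MOVE_DEADLINE_FACTOR));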

/**
* Create a ReplicateContainerCommand for the given container and to push the
* container to the target datanode. The list of sources are checked to ensure
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -192,6 +193,30 @@ public void testNotifyStateChangeStopStart() throws Exception {
== ContainerBalancerTask.Status.STOPPED);
}

/**
* Tests that ContainerBalancer rejects invalid configurations while
* starting, such as a "move.replication.timeout" that is not less than
* "move.timeout".
*/
@Test
public void testValidationOfConfigurations() {
OzoneConfiguration conf = new OzoneConfiguration();

conf.setTimeDuration(
"hdds.container.balancer.move.replication.timeout", 60,
TimeUnit.MINUTES);

conf.setTimeDuration("hdds.container.balancer.move.timeout", 59,
TimeUnit.MINUTES);

balancerConfiguration =
conf.getObject(ContainerBalancerConfiguration.class);
Assertions.assertThrowsExactly(
InvalidContainerBalancerConfigurationException.class,
() -> containerBalancer.startBalancer(balancerConfiguration),
"hdds.container.balancer.move.replication.timeout should " +
"be less than hdds.container.balancer.move.timeout.");
}

private void startBalancer(ContainerBalancerConfiguration config)
throws IllegalContainerBalancerStateException, IOException,
InvalidContainerBalancerConfigurationException, TimeoutException {
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.container.balancer;

import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -71,6 +72,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

@@ -725,16 +727,28 @@ public void testContainerBalancerConfiguration() {
ozoneConfiguration.set("ozone.scm.container.size", "5GB");
ozoneConfiguration.setDouble(
"hdds.container.balancer.utilization.threshold", 1);
long maxSizeLeavingSource = 26;
ozoneConfiguration.setStorageSize(
"hdds.container.balancer.size.leaving.source.max", maxSizeLeavingSource,
StorageUnit.GB);
long moveTimeout = 90;
ozoneConfiguration.setTimeDuration("hdds.container.balancer.move.timeout",
moveTimeout, TimeUnit.MINUTES);
long replicationTimeout = 60;
ozoneConfiguration.setTimeDuration(
"hdds.container.balancer.move.replication.timeout",
replicationTimeout, TimeUnit.MINUTES);

ContainerBalancerConfiguration cbConf =
ozoneConfiguration.getObject(ContainerBalancerConfiguration.class);
Assertions.assertEquals(1, cbConf.getThreshold(), 0.001);

Assertions.assertEquals(26 * 1024 * 1024 * 1024L,
// Expected is 26 GB
Assertions.assertEquals(maxSizeLeavingSource * 1024 * 1024 * 1024,
cbConf.getMaxSizeLeavingSource());

Assertions.assertEquals(30 * 60 * 1000,
cbConf.getMoveTimeout().toMillis());
Assertions.assertEquals(moveTimeout, cbConf.getMoveTimeout().toMinutes());
Assertions.assertEquals(replicationTimeout,
cbConf.getMoveReplicationTimeout().toMinutes());
}

@Test
@@ -311,6 +311,22 @@ public void testReplicationCommandFails() throws Exception {
MoveManager.MoveResult.FAIL_UNEXPECTED_ERROR, res.get());
}

@Test
public void testDeleteCommandFails() throws Exception {
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();

Mockito.doThrow(new ContainerNotFoundException("test"))
.when(containerManager).getContainer(any(ContainerID.class));

ContainerReplicaOp op = new ContainerReplicaOp(
ADD, tgt, 0, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);

MoveManager.MoveResult moveResult = res.get();
Assert.assertEquals(MoveManager.MoveResult.FAIL_UNEXPECTED_ERROR,
moveResult);
}

@Test
public void testSuccessfulMove() throws Exception {
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
@@ -320,7 +336,7 @@ public void testSuccessfulMove() throws Exception {
moveManager.opCompleted(op, containerInfo.containerID(), false);

Mockito.verify(replicationManager).sendDeleteCommand(
eq(containerInfo), eq(0), eq(src), eq(true));
eq(containerInfo), eq(0), eq(src), eq(true), anyLong(), anyLong());

op = new ContainerReplicaOp(
DELETE, src, 0, clock.millis() + 1000);
@@ -358,7 +374,7 @@ public void testSuccessfulMoveNonZeroRepIndex() throws Exception {

Mockito.verify(replicationManager).sendDeleteCommand(
eq(containerInfo), eq(srcReplica.getReplicaIndex()), eq(src),
eq(true));
eq(true), anyLong(), anyLong());

op = new ContainerReplicaOp(
DELETE, src, srcReplica.getReplicaIndex(), clock.millis() + 1000);
@@ -390,7 +406,7 @@ public void testMoveTimeoutOnDelete() throws Exception {
moveManager.opCompleted(op, containerInfo.containerID(), false);

Mockito.verify(replicationManager).sendDeleteCommand(
eq(containerInfo), eq(0), eq(src), eq(true));
eq(containerInfo), eq(0), eq(src), eq(true), anyLong(), anyLong());

op = new ContainerReplicaOp(
DELETE, src, 0, clock.millis() + 1000);