diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 16ea4887aab5..1d9b80fafc2d 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -451,6 +451,7 @@ message ContainerBalancerConfigurationProto { required bool shouldRun = 18; optional int32 nextIterationIndex = 19; + optional int64 moveReplicationTimeout = 20; } message TransferLeadershipRequestProto { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java index 22fbc57f32ca..2606b087c158 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java @@ -409,6 +409,18 @@ private void validateConfiguration(ContainerBalancerConfiguration conf) "should be greater than hdds.datanode.du.refresh.period {}", conf.getBalancingInterval(), refreshPeriod); } + + // "move.replication.timeout" should be lesser than "move.timeout" + if (conf.getMoveReplicationTimeout().toMillis() >= + conf.getMoveTimeout().toMillis()) { + LOG.warn("hdds.container.balancer.move.replication.timeout {} should " + + "be less than hdds.container.balancer.move.timeout {}.", + conf.getMoveReplicationTimeout().toMinutes(), + conf.getMoveTimeout().toMinutes()); + throw new InvalidContainerBalancerConfigurationException( + "hdds.container.balancer.move.replication.timeout should " + + "be less than hdds.container.balancer.move.timeout."); + } } public ContainerBalancerMetrics getMetrics() { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java index 9d5083768cd1..62bddfa19ba1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java @@ -93,11 +93,19 @@ public final class ContainerBalancerConfiguration { "to exclude from balancing. For example \"1, 4, 5\" or \"1,4,5\".") private String excludeContainers = ""; - @Config(key = "move.timeout", type = ConfigType.TIME, defaultValue = "30m", + @Config(key = "move.timeout", type = ConfigType.TIME, defaultValue = "65m", tags = {ConfigTag.BALANCER}, description = "The amount of time to allow a single container to move " + "from source to target.") - private long moveTimeout = Duration.ofMinutes(30).toMillis(); + private long moveTimeout = Duration.ofMinutes(65).toMillis(); + + @Config(key = "move.replication.timeout", type = ConfigType.TIME, + defaultValue = "50m", tags = {ConfigTag.BALANCER}, description = "The " + + "amount of time to allow a single container's replication from source " + + "to target as part of container move. For example, if \"hdds.container" + + ".balancer.move.timeout\" is 65 minutes, then out of those 65 minutes " + + "50 minutes will be the deadline for replication to complete.") + private long moveReplicationTimeout = Duration.ofMinutes(50).toMillis(); @Config(key = "balancing.iteration.interval", type = ConfigType.TIME, defaultValue = "70m", tags = {ConfigTag.BALANCER}, description = @@ -326,6 +334,14 @@ public void setMoveTimeout(long millis) { this.moveTimeout = millis; } + public Duration getMoveReplicationTimeout() { + return Duration.ofMillis(moveReplicationTimeout); + } + + public void setMoveReplicationTimeout(long millis) { + this.moveReplicationTimeout = millis; + } + public Duration getBalancingInterval() { return Duration.ofMillis(balancingInterval); } @@ -423,7 +439,8 @@ ContainerBalancerConfigurationProto.Builder toProtobufBuilder() { .setIncludeDatanodes(includeNodes) .setExcludeDatanodes(excludeNodes) .setMoveNetworkTopologyEnable(networkTopologyEnable) - .setTriggerDuBeforeMoveEnable(triggerDuEnable); + .setTriggerDuBeforeMoveEnable(triggerDuEnable) + .setMoveReplicationTimeout(moveReplicationTimeout); return builder; } @@ -472,6 +489,9 @@ static ContainerBalancerConfiguration fromProtobuf( if (proto.hasTriggerDuBeforeMoveEnable()) { config.setTriggerDuEnable(proto.getTriggerDuBeforeMoveEnable()); } + if (proto.hasMoveReplicationTimeout()) { + config.setMoveReplicationTimeout(proto.getMoveReplicationTimeout()); + } return config; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java index 766aa4a2fe26..2f8b095ba4bf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java @@ -135,6 +135,9 @@ public ContainerBalancerTask(StorageContainerManager scm, this.containerManager = scm.getContainerManager(); this.replicationManager = scm.getReplicationManager(); this.moveManager = scm.getMoveManager(); + this.moveManager.setMoveTimeout(config.getMoveTimeout().toMillis()); + this.moveManager.setReplicationTimeout( + config.getMoveReplicationTimeout().toMillis()); this.ozoneConfiguration = scm.getConfiguration(); this.containerBalancer = containerBalancer; this.config = config; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java index 895c744122b5..9120ae9e09ed 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java @@ -109,8 +109,11 @@ public enum MoveResult { // TODO - Should pending ops notify under lock to allow MM to schedule a // delete after the move, but before anything else can, eg RM? - // TODO - these need to be config defined somewhere, probably in the balancer - private static final long MOVE_DEADLINE = 1000 * 60 * 60; // 1 hour + /* + moveTimeout and replicationTimeout are set by ContainerBalancer. + */ + private long moveTimeout = 1000 * 65 * 60; + private long replicationTimeout = 1000 * 50 * 60; private static final double MOVE_DEADLINE_FACTOR = 0.95; private final ReplicationManager replicationManager; @@ -320,8 +323,11 @@ private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp, try { handleSuccessfulAdd(containerID); } catch (ContainerNotFoundException | NodeNotFoundException | - ContainerReplicaNotFoundException e) { - LOG.warn("Can not handle successful Add for move", e); + ContainerReplicaNotFoundException | NotLeaderException e) { + LOG.warn("Failed to handle successful Add for container {} being " + + "moved from source {} to target {}.", containerID, + mdnp.getSrc(), mdnp.getTgt(), e); + pair.getLeft().complete(MoveResult.FAIL_UNEXPECTED_ERROR); } } else if ( opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) { @@ -355,8 +361,8 @@ private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp, private void handleSuccessfulAdd(final ContainerID cid) throws ContainerNotFoundException, - ContainerReplicaNotFoundException, NodeNotFoundException { - + ContainerReplicaNotFoundException, NodeNotFoundException, + NotLeaderException { Pair, MoveDataNodePair> pair = pendingMoves.get(cid); if (pair == null) { @@ -442,8 +448,8 @@ private void sendReplicateCommand( containerInfo.containerID(), src); long now = clock.millis(); replicationManager.sendLowPriorityReplicateContainerCommand(containerInfo, - replicaIndex, src, tgt, now + MOVE_DEADLINE, - now + Math.round(MOVE_DEADLINE * MOVE_DEADLINE_FACTOR)); + replicaIndex, src, tgt, now + replicationTimeout, + now + Math.round(replicationTimeout * MOVE_DEADLINE_FACTOR)); } /** @@ -451,20 +457,19 @@ private void sendReplicateCommand( * datanode. * * @param containerInfo Container to be deleted - * @param datanode The datanode on which the replica should be deleted + * @param datanode The datanode on which the replica should be deleted */ private void sendDeleteCommand( final ContainerInfo containerInfo, final DatanodeDetails datanode) - throws ContainerReplicaNotFoundException, ContainerNotFoundException { + throws ContainerReplicaNotFoundException, ContainerNotFoundException, + NotLeaderException { int replicaIndex = getContainerReplicaIndex( containerInfo.containerID(), datanode); - try { - replicationManager.sendDeleteCommand( - containerInfo, replicaIndex, datanode, true); - } catch (NotLeaderException nle) { - LOG.warn("Skipped deleting the container as this SCM is not the leader.", - nle); - } + long deleteTimeout = moveTimeout - replicationTimeout; + long now = clock.millis(); + replicationManager.sendDeleteCommand( + containerInfo, replicaIndex, datanode, true, now + deleteTimeout, + now + Math.round(deleteTimeout * MOVE_DEADLINE_FACTOR)); } private int getContainerReplicaIndex( @@ -488,4 +493,12 @@ public void opCompleted(ContainerReplicaOp op, ContainerID containerID, notifyContainerOpCompleted(op, containerID); } } + + void setMoveTimeout(long moveTimeout) { + this.moveTimeout = moveTimeout; + } + + void setReplicationTimeout(long replicationTimeout) { + this.replicationTimeout = replicationTimeout; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 287c1f14d511..b6bfe0600e27 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -437,6 +437,35 @@ public void sendDeleteCommand(final ContainerInfo container, int replicaIndex, sendDatanodeCommand(deleteCommand, container, datanode); } + /** + * Send a delete command with a deadline for the specified container. + * @param container container to be deleted + * @param replicaIndex index of the replica to be deleted + * @param datanode datanode that hosts the replica to be deleted + * @param force true to force delete a container that is open or not empty + * @param scmDeadlineEpochMs The epoch time in ms, after which the command + * will be discarded from the SCMPendingOps table. + * @param datanodeDeadlineEpochMs The epoch time in ms, after which the + * command will be discarded on the datanode if + * it has not been processed. + * @throws NotLeaderException when this SCM is not the leader + */ + public void sendDeleteCommand(final ContainerInfo container, + int replicaIndex, final DatanodeDetails datanode, boolean force, + long scmDeadlineEpochMs, long datanodeDeadlineEpochMs) + throws NotLeaderException { + LOG.debug("Sending delete command for container {} and index {} on {} " + + "with SCM deadline {} and Datanode deadline {}.", + container, replicaIndex, datanode, scmDeadlineEpochMs, + datanodeDeadlineEpochMs); + + final DeleteContainerCommand deleteCommand = + new DeleteContainerCommand(container.containerID(), force); + deleteCommand.setReplicaIndex(replicaIndex); + sendDatanodeCommand(deleteCommand, container, datanode, + scmDeadlineEpochMs, datanodeDeadlineEpochMs); + } + /** * Create a ReplicateContainerCommand for the given container and to push the * container to the target datanode. The list of sources are checked to ensure diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java index eb4e9ac3f495..76d445191871 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.ha.SCMContext; @@ -192,6 +193,30 @@ public void testNotifyStateChangeStopStart() throws Exception { == ContainerBalancerTask.Status.STOPPED); } + /** + * This tests that ContainerBalancer rejects certain invalid configurations + * while starting. It should fail to start in some cases. + */ + @Test + public void testValidationOfConfigurations() { + OzoneConfiguration conf = new OzoneConfiguration(); + + conf.setTimeDuration( + "hdds.container.balancer.move.replication.timeout", 60, + TimeUnit.MINUTES); + + conf.setTimeDuration("hdds.container.balancer.move.timeout", 59, + TimeUnit.MINUTES); + + balancerConfiguration = + conf.getObject(ContainerBalancerConfiguration.class); + Assertions.assertThrowsExactly( + InvalidContainerBalancerConfigurationException.class, + () -> containerBalancer.startBalancer(balancerConfiguration), + "hdds.container.balancer.move.replication.timeout should " + + "be less than hdds.container.balancer.move.timeout."); + } + private void startBalancer(ContainerBalancerConfiguration config) throws IllegalContainerBalancerStateException, IOException, InvalidContainerBalancerConfigurationException, TimeoutException { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java index 9e1c01009b2a..868be2911e41 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container.balancer; import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -71,6 +72,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -725,16 +727,28 @@ public void testContainerBalancerConfiguration() { ozoneConfiguration.set("ozone.scm.container.size", "5GB"); ozoneConfiguration.setDouble( "hdds.container.balancer.utilization.threshold", 1); + long maxSizeLeavingSource = 26; + ozoneConfiguration.setStorageSize( + "hdds.container.balancer.size.leaving.source.max", maxSizeLeavingSource, + StorageUnit.GB); + long moveTimeout = 90; + ozoneConfiguration.setTimeDuration("hdds.container.balancer.move.timeout", + moveTimeout, TimeUnit.MINUTES); + long replicationTimeout = 60; + ozoneConfiguration.setTimeDuration( + "hdds.container.balancer.move.replication.timeout", + replicationTimeout, TimeUnit.MINUTES); ContainerBalancerConfiguration cbConf = ozoneConfiguration.getObject(ContainerBalancerConfiguration.class); Assertions.assertEquals(1, cbConf.getThreshold(), 0.001); - Assertions.assertEquals(26 * 1024 * 1024 * 1024L, + // Expected is 26 GB + Assertions.assertEquals(maxSizeLeavingSource * 1024 * 1024 * 1024, cbConf.getMaxSizeLeavingSource()); - - Assertions.assertEquals(30 * 60 * 1000, - cbConf.getMoveTimeout().toMillis()); + Assertions.assertEquals(moveTimeout, cbConf.getMoveTimeout().toMinutes()); + Assertions.assertEquals(replicationTimeout, + cbConf.getMoveReplicationTimeout().toMinutes()); } @Test diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java index a55888fe8396..62b5b701f1cb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java @@ -311,6 +311,22 @@ public void testReplicationCommandFails() throws Exception { MoveManager.MoveResult.FAIL_UNEXPECTED_ERROR, res.get()); } + @Test + public void testDeleteCommandFails() throws Exception { + CompletableFuture res = setupSuccessfulMove(); + + Mockito.doThrow(new ContainerNotFoundException("test")) + .when(containerManager).getContainer(any(ContainerID.class)); + + ContainerReplicaOp op = new ContainerReplicaOp( + ADD, tgt, 0, clock.millis() + 1000); + moveManager.opCompleted(op, containerInfo.containerID(), false); + + MoveManager.MoveResult moveResult = res.get(); + Assert.assertEquals(MoveManager.MoveResult.FAIL_UNEXPECTED_ERROR, + moveResult); + } + @Test public void testSuccessfulMove() throws Exception { CompletableFuture res = setupSuccessfulMove(); @@ -320,7 +336,7 @@ public void testSuccessfulMove() throws Exception { moveManager.opCompleted(op, containerInfo.containerID(), false); Mockito.verify(replicationManager).sendDeleteCommand( - eq(containerInfo), eq(0), eq(src), eq(true)); + eq(containerInfo), eq(0), eq(src), eq(true), anyLong(), anyLong()); op = new ContainerReplicaOp( DELETE, src, 0, clock.millis() + 1000); @@ -358,7 +374,7 @@ public void testSuccessfulMoveNonZeroRepIndex() throws Exception { Mockito.verify(replicationManager).sendDeleteCommand( eq(containerInfo), eq(srcReplica.getReplicaIndex()), eq(src), - eq(true)); + eq(true), anyLong(), anyLong()); op = new ContainerReplicaOp( DELETE, src, srcReplica.getReplicaIndex(), clock.millis() + 1000); @@ -390,7 +406,7 @@ public void testMoveTimeoutOnDelete() throws Exception { moveManager.opCompleted(op, containerInfo.containerID(), false); Mockito.verify(replicationManager).sendDeleteCommand( - eq(containerInfo), eq(0), eq(src), eq(true)); + eq(containerInfo), eq(0), eq(src), eq(true), anyLong(), anyLong()); op = new ContainerReplicaOp( DELETE, src, 0, clock.millis() + 1000);