diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java index 49a10058866f..4283f1813182 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java @@ -25,7 +25,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -69,8 +68,7 @@ public final class MoveManager implements private final ContainerManager containerManager; private final Clock clock; - private final Map, MoveDataNodePair>> pendingMoves = + private final Map pendingMoves = new ConcurrentHashMap<>(); public MoveManager(final ReplicationManager replicationManager, @@ -83,8 +81,7 @@ public MoveManager(final ReplicationManager replicationManager, /** * get all the pending move operations. */ - public Map, MoveDataNodePair>> getPendingMove() { + public Map getPendingMove() { return pendingMoves; } @@ -98,10 +95,9 @@ void resetState() { * @param cid Container id to which the move option is finished */ private void completeMove(final ContainerID cid, final MoveResult mr) { - Pair, MoveDataNodePair> move = - pendingMoves.remove(cid); + MoveOperation move = pendingMoves.remove(cid); if (move != null) { - CompletableFuture future = move.getLeft(); + CompletableFuture future = move.getResult(); if (future != null && mr != null) { // when we know the future is null, and we want to complete // the move , then we set mr to null. @@ -125,9 +121,8 @@ private void completeMove(final ContainerID cid, final MoveResult mr) { private void startMove( final ContainerInfo containerInfo, final DatanodeDetails src, final DatanodeDetails tgt, final CompletableFuture ret) { - Pair, MoveDataNodePair> move = - pendingMoves.putIfAbsent(containerInfo.containerID(), - Pair.of(ret, new MoveDataNodePair(src, tgt))); + MoveOperation move = pendingMoves.putIfAbsent(containerInfo.containerID(), + new MoveOperation(ret, new MoveDataNodePair(src, tgt))); if (move == null) { // A move for this container did not exist, so send a replicate command try { @@ -264,10 +259,9 @@ CompletableFuture move( */ private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp, ContainerID containerID) { - Pair, MoveDataNodePair> pair = - pendingMoves.get(containerID); - if (pair != null) { - MoveDataNodePair mdnp = pair.getRight(); + MoveOperation move = pendingMoves.get(containerID); + if (move != null) { + MoveDataNodePair mdnp = move.getMoveDataNodePair(); PendingOpType opType = containerReplicaOp.getOpType(); DatanodeDetails dn = containerReplicaOp.getTarget(); if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) { @@ -278,7 +272,7 @@ private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp, LOG.warn("Failed to handle successful Add for container {} being " + "moved from source {} to target {}.", containerID, mdnp.getSrc(), mdnp.getTgt(), e); - pair.getLeft().complete(MoveResult.FAIL_UNEXPECTED_ERROR); + move.getResult().complete(MoveResult.FAIL_UNEXPECTED_ERROR); } } else if ( opType.equals(PendingOpType.DELETE) && mdnp.getSrc().equals(dn)) { @@ -295,10 +289,9 @@ private void notifyContainerOpCompleted(ContainerReplicaOp containerReplicaOp, */ private void notifyContainerOpExpired(ContainerReplicaOp containerReplicaOp, ContainerID containerID) { - Pair, MoveDataNodePair> pair = - pendingMoves.get(containerID); + MoveOperation pair = pendingMoves.get(containerID); if (pair != null) { - MoveDataNodePair mdnp = pair.getRight(); + MoveDataNodePair mdnp = pair.getMoveDataNodePair(); PendingOpType opType = containerReplicaOp.getOpType(); DatanodeDetails dn = containerReplicaOp.getTarget(); if (opType.equals(PendingOpType.ADD) && mdnp.getTgt().equals(dn)) { @@ -314,12 +307,11 @@ private void handleSuccessfulAdd(final ContainerID cid) throws ContainerNotFoundException, ContainerReplicaNotFoundException, NodeNotFoundException, NotLeaderException { - Pair, MoveDataNodePair> pair = - pendingMoves.get(cid); - if (pair == null) { + MoveOperation moveOp = pendingMoves.get(cid); + if (moveOp == null) { return; } - MoveDataNodePair movePair = pair.getRight(); + MoveDataNodePair movePair = moveOp.getMoveDataNodePair(); final DatanodeDetails src = movePair.getSrc(); final DatanodeDetails tgt = movePair.getTgt(); LOG.debug("Handling successful addition of Container {} from" + @@ -356,7 +348,7 @@ private void handleSuccessfulAdd(final ContainerID cid) if (healthResult.getHealthState() == ContainerHealthResult.HealthState.HEALTHY) { - sendDeleteCommand(containerInfo, src); + sendDeleteCommand(containerInfo, src, moveOp.getMoveStartTime()); } else { LOG.info("Cannot remove source replica as the container health would " + "be {}", healthResult.getHealthState()); @@ -403,6 +395,7 @@ private void sendReplicateCommand( long now = clock.millis(); replicationManager.sendLowPriorityReplicateContainerCommand(containerInfo, replicaIndex, src, tgt, now + replicationTimeout); + pendingMoves.get(containerInfo.containerID()).setMoveStartTime(now); } /** @@ -411,17 +404,17 @@ private void sendReplicateCommand( * * @param containerInfo Container to be deleted * @param datanode The datanode on which the replica should be deleted + * @param moveStartTime The time at which the replicate command for the container was scheduled */ private void sendDeleteCommand( - final ContainerInfo containerInfo, final DatanodeDetails datanode) + final ContainerInfo containerInfo, final DatanodeDetails datanode, + long moveStartTime) throws ContainerReplicaNotFoundException, ContainerNotFoundException, NotLeaderException { int replicaIndex = getContainerReplicaIndex( containerInfo.containerID(), datanode); - long deleteTimeout = moveTimeout - replicationTimeout; - long now = clock.millis(); replicationManager.sendDeleteCommand( - containerInfo, replicaIndex, datanode, true, now + deleteTimeout); + containerInfo, replicaIndex, datanode, true, moveStartTime + moveTimeout); } private int getContainerReplicaIndex( @@ -454,6 +447,45 @@ void setReplicationTimeout(long replicationTimeout) { this.replicationTimeout = replicationTimeout; } + /** + * All details about a move operation. + */ + static class MoveOperation { + private CompletableFuture result; + private MoveDataNodePair moveDataNodePair; + private long moveStartTime; + + MoveOperation(CompletableFuture result, MoveDataNodePair srcTgt) { + this.result = result; + this.moveDataNodePair = srcTgt; + } + + public CompletableFuture getResult() { + return result; + } + + public MoveDataNodePair getMoveDataNodePair() { + return moveDataNodePair; + } + + public long getMoveStartTime() { + return moveStartTime; + } + + public void setResult( + CompletableFuture result) { + this.result = result; + } + + public void setMoveDataNodePair(MoveDataNodePair srcTgt) { + this.moveDataNodePair = srcTgt; + } + + public void setMoveStartTime(long time) { + this.moveStartTime = time; + } + } + /** * Various move return results. */ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java index 2f5df49e1607..b141fd82753e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java @@ -38,6 +38,7 @@ import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD; import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; @@ -48,6 +49,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -78,6 +80,7 @@ import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; /** * Tests for the MoveManager class. @@ -496,6 +499,54 @@ public void testMoveCompleteFutureReplicasUnhealthy() throws Exception { .sendDeleteCommand(eq(containerInfo), eq(0), eq(src), eq(true)); } + @Test + public void testDeleteNotSentWithExpirationTimeInPast() throws Exception { + containerInfo = ReplicationTestUtil.createContainer( + HddsProtos.LifeCycleState.CLOSED, new ECReplicationConfig(3, 2)); + setupMocks(); + long moveTimeout = 55 * 60 * 1000, replicationTimeout = 50 * 60 * 1000; + moveManager.setMoveTimeout(moveTimeout); + moveManager.setReplicationTimeout(replicationTimeout); + + replicas.addAll(ReplicationTestUtil + .createReplicas(containerInfo.containerID(), 1, 2, 3, 4, 5)); + Iterator iterator = replicas.iterator(); + ContainerReplica srcReplica = iterator.next(); + src = srcReplica.getDatanodeDetails(); + tgt = MockDatanodeDetails.randomDatanodeDetails(); + nodes.put(src, NodeStatus.inServiceHealthy()); + nodes.put(tgt, NodeStatus.inServiceHealthy()); + + CompletableFuture res = + moveManager.move(containerInfo.containerID(), src, tgt); + ArgumentCaptor longCaptorReplicate = ArgumentCaptor.forClass(Long.class); + verify(replicationManager).sendLowPriorityReplicateContainerCommand( + eq(containerInfo), eq(srcReplica.getReplicaIndex()), eq(src), + eq(tgt), longCaptorReplicate.capture()); + + ContainerReplicaOp op = new ContainerReplicaOp( + ADD, tgt, srcReplica.getReplicaIndex(), null, clock.millis() + 1000); + moveManager.opCompleted(op, containerInfo.containerID(), false); + ArgumentCaptor longCaptorDelete = ArgumentCaptor.forClass(Long.class); + verify(replicationManager).sendDeleteCommand( + eq(containerInfo), eq(srcReplica.getReplicaIndex()), eq(src), + eq(true), longCaptorDelete.capture()); + + // verify that command is sent with deadline as (moveStartTime + moveTimeout) + // moveStartTime can be calculated as (expirationTime set for replication - replicationTimeout) + assertEquals(longCaptorReplicate.getValue() - replicationTimeout + moveTimeout, longCaptorDelete.getValue()); + // replicationManager sends a datanode command with the deadline as + // (scmDeadlineEpochMs - rmConf.getDatanodeTimeoutOffset()). The offset is 6 minutes by default. + // For the datanode deadline to not be in the past, the below condition is checked. + assertTrue((longCaptorDelete.getValue() - Duration.ofMinutes(6).toMillis()) > clock.millis()); + + op = new ContainerReplicaOp( + DELETE, src, srcReplica.getReplicaIndex(), null, clock.millis() + 1000); + moveManager.opCompleted(op, containerInfo.containerID(), false); + MoveManager.MoveResult finalResult = res.get(); + assertEquals(COMPLETED, finalResult); + } + private CompletableFuture setupSuccessfulMove() throws Exception { replicas.addAll(ReplicationTestUtil