diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java index fc1ddffb835..f46cd466a7c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; @@ -44,6 +45,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -68,6 +70,7 @@ public class ContainerBalancerTask implements Runnable { private NodeManager nodeManager; private ContainerManager containerManager; private ReplicationManager replicationManager; + private MoveManager moveManager; private OzoneConfiguration ozoneConfiguration; private ContainerBalancer containerBalancer; private final SCMContext scmContext; @@ -131,6 +134,7 @@ public ContainerBalancerTask(StorageContainerManager scm, this.nodeManager = scm.getScmNodeManager(); this.containerManager = scm.getContainerManager(); this.replicationManager = scm.getReplicationManager(); + this.moveManager = new MoveManager(replicationManager, containerManager); this.ozoneConfiguration = scm.getConfiguration(); this.containerBalancer = containerBalancer; this.config = config; @@ -592,7 +596,7 @@ private void checkIterationMoveResults() { LOG.warn("Container balancer is interrupted"); Thread.currentThread().interrupt(); } catch (TimeoutException e) { - long timeoutCounts = cancelAndCountPendingMoves(); + long timeoutCounts = cancelMovesThatExceedTimeoutDuration(); LOG.warn("{} Container moves are canceled.", timeoutCounts); metrics.incrementNumContainerMovesTimeoutInLatestIteration(timeoutCounts); } catch (ExecutionException e) { @@ -621,18 +625,42 @@ private void checkIterationMoveResults() { metrics.getNumContainerMovesCompletedInLatestIteration()); } - private long cancelAndCountPendingMoves() { - return moveSelectionToFutureMap.entrySet().stream() - .filter(entry -> !entry.getValue().isDone()) - .peek(entry -> { - LOG.warn("Container move timeout for container {} from source {}" + - " to target {}.", - entry.getKey().getContainerID(), - containerToSourceMap.get(entry.getKey().getContainerID()) - .getUuidString(), - entry.getKey().getTargetNode().getUuidString()); - entry.getValue().cancel(true); - }).count(); + /** + * Cancels container moves that are not yet done. Note that if a move + * command has already been sent out to a Datanode, we don't yet have the + * capability to cancel it. However, those commands in the DN should time out + * if they haven't been processed yet. + * + * @return number of moves that did not complete (timed out) and were + * cancelled. + */ + private long cancelMovesThatExceedTimeoutDuration() { + Set>> + entries = moveSelectionToFutureMap.entrySet(); + Iterator>> + iterator = entries.iterator(); + + int numCancelled = 0; + // iterate through all moves and cancel ones that aren't done yet + while (iterator.hasNext()) { + Map.Entry> + entry = iterator.next(); + if (!entry.getValue().isDone()) { + LOG.warn("Container move timed out for container {} from source {}" + + " to target {}.", entry.getKey().getContainerID(), + containerToSourceMap.get(entry.getKey().getContainerID()) + .getUuidString(), + entry.getKey().getTargetNode().getUuidString()); + + entry.getValue().cancel(true); + numCancelled += 1; + } + } + + return numCancelled; } /** @@ -742,13 +770,13 @@ private boolean adaptOnReachingIterationLimits() { } /** - * Asks {@link ReplicationManager} to move the specified container from - * source to target. + * Asks {@link ReplicationManager} or {@link MoveManager} to move the + * specified container from source to target. * * @param source the source datanode * @param moveSelection the selected container to move and target datanode * @return false if an exception occurred or the move completed with a - * result other than ReplicationManager.MoveResult.COMPLETED. Returns true + * result other than MoveManager.MoveResult.COMPLETED. Returns true * if the move completed with MoveResult.COMPLETED or move is not yet done */ private boolean moveContainer(DatanodeDetails source, @@ -757,48 +785,62 @@ private boolean moveContainer(DatanodeDetails source, CompletableFuture future; try { ContainerInfo containerInfo = containerManager.getContainer(containerID); - future = replicationManager - .move(containerID, source, moveSelection.getTargetNode()) - .whenComplete((result, ex) -> { - - metrics.incrementCurrentIterationContainerMoveMetric(result, 1); - if (ex != null) { - LOG.info("Container move for container {} from source {} to " + - "target {} failed with exceptions {}", - containerID.toString(), - source.getUuidString(), - moveSelection.getTargetNode().getUuidString(), ex); - metrics.incrementNumContainerMovesFailedInLatestIteration(1); - } else { - if (result == MoveManager.MoveResult.COMPLETED) { - sizeActuallyMovedInLatestIteration += - containerInfo.getUsedBytes(); - if (LOG.isDebugEnabled()) { - LOG.debug("Container move completed for container {} from " + - "source {} to target {}", containerID, - source.getUuidString(), - moveSelection.getTargetNode().getUuidString()); - } - } else { - LOG.warn( - "Container move for container {} from source {} to target" + - " {} failed: {}", - moveSelection.getContainerID(), source.getUuidString(), - moveSelection.getTargetNode().getUuidString(), result); - } - } - }); + + /* + If LegacyReplicationManager is enabled, ReplicationManager will + redirect to it. Otherwise, use MoveManager. + */ + if (ozoneConfiguration.getBoolean("hdds.scm.replication.enable.legacy", + true)) { + future = replicationManager + .move(containerID, source, moveSelection.getTargetNode()); + } else { + future = moveManager.move(containerID, source, + moveSelection.getTargetNode()); + } + + future = future.whenComplete((result, ex) -> { + metrics.incrementCurrentIterationContainerMoveMetric(result, 1); + if (ex != null) { + LOG.info("Container move for container {} from source {} to " + + "target {} failed with exceptions.", + containerID.toString(), + source.getUuidString(), + moveSelection.getTargetNode().getUuidString(), ex); + metrics.incrementNumContainerMovesFailedInLatestIteration(1); + } else { + if (result == MoveManager.MoveResult.COMPLETED) { + sizeActuallyMovedInLatestIteration += + containerInfo.getUsedBytes(); + LOG.debug("Container move completed for container {} from " + + "source {} to target {}", containerID, + source.getUuidString(), + moveSelection.getTargetNode().getUuidString()); + } else { + LOG.warn( + "Container move for container {} from source {} to target" + + " {} failed: {}", + moveSelection.getContainerID(), source.getUuidString(), + moveSelection.getTargetNode().getUuidString(), result); + } + } + }); } catch (ContainerNotFoundException e) { LOG.warn("Could not find Container {} for container move", containerID, e); metrics.incrementNumContainerMovesFailedInLatestIteration(1); return false; - } catch (NodeNotFoundException | TimeoutException e) { + } catch (NodeNotFoundException | TimeoutException | + ContainerReplicaNotFoundException e) { LOG.warn("Container move failed for container {}", containerID, e); metrics.incrementNumContainerMovesFailedInLatestIteration(1); return false; } + /* + If the future hasn't failed yet, put it in moveSelectionToFutureMap for + processing later + */ if (future.isDone()) { if (future.isCompletedExceptionally()) { return false; @@ -955,6 +997,7 @@ private void incSizeSelectedForMoving(DatanodeDetails source, * Resets some variables and metrics for this iteration. */ private void resetState() { + moveManager.resetState(); this.clusterCapacity = 0L; this.clusterRemaining = 0L; this.overUtilizedNodes.clear(); @@ -1045,6 +1088,16 @@ void setConfig(ContainerBalancerConfiguration config) { this.config = config; } + @VisibleForTesting + void setMoveManager(MoveManager moveManager) { + this.moveManager = moveManager; + } + + @VisibleForTesting + void setTaskStatus(Status taskStatus) { + this.taskStatus = taskStatus; + } + public Status getBalancerStatus() { return taskStatus; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java index 47bdcf3b5a4..dcba0657abf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/MoveManager.java @@ -46,7 +46,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeoutException; /** * A class which schedules, tracks and completes moves scheduled by the @@ -125,10 +124,10 @@ public enum MoveResult { private volatile boolean running = false; public MoveManager(final ReplicationManager replicationManager, - final Clock clock, final ContainerManager containerManager) { + final ContainerManager containerManager) { this.replicationManager = replicationManager; this.containerManager = containerManager; - this.clock = clock; + this.clock = replicationManager.getClock(); } /** @@ -139,6 +138,10 @@ Pair, MoveDataNodePair>> getPendingMove() { return pendingMoves; } + void resetState() { + pendingMoves.clear(); + } + /** * completeMove a move action for a given container. * @@ -219,7 +222,7 @@ public void onNotLeader() { public CompletableFuture move( ContainerID cid, DatanodeDetails src, DatanodeDetails tgt) throws ContainerNotFoundException, NodeNotFoundException, - TimeoutException, ContainerReplicaNotFoundException { + ContainerReplicaNotFoundException { CompletableFuture ret = new CompletableFuture<>(); if (!running) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index b6eeac20655..ab673c63671 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -1195,6 +1195,9 @@ public ReplicationManagerConfiguration getConfig() { return rmConf; } + public Clock getClock() { + return clock; + } /** * following functions will be refactored in a separate jira. @@ -1230,8 +1233,12 @@ public LegacyReplicationManager getLegacyReplicationManager() { } public boolean isContainerReplicatingOrDeleting(ContainerID containerID) { - return legacyReplicationManager - .isContainerReplicatingOrDeleting(containerID); + if (rmConf.isLegacyEnabled()) { + return legacyReplicationManager + .isContainerReplicatingOrDeleting(containerID); + } else { + return !getPendingReplicationOps(containerID).isEmpty(); + } } private ECContainerReplicaCount getECContainerReplicaCount( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java index 50d3c294388..f00c293406e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java @@ -20,10 +20,13 @@ import com.google.protobuf.ByteString; import java.io.IOException; +import java.time.Clock; +import java.time.ZoneId; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager; @@ -82,6 +85,12 @@ public void setup() throws IOException, NodeNotFoundException, when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager); when(scm.getSCMServiceManager()).thenReturn(mock(SCMServiceManager.class)); + ReplicationManager replicationManager = + Mockito.mock(ReplicationManager.class); + when(scm.getReplicationManager()).thenReturn(replicationManager); + when(replicationManager.getClock()).thenReturn( + Clock.system(ZoneId.systemDefault())); + /* When StatefulServiceStateManager#saveConfiguration is called, save to in-memory serviceToConfigMap and read from same. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java index 1435bc30054..1a72b06bb62 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java @@ -60,7 +60,9 @@ import org.slf4j.event.Level; import java.io.IOException; +import java.time.Clock; import java.time.Duration; +import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -72,7 +74,9 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import static org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; @@ -86,6 +90,7 @@ public class TestContainerBalancerTask { LoggerFactory.getLogger(TestContainerBalancerTask.class); private ReplicationManager replicationManager; + private MoveManager moveManager; private ContainerManager containerManager; private ContainerBalancerTask containerBalancerTask; private MockNodeManager mockNodeManager; @@ -122,6 +127,7 @@ public void setup() throws IOException, NodeNotFoundException, replicationManager = Mockito.mock(ReplicationManager.class); serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class); SCMServiceManager scmServiceManager = Mockito.mock(SCMServiceManager.class); + moveManager = Mockito.mock(MoveManager.class); // these configs will usually be specified in each test balancerConfiguration = @@ -158,6 +164,9 @@ public void setup() throws IOException, NodeNotFoundException, .thenReturn(CompletableFuture. completedFuture(MoveManager.MoveResult.COMPLETED)); + Mockito.when(replicationManager.getClock()) + .thenReturn(Clock.system(ZoneId.systemDefault())); + when(containerManager.getContainerReplicas(Mockito.any(ContainerID.class))) .thenAnswer(invocationOnMock -> { ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0]; @@ -209,7 +218,13 @@ public void setup() throws IOException, NodeNotFoundException, .register(Mockito.any(SCMService.class)); ContainerBalancer sb = new ContainerBalancer(scm); containerBalancerTask = new ContainerBalancerTask(scm, 0, sb, - sb.getMetrics(), null); + sb.getMetrics(), balancerConfiguration); + + containerBalancerTask.setMoveManager(moveManager); + Mockito.when(moveManager.move(any(ContainerID.class), + any(DatanodeDetails.class), any(DatanodeDetails.class))) + .thenReturn(CompletableFuture.completedFuture( + MoveManager.MoveResult.COMPLETED)); } @Test @@ -263,6 +278,27 @@ public void testCalculationOfUtilization() { } } + @Test + public void testBalancerWithMoveManager() + throws IllegalContainerBalancerStateException, IOException, + InvalidContainerBalancerConfigurationException, TimeoutException, + NodeNotFoundException { + ReplicationManagerConfiguration rmConf = + conf.getObject(ReplicationManagerConfiguration.class); + rmConf.setEnableLegacy(false); + conf.setFromObject(rmConf); + + startBalancer(balancerConfiguration); + Mockito.verify(moveManager, atLeastOnce()) + .move(Mockito.any(ContainerID.class), + Mockito.any(DatanodeDetails.class), + Mockito.any(DatanodeDetails.class)); + + Mockito.verify(replicationManager, times(0)) + .move(Mockito.any(ContainerID.class), Mockito.any( + DatanodeDetails.class), Mockito.any(DatanodeDetails.class)); + } + /** * Checks whether the list of unBalanced nodes is empty when the cluster is * balanced. @@ -480,6 +516,18 @@ move should equal number of unique, selected containers (from Mockito.verify(replicationManager, times(numContainers)) .move(any(ContainerID.class), any(DatanodeDetails.class), any(DatanodeDetails.class)); + + /* + Try the same test by disabling LegacyReplicationManager so that + MoveManager is used. + */ + conf.setBoolean("hdds.scm.replication.enable.legacy", false); + startBalancer(balancerConfiguration); + stopBalancer(); + numContainers = containerBalancerTask.getContainerToTargetMap().size(); + Mockito.verify(moveManager, times(numContainers)) + .move(any(ContainerID.class), any(DatanodeDetails.class), + any(DatanodeDetails.class)); } @Test @@ -730,8 +778,22 @@ public void checkIterationResult() ContainerBalancerTask.IterationResult.ITERATION_COMPLETED, containerBalancerTask.getIterationResult()); stopBalancer(); + + /* + Try the same but use MoveManager for container move instead of legacy RM. + */ + conf.setBoolean("hdds.scm.replication.enable.legacy", false); + startBalancer(balancerConfiguration); + Assertions.assertEquals( + ContainerBalancerTask.IterationResult.ITERATION_COMPLETED, + containerBalancerTask.getIterationResult()); + stopBalancer(); } + /** + * Tests the situation where some container moves time out because they + * take longer than "move.timeout". + */ @Test public void checkIterationResultTimeout() throws NodeNotFoundException, IOException, @@ -742,7 +804,8 @@ public void checkIterationResultTimeout() Mockito.when(replicationManager.move(Mockito.any(ContainerID.class), Mockito.any(DatanodeDetails.class), Mockito.any(DatanodeDetails.class))) - .thenReturn(genCompletableFuture(200), genCompletableFuture(2000)); + .thenReturn(genCompletableFuture(10)) + .thenAnswer(invocation -> genCompletableFuture(2000)); balancerConfiguration.setThreshold(10); balancerConfiguration.setIterations(1); @@ -767,6 +830,28 @@ public void checkIterationResultTimeout() .getNumContainerMovesTimeoutInLatestIteration() > 1); stopBalancer(); + /* + Test the same but use MoveManager instead of LegacyReplicationManager. + The first move being 10ms falls within the timeout duration of 500ms. It + should be successful. The rest should fail. + */ + conf.setBoolean("hdds.scm.replication.enable.legacy", false); + Mockito.when(moveManager.move(Mockito.any(ContainerID.class), + Mockito.any(DatanodeDetails.class), + Mockito.any(DatanodeDetails.class))) + .thenReturn(genCompletableFuture(10)) + .thenAnswer(invocation -> genCompletableFuture(2000)); + + startBalancer(balancerConfiguration); + Assertions.assertEquals( + ContainerBalancerTask.IterationResult.ITERATION_COMPLETED, + containerBalancerTask.getIterationResult()); + Assertions.assertEquals(1, + containerBalancerTask.getMetrics() + .getNumContainerMovesCompletedInLatestIteration()); + Assertions.assertTrue(containerBalancerTask.getMetrics() + .getNumContainerMovesTimeoutInLatestIteration() > 1); + stopBalancer(); } @Test @@ -796,6 +881,23 @@ public void checkIterationResultTimeoutFromReplicationManager() Assertions.assertTrue(containerBalancerTask.getMetrics() .getNumContainerMovesTimeoutInLatestIteration() > 0); + Assertions.assertEquals(0, containerBalancerTask.getMetrics() + .getNumContainerMovesCompletedInLatestIteration()); + stopBalancer(); + + /* + Try the same test with MoveManager instead of LegacyReplicationManager. + */ + Mockito.when(moveManager.move(Mockito.any(ContainerID.class), + Mockito.any(DatanodeDetails.class), + Mockito.any(DatanodeDetails.class))) + .thenReturn(future).thenAnswer(invocation -> future2); + + startBalancer(balancerConfiguration); + Assertions.assertTrue(containerBalancerTask.getMetrics() + .getNumContainerMovesTimeoutInLatestIteration() > 0); + Assertions.assertEquals(0, containerBalancerTask.getMetrics() + .getNumContainerMovesCompletedInLatestIteration()); stopBalancer(); } @@ -812,15 +914,15 @@ public void checkIterationResultException() Mockito.when(replicationManager.move(Mockito.any(ContainerID.class), Mockito.any(DatanodeDetails.class), Mockito.any(DatanodeDetails.class))) - .thenThrow(new ContainerNotFoundException("Test Container not found"), - new NodeNotFoundException("Test Node not found")) - .thenReturn(future).thenReturn(CompletableFuture.supplyAsync(() -> { + .thenReturn(CompletableFuture.supplyAsync(() -> { try { - Thread.sleep(200); - } catch (Exception ex) { + Thread.sleep(1); + } catch (Exception ignored) { } - throw new RuntimeException("Throw"); - })); + throw new RuntimeException("Runtime Exception after doing work"); + })) + .thenThrow(new ContainerNotFoundException("Test Container not found")) + .thenReturn(future); balancerConfiguration.setThreshold(10); balancerConfiguration.setIterations(1); @@ -839,6 +941,30 @@ public void checkIterationResultException() .getNumContainerMovesFailed() >= 3); stopBalancer(); + /* + Try the same test but with MoveManager instead of ReplicationManager. + */ + Mockito.when(moveManager.move(Mockito.any(ContainerID.class), + Mockito.any(DatanodeDetails.class), + Mockito.any(DatanodeDetails.class))) + .thenReturn(CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1); + } catch (Exception ignored) { + } + throw new RuntimeException("Runtime Exception after doing work"); + })) + .thenThrow(new ContainerNotFoundException("Test Container not found")) + .thenReturn(future); + + startBalancer(balancerConfiguration); + Assertions.assertEquals( + ContainerBalancerTask.IterationResult.ITERATION_COMPLETED, + containerBalancerTask.getIterationResult()); + Assertions.assertTrue( + containerBalancerTask.getMetrics() + .getNumContainerMovesFailed() >= 3); + stopBalancer(); } /** @@ -1019,6 +1145,7 @@ private void startBalancer(ContainerBalancerConfiguration config) throws IllegalContainerBalancerStateException, IOException, InvalidContainerBalancerConfigurationException, TimeoutException { containerBalancerTask.setConfig(config); + containerBalancerTask.setTaskStatus(ContainerBalancerTask.Status.RUNNING); containerBalancerTask.run(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java index 03a1380f6e4..722d85306be 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java @@ -107,9 +107,9 @@ private void setupMocks() throws ContainerNotFoundException, .thenReturn(pendingOps); Mockito.when(replicationManager.getContainerReplicationHealth(any(), any())) .thenReturn(new ContainerHealthResult.HealthyResult(containerInfo)); + Mockito.when(replicationManager.getClock()).thenReturn(clock); - moveManager = new MoveManager( - replicationManager, clock, containerManager); + moveManager = new MoveManager(replicationManager, containerManager); moveManager.onLeaderReady(); }