diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index b33f0ef352a7..3da81d5c2740 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -712,8 +712,26 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, getMetrics().incrEcReconstructionCmdsSentTotal(); } else if (cmd.getType() == Type.replicateContainerCommand) { ReplicateContainerCommand rcc = (ReplicateContainerCommand) cmd; - containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(), - targetDatanode, rcc.getReplicaIndex(), scmDeadlineEpochMs); + + if (rcc.getTargetDatanode() == null) { + /* + This means the target will pull a replica from a source, so the + op's target Datanode should be the Datanode this command is being + sent to. + */ + containerReplicaPendingOps.scheduleAddReplica( + containerInfo.containerID(), + targetDatanode, rcc.getReplicaIndex(), scmDeadlineEpochMs); + } else { + /* + This means the source will push replica to the target, so the op's + target Datanode should be the Datanode the replica will be pushed to. + */ + containerReplicaPendingOps.scheduleAddReplica( + containerInfo.containerID(), + rcc.getTargetDatanode(), rcc.getReplicaIndex(), scmDeadlineEpochMs); + } + if (rcc.getReplicaIndex() > 0) { getMetrics().incrEcReplicationCmdsSentTotal(); } else if (rcc.getReplicaIndex() == 0) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index c0221681abd8..d9020216d3ed 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -858,6 +858,49 @@ public void testSendDatanodeReplicateCommand() throws NotLeaderException { .getNumReplicationCmdsSent()); } + /** + * Tests that a ReplicateContainerCommand that is sent from source to + * target has the correct deadline and that ContainerReplicaOp for + * replica ADD is created correctly. + */ + @Test + public void testReplicateContainerCommandToTarget() + throws NotLeaderException { + // create a closed EC container + ECReplicationConfig ecRepConfig = new ECReplicationConfig(3, 2); + ContainerInfo containerInfo = + ReplicationTestUtil.createContainerInfo(ecRepConfig, 1, + HddsProtos.LifeCycleState.CLOSED, 10, 20); + + // command will be pushed from source to target + DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails source = MockDatanodeDetails.randomDatanodeDetails(); + ReplicateContainerCommand command = ReplicateContainerCommand.toTarget( + containerInfo.getContainerID(), target); + command.setReplicaIndex(1); + replicationManager.sendDatanodeCommand(command, containerInfo, source); + + // check the command's deadline + ReplicationManager.ReplicationManagerConfiguration rmConf = configuration + .getObject(ReplicationManager.ReplicationManagerConfiguration.class); + long expectedDeadline = clock.millis() + rmConf.getEventTimeout() - + rmConf.getDatanodeTimeoutOffset(); + Assert.assertEquals(expectedDeadline, command.getDeadline()); + + List ops = containerReplicaPendingOps.getPendingOps( + containerInfo.containerID()); + Mockito.verify(nodeManager).addDatanodeCommand(any(), any()); + Assertions.assertEquals(1, ops.size()); + Assertions.assertEquals(ContainerReplicaOp.PendingOpType.ADD, + ops.get(0).getOpType()); + Assertions.assertEquals(target, ops.get(0).getTarget()); + Assertions.assertEquals(1, ops.get(0).getReplicaIndex()); + Assertions.assertEquals(1, replicationManager.getMetrics() + .getEcReplicationCmdsSentTotal()); + Assertions.assertEquals(0, replicationManager.getMetrics() + .getNumReplicationCmdsSent()); + } + @Test public void testSendLowPriorityReplicateContainerCommand() throws NotLeaderException {