diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java index 3c6c3a27f167..79228bfb67f8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java @@ -18,13 +18,16 @@ package org.apache.hadoop.hdds.scm.container.replication; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.ratis.protocol.exceptions.NotLeaderException; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -35,9 +38,8 @@ public class ECMisReplicationHandler extends MisReplicationHandler { public ECMisReplicationHandler( PlacementPolicy containerPlacement, - ConfigurationSource conf, ReplicationManager replicationManager, - boolean push) { - super(containerPlacement, conf, replicationManager, push); + ConfigurationSource conf, ReplicationManager replicationManager) { + super(containerPlacement, conf, replicationManager); } @Override @@ -55,13 +57,36 @@ protected ContainerReplicaCount getContainerReplicaCount( } @Override - protected ReplicateContainerCommand updateReplicateCommand( - ReplicateContainerCommand command, ContainerReplica replica) { - // For EC containers, we need to track the replica index which is - // to be replicated, so add it to the command. - command.setReplicaIndex(replica.getReplicaIndex()); - return command; + protected int sendReplicateCommands( + ContainerInfo containerInfo, + Set replicasToBeReplicated, + List sources, List targetDns) + throws CommandTargetOverloadedException, NotLeaderException { + ReplicationManager replicationManager = getReplicationManager(); + int commandsSent = 0; + int datanodeIdx = 0; + for (ContainerReplica replica : replicasToBeReplicated) { + if (datanodeIdx == targetDns.size()) { + break; + } + long containerID = containerInfo.getContainerID(); + DatanodeDetails source = replica.getDatanodeDetails(); + DatanodeDetails target = targetDns.get(datanodeIdx); + if (replicationManager.getConfig().isPush()) { + replicationManager.sendThrottledReplicationCommand(containerInfo, + Collections.singletonList(source), target, + replica.getReplicaIndex()); + } else { + ReplicateContainerCommand cmd = ReplicateContainerCommand + .fromSources(containerID, Collections.singletonList(source)); + // For EC containers, we need to track the replica index which is + // to be replicated, so add it to the command. + cmd.setReplicaIndex(replica.getReplicaIndex()); + replicationManager.sendDatanodeCommand(cmd, containerInfo, target); + } + commandsSent++; + datanodeIdx += 1; + } + return commandsSent; } - - } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java index 4ab454955651..ba245dea3a02 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException; -import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,18 +55,19 @@ public abstract class MisReplicationHandler implements private final PlacementPolicy containerPlacement; private final long currentContainerSize; private final ReplicationManager replicationManager; - private boolean push; public MisReplicationHandler( - final PlacementPolicy containerPlacement, - final ConfigurationSource conf, ReplicationManager replicationManager, - final boolean push) { + final PlacementPolicy containerPlacement, + final ConfigurationSource conf, ReplicationManager replicationManager) { this.containerPlacement = containerPlacement; this.currentContainerSize = (long) conf.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); this.replicationManager = replicationManager; - this.push = push; + } + + protected ReplicationManager getReplicationManager() { + return replicationManager; } protected abstract ContainerReplicaCount getContainerReplicaCount( @@ -95,38 +95,11 @@ private Set filterSources(Set replicas) { .collect(Collectors.toSet()); } - protected abstract ReplicateContainerCommand updateReplicateCommand( - ReplicateContainerCommand command, ContainerReplica replica); - - private int sendReplicateCommands( + protected abstract int sendReplicateCommands( ContainerInfo containerInfo, Set replicasToBeReplicated, - List targetDns) - throws CommandTargetOverloadedException, NotLeaderException { - int commandsSent = 0; - int datanodeIdx = 0; - for (ContainerReplica replica : replicasToBeReplicated) { - if (datanodeIdx == targetDns.size()) { - break; - } - long containerID = containerInfo.getContainerID(); - DatanodeDetails source = replica.getDatanodeDetails(); - DatanodeDetails target = targetDns.get(datanodeIdx); - if (push) { - replicationManager.sendThrottledReplicationCommand(containerInfo, - Collections.singletonList(source), target, - replica.getReplicaIndex()); - } else { - ReplicateContainerCommand cmd = ReplicateContainerCommand - .fromSources(containerID, Collections.singletonList(source)); - updateReplicateCommand(cmd, replica); - replicationManager.sendDatanodeCommand(cmd, containerInfo, target); - } - commandsSent++; - datanodeIdx += 1; - } - return commandsSent; - } + List sources, List targetDns) + throws CommandTargetOverloadedException, NotLeaderException; @Override public int processAndSendCommands( @@ -180,9 +153,12 @@ public int processAndSendCommands( List targetDatanodes = ReplicationManagerUtil .getTargetDatanodes(containerPlacement, requiredNodes, usedDns, excludedDns, currentContainerSize, container); + List availableSources = sources.stream() + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); int count = sendReplicateCommands(container, replicasToBeReplicated, - targetDatanodes); + availableSources, targetDatanodes); int found = targetDatanodes.size(); if (found < requiredNodes) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java index 7e9f0f48d1e2..f6633c1372c5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java @@ -19,11 +19,13 @@ package org.apache.hadoop.hdds.scm.container.replication; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.ratis.protocol.exceptions.NotLeaderException; import java.io.IOException; import java.util.List; @@ -37,9 +39,8 @@ public class RatisMisReplicationHandler extends MisReplicationHandler { public RatisMisReplicationHandler( PlacementPolicy containerPlacement, - ConfigurationSource conf, ReplicationManager replicationManager, - boolean push) { - super(containerPlacement, conf, replicationManager, push); + ConfigurationSource conf, ReplicationManager replicationManager) { + super(containerPlacement, conf, replicationManager); } @Override @@ -58,8 +59,27 @@ protected ContainerReplicaCount getContainerReplicaCount( } @Override - protected ReplicateContainerCommand updateReplicateCommand( - ReplicateContainerCommand command, ContainerReplica replica) { - return command; + protected int sendReplicateCommands( + ContainerInfo containerInfo, + Set replicasToBeReplicated, + List sources, List targetDns) + throws CommandTargetOverloadedException, NotLeaderException { + ReplicationManager replicationManager = getReplicationManager(); + long containerID = containerInfo.getContainerID(); + + int commandsSent = 0; + for (DatanodeDetails target : targetDns) { + if (replicationManager.getConfig().isPush()) { + replicationManager.sendThrottledReplicationCommand(containerInfo, + sources, target, 0); + } else { + ReplicateContainerCommand cmd = ReplicateContainerCommand + .fromSources(containerID, sources); + replicationManager.sendDatanodeCommand(cmd, containerInfo, target); + } + commandsSent++; + } + + return commandsSent; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 3da81d5c2740..312c9b0edbec 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -249,13 +249,13 @@ public ReplicationManager(final ConfigurationSource conf, ecOverReplicationHandler = new ECOverReplicationHandler(ecContainerPlacement, this); ecMisReplicationHandler = new ECMisReplicationHandler(ecContainerPlacement, - conf, this, rmConf.isPush()); + conf, this); ratisUnderReplicationHandler = new RatisUnderReplicationHandler( ratisContainerPlacement, conf, this); ratisOverReplicationHandler = new RatisOverReplicationHandler(ratisContainerPlacement, this); ratisMisReplicationHandler = new RatisMisReplicationHandler( - ratisContainerPlacement, conf, this, rmConf.isPush()); + ratisContainerPlacement, conf, this); underReplicatedProcessor = new UnderReplicatedProcessor(this, rmConf.getUnderReplicatedInterval()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java index a0d8f3220738..f8d119ab8734 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; import org.apache.hadoop.hdds.scm.PlacementPolicy; @@ -38,6 +39,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import static java.util.Collections.singletonList; @@ -207,6 +209,14 @@ protected MisReplicationHandler getMisreplicationHandler( PlacementPolicy placementPolicy, OzoneConfiguration conf, ReplicationManager replicationManager) { return new ECMisReplicationHandler(placementPolicy, conf, - replicationManager, true); + replicationManager); + } + + @Override + protected void assertReplicaIndex( + Map expectedReplicaIndexes, + DatanodeDetails sourceDatanode, int actualReplicaIndex) { + Assertions.assertEquals( + expectedReplicaIndexes.get(sourceDatanode), actualReplicaIndex); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java index fac35fd395b0..e36c7d88ce99 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.net.NodeSchema; import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; @@ -72,6 +73,7 @@ public abstract class TestMisReplicationHandler { protected void setup(ReplicationConfig repConfig) throws NodeNotFoundException, CommandTargetOverloadedException, NotLeaderException { + conf = SCMTestUtils.getConf(); replicationManager = Mockito.mock(ReplicationManager.class); Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class))) @@ -80,6 +82,10 @@ protected void setup(ReplicationConfig repConfig) return new NodeStatus(dd.getPersistedOpState(), HddsProtos.NodeState.HEALTHY, 0); }); + ReplicationManagerConfiguration rmConf = + conf.getObject(ReplicationManagerConfiguration.class); + Mockito.when(replicationManager.getConfig()) + .thenReturn(rmConf); commandsSent = new HashSet<>(); ReplicationTestUtil.mockRMSendDatanodeCommand( @@ -87,7 +93,6 @@ protected void setup(ReplicationConfig repConfig) ReplicationTestUtil.mockRMSendThrottleReplicateCommand( replicationManager, commandsSent); - conf = SCMTestUtils.getConf(); container = ReplicationTestUtil .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig); NodeSchema[] schemas = @@ -162,6 +167,11 @@ protected void testMisReplication(Set availableReplicas, return false; })); + Set sourceDns = sources.entrySet().stream() + .filter(Map.Entry::getValue) + .map(Map.Entry::getKey) + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toSet()); Set copy = sources.entrySet().stream() .filter(Map.Entry::getValue).limit(misreplicationCount) .map(Map.Entry::getKey).collect(Collectors.toSet()); @@ -187,7 +197,7 @@ protected void testMisReplication(Set availableReplicas, return targetNodes; }); } - Map copyReplicaIdxMap = copy.stream() + Map replicaIndexMap = copy.stream() .collect(Collectors.toMap(ContainerReplica::getDatanodeDetails, ContainerReplica::getReplicaIndex)); try { @@ -204,11 +214,15 @@ protected void testMisReplication(Set availableReplicas, container.getContainerID()); DatanodeDetails replicateSrcDn = pair.getKey(); DatanodeDetails target = replicateContainerCommand.getTargetDatanode(); - Assertions.assertTrue(copyReplicaIdxMap.containsKey(replicateSrcDn)); + Assertions.assertTrue(sourceDns.contains(replicateSrcDn)); Assertions.assertTrue(targetNodes.contains(target)); - Assertions.assertEquals(copyReplicaIdxMap.get(replicateSrcDn), - replicateContainerCommand.getReplicaIndex()); + int replicaIndex = replicateContainerCommand.getReplicaIndex(); + assertReplicaIndex(replicaIndexMap, replicateSrcDn, replicaIndex); } } } + + protected abstract void assertReplicaIndex( + Map expectedReplicaIndexes, + DatanodeDetails sourceDatanode, int actualReplicaIndex); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java index 7996896f07e9..b02cb4380a18 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; @@ -39,6 +40,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; @@ -194,6 +196,13 @@ protected MisReplicationHandler getMisreplicationHandler( PlacementPolicy placementPolicy, OzoneConfiguration conf, ReplicationManager replicationManager) { return new RatisMisReplicationHandler(placementPolicy, conf, - replicationManager, true); + replicationManager); + } + + @Override + protected void assertReplicaIndex( + Map expectedReplicaIndexes, + DatanodeDetails sourceDatanode, int actualReplicaIndex) { + Assertions.assertEquals(0, actualReplicaIndex); } }