From 13fb07200a750c319f4357b23efa80c28fd5b2d2 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 17 Apr 2023 09:02:49 +0200 Subject: [PATCH 1/7] Use isPush() from config --- .../container/replication/ECMisReplicationHandler.java | 5 ++--- .../scm/container/replication/MisReplicationHandler.java | 9 +++------ .../replication/RatisMisReplicationHandler.java | 5 ++--- .../scm/container/replication/ReplicationManager.java | 4 ++-- .../replication/TestECMisReplicationHandler.java | 2 +- .../container/replication/TestMisReplicationHandler.java | 7 ++++++- .../replication/TestRatisMisReplicationHandler.java | 2 +- 7 files changed, 17 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java index 3c6c3a27f167..20151b707394 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java @@ -35,9 +35,8 @@ public class ECMisReplicationHandler extends MisReplicationHandler { public ECMisReplicationHandler( PlacementPolicy containerPlacement, - ConfigurationSource conf, ReplicationManager replicationManager, - boolean push) { - super(containerPlacement, conf, replicationManager, push); + ConfigurationSource conf, ReplicationManager replicationManager) { + super(containerPlacement, conf, replicationManager); } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java index abf7afe309b3..06ed99e0f9d7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java @@ -57,18 +57,15 @@ public abstract class MisReplicationHandler implements private final PlacementPolicy containerPlacement; private final long currentContainerSize; private final ReplicationManager replicationManager; - private boolean push; public MisReplicationHandler( final PlacementPolicy containerPlacement, - final ConfigurationSource conf, ReplicationManager replicationManager, - final boolean push) { + final ConfigurationSource conf, ReplicationManager replicationManager) { this.containerPlacement = containerPlacement; this.currentContainerSize = (long) conf.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); this.replicationManager = replicationManager; - this.push = push; } protected abstract ContainerReplicaCount getContainerReplicaCount( @@ -118,7 +115,7 @@ private Set filterSources(Set replicas) { protected abstract ReplicateContainerCommand updateReplicateCommand( ReplicateContainerCommand command, ContainerReplica replica); - private int sendReplicateCommands( + protected int sendReplicateCommands( ContainerInfo containerInfo, Set replicasToBeReplicated, List targetDns) @@ -132,7 +129,7 @@ private int sendReplicateCommands( long containerID = containerInfo.getContainerID(); DatanodeDetails source = replica.getDatanodeDetails(); DatanodeDetails target = targetDns.get(datanodeIdx); - if (push) { + if (replicationManager.getConfig().isPush()) { replicationManager.sendThrottledReplicationCommand(containerInfo, Collections.singletonList(source), target, replica.getReplicaIndex()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java index 7e9f0f48d1e2..8e798c643809 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java @@ -37,9 +37,8 @@ public class RatisMisReplicationHandler extends MisReplicationHandler { public RatisMisReplicationHandler( PlacementPolicy containerPlacement, - ConfigurationSource conf, ReplicationManager replicationManager, - boolean push) { - super(containerPlacement, conf, replicationManager, push); + ConfigurationSource conf, ReplicationManager replicationManager) { + super(containerPlacement, conf, replicationManager); } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 3da81d5c2740..312c9b0edbec 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -249,13 +249,13 @@ public ReplicationManager(final ConfigurationSource conf, ecOverReplicationHandler = new ECOverReplicationHandler(ecContainerPlacement, this); ecMisReplicationHandler = new ECMisReplicationHandler(ecContainerPlacement, - conf, this, rmConf.isPush()); + conf, this); ratisUnderReplicationHandler = new RatisUnderReplicationHandler( ratisContainerPlacement, conf, this); ratisOverReplicationHandler = new RatisOverReplicationHandler(ratisContainerPlacement, this); ratisMisReplicationHandler = new RatisMisReplicationHandler( - ratisContainerPlacement, conf, this, rmConf.isPush()); + ratisContainerPlacement, conf, this); underReplicatedProcessor = new UnderReplicatedProcessor(this, rmConf.getUnderReplicatedInterval()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java index a0d8f3220738..4f4117d4e29b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java @@ -207,6 +207,6 @@ protected MisReplicationHandler getMisreplicationHandler( PlacementPolicy placementPolicy, OzoneConfiguration conf, ReplicationManager replicationManager) { return new ECMisReplicationHandler(placementPolicy, conf, - replicationManager, true); + replicationManager); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java index fac35fd395b0..4115882812fd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.net.NodeSchema; import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; @@ -72,6 +73,7 @@ public abstract class TestMisReplicationHandler { protected void setup(ReplicationConfig repConfig) throws NodeNotFoundException, CommandTargetOverloadedException, NotLeaderException { + conf = SCMTestUtils.getConf(); replicationManager = Mockito.mock(ReplicationManager.class); Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class))) @@ -80,6 +82,10 @@ protected void setup(ReplicationConfig repConfig) return new NodeStatus(dd.getPersistedOpState(), HddsProtos.NodeState.HEALTHY, 0); }); + ReplicationManagerConfiguration rmConf = + conf.getObject(ReplicationManagerConfiguration.class); + Mockito.when(replicationManager.getConfig()) + .thenReturn(rmConf); commandsSent = new HashSet<>(); ReplicationTestUtil.mockRMSendDatanodeCommand( @@ -87,7 +93,6 @@ protected void setup(ReplicationConfig repConfig) ReplicationTestUtil.mockRMSendThrottleReplicateCommand( replicationManager, commandsSent); - conf = SCMTestUtils.getConf(); container = ReplicationTestUtil .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig); NodeSchema[] schemas = diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java index 7996896f07e9..0a312c6705d4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java @@ -194,6 +194,6 @@ protected MisReplicationHandler getMisreplicationHandler( PlacementPolicy placementPolicy, OzoneConfiguration conf, ReplicationManager replicationManager) { return new RatisMisReplicationHandler(placementPolicy, conf, - replicationManager, true); + replicationManager); } } From 0b236e5b07db19ada048dfce4af69ba7d1841c3a Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 17 Apr 2023 09:05:59 +0200 Subject: [PATCH 2/7] Push down sendReplicateCommands() --- .../replication/ECMisReplicationHandler.java | 34 ++++++++++++++++++ .../replication/MisReplicationHandler.java | 32 ++++------------- .../RatisMisReplicationHandler.java | 35 +++++++++++++++++++ 3 files changed, 75 insertions(+), 26 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java index 20151b707394..94d2bcd8067f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java @@ -18,13 +18,16 @@ package org.apache.hadoop.hdds.scm.container.replication; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.ratis.protocol.exceptions.NotLeaderException; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -63,4 +66,35 @@ protected ReplicateContainerCommand updateReplicateCommand( } + @Override + protected int sendReplicateCommands( + ContainerInfo containerInfo, + Set replicasToBeReplicated, + List targetDns) + throws CommandTargetOverloadedException, NotLeaderException { + ReplicationManager replicationManager = getReplicationManager(); + int commandsSent = 0; + int datanodeIdx = 0; + for (ContainerReplica replica : replicasToBeReplicated) { + if (datanodeIdx == targetDns.size()) { + break; + } + long containerID = containerInfo.getContainerID(); + DatanodeDetails source = replica.getDatanodeDetails(); + DatanodeDetails target = targetDns.get(datanodeIdx); + if (replicationManager.getConfig().isPush()) { + replicationManager.sendThrottledReplicationCommand(containerInfo, + Collections.singletonList(source), target, + replica.getReplicaIndex()); + } else { + ReplicateContainerCommand cmd = ReplicateContainerCommand + .fromSources(containerID, Collections.singletonList(source)); + updateReplicateCommand(cmd, replica); + replicationManager.sendDatanodeCommand(cmd, containerInfo, target); + } + commandsSent++; + datanodeIdx += 1; + } + return commandsSent; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java index 06ed99e0f9d7..d91704735975 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java @@ -68,6 +68,10 @@ public MisReplicationHandler( this.replicationManager = replicationManager; } + protected ReplicationManager getReplicationManager() { + return replicationManager; + } + protected abstract ContainerReplicaCount getContainerReplicaCount( ContainerInfo containerInfo, Set replicas, List pendingOps, int remainingMaintenanceRedundancy) @@ -115,35 +119,11 @@ private Set filterSources(Set replicas) { protected abstract ReplicateContainerCommand updateReplicateCommand( ReplicateContainerCommand command, ContainerReplica replica); - protected int sendReplicateCommands( + protected abstract int sendReplicateCommands( ContainerInfo containerInfo, Set replicasToBeReplicated, List targetDns) - throws CommandTargetOverloadedException, NotLeaderException { - int commandsSent = 0; - int datanodeIdx = 0; - for (ContainerReplica replica : replicasToBeReplicated) { - if (datanodeIdx == targetDns.size()) { - break; - } - long containerID = containerInfo.getContainerID(); - DatanodeDetails source = replica.getDatanodeDetails(); - DatanodeDetails target = targetDns.get(datanodeIdx); - if (replicationManager.getConfig().isPush()) { - replicationManager.sendThrottledReplicationCommand(containerInfo, - Collections.singletonList(source), target, - replica.getReplicaIndex()); - } else { - ReplicateContainerCommand cmd = ReplicateContainerCommand - .fromSources(containerID, Collections.singletonList(source)); - updateReplicateCommand(cmd, replica); - replicationManager.sendDatanodeCommand(cmd, containerInfo, target); - } - commandsSent++; - datanodeIdx += 1; - } - return commandsSent; - } + throws CommandTargetOverloadedException, NotLeaderException; @Override public int processAndSendCommands( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java index 8e798c643809..0847865c6f45 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java @@ -19,13 +19,16 @@ package org.apache.hadoop.hdds.scm.container.replication; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.ratis.protocol.exceptions.NotLeaderException; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -61,4 +64,36 @@ protected ReplicateContainerCommand updateReplicateCommand( ReplicateContainerCommand command, ContainerReplica replica) { return command; } + + @Override + protected int sendReplicateCommands( + ContainerInfo containerInfo, + Set replicasToBeReplicated, + List targetDns) + throws CommandTargetOverloadedException, NotLeaderException { + ReplicationManager replicationManager = getReplicationManager(); + int commandsSent = 0; + int datanodeIdx = 0; + for (ContainerReplica replica : replicasToBeReplicated) { + if (datanodeIdx == targetDns.size()) { + break; + } + long containerID = containerInfo.getContainerID(); + DatanodeDetails source = replica.getDatanodeDetails(); + DatanodeDetails target = targetDns.get(datanodeIdx); + if (replicationManager.getConfig().isPush()) { + replicationManager.sendThrottledReplicationCommand(containerInfo, + Collections.singletonList(source), target, + replica.getReplicaIndex()); + } else { + ReplicateContainerCommand cmd = ReplicateContainerCommand + .fromSources(containerID, Collections.singletonList(source)); + updateReplicateCommand(cmd, replica); + replicationManager.sendDatanodeCommand(cmd, containerInfo, target); + } + commandsSent++; + datanodeIdx += 1; + } + return commandsSent; + } } From 50e1868d9842149d24745b41ab821ad8fb488ce9 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 17 Apr 2023 09:20:19 +0200 Subject: [PATCH 3/7] Let RatisMisReplicationHandler use all sources --- .../RatisMisReplicationHandler.java | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java index 0847865c6f45..b2f12d9d7f81 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java @@ -28,9 +28,9 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * Handles the Ratis mis replication processing and forming the respective SCM @@ -72,28 +72,24 @@ protected int sendReplicateCommands( List targetDns) throws CommandTargetOverloadedException, NotLeaderException { ReplicationManager replicationManager = getReplicationManager(); + long containerID = containerInfo.getContainerID(); + List sources = replicasToBeReplicated.stream() + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); + int commandsSent = 0; - int datanodeIdx = 0; - for (ContainerReplica replica : replicasToBeReplicated) { - if (datanodeIdx == targetDns.size()) { - break; - } - long containerID = containerInfo.getContainerID(); - DatanodeDetails source = replica.getDatanodeDetails(); - DatanodeDetails target = targetDns.get(datanodeIdx); + for (DatanodeDetails target : targetDns) { if (replicationManager.getConfig().isPush()) { replicationManager.sendThrottledReplicationCommand(containerInfo, - Collections.singletonList(source), target, - replica.getReplicaIndex()); + sources, target, 0); } else { ReplicateContainerCommand cmd = ReplicateContainerCommand - .fromSources(containerID, Collections.singletonList(source)); - updateReplicateCommand(cmd, replica); + .fromSources(containerID, sources); replicationManager.sendDatanodeCommand(cmd, containerInfo, target); } commandsSent++; - datanodeIdx += 1; } + return commandsSent; } } From 4cf35c9bf7e60c0ce6926f35e0d2e7d65d9dc8d3 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 17 Apr 2023 09:51:43 +0200 Subject: [PATCH 4/7] Fix checkstyle --- .../hdds/scm/container/replication/MisReplicationHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java index d91704735975..841f6aa7d14f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java @@ -59,8 +59,8 @@ public abstract class MisReplicationHandler implements private final ReplicationManager replicationManager; public MisReplicationHandler( - final PlacementPolicy containerPlacement, - final ConfigurationSource conf, ReplicationManager replicationManager) { + final PlacementPolicy containerPlacement, + final ConfigurationSource conf, ReplicationManager replicationManager) { this.containerPlacement = containerPlacement; this.currentContainerSize = (long) conf.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, From b80224ea7c9a21a3cfea6bde80dec187460a3aba Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 17 Apr 2023 13:40:28 +0200 Subject: [PATCH 5/7] updateReplicateCommand() is no longer needed --- .../replication/ECMisReplicationHandler.java | 14 +++----------- .../replication/MisReplicationHandler.java | 4 ---- .../replication/RatisMisReplicationHandler.java | 6 ------ 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java index 94d2bcd8067f..ebb06c0b1d43 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java @@ -56,16 +56,6 @@ protected ContainerReplicaCount getContainerReplicaCount( remainingMaintenanceRedundancy); } - @Override - protected ReplicateContainerCommand updateReplicateCommand( - ReplicateContainerCommand command, ContainerReplica replica) { - // For EC containers, we need to track the replica index which is - // to be replicated, so add it to the command. - command.setReplicaIndex(replica.getReplicaIndex()); - return command; - } - - @Override protected int sendReplicateCommands( ContainerInfo containerInfo, @@ -89,7 +79,9 @@ protected int sendReplicateCommands( } else { ReplicateContainerCommand cmd = ReplicateContainerCommand .fromSources(containerID, Collections.singletonList(source)); - updateReplicateCommand(cmd, replica); + // For EC containers, we need to track the replica index which is + // to be replicated, so add it to the command. + cmd.setReplicaIndex(replica.getReplicaIndex()); replicationManager.sendDatanodeCommand(cmd, containerInfo, target); } commandsSent++; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java index 841f6aa7d14f..cf9d71d5bc9e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException; -import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,9 +115,6 @@ private Set filterSources(Set replicas) { .collect(Collectors.toSet()); } - protected abstract ReplicateContainerCommand updateReplicateCommand( - ReplicateContainerCommand command, ContainerReplica replica); - protected abstract int sendReplicateCommands( ContainerInfo containerInfo, Set replicasToBeReplicated, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java index b2f12d9d7f81..fa853d28b84f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java @@ -59,12 +59,6 @@ protected ContainerReplicaCount getContainerReplicaCount( minHealthyForMaintenance, true); } - @Override - protected ReplicateContainerCommand updateReplicateCommand( - ReplicateContainerCommand command, ContainerReplica replica) { - return command; - } - @Override protected int sendReplicateCommands( ContainerInfo containerInfo, From c104916f97c4cb48af796349ccbf7daab919d5b7 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 17 Apr 2023 17:57:00 +0200 Subject: [PATCH 6/7] Pass all available sources --- .../scm/container/replication/ECMisReplicationHandler.java | 2 +- .../scm/container/replication/MisReplicationHandler.java | 7 +++++-- .../container/replication/RatisMisReplicationHandler.java | 6 +----- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java index ebb06c0b1d43..79228bfb67f8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java @@ -60,7 +60,7 @@ protected ContainerReplicaCount getContainerReplicaCount( protected int sendReplicateCommands( ContainerInfo containerInfo, Set replicasToBeReplicated, - List targetDns) + List sources, List targetDns) throws CommandTargetOverloadedException, NotLeaderException { ReplicationManager replicationManager = getReplicationManager(); int commandsSent = 0; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java index cf9d71d5bc9e..51ca902485a7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java @@ -118,7 +118,7 @@ private Set filterSources(Set replicas) { protected abstract int sendReplicateCommands( ContainerInfo containerInfo, Set replicasToBeReplicated, - List targetDns) + List sources, List targetDns) throws CommandTargetOverloadedException, NotLeaderException; @Override @@ -171,9 +171,12 @@ public int processAndSendCommands( int requiredNodes = replicasToBeReplicated.size(); List targetDatanodes = getTargetDatanodes(usedDns, excludedDns, container, requiredNodes); + List availableSources = sources.stream() + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); int count = sendReplicateCommands(container, replicasToBeReplicated, - targetDatanodes); + availableSources, targetDatanodes); int found = targetDatanodes.size(); if (found < requiredNodes) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java index fa853d28b84f..f6633c1372c5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java @@ -30,7 +30,6 @@ import java.io.IOException; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; /** * Handles the Ratis mis replication processing and forming the respective SCM @@ -63,13 +62,10 @@ protected ContainerReplicaCount getContainerReplicaCount( protected int sendReplicateCommands( ContainerInfo containerInfo, Set replicasToBeReplicated, - List targetDns) + List sources, List targetDns) throws CommandTargetOverloadedException, NotLeaderException { ReplicationManager replicationManager = getReplicationManager(); long containerID = containerInfo.getContainerID(); - List sources = replicasToBeReplicated.stream() - .map(ContainerReplica::getDatanodeDetails) - .collect(Collectors.toList()); int commandsSent = 0; for (DatanodeDetails target : targetDns) { From 4c0d351dd14da1c0ecb473690a1a3201d3818d76 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Tue, 18 Apr 2023 09:10:19 +0200 Subject: [PATCH 7/7] Fix unit test --- .../TestECMisReplicationHandler.java | 10 ++++++++++ .../replication/TestMisReplicationHandler.java | 17 +++++++++++++---- .../TestRatisMisReplicationHandler.java | 9 +++++++++ 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java index 4f4117d4e29b..f8d119ab8734 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; import org.apache.hadoop.hdds.scm.PlacementPolicy; @@ -38,6 +39,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import static java.util.Collections.singletonList; @@ -209,4 +211,12 @@ protected MisReplicationHandler getMisreplicationHandler( return new ECMisReplicationHandler(placementPolicy, conf, replicationManager); } + + @Override + protected void assertReplicaIndex( + Map expectedReplicaIndexes, + DatanodeDetails sourceDatanode, int actualReplicaIndex) { + Assertions.assertEquals( + expectedReplicaIndexes.get(sourceDatanode), actualReplicaIndex); + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java index 4115882812fd..e36c7d88ce99 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java @@ -167,6 +167,11 @@ protected void testMisReplication(Set availableReplicas, return false; })); + Set sourceDns = sources.entrySet().stream() + .filter(Map.Entry::getValue) + .map(Map.Entry::getKey) + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toSet()); Set copy = sources.entrySet().stream() .filter(Map.Entry::getValue).limit(misreplicationCount) .map(Map.Entry::getKey).collect(Collectors.toSet()); @@ -192,7 +197,7 @@ protected void testMisReplication(Set availableReplicas, return targetNodes; }); } - Map copyReplicaIdxMap = copy.stream() + Map replicaIndexMap = copy.stream() .collect(Collectors.toMap(ContainerReplica::getDatanodeDetails, ContainerReplica::getReplicaIndex)); try { @@ -209,11 +214,15 @@ protected void testMisReplication(Set availableReplicas, container.getContainerID()); DatanodeDetails replicateSrcDn = pair.getKey(); DatanodeDetails target = replicateContainerCommand.getTargetDatanode(); - Assertions.assertTrue(copyReplicaIdxMap.containsKey(replicateSrcDn)); + Assertions.assertTrue(sourceDns.contains(replicateSrcDn)); Assertions.assertTrue(targetNodes.contains(target)); - Assertions.assertEquals(copyReplicaIdxMap.get(replicateSrcDn), - replicateContainerCommand.getReplicaIndex()); + int replicaIndex = replicateContainerCommand.getReplicaIndex(); + assertReplicaIndex(replicaIndexMap, replicateSrcDn, replicaIndex); } } } + + protected abstract void assertReplicaIndex( + Map expectedReplicaIndexes, + DatanodeDetails sourceDatanode, int actualReplicaIndex); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java index 0a312c6705d4..b02cb4380a18 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; @@ -39,6 +40,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; @@ -196,4 +198,11 @@ protected MisReplicationHandler getMisreplicationHandler( return new RatisMisReplicationHandler(placementPolicy, conf, replicationManager); } + + @Override + protected void assertReplicaIndex( + Map expectedReplicaIndexes, + DatanodeDetails sourceDatanode, int actualReplicaIndex) { + Assertions.assertEquals(0, actualReplicaIndex); + } }