Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;

Expand All @@ -35,9 +38,8 @@
public class ECMisReplicationHandler extends MisReplicationHandler {
public ECMisReplicationHandler(
PlacementPolicy<ContainerReplica> containerPlacement,
ConfigurationSource conf, ReplicationManager replicationManager,
boolean push) {
super(containerPlacement, conf, replicationManager, push);
ConfigurationSource conf, ReplicationManager replicationManager) {
super(containerPlacement, conf, replicationManager);
}

@Override
Expand All @@ -55,13 +57,36 @@ protected ContainerReplicaCount getContainerReplicaCount(
}

@Override
protected ReplicateContainerCommand updateReplicateCommand(
ReplicateContainerCommand command, ContainerReplica replica) {
// For EC containers, we need to track the replica index which is
// to be replicated, so add it to the command.
command.setReplicaIndex(replica.getReplicaIndex());
return command;
/**
 * Sends replication commands for EC container replicas, pairing each
 * replica to be replicated with one target datanode in order.
 * <p>
 * NOTE(review): the {@code sources} parameter is not used here — each
 * replica is copied from its own current datanode
 * ({@code replica.getDatanodeDetails()}), since an EC replica with a given
 * index exists on exactly that node.
 *
 * @param containerInfo the container whose replicas are mis-replicated
 * @param replicasToBeReplicated replicas to copy; iterated in set order and
 *        matched positionally with {@code targetDns}
 * @param sources unused for EC (see note above)
 * @param targetDns candidate target datanodes; if fewer targets than
 *        replicas, the extra replicas are skipped
 * @return number of commands actually sent; may be partial if an exception
 *         is thrown part-way through the loop
 * @throws CommandTargetOverloadedException from the throttled (push) path
 * @throws NotLeaderException if this SCM is no longer the Ratis leader
 */
protected int sendReplicateCommands(
ContainerInfo containerInfo,
Set<ContainerReplica> replicasToBeReplicated,
List<DatanodeDetails> sources, List<DatanodeDetails> targetDns)
throws CommandTargetOverloadedException, NotLeaderException {
ReplicationManager replicationManager = getReplicationManager();
int commandsSent = 0;
int datanodeIdx = 0;
for (ContainerReplica replica : replicasToBeReplicated) {
// Stop once every available target has been assigned a replica.
if (datanodeIdx == targetDns.size()) {
break;
}
long containerID = containerInfo.getContainerID();
DatanodeDetails source = replica.getDatanodeDetails();
DatanodeDetails target = targetDns.get(datanodeIdx);
if (replicationManager.getConfig().isPush()) {
// Push model: tell the source to push this replica (with its EC
// replica index) to the target, subject to command throttling.
replicationManager.sendThrottledReplicationCommand(containerInfo,
Collections.singletonList(source), target,
replica.getReplicaIndex());
} else {
// Pull model: tell the target to fetch the replica from the source.
ReplicateContainerCommand cmd = ReplicateContainerCommand
.fromSources(containerID, Collections.singletonList(source));
// For EC containers, we need to track the replica index which is
// to be replicated, so add it to the command.
cmd.setReplicaIndex(replica.getReplicaIndex());
replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
}
commandsSent++;
datanodeIdx += 1;
}
return commandsSent;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -56,18 +55,19 @@ public abstract class MisReplicationHandler implements
private final PlacementPolicy<ContainerReplica> containerPlacement;
private final long currentContainerSize;
private final ReplicationManager replicationManager;
private boolean push;

public MisReplicationHandler(
final PlacementPolicy<ContainerReplica> containerPlacement,
final ConfigurationSource conf, ReplicationManager replicationManager,
final boolean push) {
final PlacementPolicy<ContainerReplica> containerPlacement,
final ConfigurationSource conf, ReplicationManager replicationManager) {
this.containerPlacement = containerPlacement;
this.currentContainerSize = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
this.replicationManager = replicationManager;
this.push = push;
}

/** Exposes the {@link ReplicationManager} to subclass handlers. */
protected ReplicationManager getReplicationManager() {
return replicationManager;
}

protected abstract ContainerReplicaCount getContainerReplicaCount(
Expand Down Expand Up @@ -95,38 +95,11 @@ private Set<ContainerReplica> filterSources(Set<ContainerReplica> replicas) {
.collect(Collectors.toSet());
}

protected abstract ReplicateContainerCommand updateReplicateCommand(
ReplicateContainerCommand command, ContainerReplica replica);

private int sendReplicateCommands(
protected abstract int sendReplicateCommands(
ContainerInfo containerInfo,
Set<ContainerReplica> replicasToBeReplicated,
List<DatanodeDetails> targetDns)
throws CommandTargetOverloadedException, NotLeaderException {
int commandsSent = 0;
int datanodeIdx = 0;
for (ContainerReplica replica : replicasToBeReplicated) {
if (datanodeIdx == targetDns.size()) {
break;
}
long containerID = containerInfo.getContainerID();
DatanodeDetails source = replica.getDatanodeDetails();
DatanodeDetails target = targetDns.get(datanodeIdx);
if (push) {
replicationManager.sendThrottledReplicationCommand(containerInfo,
Collections.singletonList(source), target,
replica.getReplicaIndex());
} else {
ReplicateContainerCommand cmd = ReplicateContainerCommand
.fromSources(containerID, Collections.singletonList(source));
updateReplicateCommand(cmd, replica);
replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
}
commandsSent++;
datanodeIdx += 1;
}
return commandsSent;
}
List<DatanodeDetails> sources, List<DatanodeDetails> targetDns)
throws CommandTargetOverloadedException, NotLeaderException;

@Override
public int processAndSendCommands(
Expand Down Expand Up @@ -180,9 +153,12 @@ public int processAndSendCommands(
List<DatanodeDetails> targetDatanodes = ReplicationManagerUtil
.getTargetDatanodes(containerPlacement, requiredNodes, usedDns,
excludedDns, currentContainerSize, container);
List<DatanodeDetails> availableSources = sources.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());

int count = sendReplicateCommands(container, replicasToBeReplicated,
targetDatanodes);
availableSources, targetDatanodes);

int found = targetDatanodes.size();
if (found < requiredNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;

import java.io.IOException;
import java.util.List;
Expand All @@ -37,9 +39,8 @@ public class RatisMisReplicationHandler extends MisReplicationHandler {

public RatisMisReplicationHandler(
PlacementPolicy<ContainerReplica> containerPlacement,
ConfigurationSource conf, ReplicationManager replicationManager,
boolean push) {
super(containerPlacement, conf, replicationManager, push);
ConfigurationSource conf, ReplicationManager replicationManager) {
super(containerPlacement, conf, replicationManager);
}

@Override
Expand All @@ -58,8 +59,27 @@ protected ContainerReplicaCount getContainerReplicaCount(
}

@Override
protected ReplicateContainerCommand updateReplicateCommand(
ReplicateContainerCommand command, ContainerReplica replica) {
return command;
/**
 * Sends replication commands for a Ratis container: every target datanode
 * receives a command that may use any of the given source replicas.
 * <p>
 * NOTE(review): {@code replicasToBeReplicated} is not read here — unlike
 * EC, Ratis replicas are interchangeable, so only the source datanode list
 * matters and replica index is always 0.
 *
 * @param containerInfo the container whose replicas are mis-replicated
 * @param replicasToBeReplicated unused for Ratis (see note above)
 * @param sources datanodes currently holding a usable replica
 * @param targetDns datanodes that should each receive a new replica
 * @return number of commands actually sent; may be partial if an exception
 *         is thrown part-way through the loop
 * @throws CommandTargetOverloadedException from the throttled (push) path
 * @throws NotLeaderException if this SCM is no longer the Ratis leader
 */
protected int sendReplicateCommands(
ContainerInfo containerInfo,
Set<ContainerReplica> replicasToBeReplicated,
List<DatanodeDetails> sources, List<DatanodeDetails> targetDns)
throws CommandTargetOverloadedException, NotLeaderException {
ReplicationManager replicationManager = getReplicationManager();
long containerID = containerInfo.getContainerID();

int commandsSent = 0;
for (DatanodeDetails target : targetDns) {
if (replicationManager.getConfig().isPush()) {
// Push model: replica index 0, since Ratis containers are not striped.
replicationManager.sendThrottledReplicationCommand(containerInfo,
sources, target, 0);
} else {
// Pull model: target may fetch from any of the listed sources.
ReplicateContainerCommand cmd = ReplicateContainerCommand
.fromSources(containerID, sources);
replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
}
commandsSent++;
}

return commandsSent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ public ReplicationManager(final ConfigurationSource conf,
ecOverReplicationHandler =
new ECOverReplicationHandler(ecContainerPlacement, this);
ecMisReplicationHandler = new ECMisReplicationHandler(ecContainerPlacement,
conf, this, rmConf.isPush());
conf, this);
ratisUnderReplicationHandler = new RatisUnderReplicationHandler(
ratisContainerPlacement, conf, this);
ratisOverReplicationHandler =
new RatisOverReplicationHandler(ratisContainerPlacement, this);
ratisMisReplicationHandler = new RatisMisReplicationHandler(
ratisContainerPlacement, conf, this, rmConf.isPush());
ratisContainerPlacement, conf, this);
underReplicatedProcessor =
new UnderReplicatedProcessor(this,
rmConf.getUnderReplicatedInterval());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
Expand All @@ -38,6 +39,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -207,6 +209,14 @@ protected MisReplicationHandler getMisreplicationHandler(
PlacementPolicy placementPolicy, OzoneConfiguration conf,
ReplicationManager replicationManager) {
return new ECMisReplicationHandler(placementPolicy, conf,
replicationManager, true);
replicationManager);
}

@Override
protected void assertReplicaIndex(
Map<DatanodeDetails, Integer> expectedReplicaIndexes,
DatanodeDetails sourceDatanode, int actualReplicaIndex) {
// EC: the command's replica index must match the index of the replica
// held on the source datanode.
Assertions.assertEquals(
expectedReplicaIndexes.get(sourceDatanode), actualReplicaIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
Expand Down Expand Up @@ -72,6 +73,7 @@ public abstract class TestMisReplicationHandler {
protected void setup(ReplicationConfig repConfig)
throws NodeNotFoundException, CommandTargetOverloadedException,
NotLeaderException {
conf = SCMTestUtils.getConf();

replicationManager = Mockito.mock(ReplicationManager.class);
Mockito.when(replicationManager.getNodeStatus(any(DatanodeDetails.class)))
Expand All @@ -80,14 +82,17 @@ protected void setup(ReplicationConfig repConfig)
return new NodeStatus(dd.getPersistedOpState(),
HddsProtos.NodeState.HEALTHY, 0);
});
ReplicationManagerConfiguration rmConf =
conf.getObject(ReplicationManagerConfiguration.class);
Mockito.when(replicationManager.getConfig())
.thenReturn(rmConf);

commandsSent = new HashSet<>();
ReplicationTestUtil.mockRMSendDatanodeCommand(
replicationManager, commandsSent);
ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
replicationManager, commandsSent);

conf = SCMTestUtils.getConf();
container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
NodeSchema[] schemas =
Expand Down Expand Up @@ -162,6 +167,11 @@ protected void testMisReplication(Set<ContainerReplica> availableReplicas,
return false;
}));

Set<DatanodeDetails> sourceDns = sources.entrySet().stream()
.filter(Map.Entry::getValue)
.map(Map.Entry::getKey)
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toSet());
Set<ContainerReplica> copy = sources.entrySet().stream()
.filter(Map.Entry::getValue).limit(misreplicationCount)
.map(Map.Entry::getKey).collect(Collectors.toSet());
Expand All @@ -187,7 +197,7 @@ protected void testMisReplication(Set<ContainerReplica> availableReplicas,
return targetNodes;
});
}
Map<DatanodeDetails, Integer> copyReplicaIdxMap = copy.stream()
Map<DatanodeDetails, Integer> replicaIndexMap = copy.stream()
.collect(Collectors.toMap(ContainerReplica::getDatanodeDetails,
ContainerReplica::getReplicaIndex));
try {
Expand All @@ -204,11 +214,15 @@ protected void testMisReplication(Set<ContainerReplica> availableReplicas,
container.getContainerID());
DatanodeDetails replicateSrcDn = pair.getKey();
DatanodeDetails target = replicateContainerCommand.getTargetDatanode();
Assertions.assertTrue(copyReplicaIdxMap.containsKey(replicateSrcDn));
Assertions.assertTrue(sourceDns.contains(replicateSrcDn));
Assertions.assertTrue(targetNodes.contains(target));
Assertions.assertEquals(copyReplicaIdxMap.get(replicateSrcDn),
replicateContainerCommand.getReplicaIndex());
int replicaIndex = replicateContainerCommand.getReplicaIndex();
assertReplicaIndex(replicaIndexMap, replicateSrcDn, replicaIndex);
}
}
}

/**
 * Verifies the replica index carried by a sent replication command.
 * EC subclasses compare against the source replica's index; Ratis
 * subclasses expect 0.
 *
 * @param expectedReplicaIndexes replica index per source datanode
 * @param sourceDatanode the datanode the command replicates from
 * @param actualReplicaIndex the index found on the command
 */
protected abstract void assertReplicaIndex(
Map<DatanodeDetails, Integer> expectedReplicaIndexes,
DatanodeDetails sourceDatanode, int actualReplicaIndex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
Expand All @@ -39,6 +40,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
Expand Down Expand Up @@ -194,6 +196,13 @@ protected MisReplicationHandler getMisreplicationHandler(
PlacementPolicy placementPolicy, OzoneConfiguration conf,
ReplicationManager replicationManager) {
return new RatisMisReplicationHandler(placementPolicy, conf,
replicationManager, true);
replicationManager);
}

@Override
protected void assertReplicaIndex(
Map<DatanodeDetails, Integer> expectedReplicaIndexes,
DatanodeDetails sourceDatanode, int actualReplicaIndex) {
// Ratis containers are not striped, so the replica index is always 0
// regardless of the source datanode.
Assertions.assertEquals(0, actualReplicaIndex);
}
}