Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,39 @@ protected int sendReplicateCommands(
ReplicationManager replicationManager = getReplicationManager();
int commandsSent = 0;
int datanodeIdx = 0;
CommandTargetOverloadedException overloadedException = null;
for (ContainerReplica replica : replicasToBeReplicated) {
if (datanodeIdx == targetDns.size()) {
break;
}
long containerID = containerInfo.getContainerID();
DatanodeDetails source = replica.getDatanodeDetails();
DatanodeDetails target = targetDns.get(datanodeIdx);
if (replicationManager.getConfig().isPush()) {
replicationManager.sendThrottledReplicationCommand(containerInfo,
Collections.singletonList(source), target,
replica.getReplicaIndex());
} else {
ReplicateContainerCommand cmd = ReplicateContainerCommand
.fromSources(containerID, Collections.singletonList(source));
// For EC containers, we need to track the replica index which is
// to be replicated, so add it to the command.
cmd.setReplicaIndex(replica.getReplicaIndex());
replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
try {
if (replicationManager.getConfig().isPush()) {
replicationManager.sendThrottledReplicationCommand(containerInfo,
Collections.singletonList(source), target,
replica.getReplicaIndex());
} else {
ReplicateContainerCommand cmd = ReplicateContainerCommand
.fromSources(containerID, Collections.singletonList(source));
// For EC containers, we need to track the replica index which is
// to be replicated, so add it to the command.
cmd.setReplicaIndex(replica.getReplicaIndex());
replicationManager.sendDatanodeCommand(cmd, containerInfo, target);
}
commandsSent++;
} catch (CommandTargetOverloadedException e) {
LOG.debug("Unable to replicate container {} and index {} from {} to {}"
+ " because the source is overloaded",
containerID, replica.getReplicaIndex(), source, target);
overloadedException = e;
}
commandsSent++;
datanodeIdx += 1;
}
if (overloadedException != null) {
throw overloadedException;
}
return commandsSent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,25 +159,28 @@ public int processAndSendCommands(
.collect(Collectors.toList());

try {
InsufficientDatanodesException firstException = null;
IOException firstException = null;
try {
commandsSent += processMissingIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
} catch (InsufficientDatanodesException e) {
} catch (InsufficientDatanodesException
| CommandTargetOverloadedException e) {
firstException = e;
}
try {
commandsSent += processDecommissioningIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
} catch (InsufficientDatanodesException e) {
} catch (InsufficientDatanodesException
| CommandTargetOverloadedException e) {
if (firstException == null) {
firstException = e;
}
}
try {
commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
excludedNodes);
} catch (InsufficientDatanodesException e) {
} catch (InsufficientDatanodesException
| CommandTargetOverloadedException e) {
if (firstException == null) {
firstException = e;
}
Expand Down Expand Up @@ -313,6 +316,11 @@ private int processMissingIndexes(
sourceDatanodesWithIndex, selectedDatanodes,
int2byte(missingIndexes),
repConfig);
// This can throw a CommandTargetOverloadedException, but there is no
// point in retrying here. The sources we picked already have the
// overloaded nodes excluded, so we should not get an overloaded
// exception, but it could happen due to other threads adding work to
// the DNs. If it happens here, we just let the exception bubble up.
replicationManager.sendThrottledReconstructionCommand(
container, reconstructionCommand);
for (int i = 0; i < missingIndexes.size(); i++) {
Expand Down Expand Up @@ -363,6 +371,7 @@ private int processDecommissioningIndexes(
excludedNodes.addAll(selectedDatanodes);
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
// In this case we need to do one to one copy.
CommandTargetOverloadedException overloadedException = null;
for (Integer decomIndex : decomIndexes) {
Pair<ContainerReplica, NodeStatus> source = sources.get(decomIndex);
if (source == null) {
Expand All @@ -380,9 +389,20 @@ private int processDecommissioningIndexes(
selectedDatanodes, excludedNodes, decomIndexes);
break;
}
createReplicateCommand(
container, iterator, sourceReplica, replicaCount);
commandsSent++;
try {
createReplicateCommand(
container, iterator, sourceReplica, replicaCount);
commandsSent++;
} catch (CommandTargetOverloadedException e) {
LOG.debug("Unable to send Replicate command for container {}" +
" index {} because the source node {} is overloaded.",
container.getContainerID(), sourceReplica.getReplicaIndex(),
sourceReplica.getDatanodeDetails());
overloadedException = e;
}
}
if (overloadedException != null) {
throw overloadedException;
}
}
if (selectedDatanodes.size() != decomIndexes.size()) {
Expand Down Expand Up @@ -432,6 +452,7 @@ private int processMaintenanceOnlyIndexes(
int commandsSent = 0;
// copy replica from source maintenance DN to a target DN

CommandTargetOverloadedException overloadedException = null;
for (Integer maintIndex : maintIndexes) {
if (additionalMaintenanceCopiesNeeded <= 0) {
break;
Expand All @@ -452,9 +473,21 @@ private int processMaintenanceOnlyIndexes(
targets, excludedNodes, maintIndexes);
break;
}
createReplicateCommand(container, iterator, sourceReplica, replicaCount);
commandsSent++;
additionalMaintenanceCopiesNeeded -= 1;
try {
createReplicateCommand(
container, iterator, sourceReplica, replicaCount);
commandsSent++;
additionalMaintenanceCopiesNeeded -= 1;
} catch (CommandTargetOverloadedException e) {
LOG.debug("Unable to send Replicate command for container {}" +
" index {} because the source node {} is overloaded.",
container.getContainerID(), sourceReplica.getReplicaIndex(),
sourceReplica.getDatanodeDetails());
overloadedException = e;
}
}
if (overloadedException != null) {
throw overloadedException;
}
if (targets.size() != maintIndexes.size()) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
Expand Down Expand Up @@ -355,13 +356,21 @@ public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
* to the commandsSent set.
* @param mock Mock of ReplicationManager
* @param commandsSent Set to add the command to rather than sending it.
* @param throwOverloaded If the atomic boolean is true, throw a
* CommandTargetOverloadedException and set the boolean
* to false, instead of creating the replicate command.
* @throws NotLeaderException
* @throws CommandTargetOverloadedException
*/
public static void mockRMSendThrottleReplicateCommand(ReplicationManager mock,
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent,
AtomicBoolean throwOverloaded)
throws NotLeaderException, CommandTargetOverloadedException {
doAnswer((Answer<Void>) invocationOnMock -> {
if (throwOverloaded.get()) {
throwOverloaded.set(false);
throw new CommandTargetOverloadedException("Overloaded");
}
List<DatanodeDetails> sources = invocationOnMock.getArgument(1);
ContainerInfo containerInfo = invocationOnMock.getArgument(0);
ReplicateContainerCommand command = ReplicateContainerCommand
Expand All @@ -381,14 +390,22 @@ public static void mockRMSendThrottleReplicateCommand(ReplicationManager mock,
* created to the commandsSent set.
* @param mock Mock of ReplicationManager
* @param commandsSent Set to add the command to rather than sending it.
* @param throwOverloaded If the atomic boolean is true, throw a
* CommandTargetOverloadedException and set the boolean
* to false, instead of creating the replicate command.
* @throws NotLeaderException
* @throws CommandTargetOverloadedException
*/
public static void mockSendThrottledReconstructionCommand(
ReplicationManager mock,
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent)
Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent,
AtomicBoolean throwOverloaded)
throws NotLeaderException, CommandTargetOverloadedException {
doAnswer((Answer<Void>) invocationOnMock -> {
if (throwOverloaded.get()) {
throwOverloaded.set(false);
throw new CommandTargetOverloadedException("Overloaded");
}
ReconstructECContainersCommand cmd = invocationOnMock.getArgument(1);
commandsSent.add(Pair.of(cmd.getTargetDatanodes().get(0), cmd));
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,18 @@ public void testAllSourcesOverloaded() throws IOException {
Collections.emptyList(), 0, 1, 1, 0));
}

@Test
public void testFirstSourcesOverloaded() {
setThrowThrottledException(true);
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
Pair.of(IN_SERVICE, 5));
assertThrows(CommandTargetOverloadedException.class,
() -> testMisReplication(availableReplicas, mockPlacementPolicy(),
Collections.emptyList(), 0, 2, 2, 1));
}

@Test
public void commandsForFewerThanRequiredNodes() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.singleton;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
Expand Down Expand Up @@ -94,6 +95,10 @@ public class TestECUnderReplicationHandler {
private PlacementPolicy ecPlacementPolicy;
private int remainingMaintenanceRedundancy = 1;
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
private AtomicBoolean throwOverloadedExceptionOnReplication
= new AtomicBoolean(false);
private AtomicBoolean throwOverloadedExceptionOnReconstruction
= new AtomicBoolean(false);

@BeforeEach
public void setup() throws NodeNotFoundException,
Expand Down Expand Up @@ -122,9 +127,11 @@ public NodeStatus getNodeStatus(DatanodeDetails dd) {
ReplicationTestUtil.mockRMSendDatanodeCommand(
replicationManager, commandsSent);
ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
replicationManager, commandsSent);
replicationManager, commandsSent,
throwOverloadedExceptionOnReplication);
ReplicationTestUtil.mockSendThrottledReconstructionCommand(
replicationManager, commandsSent);
replicationManager, commandsSent,
throwOverloadedExceptionOnReconstruction);

conf = SCMTestUtils.getConf();
repConfig = new ECReplicationConfig(DATA, PARITY);
Expand Down Expand Up @@ -494,6 +501,30 @@ public void testPartialReconstructionIfNotEnoughNodes() {
Assertions.assertEquals(1, cmd.getTargetDatanodes().size());
}

@Test
public void testOverloadedReconstructionContinuesNextStages() {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(DECOMMISSIONING, 3));
ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
policy, conf, replicationManager);

ContainerHealthResult.UnderReplicatedHealthResult underRep =
new ContainerHealthResult.UnderReplicatedHealthResult(container,
0, false, false, false);

// Setup so reconstruction fails, but we should still get a replicate
// command for the decommissioning node and an exception thrown.
throwOverloadedExceptionOnReconstruction.set(true);
assertThrows(CommandTargetOverloadedException.class, () ->
ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
underRep, 1));
Assertions.assertEquals(1, commandsSent.size());
SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
Assertions.assertEquals(
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}

@Test
public void testPartialDecommissionIfNotEnoughNodes() {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
Expand All @@ -518,6 +549,29 @@ public void testPartialDecommissionIfNotEnoughNodes() {
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}

@Test
public void testPartialDecommissionOverloadedNodes() {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5));
ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
policy, conf, replicationManager);

ContainerHealthResult.UnderReplicatedHealthResult underRep =
new ContainerHealthResult.UnderReplicatedHealthResult(container,
0, true, false, false);

throwOverloadedExceptionOnReplication.set(true);
assertThrows(CommandTargetOverloadedException.class, () ->
ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
underRep, 1));
Assertions.assertEquals(1, commandsSent.size());
SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
Assertions.assertEquals(
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}

@Test
public void testPartialMaintenanceIfNotEnoughNodes() {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
Expand All @@ -543,6 +597,30 @@ public void testPartialMaintenanceIfNotEnoughNodes() {
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}

@Test
public void testPartialMaintenanceOverloadedNodes() {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
Pair.of(ENTERING_MAINTENANCE, 4),
Pair.of(ENTERING_MAINTENANCE, 5));
ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
policy, conf, replicationManager);

ContainerHealthResult.UnderReplicatedHealthResult underRep =
new ContainerHealthResult.UnderReplicatedHealthResult(container,
0, false, false, false);

throwOverloadedExceptionOnReplication.set(true);
assertThrows(CommandTargetOverloadedException.class, () ->
ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
underRep, 2));
Assertions.assertEquals(1, commandsSent.size());
SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
Assertions.assertEquals(
SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
}

@Test
public void testUnderRepWithDecommissionAndNotEnoughNodes()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -69,6 +70,7 @@ public abstract class TestMisReplicationHandler {
private OzoneConfiguration conf;
private ReplicationManager replicationManager;
private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
private AtomicBoolean throwThrottledException = new AtomicBoolean(false);

protected void setup(ReplicationConfig repConfig)
throws NodeNotFoundException, CommandTargetOverloadedException,
Expand All @@ -91,7 +93,7 @@ protected void setup(ReplicationConfig repConfig)
ReplicationTestUtil.mockRMSendDatanodeCommand(
replicationManager, commandsSent);
ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
replicationManager, commandsSent);
replicationManager, commandsSent, throwThrottledException);

container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
Expand All @@ -104,6 +106,10 @@ protected ReplicationManager getReplicationManager() {
return replicationManager;
}

protected void setThrowThrottledException(boolean showThrow) {
throwThrottledException.set(showThrow);
}

static PlacementPolicy<?> mockPlacementPolicy() {
PlacementPolicy<?> placementPolicy = Mockito.mock(PlacementPolicy.class);
ContainerPlacementStatus mockedContainerPlacementStatus =
Expand Down
Loading