Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
/**
* Exception class used to indicate that the datanode a command would be sent
* to is overloaded with pending commands.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: stale javadoc comment

*/
public class AllSourcesOverloadedException extends IOException {
public class CommandTargetOverloadedException extends IOException {

public AllSourcesOverloadedException(String message) {
public CommandTargetOverloadedException(String message) {
super(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ECOverReplicationHandler(PlacementPolicy placementPolicy,
public int processAndSendCommands(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int remainingMaintenanceRedundancy)
throws NotLeaderException {
throws NotLeaderException, CommandTargetOverloadedException {
ContainerInfo container = result.getContainerInfo();

// We are going to check for over replication, so we should filter out any
Expand Down Expand Up @@ -154,6 +154,7 @@ public int processAndSendCommands(
replicaIndexCounts.put(r.getReplicaIndex(),
replicaIndexCounts.getOrDefault(r.getReplicaIndex(), 0) + 1);
}
CommandTargetOverloadedException firstException = null;
for (ContainerReplica r : replicasToRemove) {
int currentCount = replicaIndexCounts.getOrDefault(
r.getReplicaIndex(), 0);
Expand All @@ -162,16 +163,32 @@ public int processAndSendCommands(
"for that index to zero. Candidate Replicas: {}", r, candidates);
continue;
}
replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
replicationManager.sendDeleteCommand(container, r.getReplicaIndex(),
r.getDatanodeDetails(), true);
commandsSent++;
try {
replicationManager.sendThrottledDeleteCommand(container,
r.getReplicaIndex(), r.getDatanodeDetails(), true);
replicaIndexCounts.put(r.getReplicaIndex(), currentCount - 1);
commandsSent++;
} catch (CommandTargetOverloadedException e) {
LOG.debug("Unable to send delete command for container {} replica " +
"index {} to {}",
container.getContainerID(), r.getReplicaIndex(),
r.getDatanodeDetails());
if (firstException == null) {
firstException = e;
}
}
}

if (commandsSent == 0) {
LOG.warn("With the current state of available replicas {}, no" +
" commands were created to remove excess replicas.", replicas);
}
// If any of the "to remove" replicas were not able to be removed due to
// load on the datanodes, then throw the first exception we encountered.
// This will allow the container to be re-queued and tried again later.
if (firstException != null) {
throw firstException;
}
return commandsSent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private int processMaintenanceOnlyIndexes(
private void createReplicateCommand(
ContainerInfo container, Iterator<DatanodeDetails> iterator,
ContainerReplica replica, ECContainerReplicaCount replicaCount)
throws AllSourcesOverloadedException, NotLeaderException {
throws CommandTargetOverloadedException, NotLeaderException {
final boolean push = replicationManager.getConfig().isPush();
DatanodeDetails source = replica.getDatanodeDetails();
DatanodeDetails target = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private int sendReplicateCommands(
ContainerInfo containerInfo,
Set<ContainerReplica> replicasToBeReplicated,
List<DatanodeDetails> targetDns)
throws AllSourcesOverloadedException, NotLeaderException {
throws CommandTargetOverloadedException, NotLeaderException {
int commandsSent = 0;
int datanodeIdx = 0;
for (ContainerReplica replica : replicasToBeReplicated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,24 +248,39 @@ private List<ContainerReplica> sortReplicas(

private int createCommands(
ContainerInfo containerInfo, List<ContainerReplica> replicas,
int excess) throws NotLeaderException {
int excess) throws NotLeaderException, CommandTargetOverloadedException {

/*
Being in the over replication queue means we have enough replicas that
match the container's state, so unhealthy or mismatched replicas can be
deleted. This might make the container violate placement policy.
*/
int commandsSent = 0;
int initialExcess = excess;
CommandTargetOverloadedException firstOverloadedException = null;
List<ContainerReplica> replicasRemoved = new ArrayList<>();
for (ContainerReplica replica : replicas) {
if (excess == 0) {
return commandsSent;
break;
}
if (!ReplicationManager.compareState(
containerInfo.getState(), replica.getState())) {
replicationManager.sendDeleteCommand(containerInfo,
replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
commandsSent++;
// Delete commands are throttled, so they may fail to send. However, the
// replicas here are not in the same state as the container, so they
// must be deleted in preference to "healthy" replicas later. Therefore,
// if they fail to delete, we continue to mark them as deleted by
// reducing the excess so healthy replicas are not removed later in
// this method.
try {
replicationManager.sendThrottledDeleteCommand(containerInfo,
replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
commandsSent++;
} catch (CommandTargetOverloadedException e) {
LOG.debug("Unable to send delete command for a mis-matched state " +
"container {} to {} as it has too many pending delete commands",
containerInfo.containerID(), replica.getDatanodeDetails());
firstOverloadedException = e;
}
replicasRemoved.add(replica);
excess--;
}
Expand All @@ -281,17 +296,34 @@ private int createCommands(
// iterate through replicas in deterministic order
for (ContainerReplica replica : replicas) {
if (excess == 0) {
return commandsSent;
break;
}

if (super.isPlacementStatusActuallyEqualAfterRemove(replicaSet, replica,
containerInfo.getReplicationFactor().getNumber())) {
replicationManager.sendDeleteCommand(containerInfo,
replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
commandsSent++;
excess--;
try {
replicationManager.sendThrottledDeleteCommand(containerInfo,
replica.getReplicaIndex(), replica.getDatanodeDetails(), true);
commandsSent++;
excess--;
} catch (CommandTargetOverloadedException e) {
LOG.debug("Unable to send delete command for container {} to {} as " +
"it has too many pending delete commands",
containerInfo.containerID(), replica.getDatanodeDetails());
if (firstOverloadedException == null) {
firstOverloadedException = e;
}
}
}
}
// If we encountered an overloaded exception, and then did not send as many
// delete commands as the original excess number, then it means there must
// be some replicas we did not delete when we should have. In this case,
// throw the exception so that container is requeued and processed again
// later.
if (firstOverloadedException != null && commandsSent != initialExcess) {
throw firstOverloadedException;
}
return commandsSent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private List<DatanodeDetails> getTargets(

private int sendReplicationCommands(
ContainerInfo containerInfo, List<DatanodeDetails> sources,
List<DatanodeDetails> targets) throws AllSourcesOverloadedException,
List<DatanodeDetails> targets) throws CommandTargetOverloadedException,
NotLeaderException {
final boolean push = replicationManager.getConfig().isPush();
int commandsSent = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.conf.PostConstruct;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
Expand Down Expand Up @@ -74,6 +75,7 @@
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -177,6 +179,7 @@ public class ReplicationManager implements SCMService {
private final OverReplicatedProcessor overReplicatedProcessor;
private final HealthCheck containerCheckChain;
private final int datanodeReplicationLimit;
private final int datanodeDeleteLimit;

/**
* Constructs ReplicationManager instance with the given configuration.
Expand Down Expand Up @@ -228,6 +231,7 @@ public ReplicationManager(final ConfigurationSource conf,
this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
this.ratisMaintenanceMinReplicas = rmConf.getMaintenanceReplicaMinimum();
this.datanodeReplicationLimit = rmConf.getDatanodeReplicationLimit();
this.datanodeDeleteLimit = rmConf.getDatanodeDeleteLimit();

ecUnderReplicationHandler = new ECUnderReplicationHandler(
ecContainerPlacement, conf, this);
Expand Down Expand Up @@ -468,42 +472,58 @@ public void sendDeleteCommand(final ContainerInfo container,
scmDeadlineEpochMs, datanodeDeadlineEpochMs);
}

/**
* Sends delete container command for the given container to the given
* datanode, provided that the datanode is not overloaded with delete
* container commands. If the datanode is overloaded, an exception will be
* thrown.
* @param container Container to be deleted
* @param replicaIndex Index of the container replica to be deleted
* @param datanode The datanode on which the replica should be deleted
* @param force true to force delete a container that is open or not empty
* @throws NotLeaderException when this SCM is not the leader
* @throws CommandTargetOverloadedException If the target datanode has too
* many pending commands.
*/
public void sendThrottledDeleteCommand(final ContainerInfo container,
    int replicaIndex, final DatanodeDetails datanode, boolean force)
    throws NotLeaderException, CommandTargetOverloadedException {
  // Only one candidate is checked, so the result is either empty (the
  // datanode was filtered out) or contains exactly that datanode.
  final List<Pair<Integer, DatanodeDetails>> candidates =
      getAvailableDatanodes(Collections.singletonList(datanode),
          Type.deleteContainerCommand, datanodeDeleteLimit);
  if (!candidates.isEmpty()) {
    final DatanodeDetails deleteTarget = candidates.get(0).getRight();
    sendDeleteCommand(container, replicaIndex, deleteTarget, force);
    return;
  }
  // Nothing passed the capacity check: signal the caller so it can retry
  // the container later.
  throw new CommandTargetOverloadedException("Cannot schedule a delete " +
      "container command for container " + container.containerID() +
      " on datanode " + datanode + " as it has too many pending delete " +
      "commands");
}

/**
* Create a ReplicateContainerCommand for the given container and to push the
* container to the target datanode. The list of sources are checked to ensure
* the datanode has sufficient capacity to accept the container command, and
* then the command is sent to the datanode with the fewest pending commands.
* If all sources are overloaded, an AllSourcesOverloadedException is thrown.
* @param containerID The containerID to be replicated
* If all sources are overloaded, a CommandTargetOverloadedException is
* thrown.
* @param containerInfo The container to be replicated
* @param sources The list of datanodes that can be used as sources
* @param target The target datanode where the container should be replicated
* @param replicaIndex The index of the container replica to be replicated
* @return A pair containing the datanode that the command was sent to, and
* the command created.
* @throws AllSourcesOverloadedException
* @throws CommandTargetOverloadedException
*/
public Pair<DatanodeDetails, SCMCommand<?>>
createThrottledReplicationCommand(long containerID,
public void sendThrottledReplicationCommand(ContainerInfo containerInfo,
List<DatanodeDetails> sources, DatanodeDetails target, int replicaIndex)
throws AllSourcesOverloadedException {
List<Pair<Integer, DatanodeDetails>> sourceWithCmds = new ArrayList<>();
for (DatanodeDetails source : sources) {
try {
int commandCount = nodeManager.getTotalDatanodeCommandCount(source,
Type.replicateContainerCommand);
if (commandCount >= datanodeReplicationLimit) {
LOG.debug("Source {} has reached the maximum number of queued " +
"replication commands ({})", source, datanodeReplicationLimit);
continue;
}
sourceWithCmds.add(Pair.of(commandCount, source));
} catch (NodeNotFoundException e) {
LOG.error("Node {} not found in NodeManager. Should not happen",
source, e);
}
}
throws CommandTargetOverloadedException, NotLeaderException {
long containerID = containerInfo.getContainerID();
List<Pair<Integer, DatanodeDetails>> sourceWithCmds = getAvailableDatanodes(
sources, Type.replicateContainerCommand, datanodeReplicationLimit);
if (sourceWithCmds.isEmpty()) {
throw new AllSourcesOverloadedException("No sources with capacity " +
throw new CommandTargetOverloadedException("No sources with capacity " +
"available for replication of container " + containerID + " to " +
target);
}
Expand All @@ -513,16 +533,42 @@ public void sendDeleteCommand(final ContainerInfo container,
ReplicateContainerCommand cmd =
ReplicateContainerCommand.toTarget(containerID, target);
cmd.setReplicaIndex(replicaIndex);
return Pair.of(sourceWithCmds.get(0).getRight(), cmd);
sendDatanodeCommand(cmd, containerInfo, sourceWithCmds.get(0).getRight());
}

public void sendThrottledReplicationCommand(ContainerInfo containerInfo,
List<DatanodeDetails> sources, DatanodeDetails target, int replicaIndex)
throws AllSourcesOverloadedException, NotLeaderException {
Pair<DatanodeDetails, SCMCommand<?>> cmdPair =
createThrottledReplicationCommand(containerInfo.getContainerID(),
sources, target, replicaIndex);
sendDatanodeCommand(cmdPair.getRight(), containerInfo, cmdPair.getLeft());
/**
* For the given datanodes and command type, lookup the current queue command
* count and return a list of datanodes with the current command count. If
* any datanode is at or beyond the limit, then it will not be included in the
* returned list.
* @param datanodes List of datanodes to check for available capacity
* @param commandType The Type of datanode command to check the capacity for.
* @param limit The limit of commands of that type.
* @return List of datanodes with the current command count that are not over
* the limit.
*/
private List<Pair<Integer, DatanodeDetails>> getAvailableDatanodes(
    List<DatanodeDetails> datanodes,
    StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type commandType,
    int limit) {
  final List<Pair<Integer, DatanodeDetails>> underLimit = new ArrayList<>();
  for (DatanodeDetails candidate : datanodes) {
    try {
      final int queued =
          nodeManager.getTotalDatanodeCommandCount(candidate, commandType);
      if (queued < limit) {
        // Still below the limit: keep it, paired with its current count.
        underLimit.add(Pair.of(queued, candidate));
      } else {
        LOG.debug("Datanode {} has reached the maximum number of queued " +
            "{} commands ({})", candidate, commandType, limit);
      }
    } catch (NodeNotFoundException e) {
      // A datanode unknown to the NodeManager is logged and left out of
      // the result rather than failing the whole lookup.
      LOG.error("Node {} not found in NodeManager. Should not happen",
          candidate, e);
    }
  }
  return underLimit;
}

/**
Expand Down Expand Up @@ -1147,6 +1193,21 @@ public int getDatanodeReplicationLimit() {
return datanodeReplicationLimit;
}

/**
 * Limit on the total number of delete container commands queued on a
 * datanode. Defaults to 40 and is exposed through
 * {@link #getDatanodeDeleteLimit()}.
 */
@Config(key = "datanode.delete.container.limit",
type = ConfigType.INT,
defaultValue = "40",
tags = { SCM, DATANODE },
description = "A limit to restrict the total number of delete " +
"container commands queued on a datanode. Note this is intended " +
"to be a temporary config until we have a more dynamic way of " +
"limiting load"
)
private int datanodeDeleteLimit = 40;

/**
 * @return the configured limit on delete container commands queued on a
 *     datanode.
 */
public int getDatanodeDeleteLimit() {
  return this.datanodeDeleteLimit;
}

/**
 * Overrides the configured datanode replication limit.
 * @param limit the new limit value to store
 */
public void setDatanodeReplicationLimit(int limit) {
  datanodeReplicationLimit = limit;
}
Expand Down
Loading