@@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -62,12 +63,6 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
private final long currentContainerSize;
private final ReplicationManager replicationManager;

private static class CannotFindTargetsException extends IOException {
CannotFindTargetsException(Throwable cause) {
super(cause);
}
}

public ECUnderReplicationHandler(final PlacementPolicy containerPlacement,
final ConfigurationSource conf, ReplicationManager replicationManager) {
this.containerPlacement = containerPlacement;
@@ -164,49 +159,60 @@ public int processAndSendCommands(
.collect(Collectors.toList());

try {
commandsSent += processMissingIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
commandsSent += processDecommissioningIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
excludedNodes);
// TODO - we should be able to catch SCMException here and check the
// result code but the RackAware topology never sets the code.
} catch (CannotFindTargetsException e) {
// If we get here, we tried to find nodes to fix the under replication
// issues, but were not able to find any at some stage, and the
// placement policy threw an exception.
// At this stage. If the cluster is small and there are some
// over replicated indexes, it could stop us finding a new node as there
// are no more nodes left to try.
// If the container is also over replicated, then hand it off to the
// over-rep handler, and after those over-rep indexes are cleared the
// under replication can be re-tried in the next iteration of RM.
// However, we should only hand off to the over rep handler if there are
// no commands already created. If we have some commands, they may
// attempt to use sources the over-rep handler would remove. So we
// should let the commands we have created be processed, and then the
// container will be re-processed in a further RM pass.
LOG.debug("Unable to located new target nodes for container {}",
container, e);
if (commandsSent > 0) {
LOG.debug("Some commands have already been created, so returning " +
"with them only");
return commandsSent;
InsufficientDatanodesException firstException = null;
try {
commandsSent += processMissingIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
} catch (InsufficientDatanodesException e) {
firstException = e;
}
try {
commandsSent += processDecommissioningIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
} catch (InsufficientDatanodesException e) {
if (firstException == null) {
firstException = e;
}
}
try {
commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
excludedNodes);
} catch (InsufficientDatanodesException e) {
if (firstException == null) {
firstException = e;
}
}
if (firstException != null) {
// We had partial success through some of the steps, so just throw the
// first exception we got. This will cause the container to be
// re-queued and try again later.
throw firstException;
}
} catch (SCMException e) {
SCMException.ResultCodes code = e.getResult();
if (code != SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE) {
throw e;
}
// If we get here, we got an exception indicating the placement policy
// was not able to find ANY nodes to satisfy the replication at one of
// the processing stages (missing index, decom or maint). It is
// possible that some commands were sent to partially fix the
// replication, but a further run will be needed to fix the rest.
// On a small cluster, it is possible that over replication could stop
// nodes getting selected, so to check if that is the case, we run
// the over rep handler, which may free some nodes for the next run.
if (replicaCount.isOverReplicated()) {
LOG.debug("Container {} is both under and over replicated. Cannot " +
"find enough target nodes, so handing off to the " +
"OverReplication handler", container);
return replicationManager.processOverReplicatedContainer(result);
} else {
throw (SCMException)e.getCause();
replicationManager.processOverReplicatedContainer(result);
}
// As we want to re-queue and try again later, we just re-throw
throw e;
}
} catch (IOException | IllegalStateException ex) {
LOG.warn("Exception while processing for creating the EC reconstruction" +
" container commands for container {}.",
id, ex);
LOG.warn("Exception while creating the replication or" +
" reconstruction commands for container {}.", id, ex);
throw ex;
}
if (commandsSent == 0) {
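The hunk above replaces the old all-or-nothing CannotFindTargetsException handling with a pattern that runs the three repair steps independently, remembers only the first InsufficientDatanodesException, and re-throws it after every step has had a chance to send the commands it could, so the container is re-queued and the remainder retried on a later ReplicationManager pass. A minimal, self-contained sketch of that pattern in plain Java (hypothetical names, not Ozone code):

```java
import java.util.List;
import java.util.function.IntSupplier;

/** Generic illustration of the "keep the first failure, finish the other
 *  steps, then re-throw" flow used in processAndSendCommands above. */
public final class FirstFailureDemo {

  static int runAll(List<IntSupplier> steps) {
    RuntimeException firstException = null;
    int commandsSent = 0;
    for (IntSupplier step : steps) {
      try {
        commandsSent += step.getAsInt();
      } catch (RuntimeException e) {
        if (firstException == null) {
          firstException = e;          // remember only the first failure
        }
      }
    }
    if (firstException != null) {
      // Partial progress stands; surfacing the failure lets the caller
      // queue the remaining work for a later attempt.
      throw firstException;
    }
    return commandsSent;
  }

  public static void main(String[] args) {
    List<IntSupplier> steps = List.of(
        () -> 1,                                              // succeeds
        () -> { throw new IllegalStateException("no nodes"); },
        () -> 1);                                             // still runs
    try {
      runAll(steps);
    } catch (IllegalStateException e) {
      System.out.println("retry later: " + e.getMessage());
    }
  }
}
```

In the real handler the three steps are processMissingIndexes, processDecommissioningIndexes and processMaintenanceOnlyIndexes, and the exception carried forward is InsufficientDatanodesException.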
@@ -253,25 +259,6 @@ private Map<Integer, Pair<ContainerReplica, NodeStatus>> filterSources(
}));
}

private List<DatanodeDetails> getTargetDatanodes(
List<DatanodeDetails> excludedNodes, ContainerInfo container,
int requiredNodes) throws IOException {
// We should ensure that the target datanode has enough space
// for a complete container to be created, but since the container
// size may be changed smaller than origin, we should be defensive.
final long dataSizeRequired =
Math.max(container.getUsedBytes(), currentContainerSize);
try {
return containerPlacement
.chooseDatanodes(excludedNodes, null, requiredNodes, 0,
dataSizeRequired);
} catch (SCMException e) {
// SCMException can come from many places in SCM, so catch it here and
// throw a more specific exception instead.
throw new CannotFindTargetsException(e);
}
}

/**
* Processes replicas that are on maintenance nodes and need
* additional copies.
@@ -293,9 +280,19 @@ private int processMissingIndexes(

int commandsSent = 0;
if (sources.size() >= repConfig.getData()) {
final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
excludedNodes, container, missingIndexes.size());
int expectedTargets = missingIndexes.size();
final List<DatanodeDetails> selectedDatanodes =
ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
expectedTargets, null, excludedNodes, currentContainerSize,
container);

// If we got fewer targets than missing indexes, we need to prune the
// missing index list so it only tries to recover the number of indexes
// we have targets for.
if (selectedDatanodes.size() < expectedTargets) {
missingIndexes.subList(selectedDatanodes.size(),
missingIndexes.size()).clear();
}
if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
excludedNodes.addAll(selectedDatanodes);
// TODO - what are we adding all the selected nodes to available
@@ -324,6 +321,14 @@ private int processMissingIndexes(
}
commandsSent++;
}
if (selectedDatanodes.size() != expectedTargets) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
" to fully reconstruct container {}. Requested {} received {}",
container.getContainerID(), expectedTargets,
selectedDatanodes.size());
throw new InsufficientDatanodesException(missingIndexes.size(),
selectedDatanodes.size());
}
} else {
LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
+ " to insufficient source replicas found. Number of source "
@@ -350,7 +355,10 @@ private int processDecommissioningIndexes(
int commandsSent = 0;
if (decomIndexes.size() > 0) {
final List<DatanodeDetails> selectedDatanodes =
getTargetDatanodes(excludedNodes, container, decomIndexes.size());
ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
decomIndexes.size(), null, excludedNodes, currentContainerSize,
container);

if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
excludedNodes.addAll(selectedDatanodes);
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
@@ -377,6 +385,14 @@ private int processDecommissioningIndexes(
commandsSent++;
}
}
if (selectedDatanodes.size() != decomIndexes.size()) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
" to fully replicate the decommission indexes for container {}." +
" Requested {} received {}", container.getContainerID(),
decomIndexes.size(), selectedDatanodes.size());
throw new InsufficientDatanodesException(decomIndexes.size(),
selectedDatanodes.size());
}
}
return commandsSent;
}
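The decommission path pairs each replica that needs a new home with one of the chosen targets by walking an Iterator over selectedDatanodes. The loop bodies are collapsed in this view, but the shape of the idiom, and why a short target list still produces some commands before the size-mismatch check throws, can be sketched generically (hypothetical names, plain Java):

```java
import java.util.Iterator;
import java.util.List;

/** Generic illustration of the pairing loop: walk the chosen targets and the
 *  replicas needing a new home together, stopping as soon as either side runs
 *  out, so a partial target list still results in some commands being sent. */
public final class PairingDemo {
  public static void main(String[] args) {
    List<String> sourcesNeedingCopies = List.of("dn-decom-1", "dn-decom-2", "dn-decom-3");
    List<String> targets = List.of("dn-new-1", "dn-new-2");   // one target short
    Iterator<String> targetIt = targets.iterator();
    int commandsSent = 0;
    for (String source : sourcesNeedingCopies) {
      if (!targetIt.hasNext()) {
        break;                         // out of targets; remainder retried later
      }
      System.out.printf("replicate from %s to %s%n", source, targetIt.next());
      commandsSent++;
    }
    System.out.println("commands sent: " + commandsSent);     // 2 of 3
  }
}
```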
@@ -407,8 +423,9 @@ private int processMaintenanceOnlyIndexes(
if (additionalMaintenanceCopiesNeeded == 0) {
return 0;
}
List<DatanodeDetails> targets = getTargetDatanodes(excludedNodes, container,
additionalMaintenanceCopiesNeeded);
List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(
containerPlacement, maintIndexes.size(), null, excludedNodes,
currentContainerSize, container);
excludedNodes.addAll(targets);

Iterator<DatanodeDetails> iterator = targets.iterator();
@@ -439,6 +456,14 @@ private int processMaintenanceOnlyIndexes(
commandsSent++;
additionalMaintenanceCopiesNeeded -= 1;
}
if (targets.size() != maintIndexes.size()) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
" to fully replicate the maintenance indexes for container {}." +
" Requested {} received {}", container.getContainerID(),
maintIndexes.size(), targets.size());
throw new InsufficientDatanodesException(maintIndexes.size(),
targets.size());
}
return commandsSent;
}
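All three handlers touched by this PR now delegate target selection to ReplicationManagerUtil.getTargetDatanodes, whose implementation is not part of this diff. The following is only a guess at its shape, reconstructed from the per-handler versions removed above: compute the data size from the container, degrade the request when the policy cannot satisfy it so callers can make partial progress, and raise SCMException with FAILED_TO_FIND_SUITABLE_NODE only when no node at all is available. The real utility may differ, and the imports (in particular the PlacementPolicy package) are assumptions.

```java
// Hypothetical sketch only -- not the actual ReplicationManagerUtil source.
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;

public final class ReplicationManagerUtilSketch {

  public static List<DatanodeDetails> getTargetDatanodes(
      PlacementPolicy placementPolicy, int requiredNodes,
      List<DatanodeDetails> usedNodes, List<DatanodeDetails> excludedNodes,
      long defaultContainerSize, ContainerInfo container) throws SCMException {
    // Ensure targets have room for a full container, even if the configured
    // container size has been reduced below what this container already holds.
    final long dataSizeRequired =
        Math.max(container.getUsedBytes(), defaultContainerSize);
    int nodesRequired = requiredNodes;
    while (nodesRequired > 0) {
      try {
        if (usedNodes == null) {
          return placementPolicy.chooseDatanodes(
              excludedNodes, null, nodesRequired, 0, dataSizeRequired);
        }
        return placementPolicy.chooseDatanodes(
            usedNodes, excludedNodes, null, nodesRequired, 0, dataSizeRequired);
      } catch (IOException e) {
        // Could not place this many nodes; ask for fewer so the caller can
        // still make partial progress and retry the remainder later.
        nodesRequired -= 1;
      }
    }
    throw new SCMException(String.format("Placement Policy: %s did not return"
        + " any nodes. Number of required nodes %d, data size required: %d",
        placementPolicy.getClass(), requiredNodes, dataSizeRequired),
        SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
  }

  private ReplicationManagerUtilSketch() {
  }
}
```

The new SCMException catch in processAndSendCommands above checks exactly this result code before handing an over-replicated container off to the over-replication handler.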

@@ -26,7 +26,6 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -76,25 +75,6 @@ protected abstract ContainerReplicaCount getContainerReplicaCount(
List<ContainerReplicaOp> pendingOps, int remainingMaintenanceRedundancy)
throws IOException;

private List<DatanodeDetails> getTargetDatanodes(
List<DatanodeDetails> usedNodes, List<DatanodeDetails> excludedNodes,
ContainerInfo container, int requiredNodes) throws IOException {
final long dataSizeRequired =
Math.max(container.getUsedBytes(), currentContainerSize);
while (requiredNodes > 0) {
try {
return containerPlacement.chooseDatanodes(usedNodes, excludedNodes,
null, requiredNodes, 0, dataSizeRequired);
} catch (IOException e) {
requiredNodes -= 1;
}
}
throw new SCMException(String.format("Placement Policy: %s did not return"
+ " any nodes. Number of required Nodes %d, Datasize Required: %d",
containerPlacement.getClass(), requiredNodes, dataSizeRequired),
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}

private Set<ContainerReplica> filterSources(Set<ContainerReplica> replicas) {
return replicas.stream()
.filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos
@@ -196,8 +176,10 @@ public int processAndSendCommands(
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
int requiredNodes = replicasToBeReplicated.size();
List<DatanodeDetails> targetDatanodes = getTargetDatanodes(usedDns,
excludedDns, container, requiredNodes);

List<DatanodeDetails> targetDatanodes = ReplicationManagerUtil
.getTargetDatanodes(containerPlacement, requiredNodes, usedDns,
excludedDns, currentContainerSize, container);

int count = sendReplicateCommands(container, replicasToBeReplicated,
targetDatanodes);
@@ -26,7 +26,6 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -241,28 +240,9 @@ private List<DatanodeDetails> getTargets(
.collect(Collectors.toList());
excludeList.addAll(pendingReplication);

/*
Ensure that target datanodes have enough space to hold a complete
container.
*/
final long dataSizeRequired =
Math.max(replicaCount.getContainer().getUsedBytes(),
currentContainerSize);
int requiredNodes = replicaCount.additionalReplicaNeeded();
while (requiredNodes > 0) {
try {
return placementPolicy.chooseDatanodes(excludeList, null,
requiredNodes, 0, dataSizeRequired);
} catch (IOException e) {
LOG.debug("Placement policy was not able to return {} nodes. ",
requiredNodes, e);
requiredNodes--;
}
}
throw new SCMException(String.format("Placement Policy: %s did not return"
+ " any nodes. Number of required Nodes %d, Datasize Required: %d",
placementPolicy.getClass(), requiredNodes, dataSizeRequired),
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
return ReplicationManagerUtil.getTargetDatanodes(placementPolicy,
replicaCount.additionalReplicaNeeded(), null, excludeList,
currentContainerSize, replicaCount.getContainer());
}

private int sendReplicationCommands(