Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,15 @@ public int processAndSendCommands(
return 0;
}

// don't place reconstructed replicas on exclude nodes, since they already
// have replicas
List<DatanodeDetails> excludedNodes = replicas.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
// DNs that are already waiting to receive replicas cannot be targets
excludedNodes.addAll(
pendingOps.stream()
.filter(containerReplicaOp -> containerReplicaOp.getOpType() ==
ContainerReplicaOp.PendingOpType.ADD)
.map(ContainerReplicaOp::getTarget)
.collect(Collectors.toList()));
ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
ReplicationManagerUtil.getExcludedAndUsedNodes(
new ArrayList<>(replicas), Collections.emptySet(), pendingOps,
replicationManager);
List<DatanodeDetails> excludedNodes
= excludedAndUsedNodes.getExcludedNodes();
excludedNodes.addAll(replicationManager.getExcludedNodes());
List<DatanodeDetails> usedNodes
= excludedAndUsedNodes.getUsedNodes();

final ContainerID id = container.containerID();
int commandsSent = 0;
Expand All @@ -162,14 +158,14 @@ public int processAndSendCommands(
IOException firstException = null;
try {
commandsSent += processMissingIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
availableSourceNodes, excludedNodes, usedNodes);
} catch (InsufficientDatanodesException
| CommandTargetOverloadedException e) {
firstException = e;
}
try {
commandsSent += processDecommissioningIndexes(replicaCount, sources,
availableSourceNodes, excludedNodes);
availableSourceNodes, excludedNodes, usedNodes);
} catch (InsufficientDatanodesException
| CommandTargetOverloadedException e) {
if (firstException == null) {
Expand All @@ -178,7 +174,7 @@ public int processAndSendCommands(
}
try {
commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
excludedNodes);
excludedNodes, usedNodes);
} catch (InsufficientDatanodesException
| CommandTargetOverloadedException e) {
if (firstException == null) {
Expand Down Expand Up @@ -272,7 +268,8 @@ private int processMissingIndexes(
ECContainerReplicaCount replicaCount, Map<Integer,
Pair<ContainerReplica, NodeStatus>> sources,
List<DatanodeDetails> availableSourceNodes,
List<DatanodeDetails> excludedNodes) throws IOException {
List<DatanodeDetails> excludedNodes,
List<DatanodeDetails> usedNodes) throws IOException {
ContainerInfo container = replicaCount.getContainer();
ECReplicationConfig repConfig =
(ECReplicationConfig)container.getReplicationConfig();
Expand All @@ -286,7 +283,7 @@ private int processMissingIndexes(
int expectedTargets = missingIndexes.size();
final List<DatanodeDetails> selectedDatanodes =
ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
expectedTargets, null, excludedNodes, currentContainerSize,
expectedTargets, usedNodes, excludedNodes, currentContainerSize,
container);

// If we got less targets than missing indexes, we need to prune the
Expand All @@ -297,7 +294,7 @@ private int processMissingIndexes(
missingIndexes.size()).clear();
}
if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
excludedNodes.addAll(selectedDatanodes);
usedNodes.addAll(selectedDatanodes);
// TODO - why are we adding all the selected nodes to available
// sources?
availableSourceNodes.addAll(selectedDatanodes);
Expand Down Expand Up @@ -357,18 +354,19 @@ private int processDecommissioningIndexes(
ECContainerReplicaCount replicaCount,
Map<Integer, Pair<ContainerReplica, NodeStatus>> sources,
List<DatanodeDetails> availableSourceNodes,
List<DatanodeDetails> excludedNodes) throws IOException {
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> usedNodes)
throws IOException {
ContainerInfo container = replicaCount.getContainer();
Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
int commandsSent = 0;
if (decomIndexes.size() > 0) {
final List<DatanodeDetails> selectedDatanodes =
ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
decomIndexes.size(), null, excludedNodes, currentContainerSize,
container);
decomIndexes.size(), usedNodes, excludedNodes,
currentContainerSize, container);

if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
excludedNodes.addAll(selectedDatanodes);
usedNodes.addAll(selectedDatanodes);
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
// In this case we need to do one to one copy.
CommandTargetOverloadedException overloadedException = null;
Expand All @@ -382,11 +380,11 @@ private int processDecommissioningIndexes(
ContainerReplica sourceReplica = source.getLeft();
if (!iterator.hasNext()) {
LOG.warn("Couldn't find enough targets. Available source"
+ " nodes: {}, the target nodes: {}, excluded nodes: {}"
+ " and the decommission indexes: {}",
+ " nodes: {}, the target nodes: {}, excluded nodes: {},"
+ " usedNodes: {}, and the decommission indexes: {}",
sources.values().stream()
.map(Pair::getLeft).collect(Collectors.toSet()),
selectedDatanodes, excludedNodes, decomIndexes);
selectedDatanodes, excludedNodes, usedNodes, decomIndexes);
break;
}
try {
Expand Down Expand Up @@ -430,7 +428,8 @@ private int processDecommissioningIndexes(
private int processMaintenanceOnlyIndexes(
ECContainerReplicaCount replicaCount,
Map<Integer, Pair<ContainerReplica, NodeStatus>> sources,
List<DatanodeDetails> excludedNodes) throws IOException {
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> usedNodes)
throws IOException {
Set<Integer> maintIndexes = replicaCount.maintenanceOnlyIndexes(true);
if (maintIndexes.isEmpty()) {
return 0;
Expand All @@ -444,9 +443,9 @@ private int processMaintenanceOnlyIndexes(
return 0;
}
List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(
containerPlacement, maintIndexes.size(), null, excludedNodes,
containerPlacement, maintIndexes.size(), usedNodes, excludedNodes,
currentContainerSize, container);
excludedNodes.addAll(targets);
usedNodes.addAll(targets);

Iterator<DatanodeDetails> iterator = targets.iterator();
int commandsSent = 0;
Expand All @@ -466,11 +465,12 @@ private int processMaintenanceOnlyIndexes(
ContainerReplica sourceReplica = source.getLeft();
if (!iterator.hasNext()) {
LOG.warn("Couldn't find enough targets. Available source"
+ " nodes: {}, target nodes: {}, excluded nodes: {} and"
+ " nodes: {}, target nodes: {}, excluded nodes: {},"
+ " usedNodes: {} and"
+ " maintenance indexes: {}",
sources.values().stream()
.map(Pair::getLeft).collect(Collectors.toSet()),
targets, excludedNodes, maintIndexes);
targets, excludedNodes, usedNodes, maintIndexes);
break;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -137,22 +138,23 @@ public int processAndSendCommands(
container.getContainerID());
return 0;
}

Set<ContainerReplica> sources = filterSources(replicas);
Set<ContainerReplica> replicasToBeReplicated = containerPlacement
.replicasToCopyToFixMisreplication(replicas.stream()
.collect(Collectors.toMap(Function.identity(), sources::contains)));
usedDns = replicas.stream().filter(r -> !replicasToBeReplicated.contains(r))
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
List<DatanodeDetails> excludedDns = replicasToBeReplicated.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());

ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes
= ReplicationManagerUtil.getExcludedAndUsedNodes(
new ArrayList(replicas), replicasToBeReplicated,
Collections.emptyList(), replicationManager);

int requiredNodes = replicasToBeReplicated.size();

List<DatanodeDetails> targetDatanodes = ReplicationManagerUtil
.getTargetDatanodes(containerPlacement, requiredNodes, usedDns,
excludedDns, currentContainerSize, container);
.getTargetDatanodes(containerPlacement, requiredNodes,
excludedAndUsedNodes.getUsedNodes(),
excludedAndUsedNodes.getExcludedNodes(), currentContainerSize,
container);
List<DatanodeDetails> availableSources = sources.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalLong;
Expand Down Expand Up @@ -264,26 +265,20 @@ private List<DatanodeDetails> getTargets(
LOG.debug("Need {} target datanodes for container {}. Current " +
"replicas: {}.", replicaCount.additionalReplicaNeeded(),
replicaCount.getContainer().containerID(), replicaCount.getReplicas());
// DNs that already have replicas cannot be targets and should be excluded
final List<DatanodeDetails> excludeList =
replicaCount.getReplicas().stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());

// DNs that are already waiting to receive replicas cannot be targets
final List<DatanodeDetails> pendingReplication =
pendingOps.stream()
.filter(containerReplicaOp -> containerReplicaOp.getOpType() ==
ContainerReplicaOp.PendingOpType.ADD)
.map(ContainerReplicaOp::getTarget)
.collect(Collectors.toList());
LOG.debug("Excluding DNs. excludeList: {}, size: {}. pendingReplication: " +
"{}, size: {}.", excludeList, excludeList.size(),
pendingReplication, pendingReplication.size());
excludeList.addAll(pendingReplication);
ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
ReplicationManagerUtil.getExcludedAndUsedNodes(
replicaCount.getReplicas(), Collections.emptySet(), pendingOps,
replicationManager);

List<DatanodeDetails> excluded = excludedAndUsedNodes.getExcludedNodes();
List<DatanodeDetails> used = excludedAndUsedNodes.getUsedNodes();

LOG.debug("UsedList: {}, size {}. ExcludeList: {}, size: {}. ",
used, used.size(), excluded, excluded.size());

return ReplicationManagerUtil.getTargetDatanodes(placementPolicy,
replicaCount.additionalReplicaNeeded(), null, excludeList,
replicaCount.additionalReplicaNeeded(), used, excluded,
currentContainerSize, replicaCount.getContainer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
* Utility class for ReplicationManager.
Expand Down Expand Up @@ -90,4 +96,96 @@ public static List<DatanodeDetails> getTargetDatanodes(PlacementPolicy policy,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}

/**
 * Splits the datanodes associated with a container into two lists: nodes
 * that must not be selected as targets for new replicas (excluded), and
 * nodes that count as currently holding the container (used), which the
 * placement policy can consider for rack placement.
 *
 * <p>A replica's node is excluded when the replica is UNHEALTHY, when the
 * replica is contained in {@code toBeRemoved}, when the node's status
 * reports decommission, or when the node's status cannot be found. The
 * node of every other replica is used. After the replicas, the target of
 * each pending ADD op is appended to the used list and the target of each
 * pending DELETE op is appended to the excluded list.
 *
 * @param replicas List of existing replicas
 * @param toBeRemoved Set of replicas that are planned to be removed
 * @param pendingReplicaOps List of pending replica operations
 * @param replicationManager ReplicationManager instance to get NodeStatus
 * @return ExcludedAndUsedNodes object containing the excluded and used lists
 */
public static ExcludedAndUsedNodes getExcludedAndUsedNodes(
    List<ContainerReplica> replicas,
    Set<ContainerReplica> toBeRemoved,
    List<ContainerReplicaOp> pendingReplicaOps,
    ReplicationManager replicationManager) {
  List<DatanodeDetails> excludedNodes = new ArrayList<>();
  List<DatanodeDetails> usedNodes = new ArrayList<>();

  for (ContainerReplica replica : replicas) {
    DatanodeDetails datanode = replica.getDatanodeDetails();

    // Unhealthy replicas and replicas scheduled for removal still occupy
    // their host, so the host cannot be a target, but it is not "used"
    // because the replica is expected to go away.
    boolean exclude =
        replica.getState() == ContainerReplicaProto.State.UNHEALTHY
            || toBeRemoved.contains(replica);

    if (!exclude) {
      try {
        // A decommissioning node's replica has to be replaced, so the node
        // is excluded. Nodes with any other status remain used.
        exclude = replicationManager.getNodeStatus(datanode).isDecommission();
      } catch (NodeNotFoundException e) {
        LOG.warn("Node {} not found in node manager.", datanode);
        // Not expected to happen; fall back to excluding the node.
        exclude = true;
      }
    }

    if (exclude) {
      excludedNodes.add(datanode);
    } else {
      usedNodes.add(datanode);
    }
  }

  for (ContainerReplicaOp op : pendingReplicaOps) {
    ContainerReplicaOp.PendingOpType opType = op.getOpType();
    if (opType == ContainerReplicaOp.PendingOpType.ADD) {
      // A replica is on its way to this node, so it will become used.
      usedNodes.add(op.getTarget());
    } else if (opType == ContainerReplicaOp.PendingOpType.DELETE) {
      // The replica on this node is being deleted: the node cannot be a
      // target, and it is not counted as used.
      excludedNodes.add(op.getTarget());
    }
  }
  return new ExcludedAndUsedNodes(excludedNodes, usedNodes);
}


/**
 * Simple holder pairing the list of excluded nodes with the list of used
 * nodes. The lists are stored by reference, not copied, so each getter
 * returns the same list instance that was passed to the constructor.
 */
public static class ExcludedAndUsedNodes {
  // Assigned once in the constructor and never rebound.
  private final List<DatanodeDetails> excludedNodes;
  private final List<DatanodeDetails> usedNodes;

  /**
   * @param excludedNodes nodes that must not be chosen as new targets
   * @param usedNodes nodes considered to be in use by the container
   */
  public ExcludedAndUsedNodes(List<DatanodeDetails> excludedNodes,
      List<DatanodeDetails> usedNodes) {
    this.excludedNodes = excludedNodes;
    this.usedNodes = usedNodes;
  }

  /**
   * @return the excluded nodes list supplied at construction (not a copy)
   */
  public List<DatanodeDetails> getExcludedNodes() {
    return excludedNodes;
  }

  /**
   * @return the used nodes list supplied at construction (not a copy)
   */
  public List<DatanodeDetails> getUsedNodes() {
    return usedNodes;
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ protected List<DatanodeDetails> chooseDatanodesInternal(
if (nodesRequiredToChoose > 1) {
throw new IllegalArgumentException("Only one node is allowed");
}
if (excludedNodes.contains(nodeToReturn)) {
if (excludedNodes.contains(nodeToReturn)
|| usedNodes.contains(nodeToReturn)) {
throw new SCMException("Insufficient Nodes available to choose",
SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
}
Expand Down
Loading