@@ -34,6 +34,7 @@
import java.util.LinkedList;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.stream.Collectors;

/**
* This class implements the logic to track decommissioning and entering
@@ -149,7 +150,7 @@ protected void processConf() {
*/
@Override
public void stopTrackingNode(DatanodeDescriptor dn) {
pendingNodes.remove(dn);
getPendingNodes().remove(dn);
cancelledNodes.add(dn);
}

@@ -189,6 +190,29 @@ public void run() {
* node will be removed from tracking by the pending cancel.
*/
processCancelledNodes();

// Having more nodes decommissioning than can be tracked will impact decommissioning
// performance due to queueing delay
int numTrackedNodes = outOfServiceNodeBlocks.size();
int numQueuedNodes = getPendingNodes().size();
int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
LOG.warn(
"{} nodes are decommissioning but only {} nodes will be tracked at a time. "
+ "{} nodes are currently queued waiting to be decommissioned.",
numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);

// Re-queue unhealthy nodes to make space for decommissioning healthy nodes
final List<DatanodeDescriptor> unhealthyDns = outOfServiceNodeBlocks.keySet().stream()
.filter(dn -> !blockManager.isNodeHealthyForDecommissionOrMaintenance(dn))
.collect(Collectors.toList());
getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
getPendingNodes().add(dn);
outOfServiceNodeBlocks.remove(dn);
pendingRep.remove(dn);
});
}

processPendingNodes();
} finally {
namesystem.writeUnlock();
@@ -207,7 +231,7 @@ public void run() {
LOG.info("Checked {} blocks this tick. {} nodes are now " +
"in maintenance or transitioning state. {} nodes pending. {} " +
"nodes waiting to be cancelled.",
numBlocksChecked, outOfServiceNodeBlocks.size(), pendingNodes.size(),
numBlocksChecked, outOfServiceNodeBlocks.size(), getPendingNodes().size(),
cancelledNodes.size());
}
}
@@ -220,10 +244,10 @@ public void run() {
* the pendingNodes list from being modified externally.
*/
private void processPendingNodes() {
while (!pendingNodes.isEmpty() &&
while (!getPendingNodes().isEmpty() &&
(maxConcurrentTrackedNodes == 0 ||
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
outOfServiceNodeBlocks.put(getPendingNodes().poll(), null);
}
}
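
To make the new run()-tick logic above easier to follow, here is a small standalone sketch of the same decision, assuming simplified stand-ins (a plain Node class, a plain ArrayDeque, a fixed MAX_TRACKED limit) rather than the real DatanodeDescriptor, outOfServiceNodeBlocks, PriorityQueue or BlockManager health check: when tracked plus queued nodes exceed the tracking limit, unhealthy tracked nodes are pushed back onto the pending queue so healthy queued nodes can take their tracking slots.

import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;

// Minimal sketch (not the real monitor) of the re-queue decision added to run():
// when tracked + queued nodes exceed the tracking limit, unhealthy tracked nodes
// go back onto the pending queue so healthy nodes are not starved of slots.
public class RequeueSketch {
  static class Node {
    final String name;
    final boolean healthy;
    Node(String name, boolean healthy) { this.name = name; this.healthy = healthy; }
    @Override public String toString() { return name; }
  }

  static final int MAX_TRACKED = 2; // stands in for maxConcurrentTrackedNodes

  public static void main(String[] args) {
    Map<Node, Object> tracked = new LinkedHashMap<>(); // stands in for outOfServiceNodeBlocks
    Queue<Node> pending = new ArrayDeque<>();          // stands in for pendingNodes

    tracked.put(new Node("dn1", true), null);
    tracked.put(new Node("dn2", false), null);         // dead while decommission in progress
    pending.add(new Node("dn3", true));

    int numDecommissioning = tracked.size() + pending.size();
    if (numDecommissioning > MAX_TRACKED) {
      List<Node> unhealthy = tracked.keySet().stream()
          .filter(n -> !n.healthy)
          .collect(Collectors.toList());
      int toRequeue = Math.min(numDecommissioning - MAX_TRACKED, unhealthy.size());
      unhealthy.stream().limit(toRequeue).forEach(n -> {
        pending.add(n);      // back onto the queue; re-tracked once it is healthy again
        tracked.remove(n);   // frees a tracking slot for a healthy queued node
      });
    }
    System.out.println("tracked=" + tracked.keySet() + " pending=" + pending);
    // Expected: tracked=[dn1] pending=[dn3, dn2] -> dn3 can be tracked on the next tick
  }
}

A re-queued dead node is not dropped; it waits in the pending queue and is tracked again once capacity allows, and the queue ordering introduced further down keeps it behind healthier nodes.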

@@ -123,7 +123,7 @@ private boolean exceededNumBlocksPerCheck() {

@Override
public void stopTrackingNode(DatanodeDescriptor dn) {
pendingNodes.remove(dn);
getPendingNodes().remove(dn);
outOfServiceNodeBlocks.remove(dn);
}

@@ -164,19 +164,19 @@ public void run() {
LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " +
"in maintenance or transitioning state. {} nodes pending.",
numBlocksChecked, numNodesChecked, outOfServiceNodeBlocks.size(),
pendingNodes.size());
getPendingNodes().size());
}
}

/**
* Pop datanodes off the pending list and into decomNodeBlocks,
* Pop datanodes off the pending priority queue and into decomNodeBlocks,
* subject to the maxConcurrentTrackedNodes limit.
*/
private void processPendingNodes() {
while (!pendingNodes.isEmpty() &&
while (!getPendingNodes().isEmpty() &&
(maxConcurrentTrackedNodes == 0 ||
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
outOfServiceNodeBlocks.put(getPendingNodes().poll(), null);
}
}

@@ -185,6 +185,7 @@ private void check() {
it = new CyclicIteration<>(outOfServiceNodeBlocks,
iterkey).iterator();
final List<DatanodeDescriptor> toRemove = new ArrayList<>();
final List<DatanodeDescriptor> unhealthyDns = new ArrayList<>();

while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
.isRunning()) {
@@ -221,6 +222,10 @@ private void check() {
LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
pruneReliableBlocks(dn, blocks);
}
final boolean isHealthy = blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (!isHealthy) {
unhealthyDns.add(dn);
}
if (blocks.size() == 0) {
if (!fullScan) {
// If we didn't just do a full scan, need to re-check with the
@@ -236,8 +241,6 @@
}
// If the full scan is clean AND the node liveness is okay,
// we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
final boolean isHealthy =
blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (blocks.size() == 0 && isHealthy) {
if (dn.isDecommissionInProgress()) {
dnAdmin.setDecommissioned(dn);
@@ -270,12 +273,31 @@ private void check() {
// an invalid state.
LOG.warn("DatanodeAdminMonitor caught exception when processing node "
+ "{}.", dn, e);
pendingNodes.add(dn);
getPendingNodes().add(dn);
toRemove.add(dn);
} finally {
iterkey = dn;
}
}

// Having more nodes decommissioning than can be tracked will impact decommissioning
// performance due to queueing delay
int numTrackedNodes = outOfServiceNodeBlocks.size() - toRemove.size();
int numQueuedNodes = getPendingNodes().size();
int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
LOG.warn(
"{} nodes are decommissioning but only {} nodes will be tracked at a time. "
+ "{} nodes are currently queued waiting to be decommissioned.",
numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);

// Re-queue unhealthy nodes to make space for decommissioning healthy nodes
getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
getPendingNodes().add(dn);
outOfServiceNodeBlocks.remove(dn);
});
}

// Remove the datanodes that are DECOMMISSIONED or in service after
// maintenance expiration.
for (DatanodeDescriptor dn : toRemove) {
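
The default monitor variant above reaches the same outcome slightly differently: it records unhealthy nodes into unhealthyDns while it is already walking outOfServiceNodeBlocks inside check(), and only shrinks the map after the loop (note also that the tracked count subtracts toRemove.size(), since nodes finishing this tick free their slots anyway). A minimal, hypothetical sketch of that collect-then-mutate pattern, using a plain map of node name to health flag instead of the real structures:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Collect-then-mutate sketch: note unhealthy entries while iterating,
// remove them only after the iteration has finished.
public class CollectThenMutate {
  public static void main(String[] args) {
    Map<String, Boolean> trackedHealthy = new LinkedHashMap<>();
    trackedHealthy.put("dn1", true);
    trackedHealthy.put("dn2", false); // pretend this node is dead mid-decommission
    trackedHealthy.put("dn3", true);

    List<String> unhealthy = new ArrayList<>();
    for (Map.Entry<String, Boolean> e : trackedHealthy.entrySet()) {
      if (!e.getValue()) {
        // Removing here would throw ConcurrentModificationException,
        // so only remember the node for now.
        unhealthy.add(e.getKey());
      }
    }

    // The iteration is over, so it is now safe to shrink the map.
    unhealthy.forEach(trackedHealthy::remove);
    System.out.println(trackedHealthy.keySet()); // prints [dn1, dn3]
  }
}

Deferring the removal keeps the health bookkeeping inside the existing per-node pass, so no second scan of the tracked map is needed when the re-queue decision is made after the loop.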
@@ -24,8 +24,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.stream.Stream;

/**
* This abstract class provides some base methods which are inherited by
@@ -35,12 +38,20 @@
public abstract class DatanodeAdminMonitorBase
implements DatanodeAdminMonitorInterface, Configurable {

/**
* Sort by lastUpdate time descending order, such that unhealthy
* nodes are de-prioritized given they cannot be decommissioned.
*/
static final Comparator<DatanodeDescriptor> PENDING_NODES_QUEUE_COMPARATOR =
(dn1, dn2) -> Long.compare(dn2.getLastUpdate(), dn1.getLastUpdate());

protected BlockManager blockManager;
protected Namesystem namesystem;
protected DatanodeAdminManager dnAdmin;
protected Configuration conf;

protected final Queue<DatanodeDescriptor> pendingNodes = new ArrayDeque<>();
private final PriorityQueue<DatanodeDescriptor> pendingNodes = new PriorityQueue<>(
PENDING_NODES_QUEUE_COMPARATOR);

/**
* The maximum number of nodes to track in outOfServiceNodeBlocks.
@@ -151,4 +162,34 @@ public int getPendingNodeCount() {
public Queue<DatanodeDescriptor> getPendingNodes() {
return pendingNodes;
}

/**
* If node "is dead while in Decommission In Progress", it cannot be decommissioned
* until it becomes healthy again. If there are more pendingNodes than can be tracked
* & some unhealthy tracked nodes, then re-queue the unhealthy tracked nodes
* to avoid blocking decommissioning of healthy nodes.
*
* @param unhealthyDns The unhealthy datanodes which may be re-queued
* @param numDecommissioningNodes The total number of nodes being decommissioned
* @return Stream of unhealthy nodes to be re-queued
*/
Stream<DatanodeDescriptor> getUnhealthyNodesToRequeue(
final List<DatanodeDescriptor> unhealthyDns, int numDecommissioningNodes) {
if (!unhealthyDns.isEmpty()) {
// Compute the number of unhealthy nodes to re-queue
final int numUnhealthyNodesToRequeue =
Math.min(numDecommissioningNodes - maxConcurrentTrackedNodes, unhealthyDns.size());

LOG.warn("{} limit has been reached, re-queueing {} "
+ "nodes which are dead while in Decommission In Progress.",
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
numUnhealthyNodesToRequeue);

// Order unhealthy nodes by lastUpdate descending such that nodes
// which have been unhealthy the longest are preferred to be re-queued
return unhealthyDns.stream().sorted(PENDING_NODES_QUEUE_COMPARATOR.reversed())
.limit(numUnhealthyNodesToRequeue);
}
return Stream.empty();
}
}
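
To see what the new PriorityQueue ordering buys, here is a self-contained demo, assuming made-up lastUpdate timestamps and a local Node class in place of DatanodeDescriptor: the pending queue polls the most recently heartbeating (healthiest-looking) node first, while the reversed comparator used by getUnhealthyNodesToRequeue prefers the nodes that have been dead the longest when choosing which tracked nodes to send back to the queue.

import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Ordering demo for the pending-nodes comparator (descending lastUpdate).
public class ComparatorDemo {
  static class Node {
    final String name;
    final long lastUpdate; // stand-in for DatanodeDescriptor#getLastUpdate()
    Node(String name, long lastUpdate) { this.name = name; this.lastUpdate = lastUpdate; }
    @Override public String toString() { return name; }
  }

  // Same shape as PENDING_NODES_QUEUE_COMPARATOR: most recent heartbeat first.
  static final Comparator<Node> PENDING =
      (a, b) -> Long.compare(b.lastUpdate, a.lastUpdate);

  public static void main(String[] args) {
    Node recent = new Node("recent", 3_000L);
    Node older = new Node("older", 2_000L);
    Node stale = new Node("stale", 1_000L);

    PriorityQueue<Node> pending = new PriorityQueue<>(PENDING);
    pending.add(stale);
    pending.add(recent);
    pending.add(older);
    System.out.println(pending.poll()); // "recent" -- healthiest node is tracked first

    // Reversed order is used when picking which unhealthy nodes to re-queue:
    // the ones dead the longest are sent back to the queue first.
    List<Node> requeueOrder = Stream.of(recent, stale, older)
        .sorted(PENDING.reversed())
        .collect(Collectors.toList());
    System.out.println(requeueOrder); // [stale, older, recent]
  }
}

The practical effect is that a node which recovers (its lastUpdate advances) naturally migrates toward the front of the pending queue and is picked up for tracking again ahead of nodes that are still dead.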
@@ -449,7 +449,7 @@ protected void refreshNodes(final int nnIndex) throws IOException {
refreshNodes(conf);
}

static private DatanodeDescriptor getDatanodeDesriptor(
static DatanodeDescriptor getDatanodeDesriptor(
final FSNamesystem ns, final String datanodeUuid) {
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
}