Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4527,33 +4527,10 @@ void processExtraRedundancyBlocksOnInService(
* based on its liveness. Dead nodes cannot always be safely decommissioned
* or in maintenance.
*/
boolean isNodeHealthyForDecommissionOrMaintenance(DatanodeDescriptor node) {
if (!node.checkBlockReportReceived()) {
LOG.info("Node {} hasn't sent its first block report.", node);
return false;
}

if (node.isAlive()) {
return true;
}

boolean anyLowRedundancyOrPendingReplicationBlocks() {
updateState();
if (pendingReconstructionBlocksCount == 0 &&
lowRedundancyBlocksCount == 0) {
LOG.info("Node {} is dead and there are no low redundancy" +
" blocks or blocks pending reconstruction. Safe to decommission or" +
" put in maintenance.", node);
return true;
}

LOG.warn("Node {} is dead " +
"while in {}. Cannot be safely " +
"decommissioned or be in maintenance since there is risk of reduced " +
"data durability or data loss. Either restart the failed node or " +
"force decommissioning or maintenance by removing, calling " +
"refreshNodes, then re-adding to the excludes or host config files.",
node, node.getAdminState());
return false;
return pendingReconstructionBlocksCount != 0 ||
lowRedundancyBlocksCount != 0;
}

public int getActiveBlockCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ private void processCancelledNodes() {
DatanodeDescriptor dn = cancelledNodes.poll();
outOfServiceNodeBlocks.remove(dn);
pendingRep.remove(dn);
dn.setTrackedForDecommissionOrMaintenance(false);
}
}

Expand Down Expand Up @@ -301,6 +302,42 @@ private void check() {

// Finally move the nodes to their final state if they are ready.
processCompletedNodes(toRemove);

for (final DatanodeDescriptor dn : outOfServiceNodeBlocks.keySet()) {
if (!dn.isAlive()) {
if (blockManager.anyLowRedundancyOrPendingReplicationBlocks()) {
// Dead datanode should remain in decommissioning because there is risk of data loss
LOG.warn("Node {} is dead while in {}. Cannot be safely "
+ "decommissioned or be in maintenance since there is risk of reduced "
+ "data durability or data loss. Either restart the failed node or "
+ "force decommissioning or maintenance by removing, calling "
+ "refreshNodes, then re-adding to the excludes or host config files.", dn,
dn.getAdminState());
// No need to track unhealthy datanodes, when the datanode comes alive again
// and re-registers, it will be re-added to the DatanodeAdminManager
cancelledNodes.add(dn);
} else {
// Dead node can potentially be decommissioned because there is no risk of data loss
LOG.info("Node {} is dead and there are no low redundancy "
+ "blocks or blocks pending reconstruction. Safe to decommission or "
+ "put in maintenance.", dn);
}
} else if (!dn.checkBlockReportReceived()) {
// Cannot untrack a live node with no first block report because it
// may not be re-added to the DatanodeAdminManager, requeue the node if necessary
LOG.info("Healthy Node {} hasn't sent its first block report.", dn);
}
}

// Having more nodes decommissioning than can be tracked will impact decommissioning
// performance due to queueing delay
int numDecommissioningNodes =
outOfServiceNodeBlocks.size() + pendingNodes.size() - cancelledNodes.size();
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
LOG.warn("There are {} nodes decommissioning but only {} nodes will be tracked at a time. "
+ "{} nodes are currently queued waiting to be decommissioned.",
numDecommissioningNodes, maxConcurrentTrackedNodes, getPendingNodes().size());
}
}

/**
Expand Down Expand Up @@ -346,9 +383,8 @@ private void processCompletedNodes(List<DatanodeDescriptor> toRemove) {
namesystem.writeLock();
try {
for (DatanodeDescriptor dn : toRemove) {
final boolean isHealthy =
blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (isHealthy) {
if (dn.checkBlockReportReceived() &&
(dn.isAlive() || !blockManager.anyLowRedundancyOrPendingReplicationBlocks())) {
if (dn.isDecommissionInProgress()) {
dnAdmin.setDecommissioned(dn);
outOfServiceNodeBlocks.remove(dn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ private boolean exceededNumBlocksPerCheck() {
/**
 * Stops tracking the given datanode in this monitor.
 *
 * The node is removed from both the pending queue and the map of
 * out-of-service nodes, and its tracked flag is cleared.
 *
 * @param dn the datanode to stop tracking
 */
public void stopTrackingNode(DatanodeDescriptor dn) {
pendingNodes.remove(dn);
outOfServiceNodeBlocks.remove(dn);
// Clear the flag last so the descriptor reports itself as untracked once
// it is no longer held by either collection.
dn.setTrackedForDecommissionOrMaintenance(false);
}

@Override
Expand Down Expand Up @@ -185,6 +186,7 @@ private void check() {
it = new CyclicIteration<>(outOfServiceNodeBlocks,
iterkey).iterator();
final List<DatanodeDescriptor> toRemove = new ArrayList<>();
final List<DatanodeDescriptor> deadToRemove = new ArrayList<>();

while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
.isRunning()) {
Expand Down Expand Up @@ -221,7 +223,32 @@ private void check() {
LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
pruneReliableBlocks(dn, blocks);
}
if (blocks.size() == 0) {

final boolean anyLowRedundancyBlocks =
blockManager.anyLowRedundancyOrPendingReplicationBlocks();
if (!dn.isAlive() && !anyLowRedundancyBlocks) {
// Dead node can potentially be decommissioned because there is no risk of data loss
LOG.info("Node {} is dead and there are no low redundancy "
+ "blocks or blocks pending reconstruction. Safe to decommission or "
+ "put in maintenance.", dn);
}
if (!dn.isAlive() && anyLowRedundancyBlocks) {
// Dead datanode should remain in decommissioning because there is risk of data loss
LOG.warn("Node {} is dead while in {}. Cannot be safely "
+ "decommissioned or be in maintenance since there is risk of reduced "
+ "data durability or data loss. Either restart the failed node or "
+ "force decommissioning or maintenance by removing, calling "
+ "refreshNodes, then re-adding to the excludes or host config files.", dn,
dn.getAdminState());
// No need to track unhealthy datanodes, when the datanode comes alive again
// and re-registers, it will be re-added to the DatanodeAdminManager
deadToRemove.add(dn);
} else if (!dn.checkBlockReportReceived()) {
// Cannot untrack a live node with no first block report because it
// may not be re-added to the DatanodeAdminManager
LOG.info("Healthy Node {} hasn't sent its first block report, "
+ "cannot be decommissioned yet.", dn);
} else if (blocks.size() == 0) {
if (!fullScan) {
// If we didn't just do a full scan, need to re-check with the
// full block map.
Expand All @@ -236,9 +263,7 @@ private void check() {
}
// If the full scan is clean AND the node liveness is okay,
// we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
final boolean isHealthy =
blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (blocks.size() == 0 && isHealthy) {
if (blocks.size() == 0) {
if (dn.isDecommissionInProgress()) {
dnAdmin.setDecommissioned(dn);
toRemove.add(dn);
Expand All @@ -255,10 +280,10 @@ private void check() {
LOG.debug("Node {} is sufficiently replicated and healthy, "
+ "marked as {}.", dn, dn.getAdminState());
} else {
LOG.info("Node {} {} healthy."
LOG.info("Node {} is healthy."
+ " It needs to replicate {} more blocks."
+ " {} is still in progress.", dn,
isHealthy ? "is": "isn't", blocks.size(), dn.getAdminState());
+ " {} is still in progress.",
dn, blocks.size(), dn.getAdminState());
}
} else {
LOG.info("Node {} still has {} blocks to replicate "
Expand All @@ -276,6 +301,17 @@ private void check() {
iterkey = dn;
}
}

// Having more nodes decommissioning than can be tracked will impact decommissioning
// performance due to queueing delay
int numDecommissioningNodes =
outOfServiceNodeBlocks.size() + getPendingNodes().size() - toRemove.size();
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
LOG.warn("There are {} nodes decommissioning but only {} nodes will be tracked at a time. "
+ "{} nodes are currently queued waiting to be decommissioned.",
numDecommissioningNodes, maxConcurrentTrackedNodes, getPendingNodes().size());
}

// Remove the datanodes that are DECOMMISSIONED or in service after
// maintenance expiration.
for (DatanodeDescriptor dn : toRemove) {
Expand All @@ -284,6 +320,14 @@ private void check() {
dn);
outOfServiceNodeBlocks.remove(dn);
}

// Remove dead datanodes which are DECOMMISSION_INPROGRESS
for (DatanodeDescriptor dn : deadToRemove) {
Preconditions.checkState(dn.isDecommissionInProgress() && !dn.isAlive(),
"Removing dead node %s that is not decommission in progress!", dn);
outOfServiceNodeBlocks.remove(dn);
dn.setTrackedForDecommissionOrMaintenance(false);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ public void startDecommission(DatanodeDescriptor node) {
node.getLeavingServiceStatus().setStartTime(monotonicNow());
monitor.startTrackingNode(node);
}
} else if (!node.isTrackedForDecommissionOrMaintenance() && node.isDecommissionInProgress()
&& node.isAlive()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Resuming decommission of {} {} with {} blocks", node, storage,
storage.numBlocks());
}
monitor.startTrackingNode(node);
} else {
LOG.trace("startDecommission: Node {} in {}, nothing to do.",
node, node.getAdminState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public Configuration getConf() {
/**
 * Starts tracking the given datanode: queues it in the pending nodes
 * collection and marks the descriptor as tracked.
 *
 * @param dn the datanode to start tracking
 */
@Override
public void startTrackingNode(DatanodeDescriptor dn) {
pendingNodes.add(dn);
// Mark the descriptor as tracked so callers that check
// isTrackedForDecommissionOrMaintenance() do not queue it a second time.
dn.setTrackedForDecommissionOrMaintenance(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ public Type getType() {
// HB processing can use it to tell if it is the first HB since DN restarted
private boolean heartbeatedSinceRegistration = false;

// Whether or not the node is tracked within the DatanodeAdminManager for decommissioning
// or maintenance. This is necessary to avoid adding duplicate nodes to the DatanodeAdminManager.
private boolean isTrackedForDecommissionOrMaintenance = false;

/**
* DatanodeDescriptor constructor
* @param nodeID id of the data node
Expand Down Expand Up @@ -1088,4 +1092,12 @@ public boolean hasStorageType(StorageType type) {
}
return false;
}

/**
 * Records whether this node is currently tracked within the
 * DatanodeAdminManager for decommissioning or maintenance.
 *
 * @param isTracked {@code true} if the node is being tracked,
 *                  {@code false} otherwise
 */
public void setTrackedForDecommissionOrMaintenance(final boolean isTracked) {
  this.isTrackedForDecommissionOrMaintenance = isTracked;
}

/**
 * Reports the value of the tracked flag, i.e. whether this node is
 * currently tracked within the DatanodeAdminManager for decommissioning
 * or maintenance.
 *
 * @return the current value of the tracked flag
 */
public boolean isTrackedForDecommissionOrMaintenance() {
  return this.isTrackedForDecommissionOrMaintenance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ protected void refreshNodes(final int nnIndex) throws IOException {
refreshNodes(conf);
}

static private DatanodeDescriptor getDatanodeDesriptor(
static DatanodeDescriptor getDatanodeDesriptor(
final FSNamesystem ns, final String datanodeUuid) {
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
}
Expand Down
Loading