Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor {
private EventPublisher eventQueue;
private NodeManager nodeManager;
private ReplicationManager replicationManager;
private Queue<DatanodeAdminNodeDetails> pendingNodes = new ArrayDeque();
private Queue<DatanodeAdminNodeDetails> cancelledNodes = new ArrayDeque();
private Set<DatanodeAdminNodeDetails> trackedNodes = new HashSet<>();
private Queue<DatanodeDetails> pendingNodes = new ArrayDeque();
private Queue<DatanodeDetails> cancelledNodes = new ArrayDeque();
private Set<DatanodeDetails> trackedNodes = new HashSet<>();

private static final Logger LOG =
LoggerFactory.getLogger(DatanodeAdminMonitorImpl.class);
Expand All @@ -93,10 +93,8 @@ public DatanodeAdminMonitorImpl(
*/
@Override
public synchronized void startMonitoring(DatanodeDetails dn, int endInHours) {
DatanodeAdminNodeDetails nodeDetails =
new DatanodeAdminNodeDetails(dn, endInHours);
cancelledNodes.remove(nodeDetails);
pendingNodes.add(nodeDetails);
cancelledNodes.remove(dn);
pendingNodes.add(dn);
}

/**
Expand All @@ -108,9 +106,8 @@ public synchronized void startMonitoring(DatanodeDetails dn, int endInHours) {
*/
@Override
public synchronized void stopMonitoring(DatanodeDetails dn) {
DatanodeAdminNodeDetails nodeDetails = new DatanodeAdminNodeDetails(dn, 0);
pendingNodes.remove(nodeDetails);
cancelledNodes.add(nodeDetails);
pendingNodes.remove(dn);
cancelledNodes.add(dn);
}

/**
Expand Down Expand Up @@ -155,20 +152,19 @@ public int getTrackedNodeCount() {
}

@VisibleForTesting
public Set<DatanodeAdminNodeDetails> getTrackedNodes() {
public Set<DatanodeDetails> getTrackedNodes() {
return trackedNodes;
}

private void processCancelledNodes() {
while (!cancelledNodes.isEmpty()) {
DatanodeAdminNodeDetails dn = cancelledNodes.poll();
DatanodeDetails dn = cancelledNodes.poll();
try {
stopTrackingNode(dn);
putNodeBackInService(dn);
LOG.info("Recommissioned node {}", dn.getDatanodeDetails());
LOG.info("Recommissioned node {}", dn);
} catch (NodeNotFoundException e) {
LOG.warn("Failed processing the cancel admin request for {}",
dn.getDatanodeDetails(), e);
LOG.warn("Failed processing the cancel admin request for {}", dn, e);
}
}
}
Expand All @@ -180,11 +176,11 @@ private void processPendingNodes() {
}

private void processTransitioningNodes() {
Iterator<DatanodeAdminNodeDetails> iterator = trackedNodes.iterator();
Iterator<DatanodeDetails> iterator = trackedNodes.iterator();
while (iterator.hasNext()) {
DatanodeAdminNodeDetails dn = iterator.next();
DatanodeDetails dn = iterator.next();
try {
NodeStatus status = getNodeStatus(dn.getDatanodeDetails());
NodeStatus status = getNodeStatus(dn);

if (!shouldContinueWorkflow(dn, status)) {
abortWorkflow(dn);
Expand All @@ -193,7 +189,7 @@ private void processTransitioningNodes() {
}

if (status.isMaintenance()) {
if (dn.shouldMaintenanceEnd()) {
if (status.operationalStateExpired()) {
completeMaintenance(dn);
iterator.remove();
continue;
Expand All @@ -205,12 +201,12 @@ private void processTransitioningNodes() {
// Ensure the DN has received and persisted the current maint
// state.
&& status.getOperationalState()
== dn.getDatanodeDetails().getPersistedOpState()
== dn.getPersistedOpState()
&& checkContainersReplicatedOnNode(dn)) {
// CheckContainersReplicatedOnNode may take a short time to run
// so after it completes, re-get the nodestatus to check the health
// and ensure the state is still good to continue
status = getNodeStatus(dn.getDatanodeDetails());
status = getNodeStatus(dn);
if (status.isDead()) {
LOG.warn("Datanode {} is dead and the admin workflow cannot " +
"continue. The node will be put back to IN_SERVICE and " +
Expand All @@ -228,7 +224,7 @@ && checkContainersReplicatedOnNode(dn)) {

} catch (NodeNotFoundException e) {
LOG.error("An unexpected error occurred processing datanode {}. " +
"Aborting the admin workflow", dn.getDatanodeDetails(), e);
"Aborting the admin workflow", dn, e);
abortWorkflow(dn);
iterator.remove();
}
Expand All @@ -244,43 +240,43 @@ && checkContainersReplicatedOnNode(dn)) {
* @param nodeStatus The current NodeStatus for the datanode
* @return True if admin can continue, false otherwise
*/
private boolean shouldContinueWorkflow(DatanodeAdminNodeDetails dn,
private boolean shouldContinueWorkflow(DatanodeDetails dn,
NodeStatus nodeStatus) {
if (!nodeStatus.isDecommission() && !nodeStatus.isMaintenance()) {
LOG.warn("Datanode {} has an operational state of {} when it should " +
"be undergoing decommission or maintenance. Aborting admin for " +
"this node.",
dn.getDatanodeDetails(), nodeStatus.getOperationalState());
"this node.", dn, nodeStatus.getOperationalState());
return false;
}
if (nodeStatus.isDead() && !nodeStatus.isInMaintenance()) {
LOG.error("Datanode {} is dead but is not IN_MAINTENANCE. Aborting the " +
"admin workflow for this node", dn.getDatanodeDetails());
"admin workflow for this node", dn);
return false;
}
return true;
}

private boolean checkPipelinesClosedOnNode(DatanodeAdminNodeDetails dn) {
DatanodeDetails dnd = dn.getDatanodeDetails();
Set<PipelineID> pipelines = nodeManager.getPipelines(dnd);
private boolean checkPipelinesClosedOnNode(DatanodeDetails dn)
throws NodeNotFoundException {
Set<PipelineID> pipelines = nodeManager.getPipelines(dn);
NodeStatus status = nodeManager.getNodeStatus(dn);
if (pipelines == null || pipelines.size() == 0
|| dn.shouldMaintenanceEnd()) {
|| status.operationalStateExpired()) {
return true;
} else {
LOG.info("Waiting for pipelines to close for {}. There are {} " +
"pipelines", dnd, pipelines.size());
"pipelines", dn, pipelines.size());
return false;
}
}

private boolean checkContainersReplicatedOnNode(DatanodeAdminNodeDetails dn)
private boolean checkContainersReplicatedOnNode(DatanodeDetails dn)
throws NodeNotFoundException {
int sufficientlyReplicated = 0;
int underReplicated = 0;
int unhealthy = 0;
Set<ContainerID> containers =
nodeManager.getContainers(dn.getDatanodeDetails());
nodeManager.getContainers(dn);
for (ContainerID cid : containers) {
try {
ContainerReplicaCount replicaSet =
Expand All @@ -295,46 +291,40 @@ private boolean checkContainersReplicatedOnNode(DatanodeAdminNodeDetails dn)
}
} catch (ContainerNotFoundException e) {
LOG.warn("ContainerID {} present in node list for {} but not found " +
"in containerManager", cid, dn.getDatanodeDetails());
"in containerManager", cid, dn);
}
}
dn.setSufficientlyReplicatedContainers(sufficientlyReplicated);
dn.setUnderReplicatedContainers(underReplicated);
dn.setUnHealthyContainers(unhealthy);

return underReplicated == 0 && unhealthy == 0;
}

private void completeDecommission(DatanodeAdminNodeDetails dn)
private void completeDecommission(DatanodeDetails dn)
throws NodeNotFoundException {
setNodeOpState(dn, NodeOperationalState.DECOMMISSIONED);
LOG.info("Datanode {} has completed the admin workflow. The operational " +
"state has been set to {}", dn.getDatanodeDetails(),
"state has been set to {}", dn,
NodeOperationalState.DECOMMISSIONED);
}

private void putIntoMaintenance(DatanodeAdminNodeDetails dn)
private void putIntoMaintenance(DatanodeDetails dn)
throws NodeNotFoundException {
LOG.info("Datanode {} has entered maintenance", dn.getDatanodeDetails());
LOG.info("Datanode {} has entered maintenance", dn);
setNodeOpState(dn, NodeOperationalState.IN_MAINTENANCE);
}

private void completeMaintenance(DatanodeAdminNodeDetails dn)
private void completeMaintenance(DatanodeDetails dn)
throws NodeNotFoundException {
// The end state of Maintenance is to put the node back IN_SERVICE, whether
// it is dead or not.
LOG.info("Datanode {} has ended maintenance automatically",
dn.getDatanodeDetails());
LOG.info("Datanode {} has ended maintenance automatically", dn);
putNodeBackInService(dn);
}

private void startTrackingNode(DatanodeAdminNodeDetails dn) {
eventQueue.fireEvent(SCMEvents.START_ADMIN_ON_NODE,
dn.getDatanodeDetails());
private void startTrackingNode(DatanodeDetails dn) {
eventQueue.fireEvent(SCMEvents.START_ADMIN_ON_NODE, dn);
trackedNodes.add(dn);
}

private void stopTrackingNode(DatanodeAdminNodeDetails dn) {
private void stopTrackingNode(DatanodeDetails dn) {
trackedNodes.remove(dn);
}

Expand All @@ -345,24 +335,29 @@ private void stopTrackingNode(DatanodeAdminNodeDetails dn) {
*
* @param dn The datanode for which to abort tracking
*/
private void abortWorkflow(DatanodeAdminNodeDetails dn) {
private void abortWorkflow(DatanodeDetails dn) {
try {
putNodeBackInService(dn);
} catch (NodeNotFoundException e) {
LOG.error("Unable to set the node OperationalState for {} while " +
"aborting the datanode admin workflow", dn.getDatanodeDetails());
"aborting the datanode admin workflow", dn);
}
}

private void putNodeBackInService(DatanodeAdminNodeDetails dn)
private void putNodeBackInService(DatanodeDetails dn)
throws NodeNotFoundException {
setNodeOpState(dn, NodeOperationalState.IN_SERVICE);
}

private void setNodeOpState(DatanodeAdminNodeDetails dn,
private void setNodeOpState(DatanodeDetails dn,
HddsProtos.NodeOperationalState state) throws NodeNotFoundException {
nodeManager.setNodeOperationalState(dn.getDatanodeDetails(), state,
dn.getMaintenanceEnd() / 1000);
long expiry = 0;
if ((state == NodeOperationalState.IN_MAINTENANCE)
|| (state == NodeOperationalState.ENTERING_MAINTENANCE)) {
NodeStatus status = nodeManager.getNodeStatus(dn);
expiry = status.getOpStateExpiryEpochSeconds();
}
nodeManager.setNodeOperationalState(dn, state, expiry);
}

private NodeStatus getNodeStatus(DatanodeDetails dnd)
Expand Down

This file was deleted.

Loading