diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 161780668ab0..a27d44e87009 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -366,6 +366,11 @@ public final class ScmConfigKeys {
   public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled";
   public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true;
 
+  public static final String OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL =
+      "ozone.scm.datanode.admin.monitor.interval";
+  public static final String OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT =
+      "30s";
+
   /**
    * Never constructed.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b0a59fa209cc..62878a4a0763 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2501,4 +2501,15 @@
       The number of Recon Tasks that are waiting on updates from OM.
     </description>
   </property>
+  <property>
+    <name>ozone.scm.datanode.admin.monitor.interval</name>
+    <value>30s</value>
+    <tag>SCM</tag>
+    <description>
+      This sets how frequently the datanode admin monitor runs to check for
+      nodes added to the admin workflow or removed from it. The progress
+      of nodes decommissioning or entering maintenance is also checked to
+      see if they have completed.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 43d396e0cb12..97e998c90c46 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -169,6 +169,13 @@ public final class SCMEvents {
   public static final TypedEvent<DatanodeDetails> DEAD_NODE =
       new TypedEvent<>(DatanodeDetails.class, "Dead_Node");
 
+  /**
+   * This event will be triggered whenever a datanode enters the admin
+   * workflow (decommission or maintenance).
+   */
+  public static final TypedEvent<DatanodeDetails> START_ADMIN_ON_NODE =
+      new TypedEvent<>(DatanodeDetails.class, "START_ADMIN_ON_NODE");
+
   /**
    * This event will be triggered whenever a datanode is moved from non-healthy
    * state to healthy state.
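For context on how the new event is consumed: the StorageContainerManager change further down registers a StartDatanodeAdminHandler against START_ADMIN_ON_NODE, and the monitor introduced below fires it. A minimal sketch of that wiring, assuming the nodeManager, pipelineManager and datanodeDetails instances from the SCM context are in scope:

    // Sketch: wiring and firing START_ADMIN_ON_NODE. EventQueue, addHandler
    // and fireEvent are the existing hdds eventing APIs.
    EventQueue eventQueue = new EventQueue();
    eventQueue.addHandler(SCMEvents.START_ADMIN_ON_NODE,
        new StartDatanodeAdminHandler(nodeManager, pipelineManager));

    // Fired by the DatanodeAdminMonitor when it starts tracking a node; the
    // handler reacts by finalizing all pipelines on that datanode.
    eventQueue.fireEvent(SCMEvents.START_ADMIN_ON_NODE, datanodeDetails);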
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java
new file mode 100644
index 000000000000..3f3521766056
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Monitor thread which watches for nodes to be decommissioned, recommissioned
+ * or placed into maintenance. Newly added nodes are queued in pendingNodes
+ * and recommissioned nodes are queued in cancelledNodes. On each monitor
+ * 'tick', the cancelled nodes are processed and removed from the monitor.
+ * Then any pending nodes are added to the trackedNodes set, where they stay
+ * until decommission or maintenance has ended.
+ *
+ * Once a node is placed into trackedNodes, it goes through a workflow where
+ * the following happens:
+ *
+ * 1. First an event is fired to close any pipelines on the node, which will
+ *    also close any containers.
+ * 2. Next the containers on the node are obtained and checked to see if new
+ *    replicas are needed. If so, the new replicas are scheduled.
+ * 3. After scheduling replication, the node remains pending until replication
+ *    has completed.
+ * 4. At this stage the node will complete decommission or enter maintenance.
+ * 5. Maintenance nodes will remain tracked by this monitor until maintenance
+ *    is manually ended, or the maintenance window expires.
+ */
+public class DatanodeAdminMonitor implements DatanodeAdminMonitorInterface {
+
+  private OzoneConfiguration conf;
+  private EventPublisher eventQueue;
+  private NodeManager nodeManager;
+  private PipelineManager pipelineManager;
+  private Queue<DatanodeAdminNodeDetails> pendingNodes = new ArrayDeque<>();
+  private Queue<DatanodeAdminNodeDetails> cancelledNodes = new ArrayDeque<>();
+  private Set<DatanodeAdminNodeDetails> trackedNodes = new HashSet<>();
+  private StateMachine<States, Transitions> workflowSM;
+
+  /**
+   * States that a node must pass through when being decommissioned or placed
+   * into maintenance.
+   */
+  public enum States {
+    CLOSE_PIPELINES(1),
+    GET_CONTAINERS(2),
+    REPLICATE_CONTAINERS(3),
+    AWAIT_MAINTENANCE_END(4),
+    COMPLETE(5);
+
+    private int sequenceNumber;
+
+    States(int sequenceNumber) {
+      this.sequenceNumber = sequenceNumber;
+    }
+
+    public int getSequenceNumber() {
+      return sequenceNumber;
+    }
+  }
+
+  /**
+   * Transition events that occur to move a node from one state to the next.
+   */
+  public enum Transitions {
+    COMPLETE_DECOM_STAGE, COMPLETE_MAINT_STAGE, UNEXPECTED_NODE_STATE
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeAdminMonitor.class);
+
+  public DatanodeAdminMonitor(OzoneConfiguration config) {
+    conf = config;
+    initializeStateMachine();
+  }
+
+  @Override
+  public void setConf(OzoneConfiguration config) {
+    conf = config;
+  }
+
+  @Override
+  public void setEventQueue(EventPublisher eventQueue) {
+    this.eventQueue = eventQueue;
+  }
+
+  @Override
+  public void setNodeManager(NodeManager nm) {
+    nodeManager = nm;
+  }
+
+  @Override
+  public void setPipelineManager(PipelineManager pm) {
+    pipelineManager = pm;
+  }
+
+  /**
+   * Add a node to the decommission or maintenance workflow. The node will be
+   * queued and added to the workflow after a defined interval.
+   *
+   * @param dn The datanode to move into an admin state
+   * @param endInHours For nodes going into maintenance, the number of hours
+   *                   from now for maintenance to automatically end. Ignored
+   *                   for decommissioning nodes.
+   */
+  @Override
+  public synchronized void startMonitoring(DatanodeDetails dn, int endInHours) {
+    DatanodeAdminNodeDetails nodeDetails =
+        new DatanodeAdminNodeDetails(dn, workflowSM.getInitialState(),
+            endInHours);
+    cancelledNodes.remove(nodeDetails);
+    pendingNodes.add(nodeDetails);
+  }
+
+  /**
+   * Remove a node from the decommission or maintenance workflow, and return it
+   * to service. The node will be queued and removed from decommission or
+   * maintenance after a defined interval.
+   *
+   * @param dn The datanode for which to stop decommission or maintenance.
+   */
+  @Override
+  public synchronized void stopMonitoring(DatanodeDetails dn) {
+    DatanodeAdminNodeDetails nodeDetails = new DatanodeAdminNodeDetails(dn,
+        workflowSM.getInitialState(), 0);
+    pendingNodes.remove(nodeDetails);
+    cancelledNodes.add(nodeDetails);
+  }
+
+  /**
+   * Run an iteration of the monitor. This is the main run loop, and performs
+   * the following checks:
+   *
+   * 1. Check for any cancelled nodes and process them
+   * 2. Check for any newly added nodes and add them to the workflow
+   * 3. Check whether any tracked nodes have finished closing their pipelines
+   *    and, if so, move them to the next workflow state
+   */
+  @Override
+  public void run() {
+    try {
+      synchronized (this) {
+        processCancelledNodes();
+        processPendingNodes();
+      }
+      checkPipelinesClosed();
+      if (trackedNodes.size() > 0 || pendingNodes.size() > 0) {
+        LOG.info("There are {} nodes tracked for decommission and "+
+            "maintenance. {} pending nodes.",
+            trackedNodes.size(), pendingNodes.size());
+      }
+    } catch (Exception e) {
+      LOG.error("Caught an error in the DatanodeAdminMonitor", e);
+    }
+  }
+
+  @Override
+  public int getPendingCount() {
+    return pendingNodes.size();
+  }
+
+  @Override
+  public int getCancelledCount() {
+    return cancelledNodes.size();
+  }
+
+  @Override
+  public int getTrackedNodeCount() {
+    return trackedNodes.size();
+  }
+
+  @VisibleForTesting
+  public Set<DatanodeAdminNodeDetails> getTrackedNodes() {
+    return trackedNodes;
+  }
+
+  /**
+   * Return the state machine used to transition a node through the admin
+   * workflow.
+   *
+   * @return The StateMachine used by the admin workflow
+   */
+  @VisibleForTesting
+  public StateMachine<States, Transitions> getWorkflowStateMachine() {
+    return workflowSM;
+  }
+
+  private void processCancelledNodes() {
+    while (!cancelledNodes.isEmpty()) {
+      DatanodeAdminNodeDetails dn = cancelledNodes.poll();
+      trackedNodes.remove(dn);
+      // TODO - fire event to bring node back into service?
+    }
+  }
+
+  private void processPendingNodes() {
+    while (!pendingNodes.isEmpty()) {
+      DatanodeAdminNodeDetails dn = pendingNodes.poll();
+      // Trigger event to async close the node pipelines.
+      eventQueue.fireEvent(SCMEvents.START_ADMIN_ON_NODE,
+          dn.getDatanodeDetails());
+      trackedNodes.add(dn);
+    }
+  }
+
+  private void checkPipelinesClosed() {
+    for (DatanodeAdminNodeDetails dn : trackedNodes) {
+      if (dn.getCurrentState() != States.CLOSE_PIPELINES) {
+        continue;
+      }
+      DatanodeDetails dnd = dn.getDatanodeDetails();
+      Set<PipelineID> pipelines = nodeManager.getPipelines(dnd);
+      if (pipelines == null || pipelines.size() == 0) {
+        NodeStatus nodeStatus = nodeManager.getNodeStatus(dnd);
+        try {
+          dn.transitionState(workflowSM, nodeStatus.getOperationalState());
+        } catch (InvalidStateTransitionException e) {
+          LOG.warn("Unexpected state transition", e);
+          // TODO - how to handle this? This means the node is not in
+          // an expected state, eg it is IN_SERVICE when it should be
+          // decommissioning, so should we abort decom altogether for it?
+          // This could happen if a node is queued for cancel and not yet
+          // processed.
+        }
+      } else {
+        LOG.info("Waiting for pipelines to close for {}. There are {} "+
+            "pipelines", dnd, pipelines.size());
+      }
+    }
+  }
+
+  /**
+   * Setup the state machine with the allowed transitions for a node to move
+   * through the maintenance workflow.
+   */
+  private void initializeStateMachine() {
+    Set<States> finalStates = new HashSet<>();
+    workflowSM = new StateMachine<>(States.CLOSE_PIPELINES, finalStates);
+    workflowSM.addTransition(States.CLOSE_PIPELINES,
+        States.GET_CONTAINERS, Transitions.COMPLETE_DECOM_STAGE);
+    workflowSM.addTransition(States.GET_CONTAINERS, States.REPLICATE_CONTAINERS,
+        Transitions.COMPLETE_DECOM_STAGE);
+    workflowSM.addTransition(States.REPLICATE_CONTAINERS, States.COMPLETE,
+        Transitions.COMPLETE_DECOM_STAGE);
+
+    workflowSM.addTransition(States.CLOSE_PIPELINES,
+        States.GET_CONTAINERS, Transitions.COMPLETE_MAINT_STAGE);
+    workflowSM.addTransition(States.GET_CONTAINERS, States.REPLICATE_CONTAINERS,
+        Transitions.COMPLETE_MAINT_STAGE);
+    workflowSM.addTransition(States.REPLICATE_CONTAINERS,
+        States.AWAIT_MAINTENANCE_END, Transitions.COMPLETE_MAINT_STAGE);
+    workflowSM.addTransition(States.AWAIT_MAINTENANCE_END,
+        States.COMPLETE, Transitions.COMPLETE_MAINT_STAGE);
+  }
+
+}
\ No newline at end of file
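A minimal sketch of the monitor lifecycle as exercised by the tests below, assuming conf, eventQueue, nodeManager, pipelineManager and a DatanodeDetails dn are already available from an SCM context. startMonitoring() and stopMonitoring() only queue work; the queues are drained on the next run():

    DatanodeAdminMonitor monitor = new DatanodeAdminMonitor(conf);
    monitor.setEventQueue(eventQueue);
    monitor.setNodeManager(nodeManager);
    monitor.setPipelineManager(pipelineManager);

    monitor.startMonitoring(dn, 0); // queue for decommission, no auto-end
    monitor.run();                  // pending -> tracked, fires START_ADMIN_ON_NODE
    monitor.stopMonitoring(dn);     // queue for cancellation (recommission)
    monitor.run();                  // cancelled node dropped from tracking

In production the scheduled executor in NodeDecommissionManager invokes run() every monitorInterval seconds; the tests call run() directly to step the workflow deterministically.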
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorInterface.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorInterface.java
new file mode 100644
index 000000000000..d15162b70357
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorInterface.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+/**
+ * Interface implemented by the DatanodeAdminMonitor, which can be used to
+ * decommission or recommission nodes and take them in and out of maintenance.
+ */
+public interface DatanodeAdminMonitorInterface extends Runnable {
+
+  void setConf(OzoneConfiguration conf);
+  void setEventQueue(EventPublisher scm);
+  void setNodeManager(NodeManager nm);
+  void setPipelineManager(PipelineManager pm);
+  void startMonitoring(DatanodeDetails dn, int endInHours);
+  void stopMonitoring(DatanodeDetails dn);
+
+  int getPendingCount();
+  int getCancelledCount();
+  int getTrackedNodeCount();
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
new file mode 100644
index 000000000000..a0607e97679a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used by the DatanodeAdminMonitor to track the state and
+ * details for Datanode decommission and maintenance. It provides a wrapper
+ * around a DatanodeDetails object, adding some additional states and helper
+ * methods related to the admin workflow.
+ */
+public class DatanodeAdminNodeDetails {
+  private DatanodeDetails datanodeDetails;
+  private long maintenanceEndTime;
+  private DatanodeAdminMonitor.States currentState;
+  private long enteredStateAt = 0;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeAdminNodeDetails.class);
+
+  /**
+   * Create a new object given the DatanodeDetails and the maintenance end
+   * time.
+   *
+   * @param dn The datanode going through the admin workflow
+   * @param initialState The initial admin workflow state for the node
+   * @param maintenanceEnd The number of hours from 'now' when maintenance
+   *                       should end automatically. Passing zero indicates
+   *                       maintenance will never end automatically.
+   */
+  DatanodeAdminNodeDetails(DatanodeDetails dn,
+      DatanodeAdminMonitor.States initialState, long maintenanceEnd) {
+    datanodeDetails = dn;
+    setMaintenanceEnd(maintenanceEnd);
+    currentState = initialState;
+    enteredStateAt = System.currentTimeMillis();
+  }
+
+  public boolean shouldMaintenanceEnd() {
+    if (0 == maintenanceEndTime) {
+      return false;
+    }
+    return System.currentTimeMillis() >= maintenanceEndTime;
+  }
+
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+
+  /**
+   * Get the current admin workflow state for this node.
+   *
+   * @return The current admin workflow state for this node
+   */
+  public DatanodeAdminMonitor.States getCurrentState() {
+    return currentState;
+  }
+
+  /**
+   * Set the number of hours after which maintenance should end. Passing zero
+   * indicates maintenance will never end automatically. A negative number of
+   * hours can be passed for testing purposes.
+   *
+   * @param hoursFromNow The number of hours from now when maintenance should
+   *                     end, or zero for it to never end.
+   */
+  @VisibleForTesting
+  public void setMaintenanceEnd(long hoursFromNow) {
+    if (0 == hoursFromNow) {
+      maintenanceEndTime = 0;
+      return;
+    }
+    // Convert hours to ms
+    long msFromNow = hoursFromNow * 60L * 60L * 1000L;
+    maintenanceEndTime = System.currentTimeMillis() + msFromNow;
+  }
+
+  /**
+   * Given the workflow stateMachine and the current node status
+   * (DECOMMISSIONING or ENTERING_MAINTENANCE) move the node to the next
+   * admin workflow state.
+   *
+   * @param sm The stateMachine which controls the state flow
+   * @param nodeOperationalState The current operational state for the node, eg
+   *                             decommissioning or entering_maintenance
+   * @return The new admin workflow state for the node
+   * @throws InvalidStateTransitionException
+   */
+  public DatanodeAdminMonitor.States transitionState(
+      StateMachine<DatanodeAdminMonitor.States,
+          DatanodeAdminMonitor.Transitions> sm,
+      NodeOperationalState nodeOperationalState)
+      throws InvalidStateTransitionException {
+
+    DatanodeAdminMonitor.States newState = sm.getNextState(currentState,
+        getTransition(nodeOperationalState));
+    long currentTime = System.currentTimeMillis();
+    LOG.info("Datanode {} moved from admin workflow state {} to {} after {} "+
+        "seconds", datanodeDetails, currentState, newState,
+        (currentTime - enteredStateAt)/1000L);
+    currentState = newState;
+    enteredStateAt = currentTime;
+    return currentState;
+  }
+
+  private DatanodeAdminMonitor.Transitions getTransition(
+      NodeOperationalState nodeState) {
+    if (nodeState == NodeOperationalState.DECOMMISSIONED ||
+        nodeState == NodeOperationalState.DECOMMISSIONING) {
+      return DatanodeAdminMonitor.Transitions.COMPLETE_DECOM_STAGE;
+    } else if (nodeState == NodeOperationalState.ENTERING_MAINTENANCE ||
+        nodeState == NodeOperationalState.IN_MAINTENANCE) {
+      return DatanodeAdminMonitor.Transitions.COMPLETE_MAINT_STAGE;
+    } else {
+      return DatanodeAdminMonitor.Transitions.UNEXPECTED_NODE_STATE;
+    }
+  }
+
+  /**
+   * Matches only on the DatanodeDetails field, which compares only the UUID
+   * of the node to determine if they are the same object or not.
+   *
+   * @param o The object to compare this with
+   * @return True if the objects match, otherwise false
+   */
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof DatanodeAdminNodeDetails &&
+        datanodeDetails.equals(
+            ((DatanodeAdminNodeDetails) o).getDatanodeDetails());
+  }
+
+  @Override
+  public int hashCode() {
+    return datanodeDetails.hashCode();
+  }
+
+}
\ No newline at end of file
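As an illustration, a decommissioning node walks the state machine like this (a sketch mirroring the tests below; it assumes the monitor above and a DatanodeDetails dn are in scope, and the package-private constructor means it only compiles from the same package):

    StateMachine<DatanodeAdminMonitor.States, DatanodeAdminMonitor.Transitions>
        sm = monitor.getWorkflowStateMachine();
    DatanodeAdminNodeDetails details = new DatanodeAdminNodeDetails(
        dn, DatanodeAdminMonitor.States.CLOSE_PIPELINES, 0);

    // Each call derives COMPLETE_DECOM_STAGE from the DECOMMISSIONING state:
    details.transitionState(sm, NodeOperationalState.DECOMMISSIONING);
    // CLOSE_PIPELINES -> GET_CONTAINERS
    details.transitionState(sm, NodeOperationalState.DECOMMISSIONING);
    // GET_CONTAINERS -> REPLICATE_CONTAINERS
    details.transitionState(sm, NodeOperationalState.DECOMMISSIONING);
    // REPLICATE_CONTAINERS -> COMPLETE

The maintenance path is identical except that COMPLETE_MAINT_STAGE routes through AWAIT_MAINTENANCE_END before COMPLETE; an operational state outside the two workflows yields UNEXPECTED_NODE_STATE and an InvalidStateTransitionException.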
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index 9806fbb430c8..fbc5981eeea7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -16,14 +16,16 @@
  */
 package org.apache.hadoop.hdds.scm.node;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,23 +35,28 @@
 import java.net.UnknownHostException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Class used to manage datanodes scheduled for maintenance or decommission.
  */
 public class NodeDecommissionManager {
 
+  private ScheduledExecutorService executor;
+  private DatanodeAdminMonitorInterface monitor;
+
   private NodeManager nodeManager;
-  // private PipelineManager pipeLineManager;
+  private PipelineManager pipelineManager;
   // private ContainerManager containerManager;
-  // private OzoneConfiguration conf;
+  private EventPublisher eventQueue;
+  private OzoneConfiguration conf;
   private boolean useHostnames;
-
-  private List<DatanodeDetails> pendingNodes = new LinkedList<>();
+  private long monitorInterval;
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(DatanodeAdminManager.class);
-
+      LoggerFactory.getLogger(NodeDecommissionManager.class);
 
   static class HostDefinition {
     private String rawHostname;
@@ -157,17 +164,47 @@ private boolean validateDNPortMatch(int port, DatanodeDetails dn) {
     return false;
   }
 
-  public NodeDecommissionManager(OzoneConfiguration conf,
-      NodeManager nodeManager, PipelineManager pipelineManager,
-      ContainerManager containerManager) {
-    this.nodeManager = nodeManager;
-    //this.conf = conf;
-    //this.pipeLineManager = pipelineManager;
+  public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
+      PipelineManager pm, ContainerManager containerManager,
+      EventPublisher eventQueue) {
+    this.nodeManager = nm;
+    conf = config;
+    this.pipelineManager = pm;
     //this.containerManager = containerManager;
+    this.eventQueue = eventQueue;
+
+    executor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat("DatanodeAdminManager-%d")
+            .setDaemon(true).build());
 
     useHostnames = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+
+    monitorInterval = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT,
+        TimeUnit.SECONDS);
+    if (monitorInterval <= 0) {
+      LOG.warn("{} must be greater than zero, defaulting to {}",
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT);
+      conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT);
+      monitorInterval = conf.getTimeDuration(
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
+          ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT,
+          TimeUnit.SECONDS);
+    }
+
+    monitor = new DatanodeAdminMonitor(conf);
+    monitor.setConf(conf);
+    monitor.setEventQueue(this.eventQueue);
+    monitor.setNodeManager(nodeManager);
+    monitor.setPipelineManager(pipelineManager);
+
+    executor.scheduleAtFixedRate(monitor, monitorInterval, monitorInterval,
+        TimeUnit.SECONDS);
   }
 
   public synchronized void decommissionNodes(List<String> nodes)
@@ -201,7 +238,7 @@ public synchronized void startDecommission(DatanodeDetails dn)
       LOG.info("Starting Decommission for node {}", dn);
       nodeManager.setNodeOperationalState(
           dn, NodeOperationalState.DECOMMISSIONING);
-      pendingNodes.add(dn);
+      monitor.startMonitoring(dn, 0);
     } else if (opState == NodeOperationalState.DECOMMISSIONING ||
         opState == NodeOperationalState.DECOMMISSIONED) {
       LOG.info("Start Decommission called on node {} in state {}. Nothing to "+
@@ -238,7 +275,7 @@ public synchronized void recommission(DatanodeDetails dn)
     if (opState != NodeOperationalState.IN_SERVICE) {
       nodeManager.setNodeOperationalState(
           dn, NodeOperationalState.IN_SERVICE);
-      pendingNodes.remove(dn);
+      monitor.stopMonitoring(dn);
       LOG.info("Recommissioned node {}", dn);
     } else {
       LOG.info("Recommission called on node {} with state {}. "+
@@ -278,7 +315,7 @@ public synchronized void startMaintenance(DatanodeDetails dn, int endInHours)
     if (opState == NodeOperationalState.IN_SERVICE) {
       nodeManager.setNodeOperationalState(
           dn, NodeOperationalState.ENTERING_MAINTENANCE);
-      pendingNodes.add(dn);
+      monitor.startMonitoring(dn, endInHours);
       LOG.info("Starting Maintenance for node {}", dn);
     } else if (opState == NodeOperationalState.ENTERING_MAINTENANCE ||
         opState == NodeOperationalState.IN_MAINTENANCE) {
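The guard above rewrites an invalid interval back to the default before re-reading it, so the monitor is always scheduled with a positive period. A sketch of how an operator-supplied value is parsed (getTimeDuration is the standard Hadoop Configuration helper and accepts the usual duration suffixes):

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL, "1m");
    long intervalSeconds = conf.getTimeDuration(
        ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL,
        ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT,
        TimeUnit.SECONDS); // 60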
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java
new file mode 100644
index 000000000000..9418a7a3e362
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StartDatanodeAdminHandler.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * Handler which is fired when a datanode starts admin (decommission or
+ * maintenance).
+ */
+public class StartDatanodeAdminHandler
+    implements EventHandler<DatanodeDetails> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StartDatanodeAdminHandler.class);
+
+  private final NodeManager nodeManager;
+  private final PipelineManager pipelineManager;
+
+  public StartDatanodeAdminHandler(NodeManager nodeManager,
+      PipelineManager pipelineManager) {
+    this.nodeManager = nodeManager;
+    this.pipelineManager = pipelineManager;
+  }
+
+  @Override
+  public void onMessage(DatanodeDetails datanodeDetails,
+      EventPublisher publisher) {
+    Set<PipelineID> pipelineIds =
+        nodeManager.getPipelines(datanodeDetails);
+    LOG.info("Admin start on datanode {}. Finalizing its pipelines {}",
+        datanodeDetails, pipelineIds);
+    for (PipelineID pipelineID : pipelineIds) {
+      try {
+        Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
+        pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+      } catch (IOException e) {
+        LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
+            datanodeDetails);
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 24572c7ab18c..db0f7cfe5f8c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -69,6 +69,7 @@
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
 import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NewNodeHandler;
+import org.apache.hadoop.hdds.scm.node.StartDatanodeAdminHandler;
 import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
@@ -296,6 +297,8 @@ public StorageContainerManager(OzoneConfiguration conf,
         new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
         pipelineManager, containerManager);
+    StartDatanodeAdminHandler datanodeStartAdminHandler =
+        new StartDatanodeAdminHandler(scmNodeManager, pipelineManager);
     NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
         new NonHealthyToHealthyNodeHandler(pipelineManager, conf);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
@@ -338,7 +341,7 @@ public StorageContainerManager(OzoneConfiguration conf,
         pipelineManager);
 
     scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
-        pipelineManager, containerManager);
+        pipelineManager, containerManager, eventQueue);
 
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
@@ -353,6 +356,8 @@ public StorageContainerManager(OzoneConfiguration conf,
     eventQueue.addHandler(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE,
         nonHealthyToHealthyNodeHandler);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
+    eventQueue.addHandler(SCMEvents.START_ADMIN_ON_NODE,
+        datanodeStartAdminHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
     eventQueue
        .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
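End to end, the flow is: NodeDecommissionManager marks the node DECOMMISSIONING (or ENTERING_MAINTENANCE) and hands it to the monitor; the monitor's next tick fires START_ADMIN_ON_NODE; the handler above finalizes the node's pipelines. A sketch of the public entry points, assuming a hypothetical caller holding the scmDecommissionManager reference (checked exceptions such as unknown-host or node-not-found errors omitted for brevity):

    // Decommission by host name, as parsed by HostDefinition
    // ("host" or "host:port").
    scmDecommissionManager.decommissionNodes(Arrays.asList("dn1.example.com"));

    // Or drive a single resolved node directly:
    scmDecommissionManager.startDecommission(datanodeDetails);
    scmDecommissionManager.startMaintenance(datanodeDetails, 24); // auto-end 24h
    scmDecommissionManager.recommission(datanodeDetails);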
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
new file mode 100644
index 000000000000..0aa02218dfca
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * Tests to ensure the DatanodeAdminMonitor is working correctly.
+ */
+public class TestDatanodeAdminMonitor {
+
+  private StorageContainerManager scm;
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private SCMPipelineManager pipelineManager;
+  private OzoneConfiguration conf;
+  private DatanodeAdminMonitor monitor;
+  private DatanodeDetails datanode1;
+  private DatanodeDetails datanode2;
+  private DatanodeDetails datanode3;
+
+  @Before
+  public void setup() throws IOException, AuthenticationException {
+    // This creates a mocked cluster of 6 nodes, where there are mock pipelines
+    // etc. Borrows heavily from TestDeadNodeHandler.
+    conf = new OzoneConfiguration();
+    String storageDir = GenericTestUtils.getTempPath(
+        TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+
+    scm = HddsTestUtils.getScm(conf);
+    nodeManager = scm.getScmNodeManager();
+    pipelineManager = (SCMPipelineManager)scm.getPipelineManager();
+    containerManager = scm.getContainerManager();
+
+    monitor = new DatanodeAdminMonitor(conf);
+    monitor.setEventQueue(scm.getEventQueue());
+    monitor.setNodeManager(nodeManager);
+    monitor.setPipelineManager(pipelineManager);
+
+    PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+
+    datanode1 = TestUtils.randomDatanodeDetails();
+    datanode2 = TestUtils.randomDatanodeDetails();
+    datanode3 = TestUtils.randomDatanodeDetails();
+
+    String storagePath = GenericTestUtils.getRandomizedTempPath()
+        .concat("/" + datanode1.getUuidString());
+
+    StorageContainerDatanodeProtocolProtos.StorageReportProto
+        storageOne = TestUtils.createStorageReport(
+            datanode1.getUuid(), storagePath, 100, 10, 90, null);
+
+    nodeManager.register(datanode1,
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(datanode2,
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(datanode3,
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(TestUtils.randomDatanodeDetails(),
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(TestUtils.randomDatanodeDetails(),
+        TestUtils.createNodeReport(storageOne), null);
+    nodeManager.register(TestUtils.randomDatanodeDetails(),
+        TestUtils.createNodeReport(storageOne), null);
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void testNodeCanBeQueuedAndCancelled() {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    monitor.startMonitoring(dn, 0);
+    assertEquals(1, monitor.getPendingCount());
+
+    monitor.stopMonitoring(dn);
+    assertEquals(0, monitor.getPendingCount());
+    assertEquals(1, monitor.getCancelledCount());
+
+    monitor.startMonitoring(dn, 0);
+    assertEquals(1, monitor.getPendingCount());
+    assertEquals(0, monitor.getCancelledCount());
+  }
+
+  @Test
+  public void testMonitoredNodeHasPipelinesClosed()
+      throws NodeNotFoundException, TimeoutException, InterruptedException {
+
+    GenericTestUtils.waitFor(() -> nodeManager
+        .getPipelines(datanode1).size() == 2, 100, 20000);
+
+    nodeManager.setNodeOperationalState(datanode1,
+        HddsProtos.NodeOperationalState.DECOMMISSIONING);
+    monitor.startMonitoring(datanode1, 0);
+    monitor.run();
+    // Ensure the node moves from pending to tracked
+    assertEquals(0, monitor.getPendingCount());
+    assertEquals(1, monitor.getTrackedNodeCount());
+
+    // Ensure the pipelines are closed, as this is the first step in the admin
+    // workflow
+    GenericTestUtils.waitFor(() -> nodeManager
+        .getPipelines(datanode1).size() == 0, 100, 20000);
+
+    // Run the run loop again and ensure the tracked node is moved to the next
+    // state
+    monitor.run();
+    for (DatanodeAdminNodeDetails node : monitor.getTrackedNodes()) {
+      assertEquals(
+          DatanodeAdminMonitor.States.GET_CONTAINERS, node.getCurrentState());
+    }
+    // Finally, cancel decommission and see the node is removed from tracking
+    monitor.stopMonitoring(datanode1);
+    monitor.run();
+    assertEquals(0, monitor.getTrackedNodeCount());
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminNodeDetails.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminNodeDetails.java
new file mode 100644
index 000000000000..3b5177e47697
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminNodeDetails.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
+import
+    org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.TestCase.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests to validate the DatanodeAdminNodeDetails class.
+ */
+public class TestDatanodeAdminNodeDetails {
+
+  private OzoneConfiguration conf;
+  private DatanodeAdminMonitor monitor;
+  private final DatanodeAdminMonitor.States initialState =
+      DatanodeAdminMonitor.States.CLOSE_PIPELINES;
+
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    monitor = new DatanodeAdminMonitor(conf);
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void testEqualityBasedOnDatanodeDetails() {
+    DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
+    DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
+    DatanodeAdminNodeDetails details1 =
+        new DatanodeAdminNodeDetails(dn1, initialState, 0);
+    DatanodeAdminNodeDetails details2 =
+        new DatanodeAdminNodeDetails(dn2, initialState, 0);
+
+    assertNotEquals(details1, details2);
+    assertEquals(details1,
+        new DatanodeAdminNodeDetails(dn1, initialState, 0));
+    assertNotEquals(details1, dn1);
+  }
+
+  @Test
+  public void testUnexpectedNodeStateGivesBadTransition() {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    DatanodeAdminNodeDetails details =
+        new DatanodeAdminNodeDetails(dn, initialState, 0);
+
+    try {
+      details.transitionState(monitor.getWorkflowStateMachine(),
+          NodeOperationalState.IN_SERVICE);
+      fail("InvalidStateTransitionException should be thrown");
+    } catch (InvalidStateTransitionException e) {
+    }
+  }
+
+  @Test
+  public void testWorkflowStatesTransitionCorrectlyForDecom()
+      throws InvalidStateTransitionException {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    DatanodeAdminNodeDetails details = new DatanodeAdminNodeDetails(dn,
+        initialState, 0);
+
+    // Initial state should be CLOSE_PIPELINES
+    assertEquals(DatanodeAdminMonitor.States.CLOSE_PIPELINES,
+        details.getCurrentState());
+
+    // Next state is GET_CONTAINERS
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.DECOMMISSIONING);
+    assertEquals(DatanodeAdminMonitor.States.GET_CONTAINERS,
+        details.getCurrentState());
+
+    // Next state is REPLICATE_CONTAINERS
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.DECOMMISSIONING);
+    assertEquals(DatanodeAdminMonitor.States.REPLICATE_CONTAINERS,
+        details.getCurrentState());
+
+    // Next state is COMPLETE
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.DECOMMISSIONING);
+    assertEquals(DatanodeAdminMonitor.States.COMPLETE,
+        details.getCurrentState());
+  }
+
+  @Test
+  public void testWorkflowStatesTransitionCorrectlyForMaint()
+      throws InvalidStateTransitionException {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    DatanodeAdminNodeDetails details = new DatanodeAdminNodeDetails(dn,
+        initialState, 0);
+
+    // Initial state should be CLOSE_PIPELINES
+    assertEquals(DatanodeAdminMonitor.States.CLOSE_PIPELINES,
+        details.getCurrentState());
+
+    // Next state is GET_CONTAINERS
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.ENTERING_MAINTENANCE);
+    assertEquals(DatanodeAdminMonitor.States.GET_CONTAINERS,
+        details.getCurrentState());
+
+    // Next state is REPLICATE_CONTAINERS
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.ENTERING_MAINTENANCE);
+    assertEquals(DatanodeAdminMonitor.States.REPLICATE_CONTAINERS,
+        details.getCurrentState());
+
+    // Next state is AWAIT_MAINTENANCE_END
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.ENTERING_MAINTENANCE);
+    assertEquals(DatanodeAdminMonitor.States.AWAIT_MAINTENANCE_END,
+        details.getCurrentState());
+
+    // Next state is COMPLETE
+    details.transitionState(monitor.getWorkflowStateMachine(),
+        NodeOperationalState.ENTERING_MAINTENANCE);
+    assertEquals(DatanodeAdminMonitor.States.COMPLETE,
+        details.getCurrentState());
+  }
+
+  @Test
+  public void testMaintenanceEnd() {
+    DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+    // End in zero hours - should never end.
+    DatanodeAdminNodeDetails details = new DatanodeAdminNodeDetails(dn,
+        initialState, 0);
+    assertFalse(details.shouldMaintenanceEnd());
+
+    // End in 1 hour - maintenance should not end yet.
+    details.setMaintenanceEnd(1);
+    assertFalse(details.shouldMaintenanceEnd());
+
+    // Ended 1 hour ago - maintenance should end.
+    details.setMaintenanceEnd(-1);
+    assertTrue(details.shouldMaintenanceEnd());
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
index 82bd26b59854..e3b92084d82a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
@@ -55,7 +55,7 @@ public void setup() throws Exception {
         TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
     nodeManager = createNodeManager(conf);
-    decom = new NodeDecommissionManager(conf, nodeManager, null, null);
+    decom = new NodeDecommissionManager(conf, nodeManager, null, null, null);
   }
 
   @Test