diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java index 931a45e3bda4..8eb985ba3378 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java @@ -170,7 +170,6 @@ private void processCancelledNodes() { LOG.warn("Failed processing the cancel admin request for {}", dn.getDatanodeDetails(), e); } - // TODO - fire event to bring node back into service? } } @@ -320,8 +319,6 @@ private void completeMaintenance(DatanodeAdminNodeDetails dn) throws NodeNotFoundException { // The end state of Maintenance is to put the node back IN_SERVICE, whether // it is dead or not. - // TODO - if the node is dead do we trigger a dead node event here or leave - // it to the heartbeat manager? LOG.info("Datanode {} has ended maintenance automatically", dn.getDatanodeDetails()); putNodeBackInService(dn); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 17e1fedd9525..216f82cf87d2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -74,12 +74,14 @@ public void onMessage(final DatanodeDetails datanodeDetails, * To be on a safer side, we double check here and take appropriate * action. */ - destroyPipelines(datanodeDetails); closeContainers(datanodeDetails, publisher); - // Remove the container replicas associated with the dead node. - removeContainerReplicas(datanodeDetails); + // Remove the container replicas associated with the dead node unless it + // is IN_MAINTENANCE + if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) { + removeContainerReplicas(datanodeDetails); + } } catch (NodeNotFoundException ex) { // This should not happen, we cannot get a dead node event for an diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 4177b63dd396..b16e79c6ce59 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -80,11 +80,6 @@ private enum NodeLifeCycleEvent { TIMEOUT, RESTORE, RESURRECT } - private enum NodeOperationStateEvent { - START_DECOMMISSION, COMPLETE_DECOMMISSION, START_MAINTENANCE, - ENTER_MAINTENANCE, RETURN_TO_SERVICE - } - private static final Logger LOG = LoggerFactory .getLogger(NodeStateManager.class); @@ -92,11 +87,6 @@ private enum NodeOperationStateEvent { * StateMachine for node lifecycle. */ private final StateMachine nodeHealthSM; - /** - * StateMachine for node operational state. - */ - private final StateMachine nodeOpStateSM; /** * This is the map which maintains the current state of all datanodes. */ @@ -112,7 +102,7 @@ private enum NodeOperationStateEvent { /** * Maps the event to be triggered when a node state us updated. */ - private final Map> state2EventMap; + private final Map> state2EventMap; /** * ExecutorService used for scheduling heartbeat processing thread. */ @@ -162,10 +152,7 @@ public NodeStateManager(Configuration conf, EventPublisher eventPublisher) { this.state2EventMap = new HashMap<>(); initialiseState2EventMap(); Set finalStates = new HashSet<>(); - Set opStateFinalStates = new HashSet<>(); this.nodeHealthSM = new StateMachine<>(NodeState.HEALTHY, finalStates); - this.nodeOpStateSM = new StateMachine<>( - NodeOperationalState.IN_SERVICE, opStateFinalStates); initializeStateMachines(); heartbeatCheckerIntervalMs = HddsServerUtil .getScmheartbeatCheckerInterval(conf); @@ -190,12 +177,10 @@ public NodeStateManager(Configuration conf, EventPublisher eventPublisher) { * Populates state2event map. */ private void initialiseState2EventMap() { - state2EventMap.put(NodeStatus.inServiceStale(), SCMEvents.STALE_NODE); - state2EventMap.put(NodeStatus.inServiceDead(), SCMEvents.DEAD_NODE); - state2EventMap.put(NodeStatus.inServiceHealthy(), + state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE); + state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE); + state2EventMap.put(NodeState.HEALTHY, SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE); - // TODO - add whatever events are needed for decomm / maint to stale, dead, - // healthy } /* @@ -238,36 +223,6 @@ private void initializeStateMachines() { NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE); nodeHealthSM.addTransition( NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT); - - nodeOpStateSM.addTransition( - NodeOperationalState.IN_SERVICE, NodeOperationalState.DECOMMISSIONING, - NodeOperationStateEvent.START_DECOMMISSION); - nodeOpStateSM.addTransition( - NodeOperationalState.DECOMMISSIONING, NodeOperationalState.IN_SERVICE, - NodeOperationStateEvent.RETURN_TO_SERVICE); - nodeOpStateSM.addTransition( - NodeOperationalState.DECOMMISSIONING, - NodeOperationalState.DECOMMISSIONED, - NodeOperationStateEvent.COMPLETE_DECOMMISSION); - nodeOpStateSM.addTransition( - NodeOperationalState.DECOMMISSIONED, NodeOperationalState.IN_SERVICE, - NodeOperationStateEvent.RETURN_TO_SERVICE); - - nodeOpStateSM.addTransition( - NodeOperationalState.IN_SERVICE, - NodeOperationalState.ENTERING_MAINTENANCE, - NodeOperationStateEvent.START_MAINTENANCE); - nodeOpStateSM.addTransition( - NodeOperationalState.ENTERING_MAINTENANCE, - NodeOperationalState.IN_SERVICE, - NodeOperationStateEvent.RETURN_TO_SERVICE); - nodeOpStateSM.addTransition( - NodeOperationalState.ENTERING_MAINTENANCE, - NodeOperationalState.IN_MAINTENANCE, - NodeOperationStateEvent.ENTER_MAINTENANCE); - nodeOpStateSM.addTransition( - NodeOperationalState.IN_MAINTENANCE, NodeOperationalState.IN_SERVICE, - NodeOperationStateEvent.RETURN_TO_SERVICE); } /** @@ -280,7 +235,7 @@ private void initializeStateMachines() { public void addNode(DatanodeDetails datanodeDetails) throws NodeAlreadyExistsException { nodeStateMap.addNode(datanodeDetails, new NodeStatus( - nodeOpStateSM.getInitialState(), nodeHealthSM.getInitialState())); + NodeOperationalState.IN_SERVICE, nodeHealthSM.getInitialState())); eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails); } @@ -404,8 +359,18 @@ public List getAllNodes() { public void setNodeOperationalState(DatanodeDetails dn, NodeOperationalState newState) throws NodeNotFoundException { DatanodeInfo dni = nodeStateMap.getNodeInfo(dn.getUuid()); - if (dni.getNodeStatus().getOperationalState() != newState) { + NodeStatus oldStatus = dni.getNodeStatus(); + if (oldStatus.getOperationalState() != newState) { nodeStateMap.updateNodeOperationalState(dn.getUuid(), newState); + // This will trigger an event based on the nodes health when the + // operational state changes. Eg a node that was IN_MAINTENANCE goes + // to IN_SERVICE + HEALTHY. This will trigger the HEALTHY node event to + // create new pipelines. OTH, if the nodes goes IN_MAINTENANCE to + // IN_SERVICE + DEAD, it will trigger the dead node handler to remove its + // container replicas. Sometimes the event will do nothing, but it will + // not do any harm either. Eg DECOMMISSIONING -> DECOMMISSIONED + HEALTHY + // but the pipeline creation logic will ignore decommissioning nodes. + fireHealthStateEvent(oldStatus.getHealth(), dn); } } @@ -706,9 +671,7 @@ private void updateNodeState(DatanodeInfo node, Predicate condition, getNextState(status.getHealth(), lifeCycleEvent); NodeStatus newStatus = nodeStateMap.updateNodeHealthState(node.getUuid(), newHealthState); - if (state2EventMap.containsKey(newStatus)) { - eventPublisher.fireEvent(state2EventMap.get(newStatus), node); - } + fireHealthStateEvent(newStatus.getHealth(), node); } } catch (InvalidStateTransitionException e) { LOG.warn("Invalid state transition of node {}." + @@ -717,6 +680,14 @@ private void updateNodeState(DatanodeInfo node, Predicate condition, } } + private void fireHealthStateEvent(HddsProtos.NodeState health, + DatanodeDetails node) { + Event event = state2EventMap.get(health); + if (event != null) { + eventPublisher.fireEvent(event, node); + } + } + @Override public void close() { executorService.shutdown(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 1676af1c3b22..85ea68b8870a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -58,11 +58,11 @@ import org.apache.hadoop.security.authentication.client .AuthenticationException; import org.apache.hadoop.test.GenericTestUtils; -import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.Assert; +import org.junit.After; import org.mockito.Mockito; /** @@ -174,22 +174,44 @@ public void testOnMessage() throws IOException, NodeNotFoundException { TestUtils.closeContainer(containerManager, container2.containerID()); TestUtils.quasiCloseContainer(containerManager, container3.containerID()); + // First set the node to IN_MAINTENANCE and ensure the container replicas + // are not removed on the dead event + nodeManager.setNodeOperationalState(datanode1, + HddsProtos.NodeOperationalState.IN_MAINTENANCE); deadNodeHandler.onMessage(datanode1, publisher); Set container1Replicas = containerManager .getContainerReplicas(new ContainerID(container1.getContainerID())); + Assert.assertEquals(2, container1Replicas.size()); + + Set container2Replicas = containerManager + .getContainerReplicas(new ContainerID(container2.getContainerID())); + Assert.assertEquals(2, container2Replicas.size()); + + Set container3Replicas = containerManager + .getContainerReplicas(new ContainerID(container3.getContainerID())); + Assert.assertEquals(1, container3Replicas.size()); + + // Now set the node to anything other than IN_MAINTENANCE and the relevant + // replicas should be removed + nodeManager.setNodeOperationalState(datanode1, + HddsProtos.NodeOperationalState.IN_SERVICE); + deadNodeHandler.onMessage(datanode1, publisher); + + container1Replicas = containerManager + .getContainerReplicas(new ContainerID(container1.getContainerID())); Assert.assertEquals(1, container1Replicas.size()); Assert.assertEquals(datanode2, container1Replicas.iterator().next().getDatanodeDetails()); - Set container2Replicas = containerManager + container2Replicas = containerManager .getContainerReplicas(new ContainerID(container2.getContainerID())); Assert.assertEquals(1, container2Replicas.size()); Assert.assertEquals(datanode2, container2Replicas.iterator().next().getDatanodeDetails()); - Set container3Replicas = containerManager - .getContainerReplicas(new ContainerID(container3.getContainerID())); + container3Replicas = containerManager + .getContainerReplicas(new ContainerID(container3.getContainerID())); Assert.assertEquals(1, container3Replicas.size()); Assert.assertEquals(datanode3, container3Replicas.iterator().next().getDatanodeDetails()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java index 9fbf2519f5e1..5d3ee5e83d31 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.HddsServerUtil; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.server.events.Event; @@ -145,7 +146,7 @@ public void testNodeCanTransitionThroughHealthStatesAndFiresEvents() DatanodeDetails dn = generateDatanode(); nsm.addNode(dn); - assertEquals("New_Node", eventPublisher.getLastEvent().getName()); + assertEquals(SCMEvents.NEW_NODE, eventPublisher.getLastEvent()); DatanodeInfo dni = nsm.getNode(dn); dni.updateLastHeartbeatTime(); @@ -159,31 +160,31 @@ public void testNodeCanTransitionThroughHealthStatesAndFiresEvents() dni.updateLastHeartbeatTime(now - staleLimit); nsm.checkNodesHealth(); assertEquals(NodeState.STALE, nsm.getNodeStatus(dn).getHealth()); - assertEquals("Stale_Node", eventPublisher.getLastEvent().getName()); + assertEquals(SCMEvents.STALE_NODE, eventPublisher.getLastEvent()); // Now make it dead dni.updateLastHeartbeatTime(now - deadLimit); nsm.checkNodesHealth(); assertEquals(NodeState.DEAD, nsm.getNodeStatus(dn).getHealth()); - assertEquals("Dead_Node", eventPublisher.getLastEvent().getName()); + assertEquals(SCMEvents.DEAD_NODE, eventPublisher.getLastEvent()); // Transition back to healthy from dead dni.updateLastHeartbeatTime(); nsm.checkNodesHealth(); assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth()); - assertEquals("NON_HEALTHY_TO_HEALTHY_NODE", - eventPublisher.getLastEvent().getName()); + assertEquals(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE, + eventPublisher.getLastEvent()); // Make the node stale again, and transition to healthy. dni.updateLastHeartbeatTime(now - staleLimit); nsm.checkNodesHealth(); assertEquals(NodeState.STALE, nsm.getNodeStatus(dn).getHealth()); - assertEquals("Stale_Node", eventPublisher.getLastEvent().getName()); + assertEquals(SCMEvents.STALE_NODE, eventPublisher.getLastEvent()); dni.updateLastHeartbeatTime(); nsm.checkNodesHealth(); assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth()); - assertEquals("NON_HEALTHY_TO_HEALTHY_NODE", - eventPublisher.getLastEvent().getName()); + assertEquals(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE, + eventPublisher.getLastEvent()); } @Test @@ -202,6 +203,56 @@ public void testNodeOpStateCanBeSet() newStatus.getHealth()); } + @Test + public void testHealthEventsFiredWhenOpStateChanged() + throws NodeAlreadyExistsException, NodeNotFoundException { + DatanodeDetails dn = generateDatanode(); + nsm.addNode(dn); + + // First set the node to decommissioned, then run through all op states in + // order and ensure the non_healthy_to_healthy event gets fired + nsm.setNodeOperationalState(dn, + HddsProtos.NodeOperationalState.DECOMMISSIONED); + for (HddsProtos.NodeOperationalState s : + HddsProtos.NodeOperationalState.values()) { + eventPublisher.clearEvents(); + nsm.setNodeOperationalState(dn, s); + assertEquals(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE, + eventPublisher.getLastEvent()); + } + + // Now make the node stale and run through all states again ensuring the + // stale event gets fired + long now = Time.monotonicNow(); + long staleLimit = HddsServerUtil.getStaleNodeInterval(conf) + 1000; + long deadLimit = HddsServerUtil.getDeadNodeInterval(conf) + 1000; + DatanodeInfo dni = nsm.getNode(dn); + dni.updateLastHeartbeatTime(now - staleLimit); + nsm.checkNodesHealth(); + assertEquals(NodeState.STALE, nsm.getNodeStatus(dn).getHealth()); + nsm.setNodeOperationalState(dn, + HddsProtos.NodeOperationalState.DECOMMISSIONED); + for (HddsProtos.NodeOperationalState s : + HddsProtos.NodeOperationalState.values()) { + eventPublisher.clearEvents(); + nsm.setNodeOperationalState(dn, s); + assertEquals(SCMEvents.STALE_NODE, eventPublisher.getLastEvent()); + } + + // Finally make the node dead and run through all the op states again + dni.updateLastHeartbeatTime(now - deadLimit); + nsm.checkNodesHealth(); + assertEquals(NodeState.DEAD, nsm.getNodeStatus(dn).getHealth()); + nsm.setNodeOperationalState(dn, + HddsProtos.NodeOperationalState.DECOMMISSIONED); + for (HddsProtos.NodeOperationalState s : + HddsProtos.NodeOperationalState.values()) { + eventPublisher.clearEvents(); + nsm.setNodeOperationalState(dn, s); + assertEquals(SCMEvents.DEAD_NODE, eventPublisher.getLastEvent()); + } + } + private DatanodeDetails generateDatanode() { String uuid = UUID.randomUUID().toString(); return DatanodeDetails.newBuilder().setUuid(uuid).build();