Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ private void processCancelledNodes() {
LOG.warn("Failed processing the cancel admin request for {}",
dn.getDatanodeDetails(), e);
}
// TODO - fire event to bring node back into service?
}
}

Expand Down Expand Up @@ -320,8 +319,6 @@ private void completeMaintenance(DatanodeAdminNodeDetails dn)
throws NodeNotFoundException {
// The end state of Maintenance is to put the node back IN_SERVICE, whether
// it is dead or not.
// TODO - if the node is dead do we trigger a dead node event here or leave
// it to the heartbeat manager?
LOG.info("Datanode {} has ended maintenance automatically",
dn.getDatanodeDetails());
putNodeBackInService(dn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ public void onMessage(final DatanodeDetails datanodeDetails,
* To be on a safer side, we double check here and take appropriate
* action.
*/

destroyPipelines(datanodeDetails);
closeContainers(datanodeDetails, publisher);

// Remove the container replicas associated with the dead node.
removeContainerReplicas(datanodeDetails);
// Remove the container replicas associated with the dead node unless it
// is IN_MAINTENANCE
if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to get this message for a node ENTERING_MAINTENANCE? If so, shouldn't it be ignored here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a node goes dead while "entering maintenance" then it should be handled as a dead node. The reason is that we have not yet determined whether all the containers on the host are sufficiently replicated. If it goes dead before that check has completed and the node has moved into IN_MAINTENANCE, then it must be handled as a dead node as normal.

This is the same for a node which is DECOMMISSIONING and not yet reached DECOMMISSIONED.

Unless the node has reached its end state (IN_MAINTENANCE or DECOMMISSIONED), if it goes dead the workflow is aborted and it is handled just like any other dead node.

removeContainerReplicas(datanodeDetails);
}

} catch (NodeNotFoundException ex) {
// This should not happen, we cannot get a dead node event for an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,13 @@ private enum NodeLifeCycleEvent {
TIMEOUT, RESTORE, RESURRECT
}

private enum NodeOperationStateEvent {
START_DECOMMISSION, COMPLETE_DECOMMISSION, START_MAINTENANCE,
ENTER_MAINTENANCE, RETURN_TO_SERVICE
}

private static final Logger LOG = LoggerFactory
.getLogger(NodeStateManager.class);

/**
* StateMachine for node lifecycle.
*/
private final StateMachine<NodeState, NodeLifeCycleEvent> nodeHealthSM;
/**
* StateMachine for node operational state.
*/
private final StateMachine<HddsProtos.NodeOperationalState,
NodeOperationStateEvent> nodeOpStateSM;
/**
* This is the map which maintains the current state of all datanodes.
*/
Expand All @@ -112,7 +102,7 @@ private enum NodeOperationStateEvent {
/**
* Maps the event to be triggered when a node state is updated.
*/
private final Map<NodeStatus, Event<DatanodeDetails>> state2EventMap;
private final Map<NodeState, Event<DatanodeDetails>> state2EventMap;
/**
* ExecutorService used for scheduling heartbeat processing thread.
*/
Expand Down Expand Up @@ -162,10 +152,7 @@ public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
this.state2EventMap = new HashMap<>();
initialiseState2EventMap();
Set<NodeState> finalStates = new HashSet<>();
Set<HddsProtos.NodeOperationalState> opStateFinalStates = new HashSet<>();
this.nodeHealthSM = new StateMachine<>(NodeState.HEALTHY, finalStates);
this.nodeOpStateSM = new StateMachine<>(
NodeOperationalState.IN_SERVICE, opStateFinalStates);
initializeStateMachines();
heartbeatCheckerIntervalMs = HddsServerUtil
.getScmheartbeatCheckerInterval(conf);
Expand All @@ -190,12 +177,10 @@ public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
* Populates state2event map.
*/
private void initialiseState2EventMap() {
state2EventMap.put(NodeStatus.inServiceStale(), SCMEvents.STALE_NODE);
state2EventMap.put(NodeStatus.inServiceDead(), SCMEvents.DEAD_NODE);
state2EventMap.put(NodeStatus.inServiceHealthy(),
state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE);
state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE);
state2EventMap.put(NodeState.HEALTHY,
SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE);
// TODO - add whatever events are needed for decomm / maint to stale, dead,
// healthy
}

/*
Expand Down Expand Up @@ -238,36 +223,6 @@ private void initializeStateMachines() {
NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE);
nodeHealthSM.addTransition(
NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT);

nodeOpStateSM.addTransition(
NodeOperationalState.IN_SERVICE, NodeOperationalState.DECOMMISSIONING,
NodeOperationStateEvent.START_DECOMMISSION);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now enum NodeOperationStateEvent is unused. Can it be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, this can be removed. I have made this change.

nodeOpStateSM.addTransition(
NodeOperationalState.DECOMMISSIONING, NodeOperationalState.IN_SERVICE,
NodeOperationStateEvent.RETURN_TO_SERVICE);
nodeOpStateSM.addTransition(
NodeOperationalState.DECOMMISSIONING,
NodeOperationalState.DECOMMISSIONED,
NodeOperationStateEvent.COMPLETE_DECOMMISSION);
nodeOpStateSM.addTransition(
NodeOperationalState.DECOMMISSIONED, NodeOperationalState.IN_SERVICE,
NodeOperationStateEvent.RETURN_TO_SERVICE);

nodeOpStateSM.addTransition(
NodeOperationalState.IN_SERVICE,
NodeOperationalState.ENTERING_MAINTENANCE,
NodeOperationStateEvent.START_MAINTENANCE);
nodeOpStateSM.addTransition(
NodeOperationalState.ENTERING_MAINTENANCE,
NodeOperationalState.IN_SERVICE,
NodeOperationStateEvent.RETURN_TO_SERVICE);
nodeOpStateSM.addTransition(
NodeOperationalState.ENTERING_MAINTENANCE,
NodeOperationalState.IN_MAINTENANCE,
NodeOperationStateEvent.ENTER_MAINTENANCE);
nodeOpStateSM.addTransition(
NodeOperationalState.IN_MAINTENANCE, NodeOperationalState.IN_SERVICE,
NodeOperationStateEvent.RETURN_TO_SERVICE);
}

/**
Expand All @@ -280,7 +235,7 @@ private void initializeStateMachines() {
public void addNode(DatanodeDetails datanodeDetails)
throws NodeAlreadyExistsException {
nodeStateMap.addNode(datanodeDetails, new NodeStatus(
nodeOpStateSM.getInitialState(), nodeHealthSM.getInitialState()));
NodeOperationalState.IN_SERVICE, nodeHealthSM.getInitialState()));
eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails);
}

Expand Down Expand Up @@ -404,8 +359,18 @@ public List<DatanodeInfo> getAllNodes() {
public void setNodeOperationalState(DatanodeDetails dn,
NodeOperationalState newState) throws NodeNotFoundException {
DatanodeInfo dni = nodeStateMap.getNodeInfo(dn.getUuid());
if (dni.getNodeStatus().getOperationalState() != newState) {
NodeStatus oldStatus = dni.getNodeStatus();
if (oldStatus.getOperationalState() != newState) {
nodeStateMap.updateNodeOperationalState(dn.getUuid(), newState);
// This will trigger an event based on the nodes health when the
// operational state changes. Eg a node that was IN_MAINTENANCE goes
// to IN_SERVICE + HEALTHY. This will trigger the HEALTHY node event to
// create new pipelines. OTOH, if the node goes from IN_MAINTENANCE to
// IN_SERVICE + DEAD, it will trigger the dead node handler to remove its
// container replicas. Sometimes the event will do nothing, but it will
// not do any harm either. Eg DECOMMISSIONING -> DECOMMISSIONED + HEALTHY
// but the pipeline creation logic will ignore decommissioning nodes.
fireHealthStateEvent(oldStatus.getHealth(), dn);
}
}

Expand Down Expand Up @@ -706,9 +671,7 @@ private void updateNodeState(DatanodeInfo node, Predicate<Long> condition,
getNextState(status.getHealth(), lifeCycleEvent);
NodeStatus newStatus =
nodeStateMap.updateNodeHealthState(node.getUuid(), newHealthState);
if (state2EventMap.containsKey(newStatus)) {
eventPublisher.fireEvent(state2EventMap.get(newStatus), node);
}
fireHealthStateEvent(newStatus.getHealth(), node);
}
} catch (InvalidStateTransitionException e) {
LOG.warn("Invalid state transition of node {}." +
Expand All @@ -717,6 +680,14 @@ private void updateNodeState(DatanodeInfo node, Predicate<Long> condition,
}
}

/**
 * Publishes the event registered in {@code state2EventMap} for the given
 * health state. When no event is mapped to that state, nothing is
 * published.
 *
 * @param health the health state used as the lookup key
 * @param node the datanode passed as the payload of the fired event
 */
private void fireHealthStateEvent(HddsProtos.NodeState health,
    DatanodeDetails node) {
  final Event<DatanodeDetails> mappedEvent = state2EventMap.get(health);
  if (mappedEvent == null) {
    // No event is registered for this health state.
    return;
  }
  eventPublisher.fireEvent(mappedEvent, node);
}

@Override
public void close() {
executorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@
import org.apache.hadoop.security.authentication.client
.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.Assert;
import org.junit.After;
import org.mockito.Mockito;

/**
Expand Down Expand Up @@ -174,22 +174,44 @@ public void testOnMessage() throws IOException, NodeNotFoundException {
TestUtils.closeContainer(containerManager, container2.containerID());
TestUtils.quasiCloseContainer(containerManager, container3.containerID());

// First set the node to IN_MAINTENANCE and ensure the container replicas
// are not removed on the dead event
nodeManager.setNodeOperationalState(datanode1,
HddsProtos.NodeOperationalState.IN_MAINTENANCE);
deadNodeHandler.onMessage(datanode1, publisher);

Set<ContainerReplica> container1Replicas = containerManager
.getContainerReplicas(new ContainerID(container1.getContainerID()));
Assert.assertEquals(2, container1Replicas.size());

Set<ContainerReplica> container2Replicas = containerManager
.getContainerReplicas(new ContainerID(container2.getContainerID()));
Assert.assertEquals(2, container2Replicas.size());

Set<ContainerReplica> container3Replicas = containerManager
.getContainerReplicas(new ContainerID(container3.getContainerID()));
Assert.assertEquals(1, container3Replicas.size());

// Now set the node to anything other than IN_MAINTENANCE and the relevant
// replicas should be removed
nodeManager.setNodeOperationalState(datanode1,
HddsProtos.NodeOperationalState.IN_SERVICE);
deadNodeHandler.onMessage(datanode1, publisher);

container1Replicas = containerManager
.getContainerReplicas(new ContainerID(container1.getContainerID()));
Assert.assertEquals(1, container1Replicas.size());
Assert.assertEquals(datanode2,
container1Replicas.iterator().next().getDatanodeDetails());

Set<ContainerReplica> container2Replicas = containerManager
container2Replicas = containerManager
.getContainerReplicas(new ContainerID(container2.getContainerID()));
Assert.assertEquals(1, container2Replicas.size());
Assert.assertEquals(datanode2,
container2Replicas.iterator().next().getDatanodeDetails());

Set<ContainerReplica> container3Replicas = containerManager
.getContainerReplicas(new ContainerID(container3.getContainerID()));
container3Replicas = containerManager
.getContainerReplicas(new ContainerID(container3.getContainerID()));
Assert.assertEquals(1, container3Replicas.size());
Assert.assertEquals(datanode3,
container3Replicas.iterator().next().getDatanodeDetails());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.Event;
Expand Down Expand Up @@ -145,7 +146,7 @@ public void testNodeCanTransitionThroughHealthStatesAndFiresEvents()

DatanodeDetails dn = generateDatanode();
nsm.addNode(dn);
assertEquals("New_Node", eventPublisher.getLastEvent().getName());
assertEquals(SCMEvents.NEW_NODE, eventPublisher.getLastEvent());
DatanodeInfo dni = nsm.getNode(dn);
dni.updateLastHeartbeatTime();

Expand All @@ -159,31 +160,31 @@ public void testNodeCanTransitionThroughHealthStatesAndFiresEvents()
dni.updateLastHeartbeatTime(now - staleLimit);
nsm.checkNodesHealth();
assertEquals(NodeState.STALE, nsm.getNodeStatus(dn).getHealth());
assertEquals("Stale_Node", eventPublisher.getLastEvent().getName());
assertEquals(SCMEvents.STALE_NODE, eventPublisher.getLastEvent());

// Now make it dead
dni.updateLastHeartbeatTime(now - deadLimit);
nsm.checkNodesHealth();
assertEquals(NodeState.DEAD, nsm.getNodeStatus(dn).getHealth());
assertEquals("Dead_Node", eventPublisher.getLastEvent().getName());
assertEquals(SCMEvents.DEAD_NODE, eventPublisher.getLastEvent());

// Transition back to healthy from dead
dni.updateLastHeartbeatTime();
nsm.checkNodesHealth();
assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth());
assertEquals("NON_HEALTHY_TO_HEALTHY_NODE",
eventPublisher.getLastEvent().getName());
assertEquals(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE,
eventPublisher.getLastEvent());

// Make the node stale again, and transition to healthy.
dni.updateLastHeartbeatTime(now - staleLimit);
nsm.checkNodesHealth();
assertEquals(NodeState.STALE, nsm.getNodeStatus(dn).getHealth());
assertEquals("Stale_Node", eventPublisher.getLastEvent().getName());
assertEquals(SCMEvents.STALE_NODE, eventPublisher.getLastEvent());
dni.updateLastHeartbeatTime();
nsm.checkNodesHealth();
assertEquals(NodeState.HEALTHY, nsm.getNodeStatus(dn).getHealth());
assertEquals("NON_HEALTHY_TO_HEALTHY_NODE",
eventPublisher.getLastEvent().getName());
assertEquals(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE,
eventPublisher.getLastEvent());
}

@Test
Expand All @@ -202,6 +203,56 @@ public void testNodeOpStateCanBeSet()
newStatus.getHealth());
}

/**
 * Checks that setting a node's operational state fires the event mapped to
 * the node's current health, first for a HEALTHY node, then after making
 * it STALE, and finally after making it DEAD.
 */
@Test
public void testHealthEventsFiredWhenOpStateChanged()
    throws NodeAlreadyExistsException, NodeNotFoundException {
  DatanodeDetails datanode = generateDatanode();
  nsm.addNode(datanode);

  // A freshly added node: every op state change should fire the
  // non_healthy_to_healthy event.
  assertEventFiredForEachOpState(datanode,
      SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE);

  // Push the last heartbeat back past the stale limit and repeat,
  // expecting the stale event this time.
  long baseTime = Time.monotonicNow();
  long pastStale = HddsServerUtil.getStaleNodeInterval(conf) + 1000;
  long pastDead = HddsServerUtil.getDeadNodeInterval(conf) + 1000;
  DatanodeInfo nodeInfo = nsm.getNode(datanode);
  nodeInfo.updateLastHeartbeatTime(baseTime - pastStale);
  nsm.checkNodesHealth();
  assertEquals(NodeState.STALE, nsm.getNodeStatus(datanode).getHealth());
  assertEventFiredForEachOpState(datanode, SCMEvents.STALE_NODE);

  // Push the last heartbeat back past the dead limit and repeat,
  // expecting the dead event.
  nodeInfo.updateLastHeartbeatTime(baseTime - pastDead);
  nsm.checkNodesHealth();
  assertEquals(NodeState.DEAD, nsm.getNodeStatus(datanode).getHealth());
  assertEventFiredForEachOpState(datanode, SCMEvents.DEAD_NODE);
}

/**
 * Sets the node to DECOMMISSIONED, then sets it to each operational state
 * in declaration order, clearing recorded events before each change and
 * asserting the last published event equals the expected one.
 *
 * @param datanode the node whose operational state is changed
 * @param expected the event expected after each operational state change
 * @throws NodeNotFoundException propagated from setNodeOperationalState
 */
private void assertEventFiredForEachOpState(DatanodeDetails datanode,
    Event<DatanodeDetails> expected) throws NodeNotFoundException {
  nsm.setNodeOperationalState(datanode,
      HddsProtos.NodeOperationalState.DECOMMISSIONED);
  HddsProtos.NodeOperationalState[] allOpStates =
      HddsProtos.NodeOperationalState.values();
  for (int i = 0; i < allOpStates.length; i++) {
    eventPublisher.clearEvents();
    nsm.setNodeOperationalState(datanode, allOpStates[i]);
    assertEquals(expected, eventPublisher.getLastEvent());
  }
}

private DatanodeDetails generateDatanode() {
String uuid = UUID.randomUUID().toString();
return DatanodeDetails.newBuilder().setUuid(uuid).build();
Expand Down