diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java index 5dbd6ee54beb..9b9be03a49fc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java @@ -135,6 +135,16 @@ private NodeStatus(NodeState health, NodeOperationalState op, long opExpiryEpoch this.opStateExpiryEpochSeconds = opExpiryEpochSeconds; } + /** @return the status with the new health. */ + public NodeStatus newNodeState(NodeState newHealth) { + return NodeStatus.valueOf(operationalState, newHealth, opStateExpiryEpochSeconds); + } + + /** @return the status with the new op state and expiry epoch seconds. */ + public NodeStatus newOperationalState(NodeOperationalState op, long opExpiryEpochSeconds) { + return NodeStatus.valueOf(op, health, opExpiryEpochSeconds); + } + /** Is this node writeable ({@link NodeState#HEALTHY} and {@link NodeOperationalState#IN_SERVICE}) ? */ public boolean isNodeWritable() { return health == HEALTHY && operationalState == IN_SERVICE; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java index 79c4346bf8d7..ce49cb6c6452 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java @@ -141,8 +141,7 @@ public NodeStatus updateNodeHealthState(DatanodeID nodeId, NodeState newHealth) lock.writeLock().lock(); try { DatanodeInfo dn = getNodeInfoUnsafe(nodeId); - NodeStatus oldStatus = dn.getNodeStatus(); - NodeStatus newStatus = NodeStatus.valueOf(oldStatus.getOperationalState(), newHealth); + final NodeStatus newStatus = dn.getNodeStatus().newNodeState(newHealth); dn.setNodeStatus(newStatus); return newStatus; } finally { @@ -164,8 +163,7 @@ public NodeStatus updateNodeOperationalState(DatanodeID nodeId, lock.writeLock().lock(); try { DatanodeInfo dn = getNodeInfoUnsafe(nodeId); - NodeStatus oldStatus = dn.getNodeStatus(); - NodeStatus newStatus = NodeStatus.valueOf(newOpState, oldStatus.getHealth(), opStateExpiryEpochSeconds); + final NodeStatus newStatus = dn.getNodeStatus().newOperationalState(newOpState, opStateExpiryEpochSeconds); dn.setNodeStatus(newStatus); return newStatus; } finally { @@ -389,7 +387,7 @@ private Set getExisting(DatanodeID id) throws NodeNotFoundException private List filterNodes( NodeOperationalState opState, NodeState health) { if (opState != null && health != null) { - return filterNodes(matching(NodeStatus.valueOf(opState, health))); + return filterNodes(matching(opState, health)); } if (opState != null) { return filterNodes(matching(opState)); @@ -430,6 +428,10 @@ private static Predicate matching(NodeStatus status) { return dn -> status.equals(dn.getNodeStatus()); } + private static Predicate matching(NodeOperationalState op, NodeState health) { + return dn -> matching(op).test(dn) && matching(health).test(dn); + } + private static Predicate matching(NodeOperationalState state) { return dn -> state.equals(dn.getNodeStatus().getOperationalState()); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java index 1f338f7beb5f..e105d72549c9 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java @@ -18,10 +18,12 @@ package org.apache.hadoop.hdds.scm.node.states; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; @@ -39,7 +41,10 @@ * NodeStateManager. */ public class TestNodeStateMap { + private static final NodeStatus DECOMMISSIONING_HEALTHY_999 = NodeStatus.valueOf( + NodeOperationalState.DECOMMISSIONING, NodeState.HEALTHY, 999); + private final DatanodeDetails dn = generateDatanode(); private NodeStateMap map; @BeforeEach @@ -54,59 +59,76 @@ public void tearDown() { @Test public void testNodeCanBeAddedAndRetrieved() throws NodeAlreadyExistsException, NodeNotFoundException { - DatanodeDetails dn = generateDatanode(); NodeStatus status = NodeStatus.inServiceHealthy(); map.addNode(dn, status, null); assertEquals(dn, map.getNodeInfo(dn.getID())); assertEquals(status, map.getNodeStatus(dn.getID())); } + private void runTestUpdateHealth(NodeStatus original, NodeState newHealth) throws Exception { + map.addNode(dn, original, null); + final NodeStatus returned = map.updateNodeHealthState(dn.getID(), newHealth); + + final NodeStatus expected = NodeStatus.valueOf( + original.getOperationalState(), newHealth, original.getOpStateExpiryEpochSeconds()); + assertEquals(expected, returned); + assertEquals(returned, map.getNodeStatus(dn.getID())); + } + @Test - public void testNodeHealthStateCanBeUpdated() - throws NodeAlreadyExistsException, NodeNotFoundException { - DatanodeDetails dn = generateDatanode(); - NodeStatus status = NodeStatus.inServiceHealthy(); - map.addNode(dn, status, null); + public void testUpdateHealthyToStale() throws Exception { + runTestUpdateHealth(NodeStatus.inServiceHealthy(), NodeState.STALE); + } - NodeStatus expectedStatus = NodeStatus.inServiceStale(); - NodeStatus returnedStatus = - map.updateNodeHealthState(dn.getID(), expectedStatus.getHealth()); - assertEquals(expectedStatus, returnedStatus); - assertEquals(returnedStatus, map.getNodeStatus(dn.getID())); + @Test + public void testUpdateDecommissioningHealthyToStale() throws Exception { + runTestUpdateHealth(DECOMMISSIONING_HEALTHY_999, NodeState.STALE); } @Test public void testNodeOperationalStateCanBeUpdated() throws NodeAlreadyExistsException, NodeNotFoundException { - DatanodeDetails dn = generateDatanode(); NodeStatus status = NodeStatus.inServiceHealthy(); map.addNode(dn, status, null); - NodeStatus expectedStatus = NodeStatus.valueOf( - NodeOperationalState.DECOMMISSIONING, - NodeState.HEALTHY, 999); + NodeStatus expectedStatus = DECOMMISSIONING_HEALTHY_999; NodeStatus returnedStatus = map.updateNodeOperationalState( - dn.getID(), expectedStatus.getOperationalState(), 999); + dn.getID(), expectedStatus.getOperationalState(), expectedStatus.getOpStateExpiryEpochSeconds()); assertEquals(expectedStatus, returnedStatus); assertEquals(returnedStatus, map.getNodeStatus(dn.getID())); - assertEquals(999, returnedStatus.getOpStateExpiryEpochSeconds()); + } + + @Test + public void testGetNodeNonZeroExpiry() throws Exception { + runTestGetNode(123); } @Test public void testGetNodeMethodsReturnCorrectCountsAndStates() throws NodeAlreadyExistsException { + runTestGetNode(0); + } + + private void runTestGetNode(long opExpiryEpochSeconds) + throws NodeAlreadyExistsException { // Add one node for all possible states int nodeCount = 0; for (NodeOperationalState op : NodeOperationalState.values()) { for (NodeState health : NodeState.values()) { - addRandomNodeWithState(op, health); + addRandomNodeWithState(op, health, opExpiryEpochSeconds); nodeCount++; } } - NodeStatus requestedState = NodeStatus.inServiceStale(); + final NodeStatus requestedState = NodeStatus.valueOf( + NodeOperationalState.IN_SERVICE, NodeState.STALE, opExpiryEpochSeconds); List nodes = map.getDatanodeInfos(requestedState); assertEquals(1, nodes.size()); assertEquals(1, map.getNodeCount(requestedState)); + + List nodes2 = map.getDatanodeInfos( + requestedState.getOperationalState(), requestedState.getHealth()); + assertEquals(1, nodes2.size()); + assertEquals(nodeCount, map.getTotalNodeCount()); assertEquals(nodeCount, map.getNodeCount()); assertEquals(nodeCount, map.getAllDatanodeInfos().size()); @@ -140,13 +162,14 @@ public void testConcurrency() throws Exception { CountDownLatch elementRemoved = new CountDownLatch(1); CountDownLatch loopStarted = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference<>(); new Thread(() -> { try { loopStarted.await(); map.removeContainer(id, ContainerID.valueOf(1L)); elementRemoved.countDown(); } catch (Exception e) { - e.printStackTrace(); + failure.set(e); } }).start(); @@ -160,23 +183,15 @@ public void testConcurrency() throws Exception { first = false; System.out.println(key); } - } - - private void addNodeWithState( - DatanodeDetails dn, - NodeOperationalState opState, NodeState health - ) - throws NodeAlreadyExistsException { - NodeStatus status = NodeStatus.valueOf(opState, health); - map.addNode(dn, status, null); + assertNull(failure.get()); } private void addRandomNodeWithState( - NodeOperationalState opState, NodeState health - ) - throws NodeAlreadyExistsException { - DatanodeDetails dn = generateDatanode(); - addNodeWithState(dn, opState, health); + NodeOperationalState opState, NodeState health, + long opExpiryEpochSeconds) throws NodeAlreadyExistsException { + DatanodeDetails random = generateDatanode(); + NodeStatus status = NodeStatus.valueOf(opState, health, opExpiryEpochSeconds); + map.addNode(random, status, null); } private DatanodeDetails generateDatanode() {