Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ private NodeStatus(NodeState health, NodeOperationalState op, long opExpiryEpoch
this.opStateExpiryEpochSeconds = opExpiryEpochSeconds;
}

/** @return the status with the new health. */
public NodeStatus newNodeState(NodeState newHealth) {
return NodeStatus.valueOf(operationalState, newHealth, opStateExpiryEpochSeconds);
}

/** @return the status with the new op state and expiry epoch seconds. */
public NodeStatus newOperationalState(NodeOperationalState op, long opExpiryEpochSeconds) {
return NodeStatus.valueOf(op, health, opExpiryEpochSeconds);
}

/** Is this node writeable ({@link NodeState#HEALTHY} and {@link NodeOperationalState#IN_SERVICE}) ? */
public boolean isNodeWritable() {
return health == HEALTHY && operationalState == IN_SERVICE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ public NodeStatus updateNodeHealthState(DatanodeID nodeId, NodeState newHealth)
lock.writeLock().lock();
try {
DatanodeInfo dn = getNodeInfoUnsafe(nodeId);
NodeStatus oldStatus = dn.getNodeStatus();
NodeStatus newStatus = NodeStatus.valueOf(oldStatus.getOperationalState(), newHealth);
final NodeStatus newStatus = dn.getNodeStatus().newNodeState(newHealth);
dn.setNodeStatus(newStatus);
return newStatus;
} finally {
Expand All @@ -164,8 +163,7 @@ public NodeStatus updateNodeOperationalState(DatanodeID nodeId,
lock.writeLock().lock();
try {
DatanodeInfo dn = getNodeInfoUnsafe(nodeId);
NodeStatus oldStatus = dn.getNodeStatus();
NodeStatus newStatus = NodeStatus.valueOf(newOpState, oldStatus.getHealth(), opStateExpiryEpochSeconds);
final NodeStatus newStatus = dn.getNodeStatus().newOperationalState(newOpState, opStateExpiryEpochSeconds);
dn.setNodeStatus(newStatus);
return newStatus;
} finally {
Expand Down Expand Up @@ -389,7 +387,7 @@ private Set<ContainerID> getExisting(DatanodeID id) throws NodeNotFoundException
private List<DatanodeInfo> filterNodes(
NodeOperationalState opState, NodeState health) {
if (opState != null && health != null) {
return filterNodes(matching(NodeStatus.valueOf(opState, health)));
return filterNodes(matching(opState, health));
}
if (opState != null) {
return filterNodes(matching(opState));
Expand Down Expand Up @@ -430,6 +428,10 @@ private static Predicate<DatanodeInfo> matching(NodeStatus status) {
return dn -> status.equals(dn.getNodeStatus());
}

private static Predicate<DatanodeInfo> matching(NodeOperationalState op, NodeState health) {
return dn -> matching(op).test(dn) && matching(health).test(dn);
}

private static Predicate<DatanodeInfo> matching(NodeOperationalState state) {
return dn -> state.equals(dn.getNodeStatus().getOperationalState());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.hadoop.hdds.scm.node.states;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
Expand All @@ -39,7 +41,10 @@
* NodeStateManager.
*/
public class TestNodeStateMap {
private static final NodeStatus DECOMMISSIONING_HEALTHY_999 = NodeStatus.valueOf(
NodeOperationalState.DECOMMISSIONING, NodeState.HEALTHY, 999);

private final DatanodeDetails dn = generateDatanode();
private NodeStateMap map;

@BeforeEach
Expand All @@ -54,59 +59,76 @@ public void tearDown() {
@Test
public void testNodeCanBeAddedAndRetrieved()
throws NodeAlreadyExistsException, NodeNotFoundException {
DatanodeDetails dn = generateDatanode();
NodeStatus status = NodeStatus.inServiceHealthy();
map.addNode(dn, status, null);
assertEquals(dn, map.getNodeInfo(dn.getID()));
assertEquals(status, map.getNodeStatus(dn.getID()));
}

private void runTestUpdateHealth(NodeStatus original, NodeState newHealth) throws Exception {
map.addNode(dn, original, null);
final NodeStatus returned = map.updateNodeHealthState(dn.getID(), newHealth);

final NodeStatus expected = NodeStatus.valueOf(
original.getOperationalState(), newHealth, original.getOpStateExpiryEpochSeconds());
assertEquals(expected, returned);
assertEquals(returned, map.getNodeStatus(dn.getID()));
}

@Test
public void testNodeHealthStateCanBeUpdated()
throws NodeAlreadyExistsException, NodeNotFoundException {
DatanodeDetails dn = generateDatanode();
NodeStatus status = NodeStatus.inServiceHealthy();
map.addNode(dn, status, null);
public void testUpdateHealthyToStale() throws Exception {
runTestUpdateHealth(NodeStatus.inServiceHealthy(), NodeState.STALE);
}

NodeStatus expectedStatus = NodeStatus.inServiceStale();
NodeStatus returnedStatus =
map.updateNodeHealthState(dn.getID(), expectedStatus.getHealth());
assertEquals(expectedStatus, returnedStatus);
assertEquals(returnedStatus, map.getNodeStatus(dn.getID()));
@Test
public void testUpdateDecommissioningHealthyToStale() throws Exception {
runTestUpdateHealth(DECOMMISSIONING_HEALTHY_999, NodeState.STALE);
}

@Test
public void testNodeOperationalStateCanBeUpdated()
throws NodeAlreadyExistsException, NodeNotFoundException {
DatanodeDetails dn = generateDatanode();
NodeStatus status = NodeStatus.inServiceHealthy();
map.addNode(dn, status, null);

NodeStatus expectedStatus = NodeStatus.valueOf(
NodeOperationalState.DECOMMISSIONING,
NodeState.HEALTHY, 999);
NodeStatus expectedStatus = DECOMMISSIONING_HEALTHY_999;
NodeStatus returnedStatus = map.updateNodeOperationalState(
dn.getID(), expectedStatus.getOperationalState(), 999);
dn.getID(), expectedStatus.getOperationalState(), expectedStatus.getOpStateExpiryEpochSeconds());
assertEquals(expectedStatus, returnedStatus);
assertEquals(returnedStatus, map.getNodeStatus(dn.getID()));
assertEquals(999, returnedStatus.getOpStateExpiryEpochSeconds());
}

@Test
public void testGetNodeNonZeroExpiry() throws Exception {
runTestGetNode(123);
}

@Test
public void testGetNodeMethodsReturnCorrectCountsAndStates()
throws NodeAlreadyExistsException {
runTestGetNode(0);
}

private void runTestGetNode(long opExpiryEpochSeconds)
throws NodeAlreadyExistsException {
// Add one node for all possible states
int nodeCount = 0;
for (NodeOperationalState op : NodeOperationalState.values()) {
for (NodeState health : NodeState.values()) {
addRandomNodeWithState(op, health);
addRandomNodeWithState(op, health, opExpiryEpochSeconds);
nodeCount++;
}
}
NodeStatus requestedState = NodeStatus.inServiceStale();
final NodeStatus requestedState = NodeStatus.valueOf(
NodeOperationalState.IN_SERVICE, NodeState.STALE, opExpiryEpochSeconds);
List<DatanodeInfo> nodes = map.getDatanodeInfos(requestedState);
assertEquals(1, nodes.size());
assertEquals(1, map.getNodeCount(requestedState));

List<DatanodeInfo> nodes2 = map.getDatanodeInfos(
requestedState.getOperationalState(), requestedState.getHealth());
assertEquals(1, nodes2.size());

assertEquals(nodeCount, map.getTotalNodeCount());
assertEquals(nodeCount, map.getNodeCount());
assertEquals(nodeCount, map.getAllDatanodeInfos().size());
Expand Down Expand Up @@ -140,13 +162,14 @@ public void testConcurrency() throws Exception {
CountDownLatch elementRemoved = new CountDownLatch(1);
CountDownLatch loopStarted = new CountDownLatch(1);

final AtomicReference<Exception> failure = new AtomicReference<>();
new Thread(() -> {
try {
loopStarted.await();
map.removeContainer(id, ContainerID.valueOf(1L));
elementRemoved.countDown();
} catch (Exception e) {
e.printStackTrace();
failure.set(e);
}

}).start();
Expand All @@ -160,23 +183,15 @@ public void testConcurrency() throws Exception {
first = false;
System.out.println(key);
}
}

private void addNodeWithState(
DatanodeDetails dn,
NodeOperationalState opState, NodeState health
)
throws NodeAlreadyExistsException {
NodeStatus status = NodeStatus.valueOf(opState, health);
map.addNode(dn, status, null);
assertNull(failure.get());
}

private void addRandomNodeWithState(
NodeOperationalState opState, NodeState health
)
throws NodeAlreadyExistsException {
DatanodeDetails dn = generateDatanode();
addNodeWithState(dn, opState, health);
NodeOperationalState opState, NodeState health,
long opExpiryEpochSeconds) throws NodeAlreadyExistsException {
DatanodeDetails random = generateDatanode();
NodeStatus status = NodeStatus.valueOf(opState, health, opExpiryEpochSeconds);
map.addNode(random, status, null);
}

private DatanodeDetails generateDatanode() {
Expand Down