Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ private void initializeStateMachines() {
*/
public void addNode(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutInfo) throws NodeAlreadyExistsException {
NodeStatus newNodeStatus = newNodeStatus(datanodeDetails, layoutInfo);
nodeStateMap.addNode(datanodeDetails, newNodeStatus, layoutInfo);
nodeStateMap.addNode(newDatanodeInfo(datanodeDetails, layoutInfo));
try {
updateLastKnownLayoutVersion(datanodeDetails, layoutInfo);
} catch (NodeNotFoundException ex) {
Expand All @@ -318,6 +317,11 @@ public void addNode(DatanodeDetails datanodeDetails,
}
}

private DatanodeInfo newDatanodeInfo(DatanodeDetails datanode, LayoutVersionProto layout) {
final NodeStatus status = newNodeStatus(datanode, layout);
return new DatanodeInfo(datanode, status, layout);
}

/**
* When a node registers with SCM, the operational state stored on the
* datanode is the source of truth. Therefore, if the datanode reports
Expand Down Expand Up @@ -414,14 +418,10 @@ public void updateLastKnownLayoutVersion(DatanodeDetails datanodeDetails,
public void updateNode(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutInfo)
throws NodeNotFoundException {
final DatanodeInfo datanodeInfo = nodeStateMap.getNodeInfo(datanodeDetails.getID());
NodeStatus newNodeStatus = newNodeStatus(datanodeDetails, layoutInfo);
LOG.info("updating node {} from {} to {} with status {}",
datanodeDetails.getUuidString(),
datanodeInfo,
datanodeDetails,
newNodeStatus);
nodeStateMap.updateNode(datanodeDetails, newNodeStatus, layoutInfo);
final DatanodeInfo newInfo = newDatanodeInfo(datanodeDetails, layoutInfo);
final DatanodeInfo oldInfo = nodeStateMap.updateNode(newInfo);
LOG.info("Updated datanode {} {} to {} {}",
oldInfo, oldInfo.getNodeStatus(), newInfo, newInfo.getNodeStatus());
updateLastKnownLayoutVersion(datanodeDetails, layoutInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@

package org.apache.hadoop.hdds.scm.node.states;

import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
Expand All @@ -47,14 +43,42 @@
* - thread-safe
*/
public class NodeStateMap {
/**
* Node id to node info map.
*/
private final Map<DatanodeID, DatanodeInfo> nodeMap = new HashMap<>();
/**
* Node to set of containers on the node.
*/
private final Map<DatanodeID, Set<ContainerID>> nodeToContainer = new HashMap<>();
private static class Entry {
private final DatanodeInfo info;
private final Set<ContainerID> containers = new TreeSet<>();

Entry(DatanodeInfo info) {
this.info = info;
}

DatanodeInfo getInfo() {
return info;
}

int getContainerCount() {
return containers.size();
}

Set<ContainerID> copyContainers() {
return new TreeSet<>(containers);
}

void add(ContainerID containerId) {
containers.add(containerId);
}

void remove(ContainerID containerID) {
containers.remove(containerID);
}

void setContainersForTesting(Set<ContainerID> newContainers) {
containers.clear();
containers.addAll(newContainers);
}
}

/** Map: {@link DatanodeID} -> ({@link DatanodeInfo}, {@link ContainerID}s). */
private final Map<DatanodeID, Entry> nodeMap = new HashMap<>();

private final ReadWriteLock lock = new ReentrantReadWriteLock();

Expand All @@ -64,27 +88,18 @@ public class NodeStateMap {
public NodeStateMap() { }

/**
* Adds a node to NodeStateMap.
*
* @param datanodeDetails DatanodeDetails
* @param nodeStatus initial NodeStatus
* @param layoutInfo initial LayoutVersionProto
* Adds the given datanode.
*
* @throws NodeAlreadyExistsException if the node already exist
*/
public void addNode(DatanodeDetails datanodeDetails, NodeStatus nodeStatus,
LayoutVersionProto layoutInfo)

throws NodeAlreadyExistsException {
public void addNode(DatanodeInfo datanode) throws NodeAlreadyExistsException {
final DatanodeID id = datanode.getID();
lock.writeLock().lock();
try {
final DatanodeID id = datanodeDetails.getID();
if (nodeMap.containsKey(id)) {
throw new NodeAlreadyExistsException(id);
}
nodeMap.put(id, new DatanodeInfo(datanodeDetails, nodeStatus,
layoutInfo));
nodeToContainer.put(id, new HashSet<>());
nodeMap.put(id, new Entry(datanode));
} finally {
lock.writeLock().unlock();
}
Expand All @@ -97,35 +112,30 @@ public void removeNode(DatanodeID datanodeID) {
lock.writeLock().lock();
try {
nodeMap.remove(datanodeID);
nodeToContainer.remove(datanodeID);
} finally {
lock.writeLock().unlock();
}
}

/**
* Update a node in NodeStateMap.
*
* @param datanodeDetails DatanodeDetails
* @param nodeStatus initial NodeStatus
* @param layoutInfo initial LayoutVersionProto
* Update the given datanode.
*
* @return the existing {@link DatanodeInfo}.
*/
public void updateNode(DatanodeDetails datanodeDetails, NodeStatus nodeStatus,
LayoutVersionProto layoutInfo)

throws NodeNotFoundException {
public DatanodeInfo updateNode(DatanodeInfo datanode) throws NodeNotFoundException {
final DatanodeID id = datanode.getID();
final DatanodeInfo oldInfo;
lock.writeLock().lock();
try {
final DatanodeID id = datanodeDetails.getID();
if (!nodeMap.containsKey(id)) {
oldInfo = getNodeInfo(id);
if (oldInfo == null) {
throw new NodeNotFoundException(id);
}
nodeMap.put(id, new DatanodeInfo(datanodeDetails, nodeStatus,
layoutInfo));
nodeMap.put(id, new Entry(datanode));
} finally {
lock.writeLock().unlock();
}
return oldInfo;
}

/**
Expand All @@ -140,7 +150,7 @@ public NodeStatus updateNodeHealthState(DatanodeID nodeId, NodeState newHealth)
throws NodeNotFoundException {
lock.writeLock().lock();
try {
DatanodeInfo dn = getNodeInfoUnsafe(nodeId);
final DatanodeInfo dn = getExisting(nodeId).getInfo();
final NodeStatus newStatus = dn.getNodeStatus().newNodeState(newHealth);
dn.setNodeStatus(newStatus);
return newStatus;
Expand All @@ -162,7 +172,7 @@ public NodeStatus updateNodeOperationalState(DatanodeID nodeId,
throws NodeNotFoundException {
lock.writeLock().lock();
try {
DatanodeInfo dn = getNodeInfoUnsafe(nodeId);
final DatanodeInfo dn = getExisting(nodeId).getInfo();
final NodeStatus newStatus = dn.getNodeStatus().newOperationalState(newOpState, opStateExpiryEpochSeconds);
dn.setNodeStatus(newStatus);
return newStatus;
Expand All @@ -178,7 +188,7 @@ public NodeStatus updateNodeOperationalState(DatanodeID nodeId,
public DatanodeInfo getNodeInfo(DatanodeID datanodeID) throws NodeNotFoundException {
lock.readLock().lock();
try {
return getNodeInfoUnsafe(datanodeID);
return getExisting(datanodeID).getInfo();
} finally {
lock.readLock().unlock();
}
Expand All @@ -199,9 +209,11 @@ public int getNodeCount() {
* @return list of all the node ids
*/
public List<DatanodeInfo> getAllDatanodeInfos() {
lock.readLock().lock();
try {
lock.readLock().lock();
return new ArrayList<>(nodeMap.values());
return nodeMap.values().stream()
.map(Entry::getInfo)
.collect(Collectors.toList());
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -229,18 +241,15 @@ public List<DatanodeInfo> getDatanodeInfos(NodeStatus status) {
*/
public List<DatanodeInfo> getDatanodeInfos(
NodeOperationalState opState, NodeState health) {
return filterNodes(opState, health);
return opState != null && health != null ? filterNodes(matching(opState, health))
: opState != null ? filterNodes(matching(opState))
: health != null ? filterNodes(matching(health))
: getAllDatanodeInfos();
}

/**
* Returns the count of nodes in the specified state.
*
* @param state NodeStatus
*
* @return Number of nodes in the specified state
*/
public int getNodeCount(NodeStatus state) {
return getDatanodeInfos(state).size();
/** @return Number of nodes in the given status */
public int getNodeCount(NodeStatus status) {
return countNodes(matching(status));
}

/**
Expand All @@ -253,7 +262,10 @@ public int getNodeCount(NodeStatus state) {
* @return Number of nodes in the specified state
*/
public int getNodeCount(NodeOperationalState opState, NodeState health) {
return filterNodes(opState, health).size();
return opState != null && health != null ? countNodes(matching(opState, health))
: opState != null ? countNodes(matching(opState))
: health != null ? countNodes(matching(health))
: getTotalNodeCount();
}

/**
Expand Down Expand Up @@ -282,7 +294,7 @@ public int getTotalNodeCount() {
public NodeStatus getNodeStatus(DatanodeID datanodeID) throws NodeNotFoundException {
lock.readLock().lock();
try {
return getNodeInfoUnsafe(datanodeID).getNodeStatus();
return getExisting(datanodeID).getInfo().getNodeStatus();
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -312,8 +324,7 @@ public void setContainersForTesting(DatanodeID id, Set<ContainerID> containers)
throws NodeNotFoundException {
lock.writeLock().lock();
try {
getExisting(id);
nodeToContainer.put(id, containers);
getExisting(id).setContainersForTesting(containers);
} finally {
lock.writeLock().unlock();
}
Expand All @@ -323,7 +334,7 @@ public Set<ContainerID> getContainers(DatanodeID id)
throws NodeNotFoundException {
lock.readLock().lock();
try {
return new HashSet<>(getExisting(id));
return getExisting(id).copyContainers();
} finally {
lock.readLock().unlock();
}
Expand All @@ -332,7 +343,7 @@ public Set<ContainerID> getContainers(DatanodeID id)
public int getContainerCount(DatanodeID datanodeID) throws NodeNotFoundException {
lock.readLock().lock();
try {
return getExisting(datanodeID).size();
return getExisting(datanodeID).getContainerCount();
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -369,63 +380,44 @@ public String toString() {
}

/**
* @return the container set mapping to the given id.
* @return the entry mapping to the given id.
* @throws NodeNotFoundException If the node is missing.
*/
private Set<ContainerID> getExisting(DatanodeID id) throws NodeNotFoundException {
final Set<ContainerID> containers = nodeToContainer.get(id);
if (containers == null) {
private Entry getExisting(DatanodeID id) throws NodeNotFoundException {
final Entry entry = nodeMap.get(id);
if (entry == null) {
throw new NodeNotFoundException(id);
}
return containers;
return entry;
}

/**
* Create a list of datanodeInfo for all nodes matching the passed states.
* Passing null for one of the states acts like a wildcard for that state.
*
* @param opState
* @param health
* @return List of DatanodeInfo objects matching the passed state
*/
private List<DatanodeInfo> filterNodes(
NodeOperationalState opState, NodeState health) {
if (opState != null && health != null) {
return filterNodes(matching(opState, health));
}
if (opState != null) {
return filterNodes(matching(opState));
}
if (health != null) {
return filterNodes(matching(health));
private int countNodes(Predicate<DatanodeInfo> filter) {
final long count;
lock.readLock().lock();
try {
count = nodeMap.values().stream()
.map(Entry::getInfo)
.filter(filter)
.count();
} finally {
lock.readLock().unlock();
}
return getAllDatanodeInfos();
return Math.toIntExact(count);
}

/**
* @return a list of all nodes matching the {@code filter}
*/
private List<DatanodeInfo> filterNodes(Predicate<DatanodeInfo> filter) {
List<DatanodeInfo> result = new LinkedList<>();
lock.readLock().lock();
try {
for (DatanodeInfo dn : nodeMap.values()) {
if (filter.test(dn)) {
result.add(dn);
}
}
return nodeMap.values().stream()
.map(Entry::getInfo)
.filter(filter)
.collect(Collectors.toList());
} finally {
lock.readLock().unlock();
}
return result;
}

private @Nonnull DatanodeInfo getNodeInfoUnsafe(@Nonnull DatanodeID id) throws NodeNotFoundException {
final DatanodeInfo info = nodeMap.get(id);
if (info == null) {
throw new NodeNotFoundException(id);
}
return info;
}

private static Predicate<DatanodeInfo> matching(NodeStatus status) {
Expand Down
Loading