Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ private static Pipeline getFromProtobufSetCreationTimestamp(HddsProtos.Pipeline
.build();
}

public Pipeline copyWithNodesInOrder(List<DatanodeDetails> nodes) {
public Pipeline copyWithNodesInOrder(List<? extends DatanodeDetails> nodes) {
return toBuilder().setNodesInOrder(nodes).build();
}

Expand Down Expand Up @@ -611,7 +611,7 @@ public Builder setNodeOrder(List<Integer> orders) {
return this;
}

public Builder setNodesInOrder(List<DatanodeDetails> nodes) {
public Builder setNodesInOrder(List<? extends DatanodeDetails> nodes) {
this.nodesInOrder = new LinkedList<>(nodes);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ private synchronized boolean checkIfDecommissionPossible(List<DatanodeDetails> d
}
int reqNodes = cif.getReplicationConfig().getRequiredNodes();
if ((inServiceTotal - numDecom) < reqNodes) {
int unHealthyTotal = nodeManager.getAllNodes().size() - inServiceTotal;
final int unHealthyTotal = nodeManager.getAllNodeCount() - inServiceTotal;
String errorMsg = "Insufficient nodes. Tried to decommission " + dns.size() +
" nodes out of " + inServiceTotal + " IN-SERVICE HEALTHY and " + unHealthyTotal +
" not IN-SERVICE or not HEALTHY nodes. Cannot decommission as a minimum of " + reqNodes +
Expand Down Expand Up @@ -591,7 +591,7 @@ private synchronized boolean checkIfMaintenancePossible(List<DatanodeDetails> dn
minInService = maintenanceReplicaMinimum;
}
if ((inServiceTotal - numMaintenance) < minInService) {
int unHealthyTotal = nodeManager.getAllNodes().size() - inServiceTotal;
final int unHealthyTotal = nodeManager.getAllNodeCount() - inServiceTotal;
String errorMsg = "Insufficient nodes. Tried to start maintenance for " + dns.size() +
" nodes out of " + inServiceTotal + " IN-SERVICE HEALTHY and " + unHealthyTotal +
" not IN-SERVICE or not HEALTHY nodes. Cannot enter maintenance mode as a minimum of " + minInService +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,14 @@ int getNodeCount(
NodeOperationalState opState, NodeState health);

/**
* Get all datanodes known to SCM.
*
* @return List of DatanodeDetails known to SCM.
* @return all datanodes known to SCM.
*/
List<DatanodeDetails> getAllNodes();
List<? extends DatanodeDetails> getAllNodes();

/** @return the number of datanodes. */
default int getAllNodeCount() {
return getAllNodes().size();
}

/**
* Returns the aggregated node stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,10 @@ public List<DatanodeInfo> getAllNodes() {
return nodeStateMap.getAllDatanodeInfos();
}

int getAllNodeCount() {
return nodeStateMap.getNodeCount();
}

/**
* Sets the operational state of the given node. Intended to be called when
* a node is being decommissioned etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -252,15 +251,14 @@ public List<DatanodeDetails> getNodes(
.map(node -> (DatanodeDetails)node).collect(Collectors.toList());
}

/**
* Returns all datanodes that are known to SCM.
*
* @return List of DatanodeDetails
*/
@Override
public List<DatanodeDetails> getAllNodes() {
return nodeStateManager.getAllNodes().stream()
.map(node -> (DatanodeDetails) node).collect(Collectors.toList());
public List<DatanodeInfo> getAllNodes() {
return nodeStateManager.getAllNodes();
}

@Override
public int getAllNodeCount() {
return nodeStateManager.getAllNodeCount();
}

/**
Expand Down Expand Up @@ -449,9 +447,9 @@ public RegisteredCommand register(
LOG.info("Updated datanode to: {}", dn);
scmNodeEventPublisher.fireEvent(SCMEvents.NODE_ADDRESS_UPDATE, dn);
} else if (isVersionChange(oldNode.getVersion(), datanodeDetails.getVersion())) {
LOG.info("Update the version for registered datanode = {}, " +
LOG.info("Update the version for registered datanode {}, " +
"oldVersion = {}, newVersion = {}.",
datanodeDetails.getUuid(), oldNode.getVersion(), datanodeDetails.getVersion());
datanodeDetails, oldNode.getVersion(), datanodeDetails.getVersion());
nodeStateManager.updateNode(datanodeDetails, layoutInfo);
}
} catch (NodeNotFoundException e) {
Expand Down Expand Up @@ -1725,29 +1723,16 @@ public DatanodeInfo getNode(DatanodeID id) {
*/
@Override
public List<DatanodeDetails> getNodesByAddress(String address) {
List<DatanodeDetails> allNodes = getAllNodes();
List<DatanodeDetails> results = new LinkedList<>();
if (Strings.isNullOrEmpty(address)) {
LOG.warn("address is null");
return results;
return Collections.emptyList();
}
Set<DatanodeID> datanodeIDS = dnsToDnIdMap.get(address);
if (datanodeIDS == null) {
LOG.debug("Cannot find node for address {}", address);
return results;
return Collections.emptyList();
}

datanodeIDS.forEach(datanodeID -> {
try {
List<DatanodeDetails> datanodeDetails = allNodes.stream().
filter(node -> node.getID().equals(datanodeID)).
collect(Collectors.toList());
results.addAll(datanodeDetails);
} catch (Exception e) {
LOG.warn("Error find node for DataNode ID {}", datanodeID);
}
});
return results;
return datanodeIDS.stream()
.map(this::getNode)
.collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
*/
public class TestSCMCommonPlacementPolicy {

private NodeManager nodeManager;
private MockNodeManager nodeManager;
private OzoneConfiguration conf;

@BeforeEach
Expand Down Expand Up @@ -535,7 +535,7 @@ private static class DummyPlacementPolicy extends SCMCommonPlacementPolicy {
DummyPlacementPolicy(NodeManager nodeManager, ConfigurationSource conf,
int rackCnt) {
this(nodeManager, conf,
IntStream.range(0, nodeManager.getAllNodes().size()).boxed()
IntStream.range(0, nodeManager.getAllNodeCount()).boxed()
.collect(Collectors.toMap(Function.identity(),
idx -> idx % rackCnt)), rackCnt);
}
Expand All @@ -552,7 +552,7 @@ private static class DummyPlacementPolicy extends SCMCommonPlacementPolicy {
this.rackCnt = rackCnt;
this.racks = IntStream.range(0, rackCnt)
.mapToObj(i -> mock(Node.class)).collect(Collectors.toList());
List<DatanodeDetails> datanodeDetails = nodeManager.getAllNodes();
final List<? extends DatanodeDetails> datanodeDetails = nodeManager.getAllNodes();
rackMap = datanodeRackMap.entrySet().stream()
.collect(Collectors.toMap(
entry -> datanodeDetails.get(entry.getKey()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,7 @@ public void testScmRegisterNodeWith4LayerNetworkTopology()
assertEquals(nodeCount, nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
assertEquals(nodeCount, clusterMap.getNumOfLeafNode(""));
assertEquals(4, clusterMap.getMaxLevel());
List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
final List<DatanodeInfo> nodeList = nodeManager.getAllNodes();
nodeList.forEach(node -> assertTrue(
node.getNetworkLocation().startsWith("/rack1/ng")));
nodeList.forEach(node -> assertNotNull(node.getParent()));
Expand Down Expand Up @@ -1872,7 +1872,7 @@ void testScmRegisterNodeWithNetworkTopology(boolean useHostname)
nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
assertEquals(nodeCount, clusterMap.getNumOfLeafNode(""));
assertEquals(3, clusterMap.getMaxLevel());
List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
final List<DatanodeInfo> nodeList = nodeManager.getAllNodes();
nodeList.forEach(node ->
assertEquals("/rack1", node.getNetworkLocation()));

Expand Down Expand Up @@ -2019,7 +2019,7 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname()
nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
assertEquals(1, clusterMap.getNumOfLeafNode(""));
assertEquals(4, clusterMap.getMaxLevel());
List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
final List<DatanodeInfo> nodeList = nodeManager.getAllNodes();
assertEquals(1, nodeList.size());

DatanodeDetails returnedNode = nodeList.get(0);
Expand All @@ -2039,7 +2039,7 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname()
assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy()));
assertEquals(1, clusterMap.getNumOfLeafNode(""));
assertEquals(4, clusterMap.getMaxLevel());
List<DatanodeDetails> updatedNodeList = nodeManager.getAllNodes();
final List<DatanodeInfo> updatedNodeList = nodeManager.getAllNodes();
assertEquals(1, updatedNodeList.size());

DatanodeDetails returnedUpdatedNode = updatedNodeList.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void testDatanodeUsageInfoCompatibility() throws IOException {

@Test
public void testDatanodeUsageInfoContainerCount() throws Exception {
List<DatanodeDetails> dnList = cluster().getStorageContainerManager()
List<? extends DatanodeDetails> dnList = cluster().getStorageContainerManager()
.getScmNodeManager()
.getAllNodes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ public void testScmInfo(@TempDir Path tempDir) throws Exception {
*/
private void testScmProcessDatanodeHeartbeat(MiniOzoneCluster cluster) {
NodeManager nodeManager = cluster.getStorageContainerManager().getScmNodeManager();
List<DatanodeDetails> allNodes = nodeManager.getAllNodes();
List<? extends DatanodeDetails> allNodes = nodeManager.getAllNodes();
assertEquals(cluster.getHddsDatanodes().size(), allNodes.size());

for (DatanodeDetails node : allNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public void testInsufficientNodesCannotBeDecommissioned()
// Generate some data on the empty cluster to create some containers
generateData(20, "key", ratisRepConfig);

final List<DatanodeDetails> toDecommission = nm.getAllNodes();
final List<? extends DatanodeDetails> toDecommission = nm.getAllNodes();

// trying to decommission 5 nodes should leave the cluster with 2 nodes,
// which is not sufficient for RATIS.THREE replication. It should not be allowed.
Expand Down Expand Up @@ -706,7 +706,7 @@ public void testInsufficientNodesCannotBePutInMaintenance()
throws Exception {
// Generate some data on the empty cluster to create some containers
generateData(20, "key", ratisRepConfig);
final List<DatanodeDetails> toMaintenance = nm.getAllNodes();
final List<? extends DatanodeDetails> toMaintenance = nm.getAllNodes();

// trying to move 6 nodes to maintenance should leave the cluster with 1 node,
// which is not sufficient for RATIS.THREE replication (3 - maintenanceReplicaMinimum = 2).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private void assertNotSamePeers() {
nodeManager.getAllNodes().forEach((dn) -> {
Collection<DatanodeDetails> peers = nodeManager.getPeerList(dn);
assertThat(peers).doesNotContain(dn);
List<DatanodeDetails> trimList = nodeManager.getAllNodes();
List<? extends DatanodeDetails> trimList = nodeManager.getAllNodes();
trimList.remove(dn);
assertThat(peers).containsAll(trimList);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static void cleanup() throws Exception {
public void sortDatanodesRelativeToDatanode() {
for (DatanodeDetails dn : nodeManager.getAllNodes()) {
assertEquals(ROOT_LEVEL + 2, dn.getLevel());
List<DatanodeDetails> sorted =
List<? extends DatanodeDetails> sorted =
keyManager.sortDatanodes(nodeManager.getAllNodes(), nodeAddress(dn));
assertEquals(dn, sorted.get(0),
"Source node should be sorted very first");
Expand All @@ -146,12 +146,12 @@ public void sortDatanodesRelativeToNonDatanode() {

@Test
public void testSortDatanodes() {
List<DatanodeDetails> nodes = nodeManager.getAllNodes();
List<? extends DatanodeDetails> nodes = nodeManager.getAllNodes();

// sort normal datanodes
String client;
client = nodeManager.getAllNodes().get(0).getIpAddress();
List<DatanodeDetails> datanodeDetails =
List<? extends DatanodeDetails> datanodeDetails =
keyManager.sortDatanodes(nodes, client);
assertEquals(NODE_COUNT, datanodeDetails.size());

Expand All @@ -166,7 +166,7 @@ public void testSortDatanodes() {
assertEquals(NODE_COUNT, datanodeDetails.size());
}

private static void assertRackOrder(String rack, List<DatanodeDetails> list) {
private static void assertRackOrder(String rack, List<? extends DatanodeDetails> list) {
int size = list.size();
for (int i = 0; i < size / 2; i++) {
assertEquals(rack, list.get(i).getNetworkLocation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testExplicitRemovalOfNode() throws Exception {
ReconNodeManager nodeManager = (ReconNodeManager) ozoneCluster.getReconServer()
.getReconStorageContainerManager().getScmNodeManager();
long nodeDBCountBefore = nodeManager.getNodeDBKeyCount();
List<DatanodeDetails> allNodes = nodeManager.getAllNodes();
List<? extends DatanodeDetails> allNodes = nodeManager.getAllNodes();
assertEquals(nodeDBCountBefore, allNodes.size());

DatanodeDetails datanodeDetails = allNodes.get(3);
Expand All @@ -137,7 +137,7 @@ public void testExplicitRemovalOfNode() throws Exception {
try {
return nodeManager.getNodeStatus(datanodeDetails).isDead();
} catch (NodeNotFoundException e) {
fail("getNodeStatus() Failed for " + datanodeDetails.getUuid(), e);
fail("getNodeStatus() Failed for " + datanodeDetails, e);
throw new RuntimeException(e);
}
}, 2000, 10000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1833,7 +1833,7 @@ public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
refreshPipelineFromCache(keyInfoList);

if (args.getSortDatanodes()) {
sortDatanodes(clientAddress, keyInfoList.toArray(new OmKeyInfo[0]));
sortDatanodes(clientAddress, keyInfoList);
}
return fileStatusList;
}
Expand Down Expand Up @@ -1962,7 +1962,7 @@ private List<OzoneFileStatus> sortPipelineInfo(
refreshPipelineFromCache(keyInfoList);

if (omKeyArgs.getSortDatanodes()) {
sortDatanodes(clientAddress, keyInfoList.toArray(new OmKeyInfo[0]));
sortDatanodes(clientAddress, keyInfoList);
}

return fileStatusFinalList;
Expand Down Expand Up @@ -2001,9 +2001,13 @@ private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo)
return encInfo;
}

private void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
private void sortDatanodes(String clientMachine, OmKeyInfo keyInfo) {
sortDatanodes(clientMachine, Collections.singletonList(keyInfo));
}

private void sortDatanodes(String clientMachine, List<OmKeyInfo> keyInfos) {
if (keyInfos != null && clientMachine != null) {
Map<Set<String>, List<DatanodeDetails>> sortedPipelines = new HashMap<>();
final Map<Set<String>, List<? extends DatanodeDetails>> sortedPipelines = new HashMap<>();
for (OmKeyInfo keyInfo : keyInfos) {
OmKeyLocationInfoGroup key = keyInfo.getLatestVersionLocations();
if (key == null) {
Expand All @@ -2013,14 +2017,16 @@ private void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
for (OmKeyLocationInfo k : key.getLocationList()) {
Pipeline pipeline = k.getPipeline();
List<DatanodeDetails> nodes = pipeline.getNodes();
List<String> uuidList = toNodeUuid(nodes);
Set<String> uuidSet = new HashSet<>(uuidList);
List<DatanodeDetails> sortedNodes = sortedPipelines.get(uuidSet);
if (nodes.isEmpty()) {
LOG.warn("No datanodes in pipeline {}", pipeline.getId());
continue;
}

final Set<String> uuidSet = nodes.stream().map(DatanodeDetails::getUuidString)
.collect(Collectors.toSet());

List<? extends DatanodeDetails> sortedNodes = sortedPipelines.get(uuidSet);
if (sortedNodes == null) {
if (nodes.isEmpty()) {
LOG.warn("No datanodes in pipeline {}", pipeline.getId());
continue;
}
sortedNodes = sortDatanodes(nodes, clientMachine);
if (sortedNodes != null) {
sortedPipelines.put(uuidSet, sortedNodes);
Expand All @@ -2038,15 +2044,15 @@ private void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
}

@VisibleForTesting
public List<DatanodeDetails> sortDatanodes(List<DatanodeDetails> nodes,
public List<? extends DatanodeDetails> sortDatanodes(List<? extends DatanodeDetails> nodes,
String clientMachine) {
final Node client = getClientNode(clientMachine, nodes);
return ozoneManager.getClusterMap()
.sortByDistanceCost(client, nodes, nodes.size());
}

private Node getClientNode(String clientMachine,
List<DatanodeDetails> nodes) {
List<? extends DatanodeDetails> nodes) {
List<DatanodeDetails> matchingNodes = new ArrayList<>();
boolean useHostname = ozoneManager.getConfiguration().getBoolean(
HddsConfigKeys.HDDS_DATANODE_USE_DN_HOSTNAME,
Expand Down Expand Up @@ -2092,14 +2098,6 @@ private String resolveNodeLocation(String hostname) {
}
}

private static List<String> toNodeUuid(Collection<DatanodeDetails> nodes) {
List<String> nodeSet = new ArrayList<>(nodes.size());
for (DatanodeDetails node : nodes) {
nodeSet.add(node.getUuidString());
}
return nodeSet;
}

private void slimLocationVersion(OmKeyInfo... keyInfos) {
if (keyInfos != null) {
for (OmKeyInfo keyInfo : keyInfos) {
Expand Down
Loading