Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ protected NetworkTopology init(InnerNode.Factory factory) {
private int depthOfAllLeaves = -1;
/** rack counter */
protected int numOfRacks = 0;
/** empty rack map, rackname->nodenumber. */
private HashMap<String, Set<String>> rackMap =
new HashMap<String, Set<String>>();
/** decommission nodes, contained stoped nodes. */
private HashSet<String> decommissionNodes = new HashSet<>();
/** empty rack counter. */
private int numOfEmptyRacks = 0;

/**
* Whether or not this cluster has ever consisted of more than 1 rack,
Expand Down Expand Up @@ -150,6 +157,7 @@ public void add(Node node) {
if (rack == null) {
incrementRacks();
}
interAddNodeWithEmptyRack(node);
if (depthOfAllLeaves == -1) {
depthOfAllLeaves = node.getLevel();
}
Expand Down Expand Up @@ -226,6 +234,7 @@ public void remove(Node node) {
if (rack == null) {
numOfRacks--;
}
interRemoveNodeWithEmptyRack(node);
}
LOG.debug("NetworkTopology became:\n{}", this);
} finally {
Expand Down Expand Up @@ -989,4 +998,108 @@ private <T extends Node> void sortByDistance(Node reader, T[] nodes,
Preconditions.checkState(idx == activeLen,
"Sorted the wrong number of nodes!");
}
}

/** @return the number of nonempty racks */
public int getNumOfNonEmptyRacks() {
return numOfRacks - numOfEmptyRacks;
}

/**
* Update empty rack number when add a node like recommission.
* @param node node to be added; can be null
*/
public void recommissionNode(Node node) {
if (node == null) {
return;
}
if (node instanceof InnerNode) {
throw new IllegalArgumentException(
"Not allow to remove an inner node: " + NodeBase.getPath(node));
}
netlock.writeLock().lock();
try {
decommissionNodes.remove(node.getName());
interAddNodeWithEmptyRack(node);
} finally {
netlock.writeLock().unlock();
}
}

/**
* Update empty rack number when remove a node like decommission.
* @param node node to be added; can be null
*/
public void decommissionNode(Node node) {
if (node == null) {
return;
}
if (node instanceof InnerNode) {
throw new IllegalArgumentException(
"Not allow to remove an inner node: " + NodeBase.getPath(node));
}
netlock.writeLock().lock();
try {
decommissionNodes.add(node.getName());
interRemoveNodeWithEmptyRack(node);
} finally {
netlock.writeLock().unlock();
}
}

/**
* Internal function for update empty rack number
* for add or recommission a node.
* @param node node to be added; can be null
*/
private void interAddNodeWithEmptyRack(Node node) {
if (node == null) {
return;
}
String rackname = node.getNetworkLocation();
Set<String> nodes = rackMap.get(rackname);
if (nodes == null) {
nodes = new HashSet<String>();
}
if (!decommissionNodes.contains(node.getName())) {
nodes.add(node.getName());
}
rackMap.put(rackname, nodes);
countEmptyRacks();
}

/**
* Internal function for update empty rack number
* for remove or decommission a node.
* @param node node to be removed; can be null
*/
private void interRemoveNodeWithEmptyRack(Node node) {
if (node == null) {
return;
}
String rackname = node.getNetworkLocation();
Set<String> nodes = rackMap.get(rackname);
if (nodes != null) {
InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
if (rack == null) {
// this node and its rack are both removed.
rackMap.remove(rackname);
} else if (nodes.contains(node.getName())) {
// this node is decommissioned or removed.
nodes.remove(node.getName());
rackMap.put(rackname, nodes);
}
countEmptyRacks();
}
}

private void countEmptyRacks() {
int count = 0;
for (Set<String> nodes : rackMap.values()) {
if (nodes != null && nodes.isEmpty()) {
count++;
}
}
numOfEmptyRacks = count;
LOG.debug("Current numOfEmptyRacks is {}", numOfEmptyRacks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
&& stats.isAvoidingStaleDataNodesForWrite());
boolean avoidLocalRack = (addBlockFlags != null
&& addBlockFlags.contains(AddBlockFlag.NO_LOCAL_RACK) && writer != null
&& clusterMap.getNumOfRacks() > 2);
&& clusterMap.getNumOfNonEmptyRacks() > 2);
boolean avoidLocalNode = (addBlockFlags != null
&& addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
&& writer != null
Expand Down Expand Up @@ -385,7 +385,7 @@ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
totalNumOfReplicas = clusterSize;
}
// No calculation needed when there is only one rack or picking one node.
int numOfRacks = clusterMap.getNumOfRacks();
int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
// HDFS-14527 return default when numOfRacks = 0 to avoid
// ArithmeticException when calc maxNodesPerRack at following logic.
if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
Expand Down Expand Up @@ -1173,7 +1173,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
.map(dn -> dn.getNetworkLocation()).distinct().count();

return new BlockPlacementStatusDefault(Math.toIntExact(rackCount),
minRacks, clusterMap.getNumOfRacks());
minRacks, clusterMap.getNumOfNonEmptyRacks());
}

/**
Expand Down Expand Up @@ -1370,4 +1370,3 @@ public boolean getExcludeSlowNodesEnabled() {
return excludeSlowNodesEnabled;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
totalNumOfReplicas = clusterSize;
}
// No calculation needed when there is only one rack or picking one node.
int numOfRacks = clusterMap.getNumOfRacks();
int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
// HDFS-14527 return default when numOfRacks = 0 to avoid
// ArithmeticException when calc maxNodesPerRack at following logic.
if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
Expand Down Expand Up @@ -90,38 +90,39 @@ protected Node chooseTargetInOrder(int numOfReplicas,
EnumMap<StorageType, Integer> storageTypes)
throws NotEnoughReplicasException {
int totalReplicaExpected = results.size() + numOfReplicas;
int numOfRacks = clusterMap.getNumOfRacks();
if (totalReplicaExpected < numOfRacks ||
totalReplicaExpected % numOfRacks == 0) {
writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;
}
int numOfRacks = clusterMap.getNumOfNonEmptyRacks();

assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
try {
if (totalReplicaExpected < numOfRacks ||
totalReplicaExpected % numOfRacks == 0) {
writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;
}

// Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
// replicas.
HashMap<String, Integer> rackCounts = new HashMap<>();
for (DatanodeStorageInfo dsInfo : results) {
String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
Integer count = rackCounts.get(rack);
if (count != null) {
rackCounts.put(rack, count + 1);
} else {
rackCounts.put(rack, 1);
assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;

// Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
// replicas.
HashMap<String, Integer> rackCounts = new HashMap<>();
for (DatanodeStorageInfo dsInfo : results) {
String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
Integer count = rackCounts.get(rack);
if (count != null) {
rackCounts.put(rack, count + 1);
} else {
rackCounts.put(rack, 1);
}
}
}
int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
for (int count : rackCounts.values()) {
if (count > maxNodesPerRack -1) {
excess += count - (maxNodesPerRack -1);
int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
for (int count : rackCounts.values()) {
if (count > maxNodesPerRack -1) {
excess += count - (maxNodesPerRack -1);
}
}
}
numOfReplicas = Math.min(totalReplicaExpected - results.size(),
(maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
numOfReplicas = Math.min(totalReplicaExpected - results.size(),
(maxNodesPerRack -1) * numOfRacks - (results.size() - excess));

try {
// Try to spread the replicas as evenly as possible across racks.
// This is done by first placing with (maxNodesPerRack-1), then spreading
// the remainder by calling again with maxNodesPerRack.
Expand Down Expand Up @@ -243,7 +244,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
racks.add(dn.getNetworkLocation());
}
return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
clusterMap.getNumOfRacks());
clusterMap.getNumOfNonEmptyRacks());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.startDecommission(node);
// Update cluster's emptyRack
blockManager.getDatanodeManager().getNetworkTopology().decommissionNode(node);
// hbManager.startDecommission will set dead node to decommissioned.
if (node.isDecommissionInProgress()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
Expand All @@ -200,6 +202,8 @@ public void stopDecommission(DatanodeDescriptor node) {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.stopDecommission(node);
// Update cluster's emptyRack
blockManager.getDatanodeManager().getNetworkTopology().recommissionNode(node);
// extra redundancy blocks will be detected and processed when
// the dead node comes back and send in its full block report.
if (node.isAlive()) {
Expand Down Expand Up @@ -412,4 +416,4 @@ void runMonitorForTest() throws ExecutionException, InterruptedException {
executor.submit(monitor).get();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8233,7 +8233,7 @@ public ECTopologyVerifierResult getECTopologyResultForPolicies(
getBlockManager().getDatanodeManager().getNumOfDataNodes();
int numOfRacks =
getBlockManager().getDatanodeManager().getNetworkTopology()
.getNumOfRacks();
.getNumOfNonEmptyRacks();
result = ECTopologyVerifier
.getECTopologyVerifierResult(numOfRacks, numOfDataNodes, policies);
}
Expand Down Expand Up @@ -8676,7 +8676,7 @@ private ECTopologyVerifierResult getEcTopologyVerifierResultForEnabledPolicies()
int numOfDataNodes =
getBlockManager().getDatanodeManager().getNumOfDataNodes();
int numOfRacks = getBlockManager().getDatanodeManager().getNetworkTopology()
.getNumOfRacks();
.getNumOfNonEmptyRacks();
ErasureCodingPolicy[] enabledEcPolicies =
getErasureCodingPolicyManager().getCopyOfEnabledPolicies();
return ECTopologyVerifier
Expand Down Expand Up @@ -8734,4 +8734,3 @@ public void checkErasureCodingSupported(String operationName)
}
}
}

Loading