Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.outliers.report.interval";
public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT =
"30m";
public static final String DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY =
"dfs.namenode.max.slowpeer.collect.nodes";
public static final int DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT =
5;
public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY =
"dfs.namenode.slowpeer.collect.interval";
public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT =
"30m";

// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
Expand Down Expand Up @@ -1173,6 +1181,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY =
"dfs.namenode.block-placement-policy.default.prefer-local-node";
public static final boolean DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT = true;
public static final String
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY =
"dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled";
public static final boolean
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT =
false;

public static final String DFS_NAMENODE_GC_TIME_MONITOR_ENABLE =
"dfs.namenode.gc.time.monitor.enable";
public static final boolean DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
Expand Down Expand Up @@ -82,7 +84,8 @@ private enum NodeNotChosenReason {
NODE_TOO_BUSY("the node is too busy"),
TOO_MANY_NODES_ON_RACK("the rack has too many chosen nodes"),
NOT_ENOUGH_STORAGE_SPACE("not enough storage space to place the block"),
NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable");
NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable"),
NODE_SLOW("the node is too slow");

private final String text;

Expand All @@ -99,6 +102,8 @@ private String getText() {
private boolean considerLoadByStorageType;
protected double considerLoadFactor;
private boolean preferLocalNode;
private boolean dataNodePeerStatsEnabled;
private boolean excludeSlowNodesEnabled;
protected NetworkTopology clusterMap;
protected Host2NodesMap host2datanodeMap;
private FSClusterStats stats;
Expand Down Expand Up @@ -144,6 +149,12 @@ public void initialize(Configuration conf, FSClusterStats stats,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY,
DFSConfigKeys.
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT);
this.dataNodePeerStatsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
this.excludeSlowNodesEnabled = conf.getBoolean(
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
}

@Override
Expand Down Expand Up @@ -1091,6 +1102,15 @@ boolean isGoodDatanode(DatanodeDescriptor node,
return false;
}

// check if the target is a slow node
if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
Set<Node> nodes = DatanodeManager.getSlowNodes();
if (nodes.contains(node)) {
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
return false;
}
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
import static org.apache.hadoop.util.Time.monotonicNow;

import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
Expand Down Expand Up @@ -53,6 +57,7 @@
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Timer;

Expand Down Expand Up @@ -201,8 +206,16 @@ public class DatanodeManager {
*/
private final boolean useDfsNetworkTopology;

private static final String IP_PORT_SEPARATOR = ":";

@Nullable
private final SlowPeerTracker slowPeerTracker;
private static Set<Node> slowNodesSet = Sets.newConcurrentHashSet();
private Daemon slowPeerCollectorDaemon;
private final long slowPeerCollectionInterval;
private final int maxSlowPeerReportNodes;
private boolean excludeSlowNodesEnabled;

@Nullable
private final SlowDiskTracker slowDiskTracker;

Expand Down Expand Up @@ -242,11 +255,22 @@ public class DatanodeManager {
DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
DFSConfigKeys.
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT));

final Timer timer = new Timer();
this.slowPeerTracker = dataNodePeerStatsEnabled ?
new SlowPeerTracker(conf, timer) : null;

this.excludeSlowNodesEnabled = conf.getBoolean(
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
this.maxSlowPeerReportNodes = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
this.slowPeerCollectionInterval = conf.getTimeDuration(
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
if (slowPeerTracker != null && excludeSlowNodesEnabled) {
startSlowPeerCollector();
}
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
new SlowDiskTracker(conf, timer) : null;

Expand Down Expand Up @@ -356,6 +380,44 @@ public class DatanodeManager {
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
}

private void startSlowPeerCollector() {
if (slowPeerCollectorDaemon != null) {
return;
}
slowPeerCollectorDaemon = new Daemon(new Runnable() {
@Override
public void run() {
while (true) {
try {
slowNodesSet = getSlowPeers();
} catch (Exception e) {
LOG.error("Failed to collect slow peers", e);
}

try {
Thread.sleep(slowPeerCollectionInterval);
} catch (InterruptedException e) {
LOG.error("Slow peers collection thread interrupted", e);
return;
}
}
}
});
slowPeerCollectorDaemon.start();
}

public void stopSlowPeerCollector() {
if (slowPeerCollectorDaemon == null) {
return;
}
slowPeerCollectorDaemon.interrupt();
try {
slowPeerCollectorDaemon.join();
} catch (InterruptedException e) {
LOG.error("Slow peers collection thread did not shutdown", e);
}
}

private static long getStaleIntervalFromConf(Configuration conf,
long heartbeatExpireInterval) {
long staleInterval = conf.getLong(
Expand Down Expand Up @@ -401,6 +463,7 @@ void activate(final Configuration conf) {
void close() {
datanodeAdminManager.close();
heartbeatManager.close();
stopSlowPeerCollector();
}

/** @return the network topology. */
Expand Down Expand Up @@ -2019,6 +2082,48 @@ public String getSlowPeersReport() {
return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
}

/**
* Returns all tracking slow peers.
* @return
*/
public Set<Node> getSlowPeers() {
Set<Node> slowPeersSet = Sets.newConcurrentHashSet();
if (slowPeerTracker == null) {
return slowPeersSet;
}
ArrayList<String> slowNodes =
slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
for (String slowNode : slowNodes) {
if (StringUtils.isBlank(slowNode)
|| !slowNode.contains(IP_PORT_SEPARATOR)) {
continue;
}
String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
DatanodeDescriptor datanodeByHost =
host2DatanodeMap.getDatanodeByHost(ipAddr);
if (datanodeByHost != null) {
slowPeersSet.add(datanodeByHost);
}
}
return slowPeersSet;
}

/**
* Returns all tracking slow peers.
* @return
*/
public static Set<Node> getSlowNodes() {
return slowNodesSet;
}

/**
* Use only for testing.
*/
@VisibleForTesting
public SlowPeerTracker getSlowPeerTracker() {
return slowPeerTracker;
}

/**
* Use only for testing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -233,6 +234,23 @@ public SortedSet<String> getReportingNodes() {
}
}

/**
* Returns all tracking slow peers.
* @param numNodes
* @return
*/
public ArrayList<String> getSlowNodes(int numNodes) {
Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
ArrayList<String> slowNodes = new ArrayList<>();
for (ReportForJson jsonReport : jsonReports) {
slowNodes.add(jsonReport.getSlowNode());
}
if (!slowNodes.isEmpty()) {
LOG.warn("Slow nodes list: " + slowNodes);
}
return slowNodes;
}

/**
* Retrieve reports in a structure for generating JSON, limiting the
* output to the top numNodes nodes i.e nodes with the most reports.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2368,6 +2368,36 @@
</description>
</property>

<property>
<name>dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled</name>
<value>false</value>
<description>
If this is set to true, we will filter out slow nodes
when choosing targets for blocks.
</description>
</property>

<property>
<name>dfs.namenode.max.slowpeer.collect.nodes</name>
<value>5</value>
<description>
How many slow nodes we will collect for filtering out
when choosing targets for blocks.

It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
</description>
</property>

<property>
<name>dfs.namenode.slowpeer.collect.interval</name>
<value>30m</value>
<description>
Interval at which the slow peer trackers runs in the background to collect slow peers.

It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
</description>
</property>

<property>
<name>dfs.datanode.fileio.profiling.sampling.percentage</name>
<value>0</value>
Expand Down
Loading