@@ -370,6 +370,9 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
|:---- |:---- |
| `BytesWritten` | Total number of bytes written to DataNode |
| `BytesRead` | Total number of bytes read from DataNode |
| `ReadTransferRateNumOps` | Total number of data read transfers |
| `ReadTransferRateAvgTime` | Average transfer rate of bytes read from DataNode, measured in bytes per second. |
| `ReadTransferRate`*num*`s(50/75/90/95/99)thPercentileRate` | The 50/75/90/95/99th percentile of the transfer rate of bytes read from DataNode, measured in bytes per second. |
| `BlocksWritten` | Total number of blocks written to DataNode |
| `BlocksRead` | Total number of blocks read from DataNode |
| `BlocksReplicated` | Total number of blocks replicated |
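
A note on the percentile row above: *num* is the quantile window length in seconds, taken from the DataNode's configured percentile intervals (`dfs.metrics.percentiles.intervals`). With a 60-second interval, for example, the emitted gauges are named `ReadTransferRate60s50thPercentileRate`, `ReadTransferRate60s75thPercentileRate`, and so on; the test change at the end of this diff asserts those names for a 60-second window. The interval values themselves are deployment-specific.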
@@ -632,6 +632,11 @@ public void readBlock(final ExtendedBlock block,
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
datanode.metrics.incrTotalReadTime(duration);
if (read < 0 || duration <= 0) {
LOG.warn("Unexpected value for data transfer bytes={} duration={}", read, duration);
} else {
datanode.metrics.addReadTransferRate(read * 1000 / duration);
}

Review comment on this block: You duplicated this logic; I would put it behind a private helper so people don't forget this check.

Contributor Author: Moved to a Utils method.
} catch ( SocketException ignored ) {
LOG.trace("{}:Ignoring exception while serving {} to {}",
dnR, block, remoteAddress, ignored);
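
The reviewer asked for the duplicated guard to live behind a private helper, and the author notes it was moved to a Utils method; that helper is not shown in this revision of the diff. A minimal sketch of what it could look like, with the class name, method name, and logger as assumptions rather than the actual patch:

// Hypothetical helper, not part of this diff revision; names are assumptions.
// Guards against bogus values before recording a read-transfer-rate sample.
final class ReadTransferRateUtil {
  private static final org.slf4j.Logger LOG =
      org.slf4j.LoggerFactory.getLogger(ReadTransferRateUtil.class);

  private ReadTransferRateUtil() {}

  // read is in bytes, duration in milliseconds (see the totalReadTime counter below),
  // so read * 1000 / duration yields bytes per second.
  static void update(DataNodeMetrics metrics, long read, long duration) {
    if (read < 0 || duration <= 0) {
      LOG.warn("Unexpected value for data transfer bytes={} duration={}", read, duration);
    } else {
      metrics.addReadTransferRate(read * 1000 / duration);
    }
  }
}

Both readBlock above and copyBlock below would then call the same method instead of repeating the check.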
@@ -1122,6 +1127,11 @@ public void copyBlock(final ExtendedBlock block,
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
datanode.metrics.incrTotalReadTime(duration);
if (read < 0 || duration <= 0) {
LOG.warn("Unexpected value for data transfer bytes={} duration={}", read, duration);
} else {
datanode.metrics.addReadTransferRate(read * 1000 / duration);
}

LOG.info("Copied {} to {}", block, peer.getRemoteAddressString());
} catch (IOException ioe) {
@@ -61,6 +61,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong bytesRead;
@Metric("Milliseconds spent reading")
MutableCounterLong totalReadTime;
@Metric private MutableRate readTransferRate;
final private MutableQuantiles[] readTransferRateQuantiles;
@Metric MutableCounterLong blocksWritten;
@Metric MutableCounterLong blocksRead;
@Metric MutableCounterLong blocksReplicated;
@@ -227,6 +229,7 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
readTransferRateQuantiles = new MutableQuantiles[len];

for (int i = 0; i < len; i++) {
int interval = intervals[i];
@@ -255,6 +258,10 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
"ramDiskBlocksLazyPersistWindows" + interval + "s",
"Time between the RamDisk block write and disk persist in ms",
"ops", "latency", interval);
readTransferRateQuantiles[i] = registry.newQuantiles(
"readTransferRate" + interval + "s",
"Rate at which bytes are read from datanode calculated in bytes per second",
"ops", "rate", interval);
}
}
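
The quantiles above are only registered for intervals the DataNode is configured with; if no percentile intervals are set, the array stays empty and only the MutableRate receives samples. A minimal sketch of enabling a 60-second window, mirroring the test change below and assuming the standard percentile-intervals key:

// Enable a 60-second percentile window so readTransferRate60s quantile gauges are registered.
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "60");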

@@ -316,6 +323,13 @@ public void addIncrementalBlockReport(long latency,
}
}

public void addReadTransferRate(long readTransferRate) {
this.readTransferRate.add(readTransferRate);
for (MutableQuantiles q : readTransferRateQuantiles) {
q.add(readTransferRate);
}
}

public void addCacheReport(long latency) {
cacheReports.add(latency);
}
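
Usage note: each call to addReadTransferRate records one sample, so the MutableRate surfaces the sample count and mean as ReadTransferRateNumOps and ReadTransferRateAvgTime (documented in the metrics table above), and every registered quantile window surfaces the corresponding percentile gauges. An illustrative call with made-up values:

// Illustrative only: 1,048,576 bytes read in 250 ms -> 1048576 * 1000 / 250 = 4,194,304 bytes/sec.
metrics.addReadTransferRate(1048576L * 1000 / 250);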
@@ -380,6 +380,7 @@ public void testTimeoutMetric() throws Exception {
@Test(timeout=120000)
public void testDataNodeTimeSpend() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + 60);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
final FileSystem fs = cluster.getFileSystem();
@@ -391,6 +392,7 @@ public void testDataNodeTimeSpend() throws Exception {

final long startWriteValue = getLongCounter("TotalWriteTime", rb);
final long startReadValue = getLongCounter("TotalReadTime", rb);
assertCounter("ReadTransferRateNumOps", 0L, rb);
final AtomicInteger x = new AtomicInteger(0);

// Lets Metric system update latest metrics
@@ -410,6 +412,8 @@ public Boolean get() {
MetricsRecordBuilder rbNew = getMetrics(datanode.getMetrics().name());
final long endWriteValue = getLongCounter("TotalWriteTime", rbNew);
final long endReadValue = getLongCounter("TotalReadTime", rbNew);
assertCounter("ReadTransferRateNumOps", 1L, rbNew);
assertQuantileGauges("ReadTransferRate" + "60s", rbNew, "Rate");
return endWriteValue > startWriteValue
&& endReadValue > startReadValue;
}