Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
|:---- |:---- |
| `BytesWritten` | Total number of bytes written to DataNode |
| `BytesRead` | Total number of bytes read from DataNode |
| `ReadTransferRateNumOps` | Total number of data read transfers |
| `ReadTransferRateAvgTime` | Average transfer rate of bytes read from DataNode, measured in bytes per second. |
| `ReadTransferRate`*num*`s(50/75/90/95/99)thPercentileRate` | The 50/75/90/95/99th percentile of the transfer rate of bytes read from DataNode, measured in bytes per second. |
| `BlocksWritten` | Total number of blocks written to DataNode |
| `BlocksRead` | Total number of blocks read from DataNode |
| `BlocksReplicated` | Total number of blocks replicated |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1936,4 +1936,20 @@ public static boolean isParentEntry(final String path, final String parent) {
return path.charAt(parent.length()) == Path.SEPARATOR_CHAR
|| parent.equals(Path.SEPARATOR);
}

/**
* Calculate the transfer rate in bytes/second. Return -1 for any negative input.
* A duration of 0 milliseconds is treated as 1 millisecond to avoid division by zero.
* @param bytes bytes
* @param durationMS duration in milliseconds
* @return the number of bytes/second of the transfer rate
*/
public static long transferRateBytesPerSecond(long bytes, long durationMS) {
if (bytes < 0 || durationMS < 0) {
return -1;
}
if (durationMS == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it is <= 0, just return -1? Let's add a check for bytes as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel we should handle other cases. This is a Utils method, and any unexpected data should be left for the client to interpret. For some clients, negative values might even make sense.
The idea behind handling durationMS = 0 is to guard against DivideByZero in cases where the data transfer did not happen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we specify our function as: "we expect both inputs to be positive. Otherwise, this function will return -1".

Then returning -1 is a clear signal we don't know how to handle such inputs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

durationMS = 1;
}
return bytes * 1000 / durationMS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import javax.crypto.SecretKey;
Expand Down Expand Up @@ -632,6 +633,7 @@ public void readBlock(final ExtendedBlock block,
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
datanode.metrics.incrTotalReadTime(duration);
datanode.metrics.addReadTransferRate(DFSUtil.transferRateBytesPerSecond(read, duration));
} catch ( SocketException ignored ) {
LOG.trace("{}:Ignoring exception while serving {} to {}",
dnR, block, remoteAddress, ignored);
Expand Down Expand Up @@ -1122,6 +1124,7 @@ public void copyBlock(final ExtendedBlock block,
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
datanode.metrics.incrTotalReadTime(duration);
datanode.metrics.addReadTransferRate(DFSUtil.transferRateBytesPerSecond(read, duration));

LOG.info("Copied {} to {}", block, peer.getRemoteAddressString());
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong bytesRead;
@Metric("Milliseconds spent reading")
MutableCounterLong totalReadTime;
@Metric private MutableRate readTransferRate;
final private MutableQuantiles[] readTransferRateQuantiles;
@Metric MutableCounterLong blocksWritten;
@Metric MutableCounterLong blocksRead;
@Metric MutableCounterLong blocksReplicated;
Expand Down Expand Up @@ -227,6 +229,7 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
readTransferRateQuantiles = new MutableQuantiles[len];

for (int i = 0; i < len; i++) {
int interval = intervals[i];
Expand Down Expand Up @@ -255,6 +258,10 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
"ramDiskBlocksLazyPersistWindows" + interval + "s",
"Time between the RamDisk block write and disk persist in ms",
"ops", "latency", interval);
readTransferRateQuantiles[i] = registry.newQuantiles(
"readTransferRate" + interval + "s",
"Rate at which bytes are read from datanode calculated in bytes per second",
"ops", "rate", interval);
}
}

Expand Down Expand Up @@ -316,6 +323,13 @@ public void addIncrementalBlockReport(long latency,
}
}

/**
 * Record one read transfer-rate sample: add it to the {@code readTransferRate}
 * metric and to every entry of {@code readTransferRateQuantiles}.
 * @param readTransferRate the sample to record
 */
public void addReadTransferRate(long readTransferRate) {
  this.readTransferRate.add(readTransferRate);
  // Feed the same sample to each configured percentile interval.
  for (int idx = 0; idx < readTransferRateQuantiles.length; idx++) {
    readTransferRateQuantiles[idx].add(readTransferRate);
  }
}

/**
 * Record one cache report sample in the {@code cacheReports} metric.
 * @param latency the latency value to add
 */
public void addCacheReport(long latency) {
  this.cacheReports.add(latency);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1108,4 +1108,12 @@ public void testErrorMessageForInvalidNameservice() throws Exception {
LambdaTestUtils.intercept(IOException.class, expectedErrorMessage,
()->DFSUtil.getNNServiceRpcAddressesForCluster(conf));
}

/**
 * Checks {@link DFSUtil#transferRateBytesPerSecond(long, long)} for a regular
 * transfer, a zero-millisecond transfer, and negative arguments.
 */
@Test
public void testTransferRateBytesPerSecond() {
  // Regular case: 983 bytes over 100 ms.
  long regularRate = DFSUtil.transferRateBytesPerSecond(983, 100);
  assertEquals(9830, regularRate);

  // A zero duration is expected to be computed as if it were 1 ms.
  long zeroDurationRate = DFSUtil.transferRateBytesPerSecond(983, 0);
  assertEquals(983000, zeroDurationRate);

  // A negative byte count or a negative duration yields -1.
  long negativeBytesRate = DFSUtil.transferRateBytesPerSecond(-983, 100);
  assertEquals(-1, negativeBytesRate);
  long negativeDurationRate = DFSUtil.transferRateBytesPerSecond(983, -100);
  assertEquals(-1, negativeDurationRate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ public void testTimeoutMetric() throws Exception {
@Test(timeout=120000)
public void testDataNodeTimeSpend() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + 60);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
final FileSystem fs = cluster.getFileSystem();
Expand All @@ -391,6 +392,7 @@ public void testDataNodeTimeSpend() throws Exception {

final long startWriteValue = getLongCounter("TotalWriteTime", rb);
final long startReadValue = getLongCounter("TotalReadTime", rb);
assertCounter("ReadTransferRateNumOps", 0L, rb);
final AtomicInteger x = new AtomicInteger(0);

// Lets Metric system update latest metrics
Expand All @@ -410,6 +412,8 @@ public Boolean get() {
MetricsRecordBuilder rbNew = getMetrics(datanode.getMetrics().name());
final long endWriteValue = getLongCounter("TotalWriteTime", rbNew);
final long endReadValue = getLongCounter("TotalReadTime", rbNew);
assertCounter("ReadTransferRateNumOps", 1L, rbNew);
assertQuantileGauges("ReadTransferRate" + "60s", rbNew, "Rate");
return endWriteValue > startWriteValue
&& endReadValue > startReadValue;
}
Expand Down