Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final boolean RPC_METRICS_QUANTILE_ENABLE_DEFAULT = false;
public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY =
"rpc.metrics.percentiles.intervals";


/**
 * Config key for the time unit of RPC metrics. The value is expected to be
 * the name of a java.util.concurrent.TimeUnit constant, e.g. NANOSECONDS.
 */
public static final String RPC_METRICS_TIME_UNIT = "rpc.metrics.timeunit";

/** Allowed hosts for nfs exports */
public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,8 +725,9 @@ public void addResponseTime(String callName, Schedulable schedulable,
addCost(user, processingCost);

int priorityLevel = schedulable.getPriorityLevel();
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
long queueTime = details.get(Timing.QUEUE, RpcMetrics.getMetricsTimeUnit());
long processingTime = details.get(Timing.PROCESSING,
RpcMetrics.getMetricsTimeUnit());

this.decayRpcSchedulerDetailedMetrics.addQueueTime(
priorityLevel, queueTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ default void addResponseTime(String callName, Schedulable schedulable,
// this interface, a default implementation is supplied which uses the old
// method. All new implementations MUST override this interface and should
// NOT use the other addResponseTime method.
int queueTime = (int)
details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
int processingTime = (int)
details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
int queueTime = (int) details.get(ProcessingDetails.Timing.QUEUE,
RpcMetrics.getMetricsTimeUnit());
int processingTime = (int) details.get(ProcessingDetails.Timing.PROCESSING,
RpcMetrics.getMetricsTimeUnit());
addResponseTime(callName, schedulable.getPriorityLevel(),
queueTime, processingTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,13 @@ void logSlowRpcCalls(String methodName, Call call,
(rpcMetrics.getProcessingStdDev() * deviation);

long processingTime =
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
details.get(Timing.PROCESSING, RpcMetrics.getMetricsTimeUnit());
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) {
LOG.warn(
"Slow RPC : {} took {} {} to process from client {},"
+ " the processing detail is {}",
methodName, processingTime, RpcMetrics.TIMEUNIT, call,
methodName, processingTime, RpcMetrics.getMetricsTimeUnit(), call,
details.toString());
rpcMetrics.incrSlowRpc();
}
Expand All @@ -570,7 +570,7 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
deltaNanos -= details.get(Timing.RESPONSE);
details.set(Timing.HANDLER, deltaNanos);

long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
long queueTime = details.get(Timing.QUEUE, RpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcQueueTime(queueTime);

if (call.isResponseDeferred() || connDropped) {
Expand All @@ -579,9 +579,9 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
}

long processingTime =
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
details.get(Timing.PROCESSING, RpcMetrics.getMetricsTimeUnit());
long waitTime =
details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
details.get(Timing.LOCKWAIT, RpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcLockWaitTime(waitTime);
rpcMetrics.addRpcProcessingTime(processingTime);
// don't include lock wait for detailed metrics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.Server;
Expand Down Expand Up @@ -49,7 +51,7 @@ public class RpcMetrics {
final String name;
final boolean rpcQuantileEnable;
/** The time unit used when storing/accessing time durations. */
public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
private static TimeUnit metricsTimeUnit = TimeUnit.MILLISECONDS;

RpcMetrics(Server server, Configuration conf) {
String port = String.valueOf(server.getListenerAddress().getPort());
Expand All @@ -75,19 +77,19 @@ public class RpcMetrics {
for (int i = 0; i < intervals.length; i++) {
int interval = intervals[i];
rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
+ interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
+ interval + "s", "rpc queue time in " + metricsTimeUnit, "ops",
"latency", interval);
rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
"rpcLockWaitTime" + interval + "s",
"rpc lock wait time in " + TIMEUNIT, "ops",
"rpc lock wait time in " + metricsTimeUnit, "ops",
"latency", interval);
rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
"rpcProcessingTime" + interval + "s",
"rpc processing time in " + TIMEUNIT, "ops",
"rpc processing time in " + metricsTimeUnit, "ops",
"latency", interval);
deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
"deferredRpcProcessingTime" + interval + "s",
"deferred rpc processing time in " + TIMEUNIT, "ops",
"deferred rpc processing time in " + metricsTimeUnit, "ops",
"latency", interval);
}
}
Expand All @@ -97,10 +99,27 @@ public class RpcMetrics {
public String name() { return name; }

/**
 * Builds the {@link RpcMetrics} source for the given server and registers it
 * with the default metrics system.
 *
 * <p>When {@code server} is an {@link RPC.Server}, the metrics time unit is
 * first refreshed from {@code conf} via {@code setMetricTimeUnit}; for any
 * other server type the current time unit is left untouched.
 *
 * @param server the server the metrics are created for
 * @param conf configuration handed to the metrics constructor
 * @return whatever the metrics system's {@code register} call returns
 */
public static RpcMetrics create(Server server, Configuration conf) {
  final boolean isRpcServer = server instanceof RPC.Server;
  if (isRpcServer) {
    setMetricTimeUnit(conf);
  }
  final RpcMetrics metrics = new RpcMetrics(server, conf);
  return DefaultMetricsSystem.instance()
      .register(metrics.name, null, metrics);
}

private static void setMetricTimeUnit(Configuration conf) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be error-prone because a process can have multiple Servers bound to different ports at the same time.
So metricsTimeUnit is re-initialized on every Server creation. Hopefully they all end up using the same metric unit; otherwise this could create confusion later on.

Copy link
Contributor Author

@virajjasani virajjasani Jul 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, let me change it this way: we only apply the setting if the server is an instance of RPC.Server (which is LimitedPrivate for Common, HDFS, YARN, MapReduce), so that anyone extending Server in a downstream application does not interfere with the core-site config used in core Hadoop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor Author

@virajjasani virajjasani Jul 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was recently addressed in a better way by making metricsTimeUnit a non-static final field.

String timeunit = conf.get(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT);
if (StringUtils.isNotEmpty(timeunit)) {
try {
metricsTimeUnit = TimeUnit.valueOf(timeunit);
} catch (IllegalArgumentException e) {
LOG.info("Config key {} 's value {} does not correspond to enum values"
+ " of java.util.concurrent.TimeUnit. Hence default unit"
+ " MILLISECONDS will be used",
CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, timeunit);
}
}
}

@Metric("Number of received bytes") MutableCounterLong receivedBytes;
@Metric("Number of sent bytes") MutableCounterLong sentBytes;
@Metric("Queue time") MutableRate rpcQueueTime;
Expand Down Expand Up @@ -141,6 +160,10 @@ public String numOpenConnectionsPerUser() {
return server.getNumDroppedConnections();
}

/**
 * Returns the time unit currently held in the static
 * {@code metricsTimeUnit} field.
 *
 * @return the time unit used when storing/accessing RPC metric durations
 */
public static TimeUnit getMetricsTimeUnit() {
return metricsTimeUnit;
}

// Public instrumentation methods that could be extracted to an
// abstract class if we decide to do custom instrumentation classes a la
// JobTrackerInstrumentation. The methods with //@Override comment are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3312,6 +3312,21 @@
</description>
</property>

<property>
<name>rpc.metrics.timeunit</name>
<value>MILLISECONDS</value>
<description>
This property configures the time unit used for various RPC metrics,
e.g. rpcQueueTime, rpcLockWaitTime, rpcProcessingTime and
deferredRpcProcessingTime. If this property is not set, the default
time unit is milliseconds.
The value must match the name of one of the constants of the enum
java.util.concurrent.TimeUnit.
Some of the valid values: NANOSECONDS, MICROSECONDS, MILLISECONDS,
SECONDS etc.
</description>
</property>

<property>
<name>rpc.metrics.percentiles.intervals</name>
<value></value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ rpc
---

Each metrics record contains tags such as Hostname and port (number to which server is bound) as additional information along with metrics.
The `rpc.metrics.timeunit` config can be used to configure the time unit for RPC metrics.
The default time unit for RPC metrics is milliseconds (as used in the descriptions below).

| Name | Description |
|:---- |:---- |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1098,8 +1098,8 @@ public TestRpcService run() {
proxy.lockAndSleep(null, newSleepRequest(5));
rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertGauge("RpcLockWaitTimeAvgTime",
(double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
rpcMetrics);
(double)(RpcMetrics.getMetricsTimeUnit().convert(10L,
TimeUnit.SECONDS)), rpcMetrics);
} finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);
Expand Down Expand Up @@ -1603,6 +1603,70 @@ public void testSetProtocolEngine() {
assertTrue(rpcEngine instanceof StoppedRpcEngine);
}

/**
 * Runs RPC calls against a test server configured with
 * rpc.metrics.timeunit=NANOSECONDS and quantile metrics enabled, then checks
 * the lock-wait, queue-time and processing-time gauges.
 */
@Test
public void testRpcMetricsInNanos() throws Exception {
final Server server;
TestRpcService proxy = null;

// Enable quantile metrics with a single interval and switch the RPC metrics
// time unit to nanoseconds before the server is created.
final int interval = 1;
conf.setBoolean(CommonConfigurationKeys.
RPC_METRICS_QUANTILE_ENABLE, true);
conf.set(CommonConfigurationKeys.
RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
conf.set(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, "NANOSECONDS");

server = setupTestServer(conf, 5);
// Second proxy, created while running as a separate remote user.
String testUser = "testUserInNanos";
UserGroupInformation anotherUser =
UserGroupInformation.createRemoteUser(testUser);
TestRpcService proxy2 =
anotherUser.doAs((PrivilegedAction<TestRpcService>) () -> {
try {
return RPC.getProxy(TestRpcService.class, 0,
server.getListenerAddress(), conf);
} catch (IOException e) {
LOG.error("Something went wrong.", e);
}
// Reached only when getProxy threw; proxy2 is then null.
return null;
});
try {
proxy = getClient(addr, conf);
// NOTE(review): proxy2 is dereferenced below without a null check, so a
// failed getProxy above would surface here as a NullPointerException.
for (int i = 0; i < 1000; i++) {
proxy.ping(null, newEmptyRequest());
proxy.echo(null, newEchoRequest("" + i));
proxy2.echo(null, newEchoRequest("" + i));
}
MetricsRecordBuilder rpcMetrics =
getMetrics(server.getRpcMetrics().name());
// No lock-wait time is expected after only ping/echo calls.
assertEquals("Expected zero rpc lock wait time",
0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
rpcMetrics);
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
rpcMetrics);

// Take a fresh metrics snapshot after the lockAndSleep call.
proxy.lockAndSleep(null, newSleepRequest(5));
rpcMetrics = getMetrics(server.getRpcMetrics().name());
// Expected value is 10 seconds expressed in the configured metrics unit.
// NOTE(review): presumably the test service reports a fixed 10s lock wait
// for lockAndSleep - confirm against the test server implementation.
assertGauge("RpcLockWaitTimeAvgTime",
(double)(RpcMetrics.getMetricsTimeUnit().convert(10L,
TimeUnit.SECONDS)), rpcMetrics);
LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}",
getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics),
getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics));

// Lower bounds on the averages; with the NANOSECONDS unit configured above
// these presumably correspond to 4 ms processing and 4 us queue time.
assertTrue(getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics)
> 4000000D);
assertTrue(getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics)
> 4000D);
} finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);
}
stop(server, proxy);
}
}


/**
 * Command-line entry point: runs {@code testCallsInternal} on a fresh
 * {@code TestRPC} instance using the class's {@code conf}.
 *
 * @param args ignored
 * @throws Exception if {@code testCallsInternal} fails
 */
public static void main(String[] args) throws Exception {
  final TestRPC testRpc = new TestRPC();
  testRpc.testCallsInternal(conf);
}
Expand Down