Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1054,5 +1054,13 @@ public class CommonConfigurationKeysPublic {
public static final String HADOOP_HTTP_IDLE_TIMEOUT_MS_KEY =
"hadoop.http.idle_timeout.ms";
public static final int HADOOP_HTTP_IDLE_TIMEOUT_MS_DEFAULT = 60000;

/**
 * Scheduling interval, in milliseconds, of the IPC server metrics-update
 * thread. The value is used both as the initial delay and as the delay
 * between successive executions of the metrics update runnable.
 * Defaults to {@code 5000} ms.
 */
public static final String IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL =
"ipc.server.metrics.update.runner.interval";
public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000;
}

Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

import javax.security.sasl.Sasl;
Expand Down Expand Up @@ -127,6 +130,8 @@
import org.apache.hadoop.tracing.TraceUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.classification.VisibleForTesting;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;
import org.apache.hadoop.thirdparty.protobuf.Message;
Expand Down Expand Up @@ -500,6 +505,11 @@ protected ResponseBuffer initialValue() {
private Responder responder = null;
private Handler[] handlers = null;
private final AtomicInteger numInProcessHandler = new AtomicInteger();
private final LongAdder totalRequests = new LongAdder();
private long lastSeenTotalRequests = 0;
private long totalRequestsPerSecond = 0;
private final long metricsUpdaterInterval;
private final ScheduledExecutorService scheduledExecutorService;

private boolean logSlowRPC = false;

Expand All @@ -515,6 +525,14 @@ public int getNumInProcessHandler() {
return numInProcessHandler.get();
}

/**
 * Returns the cumulative number of RPC requests observed by this server.
 *
 * @return total request count since server construction.
 */
public long getTotalRequests() {
  final long requestCount = totalRequests.sum();
  return requestCount;
}

/**
 * Returns the most recently computed requests-per-second rate. The value is
 * refreshed periodically by the metrics updater task, so it may lag behind
 * the live request counter.
 *
 * @return last computed requests-per-second value.
 */
public long getTotalRequestsPerSecond() {
  return totalRequestsPerSecond;
}

/**
* Sets slow RPC flag.
* @param logSlowRPCFlag input logSlowRPCFlag.
Expand Down Expand Up @@ -578,6 +596,7 @@ void logSlowRpcCalls(String methodName, Call call,
}

void updateMetrics(Call call, long startTime, boolean connDropped) {
totalRequests.increment();
// delta = handler + processing + response
long deltaNanos = Time.monotonicNowNanos() - startTime;
long timestampNanos = call.timestampNanos;
Expand Down Expand Up @@ -3304,6 +3323,14 @@ protected Server(String bindAddress, int port,
this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
this.exceptionsHandler.addTerseLoggingExceptions(
HealthCheckFailedException.class);
this.metricsUpdaterInterval =
conf.getLong(CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL,
CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT);
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Hadoop-Metrics-Updater-%d")
.build());
this.scheduledExecutorService.scheduleWithFixedDelay(new MetricsUpdateRunner(),
metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
}

public synchronized void addAuxiliaryListener(int auxiliaryPort)
Expand Down Expand Up @@ -3598,10 +3625,25 @@ public synchronized void stop() {
}
responder.interrupt();
notifyAll();
shutdownMetricsUpdaterExecutor();
this.rpcMetrics.shutdown();
this.rpcDetailedMetrics.shutdown();
}

/**
 * Shuts down the scheduled executor that runs the metrics updater task.
 * Attempts an orderly shutdown first; if the executor does not terminate
 * within the grace period (or the wait is interrupted), falls back to
 * {@link ScheduledExecutorService#shutdownNow()} so the periodic task is
 * cancelled and its thread does not linger after {@code stop()}.
 */
private void shutdownMetricsUpdaterExecutor() {
  this.scheduledExecutorService.shutdown();
  try {
    boolean isExecutorShutdown =
        this.scheduledExecutorService.awaitTermination(3, TimeUnit.SECONDS);
    if (!isExecutorShutdown) {
      LOG.info("Hadoop Metrics Updater executor could not be shutdown.");
      // Orderly shutdown timed out; cancel the in-flight/scheduled task so
      // the executor thread does not outlive the server.
      this.scheduledExecutorService.shutdownNow();
    }
  } catch (InterruptedException e) {
    // Preserve the interrupt status for callers higher up the stack.
    Thread.currentThread().interrupt();
    LOG.info("Hadoop Metrics Updater executor shutdown interrupted.", e);
    // Still force termination rather than leaving the task scheduled.
    this.scheduledExecutorService.shutdownNow();
  }
}

/**
* Wait for the server to be stopped.
* Does not wait for all subthreads to finish.
Expand Down Expand Up @@ -4061,4 +4103,32 @@ protected int getMaxIdleTime() {
public String getServerName() {
return serverName;
}

/**
 * Periodic task that refreshes derived server metrics — currently the
 * requests-per-second rate — from the raw request counter. Scheduled with a
 * fixed delay of {@code metricsUpdaterInterval} milliseconds.
 */
private class MetricsUpdateRunner implements Runnable {

  // Monotonic timestamp (ms) of the previous run; 0 until the first run.
  private long lastExecuted = 0;

  @Override
  public synchronized void run() {
    long currentTime = Time.monotonicNow();
    if (lastExecuted == 0) {
      // First execution: assume exactly one scheduling interval has elapsed.
      lastExecuted = currentTime - metricsUpdaterInterval;
    }
    long currentTotalRequests = totalRequests.sum();
    long totalRequestsDiff = currentTotalRequests - lastSeenTotalRequests;
    lastSeenTotalRequests = currentTotalRequests;
    long elapsedMillis = currentTime - lastExecuted;
    if (elapsedMillis > 0) {
      // Divide by the elapsed time in fractional seconds. The previous
      // TimeUnit.MILLISECONDS.toSeconds(elapsedMillis) truncated to 0 for
      // any elapsed time under one second, making this division produce
      // Infinity and the cast below store Long.MAX_VALUE.
      double totalRequestsPerSecInDouble =
          (double) totalRequestsDiff / (elapsedMillis / 1000.0);
      totalRequestsPerSecond = (long) totalRequestsPerSecInDouble;
    }
    lastExecuted = currentTime;
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ public String numOpenConnectionsPerUser() {
return server.getNumDroppedConnections();
}

/**
 * Cumulative number of RPC requests served, as reported by the server's
 * internal request counter.
 */
@Metric("Number of total requests")
public long getTotalRequests() {
return server.getTotalRequests();
}

/**
 * Requests-per-second rate as last computed by the server's periodic
 * metrics updater task; may lag the live counter by up to one update
 * interval.
 */
@Metric("Number of total requests per second")
public long getTotalRequestsPerSecond() {
return server.getTotalRequestsPerSecond();
}

public TimeUnit getMetricsTimeUnit() {
return metricsTimeUnit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
| `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
| `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
| `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
| `TotalRequests` | Total number of requests served by the RPC server. |
| `TotalRequestsPerSecond` | Total number of requests per second served by the RPC server. |

RetryCache/NameNodeRetryCache
-----------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.hadoop.ipc;

import org.apache.hadoop.ipc.metrics.RpcMetrics;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -84,6 +86,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -1697,6 +1700,61 @@ public void testRpcMetricsInNanos() throws Exception {
}
}

/**
 * Verifies that the server's TotalRequests counter and the derived
 * TotalRequestsPerSecond rate are populated: queues 100k external calls,
 * asserts the exact request count, and waits (via a background watcher
 * thread) for the periodic metrics updater to publish a non-zero rate.
 */
@Test
public void testNumTotalRequestsMetrics() throws Exception {
  UserGroupInformation ugi = UserGroupInformation.
      createUserForTesting("userXyz", new String[0]);

  final Server server = setupTestServer(conf, 1);

  ExecutorService executorService = null;
  try {
    RpcMetrics rpcMetrics = server.getRpcMetrics();
    assertEquals(0, rpcMetrics.getTotalRequests());
    assertEquals(0, rpcMetrics.getTotalRequestsPerSecond());

    List<ExternalCall<Void>> externalCallList = new ArrayList<>();

    executorService = Executors.newSingleThreadExecutor(
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("testNumTotalRequestsMetrics")
            .build());
    AtomicInteger rps = new AtomicInteger(0);
    CountDownLatch countDownLatch = new CountDownLatch(1);
    executorService.submit(() -> {
      while (true) {
        int numRps = (int) rpcMetrics.getTotalRequestsPerSecond();
        rps.getAndSet(numRps);
        if (rps.get() > 0) {
          countDownLatch.countDown();
          break;
        }
        // Poll instead of busy-spinning: the metrics updater only runs every
        // few seconds, so a tight loop here would burn a full core.
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    });

    for (int i = 0; i < 100000; i++) {
      externalCallList.add(newExtCall(ugi, () -> null));
    }
    for (ExternalCall<Void> externalCall : externalCallList) {
      server.queueCall(externalCall);
    }
    for (ExternalCall<Void> externalCall : externalCallList) {
      externalCall.get();
    }

    assertEquals(100000, rpcMetrics.getTotalRequests());
    if (countDownLatch.await(10, TimeUnit.SECONDS)) {
      assertTrue(rps.get() > 10);
    } else {
      throw new AssertionError("total requests per seconds are still 0");
    }
  } finally {
    if (executorService != null) {
      executorService.shutdown();
    }
    server.stop();
  }
}


public static void main(String[] args) throws Exception {
new TestRPC().testCallsInternal(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
.getLogFile;
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Lists;
Expand Down Expand Up @@ -106,6 +108,8 @@ public void testJournalNodeSync() throws Exception {
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();
assertThat(jCluster.getJournalNode(0).getRpcServer().getRpcServer().getRpcMetrics()
.getTotalRequests()).isGreaterThan(20);

// Generate some edit logs and delete one.
long firstTxId = generateEditLog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -123,6 +124,7 @@ public void testRequeueCall() throws Exception {
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);

NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(0);

dfs.create(testPath, (short)1).close();
assertSentTo(0);
Expand All @@ -132,6 +134,7 @@ public void testRequeueCall() throws Exception {
// be triggered and client should retry active NN.
dfs.getFileStatus(testPath);
assertSentTo(0);
assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(1);
// reset the original call queue
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
}
Expand Down