diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index 69bd040e7f95..3730bce36dbf 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -97,6 +97,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { String NUM_LIFO_MODE_SWITCHES_NAME = "numLifoModeSwitches"; String NUM_LIFO_MODE_SWITCHES_DESC = "Total number of calls in general queue which " + "were served from the tail of the queue"; + String NUM_CALL_RESPONSE_QUEUE_NAME = "numCallsInResponseQueue"; + String NUM_CALL_RESPONSE_QUEUE_DESC = "Number of calls in response queue."; + String NUM_SIZE_RESPONSE_QUEUE_NAME = "numSizeInResponseQueue"; + String NUM_SIZE_RESPONSE_QUEUE_DESC = "Size in response queue."; // Direct Memory Usage metrics String NETTY_DM_USAGE_NAME = "nettyDirectMemoryUsage"; @@ -126,4 +130,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { void processedCall(int processingTime); void queuedAndProcessedCall(int totalTime); + + void addCallToResponseQueue(long size); + + void removeCallFromResponseQueue(long size); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index e4fee95e2c4d..ebf7e709795e 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -38,6 +38,8 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl private final MutableFastCounter authenticationFallbacks; private final MutableFastCounter sentBytes; private final MutableFastCounter receivedBytes; + private final MutableFastCounter numCallsInResponseQueue; + private final MutableFastCounter numSizeInResponseQueue; private MetricHistogram queueCallTime; @@ -78,6 +80,10 @@ public MetricsHBaseServerSourceImpl(String metricsName, REQUEST_SIZE_DESC); this.responseSize = this.getMetricsRegistry().newSizeHistogram(RESPONSE_SIZE_NAME, RESPONSE_SIZE_DESC); + this.numCallsInResponseQueue = this.getMetricsRegistry().newCounter(NUM_CALL_RESPONSE_QUEUE_NAME, + NUM_CALL_RESPONSE_QUEUE_DESC, 0L); + this.numSizeInResponseQueue = this.getMetricsRegistry().newCounter(NUM_SIZE_RESPONSE_QUEUE_NAME, + NUM_SIZE_RESPONSE_QUEUE_DESC, 0L); } @Override @@ -140,6 +146,18 @@ public void queuedAndProcessedCall(int totalTime) { totalCallTime.add(totalTime); } + @Override + public void addCallToResponseQueue(long size) { + numCallsInResponseQueue.incr(); + numSizeInResponseQueue.incr(size); + } + + @Override + public void removeCallFromResponseQueue(long size) { + numCallsInResponseQueue.incr(-1); + numSizeInResponseQueue.incr(-size); + } + @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) { MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java index 915b82df4261..45b61b3ea9d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java @@ -63,6 +63,10 @@ boolean hasRemaining() { return remaining > 0; } + int getRemaining() { + return remaining; + } + /** * Write out our chain of buffers in chunks * @param channel Where to write diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java index 9d5373ce0486..94a5a3456a42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java @@ -86,6 +86,14 @@ void totalCall(int totalTime) { source.queuedAndProcessedCall(totalTime); } + void addCallToResponseQueue(int size) { + source.addCallToResponseQueue(size); + } + + void removeCallFromResponseQueue(int size) { + source.removeCallFromResponseQueue(size); + } + public void exception(Throwable throwable) { source.exception(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java index b68da56a217a..8e641bd4e41c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java @@ -269,8 +269,10 @@ private boolean processAllResponses(final SimpleServerRpcConnection connection) if (resp == null) { return true; } + simpleRpcServer.getMetrics().removeCallFromResponseQueue(resp.getResponse().getRemaining()); if (!processResponse(connection, resp)) { connection.responseQueue.addFirst(resp); + simpleRpcServer.getMetrics().addCallToResponseQueue(resp.getResponse().getRemaining()); return false; } } @@ -298,6 +300,7 @@ void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOExcept } // Too big to fit, putting ahead. conn.responseQueue.addFirst(resp); + simpleRpcServer.getMetrics().addCallToResponseQueue(resp.getResponse().getRemaining()); added = true; // We will register to the selector later, outside of the lock. } } finally { @@ -307,6 +310,7 @@ void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOExcept if (!added) { conn.responseQueue.addLast(resp); + simpleRpcServer.getMetrics().addCallToResponseQueue(resp.getResponse().getRemaining()); } registerForWrite(conn); }