From cefeafac43f340e0b7764b6452c31986043239bb Mon Sep 17 00:00:00 2001 From: "ronghang.ye" Date: Mon, 27 Nov 2023 00:24:17 +0800 Subject: [PATCH] HBASE-28220 Split the queueCallTime, processCallTime and totalCallTime according to queue type when using RWQueueRpcExecutor --- .../hbase/ipc/MetricsHBaseServerSource.java | 39 ++++++++++ .../ipc/MetricsHBaseServerSourceImpl.java | 72 +++++++++++++++++++ .../hadoop/hbase/ipc/MetricsHBaseServer.java | 36 ++++++++++ .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 4 ++ .../org/apache/hadoop/hbase/ipc/RpcCall.java | 20 ++++++ .../apache/hadoop/hbase/ipc/RpcServer.java | 19 +++++ .../apache/hadoop/hbase/ipc/ServerCall.java | 13 ++++ .../hadoop/hbase/ipc/TestRpcMetrics.java | 21 ++++++ .../namequeues/TestNamedQueueRecorder.java | 9 +++ .../hbase/namequeues/TestRpcLogDetails.java | 9 +++ .../region/TestRegionProcedureStore.java | 9 +++ 11 files changed, 251 insertions(+) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index a1ec313f97a3..7eecf084a664 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -46,6 +46,27 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { String PROCESS_CALL_TIME_DESC = "Processing call time."; String TOTAL_CALL_TIME_NAME = "totalCallTime"; String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time."; + String QUEUE_READ_CALL_TIME_NAME = "queueReadCallTime"; + String QUEUE_READ_CALL_TIME_DESC = "Queue read call time."; + String PROCESS_READ_CALL_TIME_NAME = "processReadCallTime"; + String PROCESS_READ_CALL_TIME_DESC = "Process read call time."; + String TOTAL_READ_CALL_TIME_NAME = "totalReadCallTime"; + String TOTAL_READ_CALL_TIME_DESC = + "total read call time, including both queued and processing time."; + String QUEUE_WRITE_CALL_TIME_NAME = "queueWriteCallTime"; + String QUEUE_WRITE_CALL_TIME_DESC = "Queue write call time."; + String PROCESS_WRITE_CALL_TIME_NAME = "processWriteCallTime"; + String PROCESS_WRITE_CALL_TIME_DESC = "Process write call time."; + String TOTAL_WRITE_CALL_TIME_NAME = "totalWriteCallTime"; + String TOTAL_WRITE_CALL_TIME_DESC = + "total write call time, including both queued and processing time."; + String QUEUE_SCAN_CALL_TIME_NAME = "queueScanCallTime"; + String QUEUE_SCAN_CALL_TIME_DESC = "Queue scan call time."; + String PROCESS_SCAN_CALL_TIME_NAME = "processScanCallTime"; + String PROCESS_SCAN_CALL_TIME_DESC = "process scan call time."; + String TOTAL_SCAN_CALL_TIME_NAME = "totalScanCallTime"; + String TOTAL_SCAN_CALL_TIME_DESC = + "total scan call time, including both queued and processing time."; String QUEUE_SIZE_NAME = "queueSize"; String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and " + "parsed and is waiting to run or is currently being executed."; @@ -117,4 +138,22 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { void processedCall(int processingTime); void queuedAndProcessedCall(int totalTime); + + void dequeuedReadCall(int qTime); + + void processReadCall(int processingTime); + + void queuedAndProcessedReadCall(int totalTime); + + void dequeuedWriteCall(int qTime); + + void processWriteCall(int processingTime); + + void queuedAndProcessedWriteCall(int totalTime); + + void dequeuedScanCall(int qTime); + + void processScanCall(int processingTime); + + void queuedAndProcessedScanCall(int totalTime); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index 440ebc6f5a6c..8052a1f4fb58 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -40,6 +40,15 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl private MetricHistogram queueCallTime; private MetricHistogram processCallTime; private MetricHistogram totalCallTime; + private MetricHistogram queueReadCallTime; + private MetricHistogram processReadCallTime; + private MetricHistogram totalReadCallTime; + private MetricHistogram queueWriteCallTime; + private MetricHistogram processWriteCallTime; + private MetricHistogram totalWriteCallTime; + private MetricHistogram queueScanCallTime; + private MetricHistogram processScanCallTime; + private MetricHistogram totalScanCallTime; private MetricHistogram requestSize; private MetricHistogram responseSize; @@ -67,6 +76,24 @@ public MetricsHBaseServerSourceImpl(String metricsName, String metricsDescriptio this.getMetricsRegistry().newTimeHistogram(PROCESS_CALL_TIME_NAME, PROCESS_CALL_TIME_DESC); this.totalCallTime = this.getMetricsRegistry().newTimeHistogram(TOTAL_CALL_TIME_NAME, TOTAL_CALL_TIME_DESC); + this.queueReadCallTime = this.getMetricsRegistry().newTimeHistogram(QUEUE_READ_CALL_TIME_NAME, + QUEUE_READ_CALL_TIME_DESC); + this.processReadCallTime = this.getMetricsRegistry() + .newTimeHistogram(PROCESS_READ_CALL_TIME_NAME, PROCESS_READ_CALL_TIME_DESC); + this.totalReadCallTime = this.getMetricsRegistry().newTimeHistogram(TOTAL_READ_CALL_TIME_NAME, + TOTAL_READ_CALL_TIME_DESC); + this.queueWriteCallTime = this.getMetricsRegistry().newTimeHistogram(QUEUE_WRITE_CALL_TIME_NAME, + QUEUE_WRITE_CALL_TIME_DESC); + this.processWriteCallTime = this.getMetricsRegistry() + .newTimeHistogram(PROCESS_WRITE_CALL_TIME_NAME, PROCESS_WRITE_CALL_TIME_DESC); + this.totalWriteCallTime = this.getMetricsRegistry().newTimeHistogram(TOTAL_WRITE_CALL_TIME_NAME, + TOTAL_WRITE_CALL_TIME_DESC); + this.queueScanCallTime = this.getMetricsRegistry().newTimeHistogram(QUEUE_SCAN_CALL_TIME_NAME, + QUEUE_SCAN_CALL_TIME_DESC); + this.processScanCallTime = this.getMetricsRegistry() + .newTimeHistogram(PROCESS_SCAN_CALL_TIME_NAME, PROCESS_SCAN_CALL_TIME_DESC); + this.totalScanCallTime = this.getMetricsRegistry().newTimeHistogram(TOTAL_SCAN_CALL_TIME_NAME, + TOTAL_SCAN_CALL_TIME_DESC); this.requestSize = this.getMetricsRegistry().newSizeHistogram(REQUEST_SIZE_NAME, REQUEST_SIZE_DESC); this.responseSize = @@ -133,6 +160,51 @@ public void queuedAndProcessedCall(int totalTime) { totalCallTime.add(totalTime); } + @Override + public void dequeuedReadCall(int qTime) { + queueReadCallTime.add(qTime); + } + + @Override + public void processReadCall(int processingTime) { + processReadCallTime.add(processingTime); + } + + @Override + public void queuedAndProcessedReadCall(int totalTime) { + totalReadCallTime.add(totalTime); + } + + @Override + public void dequeuedWriteCall(int qTime) { + queueWriteCallTime.add(qTime); + } + + @Override + public void processWriteCall(int processingTime) { + processWriteCallTime.add(processingTime); + } + + @Override + public void queuedAndProcessedWriteCall(int totalTime) { + totalWriteCallTime.add(totalTime); + } + + @Override + public void dequeuedScanCall(int qTime) { + queueScanCallTime.add(qTime); + } + + @Override + public void processScanCall(int processingTime) { + processScanCallTime.add(processingTime); + } + + @Override + public void queuedAndProcessedScanCall(int totalTime) { + totalScanCallTime.add(totalTime); + } + @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) { MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java index a4c73f925d3c..ae51962db23e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java @@ -97,6 +97,42 @@ void totalCall(int totalTime) { source.queuedAndProcessedCall(totalTime); } + void dequeuedReadCall(int qTime) { + source.dequeuedReadCall(qTime); + } + + void processReadCall(int processingTime) { + source.processReadCall(processingTime); + } + + void totalReadCall(int totalTime) { + source.queuedAndProcessedReadCall(totalTime); + } + + void dequeuedWriteCall(int qTime) { + source.dequeuedWriteCall(qTime); + } + + void processWriteCall(int processingTime) { + source.processWriteCall(processingTime); + } + + void totalWriteCall(int totalTime) { + source.queuedAndProcessedWriteCall(totalTime); + } + + void dequeuedScanCall(int qTime) { + source.dequeuedScanCall(qTime); + } + + void processScanCall(int processingTime) { + source.processScanCall(processingTime); + } + + void totalScanCall(int totalTime) { + source.queuedAndProcessedScanCall(totalTime); + } + public void exception(Throwable throwable) { source.exception(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index bdb10919bf10..5ec28cfeb623 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -142,13 +142,17 @@ public boolean dispatch(final CallRunner callTask) { protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue, final CallRunner callTask) { + RpcCall call = callTask.getRpcCall(); int queueIndex; if (toWriteQueue) { queueIndex = writeBalancer.getNextQueue(callTask); + call.setQueueType(RpcCall.CallQueueType.WRITE); } else if (toScanQueue) { queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(callTask); + call.setQueueType(RpcCall.CallQueueType.SCAN); } else { queueIndex = numWriteQueues + readBalancer.getNextQueue(callTask); + call.setQueueType(RpcCall.CallQueueType.READ); } Queue queue = queues.get(queueIndex); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index 197ddb71d7e6..1ae4af5d1393 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -107,4 +107,24 @@ public interface RpcCall extends RpcCallContext { /** Returns A short string format of this call without possibly lengthy params */ String toShortString(); + + /** + * The queue type where the call is located. If {@link RWQueueRpcExecutor} is used, the queue type + * can be divided into read, write and scan. + */ + static enum CallQueueType { + DEFAULT, + WRITE, + READ, + SCAN + } + + /** Returns The queue type of this call. */ + CallQueueType getQueueType(); + + /** + * Set the queue type of this call. + * @param type The queue type where the call is located. + */ + void setQueueType(CallQueueType type); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 39aca2c54fb1..a5b64f5b4d07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -419,6 +419,25 @@ public Pair call(RpcCall call, MonitoredRPCHandler status) int processingTime = (int) (endTime - startTime); int qTime = (int) (startTime - receiveTime); int totalTime = (int) (endTime - receiveTime); + switch (call.getQueueType()) { + case READ: + metrics.dequeuedReadCall(qTime); + metrics.processReadCall(processingTime); + metrics.totalReadCall(totalTime); + break; + case WRITE: + metrics.dequeuedWriteCall(qTime); + metrics.processWriteCall(processingTime); + metrics.totalWriteCall(totalTime); + break; + case SCAN: + metrics.dequeuedScanCall(qTime); + metrics.processScanCall(processingTime); + metrics.totalScanCall(totalTime); + break; + case DEFAULT: + break; + } if (LOG.isTraceEnabled()) { LOG.trace( "{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, totalTime: {}", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 5adc520d904d..a9365af09712 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -76,6 +76,8 @@ public abstract class ServerCall implements RpcCa protected long startTime; protected final long deadline;// the deadline to handle this call, if exceed we can drop it. + protected CallQueueType callQueueType; + protected final ByteBuffAllocator bbAllocator; protected final CellBlockBuilder cellBlockBuilder; @@ -141,6 +143,7 @@ public abstract class ServerCall implements RpcCa this.cellBlockBuilder = cellBlockBuilder; this.reqCleanup = reqCleanup; this.span = Span.current(); + this.callQueueType = CallQueueType.DEFAULT; } /** @@ -578,4 +581,14 @@ public synchronized BufferChain getResponse() { public synchronized RpcCallback getCallBack() { return this.rpcCallback; } + + @Override + public CallQueueType getQueueType() { + return callQueueType; + } + + @Override + public void setQueueType(CallQueueType type) { + this.callQueueType = type; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java index 288bb3fe2624..76e45da09d25 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java @@ -125,6 +125,27 @@ public void testSourceMethods() { HELPER.assertCounter("processCallTime_NumOps", 1, serverSource); HELPER.assertCounter("totalCallTime_NumOps", 1, serverSource); + mrpc.dequeuedReadCall(100); + mrpc.processReadCall(101); + mrpc.totalReadCall(102); + HELPER.assertCounter("queueReadCallTime_NumOps", 1, serverSource); + HELPER.assertCounter("processReadCallTime_NumOps", 1, serverSource); + HELPER.assertCounter("totalReadCallTime_NumOps", 1, serverSource); + + mrpc.dequeuedWriteCall(100); + mrpc.processWriteCall(101); + mrpc.totalWriteCall(102); + HELPER.assertCounter("queueWriteCallTime_NumOps", 1, serverSource); + HELPER.assertCounter("processWriteCallTime_NumOps", 1, serverSource); + HELPER.assertCounter("totalWriteCallTime_NumOps", 1, serverSource); + + mrpc.dequeuedScanCall(100); + mrpc.processScanCall(101); + mrpc.totalScanCall(102); + HELPER.assertCounter("queueScanCallTime_NumOps", 1, serverSource); + HELPER.assertCounter("processScanCallTime_NumOps", 1, serverSource); + HELPER.assertCounter("totalScanCallTime_NumOps", 1, serverSource); + mrpc.sentBytes(103); mrpc.sentBytes(103); mrpc.sentBytes(103); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 8ae27478a187..0ed4eb1c2742 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -684,6 +684,15 @@ public long getResponseExceptionSize() { @Override public void incrementResponseExceptionSize(long exceptionSize) { } + + @Override + public CallQueueType getQueueType() { + return CallQueueType.DEFAULT; + } + + @Override + public void setQueueType(CallQueueType type) { + } }; return rpcCall; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java index 19c13069d262..20c541f03ce6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java @@ -242,6 +242,15 @@ public long getResponseExceptionSize() { @Override public void incrementResponseExceptionSize(long exceptionSize) { } + + @Override + public CallQueueType getQueueType() { + return CallQueueType.DEFAULT; + } + + @Override + public void setQueueType(CallQueueType type) { + } }; return rpcCall; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index 76c84cef9a3a..cf92185484b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -303,6 +303,15 @@ public long getResponseExceptionSize() { @Override public void incrementResponseExceptionSize(long exceptionSize) { } + + @Override + public CallQueueType getQueueType() { + return CallQueueType.DEFAULT; + } + + @Override + public void setQueueType(CallQueueType type) { + } }; } }