Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String PROCESS_CALL_TIME_DESC = "Processing call time.";
String TOTAL_CALL_TIME_NAME = "totalCallTime";
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
String QUEUE_READ_CALL_TIME_NAME = "queueReadCallTime";
String QUEUE_READ_CALL_TIME_DESC = "Queue read call time.";
String PROCESS_READ_CALL_TIME_NAME = "processReadCallTime";
String PROCESS_READ_CALL_TIME_DESC = "Process read call time.";
String TOTAL_READ_CALL_TIME_NAME = "totalReadCallTime";
String TOTAL_READ_CALL_TIME_DESC =
"total read call time, including both queued and processing time.";
String QUEUE_WRITE_CALL_TIME_NAME = "queueWriteCallTime";
String QUEUE_WRITE_CALL_TIME_DESC = "Queue write call time.";
String PROCESS_WRITE_CALL_TIME_NAME = "processWriteCallTime";
String PROCESS_WRITE_CALL_TIME_DESC = "Process write call time.";
String TOTAL_WRITE_CALL_TIME_NAME = "totalWriteCallTime";
String TOTAL_WRITE_CALL_TIME_DESC =
"total write call time, including both queued and processing time.";
String QUEUE_SCAN_CALL_TIME_NAME = "queueScanCallTime";
String QUEUE_SCAN_CALL_TIME_DESC = "Queue scan call time.";
String PROCESS_SCAN_CALL_TIME_NAME = "processScanCallTime";
String PROCESS_SCAN_CALL_TIME_DESC = "process scan call time.";
String TOTAL_SCAN_CALL_TIME_NAME = "totalScanCallTime";
String TOTAL_SCAN_CALL_TIME_DESC =
"total scan call time, including both queued and processing time.";
String QUEUE_SIZE_NAME = "queueSize";
String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and "
+ "parsed and is waiting to run or is currently being executed.";
Expand Down Expand Up @@ -117,4 +138,22 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
void processedCall(int processingTime);

void queuedAndProcessedCall(int totalTime);

void dequeuedReadCall(int qTime);

void processReadCall(int processingTime);

void queuedAndProcessedReadCall(int totalTime);

void dequeuedWriteCall(int qTime);

void processWriteCall(int processingTime);

void queuedAndProcessedWriteCall(int totalTime);

void dequeuedScanCall(int qTime);

void processScanCall(int processingTime);

void queuedAndProcessedScanCall(int totalTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
private MetricHistogram queueCallTime;
private MetricHistogram processCallTime;
private MetricHistogram totalCallTime;
private MetricHistogram queueReadCallTime;
private MetricHistogram processReadCallTime;
private MetricHistogram totalReadCallTime;
private MetricHistogram queueWriteCallTime;
private MetricHistogram processWriteCallTime;
private MetricHistogram totalWriteCallTime;
private MetricHistogram queueScanCallTime;
private MetricHistogram processScanCallTime;
private MetricHistogram totalScanCallTime;
private MetricHistogram requestSize;
private MetricHistogram responseSize;

Expand Down Expand Up @@ -67,6 +76,24 @@ public MetricsHBaseServerSourceImpl(String metricsName, String metricsDescriptio
this.getMetricsRegistry().newTimeHistogram(PROCESS_CALL_TIME_NAME, PROCESS_CALL_TIME_DESC);
this.totalCallTime =
this.getMetricsRegistry().newTimeHistogram(TOTAL_CALL_TIME_NAME, TOTAL_CALL_TIME_DESC);
this.queueReadCallTime = this.getMetricsRegistry().newTimeHistogram(QUEUE_READ_CALL_TIME_NAME,
QUEUE_READ_CALL_TIME_DESC);
this.processReadCallTime = this.getMetricsRegistry()
.newTimeHistogram(PROCESS_READ_CALL_TIME_NAME, PROCESS_READ_CALL_TIME_DESC);
this.totalReadCallTime = this.getMetricsRegistry().newTimeHistogram(TOTAL_READ_CALL_TIME_NAME,
TOTAL_READ_CALL_TIME_DESC);
this.queueWriteCallTime = this.getMetricsRegistry().newTimeHistogram(QUEUE_WRITE_CALL_TIME_NAME,
QUEUE_WRITE_CALL_TIME_DESC);
this.processWriteCallTime = this.getMetricsRegistry()
.newTimeHistogram(PROCESS_WRITE_CALL_TIME_NAME, PROCESS_WRITE_CALL_TIME_DESC);
this.totalWriteCallTime = this.getMetricsRegistry().newTimeHistogram(TOTAL_WRITE_CALL_TIME_NAME,
TOTAL_WRITE_CALL_TIME_DESC);
this.queueScanCallTime = this.getMetricsRegistry().newTimeHistogram(QUEUE_SCAN_CALL_TIME_NAME,
QUEUE_SCAN_CALL_TIME_DESC);
this.processScanCallTime = this.getMetricsRegistry()
.newTimeHistogram(PROCESS_SCAN_CALL_TIME_NAME, PROCESS_SCAN_CALL_TIME_DESC);
this.totalScanCallTime = this.getMetricsRegistry().newTimeHistogram(TOTAL_SCAN_CALL_TIME_NAME,
TOTAL_SCAN_CALL_TIME_DESC);
this.requestSize =
this.getMetricsRegistry().newSizeHistogram(REQUEST_SIZE_NAME, REQUEST_SIZE_DESC);
this.responseSize =
Expand Down Expand Up @@ -133,6 +160,51 @@ public void queuedAndProcessedCall(int totalTime) {
totalCallTime.add(totalTime);
}

@Override
public void dequeuedReadCall(int qTime) {
queueReadCallTime.add(qTime);
}

@Override
public void processReadCall(int processingTime) {
processReadCallTime.add(processingTime);
}

@Override
public void queuedAndProcessedReadCall(int totalTime) {
totalReadCallTime.add(totalTime);
}

@Override
public void dequeuedWriteCall(int qTime) {
queueWriteCallTime.add(qTime);
}

@Override
public void processWriteCall(int processingTime) {
processWriteCallTime.add(processingTime);
}

@Override
public void queuedAndProcessedWriteCall(int totalTime) {
totalWriteCallTime.add(totalTime);
}

@Override
public void dequeuedScanCall(int qTime) {
queueScanCallTime.add(qTime);
}

@Override
public void processScanCall(int processingTime) {
processScanCallTime.add(processingTime);
}

@Override
public void queuedAndProcessedScanCall(int totalTime) {
totalScanCallTime.add(totalTime);
}

@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,42 @@ void totalCall(int totalTime) {
source.queuedAndProcessedCall(totalTime);
}

void dequeuedReadCall(int qTime) {
source.dequeuedReadCall(qTime);
}

void processReadCall(int processingTime) {
source.processReadCall(processingTime);
}

void totalReadCall(int totalTime) {
source.queuedAndProcessedReadCall(totalTime);
}

void dequeuedWriteCall(int qTime) {
source.dequeuedWriteCall(qTime);
}

void processWriteCall(int processingTime) {
source.processWriteCall(processingTime);
}

void totalWriteCall(int totalTime) {
source.queuedAndProcessedWriteCall(totalTime);
}

void dequeuedScanCall(int qTime) {
source.dequeuedScanCall(qTime);
}

void processScanCall(int processingTime) {
source.processScanCall(processingTime);
}

void totalScanCall(int totalTime) {
source.queuedAndProcessedScanCall(totalTime);
}

public void exception(Throwable throwable) {
source.exception();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,17 @@ public boolean dispatch(final CallRunner callTask) {

protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
final CallRunner callTask) {
RpcCall call = callTask.getRpcCall();
int queueIndex;
if (toWriteQueue) {
queueIndex = writeBalancer.getNextQueue(callTask);
call.setQueueType(RpcCall.CallQueueType.WRITE);
} else if (toScanQueue) {
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(callTask);
call.setQueueType(RpcCall.CallQueueType.SCAN);
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue(callTask);
call.setQueueType(RpcCall.CallQueueType.READ);
}

Queue<CallRunner> queue = queues.get(queueIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,24 @@ public interface RpcCall extends RpcCallContext {

/** Returns A short string format of this call without possibly lengthy params */
String toShortString();

/**
* The queue type where the call is located. If {@link RWQueueRpcExecutor} is used, the queue type
* can be divided into read, write and scan.
*/
static enum CallQueueType {
DEFAULT,
WRITE,
READ,
SCAN
}

/** Returns The queue type of this call. */
CallQueueType getQueueType();

/**
* Set the queue type of this call.
* @param type The queue type where the call is located.
*/
void setQueueType(CallQueueType type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,25 @@ public Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
int totalTime = (int) (endTime - receiveTime);
switch (call.getQueueType()) {
case READ:
metrics.dequeuedReadCall(qTime);
metrics.processReadCall(processingTime);
metrics.totalReadCall(totalTime);
break;
case WRITE:
metrics.dequeuedWriteCall(qTime);
metrics.processWriteCall(processingTime);
metrics.totalWriteCall(totalTime);
break;
case SCAN:
metrics.dequeuedScanCall(qTime);
metrics.processScanCall(processingTime);
metrics.totalScanCall(totalTime);
break;
case DEFAULT:
break;
}
if (LOG.isTraceEnabled()) {
LOG.trace(
"{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, totalTime: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
protected long startTime;
protected final long deadline;// the deadline to handle this call, if exceed we can drop it.

protected CallQueueType callQueueType;

protected final ByteBuffAllocator bbAllocator;

protected final CellBlockBuilder cellBlockBuilder;
Expand Down Expand Up @@ -141,6 +143,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
this.cellBlockBuilder = cellBlockBuilder;
this.reqCleanup = reqCleanup;
this.span = Span.current();
this.callQueueType = CallQueueType.DEFAULT;
}

/**
Expand Down Expand Up @@ -578,4 +581,14 @@ public synchronized BufferChain getResponse() {
public synchronized RpcCallback getCallBack() {
return this.rpcCallback;
}

@Override
public CallQueueType getQueueType() {
return callQueueType;
}

@Override
public void setQueueType(CallQueueType type) {
this.callQueueType = type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,27 @@ public void testSourceMethods() {
HELPER.assertCounter("processCallTime_NumOps", 1, serverSource);
HELPER.assertCounter("totalCallTime_NumOps", 1, serverSource);

mrpc.dequeuedReadCall(100);
mrpc.processReadCall(101);
mrpc.totalReadCall(102);
HELPER.assertCounter("queueReadCallTime_NumOps", 1, serverSource);
HELPER.assertCounter("processReadCallTime_NumOps", 1, serverSource);
HELPER.assertCounter("totalReadCallTime_NumOps", 1, serverSource);

mrpc.dequeuedWriteCall(100);
mrpc.processWriteCall(101);
mrpc.totalWriteCall(102);
HELPER.assertCounter("queueWriteCallTime_NumOps", 1, serverSource);
HELPER.assertCounter("processWriteCallTime_NumOps", 1, serverSource);
HELPER.assertCounter("totalWriteCallTime_NumOps", 1, serverSource);

mrpc.dequeuedScanCall(100);
mrpc.processScanCall(101);
mrpc.totalScanCall(102);
HELPER.assertCounter("queueScanCallTime_NumOps", 1, serverSource);
HELPER.assertCounter("processScanCallTime_NumOps", 1, serverSource);
HELPER.assertCounter("totalScanCallTime_NumOps", 1, serverSource);

mrpc.sentBytes(103);
mrpc.sentBytes(103);
mrpc.sentBytes(103);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,15 @@ public long getResponseExceptionSize() {
@Override
public void incrementResponseExceptionSize(long exceptionSize) {
}

@Override
public CallQueueType getQueueType() {
return CallQueueType.DEFAULT;
}

@Override
public void setQueueType(CallQueueType type) {
}
};
return rpcCall;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ public long getResponseExceptionSize() {
@Override
public void incrementResponseExceptionSize(long exceptionSize) {
}

@Override
public CallQueueType getQueueType() {
return CallQueueType.DEFAULT;
}

@Override
public void setQueueType(CallQueueType type) {
}
};
return rpcCall;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,15 @@ public long getResponseExceptionSize() {
@Override
public void incrementResponseExceptionSize(long exceptionSize) {
}

@Override
public CallQueueType getQueueType() {
return CallQueueType.DEFAULT;
}

@Override
public void setQueueType(CallQueueType type) {
}
};
}
}