Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String NUM_LIFO_MODE_SWITCHES_NAME = "numLifoModeSwitches";
String NUM_LIFO_MODE_SWITCHES_DESC = "Total number of calls in general queue which " +
"were served from the tail of the queue";
String NUM_CALL_RESPONSE_QUEUE_NAME = "numCallsInResponseQueue";
String NUM_CALL_RESPONSE_QUEUE_DESC = "Number of calls in response queue.";
String NUM_SIZE_RESPONSE_QUEUE_NAME = "numSizeInResponseQueue";
String NUM_SIZE_RESPONSE_QUEUE_DESC = "Size in response queue.";
// Direct Memory Usage metrics
String NETTY_DM_USAGE_NAME = "nettyDirectMemoryUsage";

Expand Down Expand Up @@ -126,4 +130,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
void processedCall(int processingTime);

void queuedAndProcessedCall(int totalTime);

void addCallToResponseQueue(long size);

void removeCallFromResponseQueue(long size);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
private final MutableFastCounter authenticationFallbacks;
private final MutableFastCounter sentBytes;
private final MutableFastCounter receivedBytes;
private final MutableFastCounter numCallsInResponseQueue;
private final MutableFastCounter numSizeInResponseQueue;


private MetricHistogram queueCallTime;
Expand Down Expand Up @@ -78,6 +80,10 @@ public MetricsHBaseServerSourceImpl(String metricsName,
REQUEST_SIZE_DESC);
this.responseSize = this.getMetricsRegistry().newSizeHistogram(RESPONSE_SIZE_NAME,
RESPONSE_SIZE_DESC);
this.numCallsInResponseQueue = this.getMetricsRegistry().newCounter(NUM_CALL_RESPONSE_QUEUE_NAME,
NUM_CALL_RESPONSE_QUEUE_DESC, 0L);
this.numSizeInResponseQueue = this.getMetricsRegistry().newCounter(NUM_SIZE_RESPONSE_QUEUE_NAME,
NUM_SIZE_RESPONSE_QUEUE_DESC, 0L);
}

@Override
Expand Down Expand Up @@ -140,6 +146,18 @@ public void queuedAndProcessedCall(int totalTime) {
totalCallTime.add(totalTime);
}

@Override
public void addCallToResponseQueue(long size) {
numCallsInResponseQueue.incr();
numSizeInResponseQueue.incr(size);
}

@Override
public void removeCallFromResponseQueue(long size) {
numCallsInResponseQueue.incr(-1);
numSizeInResponseQueue.incr(-size);
}

@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ boolean hasRemaining() {
return remaining > 0;
}

int getRemaining() {
return remaining;
}

/**
* Write out our chain of buffers in chunks
* @param channel Where to write
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ void totalCall(int totalTime) {
source.queuedAndProcessedCall(totalTime);
}

void addCallToResponseQueue(int size) {
source.addCallToResponseQueue(size);
}

void removeCallFromResponseQueue(int size) {
source.removeCallFromResponseQueue(size);
}

public void exception(Throwable throwable) {
source.exception();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@ private boolean processAllResponses(final SimpleServerRpcConnection connection)
if (resp == null) {
return true;
}
simpleRpcServer.getMetrics().removeCallFromResponseQueue(resp.getResponse().getRemaining());
if (!processResponse(connection, resp)) {
connection.responseQueue.addFirst(resp);
simpleRpcServer.getMetrics().addCallToResponseQueue(resp.getResponse().getRemaining());
return false;
}
}
Expand Down Expand Up @@ -298,6 +300,7 @@ void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOExcept
}
// Too big to fit, putting ahead.
conn.responseQueue.addFirst(resp);
simpleRpcServer.getMetrics().addCallToResponseQueue(resp.getResponse().getRemaining());
added = true; // We will register to the selector later, outside of the lock.
}
} finally {
Expand All @@ -307,6 +310,7 @@ void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOExcept

if (!added) {
conn.responseQueue.addLast(resp);
simpleRpcServer.getMetrics().addCallToResponseQueue(resp.getResponse().getRemaining());
}
registerForWrite(conn);
}
Expand Down