Skip to content

Commit

Permalink
feat: add opentelemetry counters for sent and acked messages
Browse files Browse the repository at this point in the history
Also add network latency, queue length and error counts.

The counters (other than error counts) are now buffered and reported periodically,
at most once per second; error counts and latency are recorded as responses arrive.
  • Loading branch information
agrawal-siddharth committed Jul 16, 2024
1 parent e6a6cb4 commit 3caa290
Showing 1 changed file with 232 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand All @@ -40,6 +41,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
Expand Down Expand Up @@ -264,6 +266,8 @@ class ConnectionWorker implements AutoCloseable {
private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/";
private static Pattern streamPatternTable = Pattern.compile(tableMatching);
private Meter writeMeter;
static Duration METRICS_UPDATE_INTERVAL = Duration.ofSeconds(1);
private Instant instantLastSentMetrics = Instant.now();
static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
private static String dataflowPrefix = "dataflow:";
static List<AttributeKey<String>> telemetryKeysTraceId =
Expand All @@ -274,10 +278,41 @@ class ConnectionWorker implements AutoCloseable {
add(AttributeKey.stringKey("trace_field_3"));
}
};
static AttributeKey<String> telemetryKeyErrorCode = AttributeKey.stringKey("error_code");
private Attributes telemetryAttributes;
private LongCounter instrumentIncomingRequestCount;
private LongCounter instrumentIncomingRequestSize;
private LongCounter instrumentIncomingRequestRows;
private static final List<Long> METRICS_MILLISECONDS_LATENCY_BUCKETS =
ImmutableList.of(0L, 50L, 100L, 500L, 1000L, 5000L, 10000L, 20000L, 30000L, 60000L, 120000L);

private static final class OpenTelemetryMetrics {
private long incomingRequestCountBuffered;
private LongCounter instrumentIncomingRequestCount;
private long incomingRequestSizeBuffered;
private LongCounter instrumentIncomingRequestSize;
private long incomingRequestRowsBuffered;
private LongCounter instrumentIncomingRequestRows;
private long sentRequestCountBuffered;
private LongCounter instrumentSentRequestCount;
private long sentRequestSizeBuffered;
private LongCounter instrumentSentRequestSize;
private long sentRequestRowsBuffered;
private LongCounter instrumentSentRequestRows;
private long ackedRequestCountBuffered;
private LongCounter instrumentAckedRequestCount;
private long ackedRequestSizeBuffered;
private LongCounter instrumentAckedRequestSize;
private long ackedRequestRowsBuffered;
private LongCounter instrumentAckedRequestRows;
private LongCounter instrumentErrorRequestCount;
private LongCounter instrumentErrorRequestSize;
private LongCounter instrumentErrorRequestRows;
private LongHistogram instrumentNetworkResponseLatency;
private long connectionEstablishCountBuffered;
private LongCounter instrumentConnectionEstablishCount;
private long connectionRetryCountBuffered;
private LongCounter instrumentConnectionRetryCount;
}

private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics();

public static Boolean isDefaultStreamName(String streamName) {
Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
Expand Down Expand Up @@ -353,11 +388,71 @@ private void refreshOpenTelemetryTableNameAttributes() {
}
}

private Attributes augmentAttributesWithErrorCode(Attributes attributes, String errorCode) {
AttributesBuilder builder = attributes.toBuilder();
if ((errorCode != null) && !errorCode.isEmpty()) {
builder.put(telemetryKeyErrorCode, errorCode);
}
return builder.build();
}

@VisibleForTesting
Attributes getTelemetryAttributes() {
return telemetryAttributes;
}

private void periodicallyReportOpenTelemetryMetrics() {
Duration durationSinceLastRefresh = Duration.between(instantLastSentMetrics, Instant.now());
if (durationSinceLastRefresh.compareTo(METRICS_UPDATE_INTERVAL) > 0) {
instantLastSentMetrics = Instant.now();
Attributes attributes = getTelemetryAttributes();
if (telemetryMetrics.incomingRequestCountBuffered > 0) {
telemetryMetrics.instrumentIncomingRequestCount.add(telemetryMetrics.incomingRequestCountBuffered, attributes);
telemetryMetrics.incomingRequestCountBuffered = 0;
}
if (telemetryMetrics.incomingRequestSizeBuffered > 0) {
telemetryMetrics.instrumentIncomingRequestSize.add(telemetryMetrics.incomingRequestSizeBuffered, attributes);
telemetryMetrics.incomingRequestSizeBuffered = 0;
}
if (telemetryMetrics.incomingRequestRowsBuffered > 0) {
telemetryMetrics.instrumentIncomingRequestRows.add(telemetryMetrics.incomingRequestRowsBuffered, attributes);
telemetryMetrics.incomingRequestRowsBuffered = 0;
}
if (telemetryMetrics.sentRequestCountBuffered > 0) {
telemetryMetrics.instrumentSentRequestCount.add(telemetryMetrics.sentRequestCountBuffered, attributes);
telemetryMetrics.sentRequestCountBuffered = 0;
}
if (telemetryMetrics.sentRequestSizeBuffered > 0) {
telemetryMetrics.instrumentSentRequestSize.add(telemetryMetrics.sentRequestSizeBuffered, attributes);
telemetryMetrics.sentRequestSizeBuffered = 0;
}
if (telemetryMetrics.sentRequestRowsBuffered > 0) {
telemetryMetrics.instrumentSentRequestRows.add(telemetryMetrics.sentRequestRowsBuffered, attributes);
telemetryMetrics.sentRequestRowsBuffered = 0;
}
if (telemetryMetrics.ackedRequestCountBuffered > 0) {
telemetryMetrics.instrumentAckedRequestCount.add(telemetryMetrics.ackedRequestCountBuffered, attributes);
telemetryMetrics.ackedRequestCountBuffered = 0;
}
if (telemetryMetrics.ackedRequestSizeBuffered > 0) {
telemetryMetrics.instrumentAckedRequestSize.add(telemetryMetrics.ackedRequestSizeBuffered, attributes);
telemetryMetrics.ackedRequestSizeBuffered = 0;
}
if (telemetryMetrics.ackedRequestRowsBuffered > 0) {
telemetryMetrics.instrumentAckedRequestRows.add(telemetryMetrics.ackedRequestRowsBuffered, attributes);
telemetryMetrics.ackedRequestRowsBuffered = 0;
}
if (telemetryMetrics.connectionEstablishCountBuffered > 0) {
telemetryMetrics.instrumentConnectionEstablishCount.add(telemetryMetrics.connectionEstablishCountBuffered, attributes);
telemetryMetrics.connectionEstablishCountBuffered = 0;
}
if (telemetryMetrics.connectionRetryCountBuffered > 0) {
telemetryMetrics.instrumentConnectionRetryCount.add(telemetryMetrics.connectionRetryCountBuffered, attributes);
telemetryMetrics.connectionRetryCountBuffered = 0;
}
}
}

private void registerOpenTelemetryMetrics() {
MeterProvider meterProvider = Singletons.getOpenTelemetry().getMeterProvider();
writeMeter =
Expand All @@ -366,21 +461,118 @@ private void registerOpenTelemetryMetrics() {
.setInstrumentationVersion(
ConnectionWorker.class.getPackage().getImplementationVersion())
.build();
instrumentIncomingRequestCount =
telemetryMetrics.instrumentIncomingRequestCount =
writeMeter
.counterBuilder("append_requests")
.setDescription("Counts number of incoming requests")
.build();
instrumentIncomingRequestSize =
telemetryMetrics.instrumentIncomingRequestSize =
writeMeter
.counterBuilder("append_request_bytes")
.setDescription("Counts byte size of incoming requests")
.build();
instrumentIncomingRequestRows =
telemetryMetrics.instrumentIncomingRequestRows =
writeMeter
.counterBuilder("append_rows")
.setDescription("Counts number of incoming request rows")
.build();
telemetryMetrics.instrumentSentRequestCount =
writeMeter
.counterBuilder("append_requests_sent")
.setDescription("Counts number of requests sent over the network")
.build();
telemetryMetrics.instrumentSentRequestSize =
writeMeter
.counterBuilder("append_request_bytes_sent")
.setDescription("Counts byte size of requests sent over the network")
.build();
telemetryMetrics.instrumentSentRequestRows =
writeMeter
.counterBuilder("append_rows_sent")
.setDescription("Counts number of request rows sent over the network")
.build();
telemetryMetrics.instrumentAckedRequestCount =
writeMeter
.counterBuilder("append_requests_acked")
.setDescription("Counts number of requests acked by the server")
.build();
telemetryMetrics.instrumentAckedRequestSize =
writeMeter
.counterBuilder("append_request_bytes_acked")
.setDescription("Counts byte size of requests acked by the server")
.build();
telemetryMetrics.instrumentAckedRequestRows =
writeMeter
.counterBuilder("append_rows_acked")
.setDescription("Counts number of request rows acked by the server")
.build();
telemetryMetrics.instrumentErrorRequestCount =
writeMeter
.counterBuilder("append_requests_error")
.setDescription("Counts number of requests returned by the server with an error")
.build();
telemetryMetrics.instrumentErrorRequestSize =
writeMeter
.counterBuilder("append_request_bytes_error")
.setDescription("Counts byte size of requests returned by the server with an error")
.build();
telemetryMetrics.instrumentErrorRequestRows =
writeMeter
.counterBuilder("append_rows_error")
.setDescription(
"Counts number of rows in requests returned by the server with an error")
.build();
writeMeter
.gaugeBuilder("waiting_queue_length")
.ofLongs()
.setDescription(
"Reports length of waiting queue. This queue contains requests buffered in the client and not yet sent to the server.")
.buildWithCallback(
measurement -> {
int length = 0;
this.lock.lock();
try {
length = waitingRequestQueue.size();
} finally {
this.lock.unlock();
}
measurement.record(length, getTelemetryAttributes());
});
writeMeter
.gaugeBuilder("inflight_queue_length")
.ofLongs()
.setDescription(
"Reports length of inflight queue. This queue contains sent append requests waiting for response from the server.")
.buildWithCallback(
measurement -> {
int length = 0;
this.lock.lock();
try {
length = inflightRequestQueue.size();
} finally {
this.lock.unlock();
}
measurement.record(length, getTelemetryAttributes());
});
telemetryMetrics.instrumentNetworkResponseLatency =
writeMeter
.histogramBuilder("network_response_latency")
.ofLongs()
.setDescription(
"Reports time taken in milliseconds for a response to arrive once a message has been sent over the network.")
.setExplicitBucketBoundariesAdvice(METRICS_MILLISECONDS_LATENCY_BUCKETS)
.build();
telemetryMetrics.instrumentConnectionEstablishCount =
writeMeter
.counterBuilder("connection_establish_count")
.setDescription(
"Counts number of connection attempts made, regardless of whether these are initial or retry.")
.build();
telemetryMetrics.instrumentConnectionRetryCount =
writeMeter
.counterBuilder("connection_retry_count")
.setDescription("Counts number of connection retry attempts made.")
.build();
}

public ConnectionWorker(
Expand Down Expand Up @@ -469,6 +661,7 @@ public void run() {

private void resetConnection() {
log.info("Start connecting stream: " + streamName + " id: " + writerId);
telemetryMetrics.connectionEstablishCountBuffered++;
if (this.streamConnection != null) {
// It's safe to directly close the previous connection as the in flight messages
// will be picked up by the next connection.
Expand Down Expand Up @@ -615,9 +808,9 @@ private ApiFuture<AppendRowsResponse> appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
telemetryMetrics.incomingRequestCountBuffered++;
telemetryMetrics.incomingRequestSizeBuffered += requestWrapper.messageSize;
telemetryMetrics.incomingRequestRowsBuffered += message.getProtoRows().getRows().getSerializedRowsCount();
this.lock.lock();
try {
if (userClosed) {
Expand Down Expand Up @@ -815,6 +1008,7 @@ private void appendLoop() {
throwIfWaitCallbackTooLong(sendInstant);
}
}
periodicallyReportOpenTelemetryMetrics();

// Copy the streamConnectionIsConnected guarded by lock to a local variable.
// In addition, only reconnect if there is a retriable error.
Expand Down Expand Up @@ -876,7 +1070,8 @@ private void appendLoop() {
}
while (!localQueue.isEmpty()) {
localQueue.peekFirst().setRequestSendQueueTime();
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
AppendRequestAndResponse requestWrapper = localQueue.pollFirst();
AppendRowsRequest originalRequest = requestWrapper.message;
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
// Always respect the first writer schema seen by the loop.
if (writerSchema == null) {
Expand Down Expand Up @@ -928,6 +1123,10 @@ private void appendLoop() {
// In the close case, the request is in the inflight queue, and will either be returned
// to the user with an error, or will be resent.
this.streamConnection.send(originalRequestBuilder.build());
telemetryMetrics.sentRequestCountBuffered++;
telemetryMetrics.sentRequestSizeBuffered += requestWrapper.messageSize;
telemetryMetrics.sentRequestRowsBuffered +=
originalRequest.getProtoRows().getRows().getSerializedRowsCount();
}
}
cleanupConnectionAndRequests(/* avoidBlocking= */ false);
Expand Down Expand Up @@ -1195,6 +1394,13 @@ private void requestCallback(AppendRowsResponse response) {
connectionRetryStartTime = 0;
}
if (!this.inflightRequestQueue.isEmpty()) {
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
if (sendInstant != null) {
Duration durationLatency = Duration.between(sendInstant, Instant.now());
telemetryMetrics.instrumentNetworkResponseLatency.record(
durationLatency.toMillis(), getTelemetryAttributes());
}

requestWrapper = pollFirstInflightRequestQueue();
} else if (inflightCleanuped) {
// It is possible when requestCallback is called, the inflight queue is already drained
Expand All @@ -1209,6 +1415,17 @@ private void requestCallback(AppendRowsResponse response) {
.withDescription("Request callback called on an empty inflight queue."));
return;
}

if (response.hasError()) {
Attributes augmentedTelemetryAttributes =
augmentAttributesWithErrorCode(
getTelemetryAttributes(), Code.values()[response.getError().getCode()].toString());
telemetryMetrics.instrumentErrorRequestCount.add(1, augmentedTelemetryAttributes);
telemetryMetrics.instrumentErrorRequestSize.add(requestWrapper.messageSize, augmentedTelemetryAttributes);
telemetryMetrics.instrumentErrorRequestRows.add(
requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(),
augmentedTelemetryAttributes);
}
} finally {
this.lock.unlock();
}
Expand Down Expand Up @@ -1256,6 +1473,10 @@ private void requestCallback(AppendRowsResponse response) {
}
} else {
requestWrapper.appendResult.set(response);
telemetryMetrics.ackedRequestCountBuffered++;
telemetryMetrics.ackedRequestSizeBuffered += requestWrapper.messageSize;
telemetryMetrics.ackedRequestRowsBuffered +=
requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount();
}
});
}
Expand Down Expand Up @@ -1301,6 +1522,7 @@ private void doneCallback(Throwable finalStatus) {
|| System.currentTimeMillis() - connectionRetryStartTime
<= maxRetryDuration.toMillis())) {
this.conectionRetryCountWithoutCallback++;
this.telemetryMetrics.connectionRetryCountBuffered++;
log.info(
"Connection is going to be reestablished with the next request. Retriable error "
+ finalStatus.toString()
Expand Down

0 comments on commit 3caa290

Please sign in to comment.