feat: add opentelemetry counters for sent and acked messages
Also add network latency, queue length, and error counts.

The metrics (other than error counts) are now reported periodically,
every second.
agrawal-siddharth committed Jul 24, 2024
1 parent 54774f9 commit 0572a47
Showing 1 changed file with 134 additions and 15 deletions.
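For context, here is a minimal sketch (not part of this commit) of how an application could surface these instruments on the one-second cadence mentioned above. It assumes the OpenTelemetry SDK artifacts opentelemetry-sdk and opentelemetry-exporter-logging are on the classpath, and that the writer picks up the globally registered MeterProvider; the exact wiring between the client library and the provider is not shown in this diff.

import io.opentelemetry.exporter.logging.LoggingMetricExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import java.time.Duration;

public class WriterMetricsSetup {
  public static void main(String[] args) {
    // Collect and export metrics once per second; swap LoggingMetricExporter for an
    // OTLP or Cloud Monitoring exporter in a real deployment.
    SdkMeterProvider meterProvider =
        SdkMeterProvider.builder()
            .registerMetricReader(
                PeriodicMetricReader.builder(LoggingMetricExporter.create())
                    .setInterval(Duration.ofSeconds(1))
                    .build())
            .build();

    // Register globally so library code that resolves a MeterProvider can find it.
    OpenTelemetrySdk.builder().setMeterProvider(meterProvider).buildAndRegisterGlobal();

    // ... create the BigQuery Storage write client / StreamWriter as usual. The counters
    // added in this commit (append_requests_acked, append_request_bytes_acked,
    // append_rows_acked, connection_start_count, connection_end_count), the
    // active_connection_count and inflight_queue_length gauges, and the
    // network_response_latency histogram would then be exported every second,
    // assuming the writer uses this provider.
  }
}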
@@ -31,6 +31,7 @@
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
@@ -40,6 +41,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
@@ -259,6 +261,7 @@ class ConnectionWorker implements AutoCloseable {
private static Pattern streamPatternTable = Pattern.compile(tableMatching);
private Meter writeMeter;
static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
static AttributeKey<String> telemetryKeyWriterId = AttributeKey.stringKey("writer_id");
private static String dataflowPrefix = "dataflow:";
static List<AttributeKey<String>> telemetryKeysTraceId =
new ArrayList<AttributeKey<String>>() {
@@ -268,10 +271,25 @@ class ConnectionWorker implements AutoCloseable {
add(AttributeKey.stringKey("trace_field_3"));
}
};
static AttributeKey<String> telemetryKeyErrorCode = AttributeKey.stringKey("error_code");
static AttributeKey<String> telemetryKeyIsRetry = AttributeKey.stringKey("is_retry");
private Attributes telemetryAttributes;
- private LongCounter instrumentIncomingRequestCount;
- private LongCounter instrumentIncomingRequestSize;
- private LongCounter instrumentIncomingRequestRows;
// Buckets are based on a list of 1.5 ^ n
private static final List<Long> METRICS_MILLISECONDS_LATENCY_BUCKETS =
ImmutableList.of(
1L, 3L, 8L, 17L, 38L, 86L, 195L, 438L, 985L, 2217L, 4988L, 11223L, 25251L, 56815L,
127834L, 287627L, 647160L, 1456110L);

private static final class OpenTelemetryMetrics {
private LongCounter instrumentAckedRequestCount;
private LongCounter instrumentAckedRequestSize;
private LongCounter instrumentAckedRequestRows;
private LongHistogram instrumentNetworkResponseLatency;
private LongCounter instrumentConnectionStartCount;
private LongCounter instrumentConnectionEndCount;
}

private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics();

public static Boolean isDefaultStreamName(String streamName) {
Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
@@ -327,16 +345,21 @@ private void setTraceIdAttributes(AttributesBuilder builder) {
}
}

// Specify common attributes for all metrics.
// For example, table name and writer id.
// Metrics dashboards can be filtered on available attributes.
private Attributes buildOpenTelemetryAttributes() {
AttributesBuilder builder = Attributes.builder();
String tableName = getTableName();
if (!tableName.isEmpty()) {
builder.put(telemetryKeyTableId, tableName);
}
builder.put(telemetryKeyWriterId, writerId);
setTraceIdAttributes(builder);
return builder.build();
}

// Refresh the table name attribute when multiplexing switches between tables.
private void refreshOpenTelemetryTableNameAttributes() {
String tableName = getTableName();
if (!tableName.isEmpty()
@@ -347,6 +370,22 @@ private void refreshOpenTelemetryTableNameAttributes() {
}
}

// Build new attributes augmented with an error code string.
private Attributes augmentAttributesWithErrorCode(Attributes attributes, String errorCode) {
AttributesBuilder builder = attributes.toBuilder();
if ((errorCode != null) && !errorCode.isEmpty()) {
builder.put(telemetryKeyErrorCode, errorCode);
}
return builder.build();
}

// Build new attributes augmented with a flag indicating this was a retry.
private Attributes augmentAttributesWithRetry(Attributes attributes) {
AttributesBuilder builder = attributes.toBuilder();
builder.put(telemetryKeyIsRetry, "1");
return builder.build();
}

@VisibleForTesting
Attributes getTelemetryAttributes() {
return telemetryAttributes;
@@ -360,20 +399,72 @@ private void registerOpenTelemetryMetrics() {
.setInstrumentationVersion(
ConnectionWorker.class.getPackage().getImplementationVersion())
.build();
- instrumentIncomingRequestCount =
telemetryMetrics.instrumentAckedRequestCount =
writeMeter
.counterBuilder("append_requests_acked")
.setDescription("Counts number of requests acked by the server")
.build();
telemetryMetrics.instrumentAckedRequestSize =
writeMeter
.counterBuilder("append_request_bytes_acked")
.setDescription("Counts byte size of requests acked by the server")
.build();
telemetryMetrics.instrumentAckedRequestRows =
writeMeter
.counterBuilder("append_rows_acked")
.setDescription("Counts number of request rows acked by the server")
.build();
writeMeter
.gaugeBuilder("active_connection_count")
.ofLongs()
.setDescription("Reports number of active connections")
.buildWithCallback(
measurement -> {
int count = 0;
this.lock.lock();
try {
if (streamConnectionIsConnected) {
count = 1;
}
} finally {
this.lock.unlock();
}
measurement.record(count, getTelemetryAttributes());
});
writeMeter
.gaugeBuilder("inflight_queue_length")
.ofLongs()
.setDescription(
"Reports length of inflight queue. This queue contains sent append requests waiting for response from the server.")
.buildWithCallback(
measurement -> {
int length = 0;
this.lock.lock();
try {
length = inflightRequestQueue.size();
} finally {
this.lock.unlock();
}
measurement.record(length, getTelemetryAttributes());
});
telemetryMetrics.instrumentNetworkResponseLatency =
writeMeter
.counterBuilder("append_requests")
.setDescription("Counts number of incoming requests")
.histogramBuilder("network_response_latency")
.ofLongs()
.setDescription(
"Reports time taken in milliseconds for a response to arrive once a message has been sent over the network.")
.setExplicitBucketBoundariesAdvice(METRICS_MILLISECONDS_LATENCY_BUCKETS)
.build();
- instrumentIncomingRequestSize =
telemetryMetrics.instrumentConnectionStartCount =
writeMeter
.counterBuilder("append_request_bytes")
.setDescription("Counts byte size of incoming requests")
.counterBuilder("connection_start_count")
.setDescription(
"Counts number of connection attempts made, regardless of whether these are initial or retry.")
.build();
- instrumentIncomingRequestRows =
telemetryMetrics.instrumentConnectionEndCount =
writeMeter
.counterBuilder("append_rows")
.setDescription("Counts number of incoming request rows")
.counterBuilder("connection_end_count")
.setDescription("Counts number of connection end events.")
.build();
}

@@ -465,6 +556,7 @@ public void run() {

private void resetConnection() {
log.info("Start connecting stream: " + streamName + " id: " + writerId);
telemetryMetrics.instrumentConnectionStartCount.add(1, getTelemetryAttributes());
if (this.streamConnection != null) {
// It's safe to directly close the previous connection as the in flight messages
// will be picked up by the next connection.
@@ -618,9 +710,6 @@ private ApiFuture<AppendRowsResponse> appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
- instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
- instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
- instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
this.lock.lock();
try {
if (userClosed) {
@@ -1214,6 +1303,13 @@ private void requestCallback(AppendRowsResponse response) {
connectionRetryStartTime = 0;
}
if (!this.inflightRequestQueue.isEmpty()) {
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
if (sendInstant != null) {
Duration durationLatency = Duration.between(sendInstant, Instant.now());
telemetryMetrics.instrumentNetworkResponseLatency.record(
durationLatency.toMillis(), getTelemetryAttributes());
}

requestWrapper = pollFirstInflightRequestQueue();
requestProfilerHook.endOperation(
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
@@ -1234,6 +1330,22 @@ private void requestCallback(AppendRowsResponse response) {
this.lock.unlock();
}

Attributes augmentedTelemetryAttributes =
augmentAttributesWithErrorCode(
getTelemetryAttributes(),
Code.values()[
response.hasError() ? response.getError().getCode() : Status.Code.OK.ordinal()]
.toString());
if (requestWrapper.retryCount > 0) {
augmentedTelemetryAttributes = augmentAttributesWithRetry(augmentedTelemetryAttributes);
}
telemetryMetrics.instrumentAckedRequestCount.add(1, augmentedTelemetryAttributes);
telemetryMetrics.instrumentAckedRequestSize.add(
requestWrapper.messageSize, augmentedTelemetryAttributes);
telemetryMetrics.instrumentAckedRequestRows.add(
requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(),
augmentedTelemetryAttributes);

// Retries need to happen on the same thread as queue locking may occur
if (response.hasError()) {
if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) {
@@ -1316,6 +1428,11 @@ private void doneCallback(Throwable finalStatus) {
this.lock.lock();
try {
this.streamConnectionIsConnected = false;
this.telemetryMetrics.instrumentConnectionEndCount.add(
1,
augmentAttributesWithErrorCode(
getTelemetryAttributes(),
Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString()));
if (connectionFinalStatus == null) {
if (connectionRetryStartTime == 0) {
connectionRetryStartTime = System.currentTimeMillis();
@@ -1327,6 +1444,8 @@ private void doneCallback(Throwable finalStatus) {
|| System.currentTimeMillis() - connectionRetryStartTime
<= maxRetryDuration.toMillis())) {
this.conectionRetryCountWithoutCallback++;
this.telemetryMetrics.instrumentConnectionStartCount.add(
1, augmentAttributesWithRetry(getTelemetryAttributes()));
log.info(
"Connection is going to be reestablished with the next request. Retriable error "
+ finalStatus.toString()
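A side note on the METRICS_MILLISECONDS_LATENCY_BUCKETS boundaries in the diff above: they track powers of 1.5 sampled at every other exponent, so each boundary is roughly 2.25x the previous one. The scratch snippet below reproduces the same 18 values; it illustrates the pattern and is not the code actually used to generate them.

import java.util.ArrayList;
import java.util.List;

public class LatencyBuckets {
  public static void main(String[] args) {
    List<Long> buckets = new ArrayList<>();
    buckets.add(1L); // smallest boundary, 1 ms
    // Remaining boundaries: round(1.5^(2k + 1)) for k = 1..17, i.e. 3, 8, 17, ..., 1456110.
    for (int k = 1; k <= 17; k++) {
      buckets.add(Math.round(Math.pow(1.5, 2 * k + 1)));
    }
    System.out.println(buckets);
    // Prints: [1, 3, 8, 17, 38, 86, 195, 438, 985, 2217, 4988, 11223, 25251, 56815,
    //          127834, 287627, 647160, 1456110]
  }
}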