Skip to content

Commit

Permalink
feat: add opentelemetry counters for sent and acked messages (googlea…
Browse files Browse the repository at this point in the history
…pis#2532)

Also add network latency, queue length and error counts.

The metrics (other than error counts) are now reported periodically,
every second.
  • Loading branch information
agrawal-siddharth authored and yifatgortler committed Jul 25, 2024
1 parent 9096df4 commit be05595
Showing 1 changed file with 134 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand All @@ -40,6 +41,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
Expand Down Expand Up @@ -259,6 +261,7 @@ class ConnectionWorker implements AutoCloseable {
private static Pattern streamPatternTable = Pattern.compile(tableMatching);
private Meter writeMeter;
static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
static AttributeKey<String> telemetryKeyWriterId = AttributeKey.stringKey("writer_id");
private static String dataflowPrefix = "dataflow:";
static List<AttributeKey<String>> telemetryKeysTraceId =
new ArrayList<AttributeKey<String>>() {
Expand All @@ -268,10 +271,25 @@ class ConnectionWorker implements AutoCloseable {
add(AttributeKey.stringKey("trace_field_3"));
}
};
static AttributeKey<String> telemetryKeyErrorCode = AttributeKey.stringKey("error_code");
static AttributeKey<String> telemetryKeyIsRetry = AttributeKey.stringKey("is_retry");
private Attributes telemetryAttributes;
private LongCounter instrumentIncomingRequestCount;
private LongCounter instrumentIncomingRequestSize;
private LongCounter instrumentIncomingRequestRows;
// Latency buckets are based on a list of 1.5 ^ n
private static final List<Long> METRICS_MILLISECONDS_LATENCY_BUCKETS =
ImmutableList.of(
0L, 17L, 38L, 86L, 195L, 438L, 985L, 2217L, 4988L, 11223L, 25251L, 56815L, 127834L,
287627L, 647160L);

private static final class OpenTelemetryMetrics {
private LongCounter instrumentAckedRequestCount;
private LongCounter instrumentAckedRequestSize;
private LongCounter instrumentAckedRequestRows;
private LongHistogram instrumentNetworkResponseLatency;
private LongCounter instrumentConnectionStartCount;
private LongCounter instrumentConnectionEndCount;
}

private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics();

public static Boolean isDefaultStreamName(String streamName) {
Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
Expand Down Expand Up @@ -327,16 +345,21 @@ private void setTraceIdAttributes(AttributesBuilder builder) {
}
}

// Specify common attributes for all metrics.
// For example, table name and writer id.
// Metrics dashboards can be filtered on available attributes.
private Attributes buildOpenTelemetryAttributes() {
AttributesBuilder builder = Attributes.builder();
String tableName = getTableName();
if (!tableName.isEmpty()) {
builder.put(telemetryKeyTableId, tableName);
}
builder.put(telemetryKeyWriterId, writerId);
setTraceIdAttributes(builder);
return builder.build();
}

// Refresh the table name attribute when multiplexing switches between tables.
private void refreshOpenTelemetryTableNameAttributes() {
String tableName = getTableName();
if (!tableName.isEmpty()
Expand All @@ -347,6 +370,22 @@ private void refreshOpenTelemetryTableNameAttributes() {
}
}

// Build new attributes augmented with an error code string.
private Attributes augmentAttributesWithErrorCode(Attributes attributes, String errorCode) {
AttributesBuilder builder = attributes.toBuilder();
if ((errorCode != null) && !errorCode.isEmpty()) {
builder.put(telemetryKeyErrorCode, errorCode);
}
return builder.build();
}

// Build new attributes augmented with a flag indicating this was a retry.
private Attributes augmentAttributesWithRetry(Attributes attributes) {
AttributesBuilder builder = attributes.toBuilder();
builder.put(telemetryKeyIsRetry, "1");
return builder.build();
}

@VisibleForTesting
Attributes getTelemetryAttributes() {
return telemetryAttributes;
Expand All @@ -360,20 +399,72 @@ private void registerOpenTelemetryMetrics() {
.setInstrumentationVersion(
ConnectionWorker.class.getPackage().getImplementationVersion())
.build();
instrumentIncomingRequestCount =
telemetryMetrics.instrumentAckedRequestCount =
writeMeter
.counterBuilder("append_requests_acked")
.setDescription("Counts number of requests acked by the server")
.build();
telemetryMetrics.instrumentAckedRequestSize =
writeMeter
.counterBuilder("append_request_bytes_acked")
.setDescription("Counts byte size of requests acked by the server")
.build();
telemetryMetrics.instrumentAckedRequestRows =
writeMeter
.counterBuilder("append_rows_acked")
.setDescription("Counts number of request rows acked by the server")
.build();
writeMeter
.gaugeBuilder("active_connection_count")
.ofLongs()
.setDescription("Reports number of active connections")
.buildWithCallback(
measurement -> {
int count = 0;
this.lock.lock();
try {
if (streamConnectionIsConnected) {
count = 1;
}
} finally {
this.lock.unlock();
}
measurement.record(count, getTelemetryAttributes());
});
writeMeter
.gaugeBuilder("inflight_queue_length")
.ofLongs()
.setDescription(
"Reports length of inflight queue. This queue contains sent append requests waiting for response from the server.")
.buildWithCallback(
measurement -> {
int length = 0;
this.lock.lock();
try {
length = inflightRequestQueue.size();
} finally {
this.lock.unlock();
}
measurement.record(length, getTelemetryAttributes());
});
telemetryMetrics.instrumentNetworkResponseLatency =
writeMeter
.counterBuilder("append_requests")
.setDescription("Counts number of incoming requests")
.histogramBuilder("network_response_latency")
.ofLongs()
.setDescription(
"Reports time taken in milliseconds for a response to arrive once a message has been sent over the network.")
.setExplicitBucketBoundariesAdvice(METRICS_MILLISECONDS_LATENCY_BUCKETS)
.build();
instrumentIncomingRequestSize =
telemetryMetrics.instrumentConnectionStartCount =
writeMeter
.counterBuilder("append_request_bytes")
.setDescription("Counts byte size of incoming requests")
.counterBuilder("connection_start_count")
.setDescription(
"Counts number of connection attempts made, regardless of whether these are initial or retry.")
.build();
instrumentIncomingRequestRows =
telemetryMetrics.instrumentConnectionEndCount =
writeMeter
.counterBuilder("append_rows")
.setDescription("Counts number of incoming request rows")
.counterBuilder("connection_end_count")
.setDescription("Counts number of connection end events.")
.build();
}

Expand Down Expand Up @@ -465,6 +556,7 @@ public void run() {

private void resetConnection() {
log.info("Start connecting stream: " + streamName + " id: " + writerId);
telemetryMetrics.instrumentConnectionStartCount.add(1, getTelemetryAttributes());
if (this.streamConnection != null) {
// It's safe to directly close the previous connection as the in flight messages
// will be picked up by the next connection.
Expand Down Expand Up @@ -618,9 +710,6 @@ private ApiFuture<AppendRowsResponse> appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
this.lock.lock();
try {
if (userClosed) {
Expand Down Expand Up @@ -1214,6 +1303,13 @@ private void requestCallback(AppendRowsResponse response) {
connectionRetryStartTime = 0;
}
if (!this.inflightRequestQueue.isEmpty()) {
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
if (sendInstant != null) {
Duration durationLatency = Duration.between(sendInstant, Instant.now());
telemetryMetrics.instrumentNetworkResponseLatency.record(
durationLatency.toMillis(), getTelemetryAttributes());
}

requestWrapper = pollFirstInflightRequestQueue();
requestProfilerHook.endOperation(
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
Expand All @@ -1234,6 +1330,22 @@ private void requestCallback(AppendRowsResponse response) {
this.lock.unlock();
}

Attributes augmentedTelemetryAttributes =
augmentAttributesWithErrorCode(
getTelemetryAttributes(),
Code.values()[
response.hasError() ? response.getError().getCode() : Status.Code.OK.ordinal()]
.toString());
if (requestWrapper.retryCount > 0) {
augmentedTelemetryAttributes = augmentAttributesWithRetry(augmentedTelemetryAttributes);
}
telemetryMetrics.instrumentAckedRequestCount.add(1, augmentedTelemetryAttributes);
telemetryMetrics.instrumentAckedRequestSize.add(
requestWrapper.messageSize, augmentedTelemetryAttributes);
telemetryMetrics.instrumentAckedRequestRows.add(
requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(),
augmentedTelemetryAttributes);

// Retries need to happen on the same thread as queue locking may occur
if (response.hasError()) {
if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) {
Expand Down Expand Up @@ -1316,6 +1428,11 @@ private void doneCallback(Throwable finalStatus) {
this.lock.lock();
try {
this.streamConnectionIsConnected = false;
this.telemetryMetrics.instrumentConnectionEndCount.add(
1,
augmentAttributesWithErrorCode(
getTelemetryAttributes(),
Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString()));
if (connectionFinalStatus == null) {
if (connectionRetryStartTime == 0) {
connectionRetryStartTime = System.currentTimeMillis();
Expand All @@ -1327,6 +1444,8 @@ private void doneCallback(Throwable finalStatus) {
|| System.currentTimeMillis() - connectionRetryStartTime
<= maxRetryDuration.toMillis())) {
this.conectionRetryCountWithoutCallback++;
this.telemetryMetrics.instrumentConnectionStartCount.add(
1, augmentAttributesWithRetry(getTelemetryAttributes()));
log.info(
"Connection is going to be reestablished with the next request. Retriable error "
+ finalStatus.toString()
Expand Down

0 comments on commit be05595

Please sign in to comment.