Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (c *Client) prepareStatementWithMetadata(ctx context.Context, query string,

preparedStatement, err = c.prepareStatement(ctx, mt, query, paramTypes, opts...)
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.setCurrOpStatus(statusCode.String())
mt.setCurrOpStatus(statusCode)
return preparedStatement, statusErr
}

Expand Down Expand Up @@ -741,7 +741,7 @@ func (bs *BoundStatement) Execute(ctx context.Context, f func(ResultRow) bool, o

err = bs.execute(ctx, f, mt)
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.setCurrOpStatus(statusCode.String())
mt.setCurrOpStatus(statusCode)
return statusErr
}

Expand Down Expand Up @@ -1033,7 +1033,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts

err = t.readRows(ctx, arg, f, mt, opts...)
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.setCurrOpStatus(statusCode.String())
mt.setCurrOpStatus(statusCode)
return statusErr
}

Expand Down Expand Up @@ -1713,7 +1713,7 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl

err = t.apply(ctx, mt, row, m, opts...)
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.setCurrOpStatus(statusCode.String())
mt.setCurrOpStatus(statusCode)
return statusErr
}

Expand Down Expand Up @@ -2008,7 +2008,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply
}, t.c.retryOption)

statusCode, statusErr := convertToGrpcStatusErr(err)
mt.setCurrOpStatus(statusCode.String())
mt.setCurrOpStatus(statusCode)
return statusErr
}

Expand Down Expand Up @@ -2153,7 +2153,7 @@ func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadMod

updatedRow, err := t.applyReadModifyWrite(ctx, mt, row, m)
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.setCurrOpStatus(statusCode.String())
mt.setCurrOpStatus(statusCode)
return updatedRow, statusErr
}

Expand Down Expand Up @@ -2234,7 +2234,7 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {

rowKeys, err := t.sampleRowKeys(ctx, mt)
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.setCurrOpStatus(statusCode.String())
mt.setCurrOpStatus(statusCode)
return rowKeys, statusErr
}

Expand Down
14 changes: 12 additions & 2 deletions bigtable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ import (
"sync/atomic"
"time"

"regexp"
"strings"

"cloud.google.com/go/bigtable/internal"
"github.com/google/uuid"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)
Expand Down Expand Up @@ -176,6 +180,8 @@ var (
}

sharedLatencyStatsHandler = &latencyStatsHandler{}

camel = regexp.MustCompile("([a-z0-9])([A-Z])")
)

type metricInfo struct {
Expand Down Expand Up @@ -727,12 +733,16 @@ func (mt *builtinMetricsTracer) recordOperationCompletion() {
mt.instrumentAppBlockingLatencies.Record(mt.ctx, mt.currOp.appBlockingLatency, metric.WithAttributeSet(appBlockingLatAttrs))
}

func (mt *builtinMetricsTracer) setCurrOpStatus(status string) {
func (mt *builtinMetricsTracer) setCurrOpStatus(code codes.Code) {
if !mt.builtInEnabled {
return
}

mt.currOp.setStatus(status)
mt.currOp.setStatus(canonicalString(code))
}

func canonicalString(c codes.Code) string {
return strings.ToUpper(camel.ReplaceAllString(c.String(), "${1}_${2}"))
}

func (mt *builtinMetricsTracer) incrementAppBlockingLatency(latency float64) {
Expand Down
29 changes: 26 additions & 3 deletions bigtable/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ func TestToOtelMetricAttrs(t *testing.T) {
method: "ReadRows",
isStreaming: true,
currOp: opTracer{
status: codes.OK.String(),
status: canonicalString(codes.OK),
currAttempt: attemptTracer{
startTime: time.Now(),
clusterID: "my-cluster",
Expand All @@ -755,7 +755,7 @@ func TestToOtelMetricAttrs(t *testing.T) {
attribute.String(monitoredResLabelKeyTable, "my-table"),
attribute.String(metricLabelKeyMethod, "ReadRows"),
attribute.Bool(metricLabelKeyStreamingOperation, true),
attribute.String(metricLabelKeyStatus, codes.OK.String()),
attribute.String(metricLabelKeyStatus, canonicalString(codes.OK)),
attribute.String(monitoredResLabelKeyCluster, clusterID1),
attribute.String(monitoredResLabelKeyZone, zoneID1),
},
Expand Down Expand Up @@ -936,6 +936,11 @@ func TestFirstResponseLatencyWithDelayedStream(t *testing.T) {
// Metric does not match target client UID. Skipping
continue
}

wantStatus := canonicalString(codes.OK)
if strings.Contains(metricType, metricNameFirstRespLatencies) && ts.GetMetric().GetLabels()[metricLabelKeyStatus] != wantStatus {
t.Errorf("Incorrect status: got: %v, want: %v", ts.GetMetric().GetLabels()[metricLabelKeyStatus], wantStatus)
}
// If we reach here, the metric belongs to our test client instance
foundMetricsForClientUID = append(foundMetricsForClientUID, metricType)

Expand Down Expand Up @@ -1142,7 +1147,6 @@ func TestApplicationLatencies(t *testing.T) {
if _, exists := metricLabels[metricLabelKeyStreamingOperation]; exists {
t.Errorf("Label %s should not be present for %s", metricLabelKeyStreamingOperation, expectedMetricType)
}

resLabels := ts.GetResource().GetLabels()
if tblName, ok := resLabels[monitoredResLabelKeyTable]; (ok && tblName != tableID && tblName != "") || !ok {
t.Errorf("Label %s: got %q, want %q for resource %s", monitoredResLabelKeyTable, tblName, tableID, ts.GetResource())
Expand All @@ -1155,3 +1159,22 @@ func TestApplicationLatencies(t *testing.T) {
t.Errorf("Failed to find metric %s for client UID %s", expectedMetricType, clientUID)
}
}

func TestCanonicalString(t *testing.T) {
tests := []struct {
code codes.Code
want string
}{
{codes.OK, "OK"},
{codes.Canceled, "CANCELED"},
{codes.DeadlineExceeded, "DEADLINE_EXCEEDED"},
{codes.Code(100), "CODE(100)"},
}

for _, test := range tests {
got := canonicalString(test.code)
if got != test.want {
t.Errorf("canonicalString(%v) = %q, want %q", test.code, got, test.want)
}
}
}
Loading