diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 76da487497bd..3eb40f97fa3a 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -55,6 +55,9 @@ const ( queryExpiredViolationType = "PREPARED_QUERY_EXPIRED" preparedQueryExpireEarlyDuration = time.Second methodNameReadRows = "ReadRows" + + // For routing cookie + cookiePrefix = "x-goog-cbt-cookie-" ) var errNegativeRowLimit = errors.New("bigtable: row limit cannot be negative") @@ -398,6 +401,7 @@ type Table struct { // and enabled on the client func (c *Client) newFeatureFlags() metadata.MD { ff := btpb.FeatureFlags{ + RoutingCookie: true, ReverseScans: true, LastScannedRowResponses: true, ClientSideMetricsEnabled: c.metricsTracerFactory.enabled, @@ -2281,15 +2285,37 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method mt.setMethod(method) callWrapper := func(ctx context.Context, callSettings gax.CallSettings) error { + op := &mt.currOp + // Inject cookie and attempt information + md := metadata.New(nil) + for k, v := range op.cookies { + md.Append(k, v) + } + + existingMD, _ := metadata.FromOutgoingContext(ctx) + finalMD := metadata.Join(existingMD, md) + newCtx := metadata.NewOutgoingContext(ctx, finalMD) + mt.recordAttemptStart() // f makes calls to CBT service - err := f(ctx, &attemptHeaderMD, &attempTrailerMD, callSettings) + err := f(newCtx, &attemptHeaderMD, &attempTrailerMD, callSettings) // Record attempt specific metrics mt.recordAttemptCompletion(attemptHeaderMD, attempTrailerMD, err) + + extractCookies(attemptHeaderMD, op) + extractCookies(attempTrailerMD, op) return err } return gax.Invoke(ctx, callWrapper, opts...) } + +func extractCookies(md metadata.MD, op *opTracer) { + for k, v := range md { + if strings.HasPrefix(k, cookiePrefix) { + op.cookies[k] = v[len(v)-1] + } + } +} diff --git a/bigtable/internal/testproxy/known_failures.txt b/bigtable/internal/testproxy/known_failures.txt index 6112960b2888..7a66a5c8f75b 100644 --- a/bigtable/internal/testproxy/known_failures.txt +++ b/bigtable/internal/testproxy/known_failures.txt @@ -1,6 +1 @@ -TestMutateRows_Retry_WithRoutingCookie\| -TestReadRow_Retry_WithRoutingCookie\| -TestReadRows_Retry_WithRoutingCookie\| -TestReadRows_Retry_WithRoutingCookie_MultipleErrorResponses\| -TestReadRows_Retry_WithRetryInfo_MultipleErrorResponse\| -TestSampleRowKeys_Retry_WithRoutingCookie \ No newline at end of file +TestReadRows_Retry_WithRetryInfo_MultipleErrorResponse \ No newline at end of file diff --git a/bigtable/metrics.go b/bigtable/metrics.go index 2082cc8f05e2..738dc6372d20 100644 --- a/bigtable/metrics.go +++ b/bigtable/metrics.go @@ -457,6 +457,9 @@ type opTracer struct { currAttempt attemptTracer appBlockingLatency float64 + + // For routing cookie and gRPC attempt number + cookies map[string]string } func (o *opTracer) setStartTime(t time.Time) { @@ -524,14 +527,20 @@ func (a *attemptTracer) setServerLatencyErr(err error) { } func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Context, tableName string, isStreaming bool) builtinMetricsTracer { - if !tf.enabled { - return builtinMetricsTracer{builtInEnabled: false} - } // Operation has started but not the attempt. // So, create only operation tracer and not attempt tracer - currOpTracer := opTracer{} + currOpTracer := opTracer{ + cookies: make(map[string]string), + } currOpTracer.setStartTime(time.Now()) + if !tf.enabled { + return builtinMetricsTracer{ + builtInEnabled: false, + currOp: currOpTracer, + } + } + return builtinMetricsTracer{ ctx: ctx, builtInEnabled: tf.enabled,