Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
queryExpiredViolationType = "PREPARED_QUERY_EXPIRED"
preparedQueryExpireEarlyDuration = time.Second
methodNameReadRows = "ReadRows"

// For routing cookie
cookiePrefix = "x-goog-cbt-cookie-"
)

var errNegativeRowLimit = errors.New("bigtable: row limit cannot be negative")
Expand Down Expand Up @@ -398,6 +401,7 @@ type Table struct {
// and enabled on the client
func (c *Client) newFeatureFlags() metadata.MD {
ff := btpb.FeatureFlags{
RoutingCookie: true,
ReverseScans: true,
LastScannedRowResponses: true,
ClientSideMetricsEnabled: c.metricsTracerFactory.enabled,
Expand Down Expand Up @@ -2281,15 +2285,37 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method
mt.setMethod(method)

callWrapper := func(ctx context.Context, callSettings gax.CallSettings) error {
op := &mt.currOp
Comment thread
mutianf marked this conversation as resolved.
// Inject cookie and attempt information
md := metadata.New(nil)
for k, v := range op.cookies {
md.Append(k, v)
}

existingMD, _ := metadata.FromOutgoingContext(ctx)
finalMD := metadata.Join(existingMD, md)
Comment thread
mutianf marked this conversation as resolved.
newCtx := metadata.NewOutgoingContext(ctx, finalMD)

mt.recordAttemptStart()

// f makes calls to CBT service
err := f(ctx, &attemptHeaderMD, &attempTrailerMD, callSettings)
err := f(newCtx, &attemptHeaderMD, &attempTrailerMD, callSettings)

// Record attempt specific metrics
mt.recordAttemptCompletion(attemptHeaderMD, attempTrailerMD, err)

extractCookies(attemptHeaderMD, op)
extractCookies(attempTrailerMD, op)
return err
}

return gax.Invoke(ctx, callWrapper, opts...)
}

func extractCookies(md metadata.MD, op *opTracer) {
for k, v := range md {
if strings.HasPrefix(k, cookiePrefix) {
op.cookies[k] = v[len(v)-1]
}
}
}
7 changes: 1 addition & 6 deletions bigtable/internal/testproxy/known_failures.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1 @@
TestMutateRows_Retry_WithRoutingCookie\|
TestReadRow_Retry_WithRoutingCookie\|
TestReadRows_Retry_WithRoutingCookie\|
TestReadRows_Retry_WithRoutingCookie_MultipleErrorResponses\|
TestReadRows_Retry_WithRetryInfo_MultipleErrorResponse\|
TestSampleRowKeys_Retry_WithRoutingCookie
TestReadRows_Retry_WithRetryInfo_MultipleErrorResponse
17 changes: 13 additions & 4 deletions bigtable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,9 @@ type opTracer struct {
currAttempt attemptTracer

appBlockingLatency float64

// For routing cookie and gRPC attempt number
cookies map[string]string
}

func (o *opTracer) setStartTime(t time.Time) {
Expand Down Expand Up @@ -524,14 +527,20 @@ func (a *attemptTracer) setServerLatencyErr(err error) {
}

func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Context, tableName string, isStreaming bool) builtinMetricsTracer {
if !tf.enabled {
return builtinMetricsTracer{builtInEnabled: false}
}
// Operation has started but not the attempt.
// So, create only operation tracer and not attempt tracer
currOpTracer := opTracer{}
currOpTracer := opTracer{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think metrics is the right place for this feature (like I mentioned in the other comment) because this is retry related and doesn't really have anything to do with metrics?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not want to add another tracer.

I can create a follow up PR to this one. Renaming metricsTracer to Tracer and file to tracer.go

cookies: make(map[string]string),
}
currOpTracer.setStartTime(time.Now())

if !tf.enabled {
return builtinMetricsTracer{
builtInEnabled: false,
currOp: currOpTracer,
}
}

return builtinMetricsTracer{
ctx: ctx,
builtInEnabled: tf.enabled,
Expand Down
Loading