diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 7a57b4137234..b5cbbf999703 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -18,6 +18,7 @@ package bigtable // import "cloud.google.com/go/bigtable" import ( "context" + "encoding/base64" "errors" "fmt" "io" @@ -49,6 +50,7 @@ import ( const prodAddr = "bigtable.googleapis.com:443" const mtlsProdAddr = "bigtable.mtls.googleapis.com:443" +const featureFlagsHeaderKey = "bigtable-features" // Client is a client for reading and writing data to tables in an instance. // @@ -267,6 +269,25 @@ type Table struct { authorizedView string } +// newFeatureFlags creates the feature flags `bigtable-features` header +// to be sent on each request. This includes all features supported and +// and enabled on the client +func (c *Client) newFeatureFlags() metadata.MD { + ff := btpb.FeatureFlags{ + ReverseScans: true, + LastScannedRowResponses: true, + ClientSideMetricsEnabled: c.metricsTracerFactory.enabled, + } + + val := "" + b, err := proto.Marshal(&ff) + if err == nil { + val = base64.URLEncoding.EncodeToString(b) + } + + return metadata.Pairs(featureFlagsHeaderKey, val) +} + // Open opens a table. func (c *Client) Open(table string) *Table { return &Table{ @@ -275,7 +296,7 @@ func (c *Client) Open(table string) *Table { md: metadata.Join(metadata.Pairs( resourcePrefixHeader, c.fullTableName(table), requestParamsHeader, c.requestParamsHeaderValue(table), - ), btopt.WithFeatureFlags()), + ), c.newFeatureFlags()), } } @@ -287,7 +308,7 @@ func (c *Client) OpenTable(table string) TableAPI { md: metadata.Join(metadata.Pairs( resourcePrefixHeader, c.fullTableName(table), requestParamsHeader, c.requestParamsHeaderValue(table), - ), btopt.WithFeatureFlags()), + ), c.newFeatureFlags()), }} } @@ -299,7 +320,7 @@ func (c *Client) OpenAuthorizedView(table, authorizedView string) TableAPI { md: metadata.Join(metadata.Pairs( resourcePrefixHeader, c.fullAuthorizedViewName(table, authorizedView), requestParamsHeader, c.requestParamsHeaderValue(table), - ), btopt.WithFeatureFlags()), + ), c.newFeatureFlags()), authorizedView: authorizedView, }} } diff --git a/bigtable/bttest/inmem_test.go b/bigtable/bttest/inmem_test.go index 055aeba089dc..6b1b34575ff1 100644 --- a/bigtable/bttest/inmem_test.go +++ b/bigtable/bttest/inmem_test.go @@ -17,6 +17,7 @@ package bttest import ( "bytes" "context" + "encoding/base64" "encoding/binary" "fmt" "math" @@ -32,7 +33,6 @@ import ( btapb "cloud.google.com/go/bigtable/admin/apiv2/adminpb" btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb" - "cloud.google.com/go/bigtable/internal/option" "cloud.google.com/go/internal/testutil" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" @@ -801,6 +801,22 @@ func TestReadRows(t *testing.T) { } } +// withFeatureFlags set the feature flags the client supports in the +// `bigtable-features` header sent on each request. +func withFeatureFlags() metadata.MD { + ffStr := "" + ff := btpb.FeatureFlags{ + ReverseScans: true, + LastScannedRowResponses: true, + ClientSideMetricsEnabled: false, // Not suppported in emulator + } + b, err := proto.Marshal(&ff) + if err == nil { + ffStr = base64.URLEncoding.EncodeToString(b) + } + return metadata.Pairs("bigtable-features", ffStr) +} + func TestReadRowsLastScannedRow(t *testing.T) { ctx := context.Background() s := &server{ @@ -840,7 +856,7 @@ func TestReadRowsLastScannedRow(t *testing.T) { EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("s")}, }}}, } { - featureFlags := option.WithFeatureFlags() + featureFlags := withFeatureFlags() ctx := metadata.NewIncomingContext(context.Background(), featureFlags) mock := &MockReadRowsServer{ctx: ctx} @@ -1246,7 +1262,7 @@ func TestReadRowsReversed(t *testing.T) { } } - serverCtx := metadata.NewIncomingContext(context.Background(), option.WithFeatureFlags()) + serverCtx := metadata.NewIncomingContext(context.Background(), withFeatureFlags()) rrss := &MockReadRowsServer{ctx: serverCtx} rreq := &btpb.ReadRowsRequest{TableName: tbl.Name, Reversed: true} if err := srv.ReadRows(rreq, rrss); err != nil { diff --git a/bigtable/internal/option/option.go b/bigtable/internal/option/option.go index 5c15e6502fbb..d6c879266e24 100644 --- a/bigtable/internal/option/option.go +++ b/bigtable/internal/option/option.go @@ -19,13 +19,9 @@ package option import ( "context" - "encoding/base64" "fmt" "os" - btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb" - "google.golang.org/protobuf/proto" - "cloud.google.com/go/bigtable/internal" "cloud.google.com/go/internal/version" gax "github.com/googleapis/gax-go/v2" @@ -66,25 +62,6 @@ func withGoogleClientInfo() metadata.MD { return metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...)) } -func makeFeatureFlags() string { - ff := btpb.FeatureFlags{ReverseScans: true, LastScannedRowResponses: true} - b, err := proto.Marshal(&ff) - if err != nil { - return "" - } - - return base64.URLEncoding.EncodeToString(b) -} - -var featureFlags = makeFeatureFlags() - -// WithFeatureFlags set the feature flags the client supports in the -// `bigtable-features` header sent on each request. Intended for -// use by Google-written clients. -func WithFeatureFlags() metadata.MD { - return metadata.Pairs("bigtable-features", featureFlags) -} - // streamInterceptor intercepts the creation of ClientStream within the bigtable // client to inject Google client information into the context metadata for // streaming RPCs. diff --git a/bigtable/metrics_test.go b/bigtable/metrics_test.go index 5547e7a5091f..cac887542715 100644 --- a/bigtable/metrics_test.go +++ b/bigtable/metrics_test.go @@ -18,6 +18,7 @@ package bigtable import ( "context" + "encoding/base64" "fmt" "sort" "strings" @@ -127,13 +128,18 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { // Setup fake Bigtable server isFirstAttempt := true - headerAndErrorInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + receivedHeader := metadata.MD{} + serverStreamInterceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // Capture incoming metadata + receivedHeader, _ = metadata.FromIncomingContext(ss.Context()) if strings.HasSuffix(info.FullMethod, "ReadRows") { if isFirstAttempt { // Fail first attempt isFirstAttempt = false return status.Error(codes.Unavailable, "Mock Unavailable error") } + + // Send server headers header := metadata.New(map[string]string{ serverTimingMDKey: "gfet4t7; dur=123", locationMDKey: string(testHeaders), @@ -174,7 +180,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { } // open table and compare errors - tbl, cleanup, gotErr := setupFakeServer(project, instance, test.config, grpc.StreamInterceptor(headerAndErrorInjector)) + tbl, cleanup, gotErr := setupFakeServer(project, instance, test.config, grpc.StreamInterceptor(serverStreamInterceptor)) defer cleanup() if gotErr != nil { t.Fatalf("err: got: %v, want: %v", gotErr, nil) @@ -216,6 +222,29 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { t.Fatalf("ReadRows failed: %v", err) } + // Check feature flags + ffStrs := receivedHeader.Get(featureFlagsHeaderKey) + if len(ffStrs) < 1 { + t.Errorf("Feature flags not sent by client") + } + ffBytes, err := base64.URLEncoding.DecodeString(ffStrs[0]) + if err != nil { + t.Errorf("Feature flags not encoded correctly: %v", err) + } + ff := &btpb.FeatureFlags{} + if err = proto.Unmarshal(ffBytes, ff); err != nil { + t.Errorf("Feature flags not marshalled correctly: %v", err) + } + if ff.ClientSideMetricsEnabled != test.wantBuiltinEnabled || !ff.LastScannedRowResponses || !ff.ReverseScans { + t.Errorf("Feature flags: ClientSideMetricsEnabled got: %v, want: %v\n"+ + "LastScannedRowResponses got: %v, want: %v\n"+ + "ReverseScans got: %v, want: %v\n", + ff.ClientSideMetricsEnabled, test.wantBuiltinEnabled, + ff.LastScannedRowResponses, true, + ff.ReverseScans, true, + ) + } + // Calculate elapsed time elapsedTime := time.Since(testStartTime) if elapsedTime < 3*defaultSamplePeriod {