Skip to content

Commit

Permalink
feat(bigtable): Add client side metrics to feature flag (#10678)
Browse files Browse the repository at this point in the history
* feat(bigtable): Add client side metrics to feature flag

* refactor(bigtable): Refactoring code

* feat(bigtable): Removing exported feature flags option

* feat(bigtable): Fixng tests
  • Loading branch information
bhshkh authored Sep 5, 2024
1 parent 0d732cc commit 02b2d12
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 31 deletions.
27 changes: 24 additions & 3 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package bigtable // import "cloud.google.com/go/bigtable"

import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (

const prodAddr = "bigtable.googleapis.com:443"
const mtlsProdAddr = "bigtable.mtls.googleapis.com:443"
const featureFlagsHeaderKey = "bigtable-features"

// Client is a client for reading and writing data to tables in an instance.
//
Expand Down Expand Up @@ -267,6 +269,25 @@ type Table struct {
authorizedView string
}

// newFeatureFlags creates the feature flags `bigtable-features` header
// to be sent on each request. This includes all features supported and
// and enabled on the client
func (c *Client) newFeatureFlags() metadata.MD {
ff := btpb.FeatureFlags{
ReverseScans: true,
LastScannedRowResponses: true,
ClientSideMetricsEnabled: c.metricsTracerFactory.enabled,
}

val := ""
b, err := proto.Marshal(&ff)
if err == nil {
val = base64.URLEncoding.EncodeToString(b)
}

return metadata.Pairs(featureFlagsHeaderKey, val)
}

// Open opens a table.
func (c *Client) Open(table string) *Table {
return &Table{
Expand All @@ -275,7 +296,7 @@ func (c *Client) Open(table string) *Table {
md: metadata.Join(metadata.Pairs(
resourcePrefixHeader, c.fullTableName(table),
requestParamsHeader, c.requestParamsHeaderValue(table),
), btopt.WithFeatureFlags()),
), c.newFeatureFlags()),
}
}

Expand All @@ -287,7 +308,7 @@ func (c *Client) OpenTable(table string) TableAPI {
md: metadata.Join(metadata.Pairs(
resourcePrefixHeader, c.fullTableName(table),
requestParamsHeader, c.requestParamsHeaderValue(table),
), btopt.WithFeatureFlags()),
), c.newFeatureFlags()),
}}
}

Expand All @@ -299,7 +320,7 @@ func (c *Client) OpenAuthorizedView(table, authorizedView string) TableAPI {
md: metadata.Join(metadata.Pairs(
resourcePrefixHeader, c.fullAuthorizedViewName(table, authorizedView),
requestParamsHeader, c.requestParamsHeaderValue(table),
), btopt.WithFeatureFlags()),
), c.newFeatureFlags()),
authorizedView: authorizedView,
}}
}
Expand Down
22 changes: 19 additions & 3 deletions bigtable/bttest/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package bttest
import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"fmt"
"math"
Expand All @@ -32,7 +33,6 @@ import (

btapb "cloud.google.com/go/bigtable/admin/apiv2/adminpb"
btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
"cloud.google.com/go/bigtable/internal/option"
"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
Expand Down Expand Up @@ -801,6 +801,22 @@ func TestReadRows(t *testing.T) {
}
}

// withFeatureFlags set the feature flags the client supports in the
// `bigtable-features` header sent on each request.
func withFeatureFlags() metadata.MD {
ffStr := ""
ff := btpb.FeatureFlags{
ReverseScans: true,
LastScannedRowResponses: true,
ClientSideMetricsEnabled: false, // Not suppported in emulator
}
b, err := proto.Marshal(&ff)
if err == nil {
ffStr = base64.URLEncoding.EncodeToString(b)
}
return metadata.Pairs("bigtable-features", ffStr)
}

func TestReadRowsLastScannedRow(t *testing.T) {
ctx := context.Background()
s := &server{
Expand Down Expand Up @@ -840,7 +856,7 @@ func TestReadRowsLastScannedRow(t *testing.T) {
EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("s")},
}}},
} {
featureFlags := option.WithFeatureFlags()
featureFlags := withFeatureFlags()
ctx := metadata.NewIncomingContext(context.Background(), featureFlags)

mock := &MockReadRowsServer{ctx: ctx}
Expand Down Expand Up @@ -1246,7 +1262,7 @@ func TestReadRowsReversed(t *testing.T) {
}
}

serverCtx := metadata.NewIncomingContext(context.Background(), option.WithFeatureFlags())
serverCtx := metadata.NewIncomingContext(context.Background(), withFeatureFlags())
rrss := &MockReadRowsServer{ctx: serverCtx}
rreq := &btpb.ReadRowsRequest{TableName: tbl.Name, Reversed: true}
if err := srv.ReadRows(rreq, rrss); err != nil {
Expand Down
23 changes: 0 additions & 23 deletions bigtable/internal/option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@ package option

import (
"context"
"encoding/base64"
"fmt"
"os"

btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
"google.golang.org/protobuf/proto"

"cloud.google.com/go/bigtable/internal"
"cloud.google.com/go/internal/version"
gax "github.com/googleapis/gax-go/v2"
Expand Down Expand Up @@ -66,25 +62,6 @@ func withGoogleClientInfo() metadata.MD {
return metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...))
}

func makeFeatureFlags() string {
ff := btpb.FeatureFlags{ReverseScans: true, LastScannedRowResponses: true}
b, err := proto.Marshal(&ff)
if err != nil {
return ""
}

return base64.URLEncoding.EncodeToString(b)
}

var featureFlags = makeFeatureFlags()

// WithFeatureFlags set the feature flags the client supports in the
// `bigtable-features` header sent on each request. Intended for
// use by Google-written clients.
func WithFeatureFlags() metadata.MD {
return metadata.Pairs("bigtable-features", featureFlags)
}

// streamInterceptor intercepts the creation of ClientStream within the bigtable
// client to inject Google client information into the context metadata for
// streaming RPCs.
Expand Down
33 changes: 31 additions & 2 deletions bigtable/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package bigtable

import (
"context"
"encoding/base64"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -127,13 +128,18 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {

// Setup fake Bigtable server
isFirstAttempt := true
headerAndErrorInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
receivedHeader := metadata.MD{}
serverStreamInterceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Capture incoming metadata
receivedHeader, _ = metadata.FromIncomingContext(ss.Context())
if strings.HasSuffix(info.FullMethod, "ReadRows") {
if isFirstAttempt {
// Fail first attempt
isFirstAttempt = false
return status.Error(codes.Unavailable, "Mock Unavailable error")
}

// Send server headers
header := metadata.New(map[string]string{
serverTimingMDKey: "gfet4t7; dur=123",
locationMDKey: string(testHeaders),
Expand Down Expand Up @@ -174,7 +180,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
}

// open table and compare errors
tbl, cleanup, gotErr := setupFakeServer(project, instance, test.config, grpc.StreamInterceptor(headerAndErrorInjector))
tbl, cleanup, gotErr := setupFakeServer(project, instance, test.config, grpc.StreamInterceptor(serverStreamInterceptor))
defer cleanup()
if gotErr != nil {
t.Fatalf("err: got: %v, want: %v", gotErr, nil)
Expand Down Expand Up @@ -216,6 +222,29 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
t.Fatalf("ReadRows failed: %v", err)
}

// Check feature flags
ffStrs := receivedHeader.Get(featureFlagsHeaderKey)
if len(ffStrs) < 1 {
t.Errorf("Feature flags not sent by client")
}
ffBytes, err := base64.URLEncoding.DecodeString(ffStrs[0])
if err != nil {
t.Errorf("Feature flags not encoded correctly: %v", err)
}
ff := &btpb.FeatureFlags{}
if err = proto.Unmarshal(ffBytes, ff); err != nil {
t.Errorf("Feature flags not marshalled correctly: %v", err)
}
if ff.ClientSideMetricsEnabled != test.wantBuiltinEnabled || !ff.LastScannedRowResponses || !ff.ReverseScans {
t.Errorf("Feature flags: ClientSideMetricsEnabled got: %v, want: %v\n"+
"LastScannedRowResponses got: %v, want: %v\n"+
"ReverseScans got: %v, want: %v\n",
ff.ClientSideMetricsEnabled, test.wantBuiltinEnabled,
ff.LastScannedRowResponses, true,
ff.ReverseScans, true,
)
}

// Calculate elapsed time
elapsedTime := time.Since(testStartTime)
if elapsedTime < 3*defaultSamplePeriod {
Expand Down

0 comments on commit 02b2d12

Please sign in to comment.