Skip to content

Commit

Permalink
chore(bigtable): more text proxy tune-up (#6977)
Browse files Browse the repository at this point in the history
* chore(bigtable): more text proxy tune-up
  • Loading branch information
telpirion authored Nov 2, 2022
1 parent e7ab70b commit d82ef41
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 57 deletions.
141 changes: 88 additions & 53 deletions bigtable/internal/testproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"context"
"crypto/x509"
"errors"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -346,6 +347,27 @@ func statusFromError(err error) *statpb.Status {
return st
}

// parseTableID extracts a table ID from a table name.
// For example, a table ID is in the format projects/<project>/instances/<instance>/tables/<tableID>
//
// Note that this function does not check all variants and edge cases. It assumes
// that the test suite used with the test proxy sends *generally* correct requests.
func parseTableID(tableName string) (tableID string, _ error) {
paths := strings.Split(tableName, "/")

if len(paths) < 6 {
return "", errors.New("table resource name does not have the correct format")
}

tableID = paths[len(paths)-1]
var err error
if tableID == "" {
err = errors.New("cannot read tableID from table name")
}

return tableID, err
}

// testClient contains a bigtable.Client object, cancel functions for the calls
// made using the client, an appProfileID (optionally), and a
// perOperationTimeout (optionally).
Expand Down Expand Up @@ -507,15 +529,15 @@ type goTestProxyServer struct {

// client retrieves a testClient from the clientIDs map. You must lock clientsLock before calling
// this method.
func (s *goTestProxyServer) client(clientID string) (*testClient, bool) {
func (s *goTestProxyServer) client(clientID string) (*testClient, error) {
client, ok := s.clientIDs[clientID]
if !ok {
return nil, false
return nil, fmt.Errorf("client ID %s does not exist", clientID)
}
if !client.isOpen {
return nil, false
return nil, fmt.Errorf("client ID %s is closed to new requests", clientID)
}
return client, true
return client, nil
}

// CreateClient responds to the CreateClient RPC. This method adds a new client
Expand Down Expand Up @@ -571,10 +593,9 @@ func (s *goTestProxyServer) CloseClient(ctx context.Context, req *pb.CloseClient
s.clientsLock.Lock()
defer s.clientsLock.Unlock()

btc, exists := s.client(clientID)
if !exists {
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
btc, err := s.client(clientID)
if err != nil {
return nil, err
}
btc.isOpen = false

Expand Down Expand Up @@ -608,16 +629,17 @@ func (s *goTestProxyServer) RemoveClient(ctx context.Context, req *pb.RemoveClie
// data for a single row in the Table.
func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest) (*pb.RowResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
if err != nil {
return nil, err
}
s.clientsLock.RUnlock()

if !exists {
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
tid, err := parseTableID(req.TableName)
if err != nil {
return nil, err
}

tName := req.TableName
t := btc.c.Open(tName)
t := btc.c.Open(tid)

res := &pb.RowResult{
Status: &statpb.Status{
Expand Down Expand Up @@ -652,13 +674,11 @@ func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest)
// data for a set of rows, a range of rows, or the entire table.
func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsRequest) (*pb.RowsResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("bad client ID: %v\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -669,7 +689,11 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques

}

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)

rowPbs := rrq.Rows
rs := rowSetFromProto(rowPbs)
Expand All @@ -687,7 +711,7 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques
var c int32
var rowsPb []*btpb.Row
lim := req.GetCancelAfterRows()
err := t.ReadRows(ctx, rs, func(r bigtable.Row) bool {
err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool {

c++
if c == lim {
Expand Down Expand Up @@ -728,12 +752,11 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques
// changes (or deletions) to a single row in a table.
func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequest) (*pb.MutateRowResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -744,7 +767,11 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ
mPbs := rrq.Mutations
m := mutationFromProto(mPbs)

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)
row := rrq.RowKey

res := &pb.MutateRowResult{
Expand All @@ -756,7 +783,7 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ
ctx, cancel := btc.timeout(ctx)
defer cancel()

err := t.Apply(ctx, string(row), m)
err = t.Apply(ctx, string(row), m)
if err != nil {
res.Status = statusFromError(err)
return res, nil
Expand All @@ -769,13 +796,11 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ
// series of changes or deletions to multiple rows in a single call.
func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRowsRequest) (*pb.MutateRowsResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("received invalid client ID: %s\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -785,7 +810,11 @@ func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRo
}

mrs := rrq.Entries
t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)

keys := make([]string, len(mrs))
muts := make([]*bigtable.Mutation, len(mrs))
Expand Down Expand Up @@ -840,13 +869,11 @@ func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRo
// one mutation if a condition is true and another mutation if it is false.
func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.CheckAndMutateRowRequest) (*pb.CheckAndMutateRowResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("received invalid ClientID: %s\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -873,7 +900,11 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check
},
}

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)
rowKey := string(rrq.RowKey)

var matched bool
Expand All @@ -882,7 +913,7 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check
ctx, cancel := btc.timeout(ctx)
defer cancel()

err := t.Apply(ctx, rowKey, c, ao)
err = t.Apply(ctx, rowKey, c, ao)
if err != nil {
log.Printf("received error from Table.Apply: %v", err)
res.Status = statusFromError(err)
Expand All @@ -900,13 +931,11 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check
// of the keys available in a table.
func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRowKeysRequest) (*pb.SampleRowKeysResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("received invalid client ID: %s\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -924,7 +953,11 @@ func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRow
ctx, cancel := btc.timeout(ctx)
defer cancel()

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)
keys, err := t.SampleRowKeys(ctx)
if err != nil {
log.Printf("received error from Table.SampleRowKeys(): %v\n", err)
Expand All @@ -949,13 +982,11 @@ func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRow
// applies a non-idempotent change to a row.
func (s *goTestProxyServer) ReadModifyWriteRow(ctx context.Context, req *pb.ReadModifyWriteRowRequest) (*pb.RowResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("received invalid client ID: %s\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand Down Expand Up @@ -984,7 +1015,11 @@ func (s *goTestProxyServer) ReadModifyWriteRow(ctx context.Context, req *pb.Read
},
}

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)
k := string(rrq.RowKey)

ctx, cancel := btc.timeout(ctx)
Expand Down
9 changes: 5 additions & 4 deletions bigtable/internal/testproxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import (

const (
buffer = 1024 * 1024
tableName = "table"
tableName = "projects/my-project/instances/my-instance/tables/table"
tableID = "table"
columnFamily = "cf"
testProxyClient = "testProxyClient"
testProxyAddress = "localhost:9990"
Expand Down Expand Up @@ -70,15 +71,15 @@ func populateTable(bts *bttest.Server) error {
}
defer adminClient.Close()

if err := adminClient.CreateTable(ctx, tableName); err != nil {
if err := adminClient.CreateTable(ctx, tableID); err != nil {
return fmt.Errorf("testproxy setup: can't create table: %v", err)
}

// Create column families (3 is an arbitrarily sufficient number)
count := 3
for i := 0; i < count; i++ {
cfName := fmt.Sprintf("%s%d", columnFamily, i)
if err := adminClient.CreateColumnFamily(ctx, tableName, cfName); err != nil {
if err := adminClient.CreateColumnFamily(ctx, tableID, cfName); err != nil {
return fmt.Errorf("testproxy setup: can't create column family: %s", cfName)
}
}
Expand All @@ -90,7 +91,7 @@ func populateTable(bts *bttest.Server) error {
}
defer dataClient.Close()

t := dataClient.Open(tableName)
t := dataClient.Open(tableID)

for fc := 0; fc < count; fc++ {
for cc := count; cc > 0; cc-- {
Expand Down

0 comments on commit d82ef41

Please sign in to comment.