Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions go/vt/vtadmin/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func New(ctx context.Context, cfg Config) (*Cluster, error) {
return nil, fmt.Errorf("error creating vtsql connection config: %w", err)
}

for _, opt := range cfg.vtsqlConfigOpts {
vtsqlCfg = opt(vtsqlCfg)
}

vtctldargs := buildPFlagSlice(cfg.VtctldFlags)

vtctldCfg, err := vtctldclient.Parse(protocluster, disco, vtctldargs)
Expand All @@ -116,7 +120,11 @@ func New(ctx context.Context, cfg Config) (*Cluster, error) {
vtctldCfg = opt(vtctldCfg)
}

cluster.DB = vtsql.New(vtsqlCfg)
cluster.DB, err = vtsql.New(ctx, vtsqlCfg)
if err != nil {
return nil, fmt.Errorf("error creating vtsql proxy: %w", err)
}

cluster.Vtctld, err = vtctldclient.New(ctx, vtctldCfg)
if err != nil {
return nil, fmt.Errorf("error creating vtctldclient: %w", err)
Expand Down Expand Up @@ -883,10 +891,6 @@ func (c *Cluster) GetTablets(ctx context.Context) ([]*vtadminpb.Tablet, error) {
}

func (c *Cluster) getTablets(ctx context.Context) ([]*vtadminpb.Tablet, error) {
if err := c.DB.Dial(ctx, ""); err != nil {
return nil, err
}

rows, err := c.DB.ShowTablets(ctx)
if err != nil {
return nil, err
Expand Down
31 changes: 0 additions & 31 deletions go/vt/vtadmin/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cluster_test

import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
Expand All @@ -33,14 +32,10 @@ import (
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vitessdriver"
"vitess.io/vitess/go/vt/vtadmin/cluster"
"vitess.io/vitess/go/vt/vtadmin/cluster/discovery/fakediscovery"
"vitess.io/vitess/go/vt/vtadmin/cluster/resolver"
vtadminerrors "vitess.io/vitess/go/vt/vtadmin/errors"
"vitess.io/vitess/go/vt/vtadmin/testutil"
"vitess.io/vitess/go/vt/vtadmin/vtctldclient/fakevtctldclient"
"vitess.io/vitess/go/vt/vtadmin/vtsql"
"vitess.io/vitess/go/vt/vtctl/vtctldclient"

replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
Expand Down Expand Up @@ -2596,32 +2591,6 @@ func TestGetShardReplicationPositions(t *testing.T) {
}
}

// This test only validates the error handling on dialing database connections.
// Other cases are covered by one or both of TestFindTablets and TestFindTablet.
func TestGetTablets(t *testing.T) {
t.Parallel()

disco := fakediscovery.New()
disco.AddTaggedGates(nil, &vtadminpb.VTGate{Hostname: "gate"})

db := vtsql.New(&vtsql.Config{
Cluster: &vtadminpb.Cluster{
Id: "c1",
Name: "one",
},
ResolverOptions: &resolver.Options{
Discovery: disco,
},
})
db.DialFunc = func(cfg vitessdriver.Configuration) (*sql.DB, error) {
return nil, assert.AnError
}

c := &cluster.Cluster{DB: db}
_, err := c.GetTablets(context.Background())
assert.Error(t, err)
}

func TestGetVSchema(t *testing.T) {
t.Parallel()

Expand Down
12 changes: 12 additions & 0 deletions go/vt/vtadmin/cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtadmin/errors"
"vitess.io/vitess/go/vt/vtadmin/vtctldclient"
"vitess.io/vitess/go/vt/vtadmin/vtsql"
)

var (
Expand Down Expand Up @@ -65,6 +66,7 @@ type Config struct {
WorkflowReadPoolConfig *RPCPoolConfig

vtctldConfigOpts []vtctldclient.ConfigOption
vtsqlConfigOpts []vtsql.ConfigOption
}

// Cluster returns a new cluster instance from the given config.
Expand Down Expand Up @@ -369,3 +371,13 @@ func (cfg Config) WithVtctldTestConfigOptions(opts ...vtctldclient.ConfigOption)
cfg.vtctldConfigOpts = append(cfg.vtctldConfigOpts, opts...)
return cfg
}

// WithVtSQLTestConfigOptions returns a new Config with the given vtsql
// ConfigOptions appended to any existing ConfigOptions in the current Config.
//
// It should be used in tests only, and is exported to for use in the
// vtadmin/testutil package.
func (cfg Config) WithVtSQLTestConfigOptions(opts ...vtsql.ConfigOption) Config {
cfg.vtsqlConfigOpts = append(cfg.vtsqlConfigOpts, opts...)
return cfg
}
29 changes: 13 additions & 16 deletions go/vt/vtadmin/testutil/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,25 @@ func BuildCluster(t testing.TB, cfg TestClusterConfig) *cluster.Cluster {
disco.AddTaggedGates(nil, &vtadminpb.VTGate{Hostname: fmt.Sprintf("%s-%s-gate", cfg.Cluster.Name, cfg.Cluster.Id)})
disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{Hostname: "doesn't matter"})

tablets := make([]*vtadminpb.Tablet, len(cfg.Tablets))
for i, t := range cfg.Tablets {
tablet := &vtadminpb.Tablet{
Cluster: cfg.Cluster,
Tablet: t.Tablet,
State: t.State,
}

tablets[i] = tablet
}

clusterConf := cluster.Config{
ID: cfg.Cluster.Id,
Name: cfg.Cluster.Name,
DiscoveryImpl: discoveryTestImplName,
}.WithVtctldTestConfigOptions(vtadminvtctldclient.WithDialFunc(func(addr string, ff grpcclient.FailFast, opts ...grpc.DialOption) (vtctldclient.VtctldClient, error) {
return cfg.VtctldClient, nil
})).WithVtSQLTestConfigOptions(vtsql.WithDialFunc(func(c vitessdriver.Configuration) (*sql.DB, error) {
return sql.OpenDB(&fakevtsql.Connector{Tablets: tablets, ShouldErr: cfg.DBConfig.ShouldErr}), nil
}))

m.Lock()
Expand All @@ -113,22 +126,6 @@ func BuildCluster(t testing.TB, cfg TestClusterConfig) *cluster.Cluster {

require.NoError(t, err, "failed to create cluster from configs %+v %+v", clusterConf, cfg)

tablets := make([]*vtadminpb.Tablet, len(cfg.Tablets))
for i, t := range cfg.Tablets {
tablet := &vtadminpb.Tablet{
Cluster: cfg.Cluster,
Tablet: t.Tablet,
State: t.State,
}

tablets[i] = tablet
}

db := c.DB.(*vtsql.VTGateProxy)
db.DialFunc = func(_ vitessdriver.Configuration) (*sql.DB, error) {
return sql.OpenDB(&fakevtsql.Connector{Tablets: tablets, ShouldErr: cfg.DBConfig.ShouldErr}), nil
}

return c
}

Expand Down
20 changes: 20 additions & 0 deletions go/vt/vtadmin/vtsql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package vtsql

import (
"database/sql"
"fmt"

"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/vitessdriver"
"vitess.io/vitess/go/vt/vtadmin/cluster/discovery"
"vitess.io/vitess/go/vt/vtadmin/cluster/resolver"
"vitess.io/vitess/go/vt/vtadmin/credentials"
Expand All @@ -39,6 +41,24 @@ type Config struct {

Cluster *vtadminpb.Cluster
ResolverOptions *resolver.Options

dialFunc func(c vitessdriver.Configuration) (*sql.DB, error)
}

// ConfigOption is a function that mutates a Config. It should return the same
// Config structure, in a builder-pattern style.
type ConfigOption func(cfg *Config) *Config

// WithDialFunc returns a ConfigOption that applies the given dial function to
// a Config.
//
// It is used to support dependency injection in tests, and needs to be exported
// for higher-level tests (for example, package vtadmin/cluster).
func WithDialFunc(f func(c vitessdriver.Configuration) (*sql.DB, error)) ConfigOption {
return func(cfg *Config) *Config {
cfg.dialFunc = f
return cfg
}
}

// Parse returns a new config with the given cluster ID and name, after
Expand Down
63 changes: 28 additions & 35 deletions go/vt/vtadmin/vtsql/vtsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,15 @@ type DB interface {
// ShowTablets executes `SHOW vitess_tablets` and returns the result.
ShowTablets(ctx context.Context) (*sql.Rows, error)

// Dial opens a gRPC database connection to a vtgate in the cluster. If the
// DB already has a valid connection, this is a no-op.
//
// target is a Vitess query target, e.g. "", "<keyspace>", "<keyspace>@replica".
Dial(ctx context.Context, target string, opts ...grpc.DialOption) error

// Ping behaves like (*sql.DB).Ping.
Ping() error
// PingContext behaves like (*sql.DB).PingContext.
PingContext(ctx context.Context) error

// Close closes the currently-held database connection. This is a no-op if
// Close closes the underlying database connection. This is a no-op if
// the DB has no current valid connection. It is safe to call repeatedly.
// Users may call Dial on a previously-closed DB to create a new connection,
// but that connection may not be to the same particular vtgate.
//
// Once closed, a DB is not safe for reuse.
Close() error
}

Expand All @@ -72,7 +66,7 @@ type VTGateProxy struct {
// DialFunc is called to open a new database connection. In production this
// should always be vitessdriver.OpenWithConfiguration, but it is exported
// for testing purposes.
DialFunc func(cfg vitessdriver.Configuration) (*sql.DB, error)
dialFunc func(cfg vitessdriver.Configuration) (*sql.DB, error)
resolver grpcresolver.Builder

m sync.Mutex
Expand All @@ -92,14 +86,25 @@ var ErrConnClosed = errors.New("use of closed connection")
//
// It does not open a connection to a vtgate; users must call Dial before first
// use.
func New(cfg *Config) *VTGateProxy {
return &VTGateProxy{
func New(ctx context.Context, cfg *Config) (*VTGateProxy, error) {
dialFunc := cfg.dialFunc
if dialFunc == nil {
dialFunc = vitessdriver.OpenWithConfiguration
}

proxy := VTGateProxy{
cluster: cfg.Cluster,
creds: cfg.Credentials,
cfg: cfg,
DialFunc: vitessdriver.OpenWithConfiguration,
dialFunc: dialFunc,
resolver: cfg.ResolverOptions.NewBuilder(cfg.Cluster.Id),
}

if err := proxy.dial(ctx, ""); err != nil {
return nil, err
}

return &proxy, nil
}

// getQueryContext returns a new context with the correct effective and immediate
Expand All @@ -123,23 +128,12 @@ func (vtgate *VTGateProxy) getQueryContext(ctx context.Context) context.Context

// Dial is part of the DB interface. The proxy's DiscoveryTags can be set to
// narrow the set of possible gates it will connect to.
func (vtgate *VTGateProxy) Dial(ctx context.Context, target string, opts ...grpc.DialOption) error {
func (vtgate *VTGateProxy) dial(ctx context.Context, target string, opts ...grpc.DialOption) error {
span, _ := trace.NewSpan(ctx, "VTGateProxy.Dial")
defer span.Finish()

vtadminproto.AnnotateClusterSpan(vtgate.cluster, span)

vtgate.m.Lock()
defer vtgate.m.Unlock()

if vtgate.conn != nil {
log.Info("Have valid connection to vtgate, reusing it.")
span.Annotate("is_noop", true)

return nil
}

span.Annotate("is_noop", false)
span.Annotate("is_using_credentials", vtgate.creds != nil)

conf := vitessdriver.Configuration{
Protocol: fmt.Sprintf("grpc_%s", vtgate.cluster.Id),
Expand All @@ -154,11 +148,16 @@ func (vtgate *VTGateProxy) Dial(ctx context.Context, target string, opts ...grpc
}, conf.GRPCDialOptions...)
}

db, err := vtgate.DialFunc(conf)
db, err := vtgate.dialFunc(conf)
if err != nil {
return fmt.Errorf("error dialing vtgate: %w", err)
}

log.Infof("Established gRPC connection to vtgate\n")

vtgate.m.Lock()
defer vtgate.m.Unlock()

vtgate.conn = db
vtgate.dialedAt = time.Now()

Expand Down Expand Up @@ -207,18 +206,12 @@ func (vtgate *VTGateProxy) Close() error {
vtgate.m.Lock()
defer vtgate.m.Unlock()

return vtgate.closeLocked()
}

func (vtgate *VTGateProxy) closeLocked() error {
if vtgate.conn == nil {
return nil
}

err := vtgate.conn.Close()
vtgate.conn = nil

return err
defer func() { vtgate.conn = nil }()
return vtgate.conn.Close()
}

// Debug implements debug.Debuggable for VTGateProxy.
Expand Down
Loading