diff --git a/go/vt/vtadmin/cluster/cluster_test.go b/go/vt/vtadmin/cluster/cluster_test.go index 7fe2ff39d17..badaa457fb5 100644 --- a/go/vt/vtadmin/cluster/cluster_test.go +++ b/go/vt/vtadmin/cluster/cluster_test.go @@ -36,6 +36,7 @@ import ( "vitess.io/vitess/go/vt/vitessdriver" "vitess.io/vitess/go/vt/vtadmin/cluster" "vitess.io/vitess/go/vt/vtadmin/cluster/discovery/fakediscovery" + "vitess.io/vitess/go/vt/vtadmin/cluster/resolver" vtadminerrors "vitess.io/vitess/go/vt/vtadmin/errors" "vitess.io/vitess/go/vt/vtadmin/testutil" "vitess.io/vitess/go/vt/vtadmin/vtctldclient/fakevtctldclient" @@ -2652,7 +2653,9 @@ func TestGetTablets(t *testing.T) { Id: "c1", Name: "one", }, - Discovery: disco, + ResolverOptions: &resolver.Options{ + Discovery: disco, + }, }) db.DialFunc = func(cfg vitessdriver.Configuration) (*sql.DB, error) { return nil, assert.AnError diff --git a/go/vt/vtadmin/cluster/discovery/discovery.go b/go/vt/vtadmin/cluster/discovery/discovery.go index 6c90e119f4d..61873ee411f 100644 --- a/go/vt/vtadmin/cluster/discovery/discovery.go +++ b/go/vt/vtadmin/cluster/discovery/discovery.go @@ -53,6 +53,10 @@ type Discovery interface { // return an address is not specified by the interface, and can be // implementation-specific. DiscoverVTGateAddr(ctx context.Context, tags []string) (string, error) + // DiscoverVTGateAddrs returns a list of addresses of vtgates found in the + // discovery service. This is semantically equivalent to the result of + // DiscoverVTGateAddr for each gate returned by a call to DiscoverVTGates. + DiscoverVTGateAddrs(ctx context.Context, tags []string) ([]string, error) // DiscoverVTGates returns a list of vtgates found in the discovery service. // Tags can optionally be used to filter gates. Order of the gates is not // specified by the interface, and can be implementation-specific. @@ -68,6 +72,10 @@ type Discovery interface { // return an address is not specified by the interface, and can be // implementation-specific. 
DiscoverVtctldAddr(ctx context.Context, tags []string) (string, error) + // DiscoverVtctldAddrs returns a list of addresses of vtctlds found in the + // discovery service. This is semantically equivalent to the result of + // DiscoverVtctldAddr for each vtctld returned by a call to DiscoverVtctlds. + DiscoverVtctldAddrs(ctx context.Context, tags []string) ([]string, error) // DiscoverVtctlds returns a list of vtctlds found in the discovery service. // Tags can optionally be used to filter vtctlds. Order of the vtctlds is // not specified by the interface, and can be implementation-specific. diff --git a/go/vt/vtadmin/cluster/discovery/discovery_consul.go b/go/vt/vtadmin/cluster/discovery/discovery_consul.go index 6c00a3b0356..3f405c6aa40 100644 --- a/go/vt/vtadmin/cluster/discovery/discovery_consul.go +++ b/go/vt/vtadmin/cluster/discovery/discovery_consul.go @@ -228,6 +228,31 @@ func (c *ConsulDiscovery) DiscoverVTGateAddr(ctx context.Context, tags []string) return addr, nil } +// DiscoverVTGateAddrs is part of the Discovery interface. +func (c *ConsulDiscovery) DiscoverVTGateAddrs(ctx context.Context, tags []string) ([]string, error) { + span, ctx := trace.NewSpan(ctx, "ConsulDiscovery.DiscoverVTGateAddrs") + defer span.Finish() + + executeFQDNTemplate := false + + vtgates, err := c.discoverVTGates(ctx, tags, executeFQDNTemplate) + if err != nil { + return nil, err + } + + addrs := make([]string, len(vtgates)) + for i, vtgate := range vtgates { + addr, err := textutil.ExecuteTemplate(c.vtgateAddrTmpl, vtgate) + if err != nil { + return nil, fmt.Errorf("failed to execute vtgate address template for %v: %w", vtgate, err) + } + + addrs[i] = addr + } + + return addrs, nil +} + // DiscoverVTGates is part of the Discovery interface.
func (c *ConsulDiscovery) DiscoverVTGates(ctx context.Context, tags []string) ([]*vtadminpb.VTGate, error) { span, ctx := trace.NewSpan(ctx, "ConsulDiscovery.DiscoverVTGates") @@ -348,6 +373,31 @@ func (c *ConsulDiscovery) DiscoverVtctldAddr(ctx context.Context, tags []string) return addr, nil } +// DiscoverVtctldAddrs is part of the Discovery interface. +func (c *ConsulDiscovery) DiscoverVtctldAddrs(ctx context.Context, tags []string) ([]string, error) { + span, ctx := trace.NewSpan(ctx, "ConsulDiscovery.DiscoverVtctldAddrs") + defer span.Finish() + + executeFQDNTemplate := false + + vtctlds, err := c.discoverVtctlds(ctx, tags, executeFQDNTemplate) + if err != nil { + return nil, err + } + + addrs := make([]string, len(vtctlds)) + for i, vtctld := range vtctlds { + addr, err := textutil.ExecuteTemplate(c.vtctldAddrTmpl, vtctld) + if err != nil { + return nil, fmt.Errorf("failed to execute vtctld address template for %v: %w", vtctld, err) + } + + addrs[i] = addr + } + + return addrs, nil +} + // DiscoverVtctlds is part of the Discovery interface. func (c *ConsulDiscovery) DiscoverVtctlds(ctx context.Context, tags []string) ([]*vtadminpb.Vtctld, error) { span, ctx := trace.NewSpan(ctx, "ConsulDiscovery.DiscoverVtctlds") diff --git a/go/vt/vtadmin/cluster/discovery/discovery_json.go b/go/vt/vtadmin/cluster/discovery/discovery_json.go index fb83046a9bf..36cbeab455e 100644 --- a/go/vt/vtadmin/cluster/discovery/discovery_json.go +++ b/go/vt/vtadmin/cluster/discovery/discovery_json.go @@ -23,6 +23,7 @@ import ( "math/rand" "vitess.io/vitess/go/trace" + vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin" ) @@ -146,6 +147,24 @@ func (d *JSONDiscovery) DiscoverVTGateAddr(ctx context.Context, tags []string) ( return gate.Hostname, nil } +// DiscoverVTGateAddrs is part of the Discovery interface. 
+func (d *JSONDiscovery) DiscoverVTGateAddrs(ctx context.Context, tags []string) ([]string, error) { + span, ctx := trace.NewSpan(ctx, "JSONDiscovery.DiscoverVTGateAddrs") + defer span.Finish() + + gates, err := d.discoverVTGates(ctx, tags) + if err != nil { + return nil, err + } + + addrs := make([]string, len(gates)) + for i, gate := range gates { + addrs[i] = gate.Hostname + } + + return addrs, nil +} + // DiscoverVTGates is part of the Discovery interface. func (d *JSONDiscovery) DiscoverVTGates(ctx context.Context, tags []string) ([]*vtadminpb.VTGate, error) { span, ctx := trace.NewSpan(ctx, "JSONDiscovery.DiscoverVTGates") @@ -228,6 +247,24 @@ func (d *JSONDiscovery) DiscoverVtctldAddr(ctx context.Context, tags []string) ( return vtctld.Hostname, nil } +// DiscoverVtctldAddrs is part of the Discovery interface. +func (d *JSONDiscovery) DiscoverVtctldAddrs(ctx context.Context, tags []string) ([]string, error) { + span, ctx := trace.NewSpan(ctx, "JSONDiscovery.DiscoverVtctldAddrs") + defer span.Finish() + + vtctlds, err := d.discoverVtctlds(ctx, tags) + if err != nil { + return nil, err + } + + addrs := make([]string, len(vtctlds)) + for i, vtctld := range vtctlds { + addrs[i] = vtctld.Hostname + } + + return addrs, nil +} + // DiscoverVtctlds is part of the Discovery interface. func (d *JSONDiscovery) DiscoverVtctlds(ctx context.Context, tags []string) ([]*vtadminpb.Vtctld, error) { span, ctx := trace.NewSpan(ctx, "JSONDiscovery.DiscoverVtctlds") diff --git a/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go b/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go index 5f94d7a4769..732b28cfb82 100644 --- a/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go +++ b/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go @@ -181,6 +181,21 @@ func (d *Fake) DiscoverVTGateAddr(ctx context.Context, tags []string) (string, e return gate.Hostname, nil } +// DiscoverVTGateAddrs is part of the discovery.Discovery interface. 
+func (d *Fake) DiscoverVTGateAddrs(ctx context.Context, tags []string) ([]string, error) { + gates, err := d.DiscoverVTGates(ctx, tags) + if err != nil { + return nil, err + } + + addrs := make([]string, len(gates)) + for i, gate := range gates { + addrs[i] = gate.Hostname + } + + return addrs, nil +} + // DiscoverVtctlds is part of the discover.Discovery interface. func (d *Fake) DiscoverVtctlds(ctx context.Context, tags []string) ([]*vtadminpb.Vtctld, error) { if d.vtctlds.shouldErr { @@ -234,6 +249,21 @@ func (d *Fake) DiscoverVtctldAddr(ctx context.Context, tags []string) (string, e return vtctld.Hostname, nil } +// DiscoverVtctldAddrs is part of the discovery.Discovery interface. +func (d *Fake) DiscoverVtctldAddrs(ctx context.Context, tags []string) ([]string, error) { + vtctlds, err := d.DiscoverVtctlds(ctx, tags) + if err != nil { + return nil, err + } + + addrs := make([]string, len(vtctlds)) + for i, vtctld := range vtctlds { + addrs[i] = vtctld.Hostname + } + + return addrs, nil +} + // DiscoverVtctld is part of the discover.Discovery interface. func (d *Fake) DiscoverVtctld(ctx context.Context, tags []string) (*vtadminpb.Vtctld, error) { vtctlds, err := d.DiscoverVtctlds(ctx, tags) diff --git a/go/vt/vtadmin/cluster/resolver/resolver.go b/go/vt/vtadmin/cluster/resolver/resolver.go new file mode 100644 index 00000000000..45ec24eddfa --- /dev/null +++ b/go/vt/vtadmin/cluster/resolver/resolver.go @@ -0,0 +1,369 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package resolver provides a discovery-based resolver for VTAdmin clusters. +// +// It uses a discovery.Discovery implementation to dynamically update the set of +// vtctlds and vtgates in a cluster being used by a grpc.ClientConn, allowing +// VTAdmin to transparently reconnect to different vtctlds and vtgates both +// periodically and when hosts are recycled. +// +// Some potential improvements we can add, if desired: +// +// 1. Background refresh. We would take a config flag that governs the refresh +// interval and backoff (for when background refresh happens around the same +// time as grpc-core calls to ResolveNow) and spin up a goroutine. We would +// then have to spin this down when Close is called. +// +// 2. Stats! +package resolver + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/spf13/pflag" + grpcresolver "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + + "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vtadmin/cluster/discovery" + "vitess.io/vitess/go/vt/vtadmin/debug" +) + +const logPrefix = "[vtadmin.cluster.resolver]" + +type builder struct { + scheme string + opts Options + + // for debug.Debuggable + m sync.Mutex + resolvers []*resolver +} + +// DialAddr returns the dial address for a resolver scheme and component. +// +// VtctldClientProxy and VTGateProxy should use this to ensure their Dial calls +// use their respective discovery resolvers. +func DialAddr(resolver grpcresolver.Builder, component string) string { + return fmt.Sprintf("%s://%s/", resolver.Scheme(), component) +} + +// BalancerPolicy is an enum for different grpc load balancer policies. It also +// implements the pflag.Value interface to be used as a flagset Var destination. 
+type BalancerPolicy string + +const ( + // PickFirstBalancer selects grpc's pick_first load balancer policy. + PickFirstBalancer BalancerPolicy = "pick_first" + RoundRobinBalancer BalancerPolicy = "round_robin" +) + +var allBalancerPolicies = []string{ // convenience for help/error messages + string(PickFirstBalancer), + string(RoundRobinBalancer), +} + +// Set is part of the pflag.Value interface. +func (bp *BalancerPolicy) Set(s string) error { + switch s { + case string(PickFirstBalancer), string(RoundRobinBalancer): + *bp = BalancerPolicy(s) + default: + return fmt.Errorf("unsupported balancer policy %s; must be one of %s", s, strings.Join(allBalancerPolicies, ", ")) + } + + return nil +} + +// String is part of the pflag.Value interface. +func (bp *BalancerPolicy) String() string { return string(*bp) } + +// Type is part of the pflag.Value interface. +func (*BalancerPolicy) Type() string { return "resolver.BalancerPolicy" } + +// Options defines the configuration options that can produce a resolver.Builder. +// +// A builder may be produced directly from an Options struct, but the intended +// usage is to first initialize an Options struct via opts.InstallFlags, which +// ensures the Options have sensible defaults, as both the vtctldclient proxy +// and VTGateProxy do. +type Options struct { + // Discovery is the discovery implementation used to discover host addresses + // when the ClientConn requests an update from the resolver. + Discovery discovery.Discovery + DiscoveryTags []string + DiscoveryTimeout time.Duration + + // BalancerPolicy, if set, will cause a resolver to provide a ServiceConfig + // to the resolver's ClientConn with a corresponding loadBalancingConfig. + // Omitting this option will cause grpc to use its default load balancing + // policy, which is currently pick_first. + // + // For more details, see https://github.com/grpc/grpc/blob/master/doc/service_config.md. + BalancerPolicy BalancerPolicy +} + +// NewBuilder returns a gRPC resolver.Builder for the given scheme.
For vtadmin, +// the scheme should be a cluster ID. +// +// The target provided to Builder.Build will be used to switch on vtctld or +// vtgate, based on the URL.Host field of the parsed dial target. This means +// that the addr passed to Dial should have the form +// "{clusterID}://{vtctld|vtgate}/". Other target URL hosts will cause an error. +// To ensure the dial address conforms to this constraint, use this package's +// DialAddr function. +func (opts *Options) NewBuilder(scheme string) grpcresolver.Builder { + return &builder{ + scheme: scheme, + opts: *opts, + } +} + +// InstallFlags installs the resolver.Options flags on the given flagset. It is +// used by both vtsql and vtctldclient proxies. +func (opts *Options) InstallFlags(fs *pflag.FlagSet) { + fs.DurationVar(&opts.DiscoveryTimeout, "discovery-timeout", 100*time.Millisecond, + "Timeout to use when resolving hosts via discovery.") + fs.StringSliceVar(&opts.DiscoveryTags, "discovery-tags", nil, + "repeated, comma-separated list of tags to use when discovering hosts to connect to. "+ + "the semantics of the tags may depend on the specific discovery implementation used.") + fs.Var(&opts.BalancerPolicy, "grpc-balancer-policy", + fmt.Sprintf("Specify a load balancer policy to use for resolvers built by these options (the default grpc behavior is pick_first). Valid choices are %s", + strings.Join(allBalancerPolicies, ","))) +} + +// Build is part of the resolver.Builder interface. See the commentary on +// NewBuilder in this package for more details on this particular +// implementation. +// +// Build is called during grpc.Dial and grpc.DialContext, but a grpc ClientConn +// will not call ResolveNow on the built Resolver until an error occurs or a +// period of time has elapsed. Therefore, we do a first resolution here before +// returning our Resolver back to grpc core. Failing to do this means that our +// first RPC would hang waiting for a resolver update. 
+func (b *builder) Build(target grpcresolver.Target, cc grpcresolver.ClientConn, opts grpcresolver.BuildOptions) (grpcresolver.Resolver, error) { + r, err := b.build(target, cc, opts) + if err != nil { + return nil, err + } + + b.m.Lock() + b.resolvers = append(b.resolvers, r) + b.m.Unlock() + + r.ResolveNow(grpcresolver.ResolveNowOptions{}) + + return r, nil +} + +func (b *builder) build(target grpcresolver.Target, cc grpcresolver.ClientConn, opts grpcresolver.BuildOptions) (*resolver, error) { + var fn func(context.Context, []string) ([]string, error) + switch target.URL.Host { + case "vtctld": + fn = b.opts.Discovery.DiscoverVtctldAddrs + case "vtgate": + fn = b.opts.Discovery.DiscoverVTGateAddrs + default: + return nil, fmt.Errorf("%s: unsupported URL host %s", logPrefix, target.URL.Host) + } + + var sc serviceconfig.Config + if b.opts.BalancerPolicy != "" { + // c.f. https://github.com/grpc/grpc/blob/master/doc/service_config.md#example + scpr := cc.ParseServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{ "%s": {} }] }`, b.opts.BalancerPolicy)) + if scpr.Err != nil { + return nil, fmt.Errorf("failed to initialize service config with load balancer policy %s: %s", b.opts.BalancerPolicy, scpr.Err) + } + + sc = scpr.Config + } + + ctx, cancel := context.WithCancel(context.Background()) + + return &resolver{ + component: target.URL.Host, + cluster: target.URL.Scheme, + discoverAddrs: fn, + opts: b.opts, + cc: cc, + sc: sc, + ctx: ctx, + cancel: cancel, + createdAt: time.Now().UTC(), + }, nil +} + +// Scheme is part of the resolver.Builder interface. +func (b *builder) Scheme() string { + return b.scheme +} + +// Debug implements debug.Debuggable for builder. 
+func (b *builder) Debug() map[string]any { + b.m.Lock() + defer b.m.Unlock() + + resolvers := make([]map[string]any, len(b.resolvers)) + m := map[string]any{ + "scheme": b.scheme, + "discovery_tags": b.opts.DiscoveryTags, + "discovery_timeout": b.opts.DiscoveryTimeout, + "resolvers": resolvers, + } + + for i, r := range b.resolvers { + resolvers[i] = r.Debug() + } + + return m +} + +type resolver struct { + component string + cluster string + discoverAddrs func(ctx context.Context, tags []string) ([]string, error) + opts Options + + cc grpcresolver.ClientConn + sc serviceconfig.Config // optionally used to enforce a balancer policy + + ctx context.Context + cancel context.CancelFunc + + // for debug.Debuggable + // TODO: consider proper exported stats - histograms for timings, error rates, etc. + + m sync.Mutex + createdAt time.Time + lastResolvedAt time.Time + lastResolveError error + lastAddrs []grpcresolver.Address +} + +func (r *resolver) resolve() (*grpcresolver.State, error) { + span, ctx := trace.NewSpan(r.ctx, "(vtadmin/cluster/resolver).resolve") + defer span.Finish() + + span.Annotate("cluster_id", r.cluster) + span.Annotate("component", r.component) + + log.Infof("%s: resolving %ss (cluster %s)", logPrefix, r.component, r.cluster) + + ctx, cancel := context.WithTimeout(ctx, r.opts.DiscoveryTimeout) + defer cancel() + + addrs, err := r.discoverAddrs(ctx, r.opts.DiscoveryTags) + if err != nil { + return nil, fmt.Errorf("failed to discover %ss (cluster %s): %w", r.component, r.cluster, err) + } + + span.Annotate("addrs", strings.Join(addrs, ",")) + + state := &grpcresolver.State{ + Addresses: make([]grpcresolver.Address, len(addrs)), + } + + if r.sc != nil { + span.Annotate("balancer_policy", r.opts.BalancerPolicy) + state.ServiceConfig = &serviceconfig.ParseResult{ + Config: r.sc, + } + } + + for i, addr := range addrs { + state.Addresses[i] = grpcresolver.Address{ + Addr: addr, + } + } + + return state, nil +} + +// ResolveNow is part of the 
resolver.Resolver interface. It is called by grpc +// ClientConn's when errors occur, as well as periodically to refresh the set of +// addresses a ClientConn can use for SubConns. +func (r *resolver) ResolveNow(o grpcresolver.ResolveNowOptions) { + r.m.Lock() + defer r.m.Unlock() + + var ( + state *grpcresolver.State + err error + ) + + r.lastResolvedAt = time.Now().UTC() + defer func() { + r.lastResolveError = err + if state != nil { + r.lastAddrs = state.Addresses + } + }() + + state, err = r.resolve() + if err != nil { + log.Errorf("%s: failed to resolve new addresses for %s (cluster %s): %s", logPrefix, r.component, r.cluster, err) + r.cc.ReportError(err) + return + } + + switch len(state.Addresses) { + case 0: + log.Warningf("%s: found no %ss (cluster %s); updating grpc clientconn state anyway", logPrefix, r.component, r.cluster) + default: + log.Infof("%s: found %d %ss (cluster %s)", logPrefix, len(state.Addresses), r.component, r.cluster) + } + + err = r.cc.UpdateState(*state) + if err != nil { + log.Errorf("%s: failed to update %ss addresses for %s (cluster %s): %s", logPrefix, r.component, r.cluster, err) + r.cc.ReportError(err) + return + } +} + +// Close is part of the resolver.Resolver interface. +func (r *resolver) Close() { + r.cancel() // cancel any ongoing call to ResolveNow, and therefore any resultant discovery lookup. +} + +// Debug implements debug.Debuggable for resolver. 
+func (r *resolver) Debug() map[string]any { + r.m.Lock() + defer r.m.Unlock() + + m := map[string]any{ + "cluster": r.cluster, + "component": r.component, + "created_at": debug.TimeToString(r.createdAt), + "addr_list": r.lastAddrs, + } + + if !r.lastResolvedAt.IsZero() { + m["last_resolved_at"] = debug.TimeToString(r.lastResolvedAt) + } + + if r.lastResolveError != nil { + m["error"] = r.lastResolveError.Error() + } + + return m +} diff --git a/go/vt/vtadmin/cluster/resolver/resolver_test.go b/go/vt/vtadmin/cluster/resolver/resolver_test.go new file mode 100644 index 00000000000..9239ec48c25 --- /dev/null +++ b/go/vt/vtadmin/cluster/resolver/resolver_test.go @@ -0,0 +1,235 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package resolver + +import ( + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + grpcresolver "google.golang.org/grpc/resolver" + + "vitess.io/vitess/go/vt/vtadmin/cluster/discovery/fakediscovery" + + vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin" +) + +type mockClientConn struct { + grpcresolver.ClientConn + Addrs []grpcresolver.Address + UpdateStateCalled bool + ReportedError error +} + +func (cc *mockClientConn) UpdateState(state grpcresolver.State) error { + cc.UpdateStateCalled = true + cc.Addrs = state.Addresses + return nil +} + +func (cc *mockClientConn) ReportError(err error) { cc.ReportedError = err } + +var testopts = Options{ + DiscoveryTimeout: time.Millisecond * 50, +} + +func mustBuild(t *testing.T, b *builder, target grpcresolver.Target, cc grpcresolver.ClientConn, opts grpcresolver.BuildOptions) *resolver { + t.Helper() + + r, err := b.build(target, cc, opts) + require.NoError(t, err) + + return r +} + +func TestResolveNow(t *testing.T) { + t.Parallel() + + disco := fakediscovery.New() + disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{ + Hostname: "one", + }) + testopts := testopts + testopts.Discovery = disco + + cc := mockClientConn{} + r := mustBuild(t, &builder{opts: testopts}, grpcresolver.Target{ + URL: url.URL{Host: "vtctld"}, + }, &cc, grpcresolver.BuildOptions{}) + + r.ResolveNow(grpcresolver.ResolveNowOptions{}) + + assert.ElementsMatch(t, cc.Addrs, []grpcresolver.Address{ + { + Addr: "one", + }, + }) + + disco.Clear() + disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{ + Hostname: "two", + }, &vtadminpb.Vtctld{ + Hostname: "three", + }) + + r.ResolveNow(grpcresolver.ResolveNowOptions{}) + + assert.ElementsMatch(t, cc.Addrs, []grpcresolver.Address{ + { + Addr: "two", + }, + { + Addr: "three", + }, + }) +} + +func TestResolveWithTags(t *testing.T) { + t.Parallel() + + disco := fakediscovery.New() + disco.AddTaggedGates([]string{"tag1"}, &vtadminpb.VTGate{ + Hostname: 
"one", + }) + disco.AddTaggedGates([]string{"tag2"}, &vtadminpb.VTGate{ + Hostname: "two", + }) + testopts := testopts + testopts.Discovery = disco + + cc := mockClientConn{} + r := mustBuild(t, &builder{opts: testopts}, grpcresolver.Target{ + URL: url.URL{Host: "vtgate"}, + }, &cc, grpcresolver.BuildOptions{}) + r.opts.DiscoveryTags = []string{"tag2"} + + r.ResolveNow(grpcresolver.ResolveNowOptions{}) + + assert.ElementsMatch(t, cc.Addrs, []grpcresolver.Address{ + { + Addr: "two", + }, + }) +} + +func TestResolveEmptyList(t *testing.T) { + t.Parallel() + + disco := fakediscovery.New() + disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{ + Hostname: "one", + }) + testopts := testopts + testopts.Discovery = disco + + cc := mockClientConn{} + r := mustBuild(t, + &builder{opts: testopts}, grpcresolver.Target{ + URL: url.URL{Host: "vtgate"}, // we only have vtctlds + }, &cc, grpcresolver.BuildOptions{}, + ) + + r.ResolveNow(grpcresolver.ResolveNowOptions{}) + + assert.Empty(t, cc.Addrs, "ClientConn should have no addresses") + assert.True(t, cc.UpdateStateCalled, "resolver should still call cc.UpdateState with empty host list") + + disco.AddTaggedGates(nil, &vtadminpb.VTGate{ + Hostname: "gate:one", + }) + + r.ResolveNow(grpcresolver.ResolveNowOptions{}) + assert.ElementsMatch(t, cc.Addrs, []grpcresolver.Address{ + { + Addr: "gate:one", + }, + }) +} + +func TestBuild(t *testing.T) { + t.Parallel() + + disco := fakediscovery.New() + disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{ + Hostname: "vtctld:one", + }) + testopts := testopts + testopts.Discovery = disco + + b := &builder{opts: testopts} + + tests := []struct { + name string + target grpcresolver.Target + shouldErr bool + assertion func(t *testing.T, cc *mockClientConn) + }{ + { + name: "vtctld", + target: grpcresolver.Target{ + URL: url.URL{Host: "vtctld"}, + }, + assertion: func(t *testing.T, cc *mockClientConn) { + assert.ElementsMatch(t, cc.Addrs, []grpcresolver.Address{ + { + Addr: "vtctld:one", + }, + }) + }, + 
}, + { + name: "vtgate", + target: grpcresolver.Target{ + URL: url.URL{Host: "vtgate"}, + }, + assertion: func(t *testing.T, cc *mockClientConn) { + assert.Empty(t, cc.Addrs, "resolver should not add addresses to clientconn (no vtgates in discovery)") + assert.True(t, cc.UpdateStateCalled, "resolver should still call UpdateState on clientconn (no vtgates in discovery)") + }, + }, + { + name: "bad authority", + target: grpcresolver.Target{ + URL: url.URL{Host: "unsupported"}, + }, + shouldErr: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + cc := &mockClientConn{} + _, err := b.Build(tt.target, cc, grpcresolver.BuildOptions{}) + if tt.shouldErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + + func() { + t.Helper() + tt.assertion(t, cc) + }() + }) + } +} diff --git a/go/vt/vtadmin/vtctldclient/config.go b/go/vt/vtadmin/vtctldclient/config.go index 63f96f1a606..a975a696970 100644 --- a/go/vt/vtadmin/vtctldclient/config.go +++ b/go/vt/vtadmin/vtctldclient/config.go @@ -18,12 +18,12 @@ package vtctldclient import ( "fmt" - "time" "github.com/spf13/pflag" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/vtadmin/cluster/discovery" + "vitess.io/vitess/go/vt/vtadmin/cluster/resolver" "vitess.io/vitess/go/vt/vtadmin/credentials" vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin" @@ -31,25 +31,23 @@ import ( // Config represents the options that modify the behavior of a Proxy. type Config struct { - Discovery discovery.Discovery - Credentials *grpcclient.StaticAuthClientCreds - + Credentials *grpcclient.StaticAuthClientCreds CredentialsPath string Cluster *vtadminpb.Cluster - ConnectivityTimeout time.Duration + ResolverOptions *resolver.Options } -const defaultConnectivityTimeout = 2 * time.Second - // Parse returns a new config with the given cluster and discovery, after // attempting to parse the command-line pflags into that Config. See // (*Config).Parse() for more details. 
func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string) (*Config, error) { cfg := &Config{ - Cluster: cluster, - Discovery: disco, + Cluster: cluster, + ResolverOptions: &resolver.Options{ + Discovery: disco, + }, } err := cfg.Parse(args) @@ -66,7 +64,11 @@ func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string) func (c *Config) Parse(args []string) error { fs := pflag.NewFlagSet("", pflag.ContinueOnError) - fs.DurationVar(&c.ConnectivityTimeout, "grpc-connectivity-timeout", defaultConnectivityTimeout, "The maximum duration to wait for a gRPC connection to be established to the vtctld.") + if c.ResolverOptions == nil { + c.ResolverOptions = &resolver.Options{} + } + + c.ResolverOptions.InstallFlags(fs) credentialsTmplStr := fs.String("credentials-path-tmpl", "", "Go template used to specify a path to a credentials file, which is a json file containing "+ diff --git a/go/vt/vtadmin/vtctldclient/config_test.go b/go/vt/vtadmin/vtctldclient/config_test.go index e90634b0072..e2a54de48d5 100644 --- a/go/vt/vtadmin/vtctldclient/config_test.go +++ b/go/vt/vtadmin/vtctldclient/config_test.go @@ -23,11 +23,13 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/vtadmin/cluster/resolver" vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin" ) @@ -46,15 +48,16 @@ func TestParse(t *testing.T) { t.Run("no credentials provided", func(t *testing.T) { t.Parallel() - cfg, err := Parse(nil, nil, []string{}) + cfg, err := Parse(&vtadminpb.Cluster{}, nil, []string{}) require.NoError(t, err) expected := &Config{ - Cluster: nil, - Discovery: nil, - Credentials: nil, - CredentialsPath: "", - ConnectivityTimeout: defaultConnectivityTimeout, + Cluster: &vtadminpb.Cluster{}, + Credentials: nil, + CredentialsPath: "", + ResolverOptions: &resolver.Options{ + DiscoveryTimeout: 100 * time.Millisecond, + }, } 
assert.Equal(t, expected, cfg) }) @@ -89,10 +92,11 @@ func TestParse(t *testing.T) { Cluster: &vtadminpb.Cluster{ Name: "testcluster", }, - Discovery: nil, - Credentials: creds, - CredentialsPath: credsfile.Name(), - ConnectivityTimeout: defaultConnectivityTimeout, + Credentials: creds, + CredentialsPath: credsfile.Name(), + ResolverOptions: &resolver.Options{ + DiscoveryTimeout: 100 * time.Millisecond, + }, } assert.Equal(t, expected, cfg) diff --git a/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go b/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go index d1471a4c5c5..c767cf797ff 100644 --- a/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go +++ b/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go @@ -71,8 +71,6 @@ type VtctldClient struct { // incorrectly. var _ vtctldclient.VtctldClient = (*VtctldClient)(nil) -func (fake *VtctldClient) WaitForReady(ctx context.Context) error { return nil } - // CreateKeyspace is part of the vtctldclient.VtctldClient interface. 
func (fake *VtctldClient) CreateKeyspace(ctx context.Context, req *vtctldatapb.CreateKeyspaceRequest, opts ...grpc.CallOption) (*vtctldatapb.CreateKeyspaceResponse, error) { if fake.CreateKeyspaceShouldErr { diff --git a/go/vt/vtadmin/vtctldclient/proxy.go b/go/vt/vtadmin/vtctldclient/proxy.go index 124d7245cac..85cafb130e4 100644 --- a/go/vt/vtadmin/vtctldclient/proxy.go +++ b/go/vt/vtadmin/vtctldclient/proxy.go @@ -18,16 +18,16 @@ package vtctldclient import ( "context" - "fmt" "sync" "time" "google.golang.org/grpc" + grpcresolver "google.golang.org/grpc/resolver" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/vtadmin/cluster/discovery" + "vitess.io/vitess/go/vt/vtadmin/cluster/resolver" "vitess.io/vitess/go/vt/vtadmin/debug" "vitess.io/vitess/go/vt/vtadmin/vtadminproto" "vitess.io/vitess/go/vt/vtctl/grpcvtctldclient" @@ -58,20 +58,18 @@ type Proxy interface { type ClientProxy struct { vtctldclient.VtctldClient // embedded to provide easy implementation of the vtctlservicepb.VtctldClient interface - cluster *vtadminpb.Cluster - creds *grpcclient.StaticAuthClientCreds - discovery discovery.Discovery - cfg *Config + cluster *vtadminpb.Cluster + creds *grpcclient.StaticAuthClientCreds + cfg *Config // DialFunc is called to open a new vtctdclient connection. In production, // this should always be grpcvtctldclient.NewWithDialOpts, but it is // exported for testing purposes. DialFunc func(addr string, ff grpcclient.FailFast, opts ...grpc.DialOption) (vtctldclient.VtctldClient, error) + resolver grpcresolver.Builder m sync.Mutex closed bool - host string - lastPing time.Time dialedAt time.Time } @@ -84,18 +82,18 @@ type ClientProxy struct { // use. 
func New(cfg *Config) *ClientProxy { return &ClientProxy{ - cfg: cfg, - cluster: cfg.Cluster, - creds: cfg.Credentials, - discovery: cfg.Discovery, - DialFunc: grpcvtctldclient.NewWithDialOpts, - closed: true, + cfg: cfg, + cluster: cfg.Cluster, + creds: cfg.Credentials, + DialFunc: grpcvtctldclient.NewWithDialOpts, + resolver: cfg.ResolverOptions.NewBuilder(cfg.Cluster.Id), + closed: true, } } // Dial is part of the Proxy interface. func (vtctld *ClientProxy) Dial(ctx context.Context) error { - span, ctx := trace.NewSpan(ctx, "VtctldClientProxy.Dial") + span, _ := trace.NewSpan(ctx, "VtctldClientProxy.Dial") defer span.Finish() vtadminproto.AnnotateClusterSpan(vtctld.cluster, span) @@ -105,26 +103,12 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { if vtctld.VtctldClient != nil { if !vtctld.closed { - waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout) - defer waitCancel() - - if err := vtctld.VtctldClient.WaitForReady(waitCtx); err == nil { - // Our cached connection is still open and ready, so we're good to go. - span.Annotate("is_noop", true) - span.Annotate("vtctld_host", vtctld.host) - - vtctld.lastPing = time.Now() - - return nil - } - // If WaitForReady returns an error, that indicates our cached connection - // is no longer valid. We fall through to close the cached connection, - // discover a new vtctld, and establish a new connection. + span.Annotate("is_noop", true) + return nil } span.Annotate("is_stale", true) - // close before reopen. this is safe to call on an already-closed client. if err := vtctld.closeLocked(); err != nil { // Even if the client connection does not shut down cleanly, we don't want to block // Dial from discovering a new vtctld. 
This makes VTAdmin's dialer more resilient, @@ -133,12 +117,6 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { } } - addr, err := vtctld.discovery.DiscoverVtctldAddr(ctx, nil) - if err != nil { - return fmt.Errorf("error discovering vtctld to dial: %w", err) - } - - span.Annotate("vtctld_host", addr) span.Annotate("is_using_credentials", vtctld.creds != nil) opts := []grpc.DialOption{ @@ -152,26 +130,16 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { opts = append(opts, grpc.WithPerRPCCredentials(vtctld.creds)) } - client, err := vtctld.DialFunc(addr, grpcclient.FailFast(false), opts...) - if err != nil { - return err - } - - waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout) - defer waitCancel() + opts = append(opts, grpc.WithResolvers(vtctld.resolver)) - if err := client.WaitForReady(waitCtx); err != nil { - // If the gRPC connection does not transition to a READY state within the context timeout, - // then return an error. The onus to redial (or not) is on the caller of the Dial function. - // As an enhancement, we could update this Dial function to try redialing the discovered vtctld - // a few times with a backoff before giving up. - log.Infof("Could not transition to READY state for gRPC connection to %s: %s\n", addr, err.Error()) + // TODO: update DialFunc to take ctx as first arg. + client, err := vtctld.DialFunc(resolver.DialAddr(vtctld.resolver, "vtctld"), grpcclient.FailFast(false), opts...) 
+ if err != nil { return err } - log.Infof("Established gRPC connection to vtctld %s\n", addr) + log.Infof("Established gRPC connection to vtctld\n") vtctld.dialedAt = time.Now() - vtctld.host = addr vtctld.VtctldClient = client vtctld.closed = false @@ -213,7 +181,6 @@ func (vtctld *ClientProxy) Debug() map[string]any { defer vtctld.m.Unlock() m := map[string]any{ - "host": vtctld.host, "is_connected": !vtctld.closed, } @@ -226,9 +193,12 @@ func (vtctld *ClientProxy) Debug() map[string]any { } if !vtctld.closed { - m["last_ping"] = debug.TimeToString(vtctld.lastPing) m["dialed_at"] = debug.TimeToString(vtctld.dialedAt) } + if dr, ok := vtctld.resolver.(debug.Debuggable); ok { + m["resolver"] = dr.Debug() + } + return m } diff --git a/go/vt/vtadmin/vtctldclient/proxy_test.go b/go/vt/vtadmin/vtctldclient/proxy_test.go index 5eaec8cd9b2..235ff8eafe0 100644 --- a/go/vt/vtadmin/vtctldclient/proxy_test.go +++ b/go/vt/vtadmin/vtctldclient/proxy_test.go @@ -25,31 +25,49 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" + grpcresolver "google.golang.org/grpc/resolver" + + "vitess.io/vitess/go/vt/vtadmin/cluster/discovery/fakediscovery" + "vitess.io/vitess/go/vt/vtadmin/cluster/resolver" vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice" - "vitess.io/vitess/go/vt/vtadmin/cluster/discovery/fakediscovery" ) type fakeVtctld struct { - vtctlservicepb.VtctlServer + vtctlservicepb.VtctldServer + addr string } -func initVtctlServer() (net.Listener, *grpc.Server, error) { +// GetKeyspace is used for tests to detect what addr the VtctldServer is +// listening on. The addr will always be stored as resp.Keyspace.Name, and the +// actual request is ignored. 
+func (fake *fakeVtctld) GetKeyspace(ctx context.Context, req *vtctldatapb.GetKeyspaceRequest) (*vtctldatapb.GetKeyspaceResponse, error) { + return &vtctldatapb.GetKeyspaceResponse{ + Keyspace: &vtctldatapb.Keyspace{ + Name: fake.addr, + }, + }, nil +} + +func initVtctldServer() (net.Listener, *grpc.Server, error) { listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return nil, nil, err } - vtctld := &fakeVtctld{} + vtctld := &fakeVtctld{ + addr: listener.Addr().String(), + } server := grpc.NewServer() - vtctlservicepb.RegisterVtctlServer(server, vtctld) + vtctlservicepb.RegisterVtctldServer(server, vtctld) return listener, server, err } func TestDial(t *testing.T) { - listener, server, err := initVtctlServer() + listener, server, err := initVtctldServer() require.NoError(t, err) defer listener.Close() @@ -67,24 +85,54 @@ func TestDial(t *testing.T) { Id: "test", Name: "testcluster", }, - Discovery: disco, - ConnectivityTimeout: defaultConnectivityTimeout, + ResolverOptions: &resolver.Options{ + Discovery: disco, + DiscoveryTimeout: 50 * time.Millisecond, + }, }) defer proxy.Close() // prevents grpc-core from logging a bunch of "connection errors" after deferred listener.Close() above. - // We don't have a vtctld host until we call Dial - require.Empty(t, proxy.host) - err = proxy.Dial(context.Background()) assert.NoError(t, err) - assert.Equal(t, listener.Addr().String(), proxy.host) + + resp, err := proxy.GetKeyspace(context.Background(), &vtctldatapb.GetKeyspaceRequest{}) + require.NoError(t, err) + assert.Equal(t, listener.Addr().String(), resp.Keyspace.Name) +} + +// testResolverBuilder wraps a grpcresolver.Builder to return *testResolvers +// with a channel to detect calls to ResolveNow in tests. 
+type testResolverBuilder struct { + grpcresolver.Builder + fired chan struct{} +} + +func (b *testResolverBuilder) Build(target grpcresolver.Target, cc grpcresolver.ClientConn, opts grpcresolver.BuildOptions) (grpcresolver.Resolver, error) { + r, err := b.Builder.Build(target, cc, opts) + if err != nil { + return nil, err + } + + return &testResolver{r, b.fired}, nil +} + +// testResolver wraps a grpcresolver.Resolver to signal when ResolveNow is +// called in tests. +type testResolver struct { + grpcresolver.Resolver + fired chan struct{} +} + +func (r *testResolver) ResolveNow(o grpcresolver.ResolveNowOptions) { + r.Resolver.ResolveNow(o) + r.fired <- struct{}{} } // TestRedial tests that vtadmin-api is able to recover from a lost connection to // a vtctld by rediscovering and redialing a new one. func TestRedial(t *testing.T) { // Initialize vtctld #1 - listener1, server1, err := initVtctlServer() + listener1, server1, err := initVtctldServer() require.NoError(t, err) defer listener1.Close() @@ -93,7 +141,7 @@ func TestRedial(t *testing.T) { defer server1.Stop() // Initialize vtctld #2 - listener2, server2, err := initVtctlServer() + listener2, server2, err := initVtctldServer() require.NoError(t, err) defer listener2.Close() @@ -109,17 +157,20 @@ func TestRedial(t *testing.T) { Hostname: listener2.Addr().String(), }) + reResolveFired := make(chan struct{}) proxy := New(&Config{ Cluster: &vtadminpb.Cluster{ Id: "test", Name: "testcluster", }, - Discovery: disco, - ConnectivityTimeout: defaultConnectivityTimeout, + ResolverOptions: &resolver.Options{ + Discovery: disco, + DiscoveryTimeout: 50 * time.Millisecond, + }, }) - // We don't have a vtctld host until we call Dial - require.Empty(t, proxy.host) + // wrap the resolver builder to test that re-resolve has fired as expected. + proxy.resolver = &testResolverBuilder{Builder: proxy.resolver, fired: reResolveFired} // Check for a successful connection to whichever vtctld we discover first. 
err = proxy.Dial(context.Background()) @@ -131,7 +182,11 @@ func TestRedial(t *testing.T) { var currentVtctld *grpc.Server var nextAddr string - switch proxy.host { + resp, err := proxy.GetKeyspace(context.Background(), &vtctldatapb.GetKeyspaceRequest{}) + require.NoError(t, err) + + proxyHost := resp.Keyspace.Name + switch proxyHost { case listener1.Addr().String(): currentVtctld = server1 nextAddr = listener2.Addr().String() @@ -140,7 +195,7 @@ func TestRedial(t *testing.T) { currentVtctld = server2 nextAddr = listener1.Addr().String() default: - t.Fatalf("invalid proxy host %s", proxy.host) + t.Fatalf("invalid proxy host %s", proxyHost) } // Remove the shut down vtctld from VTAdmin's service discovery (clumsily). @@ -153,20 +208,20 @@ func TestRedial(t *testing.T) { // Force an ungraceful shutdown of the gRPC server to which we're connected currentVtctld.Stop() - // Wait for the client connection to shut down. (If we redial too quickly, - // we get into a race condition with gRPC's internal retry logic. - // (Using WaitForReady here _does_ expose more function internals than is ideal for a unit test, - // but it's far less flaky than using time.Sleep.) - for { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - if err = proxy.VtctldClient.WaitForReady(ctx); err != nil { - break - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + select { + case <-reResolveFired: + case <-ctx.Done(): + require.FailNowf(t, "forced shutdown of vtctld should trigger grpc re-resolution", ctx.Err().Error()) } // Finally, check that we discover, dial + establish a new connection to the remaining vtctld. 
err = proxy.Dial(context.Background()) assert.NoError(t, err) - assert.Equal(t, nextAddr, proxy.host) + + resp, err = proxy.GetKeyspace(context.Background(), &vtctldatapb.GetKeyspaceRequest{}) + require.NoError(t, err) + assert.Equal(t, nextAddr, resp.Keyspace.Name) } diff --git a/go/vt/vtadmin/vtsql/config.go b/go/vt/vtadmin/vtsql/config.go index 8655bfd7c9f..2acb856190d 100644 --- a/go/vt/vtadmin/vtsql/config.go +++ b/go/vt/vtadmin/vtsql/config.go @@ -24,6 +24,7 @@ import ( "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/vtadmin/cluster/discovery" + "vitess.io/vitess/go/vt/vtadmin/cluster/resolver" "vitess.io/vitess/go/vt/vtadmin/credentials" vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin" @@ -31,18 +32,16 @@ import ( // Config represents the options that modify the behavior of a vtqsl.VTGateProxy. type Config struct { - Discovery discovery.Discovery - DiscoveryTags []string - Credentials Credentials - - DialPingTimeout time.Duration - + Credentials Credentials // CredentialsPath is used only to power vtadmin debug endpoints; there may // be a better way where we don't need to put this in the config, because // it's not really an "option" in normal use. CredentialsPath string - Cluster *vtadminpb.Cluster + DialPingTimeout time.Duration + + Cluster *vtadminpb.Cluster + ResolverOptions *resolver.Options } // Parse returns a new config with the given cluster ID and name, after @@ -50,8 +49,10 @@ type Config struct { // (*Config).Parse() for more details. 
func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string) (*Config, error) { cfg := &Config{ - Cluster: cluster, - Discovery: disco, + Cluster: cluster, + ResolverOptions: &resolver.Options{ + Discovery: disco, + }, } err := cfg.Parse(args) @@ -68,11 +69,14 @@ func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string) func (c *Config) Parse(args []string) error { fs := pflag.NewFlagSet("", pflag.ContinueOnError) + if c.ResolverOptions == nil { + c.ResolverOptions = &resolver.Options{} + } + + c.ResolverOptions.InstallFlags(fs) + fs.DurationVar(&c.DialPingTimeout, "dial-ping-timeout", time.Millisecond*500, "Timeout to use when pinging an existing connection during calls to Dial.") - fs.StringSliceVar(&c.DiscoveryTags, "discovery-tags", []string{}, - "repeated, comma-separated list of tags to use when discovering a vtgate to connect to. "+ - "the semantics of the tags may depend on the specific discovery implementation used") credentialsTmplStr := fs.String("credentials-path-tmpl", "", "Go template used to specify a path to a credentials file, which is a json file containing "+ diff --git a/go/vt/vtadmin/vtsql/config_test.go b/go/vt/vtadmin/vtsql/config_test.go index db3e3fbbf12..4664d89b53e 100644 --- a/go/vt/vtadmin/vtsql/config_test.go +++ b/go/vt/vtadmin/vtsql/config_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/vtadmin/cluster/resolver" vtadminpb "vitess.io/vitess/go/vt/proto/vtadmin" ) @@ -92,7 +93,7 @@ func TestConfigParse(t *testing.T) { err = cfg.Parse(args) assert.NoError(t, err) - assert.Equal(t, expectedTags, cfg.DiscoveryTags) + assert.Equal(t, expectedTags, cfg.ResolverOptions.DiscoveryTags) assert.Equal(t, expectedCreds, cfg.Credentials) }) @@ -144,7 +145,10 @@ func TestConfigParse(t *testing.T) { Name: "testcluster", }, DialPingTimeout: time.Millisecond * 500, - DiscoveryTags: expectedTags, + ResolverOptions: 
&resolver.Options{ + DiscoveryTags: expectedTags, + DiscoveryTimeout: 100 * time.Millisecond, + }, Credentials: expectedCreds, CredentialsPath: path, } diff --git a/go/vt/vtadmin/vtsql/vtsql.go b/go/vt/vtadmin/vtsql/vtsql.go index 248e10927a7..62925b1d682 100644 --- a/go/vt/vtadmin/vtsql/vtsql.go +++ b/go/vt/vtadmin/vtsql/vtsql.go @@ -25,12 +25,13 @@ import ( "time" "google.golang.org/grpc" + grpcresolver "google.golang.org/grpc/resolver" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vitessdriver" - "vitess.io/vitess/go/vt/vtadmin/cluster/discovery" + "vitess.io/vitess/go/vt/vtadmin/cluster/resolver" "vitess.io/vitess/go/vt/vtadmin/debug" "vitess.io/vitess/go/vt/vtadmin/vtadminproto" @@ -64,20 +65,18 @@ type DB interface { // VTGateProxy is a proxy for creating and using database connections to vtgates // in a Vitess cluster. type VTGateProxy struct { - cluster *vtadminpb.Cluster - discovery discovery.Discovery - discoveryTags []string - creds Credentials - cfg *Config + cluster *vtadminpb.Cluster + creds Credentials + cfg *Config // DialFunc is called to open a new database connection. In production this // should always be vitessdriver.OpenWithConfiguration, but it is exported // for testing purposes. DialFunc func(cfg vitessdriver.Configuration) (*sql.DB, error) + resolver grpcresolver.Builder dialPingTimeout time.Duration m sync.Mutex - host string conn *sql.DB dialedAt time.Time lastPing time.Time @@ -96,19 +95,13 @@ var ErrConnClosed = errors.New("use of closed connection") // It does not open a connection to a vtgate; users must call Dial before first // use. 
func New(cfg *Config) *VTGateProxy { - discoveryTags := cfg.DiscoveryTags - if discoveryTags == nil { - discoveryTags = []string{} - } - return &VTGateProxy{ cluster: cfg.Cluster, - discovery: cfg.Discovery, - discoveryTags: discoveryTags, creds: cfg.Credentials, cfg: cfg, DialFunc: vitessdriver.OpenWithConfiguration, dialPingTimeout: cfg.DialPingTimeout, + resolver: cfg.ResolverOptions.NewBuilder(cfg.Cluster.Id), } } @@ -137,7 +130,7 @@ func (vtgate *VTGateProxy) Dial(ctx context.Context, target string, opts ...grpc span, _ := trace.NewSpan(ctx, "VTGateProxy.Dial") defer span.Finish() - vtgate.annotateSpan(span) + vtadminproto.AnnotateClusterSpan(vtgate.cluster, span) vtgate.m.Lock() defer vtgate.m.Unlock() @@ -149,41 +142,28 @@ func (vtgate *VTGateProxy) Dial(ctx context.Context, target string, opts ...grpc err := vtgate.PingContext(ctx) switch err { case nil: - log.Infof("Have valid connection to %s, reusing it.", vtgate.host) + log.Info("Have valid connection to vtgate, reusing it.") span.Annotate("is_noop", true) vtgate.lastPing = time.Now() return nil default: - log.Warningf("Ping failed on host %s: %s; Rediscovering a vtgate to get new connection", vtgate.host, err) + log.Warningf("Ping failed on vtgate: %s; Rediscovering a vtgate to get new connection", err) if err := vtgate.closeLocked(); err != nil { - log.Warningf("Error when closing connection to vtgate %s: %s; Continuing anyway ...", vtgate.host, err) + log.Warningf("Error when closing connection to vtgate: %s; Continuing anyway ...", err) } } } span.Annotate("is_noop", false) - if vtgate.host == "" { - gate, err := vtgate.discovery.DiscoverVTGateAddr(ctx, vtgate.discoveryTags) - if err != nil { - return fmt.Errorf("error discovering vtgate to dial: %w", err) - } - - vtgate.host = gate - // re-annotate the hostname - span.Annotate("vtgate_host", gate) - } - - log.Infof("Dialing %s ...", vtgate.host) - conf := vitessdriver.Configuration{ Protocol: fmt.Sprintf("grpc_%s", vtgate.cluster.Id), - Address: 
vtgate.host, + Address: resolver.DialAddr(vtgate.resolver, "vtgate"), Target: target, - GRPCDialOptions: append(opts, grpc.WithInsecure()), + GRPCDialOptions: append(opts, grpc.WithInsecure(), grpc.WithResolvers(vtgate.resolver)), } if vtgate.creds != nil { @@ -194,7 +174,7 @@ func (vtgate *VTGateProxy) Dial(ctx context.Context, target string, opts ...grpc db, err := vtgate.DialFunc(conf) if err != nil { - return fmt.Errorf("error dialing vtgate %s: %w", vtgate.host, err) + return fmt.Errorf("error dialing vtgate: %w", err) } vtgate.conn = db @@ -208,7 +188,7 @@ func (vtgate *VTGateProxy) ShowTablets(ctx context.Context) (*sql.Rows, error) { span, ctx := trace.NewSpan(ctx, "VTGateProxy.ShowTablets") defer span.Finish() - vtgate.annotateSpan(span) + vtadminproto.AnnotateClusterSpan(vtgate.cluster, span) if vtgate.conn == nil { return nil, ErrConnClosed @@ -227,7 +207,7 @@ func (vtgate *VTGateProxy) PingContext(ctx context.Context) error { span, ctx := trace.NewSpan(ctx, "VTGateProxy.PingContext") defer span.Finish() - vtgate.annotateSpan(span) + vtadminproto.AnnotateClusterSpan(vtgate.cluster, span) return vtgate.pingContext(ctx) } @@ -254,8 +234,6 @@ func (vtgate *VTGateProxy) closeLocked() error { } err := vtgate.conn.Close() - - vtgate.host = "" vtgate.conn = nil return err @@ -267,7 +245,6 @@ func (vtgate *VTGateProxy) Debug() map[string]any { defer vtgate.m.Unlock() m := map[string]any{ - "host": vtgate.host, "is_connected": (vtgate.conn != nil), } @@ -290,13 +267,9 @@ func (vtgate *VTGateProxy) Debug() map[string]any { m["credentials"] = cmap } - return m -} - -func (vtgate *VTGateProxy) annotateSpan(span trace.Span) { - vtadminproto.AnnotateClusterSpan(vtgate.cluster, span) - - if vtgate.host != "" { - span.Annotate("vtgate_host", vtgate.host) + if dr, ok := vtgate.resolver.(debug.Debuggable); ok { + m["resolver"] = dr.Debug() } + + return m } diff --git a/go/vt/vtadmin/vtsql/vtsql_test.go b/go/vt/vtadmin/vtsql/vtsql_test.go index c0ddd84e170..4e2efa11243 
100644 --- a/go/vt/vtadmin/vtsql/vtsql_test.go +++ b/go/vt/vtadmin/vtsql/vtsql_test.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/vitessdriver" "vitess.io/vitess/go/vt/vtadmin/cluster/discovery/fakediscovery" + "vitess.io/vitess/go/vt/vtadmin/cluster/resolver" "vitess.io/vitess/go/vt/vtadmin/vtsql/fakevtsql" querypb "vitess.io/vitess/go/vt/proto/query" @@ -107,14 +108,6 @@ func TestDial(t *testing.T) { }, shouldErr: false, }, - { - name: "discovery error", - disco: fakediscovery.New(), - proxy: &VTGateProxy{ - cluster: &vtadminpb.Cluster{}, - }, - shouldErr: true, - }, { name: "dialer error", disco: fakediscovery.New(), @@ -124,7 +117,7 @@ func TestDial(t *testing.T) { }, }, proxy: &VTGateProxy{ - cluster: &vtadminpb.Cluster{}, + cluster: &vtadminpb.Cluster{Id: "test"}, DialFunc: func(cfg vitessdriver.Configuration) (*sql.DB, error) { return nil, assert.AnError }, @@ -140,7 +133,7 @@ func TestDial(t *testing.T) { }, }, proxy: &VTGateProxy{ - cluster: &vtadminpb.Cluster{}, + cluster: &vtadminpb.Cluster{Id: "test"}, creds: &StaticAuthCredentials{ StaticAuthClientCreds: &grpcclient.StaticAuthClientCreds{ Username: "user", @@ -166,10 +159,13 @@ func TestDial(t *testing.T) { if len(tt.gates) > 0 { tt.disco.AddTaggedGates(nil, tt.gates...) 
} - - tt.proxy.discovery = tt.disco } + tt.proxy.resolver = (&resolver.Options{ + Discovery: tt.disco, + DiscoveryTimeout: 50 * time.Millisecond, + }).NewBuilder(tt.proxy.cluster.Id) + err := tt.proxy.Dial(ctx, "") if tt.shouldErr { assert.Error(t, err) diff --git a/go/vt/vtctl/grpcvtctldclient/client_test.go b/go/vt/vtctl/grpcvtctldclient/client_test.go index f1a39d056ed..878387be701 100644 --- a/go/vt/vtctl/grpcvtctldclient/client_test.go +++ b/go/vt/vtctl/grpcvtctldclient/client_test.go @@ -35,34 +35,6 @@ import ( vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice" ) -func TestWaitForReady(t *testing.T) { - ts := memorytopo.NewServer("cell1") - vtctld := testutil.NewVtctldServerWithTabletManagerClient(t, ts, nil, func(ts *topo.Server) vtctlservicepb.VtctldServer { - return grpcvtctldserver.NewVtctldServer(ts) - }) - - testutil.WithTestServer(t, vtctld, func(t *testing.T, client vtctldclient.VtctldClient) { - ctx := context.Background() - err := client.WaitForReady(ctx) - assert.NoError(t, err) - }) -} - -func TestWaitForReadyShutdown(t *testing.T) { - ts := memorytopo.NewServer("cell1") - vtctld := testutil.NewVtctldServerWithTabletManagerClient(t, ts, nil, func(ts *topo.Server) vtctlservicepb.VtctldServer { - return grpcvtctldserver.NewVtctldServer(ts) - }) - - testutil.WithTestServer(t, vtctld, func(t *testing.T, client vtctldclient.VtctldClient) { - client.Close() - ctx := context.Background() - err := client.WaitForReady(ctx) - - assert.Error(t, ErrConnectionShutdown, err) - }) -} - func TestFindAllShardsInKeyspace(t *testing.T) { ctx := context.Background() ts := memorytopo.NewServer("cell1") diff --git a/go/vt/vtctl/localvtctldclient/client.go b/go/vt/vtctl/localvtctldclient/client.go index 3f91a162abc..b870247fe07 100644 --- a/go/vt/vtctl/localvtctldclient/client.go +++ b/go/vt/vtctl/localvtctldclient/client.go @@ -17,7 +17,6 @@ limitations under the License. 
package localvtctldclient import ( - "context" "errors" "sync" @@ -38,9 +37,6 @@ type localVtctldClient struct { // Close is part of the vtctldclient.VtctldClient interface. func (client *localVtctldClient) Close() error { return nil } -// WaitForReady is part of the vtctldclient.VtctldClient interface. -func (client *localVtctldClient) WaitForReady(ctx context.Context) error { return nil } - //go:generate -command localvtctldclient go run ../vtctldclient/codegen //go:generate localvtctldclient -targetpkg localvtctldclient -impl localVtctldClient -out client_gen.go -local diff --git a/go/vt/vtctl/vtctldclient/client.go b/go/vt/vtctl/vtctldclient/client.go index c5be65bd9a2..5b90a08ecdd 100644 --- a/go/vt/vtctl/vtctldclient/client.go +++ b/go/vt/vtctl/vtctldclient/client.go @@ -3,7 +3,6 @@ package vtctldclient import ( - "context" "fmt" "log" @@ -15,10 +14,6 @@ type VtctldClient interface { // Close augments the vtctlservicepb.VtctlClient interface with io.Closer. Close() error - - // WaitForReady waits until the connection to the vtctld is in a ready state, - // or until the context times out. - WaitForReady(ctx context.Context) error } // Factory is a function that creates new VtctldClients.