Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 150 additions & 58 deletions go/vt/vtadmin/cluster/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ limitations under the License.
//
// Some potential improvements we can add, if desired:
//
// 1. Background refresh. We would take a config flag that governs the refresh
// interval and backoff (for when background refresh happens around the same
// time as grpc-core calls to ResolveNow) and spin up a goroutine. We would
// then have to spin this down when Close is called.
//
// 2. Stats!
// 1. Stats!
package resolver

import (
Expand All @@ -39,13 +34,15 @@ import (
"time"

"github.com/spf13/pflag"
grpcbackoff "google.golang.org/grpc/backoff"
grpcresolver "google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtadmin/cluster/discovery"
"vitess.io/vitess/go/vt/vtadmin/debug"
"vitess.io/vitess/go/vt/vtadmin/internal/backoff"
)

const logPrefix = "[vtadmin.cluster.resolver]"
Expand Down Expand Up @@ -113,6 +110,23 @@ type Options struct {
DiscoveryTags []string
DiscoveryTimeout time.Duration

// MinDiscoveryInterval is the minimum amount of time to wait before re-
// discovering after a successful discovery resolution. This is useful to
// avoid spamming the backing discovery service if multiple ResolveNow calls
// are made in rapid succession.
//
// Note that the interval for failed discovery resolutions are governed by
// the backoff strategy and configuration, not this field.
MinDiscoveryInterval time.Duration

// BackoffStrategy specfies the strategy to use when backing off of failed
// discovery resolution. Permitted values are "exponential", "linear", and
// "none"; the empty string defaults to "exponential".
BackoffStrategy string
// BackoffConfig is the configuration used by the given backoff strategy.
// For a "none" strategy, this is ignored.
BackoffConfig grpcbackoff.Config

// BalancerPolicy, if set, will cause a resolver to provide a ServiceConfig
// to the resolver's ClientConn with a corresponding loadBalancingConfig.
// Omitting this option will cause grpc to use its default load balacing
Expand All @@ -138,6 +152,8 @@ func (opts *Options) NewBuilder(scheme string) grpcresolver.Builder {
}
}

var defaultBackoffConfig = grpcbackoff.DefaultConfig

// InstallFlags installs the resolver.Options flags on the given flagset. It is
// used by both vtsql and vtctldclient proxies.
func (opts *Options) InstallFlags(fs *pflag.FlagSet) {
Expand All @@ -149,6 +165,24 @@ func (opts *Options) InstallFlags(fs *pflag.FlagSet) {
fs.Var(&opts.BalancerPolicy, "grpc-balancer-policy",
fmt.Sprintf("Specify a load balancer policy to use for resolvers built by these options (the default grpc behavior is pick_first). Valid choices are %s",
strings.Join(allBalancerPolicies, ",")))

fs.DurationVar(&opts.MinDiscoveryInterval, "min-rediscovery-interval", time.Second*30,
"Minimum amount of time to wait between successful discovery resolution calls. "+
"Useful to avoid spamming the backing discovery service if multiple ResolveNow calls are made in rapid succession "+
"(Note that the interval for failed discovery resolutions are governed by the backoff strategy and configuration, not this flag).")

fs.StringVar(&opts.BackoffStrategy, "backoff-strategy", "",
"Backoff strategy to use when backing off of failed discovery resolutions. "+
`Permitted values are "exponential", "linear", and "none", and the empty string defaults to "exponential".`)
fs.DurationVar(&opts.BackoffConfig.BaseDelay, "backoff-base-delay", defaultBackoffConfig.BaseDelay,
"The amount of time to backoff after the first failure.")
fs.DurationVar(&opts.BackoffConfig.MaxDelay, "backoff-max-delay", defaultBackoffConfig.MaxDelay,
"The upper bound of time to wait, including backoffs offset by jitter.")
fs.Float64Var(&opts.BackoffConfig.Multiplier, "backoff-multiplier", defaultBackoffConfig.Multiplier,
"The factor to multiply (or add, in the case of \"linear\" backoff after a failed retry. "+
"Ideally should be greater than 1.")
fs.Float64Var(&opts.BackoffConfig.Jitter, "backoff-jitter", defaultBackoffConfig.Jitter,
"The factor to which backoffs are randomized.")
}

// Build is part of the resolver.Builder interface. See the commentary on
Expand All @@ -170,8 +204,6 @@ func (b *builder) Build(target grpcresolver.Target, cc grpcresolver.ClientConn,
b.resolvers = append(b.resolvers, r)
b.m.Unlock()

r.ResolveNow(grpcresolver.ResolveNowOptions{})

return r, nil
}

Expand Down Expand Up @@ -199,17 +231,24 @@ func (b *builder) build(target grpcresolver.Target, cc grpcresolver.ClientConn,

ctx, cancel := context.WithCancel(context.Background())

return &resolver{
component: target.URL.Host,
cluster: target.URL.Scheme,
discoverAddrs: fn,
opts: b.opts,
cc: cc,
sc: sc,
ctx: ctx,
cancel: cancel,
createdAt: time.Now().UTC(),
}, nil
r := &resolver{
component: target.URL.Host,
cluster: target.URL.Scheme,
discoverAddrs: fn,
backoffStrategy: backoff.Get(b.opts.BackoffStrategy, b.opts.BackoffConfig),
opts: b.opts,
cc: cc,
sc: sc,
rn: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
createdAt: time.Now().UTC(),
}

r.wg.Add(1)
go r.watch()

return r, nil
}

// Scheme is part of the resolver.Builder interface.
Expand Down Expand Up @@ -238,16 +277,19 @@ func (b *builder) Debug() map[string]any {
}

type resolver struct {
component string
cluster string
discoverAddrs func(ctx context.Context, tags []string) ([]string, error)
opts Options
component string
cluster string
discoverAddrs func(ctx context.Context, tags []string) ([]string, error)
backoffStrategy backoff.Strategy
opts Options

cc grpcresolver.ClientConn
sc serviceconfig.Config // optionally used to enforce a balancer policy

rn chan struct{} // used to signal that ResolveNow has been called
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

// for debug.Debuggable
// TODO: consider proper exported stats - histograms for timings, error rates, etc.
Expand All @@ -259,6 +301,83 @@ type resolver struct {
lastAddrs []grpcresolver.Address
}

func (r *resolver) watch() {
defer r.wg.Done()

// Handles calling r.resolve and recording debug metrics appropriately,
// called in the for loop below.
resolveOnce := func() (state *grpcresolver.State, err error) {
lastResolvedAt := time.Now().UTC()
defer func() {
r.m.Lock()
defer r.m.Unlock()

r.lastResolvedAt = lastResolvedAt
r.lastResolveError = err
if state != nil {
r.lastAddrs = state.Addresses
}
}()

state, err = r.resolve()
return state, err
}

backoffIndex := 1
for {
state, err := resolveOnce()
switch err {
case nil:
switch len(state.Addresses) {
case 0:
log.Warningf("%s: found no %ss (cluster %s); updating grpc clientconn state anyway", logPrefix, r.component, r.cluster)
default:
log.Infof("%s: found %d %ss (cluster %s)", logPrefix, len(state.Addresses), r.component, r.cluster)
}

if updateErr := r.cc.UpdateState(*state); updateErr != nil {
log.Errorf("%s: failed to update %ss addresses for %s (cluster %s): %s", logPrefix, r.component, r.cluster, updateErr)
err = updateErr
}
default:
log.Errorf("%s: failed to resolve new addresses for %s (cluster %s): %s", logPrefix, r.component, r.cluster, err)
r.cc.ReportError(err)
}

var timer *time.Timer
if err == nil {
// Success. Wait for next call to ResolveNow (via r.rn chan), but
// also at least a minimum time to avoid spamming the backing
// discovery.
backoffIndex = 1
timer = time.NewTimer(r.opts.MinDiscoveryInterval)
select {
case <-r.ctx.Done():
if !timer.Stop() {
<-timer.C
}

return
case <-r.rn:
}
} else {
timer = time.NewTimer(r.backoffStrategy.Backoff(backoffIndex))
backoffIndex++
}

select {
case <-r.ctx.Done():
if !timer.Stop() {
<-timer.C
}

return
case <-timer.C:
timer.Stop()
}
}
}

func (r *resolver) resolve() (*grpcresolver.State, error) {
span, ctx := trace.NewSpan(r.ctx, "(vtadmin/cluster/resolver).resolve")
defer span.Finish()
Expand Down Expand Up @@ -302,54 +421,27 @@ func (r *resolver) resolve() (*grpcresolver.State, error) {
// ClientConn's when errors occur, as well as periodically to refresh the set of
// addresses a ClientConn can use for SubConns.
func (r *resolver) ResolveNow(o grpcresolver.ResolveNowOptions) {
r.m.Lock()
defer r.m.Unlock()

var (
state *grpcresolver.State
err error
)

r.lastResolvedAt = time.Now().UTC()
defer func() {
r.lastResolveError = err
if state != nil {
r.lastAddrs = state.Addresses
}
}()

state, err = r.resolve()
if err != nil {
log.Errorf("%s: failed to resolve new addresses for %s (cluster %s): %s", logPrefix, r.component, r.cluster, err)
r.cc.ReportError(err)
return
}

switch len(state.Addresses) {
case 0:
log.Warningf("%s: found no %ss (cluster %s); updating grpc clientconn state anyway", logPrefix, r.component, r.cluster)
select {
case r.rn <- struct{}{}:
default:
log.Infof("%s: found %d %ss (cluster %s)", logPrefix, len(state.Addresses), r.component, r.cluster)
}

err = r.cc.UpdateState(*state)
if err != nil {
log.Errorf("%s: failed to update %ss addresses for %s (cluster %s): %s", logPrefix, r.component, r.cluster, err)
r.cc.ReportError(err)
return
}
}

// Close is part of the resolver.Resolver interface.
func (r *resolver) Close() {
r.cancel() // cancel any ongoing call to ResolveNow, and therefore any resultant discovery lookup.
// First, cancel any ongoing discovery calls and terminate the watcher loop.
r.cancel()
// Then, wait for the watch goroutine to gracefully exit.
r.wg.Wait()
}

// Debug implements debug.Debuggable for resolver.
func (r *resolver) Debug() map[string]any {
r.m.Lock()
defer r.m.Unlock()

// TODO: add a field for "closed"

m := map[string]any{
"cluster": r.cluster,
"component": r.component,
Expand Down
Loading