diff --git a/go/vt/vttablet/tabletserver/debugenv.go b/go/vt/vttablet/tabletserver/debugenv.go index 50a200ec3ad..87d2a81a424 100644 --- a/go/vt/vttablet/tabletserver/debugenv.go +++ b/go/vt/vttablet/tabletserver/debugenv.go @@ -23,6 +23,7 @@ import ( "net/http" "strconv" "text/template" + "time" "vitess.io/vitess/go/acl" "vitess.io/vitess/go/vt/log" @@ -72,6 +73,15 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) f(ival) msg = fmt.Sprintf("Setting %v to: %v", varname, value) } + setDurationVal := func(f func(time.Duration)) { + durationVal, err := time.ParseDuration(value) + if err != nil { + msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err) + return + } + f(durationVal) + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + } switch varname { case "PoolSize": setIntVal(tsv.SetPoolSize) @@ -85,6 +95,10 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) setIntVal(tsv.SetMaxResultSize) case "WarnResultSize": setIntVal(tsv.SetWarnResultSize) + case "UnhealthyThreshold": + setDurationVal(tsv.Config().Healthcheck.UnhealthyThresholdSeconds.Set) + setDurationVal(tsv.hs.SetUnhealthyThreshold) + setDurationVal(tsv.sm.SetUnhealthyThreshold) case "Consolidator": tsv.SetConsolidatorMode(value) msg = fmt.Sprintf("Setting %v to: %v", varname, value) @@ -98,12 +112,19 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) Value: fmt.Sprintf("%v", f()), }) } + addDurationVar := func(varname string, f func() time.Duration) { + vars = append(vars, envValue{ + VarName: varname, + Value: fmt.Sprintf("%v", f()), + }) + } addIntVar("PoolSize", tsv.PoolSize) addIntVar("StreamPoolSize", tsv.StreamPoolSize) addIntVar("TxPoolSize", tsv.TxPoolSize) addIntVar("QueryCacheCapacity", tsv.QueryPlanCacheCap) addIntVar("MaxResultSize", tsv.MaxResultSize) addIntVar("WarnResultSize", tsv.WarnResultSize) + addDurationVar("UnhealthyThreshold", tsv.Config().Healthcheck.UnhealthyThresholdSeconds.Get) vars = append(vars, envValue{ VarName: "Consolidator", Value: tsv.ConsolidatorMode(), diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go index b42fcb4f1d6..36e856d3e8e 100644 --- a/go/vt/vttablet/tabletserver/health_streamer.go +++ b/go/vt/vttablet/tabletserver/health_streamer.go @@ -28,6 +28,7 @@ import ( "github.com/golang/protobuf/proto" "vitess.io/vitess/go/history" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -51,7 +52,7 @@ var ( type healthStreamer struct { stats *tabletenv.Stats degradedThreshold time.Duration - unhealthyThreshold time.Duration + unhealthyThreshold sync2.AtomicDuration mu sync.Mutex ctx context.Context @@ -66,7 +67,7 @@ func newHealthStreamer(env tabletenv.Env, alias topodatapb.TabletAlias) *healthS return &healthStreamer{ stats: env.Stats(), degradedThreshold: env.Config().Healthcheck.DegradedThresholdSeconds.Get(), - unhealthyThreshold: env.Config().Healthcheck.UnhealthyThresholdSeconds.Get(), + unhealthyThreshold: sync2.NewAtomicDuration(env.Config().Healthcheck.UnhealthyThresholdSeconds.Get()), clients: make(map[chan *querypb.StreamHealthResponse]struct{}), state: &querypb.StreamHealthResponse{ @@ -220,7 +221,7 @@ func (hs *healthStreamer) AppendDetails(details []*kv) []*kv { sbm := time.Duration(hs.state.RealtimeStats.SecondsBehindMaster) * time.Second class := healthyClass switch { - case sbm > hs.unhealthyThreshold: + case sbm > hs.unhealthyThreshold.Get(): class = unhealthyClass case sbm > hs.degradedThreshold: class = unhappyClass @@ -240,3 +241,17 @@ func (hs *healthStreamer) AppendDetails(details []*kv) []*kv { return details } + +func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) { + hs.unhealthyThreshold.Set(v) + shr := proto.Clone(hs.state).(*querypb.StreamHealthResponse) + for ch := range hs.clients { + select { + case ch <- shr: + default: + log.Info("Resetting health streamer clients due to unhealthy threshold change") + close(ch) + delete(hs.clients, ch) + } + } +} diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index c1498e43f44..a18a5ec091d 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -122,7 +122,7 @@ type stateManager struct { checkMySQLThrottler *sync2.Semaphore timebombDuration time.Duration - unhealthyThreshold time.Duration + unhealthyThreshold sync2.AtomicDuration shutdownGracePeriod time.Duration transitionGracePeriod time.Duration } @@ -187,7 +187,7 @@ func (sm *stateManager) Init(env tabletenv.Env, target querypb.Target) { sm.checkMySQLThrottler = sync2.NewSemaphore(1, 0) sm.timebombDuration = env.Config().OltpReadPool.TimeoutSeconds.Get() * 10 sm.hcticks = timer.NewTimer(env.Config().Healthcheck.IntervalSeconds.Get()) - sm.unhealthyThreshold = env.Config().Healthcheck.UnhealthyThresholdSeconds.Get() + sm.unhealthyThreshold = sync2.NewAtomicDuration(env.Config().Healthcheck.UnhealthyThresholdSeconds.Get()) sm.shutdownGracePeriod = env.Config().GracePeriods.ShutdownSeconds.Get() sm.transitionGracePeriod = env.Config().GracePeriods.TransitionSeconds.Get() } @@ -640,7 +640,7 @@ func (sm *stateManager) refreshReplHealthLocked() (time.Duration, error) { } sm.replHealthy = false } else { - if lag > sm.unhealthyThreshold { + if lag > sm.unhealthyThreshold.Get() { if sm.replHealthy { log.Infof("Going unhealthy due to high replication lag: %v", lag) } @@ -768,3 +768,7 @@ func (sm *stateManager) IsServingString() string { } return "NOT_SERVING" } + +func (sm *stateManager) SetUnhealthyThreshold(v time.Duration) { + sm.unhealthyThreshold.Set(v) +}