diff --git a/go/sync2/atomic.go b/go/sync2/atomic.go index 2d8a532aac7..e974eae3960 100644 --- a/go/sync2/atomic.go +++ b/go/sync2/atomic.go @@ -17,6 +17,7 @@ limitations under the License. package sync2 import ( + "math" "sync" "sync/atomic" "time" @@ -82,6 +83,31 @@ func (i *AtomicInt64) CompareAndSwap(oldval, newval int64) (swapped bool) { return atomic.CompareAndSwapInt64(&i.int64, oldval, newval) } +// AtomicFloat64 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Flat64 functions. +type AtomicFloat64 struct { + uint64 +} + +// NewAtomicFloat64 initializes a new AtomicFloat64 with a given value. +func NewAtomicFloat64(n float64) AtomicFloat64 { + return AtomicFloat64{math.Float64bits(n)} +} + +// Set atomically sets n as new value. +func (f *AtomicFloat64) Set(n float64) { + atomic.StoreUint64(&f.uint64, math.Float64bits(n)) +} + +// Get atomically returns the current value. +func (f *AtomicFloat64) Get() float64 { + return math.Float64frombits(atomic.LoadUint64(&f.uint64)) +} + +// CompareAndSwap automatically swaps the old with the new value. +func (f *AtomicFloat64) CompareAndSwap(oldval, newval float64) (swapped bool) { + return atomic.CompareAndSwapUint64(&f.uint64, math.Float64bits(oldval), math.Float64bits(newval)) +} + // AtomicDuration is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions. type AtomicDuration struct { int64 diff --git a/go/sync2/atomic_test.go b/go/sync2/atomic_test.go index 4590c5b92ee..2754c7bef85 100644 --- a/go/sync2/atomic_test.go +++ b/go/sync2/atomic_test.go @@ -57,6 +57,24 @@ func TestAtomicInt64(t *testing.T) { assert.Equal(t, int64(4), i.Get()) } +func TestAtomicFloat64(t *testing.T) { + i := NewAtomicFloat64(1.0) + assert.Equal(t, float64(1.0), i.Get()) + + i.Set(2.0) + assert.Equal(t, float64(2.0), i.Get()) + { + swapped := i.CompareAndSwap(2.0, 4.0) + assert.Equal(t, float64(4), i.Get()) + assert.Equal(t, true, swapped) + } + { + swapped := i.CompareAndSwap(2.0, 5.0) + assert.Equal(t, float64(4), i.Get()) + assert.Equal(t, false, swapped) + } +} + func TestAtomicDuration(t *testing.T) { d := NewAtomicDuration(time.Second) assert.Equal(t, time.Second, d.Get()) diff --git a/go/vt/vttablet/tabletserver/debugenv.go b/go/vt/vttablet/tabletserver/debugenv.go index 50a200ec3ad..4bb9f26265f 100644 --- a/go/vt/vttablet/tabletserver/debugenv.go +++ b/go/vt/vttablet/tabletserver/debugenv.go @@ -72,6 +72,15 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) f(ival) msg = fmt.Sprintf("Setting %v to: %v", varname, value) } + setFloat64Val := func(f func(float64)) { + fval, err := strconv.ParseFloat(value, 64) + if err != nil { + msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err) + return + } + f(fval) + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + } switch varname { case "PoolSize": setIntVal(tsv.SetPoolSize) @@ -85,6 +94,8 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) setIntVal(tsv.SetMaxResultSize) case "WarnResultSize": setIntVal(tsv.SetWarnResultSize) + case "ThrottleMetricThreshold": + setFloat64Val(tsv.SetThrottleMetricThreshold) case "Consolidator": tsv.SetConsolidatorMode(value) msg = fmt.Sprintf("Setting %v to: %v", varname, value) @@ -98,12 +109,19 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) Value: fmt.Sprintf("%v", f()), }) } + addFloat64Var := func(varname string, f func() float64) { + vars = append(vars, envValue{ + VarName: varname, + Value: fmt.Sprintf("%v", f()), + }) + } addIntVar("PoolSize", tsv.PoolSize) addIntVar("StreamPoolSize", tsv.StreamPoolSize) addIntVar("TxPoolSize", tsv.TxPoolSize) addIntVar("QueryCacheCapacity", tsv.QueryPlanCacheCap) addIntVar("MaxResultSize", tsv.MaxResultSize) addIntVar("WarnResultSize", tsv.WarnResultSize) + addFloat64Var("ThrottleMetricThreshold", tsv.ThrottleMetricThreshold) vars = append(vars, envValue{ VarName: "Consolidator", Value: tsv.ConsolidatorMode(), diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 02b9723e627..8ac76e2cc9f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1799,6 +1799,16 @@ func (tsv *TabletServer) WarnResultSize() int { return int(tsv.qe.warnResultSize.Get()) } +// SetThrottleMetricThreshold changes the throttler metric threshold +func (tsv *TabletServer) SetThrottleMetricThreshold(val float64) { + tsv.lagThrottler.MetricsThreshold.Set(val) +} + +// ThrottleMetricThreshold returns the throttler metric threshold +func (tsv *TabletServer) ThrottleMetricThreshold() float64 { + return tsv.lagThrottler.MetricsThreshold.Get() +} + // SetPassthroughDMLs changes the setting to pass through all DMLs // It should only be used for testing func (tsv *TabletServer) SetPassthroughDMLs(val bool) { diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index c2eade1312e..a2f237b79ad 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -19,6 +19,7 @@ import ( "sync/atomic" "time" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/dbconnpool" @@ -119,7 +120,7 @@ type Throttler struct { mysqlInventory *mysql.Inventory metricsQuery string - metricsThreshold float64 + MetricsThreshold sync2.AtomicFloat64 metricsQueryType mysql.MetricsQueryType mysqlClusterThresholds *cache.Cache @@ -172,7 +173,7 @@ func NewThrottler(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topo mysqlInventory: mysql.NewInventory(), metricsQuery: replicationLagQuery, - metricsThreshold: throttleThreshold.Seconds(), + MetricsThreshold: sync2.NewAtomicFloat64(throttleThreshold.Seconds()), throttledApps: cache.New(cache.NoExpiration, 10*time.Second), mysqlClusterThresholds: cache.New(cache.NoExpiration, 0), @@ -235,7 +236,7 @@ func (throttler *Throttler) initConfig(password string) { throttler.metricsQuery = *throttleMetricQuery } if *throttleMetricThreshold != math.MaxFloat64 { - throttler.metricsThreshold = *throttleMetricThreshold + throttler.MetricsThreshold = sync2.NewAtomicFloat64(*throttleMetricThreshold) } throttler.metricsQueryType = mysql.GetMetricsQueryType(throttler.metricsQuery) @@ -243,7 +244,7 @@ func (throttler *Throttler) initConfig(password string) { User: "", // running on local tablet server, will use vttablet DBA user Password: "", // running on local tablet server, will use vttablet DBA user MetricQuery: throttler.metricsQuery, - ThrottleThreshold: throttler.metricsThreshold, + ThrottleThreshold: throttler.MetricsThreshold.Get(), IgnoreHostsCount: 0, } if password != "" { @@ -251,7 +252,7 @@ func (throttler *Throttler) initConfig(password string) { User: throttlerUser, Password: password, MetricQuery: throttler.metricsQuery, - ThrottleThreshold: throttler.metricsThreshold, + ThrottleThreshold: throttler.MetricsThreshold.Get(), IgnoreHostsCount: 0, } }