Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions go/sync2/atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sync2

import (
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -82,6 +83,31 @@ func (i *AtomicInt64) CompareAndSwap(oldval, newval int64) (swapped bool) {
return atomic.CompareAndSwapInt64(&i.int64, oldval, newval)
}

// AtomicFloat64 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Flat64 functions.
type AtomicFloat64 struct {
uint64
}

// NewAtomicFloat64 initializes a new AtomicFloat64 with a given value.
func NewAtomicFloat64(n float64) AtomicFloat64 {
return AtomicFloat64{math.Float64bits(n)}
}

// Set atomically sets n as new value.
func (f *AtomicFloat64) Set(n float64) {
atomic.StoreUint64(&f.uint64, math.Float64bits(n))
}

// Get atomically returns the current value.
func (f *AtomicFloat64) Get() float64 {
return math.Float64frombits(atomic.LoadUint64(&f.uint64))
}

// CompareAndSwap automatically swaps the old with the new value.
func (f *AtomicFloat64) CompareAndSwap(oldval, newval float64) (swapped bool) {
return atomic.CompareAndSwapUint64(&f.uint64, math.Float64bits(oldval), math.Float64bits(newval))
}

// AtomicDuration is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions.
type AtomicDuration struct {
int64
Expand Down
18 changes: 18 additions & 0 deletions go/sync2/atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ func TestAtomicInt64(t *testing.T) {
assert.Equal(t, int64(4), i.Get())
}

func TestAtomicFloat64(t *testing.T) {
i := NewAtomicFloat64(1.0)
assert.Equal(t, float64(1.0), i.Get())

i.Set(2.0)
assert.Equal(t, float64(2.0), i.Get())
{
swapped := i.CompareAndSwap(2.0, 4.0)
assert.Equal(t, float64(4), i.Get())
assert.Equal(t, true, swapped)
}
{
swapped := i.CompareAndSwap(2.0, 5.0)
assert.Equal(t, float64(4), i.Get())
assert.Equal(t, false, swapped)
}
}

func TestAtomicDuration(t *testing.T) {
d := NewAtomicDuration(time.Second)
assert.Equal(t, time.Second, d.Get())
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vttablet/tabletserver/debugenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
f(ival)
msg = fmt.Sprintf("Setting %v to: %v", varname, value)
}
setFloat64Val := func(f func(float64)) {
fval, err := strconv.ParseFloat(value, 64)
if err != nil {
msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err)
return
}
f(fval)
msg = fmt.Sprintf("Setting %v to: %v", varname, value)
}
switch varname {
case "PoolSize":
setIntVal(tsv.SetPoolSize)
Expand All @@ -85,6 +94,8 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
setIntVal(tsv.SetMaxResultSize)
case "WarnResultSize":
setIntVal(tsv.SetWarnResultSize)
case "ThrottleMetricThreshold":
setFloat64Val(tsv.SetThrottleMetricThreshold)
case "Consolidator":
tsv.SetConsolidatorMode(value)
msg = fmt.Sprintf("Setting %v to: %v", varname, value)
Expand All @@ -98,12 +109,19 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
Value: fmt.Sprintf("%v", f()),
})
}
addFloat64Var := func(varname string, f func() float64) {
vars = append(vars, envValue{
VarName: varname,
Value: fmt.Sprintf("%v", f()),
})
}
addIntVar("PoolSize", tsv.PoolSize)
addIntVar("StreamPoolSize", tsv.StreamPoolSize)
addIntVar("TxPoolSize", tsv.TxPoolSize)
addIntVar("QueryCacheCapacity", tsv.QueryPlanCacheCap)
addIntVar("MaxResultSize", tsv.MaxResultSize)
addIntVar("WarnResultSize", tsv.WarnResultSize)
addFloat64Var("ThrottleMetricThreshold", tsv.ThrottleMetricThreshold)
vars = append(vars, envValue{
VarName: "Consolidator",
Value: tsv.ConsolidatorMode(),
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1799,6 +1799,16 @@ func (tsv *TabletServer) WarnResultSize() int {
return int(tsv.qe.warnResultSize.Get())
}

// SetThrottleMetricThreshold changes the throttler metric threshold
func (tsv *TabletServer) SetThrottleMetricThreshold(val float64) {
tsv.lagThrottler.MetricsThreshold.Set(val)
}

// ThrottleMetricThreshold returns the throttler metric threshold
func (tsv *TabletServer) ThrottleMetricThreshold() float64 {
return tsv.lagThrottler.MetricsThreshold.Get()
}

// SetPassthroughDMLs changes the setting to pass through all DMLs
// It should only be used for testing
func (tsv *TabletServer) SetPassthroughDMLs(val bool) {
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/dbconnpool"
Expand Down Expand Up @@ -119,7 +120,7 @@ type Throttler struct {
mysqlInventory *mysql.Inventory

metricsQuery string
metricsThreshold float64
MetricsThreshold sync2.AtomicFloat64
metricsQueryType mysql.MetricsQueryType

mysqlClusterThresholds *cache.Cache
Expand Down Expand Up @@ -172,7 +173,7 @@ func NewThrottler(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topo
mysqlInventory: mysql.NewInventory(),

metricsQuery: replicationLagQuery,
metricsThreshold: throttleThreshold.Seconds(),
MetricsThreshold: sync2.NewAtomicFloat64(throttleThreshold.Seconds()),

throttledApps: cache.New(cache.NoExpiration, 10*time.Second),
mysqlClusterThresholds: cache.New(cache.NoExpiration, 0),
Expand Down Expand Up @@ -235,23 +236,23 @@ func (throttler *Throttler) initConfig(password string) {
throttler.metricsQuery = *throttleMetricQuery
}
if *throttleMetricThreshold != math.MaxFloat64 {
throttler.metricsThreshold = *throttleMetricThreshold
throttler.MetricsThreshold = sync2.NewAtomicFloat64(*throttleMetricThreshold)
}
throttler.metricsQueryType = mysql.GetMetricsQueryType(throttler.metricsQuery)

config.Instance.Stores.MySQL.Clusters[selfStoreName] = &config.MySQLClusterConfigurationSettings{
User: "", // running on local tablet server, will use vttablet DBA user
Password: "", // running on local tablet server, will use vttablet DBA user
MetricQuery: throttler.metricsQuery,
ThrottleThreshold: throttler.metricsThreshold,
ThrottleThreshold: throttler.MetricsThreshold.Get(),
IgnoreHostsCount: 0,
}
if password != "" {
config.Instance.Stores.MySQL.Clusters[shardStoreName] = &config.MySQLClusterConfigurationSettings{
User: throttlerUser,
Password: password,
MetricQuery: throttler.metricsQuery,
ThrottleThreshold: throttler.metricsThreshold,
ThrottleThreshold: throttler.MetricsThreshold.Get(),
IgnoreHostsCount: 0,
}
}
Expand Down