Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions go/sqltypes/named_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ func (r RowNamedValues) AsUint64(fieldName string, def uint64) uint64 {
return def
}

// AsFloat64 returns the named field as float64, or default value if nonexistent/error
func (r RowNamedValues) AsFloat64(fieldName string, def float64) float64 {
if v, err := r.ToFloat64(fieldName); err == nil {
return v
}
return def
}

// ToFloat64 returns the named field as float64
func (r RowNamedValues) ToFloat64(fieldName string) (float64, error) {
if v, ok := r[fieldName]; ok {
return v.ToFloat64()
}
return 0, ErrNoSuchField
}

// ToBool returns the named field as bool
func (r RowNamedValues) ToBool(fieldName string) (bool, error) {
if v, ok := r[fieldName]; ok {
Expand Down
106 changes: 79 additions & 27 deletions go/test/endtoend/tabletmanager/throttler/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (

var (
clusterInstance *cluster.LocalProcessCluster
masterTablet cluster.Vttablet
replicaTablet cluster.Vttablet
primaryTablet *cluster.Vttablet
replicaTablet *cluster.Vttablet
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
Expand Down Expand Up @@ -65,8 +65,16 @@ var (
}
}`

httpClient = base.SetupHTTPClient(time.Second)
checkAPIPath = "throttler/check"
httpClient = base.SetupHTTPClient(time.Second)
checkAPIPath = "throttler/check"
checkSelfAPIPath = "throttler/check-self"
)

const (
throttlerInitWait = 10 * time.Second
accumulateLagWait = 2 * time.Second
throttlerRefreshIntervalWait = 12 * time.Second
replicationCatchUpWait = 5 * time.Second
)

func TestMain(m *testing.M) {
Expand All @@ -89,6 +97,7 @@ func TestMain(m *testing.M) {
"-watch_replication_stream",
"-enable_replication_reporter",
"-enable-lag-throttler",
"-throttle_threshold", "1s",
"-heartbeat_enable",
"-heartbeat_interval", "250ms",
}
Expand All @@ -110,9 +119,9 @@ func TestMain(m *testing.M) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
for _, tablet := range tablets {
if tablet.Type == "master" {
masterTablet = *tablet
primaryTablet = tablet
} else if tablet.Type != "rdonly" {
replicaTablet = *tablet
replicaTablet = tablet
}
}

Expand All @@ -121,29 +130,47 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}

func throttleCheck() (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", masterTablet.HTTPPort, checkAPIPath))
func throttleCheck(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkAPIPath))
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath))
}

func TestThrottlerBeforeMetricsCollected(t *testing.T) {
defer cluster.PanicHandler(t)

// Immediately after startup, we expect this response:
// {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}
}

func TestThrottlerAfterMetricsCollected(t *testing.T) {
defer cluster.PanicHandler(t)

time.Sleep(10 * time.Second)
time.Sleep(throttlerInitWait)
// By this time metrics will have been collected. We expect no lag, and something like:
// {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}

func TestLag(t *testing.T) {
Expand All @@ -153,22 +180,47 @@ func TestLag(t *testing.T) {
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
time.Sleep(accumulateLagWait)
// Lag will have accumulated
// {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
// self (on primary) is unaffected by replication lag
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
}
{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
assert.NoError(t, err)

time.Sleep(5 * time.Second)
time.Sleep(replicationCatchUpWait)
// Restore
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}
}

Expand All @@ -178,20 +230,20 @@ func TestNoReplicas(t *testing.T) {
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "RDONLY")
assert.NoError(t, err)

time.Sleep(10 * time.Second)
time.Sleep(throttlerRefreshIntervalWait)
// This makes no REPLICA servers available. We expect something like:
// {"StatusCode":200,"Value":0,"Threshold":1,"Message":""}
resp, err := throttleCheck()
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "REPLICA")
assert.NoError(t, err)

time.Sleep(10 * time.Second)
time.Sleep(throttlerRefreshIntervalWait)
// Restore valid replica
resp, err := throttleCheck()
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
Expand Down
9 changes: 9 additions & 0 deletions go/timer/suspendable_ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ func (s *SuspendableTicker) Stop() {
s.ticker.Stop()
}

// TickNow generates a tick at this point in time. It may block
// if nothing consumes the tick.
func (s *SuspendableTicker) TickNow() {
if atomic.LoadInt64(&s.suspended) == 0 {
// not suspended
s.C <- time.Now()
}
}

func (s *SuspendableTicker) loop() {
for t := range s.ticker.C {
if atomic.LoadInt64(&s.suspended) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err

sm.ddle.Close()
sm.tableGC.Close()
sm.throttler.Close()
sm.messager.Close()
sm.tracker.Close()
sm.se.MakeNonMaster()
Expand All @@ -470,6 +469,7 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err
sm.te.AcceptReadOnly()
sm.rt.MakeNonMaster()
sm.watcher.Open()
sm.throttler.Open()
sm.setState(wantTabletType, StateServing)
return nil
}
Expand Down
40 changes: 20 additions & 20 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,18 @@ func TestStateManagerServeNonMaster(t *testing.T) {

verifySubcomponent(t, 1, sm.ddle, testStateClosed)
verifySubcomponent(t, 2, sm.tableGC, testStateClosed)
verifySubcomponent(t, 3, sm.throttler, testStateClosed)
verifySubcomponent(t, 4, sm.messager, testStateClosed)
verifySubcomponent(t, 5, sm.tracker, testStateClosed)
verifySubcomponent(t, 3, sm.messager, testStateClosed)
verifySubcomponent(t, 4, sm.tracker, testStateClosed)
assert.True(t, sm.se.(*testSchemaEngine).nonMaster)

verifySubcomponent(t, 6, sm.se, testStateOpen)
verifySubcomponent(t, 7, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 8, sm.qe, testStateOpen)
verifySubcomponent(t, 9, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 10, sm.te, testStateNonMaster)
verifySubcomponent(t, 11, sm.rt, testStateNonMaster)
verifySubcomponent(t, 12, sm.watcher, testStateOpen)
verifySubcomponent(t, 5, sm.se, testStateOpen)
verifySubcomponent(t, 6, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 7, sm.qe, testStateOpen)
verifySubcomponent(t, 8, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 9, sm.te, testStateNonMaster)
verifySubcomponent(t, 10, sm.rt, testStateNonMaster)
verifySubcomponent(t, 11, sm.watcher, testStateOpen)
verifySubcomponent(t, 12, sm.throttler, testStateOpen)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)
Expand Down Expand Up @@ -292,18 +292,18 @@ func TestStateManagerSetServingTypeNoChange(t *testing.T) {

verifySubcomponent(t, 1, sm.ddle, testStateClosed)
verifySubcomponent(t, 2, sm.tableGC, testStateClosed)
verifySubcomponent(t, 3, sm.throttler, testStateClosed)
verifySubcomponent(t, 4, sm.messager, testStateClosed)
verifySubcomponent(t, 5, sm.tracker, testStateClosed)
verifySubcomponent(t, 3, sm.messager, testStateClosed)
verifySubcomponent(t, 4, sm.tracker, testStateClosed)
assert.True(t, sm.se.(*testSchemaEngine).nonMaster)

verifySubcomponent(t, 6, sm.se, testStateOpen)
verifySubcomponent(t, 7, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 8, sm.qe, testStateOpen)
verifySubcomponent(t, 9, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 10, sm.te, testStateNonMaster)
verifySubcomponent(t, 11, sm.rt, testStateNonMaster)
verifySubcomponent(t, 12, sm.watcher, testStateOpen)
verifySubcomponent(t, 5, sm.se, testStateOpen)
verifySubcomponent(t, 6, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 7, sm.qe, testStateOpen)
verifySubcomponent(t, 8, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 9, sm.te, testStateNonMaster)
verifySubcomponent(t, 10, sm.rt, testStateNonMaster)
verifySubcomponent(t, 11, sm.watcher, testStateOpen)
verifySubcomponent(t, 12, sm.throttler, testStateOpen)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)
Expand Down
62 changes: 33 additions & 29 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,35 +1576,39 @@ func (tsv *TabletServer) registerMigrationStatusHandler() {
})
}

// registerThrottlerCheckHandler registers a throttler "check" request
func (tsv *TabletServer) registerThrottlerCheckHandler() {
tsv.exporter.HandleFunc("/throttler/check", func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
remoteAddr := r.Header.Get("X-Forwarded-For")
if remoteAddr == "" {
remoteAddr = r.RemoteAddr
remoteAddr = strings.Split(remoteAddr, ":")[0]
}
appName := r.URL.Query().Get("app")
if appName == "" {
appName = throttle.DefaultAppName
}
flags := &throttle.CheckFlags{
LowPriority: (r.URL.Query().Get("p") == "low"),
}
checkResult := tsv.lagThrottler.Check(ctx, appName, remoteAddr, flags)
if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists {
checkResult.StatusCode = http.StatusOK // 200
}
// registerThrottlerCheckHandlers registers throttler "check" requests
func (tsv *TabletServer) registerThrottlerCheckHandlers() {
handle := func(path string, checkResultFunc func(ctx context.Context, appName string, remoteAddr string, flags *throttle.CheckFlags) *throttle.CheckResult) {
tsv.exporter.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
remoteAddr := r.Header.Get("X-Forwarded-For")
if remoteAddr == "" {
remoteAddr = r.RemoteAddr
remoteAddr = strings.Split(remoteAddr, ":")[0]
}
appName := r.URL.Query().Get("app")
if appName == "" {
appName = throttle.DefaultAppName
}
flags := &throttle.CheckFlags{
LowPriority: (r.URL.Query().Get("p") == "low"),
}
checkResult := checkResultFunc(ctx, appName, remoteAddr, flags)
if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists {
checkResult.StatusCode = http.StatusOK // 200
}

if r.Method == http.MethodGet {
w.Header().Set("Content-Type", "application/json")
}
w.WriteHeader(checkResult.StatusCode)
if r.Method == http.MethodGet {
json.NewEncoder(w).Encode(checkResult)
}
})
if r.Method == http.MethodGet {
w.Header().Set("Content-Type", "application/json")
}
w.WriteHeader(checkResult.StatusCode)
if r.Method == http.MethodGet {
json.NewEncoder(w).Encode(checkResult)
}
})
}
handle("/throttler/check", tsv.lagThrottler.Check)
handle("/throttler/check-self", tsv.lagThrottler.CheckSelf)
}

// registerThrottlerStatusHandler registers a throttler "status" request
Expand All @@ -1619,7 +1623,7 @@ func (tsv *TabletServer) registerThrottlerStatusHandler() {

// registerThrottlerHandlers registers all throttler handlers
func (tsv *TabletServer) registerThrottlerHandlers() {
tsv.registerThrottlerCheckHandler()
tsv.registerThrottlerCheckHandlers()
tsv.registerThrottlerStatusHandler()
}

Expand Down
Loading