From 6b7e0945522ed672a99f854857a23255a3841a3b Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 23 Feb 2021 12:37:59 +0200 Subject: [PATCH 1/5] Tablet throttler: support for custom -throttle_metrics_query and -throttle_metrics_threshold Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/throttler.go | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 787f3a93714..aad1229cc82 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -10,6 +10,7 @@ import ( "context" "flag" "fmt" + "math" "math/rand" "net/http" "strings" @@ -58,9 +59,12 @@ const ( selfStoreName = "self" ) -var throttleThreshold = flag.Duration("throttle_threshold", 1*time.Second, "Replication lag threshold for throttling") -var throttleTabletTypes = flag.String("throttle_tablet_types", "replica", "Comma separated VTTablet types to be considered by the throttler. default: 'replica'. example: 'replica,rdonly'. 'replica' aways implicitly included") - +var ( + throttleThreshold = flag.Duration("throttle_threshold", 1*time.Second, "Replication lag threshold for default lag throttling") + throttleTabletTypes = flag.String("throttle_tablet_types", "replica", "Comma separated VTTablet types to be considered by the throttler. default: 'replica'. example: 'replica,rdonly'. 'replica' aways implicitly included") + throttleMetricQuery = flag.String("throttle_metrics_query", "", "Override default heartbeat/lag metric. Use either `SELECT` (must return single row, single value) or `SHOW GLOBAL ... LIKE ...` queries. 
Set -throttle_metrics_threshold respectively.") + throttleMetricThreshold = flag.Float64("throttle_metrics_threshold", math.MaxFloat64, "Override default throttle threshold, respective to -throttle_metrics_query") +) var ( throttlerUser = "vt_tablet_throttler" throttlerGrant = fmt.Sprintf("'%s'@'%s'", throttlerUser, "%") @@ -75,10 +79,13 @@ var ( replicationLagQuery = `select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from _vt.heartbeat` ) +// ThrottleCheckType allows a client to indicate what type of check it wants to issue. See available types below. type ThrottleCheckType int const ( + // ThrottleCheckPrimaryWrite indicates a check before making a write on a primary server ThrottleCheckPrimaryWrite ThrottleCheckType = iota + // ThrottleCheckSelf indicates a check on a specific server health ThrottleCheckSelf ) @@ -215,19 +222,28 @@ func (throttler *Throttler) initConfig(password string) { }, }, } + metricsQuery := replicationLagQuery + if *throttleMetricQuery != "" { + metricsQuery = *throttleMetricQuery + } + metricsThreshold := throttleThreshold.Seconds() + if *throttleMetricThreshold != math.MaxFloat64 { + metricsThreshold = *throttleMetricThreshold + } + config.Instance.Stores.MySQL.Clusters[selfStoreName] = &config.MySQLClusterConfigurationSettings{ User: "", // running on local tablet server, will use vttablet DBA user Password: "", // running on local tablet server, will use vttablet DBA user - ThrottleThreshold: throttleThreshold.Seconds(), - MetricQuery: replicationLagQuery, + ThrottleThreshold: metricsThreshold, + MetricQuery: metricsQuery, IgnoreHostsCount: 0, } if password != "" { config.Instance.Stores.MySQL.Clusters[shardStoreName] = &config.MySQLClusterConfigurationSettings{ User: throttlerUser, Password: password, - ThrottleThreshold: throttleThreshold.Seconds(), - MetricQuery: replicationLagQuery, + ThrottleThreshold: metricsThreshold, + MetricQuery: metricsQuery, IgnoreHostsCount: 0, } } @@ -799,7 +815,7 @@ func (throttler 
*Throttler) CheckSelf(ctx context.Context, appName string, remot return throttler.checkStore(ctx, appName, selfStoreName, remoteAddr, flags) } -// CheckSelf is checks the mysql/self metric, and is available on each tablet +// CheckByType runs a check by requested check type func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { switch checkType { case ThrottleCheckSelf: From 0d03a439d53ef2e2467ccfd2aa6d2a6e359623a1 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 23 Feb 2021 15:24:44 +0200 Subject: [PATCH 2/5] adding endtoend tests for custom query Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- ...uster_endtoend_tabletmanager_throttler.yml | 40 +++ ..._tabletmanager_throttler_custom_config.yml | 40 +++ .../throttler_custom_config/throttler_test.go | 251 ++++++++++++++++++ test/ci_workflow_gen.go | 2 +- test/config.json | 13 +- 5 files changed, 344 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/cluster_endtoend_tabletmanager_throttler.yml create mode 100644 .github/workflows/cluster_endtoend_tabletmanager_throttler_custom_config.yml create mode 100644 go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go diff --git a/.github/workflows/cluster_endtoend_tabletmanager_throttler.yml b/.github/workflows/cluster_endtoend_tabletmanager_throttler.yml new file mode 100644 index 00000000000..f7c9e7bdd2e --- /dev/null +++ b/.github/workflows/cluster_endtoend_tabletmanager_throttler.yml @@ -0,0 +1,40 @@ +# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" + +name: Cluster (tabletmanager_throttler) +on: [push, pull_request] +jobs: + + build: + name: Run endtoend tests on Cluster (tabletmanager_throttler) + runs-on: ubuntu-latest + + steps: + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.15 + + 
- name: Check out code + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + sudo apt-get update + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get install -y gnupg2 + sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get update + sudo apt-get install percona-xtrabackup-24 + + - name: Run cluster endtoend test + timeout-minutes: 30 + run: | + source build.env + eatmydata -- go run test.go -docker=false -print-log -follow -shard tabletmanager_throttler diff --git a/.github/workflows/cluster_endtoend_tabletmanager_throttler_custom_config.yml b/.github/workflows/cluster_endtoend_tabletmanager_throttler_custom_config.yml new file mode 100644 index 00000000000..c48d42e5c58 --- /dev/null +++ b/.github/workflows/cluster_endtoend_tabletmanager_throttler_custom_config.yml @@ -0,0 +1,40 @@ +# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" + +name: Cluster (tabletmanager_throttler_custom_config) +on: [push, pull_request] +jobs: + + build: + name: Run endtoend tests on Cluster (tabletmanager_throttler_custom_config) + runs-on: ubuntu-latest + + steps: + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.15 + + - name: Check out code + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + sudo apt-get update + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + wget 
https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get install -y gnupg2 + sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get update + sudo apt-get install percona-xtrabackup-24 + + - name: Run cluster endtoend test + timeout-minutes: 30 + run: | + source build.env + eatmydata -- go run test.go -docker=false -print-log -follow -shard tabletmanager_throttler_custom_config diff --git a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go new file mode 100644 index 00000000000..40882947558 --- /dev/null +++ b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go @@ -0,0 +1,251 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package master + +import ( + "flag" + "fmt" + "net/http" + "os" + "testing" + "time" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + + "vitess.io/vitess/go/test/endtoend/cluster" + + "github.com/stretchr/testify/assert" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + primaryTablet *cluster.Vttablet + replicaTablet *cluster.Vttablet + hostname = "localhost" + keyspaceName = "ks" + cell = "zone1" + sqlSchema = ` + create table t1( + id bigint, + value varchar(16), + primary key(id) + ) Engine=InnoDB; +` + + vSchema = ` + { + "sharded": true, + "vindexes": { + "hash": { + "type": "hash" + } + }, + "tables": { + "t1": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ] + } + } + }` + + httpClient = base.SetupHTTPClient(time.Second) + checkAPIPath = "throttler/check" + checkSelfAPIPath = "throttler/check-self" +) + +const ( + throttlerInitWait = 10 * time.Second + accumulateLagWait = 2 * time.Second + throttlerRefreshIntervalWait = 12 * time.Second + replicationCatchUpWait = 5 * time.Second +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Set extra tablet args for lock timeout + clusterInstance.VtTabletExtraArgs = []string{ + "-lock_tables_timeout", "5s", + "-watch_replication_stream", + "-enable_replication_reporter", + "-enable-lag-throttler", + "-throttle_metrics_query", "show global status like 'threads_running'", + "-throttle_metrics_threshold", "1", + "-heartbeat_enable", + "-heartbeat_interval", "250ms", + } + // We do not need semiSync for this test case. 
+ clusterInstance.EnableSemiSync = false + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + + if err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false); err != nil { + return 1 + } + + // Collect table paths and ports + tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets + for _, tablet := range tablets { + if tablet.Type == "master" { + primaryTablet = tablet + } else if tablet.Type != "rdonly" { + replicaTablet = tablet + } + } + + return m.Run() + }() + os.Exit(exitCode) +} + +func throttleCheck(tablet *cluster.Vttablet) (*http.Response, error) { + return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkAPIPath)) +} + +func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) { + return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath)) +} + +func TestThrottlerBeforeMetricsCollected(t *testing.T) { + defer cluster.PanicHandler(t) + + // Immediately after startup, we expect this response: + // {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"} + { + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusNotFound, resp.StatusCode) + } +} + +func TestThrottlerAfterMetricsCollected(t *testing.T) { + defer cluster.PanicHandler(t) + + time.Sleep(throttlerInitWait) + // By this time metrics will have been collected. 
We expect no lag, and something like: + // {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""} + { + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(replicaTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } +} + +func TestLag(t *testing.T) { + defer cluster.PanicHandler(t) + + { + err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) + assert.NoError(t, err) + + time.Sleep(accumulateLagWait) + // Lag will have accumulated + // {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"} + { + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(primaryTablet) + assert.NoError(t, err) + // self (on primary) is unaffected by replication lag + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(replicaTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + } + } + { + err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias) + assert.NoError(t, err) + + time.Sleep(replicationCatchUpWait) + // Restore + { + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + resp, err := throttleCheckSelf(replicaTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + } +} + +func TestNoReplicas(t *testing.T) { + defer cluster.PanicHandler(t) + { + err := 
clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "RDONLY") + assert.NoError(t, err) + + time.Sleep(throttlerRefreshIntervalWait) + // This makes no REPLICA servers available. We expect something like: + // {"StatusCode":200,"Value":0,"Threshold":1,"Message":""} + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "REPLICA") + assert.NoError(t, err) + + time.Sleep(throttlerRefreshIntervalWait) + // Restore valid replica + resp, err := throttleCheck(primaryTablet) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } +} diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index 2f4bbb126ff..ba0c1a2b70c 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -32,7 +32,7 @@ const ( unitTestDatabases = "percona56, mysql57, mysql80, mariadb101, mariadb102, mariadb103" clusterTestTemplate = "templates/cluster_endtoend_test.tpl" - clusterList = "11,12,13,14,15,16,17,18,19,20,21,22,23,24,26,27,vreplication_basic,vreplication_multicell,vreplication_cellalias,vreplication_v2" + clusterList = "11,12,13,14,15,16,17,18,19,20,21,22,23,24,26,27,vreplication_basic,vreplication_multicell,vreplication_cellalias,vreplication_v2,tabletmanager_throttler,tabletmanager_throttler_custom_config" // TODO: currently some percona tools including xtrabackup are installed on all clusters, we can possibly optimize // this by only installing them in the required clusters clustersRequiringXtraBackup = clusterList diff --git a/test/config.json b/test/config.json index b9a48bfe033..2812761440e 100644 --- a/test/config.json +++ b/test/config.json @@ -411,7 +411,18 @@ "Args": ["vitess.io/vitess/go/test/endtoend/tabletmanager/throttler"], "Command": [], "Manual": false, - "Shard": "18", + "Shard": "tabletmanager_throttler", + "RetryMax": 0, + 
"Tags": [ + "site_test" + ] + }, + "tabletmanager_throttler_custom_config": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/tabletmanager/throttler_custom_config"], + "Command": [], + "Manual": false, + "Shard": "tabletmanager_throttler_custom_config", "RetryMax": 0, "Tags": [ "site_test" From a2bb48f5635805abba3de57194b3092ffe94e17c Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 24 Feb 2021 09:06:08 +0200 Subject: [PATCH 3/5] Support for -throttle_check_as_check_self. Self checks now know how to interpret a SHOW GLOBAL query. Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../throttle/base/throttle_metric.go | 3 + .../tabletserver/throttle/check_result.go | 2 + .../throttle/mysql/mysql_throttle_metric.go | 69 ++++++++++++------- .../tabletserver/throttle/throttler.go | 67 ++++++++++++------ 4 files changed, 97 insertions(+), 44 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/base/throttle_metric.go b/go/vt/vttablet/tabletserver/throttle/base/throttle_metric.go index 4baf063c6fd..ff6e1b146d9 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/throttle_metric.go +++ b/go/vt/vttablet/tabletserver/throttle/base/throttle_metric.go @@ -27,6 +27,9 @@ var errNoResultYet = errors.New("Metric not collected yet") // ErrNoSuchMetric is for when a user requests a metric by an unknown metric name var ErrNoSuchMetric = errors.New("No such metric") +// ErrInvalidCheckType is an internal error indicating an unknown check type +var ErrInvalidCheckType = errors.New("Unknown throttler check type") + // IsDialTCPError sees if th egiven error indicates a TCP issue func IsDialTCPError(e error) bool { if e == nil { diff --git a/go/vt/vttablet/tabletserver/throttle/check_result.go b/go/vt/vttablet/tabletserver/throttle/check_result.go index 9981109694c..52d52b78468 100644 --- a/go/vt/vttablet/tabletserver/throttle/check_result.go +++ 
b/go/vt/vttablet/tabletserver/throttle/check_result.go @@ -44,3 +44,5 @@ func NewErrorCheckResult(statusCode int, err error) *CheckResult { var NoSuchMetricCheckResult = NewErrorCheckResult(http.StatusNotFound, base.ErrNoSuchMetric) var okMetricCheckResult = NewCheckResult(http.StatusOK, 0, 0, nil) + +var invalidCheckTypeCheckResult = NewErrorCheckResult(http.StatusInternalServerError, base.ErrInvalidCheckType) diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go index 3e02c2dd589..9863c93b701 100644 --- a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go +++ b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go @@ -17,6 +17,20 @@ import ( "vitess.io/vitess/go/vt/orchestrator/external/golib/sqlutils" ) +// MetricsQueryType indicates the type of metrics query on MySQL backend. See following. +type MetricsQueryType int + +const ( + // MetricsQueryTypeDefault indicates the default, internal implementation. Specifically, our throttler runs a replication lag query + MetricsQueryTypeDefault MetricsQueryType = iota + // MetricsQueryTypeShowGlobal indicates a SHOW GLOBAL (STATUS|VARIABLES) query + MetricsQueryTypeShowGlobal + // MetricsQueryTypeSelect indicates a custom SELECT query + MetricsQueryTypeSelect + // MetricsQueryTypeUnknown is an unknown query type, which we cannot run. 
This is an error + MetricsQueryTypeUnknown +) + var mysqlMetricCache = cache.New(cache.NoExpiration, 10*time.Millisecond) func getMySQLMetricCacheKey(probe *Probe) string { @@ -44,6 +58,20 @@ func getCachedMySQLThrottleMetric(probe *Probe) *MySQLThrottleMetric { return nil } +// GetMetricsQueryType analyzes the type of a metrics query +func GetMetricsQueryType(query string) MetricsQueryType { + if query == "" { + return MetricsQueryTypeDefault + } + if strings.HasPrefix(strings.ToLower(query), "select") { + return MetricsQueryTypeSelect + } + if strings.HasPrefix(strings.ToLower(query), "show global") { + return MetricsQueryTypeShowGlobal + } + return MetricsQueryTypeUnknown +} + // MySQLThrottleMetric has the probed metric for a mysql instance type MySQLThrottleMetric struct { ClusterName string @@ -106,33 +134,28 @@ func ReadThrottleMetric(probe *Probe, clusterName string, overrideGetMetricFunc db.SetMaxOpenConns(maxPoolConnections) db.SetMaxIdleConns(maxIdleConnections) } - if strings.HasPrefix(strings.ToLower(probe.MetricQuery), "select") { + metricsQueryType := GetMetricsQueryType(probe.MetricQuery) + switch metricsQueryType { + case MetricsQueryTypeSelect: mySQLThrottleMetric.Err = db.QueryRow(probe.MetricQuery).Scan(&mySQLThrottleMetric.Value) return cacheMySQLThrottleMetric(probe, mySQLThrottleMetric) - } - - if strings.HasPrefix(strings.ToLower(probe.MetricQuery), "show global") { + case MetricsQueryTypeShowGlobal: var variableName string // just a placeholder mySQLThrottleMetric.Err = db.QueryRow(probe.MetricQuery).Scan(&variableName, &mySQLThrottleMetric.Value) return cacheMySQLThrottleMetric(probe, mySQLThrottleMetric) + case MetricsQueryTypeDefault: + mySQLThrottleMetric.Err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error { + slaveIORunning := m.GetString("Slave_IO_Running") + slaveSQLRunning := m.GetString("Slave_SQL_Running") + secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master") + if 
!secondsBehindMaster.Valid { + return fmt.Errorf("replication not running; Slave_IO_Running=%+v, Slave_SQL_Running=%+v", slaveIORunning, slaveSQLRunning) + } + mySQLThrottleMetric.Value = float64(secondsBehindMaster.Int64) + return nil + }) + return cacheMySQLThrottleMetric(probe, mySQLThrottleMetric) } - - if probe.MetricQuery != "" { - mySQLThrottleMetric.Err = fmt.Errorf("Unsupported metrics query type: %s", probe.MetricQuery) - return mySQLThrottleMetric - } - - // No metric query? By default we look at replication lag as output of SHOW SLAVE STATUS - - mySQLThrottleMetric.Err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error { - slaveIORunning := m.GetString("Slave_IO_Running") - slaveSQLRunning := m.GetString("Slave_SQL_Running") - secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master") - if !secondsBehindMaster.Valid { - return fmt.Errorf("replication not running; Slave_IO_Running=%+v, Slave_SQL_Running=%+v", slaveIORunning, slaveSQLRunning) - } - mySQLThrottleMetric.Value = float64(secondsBehindMaster.Int64) - return nil - }) - return cacheMySQLThrottleMetric(probe, mySQLThrottleMetric) + mySQLThrottleMetric.Err = fmt.Errorf("Unsupported metrics query type: %s", probe.MetricQuery) + return mySQLThrottleMetric } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index aad1229cc82..eddbc43ad56 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -13,6 +13,7 @@ import ( "math" "math/rand" "net/http" + "strconv" "strings" "sync" "sync/atomic" @@ -60,10 +61,11 @@ const ( ) var ( - throttleThreshold = flag.Duration("throttle_threshold", 1*time.Second, "Replication lag threshold for default lag throttling") - throttleTabletTypes = flag.String("throttle_tablet_types", "replica", "Comma separated VTTablet types to be considered by the throttler. default: 'replica'. example: 'replica,rdonly'. 
'replica' aways implicitly included") - throttleMetricQuery = flag.String("throttle_metrics_query", "", "Override default heartbeat/lag metric. Use either `SELECT` (must return single row, single value) or `SHOW GLOBAL ... LIKE ...` queries. Set -throttle_metrics_threshold respectively.") - throttleMetricThreshold = flag.Float64("throttle_metrics_threshold", math.MaxFloat64, "Override default throttle threshold, respective to -throttle_metrics_query") + throttleThreshold = flag.Duration("throttle_threshold", 1*time.Second, "Replication lag threshold for default lag throttling") + throttleTabletTypes = flag.String("throttle_tablet_types", "replica", "Comma separated VTTablet types to be considered by the throttler. default: 'replica'. example: 'replica,rdonly'. 'replica' aways implicitly included") + throttleMetricQuery = flag.String("throttle_metrics_query", "", "Override default heartbeat/lag metric. Use either `SELECT` (must return single row, single value) or `SHOW GLOBAL ... LIKE ...` queries. 
Set -throttle_metrics_threshold respectively.") + throttleMetricThreshold = flag.Float64("throttle_metrics_threshold", math.MaxFloat64, "Override default throttle threshold, respective to -throttle_metrics_query") + throttlerCheckAsCheckSelf = flag.Bool("throttle_check_as_check_self", false, "Should throttler/check return a throttler/check-self result (changes throttler behavior for writes)") ) var ( throttlerUser = "vt_tablet_throttler" @@ -116,6 +118,10 @@ type Throttler struct { mysqlInventory *mysql.Inventory + metricsQuery string + metricsThreshold float64 + metricsQueryType mysql.MetricsQueryType + mysqlClusterThresholds *cache.Cache aggregatedMetrics *cache.Cache throttledApps *cache.Cache @@ -165,6 +171,9 @@ func NewThrottler(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topo mysqlClusterProbesChan: make(chan *mysql.ClusterProbes), mysqlInventory: mysql.NewInventory(), + metricsQuery: replicationLagQuery, + metricsThreshold: throttleThreshold.Seconds(), + throttledApps: cache.New(cache.NoExpiration, 10*time.Second), mysqlClusterThresholds: cache.New(cache.NoExpiration, 0), aggregatedMetrics: cache.New(aggregatedMetricsExpiration, aggregatedMetricsCleanup), @@ -222,28 +231,27 @@ func (throttler *Throttler) initConfig(password string) { }, }, } - metricsQuery := replicationLagQuery if *throttleMetricQuery != "" { - metricsQuery = *throttleMetricQuery + throttler.metricsQuery = *throttleMetricQuery } - metricsThreshold := throttleThreshold.Seconds() if *throttleMetricThreshold != math.MaxFloat64 { - metricsThreshold = *throttleMetricThreshold + throttler.metricsThreshold = *throttleMetricThreshold } + throttler.metricsQueryType = mysql.GetMetricsQueryType(throttler.metricsQuery) config.Instance.Stores.MySQL.Clusters[selfStoreName] = &config.MySQLClusterConfigurationSettings{ User: "", // running on local tablet server, will use vttablet DBA user Password: "", // running on local tablet server, will use vttablet DBA user - ThrottleThreshold: 
metricsThreshold, - MetricQuery: metricsQuery, + MetricQuery: throttler.metricsQuery, + ThrottleThreshold: throttler.metricsThreshold, IgnoreHostsCount: 0, } if password != "" { config.Instance.Stores.MySQL.Clusters[shardStoreName] = &config.MySQLClusterConfigurationSettings{ User: throttlerUser, Password: password, - ThrottleThreshold: metricsThreshold, - MetricQuery: metricsQuery, + MetricQuery: throttler.metricsQuery, + ThrottleThreshold: throttler.metricsThreshold, IgnoreHostsCount: 0, } } @@ -351,7 +359,7 @@ func (throttler *Throttler) readSelfMySQLThrottleMetric() *mysql.MySQLThrottleMe metric := &mysql.MySQLThrottleMetric{ ClusterName: selfStoreName, Key: *mysql.SelfInstanceKey, - Value: 3.14, + Value: 0, Err: nil, } ctx := context.Background() @@ -362,17 +370,29 @@ func (throttler *Throttler) readSelfMySQLThrottleMetric() *mysql.MySQLThrottleMe } defer conn.Recycle() - tm, err := conn.Exec(ctx, replicationLagQuery, 1, true) + tm, err := conn.Exec(ctx, throttler.metricsQuery, 1, true) if err != nil { metric.Err = err return metric } row := tm.Named().Row() if row == nil { - metric.Err = fmt.Errorf("no results for ReadSelfMySQLThrottleMetric") + metric.Err = fmt.Errorf("no results for readSelfMySQLThrottleMetric") return metric } - metric.Value, metric.Err = row.ToFloat64("replication_lag") + + switch throttler.metricsQueryType { + case mysql.MetricsQueryTypeSelect: + // We expect a single row, single column result. 
+ // The "for" iteration below is just a way to get first result without knowing column name + for k := range row { + metric.Value, metric.Err = row.ToFloat64(k) + } + case mysql.MetricsQueryTypeShowGlobal: + metric.Value, metric.Err = strconv.ParseFloat(row["Value"].ToString(), 64) + default: + metric.Err = fmt.Errorf("Unsupported metrics query type for query %s", throttler.metricsQuery) + } return metric } @@ -805,13 +825,13 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor return throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) } -// Check is the main serving function of the throttler, and returns a check result for this cluster's lag; it is only applicable on a Primary tablet. -func (throttler *Throttler) Check(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { +// checkShard checks the health of the shard, and runs on the primary tablet only +func (throttler *Throttler) checkShard(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { return throttler.checkStore(ctx, appName, shardStoreName, remoteAddr, flags) } // CheckSelf is checks the mysql/self metric, and is available on each tablet -func (throttler *Throttler) CheckSelf(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { +func (throttler *Throttler) checkSelf(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { return throttler.checkStore(ctx, appName, selfStoreName, remoteAddr, flags) } @@ -819,9 +839,14 @@ func (throttler *Throttler) CheckSelf(ctx context.Context, appName string, remot func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { switch checkType { case ThrottleCheckSelf: - return throttler.CheckSelf(ctx, appName, 
remoteAddr, flags) + return throttler.checkSelf(ctx, appName, remoteAddr, flags) + case ThrottleCheckPrimaryWrite: + if *throttlerCheckAsCheckSelf { + return throttler.checkSelf(ctx, appName, remoteAddr, flags) + } + return throttler.checkShard(ctx, appName, remoteAddr, flags) default: - return throttler.Check(ctx, appName, remoteAddr, flags) + return invalidCheckTypeCheckResult } } From 6944b6e42917c114da4e6bb53740f586bd932e06 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 24 Feb 2021 10:59:05 +0200 Subject: [PATCH 4/5] endtoend test for custom 'SHOW GLOBAL STATUS LIKE' query with check_as_self_check Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../throttler_custom_config/throttler_test.go | 108 ++++++++---------- 1 file changed, 48 insertions(+), 60 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go index 40882947558..51fb6102fc3 100644 --- a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go @@ -16,6 +16,7 @@ limitations under the License. 
package master import ( + "context" "flag" "fmt" "net/http" @@ -23,11 +24,14 @@ import ( "testing" "time" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" "vitess.io/vitess/go/test/endtoend/cluster" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var ( @@ -68,13 +72,11 @@ var ( httpClient = base.SetupHTTPClient(time.Second) checkAPIPath = "throttler/check" checkSelfAPIPath = "throttler/check-self" + vtParams mysql.ConnParams ) const ( - throttlerInitWait = 10 * time.Second - accumulateLagWait = 2 * time.Second - throttlerRefreshIntervalWait = 12 * time.Second - replicationCatchUpWait = 5 * time.Second + throttlerInitWait = 10 * time.Second ) func TestMain(m *testing.M) { @@ -98,7 +100,8 @@ func TestMain(m *testing.M) { "-enable_replication_reporter", "-enable-lag-throttler", "-throttle_metrics_query", "show global status like 'threads_running'", - "-throttle_metrics_threshold", "1", + "-throttle_metrics_threshold", "2", + "-throttle_check_as_check_self", "-heartbeat_enable", "-heartbeat_interval", "250ms", } @@ -126,6 +129,20 @@ func TestMain(m *testing.M) { } } + vtgateInstance := clusterInstance.NewVtgateInstance() + // set the gateway we want to use + vtgateInstance.GatewayImplementation = "tabletgateway" + // Start vtgate + if err := vtgateInstance.Setup(); err != nil { + return 1 + } + // ensure it is torn down during cluster TearDown + clusterInstance.VtgateProcess = *vtgateInstance + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() }() os.Exit(exitCode) @@ -139,15 +156,13 @@ func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) { return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath)) } -func TestThrottlerBeforeMetricsCollected(t *testing.T) { +func TestThrottlerThresholdOK(t *testing.T) { defer cluster.PanicHandler(t) - 
// Immediately after startup, we expect this response: - // {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"} { resp, err := throttleCheck(primaryTablet) assert.NoError(t, err) - assert.Equal(t, http.StatusNotFound, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) } } @@ -167,23 +182,17 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) } - { - resp, err := throttleCheckSelf(replicaTablet) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) - } } -func TestLag(t *testing.T) { +func TestThreadsRunning(t *testing.T) { defer cluster.PanicHandler(t) - { - err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) - assert.NoError(t, err) - - time.Sleep(accumulateLagWait) - // Lag will have accumulated - // {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"} + sleepSeconds := 6 + go vtgateExec(t, fmt.Sprintf("select sleep(%d)", sleepSeconds), "") + t.Run("exceeds threshold", func(t *testing.T) { + time.Sleep(3 * time.Second) + // by this time we will have +1 threads_running, and we should hit the threshold + // {"StatusCode":429,"Value":2,"Threshold":2,"Message":"Threshold exceeded"} { resp, err := throttleCheck(primaryTablet) assert.NoError(t, err) @@ -192,20 +201,11 @@ func TestLag(t *testing.T) { { resp, err := throttleCheckSelf(primaryTablet) assert.NoError(t, err) - // self (on primary) is unaffected by replication lag - assert.Equal(t, http.StatusOK, resp.StatusCode) - } - { - resp, err := throttleCheckSelf(replicaTablet) - assert.NoError(t, err) assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) } - } - { - err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias) - assert.NoError(t, err) - - time.Sleep(replicationCatchUpWait) + }) + t.Run("restored below threshold", func(t *testing.T) { + 
time.Sleep(time.Duration(sleepSeconds) * time.Second) // Restore { resp, err := throttleCheck(primaryTablet) @@ -217,35 +217,23 @@ func TestLag(t *testing.T) { assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) } - { - resp, err := throttleCheckSelf(replicaTablet) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) - } - } + }) } -func TestNoReplicas(t *testing.T) { - defer cluster.PanicHandler(t) - { - err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "RDONLY") - assert.NoError(t, err) +func vtgateExec(t *testing.T, query string, expectError string) *sqltypes.Result { + t.Helper() - time.Sleep(throttlerRefreshIntervalWait) - // This makes no REPLICA servers available. We expect something like: - // {"StatusCode":200,"Value":0,"Threshold":1,"Message":""} - resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) - } - { - err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "REPLICA") - assert.NoError(t, err) + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() - time.Sleep(throttlerRefreshIntervalWait) - // Restore valid replica - resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) + qr, err := conn.ExecuteFetch(query, 1000, true) + if expectError == "" { + require.NoError(t, err) + } else { + require.Error(t, err, "error should not be nil") + assert.Contains(t, err.Error(), expectError, "Unexpected error") } + return qr } From a9a2c627fa278caf3e3245337abcad43ac428ef3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 24 Feb 2021 11:12:43 +0200 Subject: [PATCH 5/5] raise the threshold, and ensure we hit it Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- 
.../throttler_custom_config/throttler_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go index 51fb6102fc3..0f03bed0e62 100644 --- a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go @@ -76,6 +76,7 @@ var ( ) const ( + testThreshold = 5 throttlerInitWait = 10 * time.Second ) @@ -100,7 +101,7 @@ func TestMain(m *testing.M) { "-enable_replication_reporter", "-enable-lag-throttler", "-throttle_metrics_query", "show global status like 'threads_running'", - "-throttle_metrics_threshold", "2", + "-throttle_metrics_threshold", fmt.Sprintf("%d", testThreshold), "-throttle_check_as_check_self", "-heartbeat_enable", "-heartbeat_interval", "250ms", @@ -188,7 +189,9 @@ func TestThreadsRunning(t *testing.T) { defer cluster.PanicHandler(t) sleepSeconds := 6 - go vtgateExec(t, fmt.Sprintf("select sleep(%d)", sleepSeconds), "") + for i := 0; i < testThreshold; i++ { + go vtgateExec(t, fmt.Sprintf("select sleep(%d)", sleepSeconds), "") + } t.Run("exceeds threshold", func(t *testing.T) { time.Sleep(3 * time.Second) // by this time we will have +1 threads_running, and we should hit the threshold