diff --git a/docker/mini/vttablet-mini-up.sh b/docker/mini/vttablet-mini-up.sh index 08e00e444f0..0dbb73c5f87 100755 --- a/docker/mini/vttablet-mini-up.sh +++ b/docker/mini/vttablet-mini-up.sh @@ -51,7 +51,7 @@ vttablet \ -init_tablet_type $tablet_type \ -health_check_interval 5s \ -heartbeat_enable \ - -heartbeat_interval 500ms \ + -heartbeat_interval 250ms \ -enable_semi_sync \ -enable_replication_reporter \ -backup_storage_implementation file \ diff --git a/examples/compose/vttablet-up.sh b/examples/compose/vttablet-up.sh index f9f08084330..cfc9b89b44d 100755 --- a/examples/compose/vttablet-up.sh +++ b/examples/compose/vttablet-up.sh @@ -164,6 +164,8 @@ exec $VTROOT/bin/vttablet \ -health_check_interval 5s \ -enable_semi_sync \ -enable_replication_reporter \ + -heartbeat_enable \ + -heartbeat_interval 250ms \ -port $web_port \ -grpc_port $grpc_port \ -binlog_use_v3_resharding_mode=true \ diff --git a/examples/local/scripts/vttablet-up.sh b/examples/local/scripts/vttablet-up.sh index c3f66fa806d..71e47c59d59 100755 --- a/examples/local/scripts/vttablet-up.sh +++ b/examples/local/scripts/vttablet-up.sh @@ -47,6 +47,8 @@ vttablet \ -health_check_interval 5s \ -enable_semi_sync \ -enable_replication_reporter \ + -heartbeat_enable \ + -heartbeat_interval 250ms \ -backup_storage_implementation file \ -file_backup_storage_root $VTDATAROOT/backups \ -restore_from_backup \ diff --git a/go/test/endtoend/tabletmanager/throttler/throttler_test.go b/go/test/endtoend/tabletmanager/throttler/throttler_test.go new file mode 100644 index 00000000000..02f2ad58675 --- /dev/null +++ b/go/test/endtoend/tabletmanager/throttler/throttler_test.go @@ -0,0 +1,197 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package master + +import ( + "flag" + "fmt" + "net/http" + "os" + "testing" + "time" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + + "vitess.io/vitess/go/test/endtoend/cluster" + + "github.com/stretchr/testify/assert" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + masterTablet cluster.Vttablet + replicaTablet cluster.Vttablet + hostname = "localhost" + keyspaceName = "ks" + cell = "zone1" + sqlSchema = ` + create table t1( + id bigint, + value varchar(16), + primary key(id) + ) Engine=InnoDB; +` + + vSchema = ` + { + "sharded": true, + "vindexes": { + "hash": { + "type": "hash" + } + }, + "tables": { + "t1": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ] + } + } + }` + + httpClient = base.SetupHTTPClient(time.Second) + checkAPIPath = "throttler/check" +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Set extra tablet args for lock timeout + clusterInstance.VtTabletExtraArgs = []string{ + "-lock_tables_timeout", "5s", + "-watch_replication_stream", + "-enable_replication_reporter", + "-heartbeat_enable", + "-heartbeat_interval", "250ms", + } + // We do not need semiSync for this test case. 
+ clusterInstance.EnableSemiSync = false + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + + if err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { + return 1 + } + + // Collect table paths and ports + tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets + for _, tablet := range tablets { + if tablet.Type == "master" { + masterTablet = *tablet + } else if tablet.Type != "rdonly" { + replicaTablet = *tablet + } + } + + return m.Run() + }() + os.Exit(exitCode) +} + +func throttleCheck() (*http.Response, error) { + return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", masterTablet.HTTPPort, checkAPIPath)) +} + +func TestThrottlerBeforeMetricsCollected(t *testing.T) { + defer cluster.PanicHandler(t) + + // Immediately after startup, we expect this response: + // {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"} + resp, err := throttleCheck() + assert.NoError(t, err) + assert.Equal(t, http.StatusNotFound, resp.StatusCode) +} + +func TestThrottlerAfterMetricsCollected(t *testing.T) { + defer cluster.PanicHandler(t) + + time.Sleep(10 * time.Second) + // By this time metrics will have been collected. 
We expect no lag, and something like: + // {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""} + resp, err := throttleCheck() + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestLag(t *testing.T) { + defer cluster.PanicHandler(t) + + { + err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) + assert.NoError(t, err) + + time.Sleep(2 * time.Second) + // Lag will have accumulated + // {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"} + resp, err := throttleCheck() + assert.NoError(t, err) + assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + } + { + err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias) + assert.NoError(t, err) + + time.Sleep(5 * time.Second) + // Restore + resp, err := throttleCheck() + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } +} + +func TestNoReplicas(t *testing.T) { + defer cluster.PanicHandler(t) + { + err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "RDONLY") + assert.NoError(t, err) + + time.Sleep(10 * time.Second) + // This makes no REPLICA servers available. 
We expect something like: + // {"StatusCode":200,"Value":0,"Threshold":1,"Message":""} + resp, err := throttleCheck() + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + { + err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "REPLICA") + assert.NoError(t, err) + + time.Sleep(10 * time.Second) + // Restore valid replica + resp, err := throttleCheck() + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } +} diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index dcd752697a1..0bb0ed0387a 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Vitess Authors. +Copyright 2020 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 24e3e8a6204..b6fbc09f779 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -58,6 +58,8 @@ func TestSchemaVersioning(t *testing.T) { tsv := framework.Server tsv.EnableHistorian(false) tsv.SetTracking(false) + tsv.EnableHeartbeat(false) + defer tsv.EnableHeartbeat(true) defer tsv.EnableHistorian(true) defer tsv.SetTracking(true) diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker.go b/go/vt/vttablet/tabletserver/repltracker/repltracker.go index 7c3013bc67f..99e0b7444b4 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker.go @@ -128,3 +128,9 @@ func (rt *ReplTracker) Status() (time.Duration, error) { // rt.mode == tabletenv.Poller return rt.poller.Status() } + +// EnableHeartbeat enables or disables writes of heartbeat. This functionality +// is only used by tests. 
+func (rt *ReplTracker) EnableHeartbeat(enable bool) { + rt.hw.enableWrites(enable) +} diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go index 632859ab33e..250fd89b2fa 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go @@ -82,7 +82,7 @@ func TestReplTracker(t *testing.T) { rt.InitDBConfig(target, mysqld) assert.Equal(t, tabletenv.Polling, rt.mode) assert.Equal(t, mysqld, rt.poller.mysqld) - assert.False(t, rt.hw.enabled) + assert.True(t, rt.hw.enabled) assert.False(t, rt.hr.enabled) rt.MakeNonMaster() diff --git a/go/vt/vttablet/tabletserver/repltracker/writer.go b/go/vt/vttablet/tabletserver/repltracker/writer.go index 249fd06efa3..a7b8bf18637 100644 --- a/go/vt/vttablet/tabletserver/repltracker/writer.go +++ b/go/vt/vttablet/tabletserver/repltracker/writer.go @@ -73,9 +73,6 @@ type heartbeatWriter struct { // newHeartbeatWriter creates a new heartbeatWriter. 
func newHeartbeatWriter(env tabletenv.Env, alias topodatapb.TabletAlias) *heartbeatWriter { config := env.Config() - if config.ReplicationTracker.Mode != tabletenv.Heartbeat { - return &heartbeatWriter{} - } heartbeatInterval := config.ReplicationTracker.HeartbeatIntervalSeconds.Get() return &heartbeatWriter{ env: env, @@ -111,7 +108,7 @@ func (w *heartbeatWriter) Open() { log.Info("Hearbeat Writer: opening") w.pool.Open(w.env.Config().DB.AppWithDB(), w.env.Config().DB.DbaWithDB(), w.env.Config().DB.AppDebugWithDB()) - w.ticks.Start(w.writeHeartbeat) + w.enableWrites(true) w.isOpen = true } @@ -126,7 +123,7 @@ func (w *heartbeatWriter) Close() { return } - w.ticks.Stop() + w.enableWrites(false) w.pool.Close() w.isOpen = false log.Info("Hearbeat Writer: closed") @@ -182,3 +179,12 @@ func (w *heartbeatWriter) recordError(err error) { w.errorLog.Errorf("%v", err) writeErrors.Add(1) } + +// enableWrites activates or deactivates heartbeat writes +func (w *heartbeatWriter) enableWrites(enable bool) { + if enable { + w.ticks.Start(w.writeHeartbeat) + } else { + w.ticks.Stop() + } +} diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 5459ee8f56f..354a39905f3 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -105,6 +105,7 @@ type stateManager struct { txThrottler txThrottler te txEngine messager subComponent + throttler lagThrottler // hcticks starts on initialiazation and runs forever. hcticks *timer.Timer @@ -155,6 +156,11 @@ type ( Open() error Close() } + + lagThrottler interface { + Open() error + Close() + } ) // Init performs the second phase of initialization.
@@ -403,6 +409,7 @@ func (sm *stateManager) serveMaster() error { return err } sm.messager.Open() + sm.throttler.Open() sm.setState(topodatapb.TabletType_MASTER, StateServing) return nil } @@ -422,6 +429,7 @@ func (sm *stateManager) unserveMaster() error { } func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) error { + sm.throttler.Close() sm.messager.Close() sm.tracker.Close() sm.se.MakeNonMaster() @@ -469,6 +477,7 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error { } func (sm *stateManager) unserveCommon() { + sm.throttler.Close() sm.messager.Close() sm.te.Close() sm.qe.StopServing() diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 338d97f3bb5..a8f55b50076 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -80,6 +80,7 @@ func TestStateManagerServeMaster(t *testing.T) { verifySubcomponent(t, 7, sm.tracker, testStateOpen) verifySubcomponent(t, 8, sm.te, testStateMaster) verifySubcomponent(t, 9, sm.messager, testStateOpen) + verifySubcomponent(t, 10, sm.throttler, testStateOpen) assert.False(t, sm.se.(*testSchemaEngine).nonMaster) assert.True(t, sm.se.(*testSchemaEngine).ensureCalled) @@ -95,17 +96,18 @@ func TestStateManagerServeNonMaster(t *testing.T) { err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "") require.NoError(t, err) - verifySubcomponent(t, 1, sm.messager, testStateClosed) - verifySubcomponent(t, 2, sm.tracker, testStateClosed) + verifySubcomponent(t, 1, sm.throttler, testStateClosed) + verifySubcomponent(t, 2, sm.messager, testStateClosed) + verifySubcomponent(t, 3, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonMaster) - verifySubcomponent(t, 3, sm.se, testStateOpen) - verifySubcomponent(t, 4, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 5, sm.qe, testStateOpen) - verifySubcomponent(t, 6, 
sm.txThrottler, testStateOpen) - verifySubcomponent(t, 7, sm.te, testStateNonMaster) - verifySubcomponent(t, 8, sm.rt, testStateNonMaster) - verifySubcomponent(t, 9, sm.watcher, testStateOpen) + verifySubcomponent(t, 4, sm.se, testStateOpen) + verifySubcomponent(t, 5, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 6, sm.qe, testStateOpen) + verifySubcomponent(t, 7, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 8, sm.te, testStateNonMaster) + verifySubcomponent(t, 9, sm.rt, testStateNonMaster) + verifySubcomponent(t, 10, sm.watcher, testStateOpen) assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType) assert.Equal(t, StateServing, sm.state) @@ -117,19 +119,19 @@ func TestStateManagerUnserveMaster(t *testing.T) { err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateNotServing, "") require.NoError(t, err) - verifySubcomponent(t, 1, sm.messager, testStateClosed) - verifySubcomponent(t, 2, sm.te, testStateClosed) + verifySubcomponent(t, 1, sm.throttler, testStateClosed) + verifySubcomponent(t, 2, sm.messager, testStateClosed) + verifySubcomponent(t, 3, sm.te, testStateClosed) assert.True(t, sm.qe.(*testQueryEngine).stopServing) - verifySubcomponent(t, 3, sm.tracker, testStateClosed) - verifySubcomponent(t, 4, sm.watcher, testStateClosed) + verifySubcomponent(t, 4, sm.tracker, testStateClosed) + verifySubcomponent(t, 5, sm.watcher, testStateClosed) + verifySubcomponent(t, 6, sm.se, testStateOpen) + verifySubcomponent(t, 7, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 8, sm.qe, testStateOpen) + verifySubcomponent(t, 9, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 5, sm.se, testStateOpen) - verifySubcomponent(t, 6, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 7, sm.qe, testStateOpen) - verifySubcomponent(t, 8, sm.txThrottler, testStateOpen) - - verifySubcomponent(t, 9, sm.rt, testStateMaster) + verifySubcomponent(t, 10, sm.rt, testStateMaster) assert.Equal(t, topodatapb.TabletType_MASTER, 
sm.target.TabletType) assert.Equal(t, StateNotServing, sm.state) @@ -141,20 +143,21 @@ func TestStateManagerUnserveNonmaster(t *testing.T) { err := sm.SetServingType(topodatapb.TabletType_RDONLY, testNow, StateNotServing, "") require.NoError(t, err) - verifySubcomponent(t, 1, sm.messager, testStateClosed) - verifySubcomponent(t, 2, sm.te, testStateClosed) + verifySubcomponent(t, 1, sm.throttler, testStateClosed) + verifySubcomponent(t, 2, sm.messager, testStateClosed) + verifySubcomponent(t, 3, sm.te, testStateClosed) assert.True(t, sm.qe.(*testQueryEngine).stopServing) - verifySubcomponent(t, 3, sm.tracker, testStateClosed) + verifySubcomponent(t, 4, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonMaster) - verifySubcomponent(t, 4, sm.se, testStateOpen) - verifySubcomponent(t, 5, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 6, sm.qe, testStateOpen) - verifySubcomponent(t, 7, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 5, sm.se, testStateOpen) + verifySubcomponent(t, 6, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 7, sm.qe, testStateOpen) + verifySubcomponent(t, 8, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 8, sm.rt, testStateNonMaster) - verifySubcomponent(t, 9, sm.watcher, testStateOpen) + verifySubcomponent(t, 9, sm.rt, testStateNonMaster) + verifySubcomponent(t, 10, sm.watcher, testStateOpen) assert.Equal(t, topodatapb.TabletType_RDONLY, sm.target.TabletType) assert.Equal(t, StateNotServing, sm.state) @@ -166,17 +169,18 @@ func TestStateManagerClose(t *testing.T) { err := sm.SetServingType(topodatapb.TabletType_RDONLY, testNow, StateNotConnected, "") require.NoError(t, err) - verifySubcomponent(t, 1, sm.messager, testStateClosed) - verifySubcomponent(t, 2, sm.te, testStateClosed) + verifySubcomponent(t, 1, sm.throttler, testStateClosed) + verifySubcomponent(t, 2, sm.messager, testStateClosed) + verifySubcomponent(t, 3, sm.te, testStateClosed) assert.True(t, sm.qe.(*testQueryEngine).stopServing) - 
verifySubcomponent(t, 3, sm.tracker, testStateClosed) + verifySubcomponent(t, 4, sm.tracker, testStateClosed) - verifySubcomponent(t, 4, sm.txThrottler, testStateClosed) - verifySubcomponent(t, 5, sm.qe, testStateClosed) - verifySubcomponent(t, 6, sm.watcher, testStateClosed) - verifySubcomponent(t, 7, sm.vstreamer, testStateClosed) - verifySubcomponent(t, 8, sm.rt, testStateClosed) - verifySubcomponent(t, 9, sm.se, testStateClosed) + verifySubcomponent(t, 5, sm.txThrottler, testStateClosed) + verifySubcomponent(t, 6, sm.qe, testStateClosed) + verifySubcomponent(t, 7, sm.watcher, testStateClosed) + verifySubcomponent(t, 8, sm.vstreamer, testStateClosed) + verifySubcomponent(t, 9, sm.rt, testStateClosed) + verifySubcomponent(t, 10, sm.se, testStateClosed) assert.Equal(t, topodatapb.TabletType_RDONLY, sm.target.TabletType) assert.Equal(t, StateNotConnected, sm.state) @@ -278,17 +282,18 @@ func TestStateManagerSetServingTypeNoChange(t *testing.T) { err = sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "") require.NoError(t, err) - verifySubcomponent(t, 1, sm.messager, testStateClosed) - verifySubcomponent(t, 2, sm.tracker, testStateClosed) + verifySubcomponent(t, 1, sm.throttler, testStateClosed) + verifySubcomponent(t, 2, sm.messager, testStateClosed) + verifySubcomponent(t, 3, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonMaster) - verifySubcomponent(t, 3, sm.se, testStateOpen) - verifySubcomponent(t, 4, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 5, sm.qe, testStateOpen) - verifySubcomponent(t, 6, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 7, sm.te, testStateNonMaster) - verifySubcomponent(t, 8, sm.rt, testStateNonMaster) - verifySubcomponent(t, 9, sm.watcher, testStateOpen) + verifySubcomponent(t, 4, sm.se, testStateOpen) + verifySubcomponent(t, 5, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 6, sm.qe, testStateOpen) + verifySubcomponent(t, 7, sm.txThrottler, testStateOpen) + 
verifySubcomponent(t, 8, sm.te, testStateNonMaster) + verifySubcomponent(t, 9, sm.rt, testStateNonMaster) + verifySubcomponent(t, 10, sm.watcher, testStateOpen) assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType) assert.Equal(t, StateServing, sm.state) @@ -591,6 +596,7 @@ func newTestStateManager(t *testing.T) *stateManager { txThrottler: &testTxThrottler{}, te: &testTxEngine{}, messager: &testSubcomponent{}, + throttler: &testLagThrottler{}, } sm.Init(env, querypb.Target{}) sm.hs.InitDBConfig(querypb.Target{}) @@ -772,3 +778,18 @@ func (te *testTxThrottler) Close() { te.order = order.Add(1) te.state = testStateClosed } + +type testLagThrottler struct { + testOrderState +} + +func (te *testLagThrottler) Open() error { + te.order = order.Add(1) + te.state = testStateOpen + return nil +} + +func (te *testLagThrottler) Close() { + te.order = order.Add(1) + te.state = testStateClosed +} diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 12211acb72a..f555e807bde 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -182,16 +182,21 @@ func Init() { currentConfig.Consolidator = Disable } + if heartbeatInterval == 0 { + heartbeatInterval = time.Duration(defaultConfig.ReplicationTracker.HeartbeatIntervalSeconds*1000) * time.Millisecond + } + if heartbeatInterval > time.Second { + heartbeatInterval = time.Second + } + currentConfig.ReplicationTracker.HeartbeatIntervalSeconds.Set(heartbeatInterval) + switch { case enableHeartbeat: currentConfig.ReplicationTracker.Mode = Heartbeat - currentConfig.ReplicationTracker.HeartbeatIntervalSeconds.Set(heartbeatInterval) case enableReplicationReporter: currentConfig.ReplicationTracker.Mode = Polling - currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0 default: currentConfig.ReplicationTracker.Mode = Disable - currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0 } 
currentConfig.Healthcheck.IntervalSeconds.Set(healthCheckInterval) @@ -418,7 +423,8 @@ var defaultConfig = TabletConfig{ UnhealthyThresholdSeconds: 7200, }, ReplicationTracker: ReplicationTrackerConfig{ - Mode: Disable, + Mode: Disable, + HeartbeatIntervalSeconds: 0.25, }, HotRowProtection: HotRowProtectionConfig{ Mode: Disable, diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 082c02c794f..a5fb34a63fa 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -131,6 +131,7 @@ oltpReadPool: size: 16 queryCacheSize: 5000 replicationTracker: + heartbeatIntervalSeconds: 0.25 mode: disable schemaReloadIntervalSeconds: 1800 streamBufferSize: 32768 @@ -215,6 +216,7 @@ func TestFlags(t *testing.T) { want.Healthcheck.IntervalSeconds = 20 want.Healthcheck.DegradedThresholdSeconds = 30 want.Healthcheck.UnhealthyThresholdSeconds = 7200 + want.ReplicationTracker.HeartbeatIntervalSeconds = 1 want.ReplicationTracker.Mode = Disable assert.Equal(t, want.DB, currentConfig.DB) assert.Equal(t, want, currentConfig) @@ -282,7 +284,7 @@ func TestFlags(t *testing.T) { currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0 Init() want.ReplicationTracker.Mode = Disable - want.ReplicationTracker.HeartbeatIntervalSeconds = 0 + want.ReplicationTracker.HeartbeatIntervalSeconds = 1 assert.Equal(t, want, currentConfig) enableReplicationReporter = true @@ -291,7 +293,7 @@ func TestFlags(t *testing.T) { currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0 Init() want.ReplicationTracker.Mode = Polling - want.ReplicationTracker.HeartbeatIntervalSeconds = 0 + want.ReplicationTracker.HeartbeatIntervalSeconds = 1 assert.Equal(t, want, currentConfig) healthCheckInterval = 1 * time.Second diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 6ef9abd7358..8d3649a2de9 100644 --- 
a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -18,6 +18,7 @@ package tabletserver import ( "bytes" + "encoding/json" "fmt" "net/http" "os" @@ -58,6 +59,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" "vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer" "vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" @@ -92,16 +94,17 @@ type TabletServer struct { topoServer *topo.Server // These are sub-components of TabletServer. - se *schema.Engine - rt *repltracker.ReplTracker - vstreamer *vstreamer.Engine - tracker *schema.Tracker - watcher *BinlogWatcher - qe *QueryEngine - txThrottler *txthrottler.TxThrottler - te *TxEngine - messager *messager.Engine - hs *healthStreamer + se *schema.Engine + rt *repltracker.ReplTracker + vstreamer *vstreamer.Engine + tracker *schema.Tracker + watcher *BinlogWatcher + qe *QueryEngine + txThrottler *txthrottler.TxThrottler + te *TxEngine + messager *messager.Engine + hs *healthStreamer + lagThrottler *throttle.Throttler // sm manages state transitions. 
sm *stateManager @@ -154,6 +157,14 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer) tsv.te = NewTxEngine(tsv) tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) + tsv.lagThrottler = throttle.NewThrottler(tsv, topoServer, + func() topodatapb.TabletType { + if tsv.sm == nil { + return topodatapb.TabletType_UNKNOWN + } + return tsv.sm.Target().TabletType + }, + ) tsv.sm = &stateManager{ hs: tsv.hs, @@ -166,6 +177,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to txThrottler: tsv.txThrottler, te: tsv.te, messager: tsv.messager, + throttler: tsv.lagThrottler, } tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) }) @@ -183,6 +195,8 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.registerQueryzHandler() tsv.registerStreamQueryzHandlers() tsv.registerTwopczHandler() + tsv.registerThrottlerHandlers() + return tsv } @@ -201,6 +215,7 @@ func (tsv *TabletServer) InitDBConfig(target querypb.Target, dbcfgs *dbconfigs.D tsv.txThrottler.InitDBConfig(target) tsv.vstreamer.InitDBConfig(target.Keyspace) tsv.hs.InitDBConfig(target) + tsv.lagThrottler.InitDBConfig(target.Keyspace, target.Shard) return nil } @@ -362,6 +377,11 @@ func (tsv *TabletServer) QueryService() queryservice.QueryService { return tsv } +// LagThrottler returns the throttle.Throttler part of TabletServer. +func (tsv *TabletServer) LagThrottler() *throttle.Throttler { + return tsv.lagThrottler +} + // SchemaEngine returns the SchemaEngine part of TabletServer. 
func (tsv *TabletServer) SchemaEngine() *schema.Engine {
 	return tsv.se
@@ -1455,6 +1475,65 @@ func (tsv *TabletServer) registerTwopczHandler() {
 	})
 }
 
+// registerThrottlerCheckHandler registers the throttler "check" request. The HTTP
+// status of the response is the check result's StatusCode; GET requests also get
+// the check result as a JSON body, any other method receives the status alone.
+func (tsv *TabletServer) registerThrottlerCheckHandler() {
+	tsv.exporter.HandleFunc("/throttler/check", func(w http.ResponseWriter, r *http.Request) {
+		ctx := tabletenv.LocalContext()
+		remoteAddr := r.Header.Get("X-Forwarded-For")
+		if remoteAddr == "" {
+			// RemoteAddr is "host:port". An IPv6 host is bracketed and contains
+			// colons itself ("[::1]:1234"), so cut at the last colon only.
+			remoteAddr = r.RemoteAddr
+			if i := strings.LastIndex(remoteAddr, ":"); i >= 0 {
+				remoteAddr = strings.Trim(remoteAddr[:i], "[]")
+			}
+		}
+		appName := r.URL.Query().Get("app")
+		if appName == "" {
+			appName = throttle.DefaultAppName
+		}
+		flags := &throttle.CheckFlags{
+			LowPriority: (r.URL.Query().Get("p") == "low"),
+		}
+		checkResult := tsv.lagThrottler.Check(ctx, appName, remoteAddr, flags)
+		if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists {
+			checkResult.StatusCode = http.StatusOK // 200
+		}
+
+		if r.Method == http.MethodGet {
+			w.Header().Set("Content-Type", "application/json")
+		}
+		w.WriteHeader(checkResult.StatusCode)
+		if r.Method == http.MethodGet {
+			json.NewEncoder(w).Encode(checkResult)
+		}
+	})
+}
+
+// registerThrottlerStatusHandler registers a throttler "status" request
+func (tsv *TabletServer) registerThrottlerStatusHandler() {
+	tsv.exporter.HandleFunc("/throttler/status", func(w http.ResponseWriter, r *http.Request) {
+		status := tsv.lagThrottler.Status()
+
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(status)
+	})
+}
+
+// registerThrottlerHandlers registers all throttler handlers
+func (tsv *TabletServer) registerThrottlerHandlers() {
+	tsv.registerThrottlerCheckHandler()
+	tsv.registerThrottlerStatusHandler()
+}
+
+// EnableHeartbeat forces heartbeat to be on or off.
+// Only to be used for testing.
+func (tsv *TabletServer) EnableHeartbeat(enabled bool) {
+	tsv.rt.EnableHeartbeat(enabled)
+}
+
 // SetTracking forces tracking to be on or off.
// Only to be used for testing. func (tsv *TabletServer) SetTracking(enabled bool) { diff --git a/go/vt/vttablet/tabletserver/throttle/LICENSE-freno b/go/vt/vttablet/tabletserver/throttle/LICENSE-freno new file mode 100644 index 00000000000..40810255626 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/LICENSE-freno @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 GitHub + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go b/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go new file mode 100644 index 00000000000..46101ba87e6 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go @@ -0,0 +1,37 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package base
+
+import (
+	"time"
+)
+
+// AppThrottle is the definition for an app throttling instruction
+// - Ratio: [0..1], 0 == no throttle, 1 == fully throttle
+type AppThrottle struct {
+	ExpireAt time.Time
+	Ratio    float64
+}
+
+// NewAppThrottle creates an AppThrottle struct, storing expireAt in
+// ExpireAt and ratio in Ratio
+func NewAppThrottle(expireAt time.Time, ratio float64) *AppThrottle {
+	appThrottle := new(AppThrottle)
+	appThrottle.ExpireAt = expireAt
+	appThrottle.Ratio = ratio
+	return appThrottle
+}
diff --git a/go/vt/vttablet/tabletserver/throttle/base/http.go b/go/vt/vttablet/tabletserver/throttle/base/http.go
new file mode 100644
index 00000000000..6f657766ad1
--- /dev/null
+++ b/go/vt/vttablet/tabletserver/throttle/base/http.go
@@ -0,0 +1,35 @@
+/*
+ Copyright 2017 GitHub Inc.
+
+ Licensed under MIT License.
See https://github.com/github/freno/blob/master/LICENSE
+*/
+
+package base
+
+import (
+	"crypto/tls"
+	"net"
+	"net/http"
+	"time"
+)
+
+var defaultTimeout = time.Second
+
+// SetupHTTPClient creates a simple HTTP client that uses httpTimeout (or defaultTimeout
+// when zero) as dial timeout, keep-alive period and response header timeout
+func SetupHTTPClient(httpTimeout time.Duration) *http.Client {
+	if httpTimeout == 0 {
+		httpTimeout = defaultTimeout
+	}
+	httpTransport := &http.Transport{
+		TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
+		DialContext: (&net.Dialer{
+			Timeout:   httpTimeout,
+			KeepAlive: httpTimeout,
+		}).DialContext,
+		ResponseHeaderTimeout: httpTimeout,
+	}
+	httpClient := &http.Client{Transport: httpTransport}
+
+	return httpClient
+}
diff --git a/go/vt/vttablet/tabletserver/throttle/base/metric_health.go b/go/vt/vttablet/tabletserver/throttle/base/metric_health.go
new file mode 100644
index 00000000000..e970888bf13
--- /dev/null
+++ b/go/vt/vttablet/tabletserver/throttle/base/metric_health.go
@@ -0,0 +1,44 @@
+/*
+ Copyright 2017 GitHub Inc.
+
+ Licensed under MIT License.
See https://github.com/github/freno/blob/master/LICENSE
+*/
+
+package base
+
+import (
+	"time"
+)
+
+// MetricHealth is a health status for a metric, and more specifically,
+// when it was last checked to be "OK"
+type MetricHealth struct {
+	LastHealthyAt           time.Time
+	SecondsSinceLastHealthy int64
+}
+
+// NewMetricHealth returns a MetricHealth whose age is the time elapsed since lastHealthyAt
+func NewMetricHealth(lastHealthyAt time.Time) *MetricHealth {
+	elapsed := time.Since(lastHealthyAt)
+	return &MetricHealth{
+		LastHealthyAt:           lastHealthyAt,
+		SecondsSinceLastHealthy: int64(elapsed.Seconds()),
+	}
+}
+
+// MetricHealthMap maps metric names to metric healths
+type MetricHealthMap map[string](*MetricHealth)
+
+// Aggregate merges other into this map in place and returns it; for a metric
+// present in both maps the entry that has been unhealthy the longest wins
+func (m MetricHealthMap) Aggregate(other MetricHealthMap) MetricHealthMap {
+	for name, candidate := range other {
+		existing, found := m[name]
+		if found && existing.SecondsSinceLastHealthy >= candidate.SecondsSinceLastHealthy {
+			// the entry already held is at least as stale: keep it
+			continue
+		}
+		m[name] = candidate
+	}
+	return m
+}
diff --git a/go/vt/vttablet/tabletserver/throttle/base/metric_health_test.go b/go/vt/vttablet/tabletserver/throttle/base/metric_health_test.go
new file mode 100644
index 00000000000..d11ecd7b8e5
--- /dev/null
+++ b/go/vt/vttablet/tabletserver/throttle/base/metric_health_test.go
@@ -0,0 +1,120 @@
+/*
+ Copyright 2017 GitHub Inc.
+
+ Licensed under MIT License.
See https://github.com/github/freno/blob/master/LICENSE +*/ + +package base + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAggregate(t *testing.T) { + { + m1 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 0}, + } + m2 := MetricHealthMap{} + m1.Aggregate(m2) + assert.Equal(t, 1, len(m1)) + assert.Equal(t, int64(0), m1["a"].SecondsSinceLastHealthy) + } + { + m1 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 0}, + } + m2 := MetricHealthMap{} + m2.Aggregate(m1) + assert.Equal(t, 1, len(m2)) + assert.Equal(t, int64(0), m2["a"].SecondsSinceLastHealthy) + } + { + m1 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 7}, + } + m2 := MetricHealthMap{} + m1.Aggregate(m2) + assert.Equal(t, 1, len(m1)) + assert.Equal(t, int64(7), m1["a"].SecondsSinceLastHealthy) + } + { + m1 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 7}, + } + m2 := MetricHealthMap{} + m2.Aggregate(m1) + assert.Equal(t, 1, len(m2)) + assert.Equal(t, int64(7), m2["a"].SecondsSinceLastHealthy) + } + { + m1 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 7}, + } + m2 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 11}, + } + m1.Aggregate(m2) + assert.Equal(t, 1, len(m1)) + assert.Equal(t, int64(11), m1["a"].SecondsSinceLastHealthy) + } + { + m1 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 11}, + } + m2 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 7}, + } + m1.Aggregate(m2) + assert.Equal(t, 1, len(m1)) + assert.Equal(t, int64(11), m1["a"].SecondsSinceLastHealthy) + } + { + m1 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 7}, + "b": &MetricHealth{SecondsSinceLastHealthy: 19}, + } + m2 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 11}, + "b": &MetricHealth{SecondsSinceLastHealthy: 17}, + } + m1.Aggregate(m2) + assert.Equal(t, 2, len(m1)) + assert.Equal(t, int64(11), 
m1["a"].SecondsSinceLastHealthy) + assert.Equal(t, int64(19), m1["b"].SecondsSinceLastHealthy) + } + { + m1 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 7}, + "b": &MetricHealth{SecondsSinceLastHealthy: 19}, + } + m2 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 11}, + "c": &MetricHealth{SecondsSinceLastHealthy: 17}, + } + m1.Aggregate(m2) + assert.Equal(t, 3, len(m1), 3) + assert.Equal(t, int64(11), m1["a"].SecondsSinceLastHealthy) + assert.Equal(t, int64(19), m1["b"].SecondsSinceLastHealthy) + assert.Equal(t, int64(17), m1["c"].SecondsSinceLastHealthy) + } + { + m0 := MetricHealthMap{} + m1 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 7}, + "b": &MetricHealth{SecondsSinceLastHealthy: 19}, + } + m2 := MetricHealthMap{ + "a": &MetricHealth{SecondsSinceLastHealthy: 11}, + "c": &MetricHealth{SecondsSinceLastHealthy: 17}, + } + m0.Aggregate(m2) + m0.Aggregate(m1) + assert.Equal(t, 3, len(m0)) + assert.Equal(t, int64(11), m0["a"].SecondsSinceLastHealthy) + assert.Equal(t, int64(19), m0["b"].SecondsSinceLastHealthy) + assert.Equal(t, int64(17), m0["c"].SecondsSinceLastHealthy) + } +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/recent_app.go b/go/vt/vttablet/tabletserver/throttle/base/recent_app.go new file mode 100644 index 00000000000..2c629fbff25 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/recent_app.go @@ -0,0 +1,26 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. 
// RecentApp records the last time an app was checked, both as a Unix epoch
// timestamp and as the number of minutes elapsed since that check.
type RecentApp struct {
	CheckedAtEpoch      int64
	MinutesSinceChecked int64
}

// NewRecentApp builds a RecentApp out of the time of the app's last check.
// The elapsed minutes are computed once, here, and truncated to a whole number.
func NewRecentApp(checkedAt time.Time) *RecentApp {
	var recentApp RecentApp
	recentApp.CheckedAtEpoch = checkedAt.Unix()
	elapsed := time.Since(checkedAt)
	recentApp.MinutesSinceChecked = int64(elapsed.Minutes())
	return &recentApp
}
// SuspendableTicker is similar to time.Ticker, but also offers Suspend() and Resume() functions.
// While the ticker is suspended, nothing comes from the time channel C
type SuspendableTicker struct {
	ticker *time.Ticker
	// C is user facing
	C chan time.Time

	// suspended is non-zero while ticks must not be forwarded to C.
	// Accessed atomically.
	suspended int64
	// stopped flips from 0 to 1 on the first call to Stop. Accessed atomically.
	stopped int64
	// done is closed by Stop, to let the forwarding goroutine terminate.
	done chan struct{}
}

// NewSuspendableTicker creates a new suspendable ticker, indicating whether the ticker should start
// suspended or running. It starts a goroutine that forwards ticks to C; call
// Stop to release that goroutine.
func NewSuspendableTicker(d time.Duration, initiallySuspended bool) *SuspendableTicker {
	s := &SuspendableTicker{
		ticker: time.NewTicker(d),
		C:      make(chan time.Time),
		done:   make(chan struct{}),
	}
	if initiallySuspended {
		s.suspended = 1
	}
	go s.loop()
	return s
}

// Suspend stops sending time events on the channel C
// time events sent during suspended time are lost
func (s *SuspendableTicker) Suspend() {
	atomic.StoreInt64(&s.suspended, 1)
}

// Resume re-enables time events on channel C
func (s *SuspendableTicker) Resume() {
	atomic.StoreInt64(&s.suspended, 0)
}

// Stop completely stops the ticker and terminates the forwarding goroutine.
// Like time.Ticker.Stop, it does not close C. It is safe to call Stop more
// than once.
func (s *SuspendableTicker) Stop() {
	if !atomic.CompareAndSwapInt64(&s.stopped, 0, 1) {
		// already stopped
		return
	}
	s.ticker.Stop()
	// time.Ticker.Stop does not close the ticker's channel, so the loop
	// needs an explicit signal to return.
	close(s.done)
}

// loop forwards ticks from the underlying ticker onto C, unless suspended.
// It returns once Stop has been called, including when it is blocked waiting
// for a reader on C.
func (s *SuspendableTicker) loop() {
	for {
		select {
		case <-s.done:
			return
		case t := <-s.ticker.C:
			if atomic.LoadInt64(&s.suspended) != 0 {
				// suspended: this tick is lost
				continue
			}
			select {
			case s.C <- t:
			case <-s.done:
				return
			}
		}
	}
}
// MetricResult is what we expect our probes to return. This can be a numeric result, or
// a special type of result indicating more meta-information
type MetricResult interface {
	Get() (float64, error)
}

// MetricResultFunc is a function that returns a metric result, along with
// the threshold to compare it against
type MetricResultFunc func() (metricResult MetricResult, threshold float64)

var (
	// ErrThresholdExceeded is the common error one may get checking on metric result
	ErrThresholdExceeded = errors.New("Threshold exceeded")
	// errNoResultYet is returned by NoMetricResultYet
	errNoResultYet = errors.New("Metric not collected yet")
	// ErrNoSuchMetric is for when a user requests a metric by an unknown metric name
	ErrNoSuchMetric = errors.New("No such metric")
	// ErrAppDenied is seen when an app is denied access
	ErrAppDenied = errors.New("App denied")
)

// IsDialTCPError sees if the given error indicates a TCP issue. The decision
// is made purely on the error text: it must begin with "dial tcp".
func IsDialTCPError(e error) bool {
	return e != nil && strings.HasPrefix(e.Error(), "dial tcp")
}

type (
	// noHostsMetricResult is the type behind NoHostsMetricResult
	noHostsMetricResult struct{}
	// noMetricResultYet is the type behind NoMetricResultYet
	noMetricResultYet struct{}
	// noSuchMetric is the type behind NoSuchMetric
	noSuchMetric struct{}
	// appDeniedMetric is the type behind AppDeniedMetric
	appDeniedMetric struct{}
	// simpleMetricResult is a result with float value
	simpleMetricResult struct {
		Value float64
	}
)

var (
	// NoHostsMetricResult is a result indicating "no hosts"
	NoHostsMetricResult = &noHostsMetricResult{}
	// NoMetricResultYet is a result indicating "no data"
	NoMetricResultYet = &noMetricResultYet{}
	// NoSuchMetric is a metric result for an unknown metric name
	NoSuchMetric = &noSuchMetric{}
	// AppDeniedMetric is a special metric indicating a "denied" situation
	AppDeniedMetric = &appDeniedMetric{}
)

// Get implements MetricResult; "no hosts" is a zero value with no error
func (r *noHostsMetricResult) Get() (float64, error) {
	return 0, nil
}

// Get implements MetricResult; always fails with errNoResultYet
func (r *noMetricResultYet) Get() (float64, error) {
	return 0, errNoResultYet
}

// Get implements MetricResult; always fails with ErrNoSuchMetric
func (r *noSuchMetric) Get() (float64, error) {
	return 0, ErrNoSuchMetric
}

// Get implements MetricResult; always fails with ErrAppDenied
func (r *appDeniedMetric) Get() (float64, error) {
	return 0, ErrAppDenied
}

// NewSimpleMetricResult creates a simpleMetricResult
func NewSimpleMetricResult(value float64) MetricResult {
	result := new(simpleMetricResult)
	result.Value = value
	return result
}

// Get implements MetricResult; returns the stored value, never an error
func (r *simpleMetricResult) Get() (float64, error) {
	return r.Value, nil
}
// RandomHash returns a 64 hex character random string.
// It is the hex encoding of the SHA-256 digest of 64 random bytes.
func RandomHash() string {
	const seedSize = 64
	seed := make([]byte, seedSize)
	// The result of reading from the random source is deliberately not checked.
	_, _ = rand.Read(seed)

	digest := sha256.Sum256(seed)
	return hex.EncodeToString(digest[:])
}
fmt.Sprintf("%s/%s", storeType, storeName) + if flags.LowPriority { + if _, exists := check.throttler.nonLowPriorityAppRequestsThrottled.Get(metricName); exists { + // a non-deprioritized app, ie a "normal" app, has recently been throttled. + // This is now a deprioritized app. Deny access to this request. + denyApp = true + } + } + // + metricResult, threshold := check.throttler.AppRequestMetricResult(ctx, appName, metricResultFunc, denyApp) + if flags.OverrideThreshold > 0 { + threshold = flags.OverrideThreshold + } + value, err := metricResult.Get() + if appName == "" { + return NewCheckResult(http.StatusExpectationFailed, value, threshold, fmt.Errorf("no app indicated")) + } + + var statusCode int + + if err == base.ErrAppDenied { + // app specifically not allowed to get metrics + statusCode = http.StatusExpectationFailed // 417 + } else if err == base.ErrNoSuchMetric { + // not collected yet, or metric does not exist + statusCode = http.StatusNotFound // 404 + } else if err != nil { + // any error + statusCode = http.StatusInternalServerError // 500 + } else if value > threshold { + // casual throttling + statusCode = http.StatusTooManyRequests // 429 + err = base.ErrThresholdExceeded + + if !flags.LowPriority && !flags.ReadCheck && appName != frenoAppName { + // low priority requests will henceforth be denied + go check.throttler.nonLowPriorityAppRequestsThrottled.SetDefault(metricName, true) + } + } else { + // all good! 
+ statusCode = http.StatusOK // 200 + } + return NewCheckResult(statusCode, value, threshold, err) +} + +// Check is the core function that runs when a user wants to check a metric +func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeType string, storeName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { + var metricResultFunc base.MetricResultFunc + switch storeType { + case "mysql": + { + metricResultFunc = func() (metricResult base.MetricResult, threshold float64) { + return check.throttler.getMySQLClusterMetrics(ctx, storeName) + } + } + } + if metricResultFunc == nil { + return NoSuchMetricCheckResult + } + + checkResult = check.checkAppMetricResult(ctx, appName, storeType, storeName, metricResultFunc, flags) + atomic.StoreInt64(&check.throttler.lastCheckTimeNano, time.Now().UnixNano()) + + go func(statusCode int) { + metrics.GetOrRegisterCounter("check.any.total", nil).Inc(1) + metrics.GetOrRegisterCounter(fmt.Sprintf("check.%s.total", appName), nil).Inc(1) + + metrics.GetOrRegisterCounter(fmt.Sprintf("check.any.%s.%s.total", storeType, storeName), nil).Inc(1) + metrics.GetOrRegisterCounter(fmt.Sprintf("check.%s.%s.%s.total", appName, storeType, storeName), nil).Inc(1) + + if statusCode != http.StatusOK { + metrics.GetOrRegisterCounter("check.any.error", nil).Inc(1) + metrics.GetOrRegisterCounter(fmt.Sprintf("check.%s.error", appName), nil).Inc(1) + + metrics.GetOrRegisterCounter(fmt.Sprintf("check.any.%s.%s.error", storeType, storeName), nil).Inc(1) + metrics.GetOrRegisterCounter(fmt.Sprintf("check.%s.%s.%s.error", appName, storeType, storeName), nil).Inc(1) + } + + check.throttler.markRecentApp(appName, remoteAddr) + }(checkResult.StatusCode) + + return checkResult +} + +func (check *ThrottlerCheck) splitMetricTokens(metricName string) (storeType string, storeName string, err error) { + metricTokens := strings.Split(metricName, "/") + if len(metricTokens) != 2 { + return storeType, storeName, 
base.ErrNoSuchMetric + } + storeType = metricTokens[0] + storeName = metricTokens[1] + + return storeType, storeName, nil +} + +// localCheck +func (check *ThrottlerCheck) localCheck(ctx context.Context, metricName string) (checkResult *CheckResult) { + storeType, storeName, err := check.splitMetricTokens(metricName) + if err != nil { + return NoSuchMetricCheckResult + } + checkResult = check.Check(ctx, frenoAppName, storeType, storeName, "local", StandardCheckFlags) + + if checkResult.StatusCode == http.StatusOK { + check.throttler.markMetricHealthy(metricName) + } + if timeSinceHealthy, found := check.throttler.timeSinceMetricHealthy(metricName); found { + metrics.GetOrRegisterGauge(fmt.Sprintf("check.%s.%s.seconds_since_healthy", storeType, storeName), nil).Update(int64(timeSinceHealthy.Seconds())) + } + + return checkResult +} + +func (check *ThrottlerCheck) reportAggregated(metricName string, metricResult base.MetricResult) { + storeType, storeName, err := check.splitMetricTokens(metricName) + if err != nil { + return + } + if value, err := metricResult.Get(); err == nil { + metrics.GetOrRegisterGaugeFloat64(fmt.Sprintf("aggregated.%s.%s", storeType, storeName), nil).Update(value) + } +} + +// AggregatedMetrics is a convenience access method into throttler's `aggregatedMetricsSnapshot` +func (check *ThrottlerCheck) AggregatedMetrics(ctx context.Context) map[string]base.MetricResult { + return check.throttler.aggregatedMetricsSnapshot() +} + +// MetricsHealth is a convenience access method into throttler's `metricsHealthSnapshot` +func (check *ThrottlerCheck) MetricsHealth() map[string](*base.MetricHealth) { + return check.throttler.metricsHealthSnapshot() +} + +// SelfChecks runs checks on all known metrics as if we were an app. 
+// This runs asynchronously, continuously, and independently of any user interaction +func (check *ThrottlerCheck) SelfChecks(ctx context.Context) { + selfCheckTicker := time.NewTicker(selfCheckInterval) + go func() { + for range selfCheckTicker.C { + for metricName, metricResult := range check.AggregatedMetrics(ctx) { + metricName := metricName + metricResult := metricResult + go check.localCheck(ctx, metricName) + go check.reportAggregated(metricName, metricResult) + } + } + }() +} diff --git a/go/vt/vttablet/tabletserver/throttle/check_result.go b/go/vt/vttablet/tabletserver/throttle/check_result.go new file mode 100644 index 00000000000..f17b315ee86 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/check_result.go @@ -0,0 +1,44 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. See https://github.com/github/freno/blob/master/LICENSE +*/ + +package throttle + +import ( + "net/http" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" +) + +// CheckResult is the result for an app inquiring on a metric. 
It also exports as JSON via the API +type CheckResult struct { + StatusCode int `json:"StatusCode"` + Value float64 `json:"Value"` + Threshold float64 `json:"Threshold"` + Error error `json:"-"` + Message string `json:"Message"` +} + +// NewCheckResult returns a CheckResult +func NewCheckResult(statusCode int, value float64, threshold float64, err error) *CheckResult { + result := &CheckResult{ + StatusCode: statusCode, + Value: value, + Threshold: threshold, + Error: err, + } + if err != nil { + result.Message = err.Error() + } + return result +} + +// NewErrorCheckResult returns a check result that indicates an error +func NewErrorCheckResult(statusCode int, err error) *CheckResult { + return NewCheckResult(statusCode, 0, 0, err) +} + +// NoSuchMetricCheckResult is a result returns when a metric is unknown +var NoSuchMetricCheckResult = NewErrorCheckResult(http.StatusNotFound, base.ErrNoSuchMetric) diff --git a/go/vt/vttablet/tabletserver/throttle/config/config.go b/go/vt/vttablet/tabletserver/throttle/config/config.go new file mode 100644 index 00000000000..9d8fafb942c --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/config/config.go @@ -0,0 +1,37 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. See https://github.com/github/freno/blob/master/LICENSE +*/ + +package config + +// Instance is the one configuration for the throttler +var Instance = &ConfigurationSettings{} + +// Settings returns the settings of the global instance of Configuration +func Settings() *ConfigurationSettings { + return Instance +} + +// ConfigurationSettings models a set of configurable values, that can be +// provided by the user via one or several JSON formatted files. +// +// Some of the settings have reasonable default values, and some other +// (like database credentials) are strictly expected from user. 
// ConfigurationSettings models a set of configurable values, that can be
// provided by the user via one or several JSON formatted files.
//
// Some of the settings have reasonable default values, and some other
// (like database credentials) are strictly expected from user.
type ConfigurationSettings struct {
	ListenPort      int
	DataCenter      string
	Environment     string
	Domain          string
	EnableProfiling bool // enable pprof profiling http api
	Stores          StoresSettings
}

// PostReadAdjustments validates and fixes config. It delegates to the
// adjustments of the stores and returns the first error encountered, if any.
func (settings *ConfigurationSettings) PostReadAdjustments() error {
	return settings.Stores.postReadAdjustments()
}

//
// MySQL-specific configuration
//

// MySQLClusterConfigurationSettings has the settings for a specific MySQL cluster. It derives its information
// from MySQLConfigurationSettings
type MySQLClusterConfigurationSettings struct {
	User                 string   // override MySQLConfigurationSettings's, or leave empty to inherit those settings
	Password             string   // override MySQLConfigurationSettings's, or leave empty to inherit those settings
	MetricQuery          string   // override MySQLConfigurationSettings's, or leave empty to inherit those settings
	CacheMillis          int      // override MySQLConfigurationSettings's, or leave empty to inherit those settings
	ThrottleThreshold    float64  // override MySQLConfigurationSettings's, or leave empty to inherit those settings
	Port                 int      // Specify if different than 3306 or if different than specified by MySQLConfigurationSettings
	IgnoreHostsCount     int      // Number of hosts that can be skipped/ignored even on error or on exceeding thresholds
	IgnoreHostsThreshold float64  // Threshold beyond which IgnoreHostsCount applies (default: 0)
	HTTPCheckPort        int      // Specify if different than specified by MySQLConfigurationSettings. -1 to disable HTTP check
	HTTPCheckPath        string   // Specify if different than specified by MySQLConfigurationSettings
	IgnoreHosts          []string // override MySQLConfigurationSettings's, or leave empty to inherit those settings
}

// Hook to implement adjustments after reading each configuration file.
// Currently there is nothing to adjust on the cluster level.
func (settings *MySQLClusterConfigurationSettings) postReadAdjustments() error {
	return nil
}

// MySQLConfigurationSettings has the general configuration for all MySQL clusters
type MySQLConfigurationSettings struct {
	User                 string
	Password             string
	MetricQuery          string
	CacheMillis          int // optional, if defined then probe result will be cached, and future probes may use cached value
	ThrottleThreshold    float64
	Port                 int      // Specify if different than 3306; applies to all clusters
	IgnoreDialTCPErrors  bool     // Skip hosts where a metric cannot be retrieved due to TCP dial errors
	IgnoreHostsCount     int      // Number of hosts that can be skipped/ignored even on error or on exceeding thresholds
	IgnoreHostsThreshold float64  // Threshold beyond which IgnoreHostsCount applies (default: 0)
	HTTPCheckPort        int      // port for HTTP check. -1 to disable.
	HTTPCheckPath        string   // If non-empty, requires HTTPCheckPort
	IgnoreHosts          []string // If non empty, substrings to indicate hosts to be ignored/skipped

	Clusters map[string](*MySQLClusterConfigurationSettings) // cluster name -> cluster config
}

// Hook to implement adjustments after reading each configuration file.
//
// Every cluster inherits, from the general MySQL settings, any value it leaves
// at its zero value (empty string, 0, empty list). As a consequence a cluster
// cannot override a non-zero general value with a zero value. A cluster that
// is listed with a nil configuration is given a fresh configuration that
// inherits all of the general settings.
func (settings *MySQLConfigurationSettings) postReadAdjustments() error {
	for clusterName, clusterSettings := range settings.Clusters {
		if clusterSettings == nil {
			// e.g. a cluster listed as `null` in the config file. Without this,
			// the field accesses below would dereference a nil pointer.
			clusterSettings = &MySQLClusterConfigurationSettings{}
			settings.Clusters[clusterName] = clusterSettings
		}
		if err := clusterSettings.postReadAdjustments(); err != nil {
			return err
		}
		if clusterSettings.User == "" {
			clusterSettings.User = settings.User
		}
		if clusterSettings.Password == "" {
			clusterSettings.Password = settings.Password
		}
		if clusterSettings.MetricQuery == "" {
			clusterSettings.MetricQuery = settings.MetricQuery
		}
		if clusterSettings.CacheMillis == 0 {
			clusterSettings.CacheMillis = settings.CacheMillis
		}
		if clusterSettings.ThrottleThreshold == 0 {
			clusterSettings.ThrottleThreshold = settings.ThrottleThreshold
		}
		if clusterSettings.Port == 0 {
			clusterSettings.Port = settings.Port
		}
		if clusterSettings.IgnoreHostsCount == 0 {
			clusterSettings.IgnoreHostsCount = settings.IgnoreHostsCount
		}
		if clusterSettings.IgnoreHostsThreshold == 0 {
			clusterSettings.IgnoreHostsThreshold = settings.IgnoreHostsThreshold
		}
		if clusterSettings.HTTPCheckPort == 0 {
			clusterSettings.HTTPCheckPort = settings.HTTPCheckPort
		}
		if clusterSettings.HTTPCheckPath == "" {
			clusterSettings.HTTPCheckPath = settings.HTTPCheckPath
		}
		if len(clusterSettings.IgnoreHosts) == 0 {
			clusterSettings.IgnoreHosts = settings.IgnoreHosts
		}
	}
	return nil
}

//
// General-store configuration
//

// StoresSettings is a general settings container for specific stores.
type StoresSettings struct {
	MySQL MySQLConfigurationSettings // Any and all MySQL setups go here

	// Futuristic stores can come here.
}

// Hook to implement adjustments after reading each configuration file.
func (settings *StoresSettings) postReadAdjustments() error {
	return settings.MySQL.postReadAdjustments()
}
+} + +// Hook to implement adjustments after reading each configuration file. +func (settings *StoresSettings) postReadAdjustments() error { + if err := settings.MySQL.postReadAdjustments(); err != nil { + return err + } + return nil +} diff --git a/go/vt/vttablet/tabletserver/throttle/mysql.go b/go/vt/vttablet/tabletserver/throttle/mysql.go new file mode 100644 index 00000000000..350ad465b73 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/mysql.go @@ -0,0 +1,84 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. See https://github.com/github/freno/blob/master/LICENSE +*/ + +package throttle + +import ( + "context" + "sort" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" +) + +func aggregateMySQLProbes( + ctx context.Context, + probes *mysql.Probes, + clusterName string, + instanceResultsMap mysql.InstanceMetricResultMap, + ignoreHostsCount int, + IgnoreDialTCPErrors bool, + ignoreHostsThreshold float64, +) (worstMetric base.MetricResult) { + // probes is known not to change. It can be *replaced*, but not changed. 
+ // so it's safe to iterate it + probeValues := []float64{} + for _, probe := range *probes { + instanceMetricResult, ok := instanceResultsMap[mysql.GetClusterInstanceKey(clusterName, &probe.Key)] + if !ok { + return base.NoMetricResultYet + } + + value, err := instanceMetricResult.Get() + if err != nil { + if IgnoreDialTCPErrors && base.IsDialTCPError(err) { + continue + } + if ignoreHostsCount > 0 { + // ok to skip this error + ignoreHostsCount = ignoreHostsCount - 1 + continue + } + return instanceMetricResult + } + + // No error + probeValues = append(probeValues, value) + } + if len(probeValues) == 0 { + return base.NoHostsMetricResult + } + + // If we got here, that means no errors (or good-to-skip errors) + sort.Float64s(probeValues) + // probeValues sorted ascending (from best, ie smallest, to worst, ie largest) + for ignoreHostsCount > 0 { + goodToIgnore := func() bool { + // Note that these hosts don't have errors + numProbeValues := len(probeValues) + if numProbeValues <= 1 { + // We wish to retain at least one host + return false + } + if ignoreHostsThreshold <= 0 { + // No threshold conditional (or implicitly "any value exceeds the threshold") + return true + } + if worstValue := probeValues[numProbeValues-1]; worstValue > ignoreHostsThreshold { + return true + } + return false + }() + if goodToIgnore { + probeValues = probeValues[0 : len(probeValues)-1] + } + // And, whether ignored or not, we are reducing our tokens + ignoreHostsCount = ignoreHostsCount - 1 + } + worstValue := probeValues[len(probeValues)-1] + worstMetric = base.NewSimpleMetricResult(worstValue) + return worstMetric +} diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/instance_key.go b/go/vt/vttablet/tabletserver/throttle/mysql/instance_key.go new file mode 100644 index 00000000000..d86d1317606 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/mysql/instance_key.go @@ -0,0 +1,86 @@ +/* + Copyright 2015 Shlomi Noach, courtesy Booking.com + See 
// InstanceKey is an instance indicator, identified by hostname and port
type InstanceKey struct {
	Hostname string
	Port     int
}

// newRawInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
// It expects such format and returns with error if input differs in format.
// The input is split on its first ":"; what follows must be an integer port.
// On error the returned key is nil.
func newRawInstanceKey(hostPort string) (*InstanceKey, error) {
	tokens := strings.SplitN(hostPort, ":", 2)
	if len(tokens) != 2 {
		return nil, fmt.Errorf("Cannot parse InstanceKey from %s. Expected format is host:port", hostPort)
	}
	port, err := strconv.Atoi(tokens[1])
	if err != nil {
		// Do not hand out a half-populated key along with the error
		return nil, fmt.Errorf("Invalid port: %s", tokens[1])
	}
	return &InstanceKey{Hostname: tokens[0], Port: port}, nil
}

// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306 or some.hostname
// `defaultPort` is used if `hostPort` does not include a port.
func ParseInstanceKey(hostPort string, defaultPort int) (*InstanceKey, error) {
	if !strings.Contains(hostPort, ":") {
		return &InstanceKey{Hostname: hostPort, Port: defaultPort}, nil
	}
	return newRawInstanceKey(hostPort)
}

// Equals tests equality between this key and another key.
// A nil `other` is never equal.
func (i *InstanceKey) Equals(other *InstanceKey) bool {
	if other == nil {
		return false
	}
	return i.Hostname == other.Hostname && i.Port == other.Port
}

// SmallerThan returns true if this key is dictionary-smaller than another:
// ordered by hostname, then by port.
// This is used for consistent sorting/ordering; there's nothing magical about it.
// Consistently with Equals, a nil `other` is tolerated; nothing is smaller than it.
func (i *InstanceKey) SmallerThan(other *InstanceKey) bool {
	if other == nil {
		return false
	}
	if i.Hostname != other.Hostname {
		return i.Hostname < other.Hostname
	}
	return i.Port < other.Port
}

// IsValid uses simple heuristics to see whether this key represents an actual instance:
// the hostname must be non-empty and other than "_", and the port must be positive.
func (i *InstanceKey) IsValid() bool {
	if i.Hostname == "_" {
		return false
	}
	return len(i.Hostname) > 0 && i.Port > 0
}

// StringCode returns an official string representation of this key, in the form hostname:port
func (i *InstanceKey) StringCode() string {
	return fmt.Sprintf("%s:%d", i.Hostname, i.Port)
}

// DisplayString returns a user-friendly string representation of this key
func (i *InstanceKey) DisplayString() string {
	return i.StringCode()
}

// String returns a user-friendly string representation of this key
func (i InstanceKey) String() string {
	return i.StringCode()
}
See https://github.com/github/freno/blob/master/LICENSE +*/ + +package mysql + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewRawInstanceKey(t *testing.T) { + { + key, err := newRawInstanceKey("127.0.0.1:3307") + assert.NoError(t, err) + assert.Equal(t, key.Hostname, "127.0.0.1") + assert.Equal(t, key.Port, 3307) + } + { + _, err := newRawInstanceKey("127.0.0.1:abcd") + assert.Error(t, err) + } + { + _, err := newRawInstanceKey("127.0.0.1:") + assert.Error(t, err) + } + { + _, err := newRawInstanceKey("127.0.0.1") + assert.Error(t, err) + } +} + +func TestParseInstanceKey(t *testing.T) { + { + key, err := ParseInstanceKey("127.0.0.1:3307", 3306) + assert.NoError(t, err) + assert.Equal(t, "127.0.0.1", key.Hostname) + assert.Equal(t, 3307, key.Port) + } + { + key, err := ParseInstanceKey("127.0.0.1", 3306) + assert.NoError(t, err) + assert.Equal(t, "127.0.0.1", key.Hostname) + assert.Equal(t, 3306, key.Port) + } +} + +func TestEquals(t *testing.T) { + { + expect := &InstanceKey{Hostname: "127.0.0.1", Port: 3306} + key, err := ParseInstanceKey("127.0.0.1", 3306) + assert.NoError(t, err) + assert.True(t, key.Equals(expect)) + } +} + +func TestStringCode(t *testing.T) { + { + key := &InstanceKey{Hostname: "127.0.0.1", Port: 3306} + stringCode := key.StringCode() + assert.Equal(t, "127.0.0.1:3306", stringCode) + } +} diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_inventory.go b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_inventory.go new file mode 100644 index 00000000000..ace9a2853a7 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_inventory.go @@ -0,0 +1,44 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. 
See https://github.com/github/freno/blob/master/LICENSE +*/ + +package mysql + +import ( + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" +) + +// ClusterInstanceKey combines a cluster name with an instance key +type ClusterInstanceKey struct { + ClusterName string + Key InstanceKey +} + +// GetClusterInstanceKey creates a ClusterInstanceKey object +func GetClusterInstanceKey(clusterName string, key *InstanceKey) ClusterInstanceKey { + return ClusterInstanceKey{ClusterName: clusterName, Key: *key} +} + +// InstanceMetricResultMap maps a cluster-instance to a result +type InstanceMetricResultMap map[ClusterInstanceKey]base.MetricResult + +// Inventory has the operational data about probes, their metrics, and relevant configuration +type Inventory struct { + ClustersProbes map[string](*Probes) + IgnoreHostsCount map[string]int + IgnoreHostsThreshold map[string]float64 + InstanceKeyMetrics InstanceMetricResultMap +} + +// NewInventory creates a Inventory +func NewInventory() *Inventory { + inventory := &Inventory{ + ClustersProbes: make(map[string](*Probes)), + IgnoreHostsCount: make(map[string]int), + IgnoreHostsThreshold: make(map[string]float64), + InstanceKeyMetrics: make(map[ClusterInstanceKey]base.MetricResult), + } + return inventory +} diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go new file mode 100644 index 00000000000..aa169c27ab2 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go @@ -0,0 +1,132 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. 
See https://github.com/github/freno/blob/master/LICENSE +*/ + +package mysql + +import ( + "fmt" + "strings" + "time" + + "github.com/patrickmn/go-cache" + metrics "github.com/rcrowley/go-metrics" + "vitess.io/vitess/go/vt/orchestrator/external/golib/sqlutils" +) + +var mysqlMetricCache = cache.New(cache.NoExpiration, 10*time.Millisecond) + +func getMySQLMetricCacheKey(probe *Probe) string { + return fmt.Sprintf("%s:%s", probe.Key, probe.MetricQuery) +} + +func cacheMySQLThrottleMetric(probe *Probe, mySQLThrottleMetric *MySQLThrottleMetric) *MySQLThrottleMetric { + if mySQLThrottleMetric.Err != nil { + return mySQLThrottleMetric + } + if probe.CacheMillis > 0 { + mysqlMetricCache.Set(getMySQLMetricCacheKey(probe), mySQLThrottleMetric, time.Duration(probe.CacheMillis)*time.Millisecond) + } + return mySQLThrottleMetric +} + +func getCachedMySQLThrottleMetric(probe *Probe) *MySQLThrottleMetric { + if probe.CacheMillis == 0 { + return nil + } + if metric, found := mysqlMetricCache.Get(getMySQLMetricCacheKey(probe)); found { + mySQLThrottleMetric, _ := metric.(*MySQLThrottleMetric) + return mySQLThrottleMetric + } + return nil +} + +// MySQLThrottleMetric has the probed metric for a mysql instance +type MySQLThrottleMetric struct { + ClusterName string + Key InstanceKey + Value float64 + Err error +} + +// NewMySQLThrottleMetric creates a new MySQLThrottleMetric +func NewMySQLThrottleMetric() *MySQLThrottleMetric { + return &MySQLThrottleMetric{Value: 0} +} + +// GetClusterInstanceKey returns the ClusterInstanceKey part of the metric +func (metric *MySQLThrottleMetric) GetClusterInstanceKey() ClusterInstanceKey { + return GetClusterInstanceKey(metric.ClusterName, &metric.Key) +} + +// Get implements MetricResult +func (metric *MySQLThrottleMetric) Get() (float64, error) { + return metric.Value, metric.Err +} + +// ReadThrottleMetric returns replication lag for a given connection config; either by explicit query +// or via SHOW SLAVE STATUS +func ReadThrottleMetric(probe 
*Probe, clusterName string) (mySQLThrottleMetric *MySQLThrottleMetric) { + if mySQLThrottleMetric := getCachedMySQLThrottleMetric(probe); mySQLThrottleMetric != nil { + return mySQLThrottleMetric + // On cached results we avoid taking latency metrics + } + + started := time.Now() + mySQLThrottleMetric = NewMySQLThrottleMetric() + mySQLThrottleMetric.ClusterName = clusterName + mySQLThrottleMetric.Key = probe.Key + + defer func(metric *MySQLThrottleMetric, started time.Time) { + go func() { + metrics.GetOrRegisterTimer("probes.latency", nil).Update(time.Since(started)) + metrics.GetOrRegisterCounter("probes.total", nil).Inc(1) + if metric.Err != nil { + metrics.GetOrRegisterCounter("probes.error", nil).Inc(1) + } + }() + }(mySQLThrottleMetric, started) + + dbURI := probe.GetDBUri("information_schema") + db, fromCache, err := sqlutils.GetDB(dbURI) + + if err != nil { + mySQLThrottleMetric.Err = err + return mySQLThrottleMetric + } + if !fromCache { + db.SetMaxOpenConns(maxPoolConnections) + db.SetMaxIdleConns(maxIdleConnections) + } + if strings.HasPrefix(strings.ToLower(probe.MetricQuery), "select") { + mySQLThrottleMetric.Err = db.QueryRow(probe.MetricQuery).Scan(&mySQLThrottleMetric.Value) + return cacheMySQLThrottleMetric(probe, mySQLThrottleMetric) + } + + if strings.HasPrefix(strings.ToLower(probe.MetricQuery), "show global") { + var variableName string // just a placeholder + mySQLThrottleMetric.Err = db.QueryRow(probe.MetricQuery).Scan(&variableName, &mySQLThrottleMetric.Value) + return cacheMySQLThrottleMetric(probe, mySQLThrottleMetric) + } + + if probe.MetricQuery != "" { + mySQLThrottleMetric.Err = fmt.Errorf("Unsupported metrics query type: %s", probe.MetricQuery) + return mySQLThrottleMetric + } + + // No metric query? 
By default we look at replication lag as output of SHOW SLAVE STATUS + + mySQLThrottleMetric.Err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error { + slaveIORunning := m.GetString("Slave_IO_Running") + slaveSQLRunning := m.GetString("Slave_SQL_Running") + secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master") + if !secondsBehindMaster.Valid { + return fmt.Errorf("replication not running; Slave_IO_Running=%+v, Slave_SQL_Running=%+v", slaveIORunning, slaveSQLRunning) + } + mySQLThrottleMetric.Value = float64(secondsBehindMaster.Int64) + return nil + }) + return cacheMySQLThrottleMetric(probe, mySQLThrottleMetric) +} diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/probe.go b/go/vt/vttablet/tabletserver/throttle/mysql/probe.go new file mode 100644 index 00000000000..0cc3b1ea1cc --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/mysql/probe.go @@ -0,0 +1,86 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. See https://github.com/github/freno/blob/master/LICENSE +*/ + +package mysql + +import ( + "fmt" + "net" +) + +const maxPoolConnections = 3 +const maxIdleConnections = 3 +const timeoutMillis = 1000 + +// Probe is the minimal configuration required to connect to a MySQL server +type Probe struct { + Key InstanceKey + User string + Password string + MetricQuery string + CacheMillis int + QueryInProgress int64 +} + +// Probes maps instances to probe(s) +type Probes map[InstanceKey](*Probe) + +// ClusterProbes has the probes for a specific cluster +type ClusterProbes struct { + ClusterName string + IgnoreHostsCount int + IgnoreHostsThreshold float64 + InstanceProbes *Probes +} + +// NewProbes creates Probes +func NewProbes() *Probes { + return &Probes{} +} + +// NewProbe creates Probe +func NewProbe() *Probe { + config := &Probe{ + Key: InstanceKey{}, + } + return config +} + +// DuplicateCredentials creates a new connection config with given key and with same credentials as this config +func (p *Probe) 
DuplicateCredentials(key InstanceKey) *Probe { + config := &Probe{ + Key: key, + User: p.User, + Password: p.Password, + } + return config +} + +// Duplicate duplicates this probe, including credentials +func (p *Probe) Duplicate() *Probe { + return p.DuplicateCredentials(p.Key) +} + +// String returns a human readable string of this struct +func (p *Probe) String() string { + return fmt.Sprintf("%s, user=%s", p.Key.DisplayString(), p.User) +} + +// Equals checks if this probe has same instance key as another +func (p *Probe) Equals(other *Probe) bool { + return p.Key.Equals(&other.Key) +} + +// GetDBUri returns the DB URI for the mysql server indicated by this probe +func (p *Probe) GetDBUri(databaseName string) string { + hostname := p.Key.Hostname + var ip = net.ParseIP(hostname) + if (ip != nil) && (ip.To4() == nil) { + // Wrap IPv6 literals in square brackets + hostname = fmt.Sprintf("[%s]", hostname) + } + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=true&charset=utf8mb4,utf8,latin1&timeout=%dms", p.User, p.Password, hostname, p.Key.Port, databaseName, timeoutMillis) +} diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/probe_test.go b/go/vt/vttablet/tabletserver/throttle/mysql/probe_test.go new file mode 100644 index 00000000000..8baa60529de --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/mysql/probe_test.go @@ -0,0 +1,47 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. 
See https://github.com/github/freno/blob/master/LICENSE +*/ + +package mysql + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewProbe(t *testing.T) { + c := NewProbe() + assert.Equal(t, "", c.Key.Hostname) + assert.Equal(t, 0, c.Key.Port) + assert.Equal(t, "", c.User) + assert.Equal(t, "", c.Password) +} + +func TestDuplicateCredentials(t *testing.T) { + c := NewProbe() + c.Key = InstanceKey{Hostname: "myhost", Port: 3306} + c.User = "gromit" + c.Password = "penguin" + + dup := c.DuplicateCredentials(InstanceKey{Hostname: "otherhost", Port: 3310}) + assert.Equal(t, "otherhost", dup.Key.Hostname) + assert.Equal(t, 3310, dup.Key.Port) + assert.Equal(t, "gromit", dup.User) + assert.Equal(t, "penguin", dup.Password) +} + +func TestDuplicate(t *testing.T) { + c := NewProbe() + c.Key = InstanceKey{Hostname: "myhost", Port: 3306} + c.User = "gromit" + c.Password = "penguin" + + dup := c.Duplicate() + assert.Equal(t, "myhost", dup.Key.Hostname) + assert.Equal(t, 3306, dup.Key.Port) + assert.Equal(t, "gromit", dup.User) + assert.Equal(t, "penguin", dup.Password) +} diff --git a/go/vt/vttablet/tabletserver/throttle/mysql_test.go b/go/vt/vttablet/tabletserver/throttle/mysql_test.go new file mode 100644 index 00000000000..e90f9a69614 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/mysql_test.go @@ -0,0 +1,198 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. 
See https://github.com/github/freno/blob/master/LICENSE +*/ + +package throttle + +import ( + "context" + "testing" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" + + "github.com/stretchr/testify/assert" +) + +var ( + key1 = mysql.InstanceKey{Hostname: "10.0.0.1", Port: 3306} + key2 = mysql.InstanceKey{Hostname: "10.0.0.2", Port: 3306} + key3 = mysql.InstanceKey{Hostname: "10.0.0.3", Port: 3306} + key4 = mysql.InstanceKey{Hostname: "10.0.0.4", Port: 3306} + key5 = mysql.InstanceKey{Hostname: "10.0.0.5", Port: 3306} +) + +func TestAggregateMySQLProbesNoErrors(t *testing.T) { + ctx := context.Background() + clusterName := "c0" + key1cluster := mysql.GetClusterInstanceKey(clusterName, &key1) + key2cluster := mysql.GetClusterInstanceKey(clusterName, &key2) + key3cluster := mysql.GetClusterInstanceKey(clusterName, &key3) + key4cluster := mysql.GetClusterInstanceKey(clusterName, &key4) + key5cluster := mysql.GetClusterInstanceKey(clusterName, &key5) + instanceResultsMap := mysql.InstanceMetricResultMap{ + key1cluster: base.NewSimpleMetricResult(1.2), + key2cluster: base.NewSimpleMetricResult(1.7), + key3cluster: base.NewSimpleMetricResult(0.3), + key4cluster: base.NewSimpleMetricResult(0.6), + key5cluster: base.NewSimpleMetricResult(1.1), + } + var probes mysql.Probes = map[mysql.InstanceKey](*mysql.Probe){} + for clusterKey := range instanceResultsMap { + probes[clusterKey.Key] = &mysql.Probe{Key: clusterKey.Key} + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 0, false, 0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 1.7) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 1, false, 0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 1.2) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 2, 
false, 0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 1.1) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 3, false, 0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 0.6) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 4, false, 0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 0.3) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 5, false, 0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 0.3) + } +} + +func TestAggregateMySQLProbesNoErrorsIgnoreHostsThreshold(t *testing.T) { + ctx := context.Background() + clusterName := "c0" + key1cluster := mysql.GetClusterInstanceKey(clusterName, &key1) + key2cluster := mysql.GetClusterInstanceKey(clusterName, &key2) + key3cluster := mysql.GetClusterInstanceKey(clusterName, &key3) + key4cluster := mysql.GetClusterInstanceKey(clusterName, &key4) + key5cluster := mysql.GetClusterInstanceKey(clusterName, &key5) + instanceResultsMap := mysql.InstanceMetricResultMap{ + key1cluster: base.NewSimpleMetricResult(1.2), + key2cluster: base.NewSimpleMetricResult(1.7), + key3cluster: base.NewSimpleMetricResult(0.3), + key4cluster: base.NewSimpleMetricResult(0.6), + key5cluster: base.NewSimpleMetricResult(1.1), + } + var probes mysql.Probes = map[mysql.InstanceKey](*mysql.Probe){} + for clusterKey := range instanceResultsMap { + probes[clusterKey.Key] = &mysql.Probe{Key: clusterKey.Key} + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 0, false, 1.0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 1.7) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 1, false, 1.0) + value, err := worstMetric.Get() + assert.NoError(t, err) + 
assert.Equal(t, value, 1.2) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 2, false, 1.0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 1.1) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 3, false, 1.0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 0.6) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 4, false, 1.0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 0.6) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 5, false, 1.0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 0.6) + } +} + +func TestAggregateMySQLProbesWithErrors(t *testing.T) { + ctx := context.Background() + clusterName := "c0" + key1cluster := mysql.GetClusterInstanceKey(clusterName, &key1) + key2cluster := mysql.GetClusterInstanceKey(clusterName, &key2) + key3cluster := mysql.GetClusterInstanceKey(clusterName, &key3) + key4cluster := mysql.GetClusterInstanceKey(clusterName, &key4) + key5cluster := mysql.GetClusterInstanceKey(clusterName, &key5) + instanceResultsMap := mysql.InstanceMetricResultMap{ + key1cluster: base.NewSimpleMetricResult(1.2), + key2cluster: base.NewSimpleMetricResult(1.7), + key3cluster: base.NewSimpleMetricResult(0.3), + key4cluster: base.NoSuchMetric, + key5cluster: base.NewSimpleMetricResult(1.1), + } + var probes mysql.Probes = map[mysql.InstanceKey](*mysql.Probe){} + for clusterKey := range instanceResultsMap { + probes[clusterKey.Key] = &mysql.Probe{Key: clusterKey.Key} + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 0, false, 0) + _, err := worstMetric.Get() + assert.Error(t, err) + assert.Equal(t, err, base.ErrNoSuchMetric) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, 
instanceResultsMap, 1, false, 0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 1.7) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 2, false, 0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 1.2) + } + + instanceResultsMap[key1cluster] = base.NoSuchMetric + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 0, false, 0) + _, err := worstMetric.Get() + assert.Error(t, err) + assert.Equal(t, err, base.ErrNoSuchMetric) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 1, false, 0) + _, err := worstMetric.Get() + assert.Error(t, err) + assert.Equal(t, err, base.ErrNoSuchMetric) + } + { + worstMetric := aggregateMySQLProbes(ctx, &probes, clusterName, instanceResultsMap, 2, false, 0) + value, err := worstMetric.Get() + assert.NoError(t, err) + assert.Equal(t, value, 1.7) + } +} diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go new file mode 100644 index 00000000000..2230c0516eb --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -0,0 +1,711 @@ +/* + Copyright 2017 GitHub Inc. + + Licensed under MIT License. 
See https://github.com/github/freno/blob/master/LICENSE +*/ + +package throttle + +import ( + "context" + "flag" + "fmt" + "math/rand" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "vitess.io/vitess/go/vt/dbconnpool" + "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" + + "github.com/patrickmn/go-cache" +) + +const ( + leaderCheckInterval = 5 * time.Second + mysqlCollectInterval = 100 * time.Millisecond + mysqlDormantCollectInterval = 5 * time.Second + mysqlRefreshInterval = 10 * time.Second + mysqlAggregateInterval = 100 * time.Millisecond + + aggregatedMetricsExpiration = 5 * time.Second + aggregatedMetricsCleanup = 1 * time.Second + throttledAppsSnapshotInterval = 5 * time.Second + recentAppsExpiration = time.Hour * 24 + + nonDeprioritizedAppMapExpiration = time.Second + nonDeprioritizedAppMapInterval = 100 * time.Millisecond + + dormantPeriod = time.Minute + defaultThrottleTTLMinutes = 60 + defaultThrottleRatio = 1.0 + + maxPasswordLength = 32 + + localStoreName = "local" +) + +var throttleThreshold = flag.Duration("throttle_threshold", 1*time.Second, "Replication lag threshold for throttling") +var throttleTabletTypes = flag.String("throttle_tablet_types", "replica", "Comma separated VTTablet types to be considered by the throttler. default: 'replica'. example: 'replica,rdonly'. 
'replica' aways implicitly included") + +var ( + throttlerUser = "vt_tablet_throttler" + throttlerGrant = fmt.Sprintf("'%s'@'%s'", throttlerUser, "%") + + sqlCreateThrottlerUser = []string{ + `CREATE USER IF NOT EXISTS %s IDENTIFIED BY '%s'`, + `ALTER USER %s IDENTIFIED BY '%s'`, + } + sqlGrantThrottlerUser = []string{ + `GRANT SELECT ON _vt.heartbeat TO %s`, + } + replicationLagQuery = `select unix_timestamp(now(6))-max(ts/1000000000) from _vt.heartbeat` +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +// Throttler is the main entity in the throttling mechanism. This service runs, probes, collects data, +// aggregates, reads inventory, provides information, etc. +type Throttler struct { + keyspace string + shard string + + check *ThrottlerCheck + isLeader int64 + isOpen int64 + + env tabletenv.Env + pool *connpool.Pool + tabletTypeFunc func() topodatapb.TabletType + ts *topo.Server + + throttleTabletTypesMap map[topodatapb.TabletType]bool + + mysqlThrottleMetricChan chan *mysql.MySQLThrottleMetric + mysqlInventoryChan chan *mysql.Inventory + mysqlClusterProbesChan chan *mysql.ClusterProbes + + mysqlInventory *mysql.Inventory + + mysqlClusterThresholds *cache.Cache + aggregatedMetrics *cache.Cache + throttledApps *cache.Cache + recentApps *cache.Cache + metricsHealth *cache.Cache + + lastCheckTimeNano int64 + + initMutex sync.Mutex + throttledAppsMutex sync.Mutex + tickers [](*base.SuspendableTicker) + + nonLowPriorityAppRequestsThrottled *cache.Cache + httpClient *http.Client +} + +// ThrottlerStatus published some status values from the throttler +type ThrottlerStatus struct { + Keyspace string + Shard string + + IsLeader bool + IsOpen bool + IsDormant bool + + AggregatedMetrics map[string]base.MetricResult + MetricsHealth base.MetricHealthMap +} + +// NewThrottler creates a Throttler +func NewThrottler(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topodatapb.TabletType) *Throttler { + throttler := &Throttler{ + isLeader: 0, + isOpen: 0, + 
+		env:            env,
+		tabletTypeFunc: tabletTypeFunc,
+		ts:             ts,
+		pool: connpool.NewPool(env, "ThrottlerPool", tabletenv.ConnPoolConfig{
+			Size:               2,
+			IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds,
+		}),
+
+		mysqlThrottleMetricChan: make(chan *mysql.MySQLThrottleMetric),
+
+		mysqlInventoryChan:     make(chan *mysql.Inventory, 1),
+		mysqlClusterProbesChan: make(chan *mysql.ClusterProbes),
+		mysqlInventory:         mysql.NewInventory(),
+
+		throttledApps:          cache.New(cache.NoExpiration, 10*time.Second),
+		mysqlClusterThresholds: cache.New(cache.NoExpiration, 0),
+		aggregatedMetrics:      cache.New(aggregatedMetricsExpiration, aggregatedMetricsCleanup),
+		recentApps:             cache.New(recentAppsExpiration, time.Minute),
+		metricsHealth:          cache.New(cache.NoExpiration, 0),
+
+		tickers: [](*base.SuspendableTicker){},
+
+		nonLowPriorityAppRequestsThrottled: cache.New(nonDeprioritizedAppMapExpiration, nonDeprioritizedAppMapInterval),
+
+		httpClient: base.SetupHTTPClient(0),
+	}
+	throttler.initThrottleTabletTypes()
+	throttler.ThrottleApp("abusing-app", time.Now().Add(time.Hour*24*365*10), defaultThrottleRatio)
+	throttler.check = NewThrottlerCheck(throttler)
+
+	return throttler
+}
+
+// initThrottleTabletTypes populates throttleTabletTypesMap from the comma-separated
+// *throttleTabletTypes value. Each token is trimmed and upper-cased, then looked up in
+// topodatapb.TabletType_value; tokens that do not name a tablet type are silently skipped.
+// REPLICA is always included, whatever the configured value says.
+func (throttler *Throttler) initThrottleTabletTypes() {
+	throttler.throttleTabletTypesMap = make(map[topodatapb.TabletType]bool)
+
+	tokens := strings.Split(*throttleTabletTypes, ",")
+	for _, token := range tokens {
+		token = strings.TrimSpace(token)
+		token = strings.ToUpper(token)
+		if value, ok := topodatapb.TabletType_value[token]; ok {
+			throttler.throttleTabletTypesMap[topodatapb.TabletType(value)] = true
+		}
+	}
+	// always on:
+	throttler.throttleTabletTypesMap[topodatapb.TabletType_REPLICA] = true
+}
+
+// InitDBConfig initializes keyspace and shard, and starts the Operate loop in a
+// background goroutine using context.Background().
+// NOTE(review): every call starts another Operate goroutine; presumably this is invoked
+// once per throttler -- confirm against the caller.
+func (throttler *Throttler) InitDBConfig(keyspace, shard string) {
+	throttler.keyspace = keyspace
+	throttler.shard = shard
+	go throttler.Operate(context.Background())
+}
+
+// initConfig replaces the global config.Instance with settings describing a single MySQL
+// cluster, keyed by localStoreName. That cluster is probed as throttlerUser with the given
+// password, using throttleThreshold (converted to seconds) as the threshold and
+// replicationLagQuery as the metric query.
+func (throttler *Throttler) initConfig(password string) {
+	log.Infof("Throttler: initializing config")
+	config.Instance = &config.ConfigurationSettings{
+		Stores: config.StoresSettings{
+			MySQL: config.MySQLConfigurationSettings{
+				IgnoreDialTCPErrors: true,
+				Clusters: map[string](*config.MySQLClusterConfigurationSettings){
+					localStoreName: &config.MySQLClusterConfigurationSettings{
+						User:              throttlerUser,
+						Password:          password,
+						ThrottleThreshold: throttleThreshold.Seconds(),
+						MetricQuery:       replicationLagQuery,
+						IgnoreHostsCount:  0,
+					},
+				},
+			},
+		},
+	}
+}
+
+// Open opens the connection pool, marks the throttler as open and resumes all registered
+// tickers. It runs under initMutex, is a no-op when the throttler is already open, and
+// always returns nil.
+func (throttler *Throttler) Open() error {
+	throttler.initMutex.Lock()
+	defer throttler.initMutex.Unlock()
+	if atomic.LoadInt64(&throttler.isOpen) > 0 {
+		// already open
+		return nil
+	}
+
+	throttler.pool.Open(throttler.env.Config().DB.AppWithDB(), throttler.env.Config().DB.DbaWithDB(), throttler.env.Config().DB.AppDebugWithDB())
+	atomic.StoreInt64(&throttler.isOpen, 1)
+
+	for _, t := range throttler.tickers {
+		t.Resume()
+	}
+
+	return nil
+}
+
+// Close frees resources: it suspends all registered tickers, gives up leadership, closes
+// the connection pool and marks the throttler as closed. It runs under initMutex and is a
+// no-op when the throttler is not open.
+func (throttler *Throttler) Close() {
+	throttler.initMutex.Lock()
+	defer throttler.initMutex.Unlock()
+	if atomic.LoadInt64(&throttler.isOpen) == 0 {
+		// not open
+		return
+	}
+	for _, t := range throttler.tickers {
+		t.Suspend()
+	}
+	atomic.StoreInt64(&throttler.isLeader, 0)
+
+	throttler.pool.Close()
+	atomic.StoreInt64(&throttler.isOpen, 0)
+}
+
+// createThrottlerUser creates or updates the throttler account and assigns it a random password.
+// It fails when the throttler is not open, when a DBA connection cannot be established, or
+// when the server reports @@global.read_only. On success it returns the new password.
+// When a statement fails after the password has been generated, that password is still
+// returned next to the error; it must not be used in that case.
+func (throttler *Throttler) createThrottlerUser(ctx context.Context) (password string, err error) {
+	if atomic.LoadInt64(&throttler.isOpen) == 0 {
+		return "", fmt.Errorf("createThrottlerUser: not open")
+	}
+
+	// A dedicated DBA connection is opened for the account statements (the pool is not used here).
+	conn, err := dbconnpool.NewDBConnection(ctx, throttler.env.Config().DB.DbaWithDB())
+	if err != nil {
+		return password, err
+	}
+	defer conn.Close()
+
+	// Double check this server is writable
+	tm, err := conn.ExecuteFetch("select @@global.read_only as read_only from dual", 1, true)
+	if err != nil {
+		return password, err
+	}
+	row := tm.Named().Row()
+	if row == nil {
+		return password, fmt.Errorf("unexpected result for MySQL variables: %+v", tm.Rows)
+	}
+	readOnly, err := row.ToBool("read_only")
+	if err != nil {
+		return password, err
+	}
+	if readOnly {
+		return password, fmt.Errorf("createThrottlerUser(): server is read_only")
+	}
+
+	// The password is the leading maxPasswordLength characters of a random hash.
+	password = base.RandomHash()[0:maxPasswordLength]
+	{
+		// There seems to be a bug where CREATE USER hangs. If CREATE USER is preceded by
+		// any query that writes to the binary log, CREATE USER does not hang.
+		// The simplest such query is FLUSH STATUS. Other options are FLUSH PRIVILEGES or similar.
+		// The bug was found in MySQL 8.0.21, and not found in 5.7.30
+		// at this time, Vitess only supports 5.7 and does not support 8.0,
+		// but please keep this code in anticipation of supporting 8.0
+		// - shlomi
+		simpleBinlogQuery := `FLUSH STATUS`
+		if _, err := conn.ExecuteFetch(simpleBinlogQuery, 0, false); err != nil {
+			return password, err
+		}
+	}
+	// Account statements are templates filled in with the grantee and the new password.
+	for _, query := range sqlCreateThrottlerUser {
+		parsed := sqlparser.BuildParsedQuery(query, throttlerGrant, password)
+		if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil {
+			return password, err
+		}
+	}
+	// Grant statements only need the grantee.
+	for _, query := range sqlGrantThrottlerUser {
+		parsed := sqlparser.BuildParsedQuery(query, throttlerGrant)
+		if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil {
+			return password, err
+		}
+	}
+	log.Infof("Throttler: user created/updated")
+	return password, nil
+}
+
+// ThrottledAppsSnapshot returns a snapshot (a copy) of current throttled apps,
+// as produced by the throttledApps cache's Items().
+func (throttler *Throttler) ThrottledAppsSnapshot() map[string]cache.Item {
+	return throttler.throttledApps.Items()
+}
+
+// isDormant returns true when the last check was more than dormantPeriod ago.
+// The last check time is read atomically from lastCheckTimeNano (nanoseconds since the Unix epoch).
+func (throttler *Throttler) isDormant() bool {
+	lastCheckTime := time.Unix(0, atomic.LoadInt64(&throttler.lastCheckTimeNano))
+	return time.Since(lastCheckTime) > dormantPeriod
+}
+
+// Operate is the main entry point for the throttler operation and logic. It will
+// run the probes, collect metrics, refresh inventory, etc.
+// It registers one suspendable ticker per periodic task and then serves ticker and channel
+// events from a single select loop. The mysqlInventory maps are only read and written by
+// code running synchronously inside this loop, as far as this file shows.
+// NOTE(review): the loop has no exit path and does not watch ctx.Done(), so this function
+// never returns.
+func (throttler *Throttler) Operate(ctx context.Context) {
+
+	// addTicker creates a ticker and registers it in throttler.tickers under initMutex,
+	// which is how Open() and Close() find the tickers to resume/suspend.
+	// NOTE(review): the `true` argument presumably creates the ticker in suspended state --
+	// confirm against base.NewSuspendableTicker.
+	addTicker := func(d time.Duration) *base.SuspendableTicker {
+		throttler.initMutex.Lock()
+		defer throttler.initMutex.Unlock()
+
+		t := base.NewSuspendableTicker(d, true)
+		throttler.tickers = append(throttler.tickers, t)
+		return t
+	}
+
+	leaderCheckTicker := addTicker(leaderCheckInterval)
+	mysqlCollectTicker := addTicker(mysqlCollectInterval)
+	mysqlDormantCollectTicker := addTicker(mysqlDormantCollectInterval)
+	mysqlRefreshTicker := addTicker(mysqlRefreshInterval)
+	mysqlAggregateTicker := addTicker(mysqlAggregateInterval)
+	throttledAppsTicker := addTicker(throttledAppsSnapshotInterval)
+
+	// Set on transition into leadership; stays set until createThrottlerUser succeeds, so a
+	// failed attempt is retried on the next leader check.
+	shouldCreateThrottlerUser := false
+	for {
+		select {
+		case <-leaderCheckTicker.C:
+			{
+				// The whole leadership evaluation runs under initMutex, so it cannot
+				// interleave with Open() or Close().
+				func() {
+					throttler.initMutex.Lock()
+					defer throttler.initMutex.Unlock()
+
+					// sparse
+					// Leader iff the throttler is open and the tablet currently reports MASTER.
+					shouldBeLeader := int64(0)
+					if atomic.LoadInt64(&throttler.isOpen) > 0 {
+						if throttler.tabletTypeFunc() == topodatapb.TabletType_MASTER {
+							shouldBeLeader = 1
+						}
+					}
+
+					// isLeader is read directly here; the writers visible in this file
+					// (this closure and Close) all hold initMutex.
+					if shouldBeLeader > throttler.isLeader {
+						log.Infof("Throttler: transition into leadership")
+						shouldCreateThrottlerUser = true
+					}
+					if shouldBeLeader < throttler.isLeader {
+						log.Infof("Throttler: transition out of leadership")
+					}
+
+					atomic.StoreInt64(&throttler.isLeader, shouldBeLeader)
+
+					if shouldCreateThrottlerUser {
+						password, err := throttler.createThrottlerUser(ctx)
+						if err == nil {
+							// Only a successfully created account is written into the config.
+							throttler.initConfig(password)
+							shouldCreateThrottlerUser = false
+						} else {
+							log.Errorf("Error creating throttler account: %+v", err)
+						}
+					}
+				}()
+			}
+		case <-mysqlCollectTicker.C:
+			{
+				if atomic.LoadInt64(&throttler.isLeader) > 0 {
+					// frequent
+					// Skipped while dormant; the dormant ticker below takes over then.
+					if !throttler.isDormant() {
+						throttler.collectMySQLMetrics(ctx)
+					}
+				}
+			}
+		case <-mysqlDormantCollectTicker.C:
+			{
+				if atomic.LoadInt64(&throttler.isLeader) > 0 {
+					// infrequent
+					if throttler.isDormant() {
+						throttler.collectMySQLMetrics(ctx)
+					}
+				}
+			}
+		case metric := <-throttler.mysqlThrottleMetricChan:
+			{
+				// incoming MySQL metric, frequent, as result of collectMySQLMetrics()
+				throttler.mysqlInventory.InstanceKeyMetrics[metric.GetClusterInstanceKey()] = metric
+			}
+		case <-mysqlRefreshTicker.C:
+			{
+				// sparse
+				if atomic.LoadInt64(&throttler.isLeader) > 0 {
+					go throttler.refreshMySQLInventory(ctx)
+				}
+			}
+		case probes := <-throttler.mysqlClusterProbesChan:
+			{
+				// incoming structural update, sparse, as result of refreshMySQLInventory()
+				throttler.updateMySQLClusterProbes(ctx, probes)
+			}
+		case <-mysqlAggregateTicker.C:
+			{
+				if atomic.LoadInt64(&throttler.isLeader) > 0 {
+					throttler.aggregateMySQLMetrics(ctx)
+				}
+			}
+		case <-throttledAppsTicker.C:
+			{
+				if atomic.LoadInt64(&throttler.isLeader) > 0 {
+					go throttler.expireThrottledApps()
+				}
+			}
+		}
+	}
+}
+
+// collectMySQLMetrics starts one goroutine per cluster, and within it one goroutine per
+// probe, to read that probe's throttle metric and send it on mysqlThrottleMetricChan.
+// The channel is unbuffered, so each send waits until the Operate loop receives it.
+// The function itself does not wait for the reads and always returns nil.
+func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error {
+	// synchronously, get lists of probes
+	for clusterName, probes := range throttler.mysqlInventory.ClustersProbes {
+		// per-iteration copies, captured by the goroutine below
+		clusterName := clusterName
+		probes := probes
+		go func() {
+			// probes is known not to change. It can be *replaced*, but not changed.
+			// so it's safe to iterate it
+			for _, probe := range *probes {
+				probe := probe
+				go func() {
+					// Avoid querying the same server twice at the same time. If previous read is still there,
+					// we avoid re-reading it.
+					if !atomic.CompareAndSwapInt64(&probe.QueryInProgress, 0, 1) {
+						return
+					}
+					defer atomic.StoreInt64(&probe.QueryInProgress, 0)
+					throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName)
+					throttler.mysqlThrottleMetricChan <- throttleMetrics
+				}()
+			}
+		}()
+	}
+	return nil
+}
+
+// refreshMySQLInventory will re-structure the inventory based on reading config settings and
+// on the tablets of this keyspace/shard as listed through throttler.ts.
+// NOTE(review): an earlier version of this comment mentioned re-querying an HAProxy list of
+// hosts; nothing in this function does that.
+// For every configured cluster a goroutine records the cluster threshold, builds the set of
+// probes and sends it on mysqlClusterProbesChan (unbuffered; received by the Operate loop).
+// Errors are logged inside those goroutines; the function itself always returns nil.
+func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
+	log.Infof("refreshing MySQL inventory")
+
+	// addInstanceKey adds a probe for key to probes, unless the key matches one of the
+	// cluster's IgnoreHosts substrings or is not valid.
+	addInstanceKey := func(key *mysql.InstanceKey, clusterName string, clusterSettings *config.MySQLClusterConfigurationSettings, probes *mysql.Probes) {
+		for _, ignore := range clusterSettings.IgnoreHosts {
+			if strings.Contains(key.StringCode(), ignore) {
+				log.Infof("Throttler: instance key ignored: %+v", key)
+				return
+			}
+		}
+		if !key.IsValid() {
+			log.Infof("Throttler: read invalid instance key: [%+v] for cluster %+v", key, clusterName)
+			return
+		}
+		log.Infof("Throttler: read instance key: %+v", key)
+
+		probe := &mysql.Probe{
+			Key:         *key,
+			User:        clusterSettings.User,
+			Password:    clusterSettings.Password,
+			MetricQuery: clusterSettings.MetricQuery,
+			CacheMillis: clusterSettings.CacheMillis,
+		}
+		(*probes)[*key] = probe
+	}
+
+	for clusterName, clusterSettings := range config.Settings().Stores.MySQL.Clusters {
+		clusterName := clusterName
+		clusterSettings := clusterSettings
+		// config may dynamically change, but internal structure (config.Settings().Stores.MySQL.Clusters in our case)
+		// is immutable and can only be _replaced_. Hence, it's safe to read in a goroutine:
+		go func() {
+			err := func() error {
+				throttler.mysqlClusterThresholds.Set(clusterName, clusterSettings.ThrottleThreshold, cache.DefaultExpiration)
+
+				tabletAliases, err := throttler.ts.FindAllTabletAliasesInShard(ctx, throttler.keyspace, throttler.shard)
+				if err != nil {
+					return err
+				}
+				// IgnoreHostsThreshold is not set here and keeps its zero value.
+				clusterProbes := &mysql.ClusterProbes{
+					ClusterName:      clusterName,
+					IgnoreHostsCount: clusterSettings.IgnoreHostsCount,
+					InstanceProbes:   mysql.NewProbes(),
+				}
+				for _, tabletAlias := range tabletAliases {
+					tablet, err := throttler.ts.GetTablet(ctx, tabletAlias)
+					if err != nil {
+						return err
+					}
+					// Only tablets whose type was enabled by initThrottleTabletTypes are probed.
+					if throttler.throttleTabletTypesMap[tablet.Type] {
+						key := mysql.InstanceKey{Hostname: tablet.MysqlHostname, Port: int(tablet.MysqlPort)}
+						addInstanceKey(&key, clusterName, clusterSettings, clusterProbes.InstanceProbes)
+					}
+				}
+				throttler.mysqlClusterProbesChan <- clusterProbes
+				return nil
+			}()
+			if err != nil {
+				log.Errorf("refreshMySQLInventory: %+v", err)
+			}
+		}()
+	}
+	return nil
+}
+
+// synchronous update of inventory
+// updateMySQLClusterProbes stores the probes, IgnoreHostsCount and IgnoreHostsThreshold of
+// the given cluster into mysqlInventory, keyed by cluster name. Always returns nil.
+func (throttler *Throttler) updateMySQLClusterProbes(ctx context.Context, clusterProbes *mysql.ClusterProbes) error {
+	log.Infof("Throttler: updating MySQLClusterProbes: %s", clusterProbes.ClusterName)
+	throttler.mysqlInventory.ClustersProbes[clusterProbes.ClusterName] = clusterProbes.InstanceProbes
+	throttler.mysqlInventory.IgnoreHostsCount[clusterProbes.ClusterName] = clusterProbes.IgnoreHostsCount
+	throttler.mysqlInventory.IgnoreHostsThreshold[clusterProbes.ClusterName] = clusterProbes.IgnoreHostsThreshold
+	return nil
+}
+
+// synchronous aggregation of collected data
+// aggregateMySQLMetrics aggregates, per cluster, the collected per-instance metrics via
+// aggregateMySQLProbes and caches the result under the metric name "mysql/<clusterName>".
+// Always returns nil.
+func (throttler *Throttler) aggregateMySQLMetrics(ctx context.Context) error {
+	for clusterName, probes := range throttler.mysqlInventory.ClustersProbes {
+		metricName := fmt.Sprintf("mysql/%s", clusterName)
+		ignoreHostsCount := throttler.mysqlInventory.IgnoreHostsCount[clusterName]
+		ignoreHostsThreshold := throttler.mysqlInventory.IgnoreHostsThreshold[clusterName]
+		aggregatedMetric := aggregateMySQLProbes(ctx, probes, clusterName, throttler.mysqlInventory.InstanceKeyMetrics, ignoreHostsCount, config.Settings().Stores.MySQL.IgnoreDialTCPErrors, ignoreHostsThreshold)
+		throttler.aggregatedMetrics.Set(metricName, aggregatedMetric, cache.DefaultExpiration)
+	}
+	return nil
+}
+
+// getNamedMetric returns the cached aggregated metric stored under metricName, or
+// base.NoSuchMetric when the cache has no such entry. The type assertion is unchecked.
+func (throttler *Throttler) getNamedMetric(metricName string) base.MetricResult {
+	if metricResultVal, found := throttler.aggregatedMetrics.Get(metricName); found {
+		return metricResultVal.(base.MetricResult)
+	}
+	return base.NoSuchMetric
+}
+
+// getMySQLClusterMetrics returns the aggregated metric "mysql/<clusterName>" together with
+// the threshold recorded for that cluster. When no threshold is recorded it returns
+// base.NoSuchMetric and 0. A recorded threshold that is not a float64 is reported as 0.
+func (throttler *Throttler) getMySQLClusterMetrics(ctx context.Context, clusterName string) (base.MetricResult, float64) {
+	if thresholdVal, found := throttler.mysqlClusterThresholds.Get(clusterName); found {
+		threshold, _ := thresholdVal.(float64)
+		metricName := fmt.Sprintf("mysql/%s", clusterName)
+		return throttler.getNamedMetric(metricName), threshold
+	}
+
+	return base.NoSuchMetric, 0
+}
+
+// aggregatedMetricsSnapshot returns a new map of all cached aggregated metrics, keyed by
+// metric name. Entries that are not a base.MetricResult map to a nil MetricResult.
+func (throttler *Throttler) aggregatedMetricsSnapshot() map[string]base.MetricResult {
+	snapshot := make(map[string]base.MetricResult)
+	for key, value := range throttler.aggregatedMetrics.Items() {
+		metricResult, _ := value.Object.(base.MetricResult)
+		snapshot[key] = metricResult
+	}
+	return snapshot
+}
+
+// expireThrottledApps unthrottles every app whose ExpireAt lies before the current time.
+func (throttler *Throttler) expireThrottledApps() {
+	now := time.Now()
+	for appName, item := range throttler.throttledApps.Items() {
+		appThrottle := item.Object.(*base.AppThrottle)
+		if appThrottle.ExpireAt.Before(now) {
+			throttler.UnthrottleApp(appName)
+		}
+	}
+}
+
+// ThrottleApp instructs the throttler to begin throttling an app, to some period and with some ratio.
+// A zero expireAt or a negative ratio means "not specified": for an app that is already
+// throttled the current value is kept, while for a new app the defaults apply
+// (now + defaultThrottleTTLMinutes minutes, defaultThrottleRatio).
+// If the resulting expiry is not in the future, the app is unthrottled instead.
+func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, ratio float64) {
+	throttler.throttledAppsMutex.Lock()
+	defer throttler.throttledAppsMutex.Unlock()
+
+	var appThrottle *base.AppThrottle
+	now := time.Now()
+	if object, found := throttler.throttledApps.Get(appName); found {
+		// IsAppThrottled and expireThrottledApps read the cached *base.AppThrottle without
+		// holding throttledAppsMutex, so changing its fields in place is a data race.
+		// Update a private copy instead and publish it through Set() below.
+		updated := *(object.(*base.AppThrottle))
+		if !expireAt.IsZero() {
+			updated.ExpireAt = expireAt
+		}
+		if ratio >= 0 {
+			updated.Ratio = ratio
+		}
+		appThrottle = &updated
+	} else {
+		if expireAt.IsZero() {
+			expireAt = now.Add(defaultThrottleTTLMinutes * time.Minute)
+		}
+		if ratio < 0 {
+			ratio = defaultThrottleRatio
+		}
+		appThrottle = base.NewAppThrottle(expireAt, ratio)
+	}
+	if now.Before(appThrottle.ExpireAt) {
+		throttler.throttledApps.Set(appName, appThrottle, cache.DefaultExpiration)
+	} else {
+		// Already expired (or expiring right now): make sure no stale entry stays behind.
+		throttler.UnthrottleApp(appName)
+	}
+}
+
+// UnthrottleApp cancels any throttling, if any, for a given app
+// by deleting its entry from the throttledApps cache.
+func (throttler *Throttler) UnthrottleApp(appName string) {
+	throttler.throttledApps.Delete(appName)
+}
+
+// IsAppThrottled tells whether some app should be throttled.
+// Assuming an app is throttled to some extent, it will randomize the result based
+// on the throttle ratio: it returns true when a random number in [0, 1) is below Ratio.
+// An entry whose ExpireAt has already passed counts as not throttled.
+func (throttler *Throttler) IsAppThrottled(appName string) bool {
+	if object, found := throttler.throttledApps.Get(appName); found {
+		appThrottle := object.(*base.AppThrottle)
+		if appThrottle.ExpireAt.Before(time.Now()) {
+			// throttling cleanup hasn't purged yet, but it is expired
+			return false
+		}
+		// handle ratio
+		if rand.Float64() < appThrottle.Ratio {
+			return true
+		}
+	}
+	return false
+}
+
+// ThrottledAppsMap returns a (copy) map of currently throttled apps.
+// The map itself is new, but its values are the same *base.AppThrottle pointers that the
+// cache holds, not deep copies.
+func (throttler *Throttler) ThrottledAppsMap() (result map[string](*base.AppThrottle)) {
+	result = make(map[string](*base.AppThrottle))
+
+	for appName, item := range throttler.throttledApps.Items() {
+		appThrottle := item.Object.(*base.AppThrottle)
+		result[appName] = appThrottle
+	}
+	return result
+}
+
+// markRecentApp takes note that an app has just asked about throttling, making it "recent".
+// The entry is keyed by "<appName>/<remoteAddr>" and stores the current time.
+func (throttler *Throttler) markRecentApp(appName string, remoteAddr string) {
+	recentAppKey := fmt.Sprintf("%s/%s", appName, remoteAddr)
+	throttler.recentApps.Set(recentAppKey, time.Now(), cache.DefaultExpiration)
+}
+
+// RecentAppsMap returns a (copy) map of apps which checked for throttling recently.
+// Keys are the "<appName>/<remoteAddr>" keys written by markRecentApp; each value is a new
+// base.RecentApp built from the recorded time.
+func (throttler *Throttler) RecentAppsMap() (result map[string](*base.RecentApp)) {
+	result = make(map[string](*base.RecentApp))
+
+	for recentAppKey, item := range throttler.recentApps.Items() {
+		recentApp := base.NewRecentApp(item.Object.(time.Time))
+		result[recentAppKey] = recentApp
+	}
+	return result
+}
+
+// markMetricHealthy will mark the time "now" as the last time a given metric was checked to be "OK"
+func (throttler *Throttler) markMetricHealthy(metricName string) {
+	throttler.metricsHealth.Set(metricName, time.Now(), cache.DefaultExpiration)
+}
+
+// timeSinceMetricHealthy returns time elapsed since the last time a metric checked "OK".
+// found is false (and the duration 0) when the metric was never marked healthy.
+func (throttler *Throttler) timeSinceMetricHealthy(metricName string) (timeSinceHealthy time.Duration, found bool) {
+	// The inner `found` shadows the named result; both exits return explicit values.
+	if lastOKTime, found := throttler.metricsHealth.Get(metricName); found {
+		return time.Since(lastOKTime.(time.Time)), true
+	}
+	return 0, false
+}
+
+// metricsHealthSnapshot returns a new map with one base.MetricHealth per metric that has a
+// recorded last-healthy time. Entries that are not a time.Time yield the zero time.
+func (throttler *Throttler) metricsHealthSnapshot() base.MetricHealthMap {
+	snapshot := make(base.MetricHealthMap)
+	for key, value := range throttler.metricsHealth.Items() {
+		lastHealthyAt, _ := value.Object.(time.Time)
+		snapshot[key] = base.NewMetricHealth(lastHealthyAt)
+	}
+	return snapshot
+}
+
+// AppRequestMetricResult gets a metric result in the context of a specific app.
+// It returns base.AppDeniedMetric with a zero threshold when denyApp is set or when
+// IsAppThrottled reports the app as throttled; otherwise it returns whatever
+// metricResultFunc yields. ctx is not used here.
+func (throttler *Throttler) AppRequestMetricResult(ctx context.Context, appName string, metricResultFunc base.MetricResultFunc, denyApp bool) (metricResult base.MetricResult, threshold float64) {
+	if denyApp {
+		return base.AppDeniedMetric, 0
+	}
+	if throttler.IsAppThrottled(appName) {
+		return base.AppDeniedMetric, 0
+	}
+	return metricResultFunc()
+}
+
+// Check is the main serving function of the throttler, and returns a check result for this cluster's lag.
+// It delegates to throttler.check with the "mysql" store type and the localStoreName store.
+func (throttler *Throttler) Check(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) {
+	return throttler.check.Check(ctx, appName, "mysql", localStoreName, remoteAddr, flags)
+}
+
+// Status exports a status breakdown: keyspace and shard, the leader/open/dormant flags, and
+// snapshots of the aggregated metrics and of the metrics health.
+func (throttler *Throttler) Status() *ThrottlerStatus {
+	return &ThrottlerStatus{
+		Keyspace: throttler.keyspace,
+		Shard:    throttler.shard,
+
+		IsLeader:  (atomic.LoadInt64(&throttler.isLeader) > 0),
+		IsOpen:    (atomic.LoadInt64(&throttler.isOpen) > 0),
+		IsDormant: throttler.isDormant(),
+
+		AggregatedMetrics: throttler.aggregatedMetricsSnapshot(),
+		MetricsHealth:     throttler.metricsHealthSnapshot(),
+	}
+}
diff --git a/test/config.json b/test/config.json index d10894edbcb..4f27c2f878a 100644 --- a/test/config.json +++ b/test/config.json @@ -386,6 +386,17 @@ "site_test" ] }, + "tabletmanager_throttler": { + "File": 
"unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/tabletmanager/throttler"], + "Command": [], + "Manual": false, + "Shard": 18, + "RetryMax": 0, + "Tags": [ + "site_test" + ] + }, "tabletmanager_zk2": { "File": "unused.go", "Args": [