diff --git a/changelog/17.0/17.0.0/summary.md b/changelog/17.0/17.0.0/summary.md
index 306a787fa71..252a27e2699 100644
--- a/changelog/17.0/17.0.0/summary.md
+++ b/changelog/17.0/17.0.0/summary.md
@@ -14,6 +14,7 @@
- **[New command line flags and behavior](#new-flag)**
- [Builtin backup: read buffering flags](#builtin-backup-read-buffering-flags)
- [Manifest backup external decompressor command](#manifest-backup-external-decompressor-command)
+ - [Throttler config via topo enabled by default](#throttler-config-via-topo)
- **[New stats](#new-stats)**
- [Detailed backup and restore stats](#detailed-backup-and-restore-stats)
- [VTtablet Error count with code](#vttablet-error-count-with-code)
@@ -164,6 +165,12 @@ This feature enables the following flow:
```
2. Restore that backup with a mere `Restore` command, without having to specify `--external-decompressor`.
+#### vttablet --throttler-config-via-topo
+
+This flag was introduced in v16 and defaulted to `false`. In v17 it defaults to `true`, and there is no need to supply it.
+
+Note that this flag overrides `--enable-lag-throttler` and `--throttle-threshold`, which now give warnings, and will be removed in v18.
+
### New stats
#### Detailed backup and restore stats
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt
index 6a4e858a711..f7adc1292a8 100644
--- a/go/flags/endtoend/vttablet.txt
+++ b/go/flags/endtoend/vttablet.txt
@@ -316,7 +316,7 @@ Usage of vttablet:
--throttle_metrics_threshold float Override default throttle threshold, respective to --throttle_metrics_query (default 1.7976931348623157e+308)
--throttle_tablet_types string Comma separated VTTablet types to be considered by the throttler. default: 'replica'. example: 'replica,rdonly'. 'replica' aways implicitly included (default "replica")
--throttle_threshold duration Replication lag threshold for default lag throttling (default 1s)
- --throttler-config-via-topo When 'true', read config from topo service and ignore throttle_threshold, throttle_metrics_threshold, throttle_metrics_query, throttle_check_as_check_self
+ --throttler-config-via-topo When 'true', read config from topo service and ignore throttle_threshold, throttle_metrics_threshold, throttle_metrics_query, throttle_check_as_check_self (default true)
--topo_consul_lock_delay duration LockDelay for consul session. (default 15s)
--topo_consul_lock_session_checks string List of checks for consul session. (default "serfHealth")
--topo_consul_lock_session_ttl string TTL for consul session.
diff --git a/go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go b/go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go
index c5ad11943cf..3dc635c8870 100644
--- a/go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go
+++ b/go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go
@@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
+ "vitess.io/vitess/go/test/endtoend/throttler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -166,9 +167,6 @@ func TestMain(m *testing.M) {
}
clusterInstance.VtTabletExtraArgs = []string{
- "--enable-lag-throttler",
- "--throttle_threshold", "1s",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
@@ -218,6 +216,9 @@ func TestSchemaChange(t *testing.T) {
defer cluster.PanicHandler(t)
shards = clusterInstance.Keyspaces[0].Shards
assert.Equal(t, 2, len(shards))
+
+ throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second)
+
testWithInitialSchema(t)
t.Run("create non_online", func(t *testing.T) {
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online", "")
diff --git a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
index 6c90764b931..dd0b6d84a53 100644
--- a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
+++ b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
@@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
+ "vitess.io/vitess/go/test/endtoend/throttler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -150,9 +151,6 @@ func TestMain(m *testing.M) {
}
clusterInstance.VtTabletExtraArgs = []string{
- "--enable-lag-throttler",
- "--throttle_threshold", "1s",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
@@ -205,6 +203,8 @@ func TestSchemaChange(t *testing.T) {
shards = clusterInstance.Keyspaces[0].Shards
require.Equal(t, 1, len(shards))
+ throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second)
+
t.Run("revertible", testRevertible)
t.Run("revert", testRevert)
}
diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
index c2eb46cf579..c9c60af2d85 100644
--- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
+++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
@@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
+ "vitess.io/vitess/go/test/endtoend/throttler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -187,9 +188,6 @@ func TestMain(m *testing.M) {
}
clusterInstance.VtTabletExtraArgs = []string{
- "--enable-lag-throttler",
- "--throttle_threshold", "1s",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--watch_replication_stream",
@@ -234,6 +232,9 @@ func TestMain(m *testing.M) {
}
func TestSchemaChange(t *testing.T) {
+
+ throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second)
+
t.Run("scheduler", testScheduler)
t.Run("singleton", testSingleton)
t.Run("declarative", testDeclarative)
diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
index be88fc4618b..b012cd4f074 100644
--- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
+++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
@@ -171,8 +171,6 @@ func TestMain(m *testing.M) {
}
clusterInstance.VtTabletExtraArgs = []string{
- "--throttler-config-via-topo",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
@@ -259,7 +257,7 @@ func TestSchemaChange(t *testing.T) {
err := clusterInstance.WaitForTabletsToHealthyInVtgate()
require.NoError(t, err)
- _, err = throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", false)
+ _, err = throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "")
require.NoError(t, err)
for _, ks := range clusterInstance.Keyspaces {
diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go
index 0039361a37f..1f1f3b9c5b7 100644
--- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go
+++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go
@@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
+ "vitess.io/vitess/go/test/endtoend/throttler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -177,9 +178,6 @@ func TestMain(m *testing.M) {
}
clusterInstance.VtTabletExtraArgs = []string{
- "--enable-lag-throttler",
- "--throttle_threshold", "1s",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
@@ -232,6 +230,8 @@ func TestSchemaChange(t *testing.T) {
shards = clusterInstance.Keyspaces[0].Shards
require.Equal(t, 1, len(shards))
+ throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second)
+
t.Run("create schema", func(t *testing.T) {
assert.Equal(t, 1, len(clusterInstance.Keyspaces[0].Shards))
testWithInitialSchema(t)
diff --git a/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go b/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go
index 4c7965d1109..92fb7cf13e5 100644
--- a/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go
+++ b/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go
@@ -47,6 +47,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
+ "vitess.io/vitess/go/test/endtoend/throttler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -429,9 +430,6 @@ func TestMain(m *testing.M) {
// thereby examining lastPK on vcopier side. We will be iterating tables using non-PK order throughout
// this test suite, and so the low setting ensures we hit the more interesting code paths.
clusterInstance.VtTabletExtraArgs = []string{
- "--enable-lag-throttler",
- "--throttle_threshold", "1s",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
@@ -485,6 +483,8 @@ func TestSchemaChange(t *testing.T) {
shards = clusterInstance.Keyspaces[0].Shards
require.Equal(t, 1, len(shards))
+ throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second)
+
for _, testcase := range testCases {
require.NotEmpty(t, testcase.name)
t.Run(testcase.name, func(t *testing.T) {
diff --git a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go
index 7e655b4b868..c8b87215036 100644
--- a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go
+++ b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go
@@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
+ "vitess.io/vitess/go/test/endtoend/throttler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -82,9 +83,6 @@ func TestMain(m *testing.M) {
}
clusterInstance.VtTabletExtraArgs = []string{
- "--enable-lag-throttler",
- "--throttle_threshold", "1s",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
@@ -134,6 +132,8 @@ func TestSchemaChange(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
require.Equal(t, 1, len(shards))
+ throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second)
+
files, err := os.ReadDir(testDataPath)
require.NoError(t, err)
for _, f := range files {
diff --git a/go/test/endtoend/schemadiff/vrepl/schemadiff_vrepl_suite_test.go b/go/test/endtoend/schemadiff/vrepl/schemadiff_vrepl_suite_test.go
index 12491f1f152..2dc79840018 100644
--- a/go/test/endtoend/schemadiff/vrepl/schemadiff_vrepl_suite_test.go
+++ b/go/test/endtoend/schemadiff/vrepl/schemadiff_vrepl_suite_test.go
@@ -25,6 +25,7 @@ import (
"regexp"
"strings"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -33,6 +34,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
+ "vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/schemadiff"
"vitess.io/vitess/go/vt/sqlparser"
)
@@ -87,9 +89,6 @@ func TestMain(m *testing.M) {
}
clusterInstance.VtTabletExtraArgs = []string{
- "--enable-lag-throttler",
- "--throttle_threshold", "1s",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
@@ -139,6 +138,8 @@ func TestSchemaChange(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
require.Equal(t, 1, len(shards))
+ throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second)
+
files, err := os.ReadDir(testDataPath)
require.NoError(t, err)
for _, f := range files {
diff --git a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go
index eca52cbb106..1b43ecf2d90 100644
--- a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go
+++ b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go
@@ -97,7 +97,6 @@ func TestMain(m *testing.M) {
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--enable_replication_reporter",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--gc_check_interval", gcCheckInterval.String(),
"--gc_purge_check_interval", gcPurgeCheckInterval.String(),
diff --git a/go/test/endtoend/tabletmanager/throttler/throttler_test.go b/go/test/endtoend/tabletmanager/throttler/throttler_test.go
index 28d0c287c24..5ca4bc32a87 100644
--- a/go/test/endtoend/tabletmanager/throttler/throttler_test.go
+++ b/go/test/endtoend/tabletmanager/throttler/throttler_test.go
@@ -96,12 +96,12 @@ func TestMain(m *testing.M) {
// Set extra tablet args for lock timeout
clusterInstance.VtTabletExtraArgs = []string{
+ "--throttler-config-via-topo=false",
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--enable_replication_reporter",
"--enable-lag-throttler",
"--throttle_threshold", throttlerThreshold.String(),
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", onDemandHeartbeatDuration.String(),
"--disable_active_reparents",
diff --git a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go
index d7968ebb4e0..e173384eb62 100644
--- a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go
+++ b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go
@@ -98,6 +98,7 @@ func TestMain(m *testing.M) {
// Set extra tablet args for lock timeout
clusterInstance.VtTabletExtraArgs = []string{
+ "--throttler-config-via-topo=false",
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--enable_replication_reporter",
@@ -105,7 +106,6 @@ func TestMain(m *testing.M) {
"--throttle_metrics_query", "show global status like 'threads_running'",
"--throttle_metrics_threshold", fmt.Sprintf("%d", testThreshold),
"--throttle_check_as_check_self",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
}
diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
index a3204e0be2f..654870fae97 100644
--- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
+++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
@@ -39,7 +39,7 @@ import (
const (
customQuery = "show global status like 'threads_running'"
- customThreshold = 5 * time.Second
+ customThreshold = 5
unreasonablyLowThreshold = 1 * time.Millisecond
extremelyHighThreshold = 1 * time.Hour
onDemandHeartbeatDuration = 5 * time.Second
@@ -112,8 +112,6 @@ func TestMain(m *testing.M) {
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--enable_replication_reporter",
- "--throttler-config-via-topo",
- "--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", onDemandHeartbeatDuration.String(),
"--disable_active_reparents",
@@ -247,7 +245,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK)
})
t.Run("enabling throttler with very low threshold", func(t *testing.T) {
- _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery, false)
+ _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery)
assert.NoError(t, err)
// Wait for the throttler to be enabled everywhere with the new config.
@@ -259,7 +257,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests)
})
t.Run("disabling throttler", func(t *testing.T) {
- _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery, false)
+ _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery)
assert.NoError(t, err)
// Wait for the throttler to be disabled everywhere.
@@ -273,7 +271,7 @@ func TestInitialThrottler(t *testing.T) {
t.Run("enabling throttler, again", func(t *testing.T) {
// Enable the throttler again with the default query which also moves us back
// to the default threshold.
- _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery, true)
+ _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery)
assert.NoError(t, err)
// Wait for the throttler to be enabled everywhere again with the default config.
@@ -285,7 +283,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests)
})
t.Run("setting high threshold", func(t *testing.T) {
- _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery, true)
+ _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery)
assert.NoError(t, err)
// Wait for the throttler to be enabled everywhere with new config.
@@ -297,7 +295,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK)
})
t.Run("setting low threshold", func(t *testing.T) {
- _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, true)
+ _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery)
assert.NoError(t, err)
// Wait for the throttler to be enabled everywhere with new config.
@@ -438,15 +436,21 @@ func TestCustomQuery(t *testing.T) {
defer cluster.PanicHandler(t)
t.Run("enabling throttler with custom query and threshold", func(t *testing.T) {
- _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold.Seconds(), customQuery, false)
+ _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, customQuery)
assert.NoError(t, err)
// Wait for the throttler to be enabled everywhere with new custom config.
- for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets {
- throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: customQuery, Threshold: customThreshold.Seconds()}, throttlerEnabledTimeout)
+ expectConfig := &throttler.Config{Query: customQuery, Threshold: customThreshold}
+ for _, ks := range clusterInstance.Keyspaces {
+ for _, shard := range ks.Shards {
+ for _, tablet := range shard.Vttablets {
+ throttler.WaitForThrottlerStatusEnabled(t, tablet, true, expectConfig, throttlerEnabledTimeout)
+ }
+ }
}
})
t.Run("validating OK response from throttler with custom query", func(t *testing.T) {
+ throttler.WaitForValidData(t, primaryTablet, throttlerEnabledTimeout)
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
defer resp.Body.Close()
@@ -455,28 +459,24 @@ func TestCustomQuery(t *testing.T) {
t.Run("test threads running", func(t *testing.T) {
sleepDuration := 20 * time.Second
var wg sync.WaitGroup
- for i := 0; i < int(customThreshold.Seconds()); i++ {
- // Generate different Sleep() calls, all at minimum sleepDuration.
- wg.Add(1)
- go func(i int) {
- defer wg.Done()
- vtgateExec(t, fmt.Sprintf("select sleep(%d)", int(sleepDuration.Seconds())+i), "")
- }(i)
- }
+ t.Run("generate running queries", func(t *testing.T) {
+ for i := 0; i < customThreshold+1; i++ {
+ // Generate different Sleep() calls, all at minimum sleepDuration.
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ // Make sure to generate a different query in each goroutine, so that vtgate does not oversmart us
+ // and optimizes connections/caching.
+ query := fmt.Sprintf("select sleep(%d) + %d", int(sleepDuration.Seconds()), i)
+ vtgateExec(t, query, "")
+ }(i)
+ }
+ })
t.Run("exceeds threshold", func(t *testing.T) {
- throttler.WaitForQueryResult(t, primaryTablet,
- "select if(variable_value > 5, 'true', 'false') as result from performance_schema.global_status where variable_name='threads_running'",
- "true", sleepDuration/3)
- throttler.WaitForValidData(t, primaryTablet, sleepDuration-(5*time.Second))
- // Now we should be reporting ~ customThreshold*2 threads_running, and we should
+ // Now we should be reporting ~ customThreshold+1 threads_running, and we should
// hit the threshold. For example:
// {"StatusCode":429,"Value":6,"Threshold":5,"Message":"Threshold exceeded"}
- {
- resp, err := throttleCheck(primaryTablet, false)
- require.NoError(t, err)
- defer resp.Body.Close()
- assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
- }
+ waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests)
{
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
@@ -486,15 +486,9 @@ func TestCustomQuery(t *testing.T) {
})
t.Run("wait for queries to terminate", func(t *testing.T) {
wg.Wait()
- time.Sleep(1 * time.Second) // graceful time to let throttler read metrics
})
t.Run("restored below threshold", func(t *testing.T) {
- {
- resp, err := throttleCheck(primaryTablet, false)
- require.NoError(t, err)
- defer resp.Body.Close()
- assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
- }
+ waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK)
{
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
@@ -510,7 +504,7 @@ func TestRestoreDefaultQuery(t *testing.T) {
// Validate going back from custom-query to default-query (replication lag) still works.
t.Run("enabling throttler with default query and threshold", func(t *testing.T) {
- _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, false)
+ _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery)
assert.NoError(t, err)
// Wait for the throttler to be up and running everywhere again with the default config.
diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go
index e8769999fc1..c6a7b2f69a5 100644
--- a/go/test/endtoend/throttler/util.go
+++ b/go/test/endtoend/throttler/util.go
@@ -21,13 +21,15 @@ import (
"fmt"
"io"
"net/http"
+ "strings"
"testing"
"time"
- "github.com/buger/jsonparser"
"github.com/stretchr/testify/require"
+ "github.com/tidwall/gjson"
"vitess.io/vitess/go/test/endtoend/cluster"
+ "vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
)
@@ -51,13 +53,8 @@ var DefaultConfig = &Config{
// This retries the command until it succeeds or times out as the
// SrvKeyspace record may not yet exist for a newly created
// Keyspace that is still initializing before it becomes serving.
-func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string, viaVtctldClient bool) (result string, err error) {
+func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, keyspaceName string, enable bool, disable bool, threshold float64, metricsQuery string) (result string, err error) {
args := []string{}
- clientfunc := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput
- if !viaVtctldClient {
- args = append(args, "--")
- clientfunc = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput
- }
args = append(args, "UpdateThrottlerConfig")
if enable {
args = append(args, "--enable")
@@ -74,7 +71,7 @@ func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, ena
} else {
args = append(args, "--check-as-check-shard")
}
- args = append(args, clusterInstance.Keyspaces[0].Name)
+ args = append(args, keyspaceName)
ctx, cancel := context.WithTimeout(context.Background(), ConfigTimeout)
defer cancel()
@@ -83,7 +80,7 @@ func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, ena
defer ticker.Stop()
for {
- result, err = clientfunc(args...)
+ result, err = vtctldProcess.ExecuteCommandWithOutput(args...)
if err == nil {
return result, nil
}
@@ -95,44 +92,91 @@ func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, ena
}
}
+// UpdateThrottlerTopoConfig runs vtctlclient UpdateThrottlerConfig.
+// This retries the command until it succeeds or times out as the
+// SrvKeyspace record may not yet exist for a newly created
+// Keyspace that is still initializing before it becomes serving.
+func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string) (string, error) {
+ rec := concurrency.AllErrorRecorder{}
+ var (
+ err error
+ res strings.Builder
+ )
+ for _, ks := range clusterInstance.Keyspaces {
+ ires, err := UpdateThrottlerTopoConfigRaw(&clusterInstance.VtctldClientProcess, ks.Name, enable, disable, threshold, metricsQuery)
+ if err != nil {
+ rec.RecordError(err)
+ }
+ res.WriteString(ires)
+ }
+ if rec.HasErrors() {
+ err = rec.Error()
+ }
+ return res.String(), err
+}
+
// WaitForThrottlerStatusEnabled waits for a tablet to report its throttler status as
// enabled/disabled and have the provided config (if any) until the specified timeout.
func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, enabled bool, config *Config, timeout time.Duration) {
enabledJSONPath := "IsEnabled"
queryJSONPath := "Query"
thresholdJSONPath := "Threshold"
- url := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort)
+ throttlerURL := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort)
+ tabletURL := fmt.Sprintf("http://localhost:%d/debug/status_details", tablet.HTTPPort)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
- body := getHTTPBody(url)
- isEnabled, err := jsonparser.GetBoolean([]byte(body), enabledJSONPath)
- require.NoError(t, err)
+ throttlerBody := getHTTPBody(throttlerURL)
+ isEnabled := gjson.Get(throttlerBody, enabledJSONPath).Bool()
if isEnabled == enabled {
if config == nil {
return
}
- query, err := jsonparser.GetString([]byte(body), queryJSONPath)
- require.NoError(t, err)
- threshold, err := jsonparser.GetFloat([]byte(body), thresholdJSONPath)
- require.NoError(t, err)
+ query := gjson.Get(throttlerBody, queryJSONPath).String()
+ threshold := gjson.Get(throttlerBody, thresholdJSONPath).Float()
if query == config.Query && threshold == config.Threshold {
return
}
}
+ // If the tablet is Not Serving due to e.g. being involved in a
+ // Reshard where its QueryService is explicitly disabled, then
+ // we should not fail the test as the throttler will not be Open.
+ tabletBody := getHTTPBody(tabletURL)
+ class := strings.ToLower(gjson.Get(tabletBody, "0.Class").String())
+ value := strings.ToLower(gjson.Get(tabletBody, "0.Value").String())
+ if class == "unhappy" && strings.Contains(value, "not serving") {
+ log.Infof("tablet %s is Not Serving, so ignoring throttler status as the throttler will not be Opened", tablet.Alias)
+ return
+ }
select {
case <-ctx.Done():
t.Errorf("timed out waiting for the %s tablet's throttler status enabled to be %t with the correct config after %v; last seen value: %s",
- tablet.Alias, enabled, timeout, body)
+ tablet.Alias, enabled, timeout, throttlerBody)
return
case <-ticker.C:
}
}
}
+// EnableLagThrottlerAndWaitForStatus is a utility function to enable the throttler at the beginning of an endtoend test.
+// The throttler is configured to use the standard replication lag metric. The function waits until the throttler is confirmed
+// to be running on all tablets.
+func EnableLagThrottlerAndWaitForStatus(t *testing.T, clusterInstance *cluster.LocalProcessCluster, lag time.Duration) {
+ _, err := UpdateThrottlerTopoConfig(clusterInstance, true, false, lag.Seconds(), "")
+ require.NoError(t, err)
+
+ for _, ks := range clusterInstance.Keyspaces {
+ for _, shard := range ks.Shards {
+ for _, tablet := range shard.Vttablets {
+ WaitForThrottlerStatusEnabled(t, tablet, true, nil, time.Minute)
+ }
+ }
+ }
+}
+
func getHTTPBody(url string) string {
resp, err := http.Get(url)
if err != nil {
@@ -149,32 +193,6 @@ func getHTTPBody(url string) string {
return body
}
-// WaitForQueryResult waits for a tablet to return the given result for the given
-// query until the specified timeout.
-// This is for simple queries that return 1 column in 1 row. It compares the result
-// for that column as a string with the provided result.
-func WaitForQueryResult(t *testing.T, tablet *cluster.Vttablet, query, result string, timeout time.Duration) {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- ticker := time.NewTicker(time.Second)
- defer ticker.Stop()
-
- for {
- res, err := tablet.VttabletProcess.QueryTablet(query, "", false)
- require.NoError(t, err)
- if res != nil && len(res.Rows) == 1 && res.Rows[0][0].ToString() == result {
- return
- }
- select {
- case <-ctx.Done():
- t.Errorf("timed out waiting for the %q query to produce a result of %q on tablet %s after %v; last seen value: %s",
- query, result, tablet.Alias, timeout, res.Rows[0][0].ToString())
- return
- case <-ticker.C:
- }
- }
-}
-
// WaitForValidData waits for a tablet's checks to return a non 500 http response
// which indicates that it's not able to provide valid results. This is most
// commonly caused by the throttler still gathering the initial results for
diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go
index 046e0736698..12539b778de 100644
--- a/go/test/endtoend/vreplication/cluster_test.go
+++ b/go/test/endtoend/vreplication/cluster_test.go
@@ -37,6 +37,7 @@ import (
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/test/endtoend/cluster"
+ "vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
)
@@ -57,6 +58,8 @@ var (
extraVTTabletArgs = []string{}
parallelInsertWorkers = "--vreplication-parallel-insert-workers=4"
+
+ throttlerConfig = throttler.Config{Threshold: 15}
)
// ClusterConfig defines the parameters like ports, tmpDir, tablet types which uniquely define a vitess cluster
@@ -364,6 +367,11 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string,
if err := vc.VtctldClient.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName); err != nil {
t.Fatalf(err.Error())
}
+
+ log.Infof("Applying throttler config for keyspace %s", keyspace.Name)
+ res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, keyspace.Name, true, false, throttlerConfig.Threshold, throttlerConfig.Query)
+ require.NoError(t, err, res)
+
cellsToWatch := ""
for i, cell := range cells {
if i > 0 {
@@ -392,7 +400,9 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string,
vc.StartVtgate(t, cell, cellsToWatch)
}
}
- _ = vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", ksName)
+
+ err = vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", ksName)
+ require.NoError(t, err)
return keyspace, nil
}
@@ -402,8 +412,7 @@ func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace,
options := []string{
"--queryserver-config-schema-reload-time", "5",
- "--enable-lag-throttler",
- "--heartbeat_enable",
+ "--heartbeat_on_demand_duration", "5s",
"--heartbeat_interval", "250ms",
} // FIXME: for multi-cell initial schema doesn't seem to load without "--queryserver-config-schema-reload-time"
options = append(options, extraVTTabletArgs...)
@@ -537,9 +546,26 @@ func (vc *VitessCluster) AddShards(t *testing.T, cells []*Cell, keyspace *Keyspa
require.NotEqual(t, 0, primaryTabletUID, "Should have created a primary tablet")
log.Infof("InitializeShard and make %d primary", primaryTabletUID)
require.NoError(t, vc.VtctlClient.InitializeShard(keyspace.Name, shardName, cells[0].Name, primaryTabletUID))
+
log.Infof("Finished creating shard %s", shard.Name)
}
+ err := vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", keyspace.Name)
+ require.NoError(t, err)
+
+ log.Infof("Waiting for throttler config to be applied on all shards")
+ for _, shard := range keyspace.Shards {
+ for _, tablet := range shard.Tablets {
+ clusterTablet := &cluster.Vttablet{
+ Alias: tablet.Name,
+ HTTPPort: tablet.Vttablet.Port,
+ }
+ log.Infof("+ Waiting for throttler config to be applied on %s, type=%v", tablet.Name, tablet.Vttablet.TabletType)
+ throttler.WaitForThrottlerStatusEnabled(t, clusterTablet, true, nil, time.Minute)
+ }
+ }
+ log.Infof("Throttler config applied on all shards")
+
return nil
}
diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go
index cdf9d18523a..e12dbfa1cb1 100644
--- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go
+++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go
@@ -125,6 +125,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
if tabletTypes != "" {
args = append(args, "--tablet_types", tabletTypes)
}
+ args = append(args, "--timeout", time.Minute.String())
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
args = append(args, action, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...)
diff --git a/go/vt/srvtopo/watch.go b/go/vt/srvtopo/watch.go
index 2d571f4930f..2d470327c4e 100644
--- a/go/vt/srvtopo/watch.go
+++ b/go/vt/srvtopo/watch.go
@@ -195,7 +195,7 @@ func (entry *watchEntry) onErrorLocked(err error, init bool) {
// This watcher will able to continue to return the last value till it is not able to connect to the topo server even if the cache TTL is reached.
// TTL cache is only checked if the error is a known error i.e topo.Error.
_, isTopoErr := err.(topo.Error)
- if isTopoErr && time.Since(entry.lastValueTime) > entry.rw.cacheTTL {
+ if entry.value != nil && isTopoErr && time.Since(entry.lastValueTime) > entry.rw.cacheTTL {
log.Errorf("WatchSrvKeyspace clearing cached entry for %v", entry.key)
entry.value = nil
}
diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go
index 9c2981fd861..5f6899c6e1d 100644
--- a/go/vt/vttablet/onlineddl/executor.go
+++ b/go/vt/vttablet/onlineddl/executor.go
@@ -2131,7 +2131,7 @@ func (e *Executor) ThrottleMigration(ctx context.Context, uuid string, expireStr
if err != nil {
return nil, err
}
- if err := e.lagThrottler.CheckIsReady(); err != nil {
+ if err := e.lagThrottler.CheckIsOpen(); err != nil {
return nil, err
}
_ = e.lagThrottler.ThrottleApp(uuid, time.Now().Add(duration), ratio)
@@ -2144,7 +2144,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin
if err != nil {
return nil, err
}
- if err := e.lagThrottler.CheckIsReady(); err != nil {
+ if err := e.lagThrottler.CheckIsOpen(); err != nil {
return nil, err
}
_ = e.lagThrottler.ThrottleApp(throttlerapp.OnlineDDLName.String(), time.Now().Add(duration), ratio)
@@ -2153,7 +2153,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin
// UnthrottleMigration
func (e *Executor) UnthrottleMigration(ctx context.Context, uuid string) (result *sqltypes.Result, err error) {
- if err := e.lagThrottler.CheckIsReady(); err != nil {
+ if err := e.lagThrottler.CheckIsOpen(); err != nil {
return nil, err
}
defer e.triggerNextCheckInterval()
@@ -2163,7 +2163,7 @@ func (e *Executor) UnthrottleMigration(ctx context.Context, uuid string) (result
// UnthrottleAllMigrations
func (e *Executor) UnthrottleAllMigrations(ctx context.Context) (result *sqltypes.Result, err error) {
- if err := e.lagThrottler.CheckIsReady(); err != nil {
+ if err := e.lagThrottler.CheckIsOpen(); err != nil {
return nil, err
}
defer e.triggerNextCheckInterval()
@@ -3472,13 +3472,12 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
}
var currentUserThrottleRatio float64
- if err := e.lagThrottler.CheckIsReady(); err == nil {
- // No point in reviewing throttler info if it's not enabled&open
- for _, app := range e.lagThrottler.ThrottledApps() {
- if throttlerapp.OnlineDDLName.Equals(app.AppName) {
- currentUserThrottleRatio = app.Ratio
- break
- }
+
+ // No point in reviewing throttler info if it's not enabled&open
+ for _, app := range e.lagThrottler.ThrottledApps() {
+ if throttlerapp.OnlineDDLName.Equals(app.AppName) {
+ currentUserThrottleRatio = app.Ratio
+ break
}
}
@@ -3593,7 +3592,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
}
}
go throttlerOnce.Do(func() {
- if e.lagThrottler.CheckIsReady() != nil {
+ if !e.lagThrottler.IsRunning() {
return
}
// Self healing: in the following scenario:
@@ -3611,6 +3610,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
// on-demand heartbeats, unlocking the deadlock.
e.lagThrottler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite)
})
+
}
}
case schema.DDLStrategyPTOSC:
diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go
index 44f9db06256..c55fb3ac6b1 100644
--- a/go/vt/vttablet/tabletserver/query_executor.go
+++ b/go/vt/vttablet/tabletserver/query_executor.go
@@ -968,7 +968,7 @@ func (qre *QueryExecutor) execShowMigrationLogs() (*sqltypes.Result, error) {
}
func (qre *QueryExecutor) execShowThrottledApps() (*sqltypes.Result, error) {
- if err := qre.tsv.lagThrottler.CheckIsReady(); err != nil {
+ if err := qre.tsv.lagThrottler.CheckIsOpen(); err != nil {
return nil, err
}
if _, ok := qre.plan.FullStmt.(*sqlparser.ShowThrottledApps); !ok {
@@ -1007,7 +1007,7 @@ func (qre *QueryExecutor) execShowThrottlerStatus() (*sqltypes.Result, error) {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "Expecting SHOW VITESS_THROTTLER STATUS plan")
}
var enabled int32
- if err := qre.tsv.lagThrottler.CheckIsReady(); err == nil {
+ if qre.tsv.lagThrottler.IsEnabled() {
enabled = 1
}
result := &sqltypes.Result{
diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go
index 83cb28ae849..917ff5a7eb4 100644
--- a/go/vt/vttablet/tabletserver/throttle/throttler.go
+++ b/go/vt/vttablet/tabletserver/throttle/throttler.go
@@ -72,7 +72,7 @@ var (
throttleMetricQuery string
throttleMetricThreshold = math.MaxFloat64
throttlerCheckAsCheckSelf = false
- throttlerConfigViaTopo = false
+ throttlerConfigViaTopo = true
)
func init() {
@@ -91,7 +91,7 @@ func registerThrottlerFlags(fs *pflag.FlagSet) {
}
var (
- ErrThrottlerNotReady = errors.New("throttler not enabled/ready")
+ ErrThrottlerNotOpen = errors.New("throttler not open")
)
// ThrottleCheckType allows a client to indicate what type of check it wants to issue. See available types below.
@@ -222,15 +222,6 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv
return throttler
}
-// CheckIsReady checks if this throttler is ready to serve. If not, it returns an error
-func (throttler *Throttler) CheckIsReady() error {
- if throttler.IsEnabled() {
- // all good
- return nil
- }
- return ErrThrottlerNotReady
-}
-
func (throttler *Throttler) StoreMetricsThreshold(threshold float64) {
throttler.MetricsThreshold.Store(math.Float64bits(threshold))
}
@@ -325,22 +316,25 @@ func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspa
throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig)
if throttler.IsEnabled() {
- // Throttler is running and we should apply the config change through Operate()
- // or else we get into race conditions.
+ // Throttler is enabled and we should apply the config change
+ // through Operate() or else we get into race conditions.
go func() {
log.Infof("Throttler: submitting a throttler config apply message with: %+v", throttlerConfig)
throttler.throttlerConfigChan <- throttlerConfig
}()
} else {
- // throttler is not running, we should apply directly
+ throttler.initMutex.Lock()
+ defer throttler.initMutex.Unlock()
+ // Throttler is not enabled, we should apply directly.
throttler.applyThrottlerConfig(context.Background(), throttlerConfig)
}
return true
}
-// applyThrottlerConfig receives a Throttlerconfig as read from SrvKeyspace, and applies the configuration. This may cause
-// the throttler to be enabled/disabled, and of course it affects the throttling query/threshold.
+// applyThrottlerConfig receives a Throttlerconfig as read from SrvKeyspace, and applies the configuration.
+// This may cause the throttler to be enabled/disabled, and of course it affects the throttling query/threshold.
+// Note: you should be holding the initMutex when calling this function.
func (throttler *Throttler) applyThrottlerConfig(ctx context.Context, throttlerConfig *topodatapb.ThrottlerConfig) {
if !throttlerConfigViaTopo {
return
@@ -364,6 +358,24 @@ func (throttler *Throttler) IsEnabled() bool {
return atomic.LoadInt64(&throttler.isEnabled) > 0
}
+func (throttler *Throttler) IsOpen() bool {
+ return atomic.LoadInt64(&throttler.isOpen) > 0
+}
+
+// CheckIsOpen checks if this throttler is ready to serve. If not, it
+// returns an error.
+func (throttler *Throttler) CheckIsOpen() error {
+ if throttler.IsOpen() {
+ // all good
+ return nil
+ }
+ return ErrThrottlerNotOpen
+}
+
+func (throttler *Throttler) IsRunning() bool {
+ return throttler.IsOpen() && throttler.IsEnabled()
+}
+
// Enable activates the throttler probes; when enabled, the throttler responds to check queries based on
// the collected metrics.
func (throttler *Throttler) Enable(ctx context.Context) bool {
@@ -412,10 +424,14 @@ func (throttler *Throttler) Disable(ctx context.Context) bool {
// Open opens database pool and initializes the schema
func (throttler *Throttler) Open() error {
+ // TODO: remove `EnableLagThrottler` in v18
+ if throttler.env.Config().EnableLagThrottler {
+ log.Warningf("The flags `--enable_lag_throttler` and `--throttle_threshold` will be removed in v18. Use 'vtctl UpdateThrottlerConfig', see https://vitess.io/docs/17.0/reference/programs/vtctldclient/vtctldclient_updatethrottlerconfig/")
+ }
log.Infof("Throttler: started execution of Open. Acquiring initMutex lock")
throttler.initMutex.Lock()
defer throttler.initMutex.Unlock()
- if atomic.LoadInt64(&throttler.isOpen) > 0 {
+ if throttler.IsOpen() {
// already open
log.Infof("Throttler: throttler is already open")
return nil
@@ -442,31 +458,34 @@ func (throttler *Throttler) Open() error {
// opening of all other components. We thus read the throttler config in the background.
// However, we want to handle a situation where the read errors out.
// So we kick a loop that keeps retrying reading the config, for as long as this throttler is open.
- go func() {
- retryTicker := time.NewTicker(30 * time.Second)
+ retryReadAndApplyThrottlerConfig := func() {
+ retryInterval := 10 * time.Second
+ retryTicker := time.NewTicker(retryInterval)
defer retryTicker.Stop()
for {
- if atomic.LoadInt64(&throttler.isOpen) == 0 {
- // closed down. No need to keep retrying
+ if !throttler.IsOpen() {
+ // Throttler is not open so no need to keep retrying.
+ log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting")
return
}
throttlerConfig, err := throttler.readThrottlerConfig(ctx)
if err == nil {
- // it's possible that during a retry-sleep, the throttler is closed and opened again, leading
+ log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig)
+ // It's possible that during a retry-sleep, the throttler is closed and opened again, leading
// to two (or more) instances of this goroutine. That's not a big problem; it's fine if all
// attempt to read the throttler config; but we just want to ensure they don't step on each other
// while applying the changes.
throttler.initMutex.Lock()
defer throttler.initMutex.Unlock()
-
throttler.applyThrottlerConfig(ctx, throttlerConfig) // may issue an Enable
return
}
- log.Errorf("Throttler.Open(): error reading throttler config. Will retry in 1 minute. Err=%+v", err)
+ log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): error reading throttler config. Will retry in %v. Err=%+v", retryInterval, err)
<-retryTicker.C
}
- }()
+ }
+ go retryReadAndApplyThrottlerConfig()
} else {
// backwards-cmpatible: check for --enable-lag-throttler flag in vttablet
// this will be removed in a future version
@@ -483,7 +502,7 @@ func (throttler *Throttler) Close() {
throttler.initMutex.Lock()
log.Infof("Throttler: acquired initMutex lock")
defer throttler.initMutex.Unlock()
- if atomic.LoadInt64(&throttler.isOpen) == 0 {
+ if !throttler.IsOpen() {
log.Infof("Throttler: throttler is not open")
return
}
@@ -570,7 +589,6 @@ func (throttler *Throttler) isDormant() bool {
// Operate is the main entry point for the throttler operation and logic. It will
// run the probes, collect metrics, refresh inventory, etc.
func (throttler *Throttler) Operate(ctx context.Context) {
-
tickers := [](*timer.SuspendableTicker){}
addTicker := func(d time.Duration) *timer.SuspendableTicker {
t := timer.NewSuspendableTicker(d, false)
@@ -605,7 +623,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
// sparse
shouldBeLeader := int64(0)
- if atomic.LoadInt64(&throttler.isOpen) > 0 {
+ if throttler.IsOpen() {
if throttler.tabletTypeFunc() == topodatapb.TabletType_PRIMARY {
shouldBeLeader = 1
}
@@ -631,7 +649,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
}
case <-mysqlCollectTicker.C:
{
- if atomic.LoadInt64(&throttler.isOpen) > 0 {
+ if throttler.IsOpen() {
// frequent
if !throttler.isDormant() {
throttler.collectMySQLMetrics(ctx)
@@ -640,7 +658,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
}
case <-mysqlDormantCollectTicker.C:
{
- if atomic.LoadInt64(&throttler.isOpen) > 0 {
+ if throttler.IsOpen() {
// infrequent
if throttler.isDormant() {
throttler.collectMySQLMetrics(ctx)
@@ -655,7 +673,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
case <-mysqlRefreshTicker.C:
{
// sparse
- if atomic.LoadInt64(&throttler.isOpen) > 0 {
+ if throttler.IsOpen() {
go throttler.refreshMySQLInventory(ctx)
}
}
@@ -666,13 +684,13 @@ func (throttler *Throttler) Operate(ctx context.Context) {
}
case <-mysqlAggregateTicker.C:
{
- if atomic.LoadInt64(&throttler.isOpen) > 0 {
+ if throttler.IsOpen() {
throttler.aggregateMySQLMetrics(ctx)
}
}
case <-throttledAppsTicker.C:
{
- if atomic.LoadInt64(&throttler.isOpen) > 0 {
+ if throttler.IsOpen() {
go throttler.expireThrottledApps()
}
}
@@ -1028,7 +1046,7 @@ func (throttler *Throttler) AppRequestMetricResult(ctx context.Context, appName
// checkStore checks the aggregated value of given MySQL store
func (throttler *Throttler) checkStore(ctx context.Context, appName string, storeName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) {
- if !throttler.IsEnabled() {
+ if !throttler.IsRunning() {
return okMetricCheckResult
}
if throttlerapp.ExemptFromChecks(appName) {