From a75191c9d5be13afe8f27482475bb50d97e62d63 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 27 Mar 2023 16:33:16 -0400 Subject: [PATCH 01/19] Flakes: effectively disable vtorc for deterministic behavior For example, we stop replication, wait a few seconds, then expect there to be lag. But vtorc could repair replication during that wait and then the lag is gone. Signed-off-by: Matt Lord --- .../endtoend/tabletmanager/throttler/throttler_test.go | 8 +++++++- .../throttler_custom_config/throttler_test.go | 6 ++++++ .../tabletmanager/throttler_topo/throttler_test.go | 8 +++++++- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler/throttler_test.go b/go/test/endtoend/tabletmanager/throttler/throttler_test.go index 28d0c287c24..9094a9a4e6b 100644 --- a/go/test/endtoend/tabletmanager/throttler/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler/throttler_test.go @@ -107,6 +107,12 @@ func TestMain(m *testing.M) { "--disable_active_reparents", } + // Tell vtorc not to watch our keyspace(s) as we don't want it to e.g. repair + // replication when we explicitly stop it. + for i := range clusterInstance.VTOrcProcesses { + clusterInstance.VTOrcProcesses[i].ExtraArgs = []string{`--clusters_to_watch=""`} + } + // Start keyspace keyspace := &cluster.Keyspace{ Name: keyspaceName, @@ -257,7 +263,7 @@ func TestLag(t *testing.T) { assert.NoError(t, err) }) t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { - time.Sleep(2 * throttlerThreshold) + time.Sleep(3 * throttlerThreshold) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) diff --git a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go index 04f520defe9..b513f47df05 100644 --- a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go @@ -108,6 +108,12 @@ func TestMain(m *testing.M) { "--heartbeat_interval", "250ms", } + // Tell vtorc not to watch our keyspace(s) as we don't want it to e.g. repair + // replication when we explicitly stop it. + for i := range clusterInstance.VTOrcProcesses { + clusterInstance.VTOrcProcesses[i].ExtraArgs = []string{`--clusters_to_watch=""`} + } + // Start keyspace keyspace := &cluster.Keyspace{ Name: keyspaceName, diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 78daaed63a0..0e3b5a54854 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -116,6 +116,12 @@ func TestMain(m *testing.M) { "--disable_active_reparents", } + // Tell vtorc not to watch our keyspace(s) as we don't want it to e.g. repair + // replication when we explicitly stop it. + for i := range clusterInstance.VTOrcProcesses { + clusterInstance.VTOrcProcesses[i].ExtraArgs = []string{`--clusters_to_watch=""`} + } + // Start keyspace keyspace := &cluster.Keyspace{ Name: keyspaceName, @@ -339,7 +345,7 @@ func TestLag(t *testing.T) { assert.NoError(t, err) }) t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { - time.Sleep(2 * throttlerThreshold) + time.Sleep(3 * throttlerThreshold) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) From e3b999a12e53029864c695f3acce43d95b22443a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 27 Mar 2023 17:44:42 -0400 Subject: [PATCH 02/19] Wait for the throttler to be up and running everywhere Signed-off-by: Matt Lord --- .../tabletmanager/throttler_topo/throttler_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 0e3b5a54854..f6ee9df4bf9 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -87,6 +87,7 @@ const ( extremelyHighThreshold = 1 * time.Hour onDemandHeartbeatDuration = 5 * time.Second applyConfigWait = 15 * time.Second // time after which we're sure the throttler has refreshed config and tablets + throttlerEnabledTimeout = 60 * time.Second ) func TestMain(m *testing.M) { @@ -340,6 +341,11 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { func TestLag(t *testing.T) { defer cluster.PanicHandler(t) + // Wait for the throttler to be up and running everywhere. + for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} { + onlineddl.WaitForThrottlerStatusEnabled(t, &tablet, throttlerEnabledTimeout) + } + t.Run("stopping replication", func(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) assert.NoError(t, err) @@ -485,6 +491,11 @@ func TestRestoreDefaultQuery(t *testing.T) { t.Run("enabling throttler with standard threshold", func(t *testing.T) { _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttlerThreshold.Seconds(), "", false) assert.NoError(t, err) + + // Wait for the throttler to be up and running everywhere. + for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} { + onlineddl.WaitForThrottlerStatusEnabled(t, &tablet, throttlerEnabledTimeout) + } }) t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) From d30344423d7b97aa7acf5adea99a04192a1dbd59 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 27 Mar 2023 20:08:07 -0400 Subject: [PATCH 03/19] Expose tablet's throttler config and leverage to deflake tests Signed-off-by: Matt Lord --- examples/common/scripts/vttablet-up.sh | 1 + go/flags/endtoend/vttablet.txt | 2 +- .../onlineddl/vrepl/onlineddl_vrepl_test.go | 5 +- go/test/endtoend/onlineddl/vtctlutil.go | 50 ------ .../tabletmanager/throttler/throttler_test.go | 2 - .../throttler_custom_config/throttler_test.go | 6 - .../throttler_topo/throttler_test.go | 114 +++++++------ go/test/endtoend/throttler/util.go | 154 ++++++++++++++++++ .../tabletserver/throttle/throttler.go | 12 +- 9 files changed, 238 insertions(+), 108 deletions(-) create mode 100644 go/test/endtoend/throttler/util.go diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh index 0e70837d235..05d16945932 100755 --- a/examples/common/scripts/vttablet-up.sh +++ b/examples/common/scripts/vttablet-up.sh @@ -56,6 +56,7 @@ vttablet \ --pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \ --vtctld_addr http://$hostname:$vtctld_web_port/ \ --disable_active_reparents \ + --throttler-config-via-topo \ > $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 & # Block waiting for the tablet to be listening diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 4d23fffd942..51803119e6c 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -304,7 +304,7 @@ Usage of vttablet: --tablet_protocol string Protocol to use to make queryservice RPCs to vttablets. (default "grpc") --throttle_check_as_check_self Should throttler/check return a throttler/check-self result (changes throttler behavior for writes) --throttle_metrics_query SELECT Override default heartbeat/lag metric. Use either SELECT (must return single row, single value) or `SHOW GLOBAL ... LIKE ...` queries. Set -throttle_metrics_threshold respectively. - --throttle_metrics_threshold float Override default throttle threshold, respective to -throttle_metrics_query (default 1.7976931348623157e+308) + --throttle_metrics_threshold float Override default throttle threshold, respective to --throttle_metrics_query (default 1.7976931348623157e+308) --throttle_tablet_types string Comma separated VTTablet types to be considered by the throttler. default: 'replica'. example: 'replica,rdonly'. 'replica' aways implicitly included (default "replica") --throttle_threshold duration Replication lag threshold for default lag throttling (default 1s) --throttler-config-via-topo When 'true', read config from topo service and ignore throttle_threshold, throttle_metrics_threshold, throttle_metrics_query, throttle_check_as_check_self diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index bc886a7a83f..e54a45b93d1 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" @@ -259,13 +260,13 @@ func TestSchemaChange(t *testing.T) { err := clusterInstance.WaitForTabletsToHealthyInVtgate() require.NoError(t, err) - _, err = onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", false) + _, err = throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", false) require.NoError(t, err) for _, ks := range clusterInstance.Keyspaces { for _, shard := range ks.Shards { for _, tablet := range shard.Vttablets { - onlineddl.WaitForThrottlerStatusEnabled(t, tablet, extendedMigrationWait) + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, nil, extendedMigrationWait) } } } diff --git a/go/test/endtoend/onlineddl/vtctlutil.go b/go/test/endtoend/onlineddl/vtctlutil.go index 52a50dd8214..7bfd09a0585 100644 --- a/go/test/endtoend/onlineddl/vtctlutil.go +++ b/go/test/endtoend/onlineddl/vtctlutil.go @@ -17,8 +17,6 @@ limitations under the License. package onlineddl import ( - "context" - "fmt" "testing" "time" @@ -36,51 +34,3 @@ func CheckCancelAllMigrationsViaVtctl(t *testing.T, vtctlclient *cluster.VtctlCl _, err := vtctlclient.ApplySchemaWithOutput(keyspace, cancelQuery, cluster.VtctlClientParams{}) assert.NoError(t, err) } - -// UpdateThrottlerTopoConfig runs vtctlclient UpdateThrottlerConfig. -// This retries the command until it succeeds or times out as the -// SrvKeyspace record may not yet exist for a newly created -// Keyspace that is still initializing before it becomes serving. -func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string, viaVtctldClient bool) (result string, err error) { - args := []string{} - clientfunc := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput - if !viaVtctldClient { - args = append(args, "--") - clientfunc = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput - } - args = append(args, "UpdateThrottlerConfig") - if enable { - args = append(args, "--enable") - } - if disable { - args = append(args, "--disable") - } - if threshold > 0 { - args = append(args, "--threshold", fmt.Sprintf("%f", threshold)) - } - if metricsQuery != "" { - args = append(args, "--custom-query", metricsQuery) - args = append(args, "--check-as-check-self") - } else { - args = append(args, "--check-as-check-shard") - } - args = append(args, clusterInstance.Keyspaces[0].Name) - - ctx, cancel := context.WithTimeout(context.Background(), throttlerConfigTimeout) - defer cancel() - - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - result, err = clientfunc(args...) - if err == nil { - return result, nil - } - select { - case <-ctx.Done(): - return "", fmt.Errorf("timed out waiting for UpdateThrottlerConfig to succeed after %v. Last seen value: %+v, error: %v", throttlerConfigTimeout, result, err) - case <-ticker.C: - } - } -} diff --git a/go/test/endtoend/tabletmanager/throttler/throttler_test.go b/go/test/endtoend/tabletmanager/throttler/throttler_test.go index 9094a9a4e6b..8aa2c03b314 100644 --- a/go/test/endtoend/tabletmanager/throttler/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler/throttler_test.go @@ -99,8 +99,6 @@ func TestMain(m *testing.M) { "--lock_tables_timeout", "5s", "--watch_replication_stream", "--enable_replication_reporter", - "--enable-lag-throttler", - "--throttle_threshold", throttlerThreshold.String(), "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", onDemandHeartbeatDuration.String(), diff --git a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go index b513f47df05..04f520defe9 100644 --- a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go @@ -108,12 +108,6 @@ func TestMain(m *testing.M) { "--heartbeat_interval", "250ms", } - // Tell vtorc not to watch our keyspace(s) as we don't want it to e.g. repair - // replication when we explicitly stop it. - for i := range clusterInstance.VTOrcProcesses { - clusterInstance.VTOrcProcesses[i].ExtraArgs = []string{`--clusters_to_watch=""`} - } - // Start keyspace keyspace := &cluster.Keyspace{ Name: keyspaceName, diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index f6ee9df4bf9..dbbc8e4e5cd 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -31,12 +31,21 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +const ( + customQuery = "show global status like 'threads_running'" + customThreshold = 5 * time.Second + unreasonablyLowThreshold = 1 * time.Millisecond + extremelyHighThreshold = 1 * time.Hour + onDemandHeartbeatDuration = 5 * time.Second + throttlerEnabledTimeout = 60 * time.Second +) + var ( clusterInstance *cluster.LocalProcessCluster primaryTablet *cluster.Vttablet @@ -77,17 +86,6 @@ var ( throttledAppsAPIPath = "throttler/throttled-apps" checkAPIPath = "throttler/check" checkSelfAPIPath = "throttler/check-self" - customQuery = "show global status like 'threads_running'" - customThreshold = 5 -) - -const ( - throttlerThreshold = 1 * time.Second // standard, tight threshold - unreasonablyLowThreshold = 1 * time.Millisecond - extremelyHighThreshold = 1 * time.Hour - onDemandHeartbeatDuration = 5 * time.Second - applyConfigWait = 15 * time.Second // time after which we're sure the throttler has refreshed config and tablets - throttlerEnabledTimeout = 60 * time.Second ) func TestMain(m *testing.M) { @@ -110,7 +108,7 @@ func TestMain(m *testing.M) { "--watch_replication_stream", "--enable_replication_reporter", "--throttler-config-via-topo", - "--throttle_threshold", throttlerThreshold.String(), + "--throttle_threshold", throttler.DefaultThreshold.String(), "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", onDemandHeartbeatDuration.String(), @@ -198,7 +196,7 @@ func warmUpHeartbeat(t *testing.T) (respStatus int) { // waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode int) { _ = warmUpHeartbeat(t) - ctx, cancel := context.WithTimeout(context.Background(), onDemandHeartbeatDuration+applyConfigWait) + ctx, cancel := context.WithTimeout(context.Background(), onDemandHeartbeatDuration*4) defer cancel() for { @@ -251,36 +249,61 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("enabling throttler with low threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), "", false) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), "", false) assert.NoError(t, err) + + // Wait for the throttler to be enabled everywhere with the new config. + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: unreasonablyLowThreshold.Seconds()}, throttlerEnabledTimeout) + } }) t.Run("validating pushback response from throttler", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) t.Run("disabling throttler", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), "", false) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), "", false) assert.NoError(t, err) + + // Wait for the throttler to be disabled everywhere. + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, false, nil, throttlerEnabledTimeout) + } }) t.Run("validating OK response from disabled throttler, again", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("enabling throttler, again", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", true) assert.NoError(t, err) + + // Wait for the throttler to be enabled everywhere again. + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: unreasonablyLowThreshold.Seconds()}, throttlerEnabledTimeout) + } }) t.Run("validating pushback response from throttler, again", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) t.Run("setting high threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), "", true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, extremelyHighThreshold.Seconds(), "", true) assert.NoError(t, err) + + // Wait for the throttler to be enabled everywhere with new config. + for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} { + throttler.WaitForThrottlerStatusEnabled(t, &tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: extremelyHighThreshold.Seconds()}, throttlerEnabledTimeout) + } }) t.Run("validating OK response from throttler with high threshold", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("setting low threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttlerThreshold.Seconds(), "", true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), "", true) assert.NoError(t, err) + + // Wait for the throttler to be enabled everywhere with new config. + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) + } }) t.Run("validating pushback response from throttler on low threshold", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) @@ -313,7 +336,7 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { // By this time metrics will have been collected. We expect no lag, and something like: // {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""} - // + // Note: by WHAT time? We just created a fresh cluster and the throttler is disabled... t.Run("validating throttler OK", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) @@ -341,17 +364,22 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { func TestLag(t *testing.T) { defer cluster.PanicHandler(t) - // Wait for the throttler to be up and running everywhere. - for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} { - onlineddl.WaitForThrottlerStatusEnabled(t, &tablet, throttlerEnabledTimeout) - } + t.Run("enabling throttler with defaults", func(t *testing.T) { + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", false) + assert.NoError(t, err) + + // Wait for the throttler to be up and running everywhere. + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) + } + }) t.Run("stopping replication", func(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) assert.NoError(t, err) }) t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { - time.Sleep(3 * throttlerThreshold) + time.Sleep(10 * time.Second) // Let replication lag buildup due to it having been stopped resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) @@ -404,7 +432,6 @@ func TestNoReplicas(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("restoring to REPLICA", func(t *testing.T) { - err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "REPLICA") assert.NoError(t, err) @@ -415,10 +442,10 @@ func TestNoReplicas(t *testing.T) { func TestCustomQuery(t *testing.T) { defer cluster.PanicHandler(t) - t.Run("enabling throttler with low threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, float64(customThreshold), customQuery, false) + t.Run("enabling throttler with custom query and threshold", func(t *testing.T) { + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold.Seconds(), customQuery, false) + assert.NoError(t, err) - time.Sleep(applyConfigWait) }) t.Run("validating OK response from throttler with custom query", func(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) @@ -432,7 +459,7 @@ func TestCustomQuery(t *testing.T) { t.Run("test threads running", func(t *testing.T) { sleepDuration := 10 * time.Second var wg sync.WaitGroup - for i := 0; i < customThreshold; i++ { + for i := 0; i < int(customThreshold.Seconds()); i++ { // generate different Sleep() calls, all at minimum sleepDuration wg.Add(1) go func(i int) { @@ -482,29 +509,24 @@ func TestCustomQuery(t *testing.T) { } }) }) -} - -func TestRestoreDefaultQuery(t *testing.T) { - // validte going back from custom-query to default-query (replication lag) still works - defer cluster.PanicHandler(t) - - t.Run("enabling throttler with standard threshold", func(t *testing.T) { - _, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttlerThreshold.Seconds(), "", false) + // validate going back from custom-query to default-query (replication lag) still works + t.Run("enabling throttler with default query and threshold", func(t *testing.T) { + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), throttler.DefaultQuery, false) assert.NoError(t, err) // Wait for the throttler to be up and running everywhere. - for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} { - onlineddl.WaitForThrottlerStatusEnabled(t, &tablet, throttlerEnabledTimeout) + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) } }) - t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) { - waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) - }) - t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { - time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops + t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) { + time.Sleep(1 * time.Second) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) + }) + t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { + waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) } diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go new file mode 100644 index 00000000000..6c1eb6427a0 --- /dev/null +++ b/go/test/endtoend/throttler/util.go @@ -0,0 +1,154 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package throttler + +import ( + "context" + "fmt" + "io" + "net/http" + "testing" + "time" + + "github.com/buger/jsonparser" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +type Config struct { + Query string + Threshold float64 +} + +const ( + DefaultQuery = "select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from _vt.heartbeat" + DefaultThreshold = 1 * time.Second + ConfigTimeout = 60 * time.Second +) + +var DefaultConfig = &Config{ + Query: DefaultQuery, + Threshold: DefaultThreshold.Seconds(), +} + +// UpdateThrottlerTopoConfig runs vtctlclient UpdateThrottlerConfig. +// This retries the command until it succeeds or times out as the +// SrvKeyspace record may not yet exist for a newly created +// Keyspace that is still initializing before it becomes serving. +func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string, viaVtctldClient bool) (result string, err error) { + args := []string{} + clientfunc := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput + if !viaVtctldClient { + args = append(args, "--") + clientfunc = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput + } + args = append(args, "UpdateThrottlerConfig") + if enable { + args = append(args, "--enable") + } + if disable { + args = append(args, "--disable") + } + if threshold > 0 { + args = append(args, "--threshold", fmt.Sprintf("%f", threshold)) + } + if metricsQuery != "" { + args = append(args, "--custom-query", metricsQuery) + args = append(args, "--check-as-check-self") + } else { + args = append(args, "--check-as-check-shard") + } + args = append(args, clusterInstance.Keyspaces[0].Name) + + ctx, cancel := context.WithTimeout(context.Background(), ConfigTimeout) + defer cancel() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + result, err = clientfunc(args...) + if err == nil { + return result, nil + } + select { + case <-ctx.Done(): + return "", fmt.Errorf("timed out waiting for UpdateThrottlerConfig to succeed after %v. Last seen value: %+v, error: %v", ConfigTimeout, result, err) + case <-ticker.C: + } + } +} + +// WaitForThrottlerStatusEnabled waits for a tablet to report its throttler status as +// enabled/disabled and have the provided config (if any) until the specified timeout. +func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, enabled bool, config *Config, timeout time.Duration) { + enabledJSONPath := "IsEnabled" + queryJSONPath := "Query" + thresholdJSONPath := "Threshold" + + url := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + body := getHTTPBody(url) + t.Logf("tablet %s throttler status: %s", tablet.Alias, body) + isEnabled, err := jsonparser.GetBoolean([]byte(body), enabledJSONPath) + require.NoError(t, err) + if isEnabled == enabled { + if config == nil { + return + } + query, err := jsonparser.GetString([]byte(body), queryJSONPath) + require.NoError(t, err) + threshold, err := jsonparser.GetFloat([]byte(body), thresholdJSONPath) + require.NoError(t, err) + if query == config.Query && threshold == config.Threshold { + return + } + } + select { + case <-ctx.Done(): + t.Errorf("timeout waiting for the %s tablet's throttler status enabled to be %t with the correct config; last seen value: %s", + tablet.Alias, enabled, body) + return + case <-ticker.C: + } + } +} + +func getHTTPBody(url string) string { + resp, err := http.Get(url) + if err != nil { + log.Infof("http Get returns %+v", err) + return "" + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + log.Infof("http Get returns status %d", resp.StatusCode) + return "" + } + respByte, _ := io.ReadAll(resp.Body) + body := string(respByte) + return body +} diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 2ce6cf20fe8..4be3e9084fc 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -84,7 +84,7 @@ func registerThrottlerFlags(fs *pflag.FlagSet) { fs.DurationVar(&throttleThreshold, "throttle_threshold", throttleThreshold, "Replication lag threshold for default lag throttling") fs.StringVar(&throttleMetricQuery, "throttle_metrics_query", throttleMetricQuery, "Override default heartbeat/lag metric. Use either `SELECT` (must return single row, single value) or `SHOW GLOBAL ... LIKE ...` queries. Set -throttle_metrics_threshold respectively.") - fs.Float64Var(&throttleMetricThreshold, "throttle_metrics_threshold", throttleMetricThreshold, "Override default throttle threshold, respective to -throttle_metrics_query") + fs.Float64Var(&throttleMetricThreshold, "throttle_metrics_threshold", throttleMetricThreshold, "Override default throttle threshold, respective to --throttle_metrics_query") fs.BoolVar(&throttlerCheckAsCheckSelf, "throttle_check_as_check_self", throttlerCheckAsCheckSelf, "Should throttler/check return a throttler/check-self result (changes throttler behavior for writes)") fs.BoolVar(&throttlerConfigViaTopo, "throttler-config-via-topo", throttlerConfigViaTopo, "When 'true', read config from topo service and ignore throttle_threshold, throttle_metrics_threshold, throttle_metrics_query, throttle_check_as_check_self") } @@ -165,6 +165,9 @@ type ThrottlerStatus struct { IsEnabled bool IsDormant bool + Query string + Threshold float64 + AggregatedMetrics map[string]base.MetricResult MetricsHealth base.MetricHealthMap } @@ -255,6 +258,10 @@ func (throttler *Throttler) GetMetricsQuery() string { return throttler.metricsQuery.Load().(string) } +func (throttler *Throttler) GetMetricsThreshold() float64 { + return math.Float64frombits(throttler.MetricsThreshold.Load()) +} + // initThrottler initializes config func (throttler *Throttler) initConfig() { log.Infof("Throttler: initializing config") @@ -1038,6 +1045,9 @@ func (throttler *Throttler) Status() *ThrottlerStatus { IsEnabled: (atomic.LoadInt64(&throttler.isEnabled) > 0), IsDormant: throttler.isDormant(), + Query: throttler.GetMetricsQuery(), + Threshold: throttler.GetMetricsThreshold(), + AggregatedMetrics: throttler.aggregatedMetricsSnapshot(), MetricsHealth: throttler.metricsHealthSnapshot(), } From c623dc913c46231abc9b5031b59fbda6426c0abf Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Mar 2023 02:11:41 -0400 Subject: [PATCH 04/19] Apply various corrections Signed-off-by: Matt Lord --- .../tabletmanager/throttler/throttler_test.go | 4 +- .../throttler_topo/throttler_test.go | 40 +++++++++---------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler/throttler_test.go b/go/test/endtoend/tabletmanager/throttler/throttler_test.go index 8aa2c03b314..3a1d2b00e52 100644 --- a/go/test/endtoend/tabletmanager/throttler/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler/throttler_test.go @@ -99,6 +99,8 @@ func TestMain(m *testing.M) { "--lock_tables_timeout", "5s", "--watch_replication_stream", "--enable_replication_reporter", + "--enable-lag-throttler", + "--throttle_threshold", throttlerThreshold.String(), "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", onDemandHeartbeatDuration.String(), @@ -261,7 +263,7 @@ func TestLag(t *testing.T) { assert.NoError(t, err) }) t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { - time.Sleep(3 * throttlerThreshold) + time.Sleep(2 * throttlerThreshold) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index dbbc8e4e5cd..d15c3b8a89f 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -108,7 +108,6 @@ func TestMain(m *testing.M) { "--watch_replication_stream", "--enable_replication_reporter", "--throttler-config-via-topo", - "--throttle_threshold", throttler.DefaultThreshold.String(), "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", onDemandHeartbeatDuration.String(), @@ -336,7 +335,6 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { // By this time metrics will have been collected. We expect no lag, and something like: // {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""} - // Note: by WHAT time? We just created a fresh cluster and the throttler is disabled... t.Run("validating throttler OK", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) @@ -364,22 +362,12 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { func TestLag(t *testing.T) { defer cluster.PanicHandler(t) - t.Run("enabling throttler with defaults", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", false) - assert.NoError(t, err) - - // Wait for the throttler to be up and running everywhere. - for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { - throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) - } - }) - t.Run("stopping replication", func(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) assert.NoError(t, err) }) t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { - time.Sleep(10 * time.Second) // Let replication lag buildup due to it having been stopped + time.Sleep(2 * throttler.DefaultThreshold) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) @@ -444,8 +432,12 @@ func TestCustomQuery(t *testing.T) { t.Run("enabling throttler with custom query and threshold", func(t *testing.T) { _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold.Seconds(), customQuery, false) - assert.NoError(t, err) + + // Wait for the throttler to be enabled everywhere with new custom config. + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: customQuery, Threshold: customThreshold.Seconds()}, throttlerEnabledTimeout) + } }) t.Run("validating OK response from throttler with custom query", func(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) @@ -509,24 +501,28 @@ func TestCustomQuery(t *testing.T) { } }) }) - // validate going back from custom-query to default-query (replication lag) still works +} + +func TestRestoreDefaultQuery(t *testing.T) { + defer cluster.PanicHandler(t) + + // Validate going back from custom-query to default-query (replication lag) still works. t.Run("enabling throttler with default query and threshold", func(t *testing.T) { _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), throttler.DefaultQuery, false) assert.NoError(t, err) - // Wait for the throttler to be up and running everywhere. + // Wait for the throttler to be up and running everywhere again with the default config. for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) } }) - t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) { - time.Sleep(1 * time.Second) + t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) { + waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) + }) + t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { + time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) - }) - t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { - waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) } From 262147799fd5cb1d0db38225541bee682291fe65 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Mar 2023 03:14:23 -0400 Subject: [PATCH 05/19] Be more explicit about VTOrc behavior changes Signed-off-by: Matt Lord --- .../tabletmanager/throttler/throttler_test.go | 6 ------ .../tabletmanager/throttler_topo/throttler_test.go | 11 +++++------ go/test/endtoend/throttler/util.go | 1 - 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler/throttler_test.go b/go/test/endtoend/tabletmanager/throttler/throttler_test.go index 3a1d2b00e52..28d0c287c24 100644 --- a/go/test/endtoend/tabletmanager/throttler/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler/throttler_test.go @@ -107,12 +107,6 @@ func TestMain(m *testing.M) { "--disable_active_reparents", } - // Tell vtorc not to watch our keyspace(s) as we don't want it to e.g. repair - // replication when we explicitly stop it. - for i := range clusterInstance.VTOrcProcesses { - clusterInstance.VTOrcProcesses[i].ExtraArgs = []string{`--clusters_to_watch=""`} - } - // Start keyspace keyspace := &cluster.Keyspace{ Name: keyspaceName, diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index d15c3b8a89f..61610007212 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -114,12 +114,6 @@ func TestMain(m *testing.M) { "--disable_active_reparents", } - // Tell vtorc not to watch our keyspace(s) as we don't want it to e.g. repair - // replication when we explicitly stop it. - for i := range clusterInstance.VTOrcProcesses { - clusterInstance.VTOrcProcesses[i].ExtraArgs = []string{`--clusters_to_watch=""`} - } - // Start keyspace keyspace := &cluster.Keyspace{ Name: keyspaceName, @@ -361,6 +355,11 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { func TestLag(t *testing.T) { defer cluster.PanicHandler(t) + // Temporarily disable VTOrc recoveries because we want to + // STOP replication specifically in order to increase the + // lag and we DO NOT want VTOrc to try and fix this. + clusterInstance.DisableVTOrcRecoveries(t) + defer clusterInstance.EnableVTOrcRecoveries(t) t.Run("stopping replication", func(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index 6c1eb6427a0..5da02de1ec6 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -112,7 +112,6 @@ func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, enabl for { body := getHTTPBody(url) - t.Logf("tablet %s throttler status: %s", tablet.Alias, body) isEnabled, err := jsonparser.GetBoolean([]byte(body), enabledJSONPath) require.NoError(t, err) if isEnabled == enabled { From 421d38b998fe924eb45e2d95fb1f036d767c0ba0 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Mar 2023 12:34:58 -0400 Subject: [PATCH 06/19] Note received throttler response when it is unexpected Signed-off-by: Matt Lord --- .../throttler_topo/throttler_test.go | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 61610007212..493761d4543 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -86,6 +86,10 @@ var ( throttledAppsAPIPath = "throttler/throttled-apps" checkAPIPath = "throttler/check" checkSelfAPIPath = "throttler/check-self" + getResponse = func(resp *http.Response) string { + body, _ := io.ReadAll(resp.Body) + return string(body) + } ) func TestMain(m *testing.M) { @@ -310,14 +314,14 @@ func TestInitialThrottler(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) }) t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) { time.Sleep(1 * time.Second) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) }) t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) @@ -371,7 +375,7 @@ func TestLag(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) }) t.Run("primary self-check should still be fine", func(t *testing.T) { resp, err := throttleCheckSelf(primaryTablet) @@ -442,10 +446,7 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - - b, err := io.ReadAll(resp.Body) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode, "response: %v", string(b)) + assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) }) t.Run("test threads running", func(t *testing.T) { sleepDuration := 10 * time.Second @@ -466,19 +467,13 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - - b, err := io.ReadAll(resp.Body) - assert.NoError(t, err) - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "response: %v", string(b)) + assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) } { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - - b, err := io.ReadAll(resp.Body) - assert.NoError(t, err) - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "response: %v", string(b)) + assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) } }) t.Run("wait for queries to terminate", func(t *testing.T) { @@ -490,13 +485,13 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) } { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) } }) }) From 7a6d31a48ff50afa933639f028d5bc57b0865842 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Mar 2023 14:30:27 -0400 Subject: [PATCH 07/19] Fixes from local testing Signed-off-by: Matt Lord --- examples/common/scripts/vttablet-up.sh | 2 +- .../throttler_topo/throttler_test.go | 15 ++++++++------- go/test/endtoend/throttler/util.go | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh index 05d16945932..481b4a46019 100755 --- a/examples/common/scripts/vttablet-up.sh +++ b/examples/common/scripts/vttablet-up.sh @@ -56,7 +56,7 @@ vttablet \ --pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \ --vtctld_addr http://$hostname:$vtctld_web_port/ \ --disable_active_reparents \ - --throttler-config-via-topo \ + --throttler-config-via-topo --heartbeat_enable --heartbeat_interval=250ms --heartbeat_on_demand_duration=5s \ > $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 & # Block waiting for the tablet to be listening diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 493761d4543..9587c773263 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -44,6 +44,7 @@ const ( extremelyHighThreshold = 1 * time.Hour onDemandHeartbeatDuration = 5 * time.Second throttlerEnabledTimeout = 60 * time.Second + useDefaultQuery = "" ) var ( @@ -245,8 +246,8 @@ func TestInitialThrottler(t *testing.T) { t.Run("validating OK response from disabled throttler", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) - t.Run("enabling throttler with low threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), "", false) + t.Run("enabling throttler with very low threshold", func(t *testing.T) { + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery, false) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere with the new config. @@ -258,7 +259,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) t.Run("disabling throttler", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), "", false) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery, false) assert.NoError(t, err) // Wait for the throttler to be disabled everywhere. @@ -270,7 +271,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("enabling throttler, again", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery, true) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere again. @@ -282,7 +283,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) t.Run("setting high threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, extremelyHighThreshold.Seconds(), "", true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, extremelyHighThreshold.Seconds(), useDefaultQuery, true) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere with new config. @@ -294,7 +295,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("setting low threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), "", true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, true) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere with new config. @@ -502,7 +503,7 @@ func TestRestoreDefaultQuery(t *testing.T) { // Validate going back from custom-query to default-query (replication lag) still works. t.Run("enabling throttler with default query and threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), throttler.DefaultQuery, false) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, false) assert.NoError(t, err) // Wait for the throttler to be up and running everywhere again with the default config. diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index 5da02de1ec6..777d258fbd5 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -68,8 +68,8 @@ func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, ena if threshold > 0 { args = append(args, "--threshold", fmt.Sprintf("%f", threshold)) } + args = append(args, "--custom-query", metricsQuery) if metricsQuery != "" { - args = append(args, "--custom-query", metricsQuery) args = append(args, "--check-as-check-self") } else { args = append(args, "--check-as-check-shard") From 4f98ae1ff0da7a2e91c3ec1ebe0b273b30dfd083 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Mar 2023 14:36:41 -0400 Subject: [PATCH 08/19] Nits from self review Signed-off-by: Matt Lord --- go/test/endtoend/throttler/util.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index 777d258fbd5..9f38d9daf1b 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -89,7 +89,7 @@ func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, ena } select { case <-ctx.Done(): - return "", fmt.Errorf("timed out waiting for UpdateThrottlerConfig to succeed after %v. Last seen value: %+v, error: %v", ConfigTimeout, result, err) + return "", fmt.Errorf("timed out waiting for UpdateThrottlerConfig to succeed after %v; last seen value: %+v, error: %v", ConfigTimeout, result, err) case <-ticker.C: } } @@ -101,12 +101,9 @@ func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, enabl enabledJSONPath := "IsEnabled" queryJSONPath := "Query" thresholdJSONPath := "Threshold" - url := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort) - ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -128,8 +125,8 @@ func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, enabl } select { case <-ctx.Done(): - t.Errorf("timeout waiting for the %s tablet's throttler status enabled to be %t with the correct config; last seen value: %s", - tablet.Alias, enabled, body) + t.Errorf("timed out waiting for the %s tablet's throttler status enabled to be %t with the correct config after %v; last seen value: %s", + tablet.Alias, enabled, timeout, body) return case <-ticker.C: } From d28de5ab9038823319f18fd83f5be0d53e58a6b6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Mar 2023 16:31:47 -0400 Subject: [PATCH 09/19] Use assert.Equalf on failed assertions Signed-off-by: Matt Lord --- .../throttler_topo/throttler_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 9587c773263..58565d89041 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -315,14 +315,14 @@ func TestInitialThrottler(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) }) t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) { time.Sleep(1 * time.Second) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) }) t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) @@ -376,7 +376,7 @@ func TestLag(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) }) t.Run("primary self-check should still be fine", func(t *testing.T) { resp, err := throttleCheckSelf(primaryTablet) @@ -447,7 +447,7 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) }) t.Run("test threads running", func(t *testing.T) { sleepDuration := 10 * time.Second @@ -468,13 +468,13 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) } { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) } }) t.Run("wait for queries to terminate", func(t *testing.T) { @@ -486,13 +486,13 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) } { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) } }) }) From d613f944b040d4a55f19d07b2e85f6ed8f7274f5 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Mar 2023 16:39:30 -0400 Subject: [PATCH 10/19] Ummm, duh. Signed-off-by: Matt Lord --- .../throttler_topo/throttler_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 58565d89041..0cda2649b0a 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -87,7 +87,7 @@ var ( throttledAppsAPIPath = "throttler/throttled-apps" checkAPIPath = "throttler/check" checkSelfAPIPath = "throttler/check-self" - getResponse = func(resp *http.Response) string { + getResponseBody = func(resp *http.Response) string { body, _ := io.ReadAll(resp.Body) return string(body) } @@ -315,14 +315,14 @@ func TestInitialThrottler(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) { time.Sleep(1 * time.Second) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) @@ -376,7 +376,7 @@ func TestLag(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("primary self-check should still be fine", func(t *testing.T) { resp, err := throttleCheckSelf(primaryTablet) @@ -447,7 +447,7 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("test threads running", func(t *testing.T) { sleepDuration := 10 * time.Second @@ -468,13 +468,13 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) } { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) } }) t.Run("wait for queries to terminate", func(t *testing.T) { @@ -486,13 +486,13 @@ func TestCustomQuery(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) } { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponse) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) } }) }) From 3a793f82a5f0aa3d20776ef762e94b6cbf172b8b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Mar 2023 20:12:10 -0400 Subject: [PATCH 11/19] Try to get rid of last bit of flakiness Which seemed to revolve around NOT sleeping long enough after starting all the sleep queries. Signed-off-by: Matt Lord --- .../throttler_topo/throttler_test.go | 20 ++++-- go/test/endtoend/throttler/util.go | 62 +++++++++++++++++++ 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 0cda2649b0a..8462e2f2a74 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -461,9 +461,13 @@ func TestCustomQuery(t *testing.T) { }(i) } t.Run("exceeds threshold", func(t *testing.T) { - time.Sleep(sleepDuration / 2) - // by this time we will have testThreshold+1 threads_running, and we should hit the threshold - // {"StatusCode":429,"Value":2,"Threshold":2,"Message":"Threshold exceeded"} + throttler.WaitForQueryResult(t, primaryTablet, + "select if(variable_value > 5, 'true', 'false') as result from performance_schema.global_status where variable_name='threads_running'", + "true", sleepDuration/2) + throttler.WaitForValidData(t, primaryTablet, sleepDuration/2) + // Now we should be reporting ~ customThreshold*2 threads_running, and we should + // hit the threshold. For example: + // {"StatusCode":429,"Value":6,"Threshold":5,"Message":"Threshold exceeded"} { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) @@ -511,13 +515,17 @@ func TestRestoreDefaultQuery(t *testing.T) { throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) } }) - t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) { - waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) + t.Run("validating OK response from throttler with default threshold, heartbeats running", func(t *testing.T) { + resp, err := throttleCheck(primaryTablet, false) + require.NoError(t, err) + defer resp.Body.Close() + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) - t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { + t.Run("validating pushback response from throttler on default threshold once heartbeats go stale", func(t *testing.T) { time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() + assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) }) } diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index 9f38d9daf1b..94dbfc01e04 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -148,3 +148,65 @@ func getHTTPBody(url string) string { body := string(respByte) return body } + +// WaitForQueryResult waits for a tablet to return the given result for the given +// query until the specified timeout. +// This is for simple queries that return 1 column in 1 row. It compares the result +// for that column as a string with the provided result. +func WaitForQueryResult(t *testing.T, tablet *cluster.Vttablet, query, result string, timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + res, err := tablet.VttabletProcess.QueryTablet(query, "", false) + require.NoError(t, err) + if res != nil && len(res.Rows) == 1 && res.Rows[0][0].ToString() == result { + return + } + select { + case <-ctx.Done(): + t.Errorf("timed out waiting for the %q query to produce a result of %s on tablet %s after %v; last seen value: %s", + query, result, tablet.Alias, timeout, res.Rows[0][0].ToString()) + return + case <-ticker.C: + } + } +} + +// WaitForValidData waits for a tablet's checks to return a non 500 http response +// which indicates that it's not able to provide valid results. This is most +// commonly caused by the throttler still gathering the initial results for +// the given configuration. +func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Duration) { + checkURL := fmt.Sprintf("http://localhost:%d/throttler/check", tablet.HTTPPort) + selfCheckURL := fmt.Sprintf("http://localhost:%d/throttler/check-self", tablet.HTTPPort) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for { + checkResp, checkErr := http.Get(checkURL) + if checkErr != nil { + defer checkResp.Body.Close() + } + selfCheckResp, selfCheckErr := http.Get(selfCheckURL) + if selfCheckErr != nil { + defer selfCheckResp.Body.Close() + } + if checkErr == nil && selfCheckErr == nil && + checkResp.StatusCode != http.StatusInternalServerError && + selfCheckResp.StatusCode != http.StatusInternalServerError { + return + } + select { + case <-ctx.Done(): + t.Errorf("timed out waiting for %s tablet's throttler to return a valid status after %v; last seen value: %+v", + tablet.Alias, timeout, checkResp) + return + case <-ticker.C: + } + } +} From 07333c2ec62c8d526bb285c8f39aff190381cdc7 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 29 Mar 2023 00:33:12 -0400 Subject: [PATCH 12/19] Nits from self review Signed-off-by: Matt Lord --- go/test/endtoend/throttler/util.go | 4 ++-- go/vt/vttablet/tabletserver/throttle/throttler.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index 94dbfc01e04..e8769999fc1 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -167,7 +167,7 @@ func WaitForQueryResult(t *testing.T, tablet *cluster.Vttablet, query, result st } select { case <-ctx.Done(): - t.Errorf("timed out waiting for the %q query to produce a result of %s on tablet %s after %v; last seen value: %s", + t.Errorf("timed out waiting for the %q query to produce a result of %q on tablet %s after %v; last seen value: %s", query, result, tablet.Alias, timeout, res.Rows[0][0].ToString()) return case <-ticker.C: @@ -203,7 +203,7 @@ func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Durat } select { case <-ctx.Done(): - t.Errorf("timed out waiting for %s tablet's throttler to return a valid status after %v; last seen value: %+v", + t.Errorf("timed out waiting for %s tablet's throttler to return a valid result after %v; last seen value: %+v", tablet.Alias, timeout, checkResp) return case <-ticker.C: diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 4be3e9084fc..b32403596f9 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -317,7 +317,8 @@ func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspa throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig) if throttler.IsEnabled() { - // throttler is running and we should apply the config change through Operate() or else we get into race conditions + // Throttler is running and we should apply the config change through Operate() + // or else we get into race conditions. go func() { throttler.throttlerConfigChan <- throttlerConfig }() From 181a5cdf717f451ea0e976fefe71d055dcc978e1 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 29 Mar 2023 11:02:23 -0400 Subject: [PATCH 13/19] Address review comments Signed-off-by: Matt Lord --- .../endtoend/tabletmanager/throttler_topo/throttler_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 8462e2f2a74..5a9bc6b9d87 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -271,7 +271,8 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("enabling throttler, again", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery, true) + // Enable throttler again with the default query, leaving threshold unchanged (0 parameter value). + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery, true) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere again. @@ -295,7 +296,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("setting low threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, true) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere with new config. From a3017f2f7804e7d560f51ba72c0084f116ee9141 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 29 Mar 2023 13:06:38 -0400 Subject: [PATCH 14/19] Adjust test for behavior and comment it And adjust timing Signed-off-by: Matt Lord --- .../throttler_topo/throttler_test.go | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 5a9bc6b9d87..33db6eb3309 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -45,6 +45,7 @@ const ( onDemandHeartbeatDuration = 5 * time.Second throttlerEnabledTimeout = 60 * time.Second useDefaultQuery = "" + useDefaultThreshold = 0 ) var ( @@ -271,13 +272,17 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("enabling throttler, again", func(t *testing.T) { - // Enable throttler again with the default query, leaving threshold unchanged (0 parameter value). - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery, true) + // Enable throttler again with the default query and moving back to the default threshold + // as the 0 parameter value elides the --threshold flag to the client command, which in + // turn causes a zero value threshold to be sent in the RPC request which in turn causes + // the default threshold to be set. So this should in effect enable the throttler again + // with the default config. + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, useDefaultThreshold, useDefaultQuery, true) assert.NoError(t, err) - // Wait for the throttler to be enabled everywhere again. + // Wait for the throttler to be enabled everywhere again with the default config. for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { - throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: unreasonablyLowThreshold.Seconds()}, throttlerEnabledTimeout) + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, throttler.DefaultConfig, throttlerEnabledTimeout) } }) t.Run("validating pushback response from throttler, again", func(t *testing.T) { @@ -451,10 +456,10 @@ func TestCustomQuery(t *testing.T) { assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("test threads running", func(t *testing.T) { - sleepDuration := 10 * time.Second + sleepDuration := 20 * time.Second var wg sync.WaitGroup for i := 0; i < int(customThreshold.Seconds()); i++ { - // generate different Sleep() calls, all at minimum sleepDuration + // Generate different Sleep() calls, all at minimum sleepDuration. wg.Add(1) go func(i int) { defer wg.Done() @@ -464,8 +469,8 @@ func TestCustomQuery(t *testing.T) { t.Run("exceeds threshold", func(t *testing.T) { throttler.WaitForQueryResult(t, primaryTablet, "select if(variable_value > 5, 'true', 'false') as result from performance_schema.global_status where variable_name='threads_running'", - "true", sleepDuration/2) - throttler.WaitForValidData(t, primaryTablet, sleepDuration/2) + "true", sleepDuration/3) + throttler.WaitForValidData(t, primaryTablet, sleepDuration-(5*time.Second)) // Now we should be reporting ~ customThreshold*2 threads_running, and we should // hit the threshold. For example: // {"StatusCode":429,"Value":6,"Threshold":5,"Message":"Threshold exceeded"} From 19fe7c89aa3c091daafe0fdf63ce1b0eac3e3b98 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 29 Mar 2023 15:49:00 -0400 Subject: [PATCH 15/19] Align both stale hearbeat checks Signed-off-by: Matt Lord --- .../throttler_topo/throttler_test.go | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 33db6eb3309..9cc0d45b26b 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -214,7 +214,7 @@ func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode require.NoError(t, err) resp.Body.Close() - assert.Equal(t, wantCode, resp.StatusCode, "body: %v", string(b)) + assert.Equalf(t, wantCode, resp.StatusCode, "body: %s", string(b)) return default: resp.Body.Close() @@ -272,11 +272,11 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("enabling throttler, again", func(t *testing.T) { - // Enable throttler again with the default query and moving back to the default threshold - // as the 0 parameter value elides the --threshold flag to the client command, which in - // turn causes a zero value threshold to be sent in the RPC request which in turn causes - // the default threshold to be set. So this should in effect enable the throttler again - // with the default config. + // Enable throttler again with the default query which also moves us back to the default + // threshold as the 0 parameter value elides the --threshold flag to the client command, + // which in turn causes a zero value threshold to be sent in the RPC request which in turn + // causes the default threshold to be set when reverting to the default query (lag throttler). + // So this should in effect enable the throttler again with the default config. _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, useDefaultThreshold, useDefaultQuery, true) assert.NoError(t, err) @@ -331,6 +331,7 @@ func TestInitialThrottler(t *testing.T) { assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { + time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) } @@ -347,20 +348,20 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { resp, body, err := throttledApps(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) assert.Contains(t, body, "always-throttled-app") }) t.Run("validating primary check self", func(t *testing.T) { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("validating replica check self", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) } @@ -389,13 +390,13 @@ func TestLag(t *testing.T) { require.NoError(t, err) defer resp.Body.Close() // self (on primary) is unaffected by replication lag - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("replica self-check should show error", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("starting replication", func(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias) @@ -409,13 +410,13 @@ func TestLag(t *testing.T) { require.NoError(t, err) defer resp.Body.Close() // self (on primary) is unaffected by replication lag - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) t.Run("replica self-check should be fine", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) require.NoError(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) }) } @@ -529,9 +530,6 @@ func TestRestoreDefaultQuery(t *testing.T) { }) t.Run("validating pushback response from throttler on default threshold once heartbeats go stale", func(t *testing.T) { time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops - resp, err := throttleCheck(primaryTablet, false) - require.NoError(t, err) - defer resp.Body.Close() - assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) } From 830b9bd7086567343a2d2ce754e3a5f52eb14607 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 29 Mar 2023 19:51:16 -0400 Subject: [PATCH 16/19] Remove no longer needed flag This is because enabling heartbeats with --heartbeat_enable also results in the replication reporter being enabled: https://github.com/vitessio/vitess/blob/3d9ef871e42bd20a60ec95997c97ecf0694c1e78/go/vt/vttablet/tabletserver/tabletenv/config.go#L235-L237 Signed-off-by: Matt Lord --- examples/common/scripts/vttablet-up.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh index 481b4a46019..d22fd33ab48 100755 --- a/examples/common/scripts/vttablet-up.sh +++ b/examples/common/scripts/vttablet-up.sh @@ -46,7 +46,6 @@ vttablet \ --init_shard $shard \ --init_tablet_type $tablet_type \ --health_check_interval 5s \ - --enable_replication_reporter \ --backup_storage_implementation file \ --file_backup_storage_root $VTDATAROOT/backups \ --restore_from_backup \ From 7f412079b794bdd694471bb24428ad190b72d0a0 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 29 Mar 2023 21:15:48 -0400 Subject: [PATCH 17/19] Correct comment Signed-off-by: Matt Lord --- .../tabletmanager/throttler_topo/throttler_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 9cc0d45b26b..96a09f8a64f 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -45,7 +45,6 @@ const ( onDemandHeartbeatDuration = 5 * time.Second throttlerEnabledTimeout = 60 * time.Second useDefaultQuery = "" - useDefaultThreshold = 0 ) var ( @@ -273,11 +272,8 @@ func TestInitialThrottler(t *testing.T) { }) t.Run("enabling throttler, again", func(t *testing.T) { // Enable throttler again with the default query which also moves us back to the default - // threshold as the 0 parameter value elides the --threshold flag to the client command, - // which in turn causes a zero value threshold to be sent in the RPC request which in turn - // causes the default threshold to be set when reverting to the default query (lag throttler). - // So this should in effect enable the throttler again with the default config. - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, useDefaultThreshold, useDefaultQuery, true) + // threshold. + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery, true) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere again with the default config. From db8e13d5665c89d8f870a20693d974dbc478476c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 29 Mar 2023 21:18:22 -0400 Subject: [PATCH 18/19] Correct comment part II: electric boogaloo Signed-off-by: Matt Lord --- .../endtoend/tabletmanager/throttler_topo/throttler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 96a09f8a64f..daa7a9f280c 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -271,8 +271,8 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("enabling throttler, again", func(t *testing.T) { - // Enable throttler again with the default query which also moves us back to the default - // threshold. + // Enable the throttler again with the default query which also moves us back + // to the default threshold. _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery, true) assert.NoError(t, err) From 06628d8d64483a93708325ea1cd6e9d2e776ce40 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 29 Mar 2023 21:21:26 -0400 Subject: [PATCH 19/19] Revert one other minor unnecessary change. Signed-off-by: Matt Lord --- go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index daa7a9f280c..fe87262a21f 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -285,7 +285,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) t.Run("setting high threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, extremelyHighThreshold.Seconds(), useDefaultQuery, true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery, true) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere with new config.