diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index eed83337267..a097ce2a0cc 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -265,6 +265,7 @@ Flags: --publish-retry-interval duration how long vttablet waits to retry publishing the tablet record (default 30s) --purge-logs-interval duration how often try to remove old logs (default 1h0m0s) --query-log-stream-handler string URL handler for streaming queries log (default "/debug/querylog") + --query-throttler-config-refresh-interval duration How frequently to refresh configuration for the query throttler (default 1m0s) --query-timeout int Sets the default query timeout (in ms). Can be overridden by session variable (query_timeout) or comment directive (QUERY_TIMEOUT_MS) --querylog-buffer-size int Maximum number of buffered query logs before throttling log output (default 10) --querylog-emit-on-any-condition-met Emit to query log when any of the conditions (row-threshold, time-threshold, filter-tag) is met (default false) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index e0fd537df5c..f06b91e73e4 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -259,6 +259,7 @@ Flags: --publish-retry-interval duration how long vttablet waits to retry publishing the tablet record (default 30s) --purge-logs-interval duration how often try to remove old logs (default 1h0m0s) --query-log-stream-handler string URL handler for streaming queries log (default "/debug/querylog") + --query-throttler-config-refresh-interval duration How frequently to refresh configuration for the query throttler (default 1m0s) --querylog-emit-on-any-condition-met Emit to query log when any of the conditions (row-threshold, time-threshold, filter-tag) is met (default false) --querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization --querylog-format string format for query logs ("text" or "json") (default "text") diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index c63e5d0c69a..e5b7e61c789 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -158,6 +158,10 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { return nil, err } + if reqThrottledErr := qre.tsv.queryThrottler.Throttle(qre.ctx, qre.targetTabletType, qre.plan.FullQuery, qre.connID, qre.options); reqThrottledErr != nil { + return nil, reqThrottledErr + } + if qre.plan.PlanID == p.PlanNextval { return qre.execNextval() } @@ -350,6 +354,10 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error { return err } + if reqThrottledErr := qre.tsv.queryThrottler.Throttle(qre.ctx, qre.targetTabletType, qre.plan.FullQuery, qre.connID, qre.options); reqThrottledErr != nil { + return reqThrottledErr + } + switch qre.plan.PlanID { case p.PlanSelectStream: if qre.bindVars[sqltypes.BvReplaceSchemaName] != nil { diff --git a/go/vt/vttablet/tabletserver/querythrottler/config.go b/go/vt/vttablet/tabletserver/querythrottler/config.go new file mode 100644 index 00000000000..f81f7b7fc89 --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/config.go @@ -0,0 +1,37 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package querythrottler + +import "vitess.io/vitess/go/vt/vttablet/tabletserver/querythrottler/registry" + +// Compile-time interface compliance check +var _ registry.StrategyConfig = (*Config)(nil) + +// Config defines the runtime configuration for the QueryThrottler. +// It specifies whether throttling is enabled and which strategy to use. +type Config struct { + // Enabled indicates whether the throttler should actively apply throttling logic. + Enabled bool `json:"enabled"` + + // Strategy selects which throttling strategy should be used. + Strategy registry.ThrottlingStrategy `json:"strategy"` +} + +// GetStrategy implements registry.StrategyConfig interface +func (c Config) GetStrategy() registry.ThrottlingStrategy { + return c.Strategy +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/config_loader_interface.go b/go/vt/vttablet/tabletserver/querythrottler/config_loader_interface.go new file mode 100644 index 00000000000..ce04ffe7d9c --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/config_loader_interface.go @@ -0,0 +1,24 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package querythrottler + +import "context" + +type ConfigLoader interface { + // Load returns the latest throttler config (may come from file, topo, etc.) + Load(ctx context.Context) (Config, error) +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/file_based_config_loader.go b/go/vt/vttablet/tabletserver/querythrottler/file_based_config_loader.go new file mode 100644 index 00000000000..68b78149b5c --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/file_based_config_loader.go @@ -0,0 +1,69 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package querythrottler + +import ( + "context" + "encoding/json" + "os" +) + +const defaultConfigPath = "/config/throttler-config.json" + +var _ ConfigLoader = (*FileBasedConfigLoader)(nil) + +// FileBasedConfigLoader implements ConfigLoader by reading configuration from a JSON file. +type FileBasedConfigLoader struct { + configPath string + readFile func(string) ([]byte, error) + unmarshal func([]byte, interface{}) error +} + +// NewFileBasedConfigLoader creates a new instance of FileBasedConfigLoader. +// It uses the standard config path "/config/throttler-config.json" and standard os.ReadFile and json.Unmarshal functions. +func NewFileBasedConfigLoader() *FileBasedConfigLoader { + return &FileBasedConfigLoader{ + configPath: defaultConfigPath, + readFile: os.ReadFile, + unmarshal: json.Unmarshal, + } +} + +// NewFileBasedConfigLoaderWithDeps creates a new instance with custom dependencies for testing. +// This allows injection of mock functions without global state modification. +func NewFileBasedConfigLoaderWithDeps(configPath string, readFile func(string) ([]byte, error), unmarshal func([]byte, interface{}) error) *FileBasedConfigLoader { + return &FileBasedConfigLoader{ + configPath: configPath, + readFile: readFile, + unmarshal: unmarshal, + } +} + +// Load reads the configuration from the configured file path. +func (f *FileBasedConfigLoader) Load(ctx context.Context) (Config, error) { + data, err := f.readFile(f.configPath) + if err != nil { + return Config{}, err + } + + var cfg Config + if unMarshalErr := f.unmarshal(data, &cfg); unMarshalErr != nil { + return Config{}, unMarshalErr + } + + return cfg, nil +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/file_based_config_loader_test.go b/go/vt/vttablet/tabletserver/querythrottler/file_based_config_loader_test.go new file mode 100644 index 00000000000..19ea02c3e2b --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/file_based_config_loader_test.go @@ -0,0 +1,220 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package querythrottler + +import ( + "context" + "encoding/json" + "errors" + "testing" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/querythrottler/registry" + + "github.com/stretchr/testify/require" +) + +func TestNewFileBasedConfigLoader(t *testing.T) { + loader := NewFileBasedConfigLoader() + require.NotNil(t, loader) + require.IsType(t, &FileBasedConfigLoader{}, loader) + require.Equal(t, defaultConfigPath, loader.configPath) +} + +func TestFileBasedConfigLoader_Load(t *testing.T) { + tests := []struct { + name string + configPath string + mockReadFile func(filename string) ([]byte, error) + mockJsonUnmarshal func(data []byte, v interface{}) error + expectedConfig Config + expectedError string + expectedErrorNotNil bool + }{ + { + name: "successful config load with minimal config", + configPath: "/config/throttler-config.json", + mockReadFile: func(filename string) ([]byte, error) { + require.Equal(t, "/config/throttler-config.json", filename) + return []byte(`{"enabled": true, "strategy": "TabletThrottler"}`), nil + }, + mockJsonUnmarshal: func(data []byte, v interface{}) error { + return json.Unmarshal(data, v) + }, + expectedConfig: Config{ + Enabled: true, + Strategy: registry.ThrottlingStrategyTabletThrottler, + }, + expectedErrorNotNil: false, + }, + { + name: "successful config load with disabled throttler", + configPath: "/config/throttler-config.json", + mockReadFile: func(filename string) ([]byte, error) { + require.Equal(t, "/config/throttler-config.json", filename) + return []byte(`{"enabled": false, "strategy": "TabletThrottler"}`), nil + }, + mockJsonUnmarshal: func(data []byte, v interface{}) error { + return json.Unmarshal(data, v) + }, + expectedConfig: Config{ + Enabled: false, + Strategy: registry.ThrottlingStrategyTabletThrottler, + }, + expectedErrorNotNil: false, + }, + { + name: "file read error - file not found", + configPath: "/nonexistent/config.json", + mockReadFile: func(filename string) ([]byte, error) { + require.Equal(t, "/nonexistent/config.json", filename) + return nil, errors.New("no such file or directory") + }, + mockJsonUnmarshal: func(data []byte, v interface{}) error { + return json.Unmarshal(data, v) + }, + expectedConfig: Config{}, + expectedError: "no such file or directory", + expectedErrorNotNil: true, + }, + { + name: "file read error - permission denied", + configPath: "/config/throttler-config.json", + mockReadFile: func(filename string) ([]byte, error) { + require.Equal(t, "/config/throttler-config.json", filename) + return nil, errors.New("permission denied") + }, + mockJsonUnmarshal: func(data []byte, v interface{}) error { + return json.Unmarshal(data, v) + }, + expectedConfig: Config{}, + expectedError: "permission denied", + expectedErrorNotNil: true, + }, + { + name: "json unmarshal error - invalid json", + configPath: "/config/throttler-config.json", + mockReadFile: func(filename string) ([]byte, error) { + require.Equal(t, "/config/throttler-config.json", filename) + return []byte(`{"enabled": true`), nil + }, + mockJsonUnmarshal: func(data []byte, v interface{}) error { + return json.Unmarshal(data, v) + }, + expectedConfig: Config{}, + expectedError: "unexpected end of JSON input", + expectedErrorNotNil: true, + }, + { + name: "json unmarshal error - invalid field type", + configPath: "/config/throttler-config.json", + mockReadFile: func(filename string) ([]byte, error) { + require.Equal(t, "/config/throttler-config.json", filename) + return []byte(`{"enabled": "not_a_boolean", "strategy": "TabletThrottler"}`), nil + }, + mockJsonUnmarshal: func(data []byte, v interface{}) error { + return json.Unmarshal(data, v) + }, + expectedConfig: Config{}, + expectedError: "cannot unmarshal string into Go struct field Config.enabled of type bool", + expectedErrorNotNil: true, + }, + { + name: "empty file - should unmarshal to zero value config", + configPath: "/config/throttler-config.json", + mockReadFile: func(filename string) ([]byte, error) { + require.Equal(t, "/config/throttler-config.json", filename) + return []byte(`{}`), nil + }, + mockJsonUnmarshal: func(data []byte, v interface{}) error { + return json.Unmarshal(data, v) + }, + expectedConfig: Config{ + Enabled: false, + Strategy: "", + }, + expectedErrorNotNil: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create loader with injected dependencies + loader := NewFileBasedConfigLoaderWithDeps(tt.configPath, tt.mockReadFile, tt.mockJsonUnmarshal) + + // Test + config, err := loader.Load(context.Background()) + + // Assert + if tt.expectedErrorNotNil { + require.Error(t, err) + require.Contains(t, err.Error(), tt.expectedError) + require.Equal(t, tt.expectedConfig, config) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectedConfig, config) + } + }) + } +} + +func TestFileBasedConfigLoader_Load_ConfigPath(t *testing.T) { + // Test that the production loader uses the default config path + var capturedPath string + + mockReadFile := func(filename string) ([]byte, error) { + capturedPath = filename + return []byte(`{"enabled": true, "strategy": "TabletThrottler"}`), nil + } + + mockJsonUnmarshal := func(data []byte, v interface{}) error { + return json.Unmarshal(data, v) + } + + // Test with production constructor (should use default path) + loader := NewFileBasedConfigLoaderWithDeps(defaultConfigPath, mockReadFile, mockJsonUnmarshal) + _, err := loader.Load(context.Background()) + + require.NoError(t, err) + require.Equal(t, "/config/throttler-config.json", capturedPath) +} + +func TestFileBasedConfigLoader_ImplementsConfigLoader(t *testing.T) { + // Verify that FileBasedConfigLoader implements ConfigLoader interface + var _ ConfigLoader = (*FileBasedConfigLoader)(nil) + + // This should compile without issues, proving interface compliance + loader := NewFileBasedConfigLoader() + require.NotNil(t, loader) +} + +func TestNewFileBasedConfigLoaderWithDeps(t *testing.T) { + configPath := "/test/config.json" + mockReadFile := func(string) ([]byte, error) { return nil, nil } + mockUnmarshal := func([]byte, interface{}) error { return nil } + + loader := NewFileBasedConfigLoaderWithDeps(configPath, mockReadFile, mockUnmarshal) + + require.NotNil(t, loader) + require.Equal(t, configPath, loader.configPath) + // Note: We can't directly test function equality, but the constructor should set them +} + +func TestFileBasedConfigLoader_UsesDefaultPath(t *testing.T) { + // Test that the production constructor uses the default path + loader := NewFileBasedConfigLoader() + require.Equal(t, "/config/throttler-config.json", loader.configPath) +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/query_throttler.go b/go/vt/vttablet/tabletserver/querythrottler/query_throttler.go new file mode 100644 index 00000000000..3d7920f48f1 --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/query_throttler.go @@ -0,0 +1,162 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package querythrottler + +import ( + "context" + "sync" + "time" + + "vitess.io/vitess/go/vt/sqlparser" + + "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/querythrottler/registry" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" +) + +type QueryThrottler struct { + ctx context.Context + throttleClient *throttle.Client + tabletConfig *tabletenv.TabletConfig + mu sync.RWMutex + // cfg holds the current configuration for the throttler. + cfg Config + // cfgLoader is responsible for loading the configuration. + cfgLoader ConfigLoader + // strategy is the current throttling strategy handler. + strategy registry.ThrottlingStrategyHandler +} + +// NewQueryThrottler creates a new query throttler. +func NewQueryThrottler(ctx context.Context, throttler *throttle.Throttler, cfgLoader ConfigLoader, env tabletenv.Env) *QueryThrottler { + client := throttle.NewBackgroundClient(throttler, throttlerapp.QueryThrottlerName, base.UndefinedScope) + + qt := &QueryThrottler{ + ctx: ctx, + throttleClient: client, + tabletConfig: env.Config(), + cfg: Config{}, + cfgLoader: cfgLoader, + strategy: ®istry.NoOpStrategy{}, // default strategy until config is loaded + } + + // Start the initial strategy + qt.strategy.Start() + + // starting the loop which will be responsible for refreshing the config. + qt.startConfigRefreshLoop() + + return qt +} + +// Shutdown gracefully stops the throttler and cleans up resources. +// This should be called when the QueryThrottler is no longer needed. +func (qt *QueryThrottler) Shutdown() { + qt.mu.Lock() + defer qt.mu.Unlock() + + // Stop the current strategy to clean up any background processes + if qt.strategy != nil { + qt.strategy.Stop() + } +} + +// Throttle checks if the tablet is under heavy load +// and enforces throttling by rejecting the incoming request if necessary. +// Note: This method performs lock-free reads of config and strategy for optimal performance. +// Config updates are rare (default: every 1 minute) compared to query frequency, +// so the tiny risk of reading slightly stale data during config updates is acceptable +// for the significant performance improvement of avoiding mutex contention. +func (qt *QueryThrottler) Throttle(ctx context.Context, tabletType topodatapb.TabletType, parsedQuery *sqlparser.ParsedQuery, transactionID int64, options *querypb.ExecuteOptions) error { + // Lock-free read: for maximum performance in the hot path as cfg and strategy are updated rarely (default once per minute). + // They are word-sized and safe for atomic reads; stale data for one query is acceptable and avoids mutex contention in the hot path. + if !qt.cfg.Enabled { + return nil + } + + // Evaluate the throttling decision + decision := qt.strategy.Evaluate(ctx, tabletType, parsedQuery, transactionID, options) + + // If no throttling is needed, allow the query + if !decision.Throttle { + return nil + } + + // Normal throttling: return an error to reject the query + return vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, decision.Message) +} + +// selectThrottlingStrategy returns the appropriate strategy implementation based on the config. +func selectThrottlingStrategy(cfg Config, client *throttle.Client, tabletConfig *tabletenv.TabletConfig) registry.ThrottlingStrategyHandler { + deps := registry.Deps{ + ThrottleClient: client, + TabletConfig: tabletConfig, + } + return registry.CreateStrategy(cfg, deps) +} + +// startConfigRefreshLoop launches a background goroutine that refreshes the throttler's configuration +// at the interval specified by QueryThrottlerConfigRefreshInterval. +func (qt *QueryThrottler) startConfigRefreshLoop() { + go func() { + refreshInterval := qt.tabletConfig.QueryThrottlerConfigRefreshInterval + configRefreshTicker := time.NewTicker(refreshInterval) + defer configRefreshTicker.Stop() + + for { + select { + case <-qt.ctx.Done(): + return + case <-configRefreshTicker.C: + newCfg, err := qt.cfgLoader.Load(qt.ctx) + if err != nil { + log.Errorf("Error loading config: %v", err) + continue + } + + // Only restart strategy if the strategy type has changed + if qt.cfg.Strategy != newCfg.Strategy { + // Stop the current strategy before switching to a new one + if qt.strategy != nil { + qt.strategy.Stop() + } + + newStrategy := selectThrottlingStrategy(newCfg, qt.throttleClient, qt.tabletConfig) + // Update strategy and start the new one + qt.mu.Lock() + qt.strategy = newStrategy + qt.mu.Unlock() + if qt.strategy != nil { + qt.strategy.Start() + } + } + + // Always update the configuration + qt.mu.Lock() + qt.cfg = newCfg + qt.mu.Unlock() + } + } + }() +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/query_throttler_test.go b/go/vt/vttablet/tabletserver/querythrottler/query_throttler_test.go new file mode 100644 index 00000000000..5824edaf6cc --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/query_throttler_test.go @@ -0,0 +1,152 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package querythrottler + +import ( + "context" + "testing" + "time" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/querythrottler/registry" + + "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" +) + +func TestNewQueryThrottler_ConfigRefresh(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + config := &tabletenv.TabletConfig{ + QueryThrottlerConfigRefreshInterval: 10 * time.Millisecond, + } + env := tabletenv.NewEnv(vtenv.NewTestEnv(), config, "TestThrottler") + + throttler := &throttle.Throttler{} // use mock if needed + iqt := NewQueryThrottler(ctx, throttler, newFakeConfigLoader(Config{ + Enabled: true, + Strategy: registry.ThrottlingStrategyTabletThrottler, + }), env) + + // Assert initial state (should be NoOpStrategy) + require.NotNil(t, iqt) + iqt.mu.RLock() + initialStrategy := iqt.strategy + iqt.mu.RUnlock() + require.IsType(t, ®istry.NoOpStrategy{}, initialStrategy) + + require.Eventually(t, func() bool { + iqt.mu.RLock() + defer iqt.mu.RUnlock() + + // Assert updated cfg and strategy after config refresh + if !iqt.cfg.Enabled { + return false + } + if iqt.cfg.Strategy != registry.ThrottlingStrategyTabletThrottler { + return false + } + return true + }, 1*time.Second, 10*time.Millisecond, "Config should be refreshed and strategy should be updated") +} + +func TestSelectThrottlingStrategy(t *testing.T) { + tests := []struct { + name string + giveThrottlingStrategy registry.ThrottlingStrategy + expectedType registry.ThrottlingStrategyHandler + }{ + { + name: "Unknown strategy defaults to NoOp", + giveThrottlingStrategy: "some-unknown-string", + expectedType: ®istry.NoOpStrategy{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockClient := &throttle.Client{} + + config := &tabletenv.TabletConfig{ + QueryThrottlerConfigRefreshInterval: 10 * time.Millisecond, + } + + strategy := selectThrottlingStrategy(Config{Enabled: true, Strategy: tt.giveThrottlingStrategy}, mockClient, config) + + require.IsType(t, tt.expectedType, strategy) + }) + } +} + +// TestQueryThrottler_StrategyLifecycleManagement tests that strategies are properly started and stopped. +func TestQueryThrottler_StrategyLifecycleManagement(t *testing.T) { + // Test that initial strategy is started + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + throttler := &throttle.Throttler{} + config := &tabletenv.TabletConfig{ + QueryThrottlerConfigRefreshInterval: 10 * time.Millisecond, + } + env := tabletenv.NewEnv(vtenv.NewTestEnv(), config, "TestThrottler") + + iqt := NewQueryThrottler(ctx, throttler, newFakeConfigLoader(Config{ + Enabled: true, + Strategy: registry.ThrottlingStrategyTabletThrottler, + }), env) + + // Verify initial strategy was started (NoOpStrategy in this case) + require.NotNil(t, iqt.strategy) + + // Test Shutdown properly stops the strategy + iqt.Shutdown() + + // After shutdown, the strategy should have been stopped + // In a real test, we would verify the strategy's Stop method was called + require.NotNil(t, iqt.strategy) // Strategy reference should still exist but be stopped +} + +// TestQueryThrottler_Shutdown tests the Shutdown method. +func TestQueryThrottler_Shutdown(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + config := &tabletenv.TabletConfig{ + QueryThrottlerConfigRefreshInterval: 10 * time.Millisecond, + } + env := tabletenv.NewEnv(vtenv.NewTestEnv(), config, "TestThrottler") + + throttler := &throttle.Throttler{} + iqt := NewQueryThrottler(ctx, throttler, newFakeConfigLoader(Config{ + Enabled: false, + Strategy: registry.ThrottlingStrategyTabletThrottler, + }), env) + + // Should not panic when called multiple times + iqt.Shutdown() + iqt.Shutdown() + + // Should still be able to check the strategy reference + iqt.mu.RLock() + strategy := iqt.strategy + iqt.mu.RUnlock() + require.NotNil(t, strategy) +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/registry/noop_strategy.go b/go/vt/vttablet/tabletserver/querythrottler/registry/noop_strategy.go new file mode 100644 index 00000000000..4b40a949ff3 --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/registry/noop_strategy.go @@ -0,0 +1,54 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +// NoOpStrategy sits in the registry sub-package because registry must produce a safe fallback without importing its parent. +// Moving it up would create a circular dependency (querythrottler imports registry, and registry would then have to import querythrottler), which Go prohibits +import ( + "context" + + "vitess.io/vitess/go/vt/sqlparser" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// NoOpStrategy is not intended to be a selectable policy. It exists solely as a hard-wired, last-resort fallback for the registry. +// By instantiating it directly inside CreateStrategy(), we guarantee the system always has some strategy, even when configuration is wrong or plugin registration fails. +// Registering it would make it user-configurable, which opens the door for an accidental ‘no throttling’ setup in production. Hence, it deliberately remains unregistered. +var _ ThrottlingStrategyHandler = (*NoOpStrategy)(nil) + +// NoOpStrategy is a fallback strategy that performs no throttling. +type NoOpStrategy struct{} + +// Evaluate always returns a decision to not throttle since this is a no-op strategy. +func (s *NoOpStrategy) Evaluate(ctx context.Context, targetTabletType topodatapb.TabletType, fullQuery *sqlparser.ParsedQuery, transactionID int64, options *querypb.ExecuteOptions) ThrottleDecision { + return ThrottleDecision{ + Throttle: false, + Message: "NoOpStrategy: no throttling applied", + } +} + +// Start is a no-op for the NoOpStrategy since it requires no initialization. +func (s *NoOpStrategy) Start() { + // No-op: NoOpStrategy requires no initialization or background processes +} + +// Stop is a no-op for the NoOpStrategy since it has no resources to clean up. +func (s *NoOpStrategy) Stop() { + // No-op: NoOpStrategy has no resources to clean up +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/registry/noop_strategy_test.go b/go/vt/vttablet/tabletserver/querythrottler/registry/noop_strategy_test.go new file mode 100644 index 00000000000..99dcc512355 --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/registry/noop_strategy_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "context" + "testing" + + "vitess.io/vitess/go/vt/sqlparser" + + "github.com/stretchr/testify/require" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// TestNoOpStrategy_Lifecycle tests the Start and Stop methods of NoOpStrategy. +func TestNoOpStrategy_Lifecycle(t *testing.T) { + strategy := &NoOpStrategy{} + + // Test that Start and Stop can be called without errors + // These are no-ops but should not panic + strategy.Start() + strategy.Stop() + + // Test multiple Start/Stop calls (should be safe) + strategy.Start() + strategy.Start() + strategy.Stop() + strategy.Stop() + + // Verify Evaluate still works after Start/Stop + decision := strategy.Evaluate(context.Background(), topodatapb.TabletType_PRIMARY, &sqlparser.ParsedQuery{Query: "SELECT 1"}, 0, nil) + require.False(t, decision.Throttle, "NoOpStrategy should never throttle") +} + +func TestNoOpStrategy_Evaluate(t *testing.T) { + tests := []struct { + name string + giveTabletType topodatapb.TabletType + giveSQL string + expectedResult ThrottleDecision + }{ + { + name: "no throttling for tablet type primary", + giveTabletType: topodatapb.TabletType_PRIMARY, + giveSQL: "SELECT * FROM users", + expectedResult: ThrottleDecision{ + Throttle: false, + Message: "NoOpStrategy: no throttling applied", + }, + }, + { + name: "no throttling for tablet type replica", + giveTabletType: topodatapb.TabletType_REPLICA, + giveSQL: "INSERT INTO logs VALUES (1)", + expectedResult: ThrottleDecision{ + Throttle: false, + Message: "NoOpStrategy: no throttling applied", + }, + }, + { + name: "no throttling for tablet type rdonly", + giveTabletType: topodatapb.TabletType_RDONLY, + giveSQL: "UPDATE stats SET count = count + 1", + expectedResult: ThrottleDecision{ + Throttle: false, + Message: "NoOpStrategy: no throttling applied", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + strategy := &NoOpStrategy{} + + result := strategy.Evaluate(context.Background(), tt.giveTabletType, &sqlparser.ParsedQuery{Query: tt.giveSQL}, 0, nil) + require.Equal(t, tt.expectedResult, result) + }) + } +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/registry/registry.go b/go/vt/vttablet/tabletserver/querythrottler/registry/registry.go new file mode 100644 index 00000000000..02ea175cd67 --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/registry/registry.go @@ -0,0 +1,89 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "fmt" + "sync" + + "vitess.io/vitess/go/vt/log" +) + +var ( + mu sync.RWMutex + factories = map[ThrottlingStrategy]StrategyFactory{} +) + +// Register registers a new strategy factory with the given name. +// Panics if a strategy with the same name is already registered (fail-fast behavior). +func Register(name ThrottlingStrategy, factory StrategyFactory) { + mu.Lock() + defer mu.Unlock() + + if _, exists := factories[name]; exists { + panic(fmt.Sprintf("strategy %s already registered", name)) + } + + factories[name] = factory + log.Infof("Registered throttling strategy: %s", name) +} + +// Get retrieves a strategy factory by name. +// Returns the factory and true if found, nil and false otherwise. +func Get(name ThrottlingStrategy) (StrategyFactory, bool) { + mu.RLock() + defer mu.RUnlock() + + factory, exists := factories[name] + return factory, exists +} + +// CreateStrategy creates a new strategy instance using the registered factory. +// Falls back to NoOpStrategy for unknown strategies or factory errors. +func CreateStrategy(cfg StrategyConfig, deps Deps) ThrottlingStrategyHandler { + // If the requested strategy name is unknown or the factory panics/returns error, CreateStrategy() directly constructs &NoOpStrategy{} as its unconditional fallback. + // Configuration files or runtime switches never list "NoOp" as a valid strategy choice; they list “TabletThrottler”, “Cinnamon”, etc. + // The design intent is: + // Every “real” strategy must self-register to opt-in. + // NoOpStrategy must always be available—even before any registration happens—so the registry itself can safely fall back on it. + factory, ok := Get(cfg.GetStrategy()) + if !ok { + log.Warningf("Unknown strategy %s, using NoOp", cfg.GetStrategy()) + return &NoOpStrategy{} + } + + strategy, err := factory.New(deps, cfg) + if err != nil { + log.Errorf("Strategy %s failed to init: %v, using NoOp", cfg.GetStrategy(), err) + return &NoOpStrategy{} + } + + return strategy +} + +// ListRegistered returns a list of all registered strategy names. +// Useful for debugging and testing. +func ListRegistered() []ThrottlingStrategy { + mu.RLock() + defer mu.RUnlock() + + strategies := make([]ThrottlingStrategy, 0, len(factories)) + for name := range factories { + strategies = append(strategies, name) + } + return strategies +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/registry/registry_test.go b/go/vt/vttablet/tabletserver/querythrottler/registry/registry_test.go new file mode 100644 index 00000000000..149c0bd33d2 --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/registry/registry_test.go @@ -0,0 +1,130 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// reset clears all registered factories. This is used for testing. +func reset() { + mu.Lock() + defer mu.Unlock() + factories = make(map[ThrottlingStrategy]StrategyFactory) +} + +// testStrategyFactory is a simple factory for testing. +type testStrategyFactory struct{} + +// Compile-time interface compliance check +var _ StrategyFactory = (*testStrategyFactory)(nil) + +func (f testStrategyFactory) New(deps Deps, cfg StrategyConfig) (ThrottlingStrategyHandler, error) { + return &NoOpStrategy{}, nil +} + +func TestRegister(t *testing.T) { + reset() + + // Test successful registration + testFactory := testStrategyFactory{} + Register("test-strategy", testFactory) + + factory, exists := Get("test-strategy") + require.True(t, exists) + require.Equal(t, testFactory, factory) +} + +func TestRegisterDuplicate(t *testing.T) { + reset() + + testFactory := testStrategyFactory{} + Register("test-strategy", testFactory) + + // Should panic on duplicate registration + require.Panics(t, func() { + Register("test-strategy", testFactory) + }) +} + +func TestGetUnknown(t *testing.T) { + reset() + + factory, exists := Get("unknown-strategy") + require.False(t, exists) + require.Nil(t, factory) +} + +// testConfig implements the Config interface for testing +type testConfig struct { + strategy ThrottlingStrategy +} + +func (c testConfig) GetStrategy() ThrottlingStrategy { + return c.strategy +} + +func (c testConfig) GetTabletStrategyConfig() interface{} { + return nil +} + +func TestCreateStrategy(t *testing.T) { + reset() + + // Register a test factory + Register("test-strategy", testStrategyFactory{}) + + cfg := testConfig{ + strategy: "test-strategy", + } + deps := Deps{} + + strategy := CreateStrategy(cfg, deps) + require.IsType(t, &NoOpStrategy{}, strategy) +} + +func TestCreateStrategyUnknown(t *testing.T) { + reset() + + cfg := testConfig{ + strategy: "unknown-strategy", + } + deps := Deps{} + + strategy := CreateStrategy(cfg, deps) + // Should fallback to NoOpStrategy + require.IsType(t, &NoOpStrategy{}, strategy) +} + +func TestListRegistered(t *testing.T) { + reset() + + // Initially empty + strategies := ListRegistered() + require.Empty(t, strategies) + + // Register some strategies + Register("strategy1", testStrategyFactory{}) + Register("strategy2", testStrategyFactory{}) + + strategies = ListRegistered() + require.Len(t, strategies, 2) + require.Contains(t, strategies, ThrottlingStrategy("strategy1")) + require.Contains(t, strategies, ThrottlingStrategy("strategy2")) +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/registry/throttling_handler.go b/go/vt/vttablet/tabletserver/querythrottler/registry/throttling_handler.go new file mode 100644 index 00000000000..43e8c92db39 --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/registry/throttling_handler.go @@ -0,0 +1,61 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "context" + + "vitess.io/vitess/go/vt/sqlparser" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// Predefined throttling strategies for the QueryThrottler. +const ( + // ThrottlingStrategyTabletThrottler uses Vitess Tablet Throttler to shed load + // from incoming queries when the tablet is under pressure. + // Reference: https://vitess.io/docs/21.0/reference/features/tablet-throttler/ + ThrottlingStrategyTabletThrottler ThrottlingStrategy = "TabletThrottler" + + // ThrottlingStrategyUnknown is used when the strategy is not known. + ThrottlingStrategyUnknown ThrottlingStrategy = "Unknown" +) + +// ThrottlingStrategy represents the strategy used to apply throttling +// to incoming queries based on system load or external signals. +type ThrottlingStrategy string + +// ThrottlingStrategyHandler defines the interface for throttling strategies +// used by the QueryThrottler. Each strategy encapsulates its own logic +// to determine whether throttling should be applied for an incoming query. +type ThrottlingStrategyHandler interface { + // Evaluate determines whether a query should be throttled and returns detailed information about the decision. + // This method separates the decision-making logic from the enforcement action, enabling features like dry-run mode. + // It returns a ThrottleDecision struct containing all relevant information about the throttling decision. + Evaluate(ctx context.Context, targetTabletType topodatapb.TabletType, parsedQuery *sqlparser.ParsedQuery, transactionID int64, options *querypb.ExecuteOptions) ThrottleDecision + + // Start initializes and starts the throttling strategy. + // This method should be called when the strategy becomes active. + // Implementations may start background processes, caching, or other resources. + Start() + + // Stop gracefully shuts down the throttling strategy and releases any resources. + // This method should be called when the strategy is no longer needed. + // Implementations should clean up background processes, caches, or other resources. + Stop() +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/registry/types.go b/go/vt/vttablet/tabletserver/querythrottler/registry/types.go new file mode 100644 index 00000000000..c7fa4eb576b --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/registry/types.go @@ -0,0 +1,62 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" +) + +// ThrottleDecision represents the result of evaluating whether a query should be throttled. +// It separates the decision-making logic from the enforcement action. +type ThrottleDecision struct { + // Throttle indicates whether the query should be throttled. + Throttle bool + + // Message contains a human-readable description of the throttling decision. + // This message can be used for logging, error responses, or metrics. + Message string + + // MetricName identifies which metric triggered the throttling decision (if any). + MetricName string + + // MetricValue contains the current value of the metric that triggered throttling. + MetricValue float64 + + // Threshold contains the threshold value that was breached. + Threshold float64 + + // ThrottlePercentage contains the percentage chance this query was throttled (0.0-1.0). + ThrottlePercentage float64 +} + +// StrategyConfig defines the configuration interface that strategy implementations +// must satisfy. This avoids circular imports by using a generic interface. +type StrategyConfig interface { + GetStrategy() ThrottlingStrategy +} + +// Deps holds the dependencies required by strategy factories. +type Deps struct { + ThrottleClient *throttle.Client + TabletConfig *tabletenv.TabletConfig +} + +// StrategyFactory creates a new strategy instance with the given dependencies and configuration. +type StrategyFactory interface { + New(deps Deps, cfg StrategyConfig) (ThrottlingStrategyHandler, error) +} diff --git a/go/vt/vttablet/tabletserver/querythrottler/test_wrappers.go b/go/vt/vttablet/tabletserver/querythrottler/test_wrappers.go new file mode 100644 index 00000000000..85f74eb6080 --- /dev/null +++ b/go/vt/vttablet/tabletserver/querythrottler/test_wrappers.go @@ -0,0 +1,37 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package querythrottler + +import "context" + +// fakeConfigLoader is a test fake that implements ConfigLoader. +type fakeConfigLoader struct { + giveConfig Config +} + +// newFakeConfigLoader creates a fake config loader +// with a fully constructed Config. +func newFakeConfigLoader(cfg Config) *fakeConfigLoader { + return &fakeConfigLoader{ + giveConfig: cfg, + } +} + +// Load implements the ConfigLoader interface. +func (f *fakeConfigLoader) Load(ctx context.Context) (Config, error) { + return f.giveConfig, nil +} diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index f53fce59288..e1a22c3dae7 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -124,6 +124,7 @@ type stateManager struct { messager subComponent ddle onlineDDLExecutor throttler lagThrottler + qThrottler queryThrottler tableGC tableGarbageCollector // hcticks starts on initialization and runs forever. @@ -193,6 +194,11 @@ type ( Open() error Close() } + + queryThrottler interface { + Open() error + Close() + } ) // Init performs the second phase of initialization. @@ -468,6 +474,7 @@ func (sm *stateManager) servePrimary() error { sm.te.AcceptReadWrite() sm.messager.Open() sm.throttler.Open() + sm.qThrottler.Open() sm.tableGC.Open() sm.ddle.Open() sm.setState(topodatapb.TabletType_PRIMARY, StateServing) @@ -511,6 +518,7 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er sm.rt.MakeNonPrimary() sm.watcher.Open() sm.throttler.Open() + sm.qThrottler.Open() sm.setState(wantTabletType, StateServing) return nil } @@ -563,6 +571,8 @@ func (sm *stateManager) unserveCommon() { log.Infof("Finished table garbage collector close. Started lag throttler close") sm.throttler.Close() log.Infof("Finished lag throttler close. Started messager close") + sm.qThrottler.Close() + log.Infof("Finished query throttler close. Started query throttler close") sm.messager.Close() log.Infof("Finished messager close. Started txEngine close") sm.te.Close() diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index b4b36bbcfa4..255f4beda24 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -88,8 +88,9 @@ func TestStateManagerServePrimary(t *testing.T) { verifySubcomponent(t, 8, sm.te, testStatePrimary) verifySubcomponent(t, 9, sm.messager, testStateOpen) verifySubcomponent(t, 10, sm.throttler, testStateOpen) - verifySubcomponent(t, 11, sm.tableGC, testStateOpen) - verifySubcomponent(t, 12, sm.ddle, testStateOpen) + verifySubcomponent(t, 11, sm.qThrottler, testStateOpen) + verifySubcomponent(t, 12, sm.tableGC, testStateOpen) + verifySubcomponent(t, 13, sm.ddle, testStateOpen) assert.False(t, sm.se.(*testSchemaEngine).nonPrimary) assert.True(t, sm.se.(*testSchemaEngine).ensureCalled) @@ -132,17 +133,18 @@ func TestStateManagerUnservePrimary(t *testing.T) { verifySubcomponent(t, 1, sm.ddle, testStateClosed) verifySubcomponent(t, 2, sm.tableGC, testStateClosed) verifySubcomponent(t, 3, sm.throttler, testStateClosed) - verifySubcomponent(t, 4, sm.messager, testStateClosed) - verifySubcomponent(t, 5, sm.te, testStateClosed) + verifySubcomponent(t, 4, sm.qThrottler, testStateClosed) + verifySubcomponent(t, 5, sm.messager, testStateClosed) + verifySubcomponent(t, 6, sm.te, testStateClosed) - verifySubcomponent(t, 6, sm.tracker, testStateClosed) - verifySubcomponent(t, 7, sm.watcher, testStateClosed) - verifySubcomponent(t, 8, sm.se, testStateOpen) - verifySubcomponent(t, 9, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 10, sm.qe, testStateOpen) - verifySubcomponent(t, 11, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 7, sm.tracker, testStateClosed) + verifySubcomponent(t, 8, sm.watcher, testStateClosed) + verifySubcomponent(t, 9, sm.se, testStateOpen) + verifySubcomponent(t, 10, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 11, sm.qe, testStateOpen) + verifySubcomponent(t, 12, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 12, sm.rt, testStatePrimary) + verifySubcomponent(t, 13, sm.rt, testStatePrimary) assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.target.TabletType) assert.Equal(t, StateNotServing, sm.state) @@ -189,19 +191,20 @@ func TestStateManagerUnserveNonPrimary(t *testing.T) { verifySubcomponent(t, 1, sm.ddle, testStateClosed) verifySubcomponent(t, 2, sm.tableGC, testStateClosed) verifySubcomponent(t, 3, sm.throttler, testStateClosed) - verifySubcomponent(t, 4, sm.messager, testStateClosed) - verifySubcomponent(t, 5, sm.te, testStateClosed) + verifySubcomponent(t, 4, sm.qThrottler, testStateClosed) + verifySubcomponent(t, 5, sm.messager, testStateClosed) + verifySubcomponent(t, 6, sm.te, testStateClosed) - verifySubcomponent(t, 6, sm.tracker, testStateClosed) + verifySubcomponent(t, 7, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonPrimary) - verifySubcomponent(t, 7, sm.se, testStateOpen) - verifySubcomponent(t, 8, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 9, sm.qe, testStateOpen) - verifySubcomponent(t, 10, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 8, sm.se, testStateOpen) + verifySubcomponent(t, 9, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 10, sm.qe, testStateOpen) + verifySubcomponent(t, 11, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 11, sm.rt, testStateNonPrimary) - verifySubcomponent(t, 12, sm.watcher, testStateOpen) + verifySubcomponent(t, 12, sm.rt, testStateNonPrimary) + verifySubcomponent(t, 13, sm.watcher, testStateOpen) assert.Equal(t, topodatapb.TabletType_RDONLY, sm.target.TabletType) assert.Equal(t, StateNotServing, sm.state) @@ -216,16 +219,17 @@ func TestStateManagerClose(t *testing.T) { verifySubcomponent(t, 1, sm.ddle, testStateClosed) verifySubcomponent(t, 2, sm.tableGC, testStateClosed) verifySubcomponent(t, 3, sm.throttler, testStateClosed) - verifySubcomponent(t, 4, sm.messager, testStateClosed) - verifySubcomponent(t, 5, sm.te, testStateClosed) - verifySubcomponent(t, 6, sm.tracker, testStateClosed) - - verifySubcomponent(t, 7, sm.txThrottler, testStateClosed) - verifySubcomponent(t, 8, sm.qe, testStateClosed) - verifySubcomponent(t, 9, sm.watcher, testStateClosed) - verifySubcomponent(t, 10, sm.vstreamer, testStateClosed) - verifySubcomponent(t, 11, sm.rt, testStateClosed) - verifySubcomponent(t, 12, sm.se, testStateClosed) + verifySubcomponent(t, 4, sm.qThrottler, testStateClosed) + verifySubcomponent(t, 5, sm.messager, testStateClosed) + verifySubcomponent(t, 6, sm.te, testStateClosed) + verifySubcomponent(t, 7, sm.tracker, testStateClosed) + + verifySubcomponent(t, 8, sm.txThrottler, testStateClosed) + verifySubcomponent(t, 9, sm.qe, testStateClosed) + verifySubcomponent(t, 10, sm.watcher, testStateClosed) + verifySubcomponent(t, 11, sm.vstreamer, testStateClosed) + verifySubcomponent(t, 12, sm.rt, testStateClosed) + verifySubcomponent(t, 13, sm.se, testStateClosed) assert.Equal(t, topodatapb.TabletType_RDONLY, sm.target.TabletType) assert.Equal(t, StateNotConnected, sm.state) @@ -823,6 +827,7 @@ func newTestStateManager() *stateManager { ddle: &testOnlineDDLExecutor{}, diskHealthMonitor: newNoopDiskHealthMonitor(), throttler: &testLagThrottler{}, + qThrottler: &testQueryThrottler{}, tableGC: &testTableGC{}, rw: newRequestsWaiter(), } @@ -1021,6 +1026,21 @@ func (te *testOnlineDDLExecutor) Close() { te.state = testStateClosed } +type testQueryThrottler struct { + testOrderState +} + +func (te *testQueryThrottler) Open() error { + te.order = order.Add(1) + te.state = testStateOpen + return nil +} + +func (te *testQueryThrottler) Close() { + te.order = order.Add(1) + te.state = testStateClosed +} + type testLagThrottler struct { testOrderState } diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index dc83fb64e70..dbeccfb8767 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -68,19 +68,20 @@ var ( StatsLogger = streamlog.New[*LogStats]("TabletServer", 50) // The following vars are used for custom initialization of Tabletconfig. - enableHotRowProtection bool - enableHotRowProtectionDryRun bool - enableConsolidator bool - enableConsolidatorReplicas bool - enableHeartbeat bool - heartbeatInterval time.Duration - heartbeatOnDemandDuration time.Duration - healthCheckInterval time.Duration - semiSyncMonitorInterval time.Duration - degradedThreshold time.Duration - unhealthyThreshold time.Duration - transitionGracePeriod time.Duration - enableReplicationReporter bool + enableHotRowProtection bool + enableHotRowProtectionDryRun bool + enableConsolidator bool + enableConsolidatorReplicas bool + enableHeartbeat bool + heartbeatInterval time.Duration + heartbeatOnDemandDuration time.Duration + healthCheckInterval time.Duration + semiSyncMonitorInterval time.Duration + degradedThreshold time.Duration + unhealthyThreshold time.Duration + transitionGracePeriod time.Duration + enableReplicationReporter bool + queryThrottlerConfigRefreshInterval time.Duration ) func init() { @@ -219,6 +220,8 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.BoolVar(¤tConfig.EnablePerWorkloadTableMetrics, "enable-per-workload-table-metrics", defaultConfig.EnablePerWorkloadTableMetrics, "If true, query counts and query error metrics include a label that identifies the workload") fs.BoolVar(¤tConfig.SkipUserMetrics, "skip-user-metrics", defaultConfig.SkipUserMetrics, "If true, user based stats are not recorded.") + fs.DurationVar(&queryThrottlerConfigRefreshInterval, "query-throttler-config-refresh-interval", time.Minute, "How frequently to refresh configuration for the query throttler") + fs.BoolVar(¤tConfig.Unmanaged, "unmanaged", false, "Indicates an unmanaged tablet, i.e. using an external mysql-compatible database") } @@ -282,6 +285,8 @@ func Init() { currentConfig.GracePeriods.Transition = transitionGracePeriod currentConfig.SemiSyncMonitor.Interval = semiSyncMonitorInterval + currentConfig.QueryThrottlerConfigRefreshInterval = queryThrottlerConfigRefreshInterval + logFormat := streamlog.GetQueryLogConfig().Format switch logFormat { case streamlog.QueryLogFormatText: @@ -371,8 +376,9 @@ type TabletConfig struct { EnableViews bool `json:"-"` - EnablePerWorkloadTableMetrics bool `json:"-"` - SkipUserMetrics bool `json:"-"` + EnablePerWorkloadTableMetrics bool `json:"-"` + SkipUserMetrics bool `json:"-"` + QueryThrottlerConfigRefreshInterval time.Duration `json:"-"` } func (cfg *TabletConfig) MarshalJSON() ([]byte, error) { @@ -1119,6 +1125,8 @@ var defaultConfig = TabletConfig{ EnablePerWorkloadTableMetrics: false, TwoPCAbandonAge: 15 * time.Minute, + + QueryThrottlerConfigRefreshInterval: time.Minute, } // defaultTxThrottlerConfig returns the default TxThrottlerConfigFlag object based on diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 16492c4e8c3..d70d93d7dca 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -32,6 +32,8 @@ import ( "syscall" "time" + "vitess.io/vitess/go/vt/vttablet/tabletserver/querythrottler" + "vitess.io/vitess/go/acl" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/pools/smartconnpool" @@ -93,6 +95,12 @@ var logComputeRowSerializerKey = logutil.NewThrottledLogger("ComputeRowSerialize // perform one-time initializations and must be idempotent. // Open and Close can be called repeatedly during the lifetime of // a subcomponent. These should also be idempotent. + +const ( + throttlerPoolName = "ThrottlerPool" + queryThrottlerPoolName = "QueryThrottlerPool" +) + type TabletServer struct { exporter *servenv.Exporter config *tabletenv.TabletConfig @@ -118,6 +126,7 @@ type TabletServer struct { messager *messager.Engine hs *healthStreamer lagThrottler *throttle.Throttler + qThrottler *throttle.Throttler tableGC *gc.TableGC // sm manages state transitions. @@ -131,6 +140,8 @@ type TabletServer struct { checkMysqlGaugeFunc *stats.GaugeFunc env *vtenv.Environment + + queryThrottler *querythrottler.QueryThrottler } var _ queryservice.QueryService = (*TabletServer)(nil) @@ -177,7 +188,10 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c tsv.se = schema.NewEngine(tsv) tsv.hs = newHealthStreamer(tsv, alias, tsv.se) tsv.rt = repltracker.NewReplTracker(tsv, alias) - tsv.lagThrottler = throttle.NewThrottler(tsv, srvTopoServer, topoServer, alias, tsv.rt.HeartbeatWriter(), tabletTypeFunc) + tsv.lagThrottler = throttle.NewThrottler(tsv, srvTopoServer, topoServer, alias, tsv.rt.HeartbeatWriter(), tabletTypeFunc, throttlerPoolName) + tsv.qThrottler = throttle.NewThrottler(tsv, srvTopoServer, topoServer, alias, tsv.rt.HeartbeatWriter(), tabletTypeFunc, queryThrottlerPoolName) + tsv.queryThrottler = querythrottler.NewQueryThrottler(ctx, tsv.qThrottler, querythrottler.NewFileBasedConfigLoader(), tsv) + tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, tsv.lagThrottler, alias.Cell) tsv.tracker = schema.NewTracker(tsv, tsv.vstreamer, tsv.se) tsv.watcher = NewBinlogWatcher(tsv, tsv.vstreamer, tsv.config) @@ -205,6 +219,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c messager: tsv.messager, ddle: tsv.onlineDDLExecutor, throttler: tsv.lagThrottler, + qThrottler: tsv.qThrottler, tableGC: tsv.tableGC, rw: newRequestsWaiter(), diskHealthMonitor: newDiskHealthMonitor(ctx), @@ -310,6 +325,7 @@ func (tsv *TabletServer) InitDBConfig(target *querypb.Target, dbcfgs *dbconfigs. tsv.hs.InitDBConfig(target) tsv.onlineDDLExecutor.InitDBConfig(target.Keyspace, target.Shard, dbcfgs.DBName) tsv.lagThrottler.InitDBConfig(target.Keyspace, target.Shard) + tsv.qThrottler.InitDBConfig(target.Keyspace, target.Shard) tsv.tableGC.InitDBConfig(target.Keyspace, target.Shard, dbcfgs.DBName) return nil } @@ -900,6 +916,11 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq } func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, reservedID int64, settings []string, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) { + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return nil, err + } + allowOnShutdown := transactionID != 0 timeout := tsv.loadQueryTimeoutWithTxAndOptions(transactionID, options) err = tsv.execRequest( @@ -916,6 +937,7 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq if err != nil { return err } + if err = plan.IsValid(reservedID != 0, len(settings) > 0); err != nil { return err } @@ -934,10 +956,6 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq return err } } - targetType, err := tsv.resolveTargetType(ctx, target) - if err != nil { - return err - } qre := &QueryExecutor{ query: query, marginComments: comments, diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index a0c6b693a23..fa6c6f2da70 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -216,7 +216,7 @@ type ThrottlerStatus struct { } // NewThrottler creates a Throttler -func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, heartbeatWriter heartbeat.HeartbeatWriter, tabletTypeFunc func() topodatapb.TabletType) *Throttler { +func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, heartbeatWriter heartbeat.HeartbeatWriter, tabletTypeFunc func() topodatapb.TabletType, connectionPoolName string) *Throttler { throttler := &Throttler{ tabletAlias: tabletAlias, env: env, @@ -224,7 +224,7 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv srvTopoServer: srvTopoServer, ts: ts, heartbeatWriter: heartbeatWriter, - pool: connpool.NewPool(env, "ThrottlerPool", tabletenv.ConnPoolConfig{ + pool: connpool.NewPool(env, connectionPoolName, tabletenv.ConnPoolConfig{ Size: 2, IdleTimeout: env.Config().OltpReadPool.IdleTimeout, }), diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go index 6d42ec779c2..f5e1f3cc812 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -79,6 +79,7 @@ const ( TestingName Name = "test" TestingAlwaysThrottledName Name = "always-throttled-app" + QueryThrottlerName Name = "query-throttler-app" ) var (