diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index c9c60af2d85..694de0747dc 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" @@ -501,7 +502,7 @@ func testScheduler(t *testing.T) { testTableSequentialTimes(t, t1uuid, t2uuid) }) - t.Run("ALTER both tables, elligible for concurrenct", func(t *testing.T) { + t.Run("ALTER both tables, elligible for concurrent", func(t *testing.T) { // ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" --allow-concurrent --postpone-completion", "vtgate", "", "", true)) // skip wait @@ -536,9 +537,11 @@ func testScheduler(t *testing.T) { }) testTableCompletionTimes(t, t2uuid, t1uuid) }) - t.Run("ALTER both tables, elligible for concurrenct, with throttling", func(t *testing.T) { + t.Run("ALTER both tables, elligible for concurrent, with throttling", func(t *testing.T) { onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true) + // ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait @@ -555,6 +558,7 @@ func testScheduler(t *testing.T) { onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) }) + t.Run("check ready to complete (before)", func(t *testing.T) { for _, uuid := range []string{t1uuid, t2uuid} { rs := onlineddl.ReadMigrations(t, &vtParams, uuid) @@ -607,6 +611,8 @@ func testScheduler(t *testing.T) { testTableCompletionTimes(t, t2uuid, t1uuid) }) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false) + t.Run("REVERT both tables concurrent, postponed", func(t *testing.T) { t1uuid = testRevertMigration(t, createRevertParams(t1uuid, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", true)) t2uuid = testRevertMigration(t, createRevertParams(t2uuid, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", true)) diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index e59e5759a75..c3b1bfa8864 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -35,11 +35,14 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" - "github.com/buger/jsonparser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +const ( + ThrottledAppsTimeout = 60 * time.Second +) + // VtgateExecQuery runs a query on VTGate using given query params func VtgateExecQuery(t *testing.T, vtParams *mysql.ConnParams, query string, expectError string) *sqltypes.Result { t.Helper() @@ -313,16 +316,35 @@ func UnthrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) { // CheckThrottledApps checks for existence or non-existence of an app in the throttled apps list func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, throttlerApp throttlerapp.Name, expectFind bool) { - query := "show vitess_throttled_apps" - r := VtgateExecQuery(t, vtParams, query, "") - found := false - for _, row := range r.Named().Rows { - if throttlerApp.Equals(row.AsString("app", "")) { - found = true + ctx, cancel := context.WithTimeout(context.Background(), ThrottledAppsTimeout) + defer cancel() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + query := "show vitess_throttled_apps" + r := VtgateExecQuery(t, vtParams, query, "") + + appFound := false + for _, row := range r.Named().Rows { + if throttlerApp.Equals(row.AsString("app", "")) { + appFound = true + } + } + if appFound == expectFind { + // we're all good + return + } + + select { + case <-ctx.Done(): + assert.Failf(t, "CheckThrottledApps timed out waiting for %v to be in throttled status '%v'", throttlerApp.String(), expectFind) + return + case <-ticker.C: } } - assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", throttlerApp, found) } // WaitForThrottledTimestamp waits for a migration to have a non-empty last_throttled_timestamp @@ -350,33 +372,6 @@ func WaitForThrottledTimestamp(t *testing.T, vtParams *mysql.ConnParams, uuid st return } -// WaitForThrottlerStatusEnabled waits for a tablet to report its throttler status as enabled. -func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, timeout time.Duration) { - jsonPath := "IsEnabled" - url := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort) - - ctx, cancel := context.WithTimeout(context.Background(), throttlerConfigTimeout) - defer cancel() - - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - body := getHTTPBody(url) - val, err := jsonparser.GetBoolean([]byte(body), jsonPath) - require.NoError(t, err) - if val { - return - } - select { - case <-ctx.Done(): - t.Error("timeout waiting for tablet's throttler status to be enabled") - return - case <-ticker.C: - } - } -} - func getHTTPBody(url string) string { resp, err := http.Get(url) if err != nil { diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index 7a2f13cca77..6a404fc1f52 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -210,19 +210,23 @@ func UnthrottleApp(clusterInstance *cluster.LocalProcessCluster, throttlerApp th return throttleApp(clusterInstance, throttlerApp, false) } -// ThrottleAppAndWaitUntilTabletsConfirm -func ThrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name) (string, error) { - res, err := throttleApp(clusterInstance, throttlerApp, true) - if err != nil { - return res, err - } +func WaitUntilTabletsConfirmThrottledApp(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name, expectThrottled bool) { for _, ks := range clusterInstance.Keyspaces { for _, shard := range ks.Shards { for _, tablet := range shard.Vttablets { - WaitForThrottledApp(t, tablet, throttlerApp, true, ConfigTimeout) + WaitForThrottledApp(t, tablet, throttlerApp, expectThrottled, ConfigTimeout) } } } +} + +// ThrottleAppAndWaitUntilTabletsConfirm +func ThrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name) (string, error) { + res, err := throttleApp(clusterInstance, throttlerApp, true) + if err != nil { + return res, err + } + WaitUntilTabletsConfirmThrottledApp(t, clusterInstance, throttlerApp, true) return res, nil } @@ -232,13 +236,7 @@ func UnthrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *clus if err != nil { return res, err } - for _, ks := range clusterInstance.Keyspaces { - for _, shard := range ks.Shards { - for _, tablet := range shard.Vttablets { - WaitForThrottledApp(t, tablet, throttlerApp, false, ConfigTimeout) - } - } - } + WaitUntilTabletsConfirmThrottledApp(t, clusterInstance, throttlerApp, false) return res, nil } diff --git a/go/vt/proto/topodata/cached_size.go b/go/vt/proto/topodata/cached_size.go index 92da50b703e..d06ebd0d3f0 100644 --- a/go/vt/proto/topodata/cached_size.go +++ b/go/vt/proto/topodata/cached_size.go @@ -41,3 +41,21 @@ func (cached *KeyRange) CachedSize(alloc bool) int64 { } return size } +func (cached *ThrottledAppRule) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(80) + } + // field unknownFields []byte + { + size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields))) + } + // field Name string + size += hack.RuntimeAllocSize(int64(len(cached.Name))) + // field ExpiresAt *vitess.io/vitess/go/vt/proto/vttime.Time + size += cached.ExpiresAt.CachedSize(true) + return size +} diff --git a/go/vt/proto/vttime/cached_size.go b/go/vt/proto/vttime/cached_size.go new file mode 100644 index 00000000000..e34da16852c --- /dev/null +++ b/go/vt/proto/vttime/cached_size.go @@ -0,0 +1,35 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by Sizegen. DO NOT EDIT. + +package vttime + +import hack "vitess.io/vitess/go/hack" + +func (cached *Time) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(64) + } + // field unknownFields []byte + { + size += hack.RuntimeAllocSize(int64(cap(cached.unknownFields))) + } + return size +} diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 8f7ca406d3d..72f3b3e7543 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1097,6 +1097,20 @@ func (cached *SysVarSetAware) CachedSize(alloc bool) int64 { } return size } +func (cached *ThrottleApp) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(16) + } + // field Keyspace *vitess.io/vitess/go/vt/vtgate/vindexes.Keyspace + size += cached.Keyspace.CachedSize(true) + // field ThrottledAppRule *vitess.io/vitess/go/vt/proto/topodata.ThrottledAppRule + size += cached.ThrottledAppRule.CachedSize(true) + return size +} //go:nocheckptr func (cached *Update) CachedSize(alloc bool) int64 { diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 4149d4418e5..314da40ccee 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -93,6 +93,10 @@ func (t *noopVCursor) SetExec(ctx context.Context, name string, value string) er panic("implement me") } +func (t *noopVCursor) ThrottleApp(ctx context.Context, throttleAppRule *topodatapb.ThrottledAppRule) error { + panic("implement me") +} + func (t *noopVCursor) ShowExec(ctx context.Context, command sqlparser.ShowCommandType, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) { panic("implement me") } diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index ddded092887..825c7e82b9f 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -29,6 +29,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" ) @@ -111,6 +112,8 @@ type ( ShowExec(ctx context.Context, command sqlparser.ShowCommandType, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) // SetExec takes in k,v pair and use executor to set them in topo metadata. SetExec(ctx context.Context, name string, value string) error + // ThrottleApp sets a ThrottlerappRule in topo + ThrottleApp(ctx context.Context, throttleAppRule *topodatapb.ThrottledAppRule) error // CanUseSetVar returns true if system_settings can use SET_VAR hint. CanUseSetVar() bool diff --git a/go/vt/vtgate/engine/throttle_app.go b/go/vt/vtgate/engine/throttle_app.go new file mode 100644 index 00000000000..db485e6bec3 --- /dev/null +++ b/go/vt/vtgate/engine/throttle_app.go @@ -0,0 +1,89 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +var _ Primitive = (*ThrottleApp)(nil) + +// ThrottleApp represents the instructions to perform an online schema change via vtctld +type ThrottleApp struct { + Keyspace *vindexes.Keyspace + ThrottledAppRule *topodatapb.ThrottledAppRule + + noTxNeeded + + noInputs +} + +func (v *ThrottleApp) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "ThrottleApp", + Keyspace: v.Keyspace, + Other: map[string]any{ + "appName": v.ThrottledAppRule.Name, + "expireAt": v.ThrottledAppRule.ExpiresAt, + "ratio": v.ThrottledAppRule.Ratio, + }, + } +} + +// RouteType implements the Primitive interface +func (v *ThrottleApp) RouteType() string { + return "ThrottleApp" +} + +// GetKeyspaceName implements the Primitive interface +func (v *ThrottleApp) GetKeyspaceName() string { + return v.Keyspace.Name +} + +// GetTableName implements the Primitive interface +func (v *ThrottleApp) GetTableName() string { + return "" +} + +// TryExecute implements the Primitive interface +func (v *ThrottleApp) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (result *sqltypes.Result, err error) { + if err := vcursor.ThrottleApp(ctx, v.ThrottledAppRule); err != nil { + return nil, err + } + return &sqltypes.Result{}, nil +} + +// TryStreamExecute implements the Primitive interface +func (v *ThrottleApp) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + results, err := v.TryExecute(ctx, vcursor, bindVars, wantfields) + if err != nil { + return err + } + return callback(results) +} + +// GetFields implements the Primitive interface +func (v *ThrottleApp) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] GetFields is not reachable") +} diff --git a/go/vt/vtgate/planbuilder/builder.go b/go/vt/vtgate/planbuilder/builder.go index cad0b70b0ae..735a1cdd20c 100644 --- a/go/vt/vtgate/planbuilder/builder.go +++ b/go/vt/vtgate/planbuilder/builder.go @@ -247,7 +247,7 @@ func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Stat case sqlparser.DDLStatement: return buildGeneralDDLPlan(ctx, query, stmt, reservedVars, vschema, enableOnlineDDL, enableDirectDDL) case *sqlparser.AlterMigration: - return buildAlterMigrationPlan(query, vschema, enableOnlineDDL) + return buildAlterMigrationPlan(query, stmt, vschema, enableOnlineDDL) case *sqlparser.RevertMigration: return buildRevertMigrationPlan(query, stmt, vschema, enableOnlineDDL) case *sqlparser.ShowMigrationLogs: diff --git a/go/vt/vtgate/planbuilder/migration.go b/go/vt/vtgate/planbuilder/migration.go index 468c86d3ffb..330a71cd982 100644 --- a/go/vt/vtgate/planbuilder/migration.go +++ b/go/vt/vtgate/planbuilder/migration.go @@ -17,19 +17,73 @@ limitations under the License. package planbuilder import ( + "strconv" + "time" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/logutil" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) -func buildAlterMigrationPlan(query string, vschema plancontext.VSchema, enableOnlineDDL bool) (*planResult, error) { +func validateThrottleParams(alterMigrationType sqlparser.AlterMigrationType, expireString string, ratioLiteral *sqlparser.Literal) (duration time.Duration, ratio float64, err error) { + switch alterMigrationType { + case sqlparser.UnthrottleMigrationType, + sqlparser.UnthrottleAllMigrationType: + // Unthrottling is like throttling with duration=0 + duration = 0 + default: + duration = time.Hour * 24 * 365 * 100 + if expireString != "" { + duration, err = time.ParseDuration(expireString) + if err != nil || duration < 0 { + return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid EXPIRE value: %s. Try '120s', '30m', '1h', etc. Allowed units are (s)ec, (m)in, (h)hour", expireString) + } + } + } + ratio = 1.0 + if ratioLiteral != nil { + ratio, err = strconv.ParseFloat(ratioLiteral.Val, 64) + if err != nil || ratio < 0 || ratio > 1 { + return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid RATIO value: %s. Try any decimal number between '0.0' (no throttle) and `1.0` (fully throttled)", ratioLiteral.Val) + } + } + return duration, ratio, nil +} + +func buildAlterMigrationThrottleAppPlan(query string, alterMigration *sqlparser.AlterMigration, keyspace *vindexes.Keyspace) (*planResult, error) { + duration, ratio, err := validateThrottleParams(alterMigration.Type, alterMigration.Expire, alterMigration.Ratio) + if err != nil { + return nil, err + } + expireAt := time.Now().Add(duration) + appName := alterMigration.UUID + if appName == "" { + appName = throttlerapp.OnlineDDLName.String() + } + throttledAppRule := &topodatapb.ThrottledAppRule{ + Name: appName, + ExpiresAt: logutil.TimeToProto(expireAt), + Ratio: ratio, + } + return newPlanResult(&engine.ThrottleApp{ + Keyspace: keyspace, + ThrottledAppRule: throttledAppRule, + }), nil +} + +func buildAlterMigrationPlan(query string, alterMigration *sqlparser.AlterMigration, vschema plancontext.VSchema, enableOnlineDDL bool) (*planResult, error) { if !enableOnlineDDL { return nil, schema.ErrOnlineDDLDisabled } + dest, ks, tabletType, err := vschema.TargetDestination("") if err != nil { return nil, err @@ -38,6 +92,15 @@ func buildAlterMigrationPlan(query string, vschema plancontext.VSchema, enableOn return nil, vterrors.VT09005() } + switch alterMigration.Type { + case sqlparser.ThrottleMigrationType, + sqlparser.ThrottleAllMigrationType, + sqlparser.UnthrottleMigrationType, + sqlparser.UnthrottleAllMigrationType: + // ALTER VITESS_MIGRATION ... THROTTLE ... queries go to topo (similarly to `vtctldclient UpdateThrottlerConfig`) + return buildAlterMigrationThrottleAppPlan(query, alterMigration, ks) + } + if tabletType != topodatapb.TabletType_PRIMARY { return nil, vterrors.VT09006("ALTER") } diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 4b48285d997..d54c68b1692 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -37,6 +37,7 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" @@ -1137,6 +1138,55 @@ func (vc *vcursorImpl) SetExec(ctx context.Context, name string, value string) e return vc.executor.setVitessMetadata(ctx, name, value) } +func (vc *vcursorImpl) ThrottleApp(ctx context.Context, throttledAppRule *topodatapb.ThrottledAppRule) (err error) { + if throttledAppRule == nil { + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ThrottleApp: nil rule") + } + if throttledAppRule.Name == "" { + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ThrottleApp: app name is empty") + } + // We don't strictly have to construct a UpdateThrottlerConfigRequest here, because we only populate it + // with a couple variables; we could do without it. However, constructing the request makes the remaining code + // consistent with vtctldclient/command/throttler.go and we prefer this consistency + req := &vtctldatapb.UpdateThrottlerConfigRequest{ + Keyspace: vc.keyspace, + ThrottledApp: throttledAppRule, + } + + update := func(throttlerConfig *topodatapb.ThrottlerConfig) *topodatapb.ThrottlerConfig { + if throttlerConfig == nil { + throttlerConfig = &topodatapb.ThrottlerConfig{} + } + if throttlerConfig.ThrottledApps == nil { + throttlerConfig.ThrottledApps = make(map[string]*topodatapb.ThrottledAppRule) + } + throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp + return throttlerConfig + } + + ctx, unlock, lockErr := vc.topoServer.LockKeyspace(ctx, req.Keyspace, "UpdateThrottlerConfig") + if lockErr != nil { + return lockErr + } + defer unlock(&err) + + ki, err := vc.topoServer.GetKeyspace(ctx, req.Keyspace) + if err != nil { + return err + } + + ki.ThrottlerConfig = update(ki.ThrottlerConfig) + + err = vc.topoServer.UpdateKeyspace(ctx, ki) + if err != nil { + return err + } + + _, err = vc.topoServer.UpdateSrvKeyspaceThrottlerConfig(ctx, req.Keyspace, []string{}, update) + + return err +} + func (vc *vcursorImpl) CanUseSetVar() bool { return sqlparser.IsMySQL80AndAbove() && setVarEnabled }