diff --git a/doc/vtctlReference.md b/doc/vtctlReference.md index 4d294919d42..cb946962586 100644 --- a/doc/vtctlReference.md +++ b/doc/vtctlReference.md @@ -1119,8 +1119,11 @@ Applies the VTGate routing schema to the provided keyspace. Shows the result aft | :-------- | :--------- | :--------- | | cells | string | If specified, limits the rebuild to the cells, after upload. Ignored if skipRebuild is set. | | skip_rebuild | Boolean | If set, do no rebuild the SrvSchema objects. | +| dry-run | Boolean | Shows the proposed change without executing it | | vschema | string | Identifies the VTGate routing schema | | vschema_file | string | Identifies the VTGate routing schema file | +| sql | string | Identifies a VSchema DDL SQL statement | +| sql_file | string | Identifies a VSchema DDL SQL statement | #### Arguments diff --git a/go/vt/topotools/vschema_ddl.go b/go/vt/topotools/vschema_ddl.go new file mode 100644 index 00000000000..56a28426a9e --- /dev/null +++ b/go/vt/topotools/vschema_ddl.go @@ -0,0 +1,162 @@ +/* +Copyright 2018 The Vitess Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topotools + +import ( + "reflect" + + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// ApplyVSchemaDDL applies the given DDL statement to the vschema +// keyspace definition and returns the modified keyspace object. +func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, ddl *sqlparser.DDL) (*vschemapb.Keyspace, error) { + if ks == nil { + ks = new(vschemapb.Keyspace) + } + + if ks.Tables == nil { + ks.Tables = map[string]*vschemapb.Table{} + } + + if ks.Vindexes == nil { + ks.Vindexes = map[string]*vschemapb.Vindex{} + } + + var tableName string + var table *vschemapb.Table + if !ddl.Table.IsEmpty() { + tableName = ddl.Table.Name.String() + table, _ = ks.Tables[tableName] + } + + switch ddl.Action { + case sqlparser.CreateVindexStr: + name := ddl.VindexSpec.Name.String() + if _, ok := ks.Vindexes[name]; ok { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already exists in keyspace %s", name, ksName) + } + + // Make sure the keyspace has the sharded bit set to true + // if this is the first vindex defined in the keyspace. + if len(ks.Vindexes) == 0 { + ks.Sharded = true + } + + owner, params := ddl.VindexSpec.ParseParams() + ks.Vindexes[name] = &vschemapb.Vindex{ + Type: ddl.VindexSpec.Type.String(), + Params: params, + Owner: owner, + } + + return ks, nil + + case sqlparser.AddColVindexStr: + // Support two cases: + // + // 1. The vindex type / params / owner are specified. If the + // named vindex doesn't exist, create it. If it does exist, + // require the parameters to match. + // + // 2. The vindex type is not specified. Make sure the vindex + // already exists. + spec := ddl.VindexSpec + name := spec.Name.String() + if !spec.Type.IsEmpty() { + owner, params := spec.ParseParams() + if vindex, ok := ks.Vindexes[name]; ok { + if vindex.Type != spec.Type.String() { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with type %s not %s", name, vindex.Type, spec.Type.String()) + } + if vindex.Owner != owner { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with owner %s not %s", name, vindex.Owner, owner) + } + if (len(vindex.Params) != 0 || len(params) != 0) && !reflect.DeepEqual(vindex.Params, params) { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with different parameters", name) + } + } else { + // Make sure the keyspace has the sharded bit set to true + // if this is the first vindex defined in the keyspace. + if len(ks.Vindexes) == 0 { + ks.Sharded = true + } + ks.Vindexes[name] = &vschemapb.Vindex{ + Type: spec.Type.String(), + Params: params, + Owner: owner, + } + } + } else { + if _, ok := ks.Vindexes[name]; !ok { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s does not exist in keyspace %s", name, ksName) + } + } + + // If this is the first vindex being defined on the table, create + // the empty table record + if table == nil { + table = &vschemapb.Table{ + ColumnVindexes: make([]*vschemapb.ColumnVindex, 0, 4), + } + } + + // Make sure there isn't already a vindex with the same name on + // this table. + for _, vindex := range table.ColumnVindexes { + if vindex.Name == name { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already defined on table %s", name, tableName) + } + } + + columns := make([]string, len(ddl.VindexCols), len(ddl.VindexCols)) + for i, col := range ddl.VindexCols { + columns[i] = col.String() + } + table.ColumnVindexes = append(table.ColumnVindexes, &vschemapb.ColumnVindex{ + Name: name, + Columns: columns, + }) + ks.Tables[tableName] = table + + return ks, nil + + case sqlparser.DropColVindexStr: + spec := ddl.VindexSpec + name := spec.Name.String() + if table == nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s.%s not defined in vschema", ksName, tableName) + } + + for i, colVindex := range table.ColumnVindexes { + if colVindex.Name == name { + table.ColumnVindexes = append(table.ColumnVindexes[:i], table.ColumnVindexes[i+1:]...) + if len(table.ColumnVindexes) == 0 { + delete(ks.Tables, tableName) + } + return ks, nil + } + } + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s not defined in table %s.%s", name, ksName, tableName) + } + + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected vindex ddl operation %s", ddl.Action) +} diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 16167296870..f01f416efe6 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -117,6 +117,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/schemamanager" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" @@ -390,7 +391,7 @@ var commands = []commandGroup{ "", "Displays the VTGate routing schema."}, {"ApplyVSchema", commandApplyVSchema, - "{-vschema= || -vschema_file=} [-cells=c1,c2,...] [-skip_rebuild] ", + "{-vschema= || -vschema_file= || -sql= || -sql_file=} [-cells=c1,c2,...] [-skip_rebuild] [-dry-run] ", "Applies the VTGate routing schema to the provided keyspace. Shows the result after application."}, {"RebuildVSchemaGraph", commandRebuildVSchemaGraph, "[-cells=c1,c2,...]", @@ -2122,6 +2123,9 @@ func commandRebuildVSchemaGraph(ctx context.Context, wr *wrangler.Wrangler, subF func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { vschema := subFlags.String("vschema", "", "Identifies the VTGate routing schema") vschemaFile := subFlags.String("vschema_file", "", "Identifies the VTGate routing schema file") + sql := subFlags.String("sql", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)") + sqlFile := subFlags.String("sql_file", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)") + dryRun := subFlags.Bool("dry-run", false, "If set, do not save the altered vschema, simply echo to console.") skipRebuild := subFlags.Bool("skip_rebuild", false, "If set, do no rebuild the SrvSchema objects.") var cells flagutil.StringListValue subFlags.Var(&cells, "cells", "If specified, limits the rebuild to the cells, after upload. Ignored if skipRebuild is set.") @@ -2132,34 +2136,88 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f if subFlags.NArg() != 1 { return fmt.Errorf("the argument is required for the ApplyVSchema command") } - if (*vschema == "") == (*vschemaFile == "") { - return fmt.Errorf("either the vschema or vschemaFile flag must be specified when calling the ApplyVSchema command") + keyspace := subFlags.Arg(0) + + var vs *vschemapb.Keyspace + var err error + + sqlMode := (*sql != "") != (*sqlFile != "") + jsonMode := (*vschema != "") != (*vschemaFile != "") + + if sqlMode && jsonMode { + return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") } - var schema []byte - if *vschemaFile != "" { - var err error - schema, err = ioutil.ReadFile(*vschemaFile) + + if !sqlMode && !jsonMode { + return fmt.Errorf("one of the sql, sql_file, vschema, or vschema_file flags must be specified when calling the ApplyVSchema command") + } + + if sqlMode { + if *sqlFile != "" { + sqlBytes, err := ioutil.ReadFile(*sqlFile) + if err != nil { + return err + } + *sql = string(sqlBytes) + } + + stmt, err := sqlparser.Parse(*sql) + if err != nil { + return fmt.Errorf("error parsing vschema statement `%s`: %v", *sql, err) + } + ddl, ok := stmt.(*sqlparser.DDL) + if !ok { + return fmt.Errorf("error parsing vschema statement `%s`: not a ddl statement", *sql) + } + + vs, err = wr.TopoServer().GetVSchema(ctx, keyspace) + if err != nil { + if topo.IsErrType(err, topo.NoNode) { + vs = &vschemapb.Keyspace{} + } else { + return err + } + } + + vs, err = topotools.ApplyVSchemaDDL(keyspace, vs, ddl) if err != nil { return err } + } else { - schema = []byte(*vschema) - } - var vs vschemapb.Keyspace - err := json2.Unmarshal(schema, &vs) - if err != nil { - return err - } - keyspace := subFlags.Arg(0) - if err := wr.TopoServer().SaveVSchema(ctx, keyspace, &vs); err != nil { - return err + // json mode + var schema []byte + if *vschemaFile != "" { + var err error + schema, err = ioutil.ReadFile(*vschemaFile) + if err != nil { + return err + } + } else { + schema = []byte(*vschema) + } + + vs = &vschemapb.Keyspace{} + err := json2.Unmarshal(schema, vs) + if err != nil { + return err + } } - b, err := json2.MarshalIndentPB(&vs, " ") + b, err := json2.MarshalIndentPB(vs, " ") if err != nil { wr.Logger().Errorf("Failed to marshal VSchema for display: %v", err) } else { - wr.Logger().Printf("Uploaded VSchema object:\n%s\nIf this is not what you expected, check the input data (as JSON parsing will skip unexpected fields).\n", b) + wr.Logger().Printf("New VSchema object:\n%s\nIf this is not what you expected, check the input data (as JSON parsing will skip unexpected fields).\n", b) + } + + if *dryRun { + wr.Logger().Printf("Dry run: Skipping update of VSchema\n") + return nil + } + + if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil { + return err } if *skipRebuild { diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index f12dbc7d122..335ac41b779 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "net/http" - "reflect" "sort" "strings" "sync" @@ -42,6 +41,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/planbuilder" @@ -50,7 +50,6 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -339,7 +338,7 @@ func (e *Executor) handleDDL(ctx context.Context, safeSession *SafeSession, sql logStats.PlanTime = execStart.Sub(logStats.StartTime) switch ddl.Action { case sqlparser.CreateVindexStr, sqlparser.AddColVindexStr, sqlparser.DropColVindexStr: - err := e.handleVindexDDL(ctx, safeSession, dest, destKeyspace, destTabletType, ddl, logStats) + err := e.handleVSchemaDDL(ctx, safeSession, dest, destKeyspace, destTabletType, ddl, logStats) logStats.ExecuteTime = time.Since(execStart) return &sqltypes.Result{}, err default: @@ -366,7 +365,7 @@ func (e *Executor) handleDDL(ctx context.Context, safeSession *SafeSession, sql return result, err } -func (e *Executor) handleVindexDDL(ctx context.Context, safeSession *SafeSession, dest key.Destination, destKeyspace string, destTabletType topodatapb.TabletType, ddl *sqlparser.DDL, logStats *LogStats) error { +func (e *Executor) handleVSchemaDDL(ctx context.Context, safeSession *SafeSession, dest key.Destination, destKeyspace string, destTabletType topodatapb.TabletType, ddl *sqlparser.DDL, logStats *LogStats) error { vschema := e.vm.GetCurrentSrvVschema() if vschema == nil { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema not loaded") @@ -386,129 +385,20 @@ func (e *Executor) handleVindexDDL(ctx context.Context, safeSession *SafeSession if ksName == "" { ksName = destKeyspace } - if ksName == "" { return errNoKeyspace } ks, _ := vschema.Keyspaces[ksName] - if ks == nil { - ks = new(vschemapb.Keyspace) - vschema.Keyspaces[ksName] = ks - } + ks, err := topotools.ApplyVSchemaDDL(ksName, ks, ddl) - if ks.Tables == nil { - ks.Tables = map[string]*vschemapb.Table{} - } - - if ks.Vindexes == nil { - ks.Vindexes = map[string]*vschemapb.Vindex{} - } - - var tableName string - var table *vschemapb.Table - if !ddl.Table.IsEmpty() { - tableName = ddl.Table.Name.String() - table, _ = ks.Tables[tableName] + if err != nil { + return err } - switch ddl.Action { - case sqlparser.CreateVindexStr: - name := ddl.VindexSpec.Name.String() - if _, ok := ks.Vindexes[name]; ok { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already exists in keyspace %s", name, destKeyspace) - } - owner, params := ddl.VindexSpec.ParseParams() - ks.Vindexes[name] = &vschemapb.Vindex{ - Type: ddl.VindexSpec.Type.String(), - Params: params, - Owner: owner, - } - - return e.vm.UpdateVSchema(ctx, destKeyspace, vschema) - case sqlparser.AddColVindexStr: - // Support two cases: - // - // 1. The vindex type / params / owner are specified. If the - // named vindex doesn't exist, create it. If it does exist, - // require the parameters to match. - // - // 2. The vindex type is not specified. Make sure the vindex - // already exists. - spec := ddl.VindexSpec - name := spec.Name.String() - if !spec.Type.IsEmpty() { - owner, params := spec.ParseParams() - if vindex, ok := ks.Vindexes[name]; ok { - if vindex.Type != spec.Type.String() { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with type %s not %s", name, vindex.Type, spec.Type.String()) - } - if vindex.Owner != owner { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with owner %s not %s", name, vindex.Owner, owner) - } - if (len(vindex.Params) != 0 || len(params) != 0) && !reflect.DeepEqual(vindex.Params, params) { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with different parameters", name) - } - } else { - ks.Vindexes[name] = &vschemapb.Vindex{ - Type: spec.Type.String(), - Params: params, - Owner: owner, - } - } - } else { - if _, ok := ks.Vindexes[name]; !ok { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s does not exist in keyspace %s", name, destKeyspace) - } - } - - // If this is the first vindex being defined on the table, create - // the empty table record - if table == nil { - table = &vschemapb.Table{ - ColumnVindexes: make([]*vschemapb.ColumnVindex, 0, 4), - } - } - - // Make sure there isn't already a vindex with the same name on - // this table. - for _, vindex := range table.ColumnVindexes { - if vindex.Name == name { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already defined on table %s", name, tableName) - } - } - - columns := make([]string, len(ddl.VindexCols), len(ddl.VindexCols)) - for i, col := range ddl.VindexCols { - columns[i] = col.String() - } - table.ColumnVindexes = append(table.ColumnVindexes, &vschemapb.ColumnVindex{ - Name: name, - Columns: columns, - }) - ks.Tables[tableName] = table - - return e.vm.UpdateVSchema(ctx, destKeyspace, vschema) - case sqlparser.DropColVindexStr: - spec := ddl.VindexSpec - name := spec.Name.String() - if table == nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s.%s not defined in vschema", ksName, tableName) - } - - for i, colVindex := range table.ColumnVindexes { - if colVindex.Name == name { - table.ColumnVindexes = append(table.ColumnVindexes[:i], table.ColumnVindexes[i+1:]...) - if len(table.ColumnVindexes) == 0 { - delete(ks.Tables, tableName) - } - return e.vm.UpdateVSchema(ctx, destKeyspace, vschema) - } - } - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s not defined in table %s.%s", name, ksName, tableName) - } + vschema.Keyspaces[ksName] = ks - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected vindex ddl operation %s", ddl.Action) + return e.vm.UpdateVSchema(ctx, ksName, vschema) } func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) { diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 93ca6ff1f36..68f55d24e5b 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1203,51 +1203,51 @@ func TestExecutorCreateVindexDDL(t *testing.T) { t.Fatalf("test_vindex should not exist in original vschema") } + session := NewSafeSession(&vtgatepb.Session{TargetString: ks}) stmt := "create vindex test_vindex using hash" - wantCount := []int64{0, 0, 0} - _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{TargetString: ks}), stmt, nil) + _, err := executor.Execute(context.Background(), "TestExecute", session, stmt, nil) if err != nil { t.Error(err) } - gotCount := []int64{ - sbc1.ExecCount.Get(), - sbc2.ExecCount.Get(), - sbclookup.ExecCount.Get(), - } - if !reflect.DeepEqual(gotCount, wantCount) { - t.Errorf("Exec %s: %v, want %v", stmt, gotCount, wantCount) + + vschema, vindex = waitForVindex(t, ks, "test_vindex", vschemaUpdates, executor) + if vindex == nil || vindex.Type != "hash" { + t.Errorf("updated vschema did not contain test_vindex") } - time.Sleep(10 * time.Millisecond) + _, err = executor.Execute(context.Background(), "TestExecute", session, stmt, nil) + wantErr := "vindex test_vindex already exists in keyspace TestExecutor" + if err == nil || err.Error() != wantErr { + t.Errorf("create duplicate vindex: %v, want %s", err, wantErr) + } select { case vschema = <-vschemaUpdates: - vindex, ok = vschema.Keyspaces[ks].Vindexes["test_vindex"] - if !ok || vindex.Type != "hash" { - t.Errorf("updated vschema did not contain test_vindex") - } + t.Errorf("vschema should not be updated on error") default: - t.Errorf("vschema was not updated as expected") } - // Wait up to 10ms until the vindex manager gets notified of the update - for i := 0; i < 10; i++ { - vschema = executor.vm.GetCurrentSrvVschema() - vindex, ok = vschema.Keyspaces[ks].Vindexes["test_vindex"] - if ok { - break - } - time.Sleep(time.Millisecond) - } - if !ok || vindex.Type != "hash" { - t.Errorf("updated vschema did not contain test_vindex") + // Create a new vschema keyspace implicitly by creating a vindex with a different + // target in the session + ksNew := "test_new_keyspace" + session = NewSafeSession(&vtgatepb.Session{TargetString: ksNew}) + stmt = "create vindex test_vindex2 using hash" + _, err = executor.Execute(context.Background(), "TestExecute", session, stmt, nil) + if err != nil { + t.Fatalf("error in %s: %v", stmt, err) } - _, err = executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{TargetString: ks}), stmt, nil) - wantErr := "vindex test_vindex already exists in keyspace TestExecutor" - if err == nil || err.Error() != wantErr { - t.Errorf("create duplicate vindex: %v, want %s", err, wantErr) + vschema, vindex = waitForVindex(t, ksNew, "test_vindex2", vschemaUpdates, executor) + if vindex.Type != "hash" { + t.Errorf("vindex type %s not hash", vindex.Type) + } + keyspace, ok := vschema.Keyspaces[ksNew] + if !ok || !keyspace.Sharded { + t.Errorf("keyspace should have been created with Sharded=true") } - gotCount = []int64{ + + // No queries should have gone to any tablets + wantCount := []int64{0, 0, 0} + gotCount := []int64{ sbc1.ExecCount.Get(), sbc2.ExecCount.Get(), sbclookup.ExecCount.Get(), @@ -1255,11 +1255,6 @@ func TestExecutorCreateVindexDDL(t *testing.T) { if !reflect.DeepEqual(gotCount, wantCount) { t.Errorf("Exec %s: %v, want %v", stmt, gotCount, wantCount) } - select { - case vschema = <-vschemaUpdates: - t.Errorf("vschema shoud not be updated on error") - default: - } } func TestExecutorAddDropVindexDDL(t *testing.T) { @@ -1561,8 +1556,8 @@ func TestExecutorAddDropVindexDDL(t *testing.T) { } stmt = "alter table nonexistent drop vindex test_lookup" - _, err = executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{}), stmt, nil) - wantErr = errNoKeyspace.Error() + _, err = executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{TargetString: "InvalidKeyspace"}), stmt, nil) + wantErr = "table InvalidKeyspace.nonexistent not defined in vschema" if err == nil || err.Error() != wantErr { t.Errorf("got %v want err %s", err, wantErr) } diff --git a/go/vt/vtgate/vschema_manager.go b/go/vt/vtgate/vschema_manager.go index 5462a7fdcc4..5823c080c14 100644 --- a/go/vt/vtgate/vschema_manager.go +++ b/go/vt/vtgate/vschema_manager.go @@ -121,11 +121,11 @@ func (vm *VSchemaManager) watchSrvVSchema(ctx context.Context, cell string) { // UpdateVSchema propagates the updated vschema to the topo. The entry for // the given keyspace is updated in the global topo, and the full SrvVSchema // is updated in all known cells. -func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, keyspace string, vschema *vschemapb.SrvVSchema) error { - // update the global vschema, then update the SrvVschema for - // each cell - ks, _ := vschema.Keyspaces[keyspace] - err := vm.e.serv.GetTopoServer().SaveVSchema(ctx, keyspace, ks) +func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error { + topo := vm.e.serv.GetTopoServer() + + ks := vschema.Keyspaces[ksName] + err := topo.SaveVSchema(ctx, ksName, ks) if err != nil { return err }