From 670647ba52970495b1262a41c0ab21678ba4526f Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sat, 24 Nov 2018 13:09:03 -0500 Subject: [PATCH 01/10] rework vtgate's vschema_manager to use topotools.RebuildVschema Pull in the topotools.RebuildVschema implementation into the vtgate VschemaManager to share the same helper function that vtctl uses to find all cells and propagate the changed vschema to all of them. This changes the UpdateVschema interface to take in a single modified keyspace and not the whole SrvVschema. Signed-off-by: Michael Demmer --- go/vt/vtgate/executor.go | 6 +++--- go/vt/vtgate/vschema_manager.go | 26 ++++++-------------------- 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index f12dbc7d122..459a2cae727 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -425,7 +425,7 @@ func (e *Executor) handleVindexDDL(ctx context.Context, safeSession *SafeSession Owner: owner, } - return e.vm.UpdateVSchema(ctx, destKeyspace, vschema) + return e.vm.UpdateVSchema(ctx, destKeyspace, ks) case sqlparser.AddColVindexStr: // Support two cases: // @@ -488,7 +488,7 @@ func (e *Executor) handleVindexDDL(ctx context.Context, safeSession *SafeSession }) ks.Tables[tableName] = table - return e.vm.UpdateVSchema(ctx, destKeyspace, vschema) + return e.vm.UpdateVSchema(ctx, destKeyspace, ks) case sqlparser.DropColVindexStr: spec := ddl.VindexSpec name := spec.Name.String() @@ -502,7 +502,7 @@ func (e *Executor) handleVindexDDL(ctx context.Context, safeSession *SafeSession if len(table.ColumnVindexes) == 0 { delete(ks.Tables, tableName) } - return e.vm.UpdateVSchema(ctx, destKeyspace, vschema) + return e.vm.UpdateVSchema(ctx, destKeyspace, ks) } } return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s not defined in table %s.%s", name, ksName, tableName) diff --git a/go/vt/vtgate/vschema_manager.go b/go/vt/vtgate/vschema_manager.go index 5462a7fdcc4..72d5cca0993 100644 --- a/go/vt/vtgate/vschema_manager.go +++ b/go/vt/vtgate/vschema_manager.go @@ -24,7 +24,9 @@ import ( "github.com/golang/protobuf/proto" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtgate/vindexes" vschemapb "vitess.io/vitess/go/vt/proto/vschema" @@ -121,28 +123,12 @@ func (vm *VSchemaManager) watchSrvVSchema(ctx context.Context, cell string) { // UpdateVSchema propagates the updated vschema to the topo. The entry for // the given keyspace is updated in the global topo, and the full SrvVSchema // is updated in all known cells. -func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, keyspace string, vschema *vschemapb.SrvVSchema) error { - // update the global vschema, then update the SrvVschema for - // each cell - ks, _ := vschema.Keyspaces[keyspace] - err := vm.e.serv.GetTopoServer().SaveVSchema(ctx, keyspace, ks) +func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, keyspace *vschemapb.Keyspace) error { + topo := vm.e.serv.GetTopoServer() + err := topo.SaveVSchema(ctx, ksName, keyspace) if err != nil { return err } - cells, err := vm.e.serv.GetTopoServer().GetKnownCells(ctx) - if err != nil { - return err - } - - // even if one cell fails, continue to try the others - for _, cell := range cells { - cellErr := vm.e.serv.GetTopoServer().UpdateSrvVSchema(ctx, cell, vschema) - if cellErr != nil { - err = cellErr - log.Errorf("error updating vschema in cell %s: %v", cell, cellErr) - } - } - - return err + return topotools.RebuildVSchema(ctx, logutil.NewConsoleLogger(), topo, nil) } From 94cb4af27abe47d1e191121afebae675e031c023 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sat, 24 Nov 2018 13:14:53 -0500 Subject: [PATCH 02/10] add a -dry_run flag to vtctl ApplyVschema Signed-off-by: Michael Demmer --- go/vt/vtctl/vtctl.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 16167296870..0ad15e1c28d 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2122,6 +2122,7 @@ func commandRebuildVSchemaGraph(ctx context.Context, wr *wrangler.Wrangler, subF func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { vschema := subFlags.String("vschema", "", "Identifies the VTGate routing schema") vschemaFile := subFlags.String("vschema_file", "", "Identifies the VTGate routing schema file") + dryRun := subFlags.Bool("dry_run", false, "If set, do not save the altered vschema, simply echo to console.") skipRebuild := subFlags.Bool("skip_rebuild", false, "If set, do no rebuild the SrvSchema objects.") var cells flagutil.StringListValue subFlags.Var(&cells, "cells", "If specified, limits the rebuild to the cells, after upload. Ignored if skipRebuild is set.") @@ -2150,16 +2151,23 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f if err != nil { return err } + keyspace := subFlags.Arg(0) - if err := wr.TopoServer().SaveVSchema(ctx, keyspace, &vs); err != nil { - return err - } b, err := json2.MarshalIndentPB(&vs, " ") if err != nil { wr.Logger().Errorf("Failed to marshal VSchema for display: %v", err) } else { - wr.Logger().Printf("Uploaded VSchema object:\n%s\nIf this is not what you expected, check the input data (as JSON parsing will skip unexpected fields).\n", b) + wr.Logger().Printf("New VSchema object:\n%s\nIf this is not what you expected, check the input data (as JSON parsing will skip unexpected fields).\n", b) + } + + if *dryRun { + wr.Logger().Printf("Dry run: Skipping update of VSchema\n") + return nil + } + + if err := wr.TopoServer().SaveVSchema(ctx, keyspace, &vs); err != nil { + return err } if *skipRebuild { From 6acad1d5ea58e0d99fdfeb112c01aa73ef22aad9 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sat, 24 Nov 2018 15:48:42 -0500 Subject: [PATCH 03/10] move the vschema ddl implementation into a new topotools helper Abstract out the execution of the vschema ddl statements into a new function in topotools. Signed-off-by: Michael Demmer --- go/vt/topotools/vschema_ddl.go | 149 +++++++++++++++++++++++++++++++++ go/vt/vtgate/executor.go | 126 ++-------------------------- 2 files changed, 156 insertions(+), 119 deletions(-) create mode 100644 go/vt/topotools/vschema_ddl.go diff --git a/go/vt/topotools/vschema_ddl.go b/go/vt/topotools/vschema_ddl.go new file mode 100644 index 00000000000..ffba159325d --- /dev/null +++ b/go/vt/topotools/vschema_ddl.go @@ -0,0 +1,149 @@ +/* +Copyright 2018 The Vitess Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topotools + +import ( + "reflect" + + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// ApplyVSchemaDDL applies the given DDL statement to the vschema +// keyspace definition and returns the modified keyspace object. +func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, ddl *sqlparser.DDL) (*vschemapb.Keyspace, error) { + if ks == nil { + ks = new(vschemapb.Keyspace) + } + + if ks.Tables == nil { + ks.Tables = map[string]*vschemapb.Table{} + } + + if ks.Vindexes == nil { + ks.Vindexes = map[string]*vschemapb.Vindex{} + } + + var tableName string + var table *vschemapb.Table + if !ddl.Table.IsEmpty() { + tableName = ddl.Table.Name.String() + table, _ = ks.Tables[tableName] + } + + switch ddl.Action { + case sqlparser.CreateVindexStr: + name := ddl.VindexSpec.Name.String() + if _, ok := ks.Vindexes[name]; ok { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already exists in keyspace %s", name, ksName) + } + owner, params := ddl.VindexSpec.ParseParams() + ks.Vindexes[name] = &vschemapb.Vindex{ + Type: ddl.VindexSpec.Type.String(), + Params: params, + Owner: owner, + } + return ks, nil + + case sqlparser.AddColVindexStr: + // Support two cases: + // + // 1. The vindex type / params / owner are specified. If the + // named vindex doesn't exist, create it. If it does exist, + // require the parameters to match. + // + // 2. The vindex type is not specified. Make sure the vindex + // already exists. + spec := ddl.VindexSpec + name := spec.Name.String() + if !spec.Type.IsEmpty() { + owner, params := spec.ParseParams() + if vindex, ok := ks.Vindexes[name]; ok { + if vindex.Type != spec.Type.String() { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with type %s not %s", name, vindex.Type, spec.Type.String()) + } + if vindex.Owner != owner { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with owner %s not %s", name, vindex.Owner, owner) + } + if (len(vindex.Params) != 0 || len(params) != 0) && !reflect.DeepEqual(vindex.Params, params) { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with different parameters", name) + } + } else { + ks.Vindexes[name] = &vschemapb.Vindex{ + Type: spec.Type.String(), + Params: params, + Owner: owner, + } + } + } else { + if _, ok := ks.Vindexes[name]; !ok { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s does not exist in keyspace %s", name, ksName) + } + } + + // If this is the first vindex being defined on the table, create + // the empty table record + if table == nil { + table = &vschemapb.Table{ + ColumnVindexes: make([]*vschemapb.ColumnVindex, 0, 4), + } + } + + // Make sure there isn't already a vindex with the same name on + // this table. + for _, vindex := range table.ColumnVindexes { + if vindex.Name == name { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already defined on table %s", name, tableName) + } + } + + columns := make([]string, len(ddl.VindexCols), len(ddl.VindexCols)) + for i, col := range ddl.VindexCols { + columns[i] = col.String() + } + table.ColumnVindexes = append(table.ColumnVindexes, &vschemapb.ColumnVindex{ + Name: name, + Columns: columns, + }) + ks.Tables[tableName] = table + + return ks, nil + + case sqlparser.DropColVindexStr: + spec := ddl.VindexSpec + name := spec.Name.String() + if table == nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s.%s not defined in vschema", ksName, tableName) + } + + for i, colVindex := range table.ColumnVindexes { + if colVindex.Name == name { + table.ColumnVindexes = append(table.ColumnVindexes[:i], table.ColumnVindexes[i+1:]...) + if len(table.ColumnVindexes) == 0 { + delete(ks.Tables, tableName) + } + return ks, nil + } + } + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s not defined in table %s.%s", name, ksName, tableName) + } + + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected vindex ddl operation %s", ddl.Action) +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 459a2cae727..b449f786c8c 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "net/http" - "reflect" "sort" "strings" "sync" @@ -42,6 +41,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/planbuilder" @@ -50,7 +50,6 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -339,7 +338,7 @@ func (e *Executor) handleDDL(ctx context.Context, safeSession *SafeSession, sql logStats.PlanTime = execStart.Sub(logStats.StartTime) switch ddl.Action { case sqlparser.CreateVindexStr, sqlparser.AddColVindexStr, sqlparser.DropColVindexStr: - err := e.handleVindexDDL(ctx, safeSession, dest, destKeyspace, destTabletType, ddl, logStats) + err := e.handleVSchemaDDL(ctx, safeSession, dest, destKeyspace, destTabletType, ddl, logStats) logStats.ExecuteTime = time.Since(execStart) return &sqltypes.Result{}, err default: @@ -366,7 +365,7 @@ func (e *Executor) handleDDL(ctx context.Context, safeSession *SafeSession, sql return result, err } -func (e *Executor) handleVindexDDL(ctx context.Context, safeSession *SafeSession, dest key.Destination, destKeyspace string, destTabletType topodatapb.TabletType, ddl *sqlparser.DDL, logStats *LogStats) error { +func (e *Executor) handleVSchemaDDL(ctx context.Context, safeSession *SafeSession, dest key.Destination, destKeyspace string, destTabletType topodatapb.TabletType, ddl *sqlparser.DDL, logStats *LogStats) error { vschema := e.vm.GetCurrentSrvVschema() if vschema == nil { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema not loaded") @@ -386,129 +385,18 @@ func (e *Executor) handleVindexDDL(ctx context.Context, safeSession *SafeSession if ksName == "" { ksName = destKeyspace } - if ksName == "" { return errNoKeyspace } ks, _ := vschema.Keyspaces[ksName] - if ks == nil { - ks = new(vschemapb.Keyspace) - vschema.Keyspaces[ksName] = ks - } - - if ks.Tables == nil { - ks.Tables = map[string]*vschemapb.Table{} - } - - if ks.Vindexes == nil { - ks.Vindexes = map[string]*vschemapb.Vindex{} - } - - var tableName string - var table *vschemapb.Table - if !ddl.Table.IsEmpty() { - tableName = ddl.Table.Name.String() - table, _ = ks.Tables[tableName] - } + ks, err := topotools.ApplyVSchemaDDL(ksName, ks, ddl) - switch ddl.Action { - case sqlparser.CreateVindexStr: - name := ddl.VindexSpec.Name.String() - if _, ok := ks.Vindexes[name]; ok { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already exists in keyspace %s", name, destKeyspace) - } - owner, params := ddl.VindexSpec.ParseParams() - ks.Vindexes[name] = &vschemapb.Vindex{ - Type: ddl.VindexSpec.Type.String(), - Params: params, - Owner: owner, - } - - return e.vm.UpdateVSchema(ctx, destKeyspace, ks) - case sqlparser.AddColVindexStr: - // Support two cases: - // - // 1. The vindex type / params / owner are specified. If the - // named vindex doesn't exist, create it. If it does exist, - // require the parameters to match. - // - // 2. The vindex type is not specified. Make sure the vindex - // already exists. - spec := ddl.VindexSpec - name := spec.Name.String() - if !spec.Type.IsEmpty() { - owner, params := spec.ParseParams() - if vindex, ok := ks.Vindexes[name]; ok { - if vindex.Type != spec.Type.String() { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with type %s not %s", name, vindex.Type, spec.Type.String()) - } - if vindex.Owner != owner { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with owner %s not %s", name, vindex.Owner, owner) - } - if (len(vindex.Params) != 0 || len(params) != 0) && !reflect.DeepEqual(vindex.Params, params) { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with different parameters", name) - } - } else { - ks.Vindexes[name] = &vschemapb.Vindex{ - Type: spec.Type.String(), - Params: params, - Owner: owner, - } - } - } else { - if _, ok := ks.Vindexes[name]; !ok { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s does not exist in keyspace %s", name, destKeyspace) - } - } - - // If this is the first vindex being defined on the table, create - // the empty table record - if table == nil { - table = &vschemapb.Table{ - ColumnVindexes: make([]*vschemapb.ColumnVindex, 0, 4), - } - } - - // Make sure there isn't already a vindex with the same name on - // this table. - for _, vindex := range table.ColumnVindexes { - if vindex.Name == name { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already defined on table %s", name, tableName) - } - } - - columns := make([]string, len(ddl.VindexCols), len(ddl.VindexCols)) - for i, col := range ddl.VindexCols { - columns[i] = col.String() - } - table.ColumnVindexes = append(table.ColumnVindexes, &vschemapb.ColumnVindex{ - Name: name, - Columns: columns, - }) - ks.Tables[tableName] = table - - return e.vm.UpdateVSchema(ctx, destKeyspace, ks) - case sqlparser.DropColVindexStr: - spec := ddl.VindexSpec - name := spec.Name.String() - if table == nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s.%s not defined in vschema", ksName, tableName) - } - - for i, colVindex := range table.ColumnVindexes { - if colVindex.Name == name { - table.ColumnVindexes = append(table.ColumnVindexes[:i], table.ColumnVindexes[i+1:]...) - if len(table.ColumnVindexes) == 0 { - delete(ks.Tables, tableName) - } - return e.vm.UpdateVSchema(ctx, destKeyspace, ks) - } - } - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s not defined in table %s.%s", name, ksName, tableName) + if err != nil { + return err } - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected vindex ddl operation %s", ddl.Action) + return e.vm.UpdateVSchema(ctx, ksName, ks) } func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) { From 26826875294dad91569da74ae436776b82f6e69c Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sat, 24 Nov 2018 17:09:55 -0500 Subject: [PATCH 04/10] add vtctl support to alter vschema using sql statements Add new flags to ApplyVschema to be able to take a sql ddl statement either on the command line or in a file to enable vschema changes without having to edit the whole json file. Signed-off-by: Michael Demmer --- go/vt/vtctl/vtctl.go | 96 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 79 insertions(+), 17 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 0ad15e1c28d..586057c00da 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -117,6 +117,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/schemamanager" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" @@ -2122,6 +2123,8 @@ func commandRebuildVSchemaGraph(ctx context.Context, wr *wrangler.Wrangler, subF func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { vschema := subFlags.String("vschema", "", "Identifies the VTGate routing schema") vschemaFile := subFlags.String("vschema_file", "", "Identifies the VTGate routing schema file") + sql := subFlags.String("sql", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)") + sqlFile := subFlags.String("sql_file", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)") dryRun := subFlags.Bool("dry_run", false, "If set, do not save the altered vschema, simply echo to console.") skipRebuild := subFlags.Bool("skip_rebuild", false, "If set, do no rebuild the SrvSchema objects.") var cells flagutil.StringListValue @@ -2133,28 +2136,87 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f if subFlags.NArg() != 1 { return fmt.Errorf("the argument is required for the ApplyVSchema command") } - if (*vschema == "") == (*vschemaFile == "") { - return fmt.Errorf("either the vschema or vschemaFile flag must be specified when calling the ApplyVSchema command") - } - var schema []byte - if *vschemaFile != "" { - var err error - schema, err = ioutil.ReadFile(*vschemaFile) + keyspace := subFlags.Arg(0) + + var vs *vschemapb.Keyspace + var err error + if *sql != "" || *sqlFile != "" { + // + // Handle the case where vschema modification is supplied as a sql statement + // + if *sql != "" && *sqlFile != "" { + return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") + } + if *vschema != "" || *vschemaFile != "" { + return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") + } + + if *sqlFile != "" { + sqlBytes, err := ioutil.ReadFile(*sqlFile) + if err != nil { + return err + } + *sql = string(sqlBytes) + } + + stmt, err := sqlparser.Parse(*sql) + if err != nil { + return fmt.Errorf("error parsing vschema statement `%s`: %v", *sql, err) + } + ddl, ok := stmt.(*sqlparser.DDL) + if !ok { + return fmt.Errorf("error parsing: vschema statement `%s`: not a ddl statement", *sql) + } + + vs, err = wr.TopoServer().GetVSchema(ctx, keyspace) + if err != nil { + if topo.IsErrType(err, topo.NoNode) { + vs = &vschemapb.Keyspace{} + } else { + return err + } + } + + vs, err = topotools.ApplyVSchemaDDL(keyspace, vs, ddl) + if err != nil { + return err + } + + } else if *vschema != "" || *vschemaFile != "" { + // + // Handle the case where vschema modification is supplied as a json document + // + if *vschema != "" && *vschemaFile != "" { + return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") + } + if *sql != "" || *sqlFile != "" { + return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") + } + + var schema []byte + if *vschemaFile != "" { + var err error + schema, err = ioutil.ReadFile(*vschemaFile) + if err != nil { + return err + } + } else { + schema = []byte(*vschema) + } + + // Create a local schema object to unmarshal into + var vsLocal vschemapb.Keyspace + vs = &vsLocal + + err := json2.Unmarshal(schema, vs) if err != nil { return err } } else { - schema = []byte(*vschema) + return fmt.Errorf("one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") } - var vs vschemapb.Keyspace - err := json2.Unmarshal(schema, &vs) - if err != nil { - return err - } - - keyspace := subFlags.Arg(0) - b, err := json2.MarshalIndentPB(&vs, " ") + b, err := json2.MarshalIndentPB(vs, " ") if err != nil { wr.Logger().Errorf("Failed to marshal VSchema for display: %v", err) } else { @@ -2166,7 +2228,7 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f return nil } - if err := wr.TopoServer().SaveVSchema(ctx, keyspace, &vs); err != nil { + if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil { return err } From 1d274cf970a8f942581ce6f5df2ae558e7444a85 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sat, 24 Nov 2018 17:14:24 -0500 Subject: [PATCH 05/10] set the sharded bit when creating the first vindex in a keyspace Make sure to set the Sharded bit to true when creating the first vindex in a keyspace, otherwise the vschema ddl statements can't be used to bootstrap a new sharded keyspace. Signed-off-by: Michael Demmer --- go/vt/topotools/vschema_ddl.go | 13 +++++++ go/vt/vtgate/executor_test.go | 67 ++++++++++++++++------------------ 2 files changed, 44 insertions(+), 36 deletions(-) diff --git a/go/vt/topotools/vschema_ddl.go b/go/vt/topotools/vschema_ddl.go index ffba159325d..56a28426a9e 100644 --- a/go/vt/topotools/vschema_ddl.go +++ b/go/vt/topotools/vschema_ddl.go @@ -54,12 +54,20 @@ func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, ddl *sqlparser.DDL) if _, ok := ks.Vindexes[name]; ok { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already exists in keyspace %s", name, ksName) } + + // Make sure the keyspace has the sharded bit set to true + // if this is the first vindex defined in the keyspace. + if len(ks.Vindexes) == 0 { + ks.Sharded = true + } + owner, params := ddl.VindexSpec.ParseParams() ks.Vindexes[name] = &vschemapb.Vindex{ Type: ddl.VindexSpec.Type.String(), Params: params, Owner: owner, } + return ks, nil case sqlparser.AddColVindexStr: @@ -86,6 +94,11 @@ func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, ddl *sqlparser.DDL) return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with different parameters", name) } } else { + // Make sure the keyspace has the sharded bit set to true + // if this is the first vindex defined in the keyspace. + if len(ks.Vindexes) == 0 { + ks.Sharded = true + } ks.Vindexes[name] = &vschemapb.Vindex{ Type: spec.Type.String(), Params: params, diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 93ca6ff1f36..baa14d42ec1 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1203,51 +1203,51 @@ func TestExecutorCreateVindexDDL(t *testing.T) { t.Fatalf("test_vindex should not exist in original vschema") } + session := NewSafeSession(&vtgatepb.Session{TargetString: ks}) stmt := "create vindex test_vindex using hash" - wantCount := []int64{0, 0, 0} - _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{TargetString: ks}), stmt, nil) + _, err := executor.Execute(context.Background(), "TestExecute", session, stmt, nil) if err != nil { t.Error(err) } - gotCount := []int64{ - sbc1.ExecCount.Get(), - sbc2.ExecCount.Get(), - sbclookup.ExecCount.Get(), - } - if !reflect.DeepEqual(gotCount, wantCount) { - t.Errorf("Exec %s: %v, want %v", stmt, gotCount, wantCount) + + vschema, vindex = waitForVindex(t, ks, "test_vindex", vschemaUpdates, executor) + if vindex == nil || vindex.Type != "hash" { + t.Errorf("updated vschema did not contain test_vindex") } - time.Sleep(10 * time.Millisecond) + _, err = executor.Execute(context.Background(), "TestExecute", session, stmt, nil) + wantErr := "vindex test_vindex already exists in keyspace TestExecutor" + if err == nil || err.Error() != wantErr { + t.Errorf("create duplicate vindex: %v, want %s", err, wantErr) + } select { case vschema = <-vschemaUpdates: - vindex, ok = vschema.Keyspaces[ks].Vindexes["test_vindex"] - if !ok || vindex.Type != "hash" { - t.Errorf("updated vschema did not contain test_vindex") - } + t.Errorf("vschema should not be updated on error") default: - t.Errorf("vschema was not updated as expected") } - // Wait up to 10ms until the vindex manager gets notified of the update - for i := 0; i < 10; i++ { - vschema = executor.vm.GetCurrentSrvVschema() - vindex, ok = vschema.Keyspaces[ks].Vindexes["test_vindex"] - if ok { - break - } - time.Sleep(time.Millisecond) - } - if !ok || vindex.Type != "hash" { - t.Errorf("updated vschema did not contain test_vindex") + // Create a new vschema keyspace implicitly by creating a vindex with a different + // target in the session + ksNew := "test_new_keyspace" + session = NewSafeSession(&vtgatepb.Session{TargetString: ksNew}) + stmt = "create vindex test_vindex2 using hash" + _, err = executor.Execute(context.Background(), "TestExecute", session, stmt, nil) + if err != nil { + t.Fatalf("error in %s: %v", stmt, err) } - _, err = executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{TargetString: ks}), stmt, nil) - wantErr := "vindex test_vindex already exists in keyspace TestExecutor" - if err == nil || err.Error() != wantErr { - t.Errorf("create duplicate vindex: %v, want %s", err, wantErr) + vschema, vindex = waitForVindex(t, ksNew, "test_vindex2", vschemaUpdates, executor) + if vindex.Type != "hash" { + t.Errorf("vindex type %s not hash", vindex.Type) + } + keyspace, ok := vschema.Keyspaces[ksNew] + if !ok || !keyspace.Sharded { + t.Errorf("keyspace should have been created with Sharded=true") } - gotCount = []int64{ + + // No queries should have gone to any tablets + wantCount := []int64{0, 0, 0} + gotCount := []int64{ sbc1.ExecCount.Get(), sbc2.ExecCount.Get(), sbclookup.ExecCount.Get(), @@ -1255,11 +1255,6 @@ func TestExecutorCreateVindexDDL(t *testing.T) { if !reflect.DeepEqual(gotCount, wantCount) { t.Errorf("Exec %s: %v, want %v", stmt, gotCount, wantCount) } - select { - case vschema = <-vschemaUpdates: - t.Errorf("vschema shoud not be updated on error") - default: - } } func TestExecutorAddDropVindexDDL(t *testing.T) { From 54f219246422bf5599d0a4beca0246bc9b0ef11b Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 26 Nov 2018 11:00:29 -0800 Subject: [PATCH 06/10] fix vschema test expectations Signed-off-by: Michael Demmer --- go/vt/vtgate/executor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index baa14d42ec1..68f55d24e5b 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1556,8 +1556,8 @@ func TestExecutorAddDropVindexDDL(t *testing.T) { } stmt = "alter table nonexistent drop vindex test_lookup" - _, err = executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{}), stmt, nil) - wantErr = errNoKeyspace.Error() + _, err = executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{TargetString: "InvalidKeyspace"}), stmt, nil) + wantErr = "table InvalidKeyspace.nonexistent not defined in vschema" if err == nil || err.Error() != wantErr { t.Errorf("got %v want err %s", err, wantErr) } From 1b0db074c13039bd08fe8444135858f3f5b8d39a Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Thu, 29 Nov 2018 12:50:06 -0800 Subject: [PATCH 07/10] change parameter to -dry-run for consistency Signed-off-by: Michael Demmer --- go/vt/vtctl/vtctl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 586057c00da..9657911896f 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2125,7 +2125,7 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f vschemaFile := subFlags.String("vschema_file", "", "Identifies the VTGate routing schema file") sql := subFlags.String("sql", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)") sqlFile := subFlags.String("sql_file", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)") - dryRun := subFlags.Bool("dry_run", false, "If set, do not save the altered vschema, simply echo to console.") + dryRun := subFlags.Bool("dry-run", false, "If set, do not save the altered vschema, simply echo to console.") skipRebuild := subFlags.Bool("skip_rebuild", false, "If set, do no rebuild the SrvSchema objects.") var cells flagutil.StringListValue subFlags.Var(&cells, "cells", "If specified, limits the rebuild to the cells, after upload. Ignored if skipRebuild is set.") From 17af5ea1cb5b86c48be236e3713374680a98b01b Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Thu, 29 Nov 2018 13:18:52 -0800 Subject: [PATCH 08/10] update doc strings as suggested in pr review Signed-off-by: Michael Demmer --- doc/vtctlReference.md | 3 +++ go/vt/vtctl/vtctl.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/doc/vtctlReference.md b/doc/vtctlReference.md index 4d294919d42..cb946962586 100644 --- a/doc/vtctlReference.md +++ b/doc/vtctlReference.md @@ -1119,8 +1119,11 @@ Applies the VTGate routing schema to the provided keyspace. Shows the result aft | :-------- | :--------- | :--------- | | cells | string | If specified, limits the rebuild to the cells, after upload. Ignored if skipRebuild is set. | | skip_rebuild | Boolean | If set, do no rebuild the SrvSchema objects. | +| dry-run | Boolean | Shows the proposed change without executing it | | vschema | string | Identifies the VTGate routing schema | | vschema_file | string | Identifies the VTGate routing schema file | +| sql | string | Identifies a VSchema DDL SQL statement | +| sql_file | string | Identifies a VSchema DDL SQL statement | #### Arguments diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 9657911896f..93040b1c576 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -391,7 +391,7 @@ var commands = []commandGroup{ "", "Displays the VTGate routing schema."}, {"ApplyVSchema", commandApplyVSchema, - "{-vschema= || -vschema_file=} [-cells=c1,c2,...] [-skip_rebuild] ", + "{-vschema= || -vschema_file= || -sql= || -sql_file=} [-cells=c1,c2,...] [-skip_rebuild] [-dry-run] ", "Applies the VTGate routing schema to the provided keyspace. Shows the result after application."}, {"RebuildVSchemaGraph", commandRebuildVSchemaGraph, "[-cells=c1,c2,...]", From ea1e126a0697cfbbc6b6c042733ed66e0fa14786 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Thu, 29 Nov 2018 13:19:20 -0800 Subject: [PATCH 09/10] rework and clean up the implementation as suggested in PR feedback Signed-off-by: Michael Demmer --- go/vt/vtctl/vtctl.go | 44 ++++++++++++++++---------------------------- 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 93040b1c576..f01f416efe6 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2140,17 +2140,19 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f var vs *vschemapb.Keyspace var err error - if *sql != "" || *sqlFile != "" { - // - // Handle the case where vschema modification is supplied as a sql statement - // - if *sql != "" && *sqlFile != "" { - return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") - } - if *vschema != "" || *vschemaFile != "" { - return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") - } + sqlMode := (*sql != "") != (*sqlFile != "") + jsonMode := (*vschema != "") != (*vschemaFile != "") + + if sqlMode && jsonMode { + return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") + } + + if !sqlMode && !jsonMode { + return fmt.Errorf("one of the sql, sql_file, vschema, or vschema_file flags must be specified when calling the ApplyVSchema command") + } + + if sqlMode { if *sqlFile != "" { sqlBytes, err := ioutil.ReadFile(*sqlFile) if err != nil { @@ -2165,7 +2167,7 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f } ddl, ok := stmt.(*sqlparser.DDL) if !ok { - return fmt.Errorf("error parsing: vschema statement `%s`: not a ddl statement", *sql) + return fmt.Errorf("error parsing vschema statement `%s`: not a ddl statement", *sql) } vs, err = wr.TopoServer().GetVSchema(ctx, keyspace) @@ -2182,17 +2184,8 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f return err } - } else if *vschema != "" || *vschemaFile != "" { - // - // Handle the case where vschema modification is supplied as a json document - // - if *vschema != "" && *vschemaFile != "" { - return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") - } - if *sql != "" || *sqlFile != "" { - return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") - } - + } else { + // json mode var schema []byte if *vschemaFile != "" { var err error @@ -2204,16 +2197,11 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f schema = []byte(*vschema) } - // Create a local schema object to unmarshal into - var vsLocal vschemapb.Keyspace - vs = &vsLocal - + vs = &vschemapb.Keyspace{} err := json2.Unmarshal(schema, vs) if err != nil { return err } - } else { - return fmt.Errorf("one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") } b, err := json2.MarshalIndentPB(vs, " ") From e47bed0e71d5c8012850954feee22b023d9243fe Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 3 Dec 2018 08:52:59 -0800 Subject: [PATCH 10/10] revert change to use topotools.RebuildVSchema in the executor The fact that the vschema ddl needs to go through the normal srvtopo interface to get to the underlying topo server when applying vschema ddl operations doesn't play well with the SandboxTopo used in some of the unit tests. So restore the previous implementation which edits the SrvVschema in place and then reapplies it to all cells, rather than trying to reuse topotools.RebuildVschema from the executor. Signed-off-by: Michael Demmer --- go/vt/vtgate/executor.go | 4 +++- go/vt/vtgate/vschema_manager.go | 24 +++++++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index b449f786c8c..335ac41b779 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -396,7 +396,9 @@ func (e *Executor) handleVSchemaDDL(ctx context.Context, safeSession *SafeSessio return err } - return e.vm.UpdateVSchema(ctx, ksName, ks) + vschema.Keyspaces[ksName] = ks + + return e.vm.UpdateVSchema(ctx, ksName, vschema) } func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) { diff --git a/go/vt/vtgate/vschema_manager.go b/go/vt/vtgate/vschema_manager.go index 72d5cca0993..5823c080c14 100644 --- a/go/vt/vtgate/vschema_manager.go +++ b/go/vt/vtgate/vschema_manager.go @@ -24,9 +24,7 @@ import ( "github.com/golang/protobuf/proto" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtgate/vindexes" vschemapb "vitess.io/vitess/go/vt/proto/vschema" @@ -123,12 +121,28 @@ func (vm *VSchemaManager) watchSrvVSchema(ctx context.Context, cell string) { // UpdateVSchema propagates the updated vschema to the topo. The entry for // the given keyspace is updated in the global topo, and the full SrvVSchema // is updated in all known cells. -func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, keyspace *vschemapb.Keyspace) error { +func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error { topo := vm.e.serv.GetTopoServer() - err := topo.SaveVSchema(ctx, ksName, keyspace) + + ks := vschema.Keyspaces[ksName] + err := topo.SaveVSchema(ctx, ksName, ks) + if err != nil { + return err + } + + cells, err := vm.e.serv.GetTopoServer().GetKnownCells(ctx) if err != nil { return err } - return topotools.RebuildVSchema(ctx, logutil.NewConsoleLogger(), topo, nil) + // even if one cell fails, continue to try the others + for _, cell := range cells { + cellErr := vm.e.serv.GetTopoServer().UpdateSrvVSchema(ctx, cell, vschema) + if cellErr != nil { + err = cellErr + log.Errorf("error updating vschema in cell %s: %v", cell, cellErr) + } + } + + return err }