-
Notifications
You must be signed in to change notification settings - Fork 2.3k
vtctl support for vschema ddls #4385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
670647b
94cb4af
6acad1d
2682687
1d274cf
54f2192
1b0db07
17af5ea
ea1e126
e47bed0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,162 @@ | ||
| /* | ||
| Copyright 2018 The Vitess Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package topotools | ||
|
|
||
| import ( | ||
| "reflect" | ||
|
|
||
| "vitess.io/vitess/go/vt/sqlparser" | ||
| "vitess.io/vitess/go/vt/vterrors" | ||
|
|
||
| vschemapb "vitess.io/vitess/go/vt/proto/vschema" | ||
| vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" | ||
| ) | ||
|
|
||
| // ApplyVSchemaDDL applies the given DDL statement to the vschema | ||
| // keyspace definition and returns the modified keyspace object. | ||
| func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, ddl *sqlparser.DDL) (*vschemapb.Keyspace, error) { | ||
| if ks == nil { | ||
| ks = new(vschemapb.Keyspace) | ||
| } | ||
|
|
||
| if ks.Tables == nil { | ||
| ks.Tables = map[string]*vschemapb.Table{} | ||
| } | ||
|
|
||
| if ks.Vindexes == nil { | ||
| ks.Vindexes = map[string]*vschemapb.Vindex{} | ||
| } | ||
|
|
||
| var tableName string | ||
| var table *vschemapb.Table | ||
| if !ddl.Table.IsEmpty() { | ||
| tableName = ddl.Table.Name.String() | ||
| table, _ = ks.Tables[tableName] | ||
| } | ||
|
|
||
| switch ddl.Action { | ||
| case sqlparser.CreateVindexStr: | ||
| name := ddl.VindexSpec.Name.String() | ||
| if _, ok := ks.Vindexes[name]; ok { | ||
| return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already exists in keyspace %s", name, ksName) | ||
| } | ||
|
|
||
| // Make sure the keyspace has the sharded bit set to true | ||
| // if this is the first vindex defined in the keyspace. | ||
| if len(ks.Vindexes) == 0 { | ||
| ks.Sharded = true | ||
| } | ||
|
|
||
| owner, params := ddl.VindexSpec.ParseParams() | ||
| ks.Vindexes[name] = &vschemapb.Vindex{ | ||
| Type: ddl.VindexSpec.Type.String(), | ||
| Params: params, | ||
| Owner: owner, | ||
| } | ||
|
|
||
| return ks, nil | ||
|
|
||
| case sqlparser.AddColVindexStr: | ||
| // Support two cases: | ||
| // | ||
| // 1. The vindex type / params / owner are specified. If the | ||
| // named vindex doesn't exist, create it. If it does exist, | ||
| // require the parameters to match. | ||
| // | ||
| // 2. The vindex type is not specified. Make sure the vindex | ||
| // already exists. | ||
| spec := ddl.VindexSpec | ||
| name := spec.Name.String() | ||
| if !spec.Type.IsEmpty() { | ||
| owner, params := spec.ParseParams() | ||
| if vindex, ok := ks.Vindexes[name]; ok { | ||
| if vindex.Type != spec.Type.String() { | ||
| return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with type %s not %s", name, vindex.Type, spec.Type.String()) | ||
| } | ||
| if vindex.Owner != owner { | ||
| return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with owner %s not %s", name, vindex.Owner, owner) | ||
| } | ||
| if (len(vindex.Params) != 0 || len(params) != 0) && !reflect.DeepEqual(vindex.Params, params) { | ||
| return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with different parameters", name) | ||
| } | ||
| } else { | ||
| // Make sure the keyspace has the sharded bit set to true | ||
| // if this is the first vindex defined in the keyspace. | ||
| if len(ks.Vindexes) == 0 { | ||
| ks.Sharded = true | ||
| } | ||
| ks.Vindexes[name] = &vschemapb.Vindex{ | ||
| Type: spec.Type.String(), | ||
| Params: params, | ||
| Owner: owner, | ||
| } | ||
| } | ||
| } else { | ||
| if _, ok := ks.Vindexes[name]; !ok { | ||
| return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s does not exist in keyspace %s", name, ksName) | ||
| } | ||
| } | ||
|
|
||
| // If this is the first vindex being defined on the table, create | ||
| // the empty table record | ||
| if table == nil { | ||
| table = &vschemapb.Table{ | ||
| ColumnVindexes: make([]*vschemapb.ColumnVindex, 0, 4), | ||
| } | ||
| } | ||
|
|
||
| // Make sure there isn't already a vindex with the same name on | ||
| // this table. | ||
| for _, vindex := range table.ColumnVindexes { | ||
| if vindex.Name == name { | ||
| return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already defined on table %s", name, tableName) | ||
| } | ||
| } | ||
|
|
||
| columns := make([]string, len(ddl.VindexCols), len(ddl.VindexCols)) | ||
| for i, col := range ddl.VindexCols { | ||
| columns[i] = col.String() | ||
| } | ||
| table.ColumnVindexes = append(table.ColumnVindexes, &vschemapb.ColumnVindex{ | ||
| Name: name, | ||
| Columns: columns, | ||
| }) | ||
| ks.Tables[tableName] = table | ||
|
|
||
| return ks, nil | ||
|
|
||
| case sqlparser.DropColVindexStr: | ||
|
||
| spec := ddl.VindexSpec | ||
| name := spec.Name.String() | ||
| if table == nil { | ||
| return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s.%s not defined in vschema", ksName, tableName) | ||
| } | ||
|
|
||
| for i, colVindex := range table.ColumnVindexes { | ||
| if colVindex.Name == name { | ||
| table.ColumnVindexes = append(table.ColumnVindexes[:i], table.ColumnVindexes[i+1:]...) | ||
| if len(table.ColumnVindexes) == 0 { | ||
| delete(ks.Tables, tableName) | ||
| } | ||
| return ks, nil | ||
| } | ||
| } | ||
| return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s not defined in table %s.%s", name, ksName, tableName) | ||
| } | ||
|
|
||
| return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected vindex ddl operation %s", ddl.Action) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -117,6 +117,7 @@ import ( | |
| "vitess.io/vitess/go/vt/log" | ||
| "vitess.io/vitess/go/vt/logutil" | ||
| "vitess.io/vitess/go/vt/schemamanager" | ||
| "vitess.io/vitess/go/vt/sqlparser" | ||
| "vitess.io/vitess/go/vt/topo" | ||
| "vitess.io/vitess/go/vt/topo/topoproto" | ||
| "vitess.io/vitess/go/vt/topotools" | ||
|
|
@@ -390,7 +391,7 @@ var commands = []commandGroup{ | |
| "<keyspace>", | ||
| "Displays the VTGate routing schema."}, | ||
| {"ApplyVSchema", commandApplyVSchema, | ||
| "{-vschema=<vschema> || -vschema_file=<vschema file>} [-cells=c1,c2,...] [-skip_rebuild] <keyspace>", | ||
| "{-vschema=<vschema> || -vschema_file=<vschema file> || -sql=<sql> || -sql_file=<sql file>} [-cells=c1,c2,...] [-skip_rebuild] [-dry-run] <keyspace>", | ||
| "Applies the VTGate routing schema to the provided keyspace. Shows the result after application."}, | ||
| {"RebuildVSchemaGraph", commandRebuildVSchemaGraph, | ||
| "[-cells=c1,c2,...]", | ||
|
|
@@ -2122,6 +2123,9 @@ func commandRebuildVSchemaGraph(ctx context.Context, wr *wrangler.Wrangler, subF | |
| func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { | ||
| vschema := subFlags.String("vschema", "", "Identifies the VTGate routing schema") | ||
| vschemaFile := subFlags.String("vschema_file", "", "Identifies the VTGate routing schema file") | ||
| sql := subFlags.String("sql", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)") | ||
|
||
| sqlFile := subFlags.String("sql_file", "", "A vschema ddl SQL statement (e.g. `add vindex`, `alter table t add vindex hash(id)`, etc)") | ||
|
||
| dryRun := subFlags.Bool("dry-run", false, "If set, do not save the altered vschema, simply echo to console.") | ||
| skipRebuild := subFlags.Bool("skip_rebuild", false, "If set, do no rebuild the SrvSchema objects.") | ||
| var cells flagutil.StringListValue | ||
| subFlags.Var(&cells, "cells", "If specified, limits the rebuild to the cells, after upload. Ignored if skipRebuild is set.") | ||
|
|
@@ -2132,34 +2136,88 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *f | |
| if subFlags.NArg() != 1 { | ||
| return fmt.Errorf("the <keyspace> argument is required for the ApplyVSchema command") | ||
| } | ||
| if (*vschema == "") == (*vschemaFile == "") { | ||
| return fmt.Errorf("either the vschema or vschemaFile flag must be specified when calling the ApplyVSchema command") | ||
| keyspace := subFlags.Arg(0) | ||
|
|
||
| var vs *vschemapb.Keyspace | ||
| var err error | ||
|
|
||
| sqlMode := (*sql != "") != (*sqlFile != "") | ||
| jsonMode := (*vschema != "") != (*vschemaFile != "") | ||
|
|
||
| if sqlMode && jsonMode { | ||
| return fmt.Errorf("only one of the sql, sql_file, vschema, or vschema_file flags may be specified when calling the ApplyVSchema command") | ||
| } | ||
| var schema []byte | ||
| if *vschemaFile != "" { | ||
| var err error | ||
| schema, err = ioutil.ReadFile(*vschemaFile) | ||
|
|
||
| if !sqlMode && !jsonMode { | ||
| return fmt.Errorf("one of the sql, sql_file, vschema, or vschema_file flags must be specified when calling the ApplyVSchema command") | ||
| } | ||
|
|
||
| if sqlMode { | ||
| if *sqlFile != "" { | ||
| sqlBytes, err := ioutil.ReadFile(*sqlFile) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| *sql = string(sqlBytes) | ||
| } | ||
|
|
||
| stmt, err := sqlparser.Parse(*sql) | ||
| if err != nil { | ||
| return fmt.Errorf("error parsing vschema statement `%s`: %v", *sql, err) | ||
| } | ||
| ddl, ok := stmt.(*sqlparser.DDL) | ||
| if !ok { | ||
| return fmt.Errorf("error parsing vschema statement `%s`: not a ddl statement", *sql) | ||
| } | ||
|
|
||
| vs, err = wr.TopoServer().GetVSchema(ctx, keyspace) | ||
| if err != nil { | ||
| if topo.IsErrType(err, topo.NoNode) { | ||
| vs = &vschemapb.Keyspace{} | ||
| } else { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| vs, err = topotools.ApplyVSchemaDDL(keyspace, vs, ddl) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| } else { | ||
| schema = []byte(*vschema) | ||
| } | ||
| var vs vschemapb.Keyspace | ||
| err := json2.Unmarshal(schema, &vs) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| keyspace := subFlags.Arg(0) | ||
| if err := wr.TopoServer().SaveVSchema(ctx, keyspace, &vs); err != nil { | ||
| return err | ||
| // json mode | ||
| var schema []byte | ||
| if *vschemaFile != "" { | ||
| var err error | ||
| schema, err = ioutil.ReadFile(*vschemaFile) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } else { | ||
| schema = []byte(*vschema) | ||
| } | ||
|
|
||
| vs = &vschemapb.Keyspace{} | ||
| err := json2.Unmarshal(schema, vs) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| b, err := json2.MarshalIndentPB(&vs, " ") | ||
| b, err := json2.MarshalIndentPB(vs, " ") | ||
| if err != nil { | ||
| wr.Logger().Errorf("Failed to marshal VSchema for display: %v", err) | ||
| } else { | ||
| wr.Logger().Printf("Uploaded VSchema object:\n%s\nIf this is not what you expected, check the input data (as JSON parsing will skip unexpected fields).\n", b) | ||
| wr.Logger().Printf("New VSchema object:\n%s\nIf this is not what you expected, check the input data (as JSON parsing will skip unexpected fields).\n", b) | ||
| } | ||
|
|
||
| if *dryRun { | ||
| wr.Logger().Printf("Dry run: Skipping update of VSchema\n") | ||
| return nil | ||
| } | ||
|
|
||
| if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if *skipRebuild { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that it would make sense this to be driven by a flag. That way we can be explicit with the intent. If the flag is not provided and the Vindexes len is 0, adding a vindex should fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By definition, vindexes are only present in Sharded keyspaces, so I think the act of creating a vindex expresses the intent of whether we want
I have ideas about how we can extend the vschema ddl syntax to support unsharded keyspaces, but even then we won't have vindexes and I think above will hold.