diff --git a/go.mod b/go.mod index 0661ef2057e..50a70afd81e 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/evanphx/json-patch v4.5.0+incompatible github.com/go-critic/go-critic v0.4.0 // indirect github.com/go-ini/ini v1.12.0 // indirect - github.com/gogo/protobuf v1.3.1 // indirect + github.com/gogo/protobuf v1.3.1 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 1a68b5b567e..6f12fd748bc 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -312,8 +312,11 @@ var commands = []commandGroup{ "[-skip_schema_copy] ", "Start a Resharding process. Example: Reshard ks.workflow001 '0' '-80,80-'"}, {"Migrate", commandMigrate, - "[-cell=] [-tablet_types=] -workflow= ", + "[-cell=] [-tablet_types=] -workflow= ", `Start a table(s) migration, table_specs is a list of tables or the tables section of the vschema for the target keyspace. Example: '{"t1":{"column_vindexes": [{""column": "id1", "name": "hash"}]}, "t2":{"column_vindexes": [{""column": "id2", "name": "hash"}]}}`}, + {"CreateLookupVindex", commandCreateLookupVindex, + "[-cell=] [-tablet_types=] ", + `Create and backfill a lookup vindex. the json_spec must contain the vindex and colvindex specs for the new lookup.`}, {"Materialize", commandMaterialize, `, example : '{"workflow": "aaa", "source_keyspace": "source", "target_keyspace": "target", "table_settings": [{"target_table": "customer", "source_expression": "select * from customer", "create_ddl": "copy"}]}'`, "Performs materialization based on the json spec."}, @@ -1830,6 +1833,23 @@ func commandMigrate(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F return wr.Migrate(ctx, *workflow, source, target, tableSpecs, *cell, *tabletTypes) } +func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + cell := subFlags.String("cell", "", "Cell to replicate from.") + tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.") + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 2 { + return fmt.Errorf("two arguments are required: keyspace and json_spec") + } + keyspace := subFlags.Arg(0) + specs := &vschemapb.Keyspace{} + if err := json2.Unmarshal([]byte(subFlags.Arg(1)), specs); err != nil { + return err + } + return wr.CreateLookupVindex(ctx, keyspace, specs, *cell, *tabletTypes) +} + func commandMaterialize(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { if err := subFlags.Parse(args); err != nil { return err diff --git a/go/vt/vtgate/vindexes/vschema.go b/go/vt/vtgate/vindexes/vschema.go index 6bddad43359..517b7dfe2e4 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -25,6 +25,7 @@ import ( "strings" "vitess.io/vitess/go/json2" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/sqlparser" querypb "vitess.io/vitess/go/vt/proto/query" @@ -614,6 +615,19 @@ func LoadFormalKeyspace(filename string) (*vschemapb.Keyspace, error) { return formal, nil } +// ChooseVindexForType chooses the most appropriate vindex for the give type. +func ChooseVindexForType(typ querypb.Type) (string, error) { + switch { + case sqltypes.IsIntegral(typ): + return "hash", nil + case sqltypes.IsText(typ): + return "unicode_loose_md5", nil + case sqltypes.IsBinary(typ): + return "binary_md5", nil + } + return "", fmt.Errorf("type %v is not recommended for a vindex", typ) +} + // FindBestColVindex finds the best ColumnVindex for VReplication. func FindBestColVindex(table *Table) (*ColumnVindex, error) { if len(table.ColumnVindexes) == 0 { diff --git a/go/vt/vtgate/vindexes/vschema_test.go b/go/vt/vtgate/vindexes/vschema_test.go index 14088eed740..184a1eb5222 100644 --- a/go/vt/vtgate/vindexes/vschema_test.go +++ b/go/vt/vtgate/vindexes/vschema_test.go @@ -820,6 +820,115 @@ func TestVSchemaRoutingRules(t *testing.T) { } } +func TestChooseVindexForType(t *testing.T) { + testcases := []struct { + in querypb.Type + out string + }{{ + in: sqltypes.Null, + out: "", + }, { + in: sqltypes.Int8, + out: "hash", + }, { + in: sqltypes.Uint8, + out: "hash", + }, { + in: sqltypes.Int16, + out: "hash", + }, { + in: sqltypes.Uint16, + out: "hash", + }, { + in: sqltypes.Int24, + out: "hash", + }, { + in: sqltypes.Uint24, + out: "hash", + }, { + in: sqltypes.Int32, + out: "hash", + }, { + in: sqltypes.Uint32, + out: "hash", + }, { + in: sqltypes.Int64, + out: "hash", + }, { + in: sqltypes.Uint64, + out: "hash", + }, { + in: sqltypes.Float32, + out: "hash", + }, { + in: sqltypes.Float64, + out: "", + }, { + in: sqltypes.Timestamp, + out: "", + }, { + in: sqltypes.Date, + out: "", + }, { + in: sqltypes.Time, + out: "", + }, { + in: sqltypes.Datetime, + out: "", + }, { + in: sqltypes.Year, + out: "hash", + }, { + in: sqltypes.Decimal, + out: "", + }, { + in: sqltypes.Text, + out: "unicode_loose_md5", + }, { + in: sqltypes.Blob, + out: "binary_md5", + }, { + in: sqltypes.VarChar, + out: "unicode_loose_md5", + }, { + in: sqltypes.VarBinary, + out: "binary_md5", + }, { + in: sqltypes.Char, + out: "unicode_loose_md5", + }, { + in: sqltypes.Binary, + out: "binary_md5", + }, { + in: sqltypes.Bit, + out: "", + }, { + in: sqltypes.Enum, + out: "", + }, { + in: sqltypes.Set, + out: "", + }, { + in: sqltypes.Geometry, + out: "", + }, { + in: sqltypes.TypeJSON, + out: "", + }, { + in: sqltypes.Expression, + out: "", + }} + + for _, tcase := range testcases { + out, err := ChooseVindexForType(tcase.in) + if out == "" { + assert.Error(t, err, tcase.in) + continue + } + assert.Equal(t, out, tcase.out, tcase.in) + } +} + func TestFindBestColVindex(t *testing.T) { testSrvVSchema := &vschemapb.SrvVSchema{ Keyspaces: map[string]*vschemapb.Keyspace{ diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 10e44b3fa03..637b9875226 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -22,6 +22,7 @@ import ( "sync" "text/template" + "github.com/gogo/protobuf/proto" "golang.org/x/net/context" "vitess.io/vitess/go/json2" @@ -114,6 +115,310 @@ func (wr *Wrangler) Migrate(ctx context.Context, workflow, sourceKeyspace, targe return wr.Materialize(ctx, ms) } +// CreateLookupVindex creates a lookup vindex and sets up the backfill. +func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, cell, tabletTypes string) error { + ms, sourceVSchema, targetVSchema, err := wr.prepareCreateLookup(ctx, keyspace, specs) + if err != nil { + return err + } + if err := wr.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { + return err + } + ms.Cell = cell + ms.TabletTypes = tabletTypes + if err := wr.Materialize(ctx, ms); err != nil { + return err + } + if err := wr.ts.SaveVSchema(ctx, keyspace, sourceVSchema); err != nil { + return err + } + + return wr.ts.RebuildSrvVSchema(ctx, nil) +} + +// prepareCreateLookup performs the preparatory steps for creating a lookup vindex. +func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) { + // Important variables are pulled out here. + var ( + // lookup vindex info + vindexName string + vindex *vschemapb.Vindex + targetKeyspace string + targetTableName string + vindexFromCols []string + vindexToCol string + + // source table info + sourceTableName string + // sourceTable is the supplied table info + sourceTable *vschemapb.Table + // sourceVSchemaTable is the table info present in the vschema + sourceVSchemaTable *vschemapb.Table + // sourceVindexColumns are computed from the input sourceTable + sourceVindexColumns []string + + // target table info + createDDL string + materializeQuery string + ) + + // Validate input vindex + if len(specs.Vindexes) != 1 { + return nil, nil, nil, fmt.Errorf("only one vindex must be specified in the specs: %v", specs.Vindexes) + } + for name, vi := range specs.Vindexes { + vindexName = name + vindex = vi + } + if !strings.Contains(vindex.Type, "lookup") { + return nil, nil, nil, fmt.Errorf("vindex %s is not a lookup type", vindex.Type) + } + strs := strings.Split(vindex.Params["table"], ".") + if len(strs) != 2 { + return nil, nil, nil, fmt.Errorf("vindex 'table' must be .: %v", vindex) + } + targetKeyspace, targetTableName = strs[0], strs[1] + + vindexFromCols = strings.Split(vindex.Params["from"], ",") + if strings.Contains(vindex.Type, "unique") { + if len(vindexFromCols) != 1 { + return nil, nil, nil, fmt.Errorf("unique vindex 'from' should have only one column: %v", vindex) + } + } else { + if len(vindexFromCols) < 2 { + return nil, nil, nil, fmt.Errorf("non-unique vindex 'from' should have more than one column: %v", vindex) + } + } + vindexToCol = vindex.Params["to"] + // Make the vindex write_only. If one exists already in the vschema, + // it will need to match this vindex exactly, including the write_only setting. + vindex.Params["write_only"] = "true" + // See if we can create the vindex without errors. + if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { + return nil, nil, nil, err + } + + // Validate input table + if len(specs.Tables) != 1 { + return nil, nil, nil, fmt.Errorf("exactly one table must be specified in the specs: %v", specs.Tables) + } + // Loop executes once. + for k, ti := range specs.Tables { + if len(ti.ColumnVindexes) != 1 { + return nil, nil, nil, fmt.Errorf("exactly one ColumnVindex must be specified for the table: %v", specs.Tables) + } + sourceTableName = k + sourceTable = ti + } + + // Validate input table and vindex consistency + if sourceTable.ColumnVindexes[0].Name != vindexName { + return nil, nil, nil, fmt.Errorf("ColumnVindex name must match vindex name: %s vs %s", sourceTable.ColumnVindexes[0].Name, vindexName) + } + if vindex.Owner != "" && vindex.Owner != sourceTableName { + return nil, nil, nil, fmt.Errorf("vindex owner must match table name: %v vs %v", vindex.Owner, sourceTableName) + } + if len(sourceTable.ColumnVindexes[0].Columns) != 0 { + sourceVindexColumns = sourceTable.ColumnVindexes[0].Columns + } else { + if sourceTable.ColumnVindexes[0].Column == "" { + return nil, nil, nil, fmt.Errorf("at least one column must be specified in ColumnVindexes: %v", sourceTable.ColumnVindexes) + } + sourceVindexColumns = []string{sourceTable.ColumnVindexes[0].Column} + } + if len(sourceVindexColumns) != len(vindexFromCols) { + return nil, nil, nil, fmt.Errorf("length of table columns differes from length of vindex columns: %v vs %v", sourceVindexColumns, vindexFromCols) + } + + // Validate against source vschema + sourceVSchema, err = wr.ts.GetVSchema(ctx, keyspace) + if err != nil { + return nil, nil, nil, err + } + if sourceVSchema.Vindexes == nil { + sourceVSchema.Vindexes = make(map[string]*vschemapb.Vindex) + } + // If source and target keyspaces are same, Make vschemas point to the same object. + if keyspace == targetKeyspace { + targetVSchema = sourceVSchema + } else { + targetVSchema, err = wr.ts.GetVSchema(ctx, targetKeyspace) + if err != nil { + return nil, nil, nil, err + } + } + if targetVSchema.Vindexes == nil { + targetVSchema.Vindexes = make(map[string]*vschemapb.Vindex) + } + if targetVSchema.Tables == nil { + targetVSchema.Tables = make(map[string]*vschemapb.Table) + } + if existing, ok := sourceVSchema.Vindexes[vindexName]; ok { + if !proto.Equal(existing, vindex) { + return nil, nil, nil, fmt.Errorf("a conflicting vindex named %s already exists in the source vschema", vindexName) + } + } + sourceVSchemaTable = sourceVSchema.Tables[sourceTableName] + if sourceVSchemaTable == nil { + return nil, nil, nil, fmt.Errorf("source table %s not found in vschema", sourceTableName) + } + for _, colVindex := range sourceVSchemaTable.ColumnVindexes { + // For a conflict, the vindex name and column should match. + if colVindex.Name != vindexName { + continue + } + colName := colVindex.Column + if len(colVindex.Columns) != 0 { + colName = colVindex.Columns[0] + } + if colName == sourceVindexColumns[0] { + return nil, nil, nil, fmt.Errorf("ColumnVindex for table %v already exists: %v, please remove it and try again", sourceTableName, colName) + } + } + + // Validate against source schema + sourceShards, err := wr.ts.GetServingShards(ctx, keyspace) + if err != nil { + return nil, nil, nil, err + } + onesource := sourceShards[0] + if onesource.MasterAlias == nil { + return nil, nil, nil, fmt.Errorf("source shard has no master: %v", onesource.ShardName()) + } + tableSchema, err := wr.GetSchema(ctx, onesource.MasterAlias, []string{sourceTableName}, nil, false) + if err != nil { + return nil, nil, nil, err + } + if len(tableSchema.TableDefinitions) != 1 { + return nil, nil, nil, fmt.Errorf("unexpected number of tables returned from schema: %v", tableSchema.TableDefinitions) + } + + // Generate "create table" statement + lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n") + if len(lines) < 3 { + // Unreachable + return nil, nil, nil, fmt.Errorf("schema looks incorrect: %s, expecting at least four lines", tableSchema.TableDefinitions[0].Schema) + } + var modified []string + modified = append(modified, strings.Replace(lines[0], sourceTableName, targetTableName, 1)) + for i := range sourceVindexColumns { + line, err := generateColDef(lines, sourceVindexColumns[i], vindexFromCols[i]) + if err != nil { + return nil, nil, nil, err + } + modified = append(modified, line) + } + modified = append(modified, fmt.Sprintf(" `%s` varbinary(128),", vindexToCol)) + buf := sqlparser.NewTrackedBuffer(nil) + fmt.Fprintf(buf, " PRIMARY KEY (") + prefix := "" + for _, col := range vindexFromCols { + fmt.Fprintf(buf, "%s`%s`", prefix, col) + prefix = ", " + } + fmt.Fprintf(buf, ")") + modified = append(modified, buf.String()) + modified = append(modified, ")") + createDDL = strings.Join(modified, "\n") + + // Generate vreplication query + buf = sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select ") + for i := range vindexFromCols { + buf.Myprintf("%v as %v, ", sqlparser.NewColIdent(sourceVindexColumns[i]), sqlparser.NewColIdent(vindexFromCols[i])) + } + buf.Myprintf("keyspace_id() as %v ", sqlparser.NewColIdent(vindexToCol)) + buf.Myprintf("from %v", sqlparser.NewTableIdent(sourceTableName)) + if vindex.Owner != "" { + // Only backfill + buf.Myprintf(" group by ") + for i := range vindexFromCols { + buf.Myprintf("%v, ", sqlparser.NewColIdent(vindexFromCols[i])) + } + buf.Myprintf("%v", sqlparser.NewColIdent(vindexToCol)) + } + materializeQuery = buf.String() + + // Update targetVSchema + var targetTable *vschemapb.Table + if targetVSchema.Sharded { + // Choose a primary vindex type for target table based on source specs + var targetVindexType string + var targetVindex *vschemapb.Vindex + for _, field := range tableSchema.TableDefinitions[0].Fields { + if sourceVindexColumns[0] == field.Name { + targetVindexType, err = vindexes.ChooseVindexForType(field.Type) + if err != nil { + return nil, nil, nil, err + } + targetVindex = &vschemapb.Vindex{ + Type: targetVindexType, + } + break + } + } + if targetVindex == nil { + // Unreachable. We validated column names when generating the DDL. + return nil, nil, nil, fmt.Errorf("column %s not found in schema %v", sourceVindexColumns[0], tableSchema.TableDefinitions[0]) + } + if existing, ok := targetVSchema.Vindexes[targetVindexType]; ok { + if !proto.Equal(existing, targetVindex) { + return nil, nil, nil, fmt.Errorf("a conflicting vindex named %v already exists in the target vschema", targetVindexType) + } + } else { + targetVSchema.Vindexes[targetVindexType] = targetVindex + } + + targetTable = &vschemapb.Table{ + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: vindexFromCols[0], + Name: targetVindexType, + }}, + } + } else { + targetTable = &vschemapb.Table{} + } + if existing, ok := targetVSchema.Tables[targetTableName]; ok { + if !proto.Equal(existing, targetTable) { + return nil, nil, nil, fmt.Errorf("a conflicting table named %v already exists in the target vschema", targetTableName) + } + } else { + targetVSchema.Tables[targetTableName] = targetTable + } + + ms = &vtctldatapb.MaterializeSettings{ + Workflow: targetTableName + "_vdx", + SourceKeyspace: keyspace, + TargetKeyspace: targetKeyspace, + StopAfterCopy: vindex.Owner != "", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: targetTableName, + SourceExpression: materializeQuery, + CreateDdl: createDDL, + }}, + } + + // Update sourceVSchema + sourceVSchema.Vindexes[vindexName] = vindex + sourceVSchemaTable.ColumnVindexes = append(sourceVSchemaTable.ColumnVindexes, sourceTable.ColumnVindexes[0]) + + return ms, sourceVSchema, targetVSchema, nil +} + +func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) { + source := fmt.Sprintf("`%s`", sourceVindexCol) + target := fmt.Sprintf("`%s`", vindexFromCol) + for _, line := range lines[1:] { + if strings.Contains(line, source) { + line = strings.Replace(line, source, target, 1) + line = strings.Replace(line, " AUTO_INCREMENT", "", 1) + line = strings.Replace(line, " DEFAULT NULL", "", 1) + return line, nil + } + } + return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines) +} + // Materialize performs the steps needed to materialize a list of tables based on the materialization specs. func (wr *Wrangler) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSettings) error { if err := wr.validateNewWorkflow(ctx, ms.TargetKeyspace, ms.Workflow); err != nil { diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index c1cad55c239..ab88b81e3bd 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -67,10 +67,12 @@ func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, s _ = env.addTablet(tabletID, env.ms.SourceKeyspace, shard, topodatapb.TabletType_MASTER) tabletID += 10 } - tabletID = 200 - for _, shard := range targets { - _ = env.addTablet(tabletID, env.ms.TargetKeyspace, shard, topodatapb.TabletType_MASTER) - tabletID += 10 + if ms.SourceKeyspace != ms.TargetKeyspace { + tabletID = 200 + for _, shard := range targets { + _ = env.addTablet(tabletID, env.ms.TargetKeyspace, shard, topodatapb.TabletType_MASTER) + tabletID += 10 + } } for _, ts := range ms.TableSettings { @@ -90,7 +92,9 @@ func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, s }}, } } - env.expectValidation() + if ms.Workflow != "" { + env.expectValidation() + } return env } diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 611040c9433..886f6ecef4a 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -18,12 +18,17 @@ package wrangler import ( "fmt" + "strings" "testing" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/logutil" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" @@ -96,6 +101,1195 @@ func TestMigrateVSchema(t *testing.T) { } } +func TestCreateLookupVindexFull(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "lkp_vdx", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + + specs := &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + } + // Dummy sourceSchema + sourceSchema := "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1" + + sourceVSchema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }}, + }, + }, + } + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Fields: []*querypb.Field{{ + Name: "col1", + Type: querypb.Type_INT64, + }, { + Name: "col2", + Type: querypb.Type_INT64, + }}, + Schema: sourceSchema, + }}, + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.TargetKeyspace, &vschemapb.Keyspace{}); err != nil { + t.Fatal(err) + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, sourceVSchema); err != nil { + t.Fatal(err) + } + + env.tmc.expectVRQuery(200, "/CREATE TABLE `lkp`", &sqltypes.Result{}) + env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{}) + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='lkp_vdx'", &sqltypes.Result{}) + + ctx := context.Background() + err := env.wr.CreateLookupVindex(ctx, ms.SourceKeyspace, specs, "cell", "MASTER") + require.NoError(t, err) + + wantvschema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "v", + Column: "col2", + }}, + }, + }, + } + vschema, err := env.topoServ.GetVSchema(ctx, ms.SourceKeyspace) + require.NoError(t, err) + assert.Equal(t, wantvschema, vschema) + + wantvschema = &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "lkp": {}, + }, + } + vschema, err = env.topoServ.GetVSchema(ctx, ms.TargetKeyspace) + require.NoError(t, err) + assert.Equal(t, wantvschema, vschema) +} + +func TestCreateLookupVindexCreateDDL(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "col1", + Name: "hash", + }}, + }, + }, + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vs); err != nil { + t.Fatal(err) + } + + testcases := []struct { + description string + specs *vschemapb.Keyspace + sourceSchema string + out string + err string + }{{ + description: "unique lookup", + specs: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + }, + sourceSchema: "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " `col3` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1", + out: "CREATE TABLE `lkp` (\n" + + " `c1` int(11),\n" + + " `c2` varbinary(128),\n" + + " PRIMARY KEY (`c1`)\n" + + ")", + }, { + description: "unique lookup, also pk", + specs: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + }, + sourceSchema: "CREATE TABLE `t1` (\n" + + " `col2` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col1` int(11) DEFAULT NULL,\n" + + " `col4` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1", + out: "CREATE TABLE `lkp` (\n" + + " `c1` int(11) NOT NULL,\n" + + " `c2` varbinary(128),\n" + + " PRIMARY KEY (`c1`)\n" + + ")", + }, { + description: "non-unique lookup, also pk", + specs: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1,c2", + "to": "c3", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Columns: []string{"col2", "col1"}, + }}, + }, + }, + }, + sourceSchema: "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) NOT NULL,\n" + + " `col3` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1", + out: "CREATE TABLE `lkp` (\n" + + " `c1` int(11) NOT NULL,\n" + + " `c2` int(11) NOT NULL,\n" + + " `c3` varbinary(128),\n" + + " PRIMARY KEY (`c1`, `c2`)\n" + + ")", + }, { + description: "column missing", + specs: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "nocol", + }}, + }, + }, + }, + sourceSchema: "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) NOT NULL,\n" + + " `col3` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1", + err: "column nocol not found in schema", + }, { + description: "no table in schema", + specs: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "nocol", + }}, + }, + }, + }, + sourceSchema: "", + err: "unexpected number of tables returned from schema", + }} + for _, tcase := range testcases { + if tcase.sourceSchema != "" { + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Schema: tcase.sourceSchema, + }}, + } + } else { + delete(env.tmc.schema, ms.SourceKeyspace+".t1") + } + + outms, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, tcase.specs) + if tcase.err != "" { + if err == nil || !strings.Contains(err.Error(), tcase.err) { + t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) + } + continue + } + require.NoError(t, err) + want := strings.Split(tcase.out, "\n") + got := strings.Split(outms.TableSettings[0].CreateDdl, "\n") + assert.Equal(t, want, got, tcase.description) + } +} + +func TestCreateLookupVindexSourceVSchema(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + + specs := &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + } + // Dummy sourceSchema + sourceSchema := "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1" + + testcases := []struct { + description string + sourceVSchema *vschemapb.Keyspace + out *vschemapb.Keyspace + }{{ + description: "source vschema has no prior info", + sourceVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }}, + }, + }, + }, + out: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "v", + Column: "col2", + }}, + }, + }, + }, + }, { + description: "source vschema has the lookup vindex", + sourceVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }}, + }, + }, + }, + out: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "v", + Column: "col2", + }}, + }, + }, + }, + }, { + description: "source vschema table has a different vindex on same column", + sourceVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "hash", + Column: "col2", + }}, + }, + }, + }, + out: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "hash", + Column: "col2", + }, { + Name: "v", + Column: "col2", + }}, + }, + }, + }, + }} + for _, tcase := range testcases { + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Fields: []*querypb.Field{{ + Name: "col1", + Type: querypb.Type_INT64, + }, { + Name: "col2", + Type: querypb.Type_INT64, + }}, + Schema: sourceSchema, + }}, + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.TargetKeyspace, &vschemapb.Keyspace{}); err != nil { + t.Fatal(err) + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, tcase.sourceVSchema); err != nil { + t.Fatal(err) + } + + _, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs) + require.NoError(t, err) + if !proto.Equal(got, tcase.out) { + t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out) + } + } +} + +func TestCreateLookupVindexTargetVSchema(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + sourcevs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "col1", + Name: "hash", + }}, + }, + }, + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, sourcevs); err != nil { + t.Fatal(err) + } + + // withTable is a target vschema with a pre-existing table. + withTable := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + } + + specs := &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "will be set by the test case", + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + } + // Dummy sourceSchema + sourceSchema := "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1" + + testcases := []struct { + description string + targetTable string + sourceFieldType querypb.Type + targetVSchema *vschemapb.Keyspace + out *vschemapb.Keyspace + err string + }{{ + description: "sharded, int64, empty target", + targetTable: "lkp", + sourceFieldType: querypb.Type_INT64, + targetVSchema: &vschemapb.Keyspace{Sharded: true}, + out: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "lkp": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + }, + }, { + description: "sharded, varchar, empty target", + targetTable: "lkp", + sourceFieldType: querypb.Type_VARCHAR, + targetVSchema: &vschemapb.Keyspace{Sharded: true}, + out: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "unicode_loose_md5": { + Type: "unicode_loose_md5", + }, + }, + Tables: map[string]*vschemapb.Table{ + "lkp": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "unicode_loose_md5", + }}, + }, + }, + }, + }, { + description: "sharded, int64, good vindex", + targetTable: "lkp", + sourceFieldType: querypb.Type_INT64, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + // Create a misleading vindex name. + "hash": { + Type: "hash", + }, + }, + }, + out: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "lkp": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + }, + }, { + description: "sharded, int64, bad vindex", + targetTable: "lkp", + sourceFieldType: querypb.Type_INT64, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + // Create a misleading vindex name. + "hash": { + Type: "unicode_loose_md5", + }, + }, + }, + err: "a conflicting vindex named hash already exists in the target vschema", + }, { + description: "sharded, int64, good table", + targetTable: "t2", + sourceFieldType: querypb.Type_INT64, + targetVSchema: withTable, + out: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + }, + }, { + description: "sharded, int64, table mismatch", + targetTable: "t2", + sourceFieldType: querypb.Type_VARCHAR, + targetVSchema: withTable, + err: "a conflicting table named t2 already exists in the target vschema", + }, { + description: "unsharded", + targetTable: "lkp", + sourceFieldType: querypb.Type_INT64, + targetVSchema: &vschemapb.Keyspace{}, + out: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{}, + Tables: map[string]*vschemapb.Table{ + "lkp": {}, + }, + }, + }, { + description: "invalid column type", + targetTable: "lkp", + sourceFieldType: querypb.Type_SET, + targetVSchema: &vschemapb.Keyspace{Sharded: true}, + err: "type SET is not recommended for a vindex", + }} + for _, tcase := range testcases { + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Fields: []*querypb.Field{{ + Name: "col2", + Type: tcase.sourceFieldType, + }}, + Schema: sourceSchema, + }}, + } + specs.Vindexes["v"].Params["table"] = fmt.Sprintf("%s.%s", ms.TargetKeyspace, tcase.targetTable) + if err := env.topoServ.SaveVSchema(context.Background(), ms.TargetKeyspace, tcase.targetVSchema); err != nil { + t.Fatal(err) + } + + _, _, got, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs) + if tcase.err != "" { + if err == nil || !strings.Contains(err.Error(), tcase.err) { + t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) + } + continue + } + require.NoError(t, err) + assert.Equal(t, tcase.out, got, tcase.description) + } +} + +func TestCreateLookupVindexSameKeyspace(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "ks", + TargetKeyspace: "ks", + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + + specs := &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "ks.lkp", + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + } + // Dummy sourceSchema + sourceSchema := "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1" + + vschema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }}, + }, + }, + } + want := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "ks.lkp", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "v", + Column: "col2", + }}, + }, + "lkp": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + } + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Fields: []*querypb.Field{{ + Name: "col1", + Type: querypb.Type_INT64, + }, { + Name: "col2", + Type: querypb.Type_INT64, + }}, + Schema: sourceSchema, + }}, + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vschema); err != nil { + t.Fatal(err) + } + + _, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs) + require.NoError(t, err) + if !proto.Equal(got, want) { + t.Errorf("same keyspace: got:\n%v, want\n%v", got, want) + } +} + +func TestCreateLookupVindexFailures(t *testing.T) { + topoServ := memorytopo.NewServer("cell") + wr := New(logutil.NewConsoleLogger(), topoServ, nil) + + unique := map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.t", + "from": "c1", + "to": "c2", + }, + }, + } + + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "other": { + Type: "hash", + }, + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.t", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "v", + }}, + }, + }, + } + if err := topoServ.SaveVSchema(context.Background(), "sourceks", vs); err != nil { + t.Fatal(err) + } + if err := topoServ.SaveVSchema(context.Background(), "targetks", &vschemapb.Keyspace{}); err != nil { + t.Fatal(err) + } + + testcases := []struct { + description string + input *vschemapb.Keyspace + err string + }{{ + description: "dup vindex", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v1": { + Type: "hash", + }, + "v2": { + Type: "hash", + }, + }, + }, + err: "only one vindex must be specified in the specs", + }, { + description: "not a lookup", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "hash", + }, + }, + }, + err: "vindex hash is not a lookup type", + }, { + description: "unqualified table", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup", + Params: map[string]string{ + "table": "t", + }, + }, + }, + }, + err: "vindex 'table' must be .
", + }, { + description: "unique lookup should have only one from column", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.t", + "from": "c1,c2", + }, + }, + }, + }, + err: "unique vindex 'from' should have only one column", + }, { + description: "non-unique lookup should have more than one column", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup", + Params: map[string]string{ + "table": "targetks.t", + "from": "c1", + }, + }, + }, + }, + err: "non-unique vindex 'from' should have more than one column", + }, { + description: "vindex not found", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_noexist", + Params: map[string]string{ + "table": "targetks.t", + "from": "c1,c2", + }, + }, + }, + }, + err: `vindexType "lookup_noexist" not found`, + }, { + description: "only one table", + input: &vschemapb.Keyspace{ + Vindexes: unique, + }, + err: "exactly one table must be specified in the specs", + }, { + description: "only one colvindex", + input: &vschemapb.Keyspace{ + Vindexes: unique, + Tables: map[string]*vschemapb.Table{ + "t1": {}, + }, + }, + err: "exactly one ColumnVindex must be specified for the table", + }, { + description: "vindex name must match", + input: &vschemapb.Keyspace{ + Vindexes: unique, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "other", + }}, + }, + }, + }, + err: "ColumnVindex name must match vindex name: other vs v", + }, { + description: "owner must match", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.t", + "from": "c1", + }, + Owner: "otherTable", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + }}, + }, + }, + }, + err: "vindex owner must match table name: otherTable vs t1", + }, { + description: "owner must match", + input: &vschemapb.Keyspace{ + Vindexes: unique, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + }}, + }, + }, + }, + err: "at least one column must be specified in ColumnVindexes", + }, { + description: "columnvindex length mismatch", + input: &vschemapb.Keyspace{ + Vindexes: unique, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Columns: []string{"col1", "col2"}, + }}, + }, + }, + }, + err: "length of table columns differes from length of vindex columns", + }, { + description: "vindex mismatches with what's in vschema", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "other": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.t", + "from": "c1", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "other", + Column: "col", + }}, + }, + }, + }, + err: "a conflicting vindex named other already exists in the source vschema", + }, { + description: "source table not in vschema", + input: &vschemapb.Keyspace{ + Vindexes: unique, + Tables: map[string]*vschemapb.Table{ + "other": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col", + }}, + }, + }, + }, + err: "source table other not found in vschema", + }, { + description: "colvindex already exists in vschema", + input: &vschemapb.Keyspace{ + Vindexes: unique, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "c1", + }}, + }, + }, + }, + err: "ColumnVindex for table t1 already exists: c1", + }} + for _, tcase := range testcases { + err := wr.CreateLookupVindex(context.Background(), "sourceks", tcase.input, "", "") + if !strings.Contains(err.Error(), tcase.err) { + t.Errorf("CreateLookupVindex(%s) err: %v, must contain %v", tcase.description, err, tcase.err) + } + } +} + func TestMaterializerOneToOne(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow",