diff --git a/go/vt/wrangler/migrater.go b/go/vt/wrangler/migrater.go index ef79d53199a..0d73c731e95 100644 --- a/go/vt/wrangler/migrater.go +++ b/go/vt/wrangler/migrater.go @@ -70,6 +70,7 @@ type migrater struct { sourceKeyspace string targetKeyspace string tables []string + sourceKSSchema *vindexes.KeyspaceSchema sourceWorkflows []string } @@ -117,6 +118,7 @@ func (wr *Wrangler) MigrateReads(ctx context.Context, targetKeyspace, workflow s mi.wr.Logger().Errorf("migrateTableReads failed: %v", err) return err } + return nil } if err := mi.migrateShardReads(ctx, cells, servedType, direction); err != nil { mi.wr.Logger().Errorf("migrateShardReads failed: %v", err) @@ -292,6 +294,14 @@ func (wr *Wrangler) buildMigrater(ctx context.Context, targetKeyspace, workflow } } } + vs, err := mi.wr.ts.GetVSchema(ctx, mi.sourceKeyspace) + if err != nil { + return nil, err + } + mi.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs, mi.sourceKeyspace) + if err != nil { + return nil, err + } return mi, nil } @@ -678,14 +688,6 @@ func (mi *migrater) createJournals(ctx context.Context) error { } func (mi *migrater) createReverseReplication(ctx context.Context) error { - vs, err := mi.wr.ts.GetVSchema(ctx, mi.sourceKeyspace) - if err != nil { - return err - } - ksschema, err := vindexes.BuildKeyspaceSchema(vs, mi.sourceKeyspace) - if err != nil { - return err - } return mi.forAllUids(func(target *miTarget, uid uint32) error { bls := target.sources[uid] source := mi.sources[bls.Shard] @@ -698,19 +700,19 @@ func (mi *migrater) createReverseReplication(ctx context.Context) error { for _, rule := range bls.Filter.Rules { var filter string if strings.HasPrefix(rule.Match, "/") { - if ksschema.Keyspace.Sharded { + if mi.sourceKSSchema.Keyspace.Sharded { filter = bls.Shard } } else { var inKeyrange string - if ksschema.Keyspace.Sharded { - vtable, ok := ksschema.Tables[rule.Match] + if mi.sourceKSSchema.Keyspace.Sharded { + vtable, ok := mi.sourceKSSchema.Tables[rule.Match] if !ok { return fmt.Errorf("table %s not found in vschema", rule.Match) } // TODO(sougou): handle degenerate cases like sequence, etc. // We currently assume the primary vindex is the best way to filter, which may not be true. - inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), vs.Vindexes[vtable.ColumnVindexes[0].Name].Type, bls.Shard) + inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), vtable.ColumnVindexes[0].Type, bls.Shard) } filter = fmt.Sprintf("select * from %s%s", rule.Match, inKeyrange) } diff --git a/go/vt/wrangler/migrater_env_test.go b/go/vt/wrangler/migrater_env_test.go index 7b858d9823e..6763927c671 100644 --- a/go/vt/wrangler/migrater_env_test.go +++ b/go/vt/wrangler/migrater_env_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/vt/logutil" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vschema" vschemapb "vitess.io/vitess/go/vt/proto/vschema" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" @@ -197,7 +198,34 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } - vs := &vschemapb.Keyspace{Sharded: true} + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschema.Vindex{ + "thash": { + Type: "hash", + }, + }, + Tables: map[string]*vschema.Table{ + "t1": { + ColumnVindexes: []*vschema.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "thash", + }}, + }, + "t2": { + ColumnVindexes: []*vschema.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "thash", + }}, + }, + "t3": { + ColumnVindexes: []*vschema.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "thash", + }}, + }, + }, + } if err := tme.ts.SaveVSchema(ctx, "ks", vs); err != nil { t.Fatal(err) } diff --git a/go/vt/wrangler/stream_migrater.go b/go/vt/wrangler/stream_migrater.go index 9399bc789ba..7d167c40759 100644 --- a/go/vt/wrangler/stream_migrater.go +++ b/go/vt/wrangler/stream_migrater.go @@ -36,6 +36,7 @@ import ( "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) @@ -336,6 +337,8 @@ const ( reference ) +// templatizeRule replaces keyrange values with {{.}}. +// This can then be used by go's template package to substitute other keyrange values. func (sm *streamMigrater) templatize(ctx context.Context, tabletStreams []*vrStream) ([]*vrStream, error) { tabletStreams = copyTabletStreams(tabletStreams) var shardedStreams []*vrStream @@ -367,37 +370,38 @@ func (sm *streamMigrater) templatize(ctx context.Context, tabletStreams []*vrStr } func (sm *streamMigrater) templatizeRule(ctx context.Context, rule *binlogdatapb.Rule) (int, error) { + vtable, ok := sm.mi.sourceKSSchema.Tables[rule.Match] + if !ok { + return 0, fmt.Errorf("table %v not found in vschema", rule.Match) + } + if vtable.Type == vindexes.TypeReference { + return reference, nil + } switch { case rule.Filter == "": - return reference, nil + return unknown, fmt.Errorf("rule %v does not have a select expression in vreplication", rule) case key.IsKeyRange(rule.Filter): rule.Filter = "{{.}}" return sharded, nil case rule.Filter == vreplication.ExcludeStr: - return unknown, nil + return unknown, fmt.Errorf("unexpected rule in vreplication: %v", rule) default: - templatized, err := sm.templatizeQuery(ctx, rule.Filter) + err := sm.templatizeKeyRange(ctx, rule) if err != nil { return unknown, err } - if templatized != "" { - rule.Filter = templatized - return sharded, nil - } - return reference, nil + return sharded, nil } } -// templatizeQuery converts the underlying in_keyrange subexpression to -// a template to allow for new keyrange values to be substituted. -func (sm *streamMigrater) templatizeQuery(ctx context.Context, query string) (string, error) { - statement, err := sqlparser.Parse(query) +func (sm *streamMigrater) templatizeKeyRange(ctx context.Context, rule *binlogdatapb.Rule) error { + statement, err := sqlparser.Parse(rule.Filter) if err != nil { - return "", err + return err } sel, ok := statement.(*sqlparser.Select) if !ok { - return "", fmt.Errorf("unexpected query: %v", query) + return fmt.Errorf("unexpected query: %v", rule.Filter) } var expr sqlparser.Expr if sel.Where != nil { @@ -416,23 +420,36 @@ func (sm *streamMigrater) templatizeQuery(ctx context.Context, query string) (st case 3: krExpr = funcExpr.Exprs[2] default: - return "", fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) + return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) } aliased, ok := krExpr.(*sqlparser.AliasedExpr) if !ok { - return "", fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) + return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) } val, ok := aliased.Expr.(*sqlparser.SQLVal) if !ok { - return "", fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) + return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) } - if strings.Contains(query, "{{") { - return "", fmt.Errorf("cannot migrate queries that contain '{{' in their string: %s", query) + if strings.Contains(rule.Filter, "{{") { + return fmt.Errorf("cannot migrate queries that contain '{{' in their string: %s", rule.Filter) } val.Val = []byte("{{.}}") - return sqlparser.String(statement), nil + rule.Filter = sqlparser.String(statement) + return nil } - return "", nil + // There was no in_keyrange expression. Create a new one. + vtable := sm.mi.sourceKSSchema.Tables[rule.Match] + inkr := &sqlparser.FuncExpr{ + Name: sqlparser.NewColIdent("in_keyrange"), + Exprs: sqlparser.SelectExprs{ + &sqlparser.AliasedExpr{Expr: &sqlparser.ColName{Name: vtable.ColumnVindexes[0].Columns[0]}}, + &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte(vtable.ColumnVindexes[0].Type))}, + &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte("{{.}}"))}, + }, + } + sel.AddWhere(inkr) + rule.Filter = sqlparser.String(statement) + return nil } func (sm *streamMigrater) createTargetStreams(ctx context.Context, tmpl []*vrStream) error { diff --git a/go/vt/wrangler/stream_migrater_test.go b/go/vt/wrangler/stream_migrater_test.go index 47a61ff3c4a..7cd91fe0c84 100644 --- a/go/vt/wrangler/stream_migrater_test.go +++ b/go/vt/wrangler/stream_migrater_test.go @@ -28,6 +28,9 @@ import ( "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vschema" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) @@ -1372,12 +1375,12 @@ func TestTemplatize(t *testing.T) { }}, out: `[{"ID":1,"Workflow":"test","Bls":{"keyspace":"ks","shard":"80-","filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange('{{.}}')"}]}}}]`, }, { - // Empty filter: reference table + // Reference table. in: []*vrStream{{ bls: &binlogdatapb.BinlogSource{ Filter: &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - Match: "t1", + Match: "ref", Filter: "", }}, }, @@ -1385,7 +1388,7 @@ func TestTemplatize(t *testing.T) { }}, out: "", }, { - // KeyRange filter + // Sharded table. in: []*vrStream{{ bls: &binlogdatapb.BinlogSource{ Filter: &binlogdatapb.Filter{ @@ -1398,39 +1401,44 @@ func TestTemplatize(t *testing.T) { }}, out: `[{"ID":0,"Workflow":"","Bls":{"filter":{"rules":[{"match":"t1","filter":"{{.}}"}]}}}]`, }, { - // Excluded table and empty filter + // table not found in: []*vrStream{{ bls: &binlogdatapb.BinlogSource{ Filter: &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: vreplication.ExcludeStr, - }, { - Match: "t2", - Filter: "", + Match: "t3", }}, }, }, }}, - out: "", + err: `table t3 not found in vschema`, }, { - // KeyRange filter and excluded table + // sharded table with no filter + in: []*vrStream{{ + bls: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + }, + }, + }}, + err: `rule match:"t1" does not have a select expression in vreplication`, + }, { + // Excluded table. in: []*vrStream{{ bls: &binlogdatapb.BinlogSource{ Filter: &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "t1", - Filter: "-80", - }, { - Match: "t2", Filter: vreplication.ExcludeStr, }}, }, }, }}, - out: `[{"ID":0,"Workflow":"","Bls":{"filter":{"rules":[{"match":"t1","filter":"{{.}}"},{"match":"t2","filter":"exclude"}]}}}]`, + err: `unexpected rule in vreplication: match:"t1" filter:"exclude" `, }, { - // KeyRange filter and ref table + // Sharded table and ref table in: []*vrStream{{ bls: &binlogdatapb.BinlogSource{ Filter: &binlogdatapb.Filter{ @@ -1438,20 +1446,20 @@ func TestTemplatize(t *testing.T) { Match: "t1", Filter: "-80", }, { - Match: "t2", + Match: "ref", Filter: "", }}, }, }, }}, - err: `cannot migrate streams with a mix of reference and sharded tables: filter: rules: > `, + err: `cannot migrate streams with a mix of reference and sharded tables: filter: rules: > `, }, { - // Ref table and keyRange filter (different code path) + // Ref table and sharded table (different code path) in: []*vrStream{{ bls: &binlogdatapb.BinlogSource{ Filter: &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - Match: "t1", + Match: "ref", Filter: "", }, { Match: "t2", @@ -1460,20 +1468,33 @@ func TestTemplatize(t *testing.T) { }, }, }}, - err: `cannot migrate streams with a mix of reference and sharded tables: filter: rules: > `, + err: `cannot migrate streams with a mix of reference and sharded tables: filter: rules: > `, }, { // Ref table with select expression in: []*vrStream{{ bls: &binlogdatapb.BinlogSource{ Filter: &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - Match: "t1", + Match: "ref", Filter: "select * from t1", }}, }, }, }}, out: "", + }, { + // Select expresstion with no keyrange value + in: []*vrStream{{ + bls: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + }, + }, + }}, + out: `[{"ID":0,"Workflow":"","Bls":{"filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange(c1, 'hash', '{{.}}')"}]}}}]`, }, { // Select expresstion with one keyrange value in: []*vrStream{{ @@ -1579,15 +1600,47 @@ func TestTemplatize(t *testing.T) { }}, err: "cannot migrate queries that contain '{{' in their string: select '{{' from t1 where in_keyrange('-80')", }} + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschema.Vindex{ + "thash": { + Type: "hash", + }, + }, + Tables: map[string]*vschema.Table{ + "t1": { + ColumnVindexes: []*vschema.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "thash", + }}, + }, + "t2": { + ColumnVindexes: []*vschema.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "thash", + }}, + }, + "ref": { + Type: vindexes.TypeReference, + }, + }, + } + ksschema, err := vindexes.BuildKeyspaceSchema(vs, "ks") + if err != nil { + t.Fatal(err) + } + mi := &migrater{ + sourceKSSchema: ksschema, + } for _, tt := range tests { - sm := &streamMigrater{mi: nil} + sm := &streamMigrater{mi: mi} out, err := sm.templatize(context.Background(), tt.in) var gotErr string if err != nil { gotErr = err.Error() } if gotErr != tt.err { - t.Errorf("templatize(%v) err: %v, want %v", tt.in, err, tt.err) + t.Errorf("templatize(%v) err: %v, want %v", stringifyVRS(tt.in), err, tt.err) } got := stringifyVRS(out) if !reflect.DeepEqual(tt.out, got) {