diff --git a/go/vt/key/key.go b/go/vt/key/key.go index 367311ba5c8..9b2408619e9 100644 --- a/go/vt/key/key.go +++ b/go/vt/key/key.go @@ -22,6 +22,7 @@ import ( "encoding/hex" "fmt" "math" + "regexp" "strings" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -296,3 +297,10 @@ func ParseShardingSpec(spec string) ([]*topodatapb.KeyRange, error) { } return ranges, nil } + +var krRegexp = regexp.MustCompile(`^[0-9a-fA-F]*-[0-9a-fA-F]*$`) + +// IsKeyRange returns true if the string represents a keyrange. +func IsKeyRange(kr string) bool { + return krRegexp.MatchString(kr) +} diff --git a/go/vt/key/key_test.go b/go/vt/key/key_test.go index b6e2229e760..1c369a9accc 100644 --- a/go/vt/key/key_test.go +++ b/go/vt/key/key_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -381,7 +382,7 @@ func BenchmarkUint64KeyString(b *testing.B) { for i := 0; i < b.N; i++ { for _, key := range keys { - key.String() + _ = key.String() } } } @@ -435,3 +436,44 @@ func BenchmarkKeyRangesOverlap(b *testing.B) { KeyRangesOverlap(kr1, kr2) } } + +func TestIsKeyRange(t *testing.T) { + testcases := []struct { + in string + out bool + }{{ + in: "-", + out: true, + }, { + in: "-80", + out: true, + }, { + in: "40-80", + out: true, + }, { + in: "80-", + out: true, + }, { + in: "a0-", + out: true, + }, { + in: "-A0", + out: true, + }, { + in: "", + out: false, + }, { + in: "x-80", + out: false, + }, { + in: "-80x", + out: false, + }, { + in: "select", + out: false, + }} + + for _, tcase := range testcases { + assert.Equal(t, IsKeyRange(tcase.in), tcase.out, tcase.in) + } +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go index 4474b305129..547ce60cd32 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go @@ -85,6 +85,44 @@ func TestBuildPlayerPlan(t *testing.T) { }, }, }, + }, { + // Regular with keyrange + input: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/.*", + Filter: "-80", + }}, + }, + plan: &TestReplicatorPlan{ + VStreamFilter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1 where in_keyrange('-80')", + }}, + }, + TargetTables: []string{"t1"}, + TablePlans: map[string]*TestTablePlan{ + "t1": { + TargetName: "t1", + SendRule: "t1", + }, + }, + }, + planpk: &TestReplicatorPlan{ + VStreamFilter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1 where in_keyrange('-80')", + }}, + }, + TargetTables: []string{"t1"}, + TablePlans: map[string]*TestTablePlan{ + "t1": { + TargetName: "t1", + SendRule: "t1", + }, + }, + }, }, { // '*' expression input: &binlogdatapb.Filter{ diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 3c8d3bc27a4..1fe5c8bd96b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -23,6 +23,7 @@ import ( "strings" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/key" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/sqlparser" ) @@ -88,69 +89,64 @@ func buildReplicatorPlan(filter *binlogdatapb.Filter, tableKeys map[string][]str TablePlans: make(map[string]*TablePlan), tableKeys: tableKeys, } -nextTable: for tableName := range tableKeys { lastpk, ok := copyState[tableName] if ok && lastpk == nil { // Don't replicate uncopied tables. continue } - for _, rule := range filter.Rules { - switch { - case strings.HasPrefix(rule.Match, "/"): - expr := strings.Trim(rule.Match, "/") - result, err := regexp.MatchString(expr, tableName) - if err != nil { - return nil, err - } - if !result { - continue - } - sendRule := &binlogdatapb.Rule{ - Match: tableName, - Filter: buildQuery(tableName, rule.Filter), - } - plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, sendRule) - tablePlan := &TablePlan{ - TargetName: tableName, - SendRule: sendRule, - Lastpk: lastpk, - } - plan.TargetTables[tableName] = tablePlan - plan.TablePlans[tableName] = tablePlan - continue nextTable - case rule.Match == tableName: - tablePlan, err := buildTablePlan(rule, tableKeys, lastpk) - if err != nil { - return nil, err - } - if _, ok := plan.TablePlans[tablePlan.SendRule.Match]; ok { - continue - } - plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, tablePlan.SendRule) - plan.TargetTables[tableName] = tablePlan - plan.TablePlans[tablePlan.SendRule.Match] = tablePlan - continue nextTable - } + rule, err := tableMatches(tableName, filter) + if err != nil { + return nil, err + } + if rule == nil { + continue + } + tablePlan, err := buildTablePlan(tableName, rule.Filter, tableKeys, lastpk) + if err != nil { + return nil, err + } + if _, ok := plan.TablePlans[tablePlan.SendRule.Match]; ok { + continue } + plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, tablePlan.SendRule) + plan.TargetTables[tableName] = tablePlan + plan.TablePlans[tablePlan.SendRule.Match] = tablePlan } return plan, nil } -func buildQuery(tableName, filter string) string { - buf := sqlparser.NewTrackedBuffer(nil) - buf.Myprintf("select * from %v", sqlparser.NewTableIdent(tableName)) - if filter != "" { - buf.Myprintf(" where in_keyrange(%v)", sqlparser.NewStrVal([]byte(filter))) +// tableMatches is similar to the one defined in vstreamer. +func tableMatches(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Rule, error) { + for _, rule := range filter.Rules { + switch { + case strings.HasPrefix(rule.Match, "/"): + expr := strings.Trim(rule.Match, "/") + result, err := regexp.MatchString(expr, tableName) + if err != nil { + return nil, err + } + if !result { + continue + } + return rule, nil + case tableName == rule.Match: + return rule, nil + } } - return buf.String() + return nil, nil } -func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) { - query := rule.Filter - if query == "" { +func buildTablePlan(tableName, filter string, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) { + query := filter + switch { + case filter == "": + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select * from %v", sqlparser.NewTableIdent(tableName)) + query = buf.String() + case key.IsKeyRange(filter): buf := sqlparser.NewTrackedBuffer(nil) - buf.Myprintf("select * from %v", sqlparser.NewTableIdent(rule.Match)) + buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewTableIdent(tableName), sqlparser.NewStrVal([]byte(filter))) query = buf.String() } sel, fromTable, err := analyzeSelectFrom(query) @@ -170,7 +166,7 @@ func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, last } sendRule.Filter = query tablePlan := &TablePlan{ - TargetName: rule.Match, + TargetName: tableName, SendRule: sendRule, Lastpk: lastpk, } @@ -178,7 +174,7 @@ func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, last } tpb := &tablePlanBuilder{ - name: sqlparser.NewTableIdent(rule.Match), + name: sqlparser.NewTableIdent(tableName), sendSelect: &sqlparser.Select{ From: sel.From, Where: sel.Where, diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index a9e593c0136..dc241c73b78 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -128,6 +128,7 @@ func mustSendDDL(query mysql.Query, dbname string, filter *binlogdatapb.Filter) return true } +// tableMatches is similar to the one defined in vreplication. func tableMatches(table sqlparser.TableName, dbname string, filter *binlogdatapb.Filter) bool { if !table.Qualifier.IsEmpty() && table.Qualifier.String() != dbname { return false