Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/hex"
"fmt"
"math"
"regexp"
"strings"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -296,3 +297,10 @@ func ParseShardingSpec(spec string) ([]*topodatapb.KeyRange, error) {
}
return ranges, nil
}

var krRegexp = regexp.MustCompile(`^[0-9a-fA-F]*-[0-9a-fA-F]*$`)

// IsKeyRange returns true if the string represents a keyrange.
func IsKeyRange(kr string) bool {
return krRegexp.MatchString(kr)
}
44 changes: 43 additions & 1 deletion go/vt/key/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
Expand Down Expand Up @@ -381,7 +382,7 @@ func BenchmarkUint64KeyString(b *testing.B) {

for i := 0; i < b.N; i++ {
for _, key := range keys {
key.String()
_ = key.String()
}
}
}
Expand Down Expand Up @@ -435,3 +436,44 @@ func BenchmarkKeyRangesOverlap(b *testing.B) {
KeyRangesOverlap(kr1, kr2)
}
}

func TestIsKeyRange(t *testing.T) {
testcases := []struct {
in string
out bool
}{{
in: "-",
out: true,
}, {
in: "-80",
out: true,
}, {
in: "40-80",
out: true,
}, {
in: "80-",
out: true,
}, {
in: "a0-",
out: true,
}, {
in: "-A0",
out: true,
}, {
in: "",
out: false,
}, {
in: "x-80",
out: false,
}, {
in: "-80x",
out: false,
}, {
in: "select",
out: false,
}}

for _, tcase := range testcases {
assert.Equal(t, IsKeyRange(tcase.in), tcase.out, tcase.in)
}
}
38 changes: 38 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,44 @@ func TestBuildPlayerPlan(t *testing.T) {
},
},
},
}, {
// Regular with keyrange
input: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*",
Filter: "-80",
}},
},
plan: &TestReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1 where in_keyrange('-80')",
}},
},
TargetTables: []string{"t1"},
TablePlans: map[string]*TestTablePlan{
"t1": {
TargetName: "t1",
SendRule: "t1",
},
},
},
planpk: &TestReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1 where in_keyrange('-80')",
}},
},
TargetTables: []string{"t1"},
TablePlans: map[string]*TestTablePlan{
"t1": {
TargetName: "t1",
SendRule: "t1",
},
},
},
}, {
// '*' expression
input: &binlogdatapb.Filter{
Expand Down
96 changes: 46 additions & 50 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/sqlparser"
)
Expand Down Expand Up @@ -88,69 +89,64 @@ func buildReplicatorPlan(filter *binlogdatapb.Filter, tableKeys map[string][]str
TablePlans: make(map[string]*TablePlan),
tableKeys: tableKeys,
}
nextTable:
for tableName := range tableKeys {
lastpk, ok := copyState[tableName]
if ok && lastpk == nil {
// Don't replicate uncopied tables.
continue
}
for _, rule := range filter.Rules {
switch {
case strings.HasPrefix(rule.Match, "/"):
expr := strings.Trim(rule.Match, "/")
result, err := regexp.MatchString(expr, tableName)
if err != nil {
return nil, err
}
if !result {
continue
}
sendRule := &binlogdatapb.Rule{
Match: tableName,
Filter: buildQuery(tableName, rule.Filter),
}
plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, sendRule)
tablePlan := &TablePlan{
TargetName: tableName,
SendRule: sendRule,
Lastpk: lastpk,
}
plan.TargetTables[tableName] = tablePlan
plan.TablePlans[tableName] = tablePlan
continue nextTable
case rule.Match == tableName:
tablePlan, err := buildTablePlan(rule, tableKeys, lastpk)
if err != nil {
return nil, err
}
if _, ok := plan.TablePlans[tablePlan.SendRule.Match]; ok {
continue
}
plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, tablePlan.SendRule)
plan.TargetTables[tableName] = tablePlan
plan.TablePlans[tablePlan.SendRule.Match] = tablePlan
continue nextTable
}
rule, err := tableMatches(tableName, filter)
if err != nil {
return nil, err
}
if rule == nil {
continue
}
tablePlan, err := buildTablePlan(tableName, rule.Filter, tableKeys, lastpk)
if err != nil {
return nil, err
}
if _, ok := plan.TablePlans[tablePlan.SendRule.Match]; ok {
continue
}
plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, tablePlan.SendRule)
plan.TargetTables[tableName] = tablePlan
plan.TablePlans[tablePlan.SendRule.Match] = tablePlan
}
return plan, nil
}

func buildQuery(tableName, filter string) string {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v", sqlparser.NewTableIdent(tableName))
if filter != "" {
buf.Myprintf(" where in_keyrange(%v)", sqlparser.NewStrVal([]byte(filter)))
// tableMatches is similar to the one defined in vstreamer.
func tableMatches(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Rule, error) {
for _, rule := range filter.Rules {
switch {
case strings.HasPrefix(rule.Match, "/"):
expr := strings.Trim(rule.Match, "/")
result, err := regexp.MatchString(expr, tableName)
if err != nil {
return nil, err
}
if !result {
continue
}
return rule, nil
case tableName == rule.Match:
return rule, nil
}
}
return buf.String()
return nil, nil
}

func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) {
query := rule.Filter
if query == "" {
func buildTablePlan(tableName, filter string, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) {
query := filter
switch {
case filter == "":
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v", sqlparser.NewTableIdent(tableName))
query = buf.String()
case key.IsKeyRange(filter):
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v", sqlparser.NewTableIdent(rule.Match))
buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewTableIdent(tableName), sqlparser.NewStrVal([]byte(filter)))
query = buf.String()
}
sel, fromTable, err := analyzeSelectFrom(query)
Expand All @@ -170,15 +166,15 @@ func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, last
}
sendRule.Filter = query
tablePlan := &TablePlan{
TargetName: rule.Match,
TargetName: tableName,
SendRule: sendRule,
Lastpk: lastpk,
}
return tablePlan, nil
}

tpb := &tablePlanBuilder{
name: sqlparser.NewTableIdent(rule.Match),
name: sqlparser.NewTableIdent(tableName),
sendSelect: &sqlparser.Select{
From: sel.From,
Where: sel.Where,
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func mustSendDDL(query mysql.Query, dbname string, filter *binlogdatapb.Filter)
return true
}

// tableMatches is similar to the one defined in vreplication.
func tableMatches(table sqlparser.TableName, dbname string, filter *binlogdatapb.Filter) bool {
if !table.Qualifier.IsEmpty() && table.Qualifier.String() != dbname {
return false
Expand Down