Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
541bf41
vrepl: where clause filtering WIP
sougou Mar 16, 2020
48d380e
Update test cases for where clause
rohit-nayak-ps Mar 18, 2020
073d333
Test case for int filter
rohit-nayak-ps Mar 19, 2020
c141cc0
Added rowstreamer test for filtered vreplication
rohit-nayak-ps Mar 19, 2020
b601056
Added support for varbinary, tests for both int/varbinary
rohit-nayak-ps Mar 20, 2020
8f4277e
Handle varbinary correctly
rohit-nayak-ps Mar 20, 2020
cd94047
Vreplication and multiple where clause tests
rohit-nayak-ps Mar 21, 2020
9bcd061
Only use styp if not null
rohit-nayak-ps Mar 21, 2020
491ec37
Removed extraneous log
rohit-nayak-ps Mar 21, 2020
d2a0754
vrepl: where clause filtering WIP
sougou Mar 16, 2020
b683ef8
Update test cases for where clause
rohit-nayak-ps Mar 18, 2020
62c5d69
Test case for int filter
rohit-nayak-ps Mar 19, 2020
bd5af6e
Added rowstreamer test for filtered vreplication
rohit-nayak-ps Mar 19, 2020
47bc7b2
Added support for varbinary, tests for both int/varbinary
rohit-nayak-ps Mar 20, 2020
cdded88
Handle varbinary correctly
rohit-nayak-ps Mar 20, 2020
1ef8d29
Vreplication and multiple where clause tests
rohit-nayak-ps Mar 21, 2020
9445b28
Only use styp if not null
rohit-nayak-ps Mar 21, 2020
1457bd9
Removed extraneous log
rohit-nayak-ps Mar 21, 2020
164f14d
Merge branch 'vrepl-filter' of github.com:planetscale/vitess into vre…
rohit-nayak-ps Mar 23, 2020
7a3b317
Merge branch 'master' into vrepl-filter
rohit-nayak-ps Mar 23, 2020
30e0302
vrepl: where clause filtering WIP
sougou Mar 16, 2020
1e40496
Update test cases for where clause
rohit-nayak-ps Mar 18, 2020
a80b389
Test case for int filter
rohit-nayak-ps Mar 19, 2020
25cd623
Added support for varbinary, tests for both int/varbinary
rohit-nayak-ps Mar 20, 2020
c7c7e3f
Handle varbinary correctly
rohit-nayak-ps Mar 20, 2020
f20ad63
Vreplication and multiple where clause tests
rohit-nayak-ps Mar 21, 2020
1d5eab1
Only use styp if not null
rohit-nayak-ps Mar 21, 2020
3caa0ea
Removed extraneous log
rohit-nayak-ps Mar 21, 2020
ccad250
Merge branch 'vrepl-filter' of github.com:planetscale/vitess into vre…
rohit-nayak-ps Mar 23, 2020
320a50d
Fixed merge issue
rohit-nayak-ps Mar 24, 2020
e5642dc
varchar type can have only varchar or varbinary column types
rohit-nayak-ps Mar 24, 2020
a8ce1a0
Also treat binary/blob types as varbinary
rohit-nayak-ps Mar 24, 2020
fd3374a
Also treat binary/blob types as varbinary, gofmt-ed ...
rohit-nayak-ps Mar 24, 2020
2971652
Deleted extraneous files
rohit-nayak-ps Mar 26, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,15 +447,20 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
return sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte(fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second))), 8, nil
case TypeVarchar, TypeVarString:
// We trust that styp is compatible with the column type
// Length is encoded in 1 or 2 bytes.
typeToUse := querypb.Type_VARCHAR
if styp == querypb.Type_VARBINARY || styp == querypb.Type_BINARY || styp == querypb.Type_BLOB {
typeToUse = styp
}
if metadata > 255 {
l := int(uint64(data[pos]) |
uint64(data[pos+1])<<8)
return sqltypes.MakeTrusted(querypb.Type_VARCHAR,
return sqltypes.MakeTrusted(typeToUse,
data[pos+2:pos+2+l]), l + 2, nil
}
l := int(data[pos])
return sqltypes.MakeTrusted(querypb.Type_VARCHAR,
return sqltypes.MakeTrusted(typeToUse,
data[pos+1:pos+1+l]), l + 1, nil
case TypeBit:
// The contents is just the bytes, quoted.
Expand Down
38 changes: 38 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func TestPlayerFilters(t *testing.T) {
"create table no(id int, val varbinary(128), primary key(id))",
"create table nopk(id int, val varbinary(128))",
fmt.Sprintf("create table %s.nopk(id int, val varbinary(128))", vrepldb),
"create table src4(id1 int, id2 int, val varbinary(128), primary key(id1))",
fmt.Sprintf("create table %s.dst4(id1 int, val varbinary(128), primary key(id1))", vrepldb),
"create table src5(id1 int, id2 int, val varbinary(128), primary key(id1))",
fmt.Sprintf("create table %s.dst5(id1 int, val varbinary(128), primary key(id1))", vrepldb),
})
defer execStatements(t, []string{
"drop table src1",
Expand All @@ -148,6 +152,10 @@ func TestPlayerFilters(t *testing.T) {
"drop table no",
"drop table nopk",
fmt.Sprintf("drop table %s.nopk", vrepldb),
"drop table src4",
fmt.Sprintf("drop table %s.dst4", vrepldb),
"drop table src5",
fmt.Sprintf("drop table %s.dst5", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

Expand All @@ -165,6 +173,12 @@ func TestPlayerFilters(t *testing.T) {
Match: "/yes",
}, {
Match: "/nopk",
}, {
Match: "dst4",
Filter: "select id1, val from src4 where id2 = 100",
}, {
Match: "dst5",
Filter: "select id1, val from src5 where val = 'abc'",
}},
}
bls := &binlogdatapb.BinlogSource{
Expand Down Expand Up @@ -386,6 +400,30 @@ func TestPlayerFilters(t *testing.T) {
},
table: "nopk",
data: [][]string{},
}, {
// filter by int
input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')",
output: []string{
"begin",
"insert into dst4(id1,val) values (1,'aaa')",
"insert into dst4(id1,val) values (3,'ccc')",
"/update _vt.vreplication set pos=",
"commit",
},
table: "dst4",
data: [][]string{{"1", "aaa"}, {"3", "ccc"}},
}, {
// filter by varbinary
input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')",
output: []string{
"begin",
"insert into dst5(id1,val) values (1,'abc')",
"insert into dst5(id1,val) values (4,'abc')",
"/update _vt.vreplication set pos=",
"commit",
},
table: "dst5",
data: [][]string{{"1", "abc"}, {"4", "abc"}},
}}

for _, tcase := range testcases {
Expand Down
160 changes: 135 additions & 25 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,33 @@ import (
// Plan represents the plan for a table.
// It records which column expressions are streamed and which filters
// a row is checked against before it is sent.
type Plan struct {
	// Table is the table this plan was built for.
	Table *Table

	// ColExprs is the list of column expressions to be sent
	// in the stream.
	ColExprs []ColExpr

	// Filters is the list of filters to be applied to the columns
	// of the table. The filter method walks them in order and rejects
	// the row at the first filter that does not match.
	Filters []Filter
}

// Opcode identifies the kind of check a Filter applies to a row.
type Opcode int

// Opcodes supported in the where clause of a filter query.
const (
	// Equal keeps a row only when the column at Filter.ColNum compares
	// equal to Filter.Value.
	Equal Opcode = iota
	// VindexMatch keeps a row only when it satisfies an in_keyrange()
	// construct.
	VindexMatch
)

// Filter contains opcodes for filtering.
type Filter struct {
Opcode Opcode
ColNum int
Value sqltypes.Value

// Parameters for VindexMatch.
// Vindex, VindexColumns and KeyRange, if set, will be used
// to filter the row.
// VindexColumns contains the column numbers of the table,
Expand Down Expand Up @@ -89,13 +112,24 @@ func (plan *Plan) fields() []*querypb.Field {
// filter filters the row against the plan. It returns false if the row did not match.
// If the row matched, it returns the columns to be sent.
func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error) {
if plan.Vindex != nil {
ksid, err := getKeyspaceID(values, plan.Vindex, plan.VindexColumns)
if err != nil {
return false, nil, err
}
if !key.KeyRangeContains(plan.KeyRange, ksid) {
return false, nil, nil
for _, filter := range plan.Filters {
switch filter.Opcode {
case Equal:
result, err := sqltypes.NullsafeCompare(values[filter.ColNum], filter.Value)
if err != nil {
return false, nil, err
}
if result != 0 {
return false, nil, nil
}
case VindexMatch:
ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns)
if err != nil {
return false, nil, err
}
if !key.KeyRangeContains(filter.KeyRange, ksid) {
return false, nil, nil
}
}
}

Expand Down Expand Up @@ -241,8 +275,11 @@ func buildREPlan(ti *Table, vschema *localVSchema, filter string) (*Plan, error)
if err != nil {
return nil, err
}
plan.Vindex = cv.Vindex
plan.VindexColumns, err = buildVindexColumns(plan.Table, cv.Columns)
whereFilter := Filter{
Opcode: VindexMatch,
Vindex: cv.Vindex,
}
whereFilter.VindexColumns, err = buildVindexColumns(plan.Table, cv.Columns)
if err != nil {
return nil, err
}
Expand All @@ -255,7 +292,8 @@ func buildREPlan(ti *Table, vschema *localVSchema, filter string) (*Plan, error)
if len(keyranges) != 1 {
return nil, fmt.Errorf("error parsing keyrange: %v", filter)
}
plan.KeyRange = keyranges[0]
whereFilter.KeyRange = keyranges[0]
plan.Filters = append(plan.Filters, whereFilter)
return plan, nil
}

Expand All @@ -273,6 +311,9 @@ func buildTablePlan(ti *Table, vschema *localVSchema, query string) (*Plan, erro
plan := &Plan{
Table: ti,
}
if err := plan.analyzeWhere(vschema, sel.Where); err != nil {
return nil, err
}
if err := plan.analyzeExprs(vschema, sel.SelectExprs); err != nil {
return nil, err
}
Expand All @@ -281,16 +322,6 @@ func buildTablePlan(ti *Table, vschema *localVSchema, query string) (*Plan, erro
return plan, nil
}

funcExpr, ok := sel.Where.Expr.(*sqlparser.FuncExpr)
if !ok {
return nil, fmt.Errorf("unsupported where clause: %v", sqlparser.String(sel.Where))
}
if !funcExpr.Name.EqualString("in_keyrange") {
return nil, fmt.Errorf("unsupported where clause: %v", sqlparser.String(sel.Where))
}
if err := plan.analyzeInKeyRange(vschema, funcExpr.Exprs); err != nil {
return nil, err
}
return plan, nil
}

Expand All @@ -317,6 +348,81 @@ func analyzeSelect(query string) (sel *sqlparser.Select, fromTable sqlparser.Tab
return sel, fromTable, nil
}

// analyzeWhere translates the where clause of a filter query into
// entries of plan.Filters. The clause is split into its AND-separated
// conditions and each condition must be one of:
//
//   - <column> = <integer or string literal>, which becomes an Equal filter;
//   - in_keyrange(...), which is handed to analyzeInKeyRange.
//
// A nil where clause adds no filters. Any other construct, including
// comparison operators other than "=", is rejected with an error.
func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) error {
	if where == nil {
		return nil
	}
	exprs := splitAndExpression(nil, where.Expr)
	for _, expr := range exprs {
		switch expr := expr.(type) {
		case *sqlparser.ComparisonExpr:
			// Equal is the only comparison opcode a Filter can carry. Without
			// this check, operators such as !=, < or like would be silently
			// planned as an equality match and filter the wrong rows.
			if expr.Operator != sqlparser.EqualStr {
				return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
			}
			qualifiedName, ok := expr.Left.(*sqlparser.ColName)
			if !ok {
				return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
			}
			if !qualifiedName.Qualifier.IsEmpty() {
				return fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(qualifiedName))
			}
			colnum, err := findColumn(plan.Table, qualifiedName.Name)
			if err != nil {
				return err
			}
			val, ok := expr.Right.(*sqlparser.SQLVal)
			if !ok {
				return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
			}
			// StrVal is treated as varbinary; varchar is not supported because
			// that would require implementing all collation types.
			if val.Type != sqlparser.IntVal && val.Type != sqlparser.StrVal {
				return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
			}
			pv, err := sqlparser.NewPlanValue(val)
			if err != nil {
				return err
			}
			// The literal has no bind variables, so it resolves without any.
			resolved, err := pv.ResolveValue(nil)
			if err != nil {
				return err
			}
			plan.Filters = append(plan.Filters, Filter{
				Opcode: Equal,
				ColNum: colnum,
				Value:  resolved,
			})
		case *sqlparser.FuncExpr:
			if !expr.Name.EqualString("in_keyrange") {
				return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
			}
			if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil {
				return err
			}
		default:
			return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
		}
	}
	return nil
}

// splitAndExpression flattens node into its AND-separated conditions and
// appends them, left to right, to filters. A parenthesized expression is
// unwrapped only when it directly encloses an AND; every other node,
// including any other parenthesized expression, is appended unchanged.
// A nil node leaves filters untouched.
func splitAndExpression(filters []sqlparser.Expr, node sqlparser.Expr) []sqlparser.Expr {
	if node == nil {
		return filters
	}
	if and, isAnd := node.(*sqlparser.AndExpr); isAnd {
		withLeft := splitAndExpression(filters, and.Left)
		return splitAndExpression(withLeft, and.Right)
	}
	if paren, isParen := node.(*sqlparser.ParenExpr); isParen {
		// Parentheses directly around an AND are redundant, so split the
		// inner expression as if they were absent.
		if inner, isAnd := paren.Expr.(*sqlparser.AndExpr); isAnd {
			return splitAndExpression(filters, inner)
		}
	}
	return append(filters, node)
}

func (plan *Plan) analyzeExprs(vschema *localVSchema, selExprs sqlparser.SelectExprs) error {
if _, ok := selExprs[0].(*sqlparser.StarExpr); !ok {
for _, expr := range selExprs {
Expand Down Expand Up @@ -395,14 +501,17 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp
func (plan *Plan) analyzeInKeyRange(vschema *localVSchema, exprs sqlparser.SelectExprs) error {
var colnames []sqlparser.ColIdent
var krExpr sqlparser.SelectExpr
whereFilter := Filter{
Opcode: VindexMatch,
}
switch {
case len(exprs) == 1:
cv, err := vschema.FindColVindex(plan.Table.Name)
if err != nil {
return err
}
colnames = cv.Columns
plan.Vindex = cv.Vindex
whereFilter.Vindex = cv.Vindex
krExpr = exprs[0]
case len(exprs) >= 3:
for _, expr := range exprs[:len(exprs)-2] {
Expand All @@ -424,11 +533,11 @@ func (plan *Plan) analyzeInKeyRange(vschema *localVSchema, exprs sqlparser.Selec
if err != nil {
return err
}
plan.Vindex, err = vschema.FindOrCreateVindex(vtype)
whereFilter.Vindex, err = vschema.FindOrCreateVindex(vtype)
if err != nil {
return err
}
if !plan.Vindex.IsUnique() {
if !whereFilter.Vindex.IsUnique() {
return fmt.Errorf("vindex must be Unique to be used for VReplication: %s", vtype)
}

Expand All @@ -437,7 +546,7 @@ func (plan *Plan) analyzeInKeyRange(vschema *localVSchema, exprs sqlparser.Selec
return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(exprs))
}
var err error
plan.VindexColumns, err = buildVindexColumns(plan.Table, colnames)
whereFilter.VindexColumns, err = buildVindexColumns(plan.Table, colnames)
if err != nil {
return err
}
Expand All @@ -452,7 +561,8 @@ func (plan *Plan) analyzeInKeyRange(vschema *localVSchema, exprs sqlparser.Selec
if len(keyranges) != 1 {
return fmt.Errorf("unexpected in_keyrange parameter: %v", sqlparser.String(krExpr))
}
plan.KeyRange = keyranges[0]
whereFilter.KeyRange = keyranges[0]
plan.Filters = append(plan.Filters, whereFilter)
return nil
}

Expand Down
Loading