diff --git a/go/mysql/binlog_event_rbr.go b/go/mysql/binlog_event_rbr.go index e29e3f0fabe..89e87c19091 100644 --- a/go/mysql/binlog_event_rbr.go +++ b/go/mysql/binlog_event_rbr.go @@ -447,15 +447,20 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ return sqltypes.MakeTrusted(querypb.Type_DATETIME, []byte(fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second))), 8, nil case TypeVarchar, TypeVarString: + // We trust that styp is compatible with the column type // Length is encoded in 1 or 2 bytes. + typeToUse := querypb.Type_VARCHAR + if styp == querypb.Type_VARBINARY || styp == querypb.Type_BINARY || styp == querypb.Type_BLOB { + typeToUse = styp + } if metadata > 255 { l := int(uint64(data[pos]) | uint64(data[pos+1])<<8) - return sqltypes.MakeTrusted(querypb.Type_VARCHAR, + return sqltypes.MakeTrusted(typeToUse, data[pos+2:pos+2+l]), l + 2, nil } l := int(data[pos]) - return sqltypes.MakeTrusted(querypb.Type_VARCHAR, + return sqltypes.MakeTrusted(typeToUse, data[pos+1:pos+1+l]), l + 1, nil case TypeBit: // The contents is just the bytes, quoted. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go index dd95e70912a..5abc68b0b3c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -135,6 +135,10 @@ func TestPlayerFilters(t *testing.T) { "create table no(id int, val varbinary(128), primary key(id))", "create table nopk(id int, val varbinary(128))", fmt.Sprintf("create table %s.nopk(id int, val varbinary(128))", vrepldb), + "create table src4(id1 int, id2 int, val varbinary(128), primary key(id1))", + fmt.Sprintf("create table %s.dst4(id1 int, val varbinary(128), primary key(id1))", vrepldb), + "create table src5(id1 int, id2 int, val varbinary(128), primary key(id1))", + fmt.Sprintf("create table %s.dst5(id1 int, val varbinary(128), primary key(id1))", vrepldb), }) defer execStatements(t, []string{ "drop table src1", @@ -148,6 +152,10 @@ func TestPlayerFilters(t *testing.T) { "drop table no", "drop table nopk", fmt.Sprintf("drop table %s.nopk", vrepldb), + "drop table src4", + fmt.Sprintf("drop table %s.dst4", vrepldb), + "drop table src5", + fmt.Sprintf("drop table %s.dst5", vrepldb), }) env.SchemaEngine.Reload(context.Background()) @@ -165,6 +173,12 @@ func TestPlayerFilters(t *testing.T) { Match: "/yes", }, { Match: "/nopk", + }, { + Match: "dst4", + Filter: "select id1, val from src4 where id2 = 100", + }, { + Match: "dst5", + Filter: "select id1, val from src5 where val = 'abc'", }}, } bls := &binlogdatapb.BinlogSource{ @@ -386,6 +400,30 @@ func TestPlayerFilters(t *testing.T) { }, table: "nopk", data: [][]string{}, + }, { + // filter by int + input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')", + output: []string{ + "begin", + "insert into dst4(id1,val) values (1,'aaa')", + "insert into dst4(id1,val) values (3,'ccc')", + "/update _vt.vreplication set pos=", + "commit", + }, + table: "dst4", + data: [][]string{{"1", "aaa"}, {"3", "ccc"}}, + }, { + // filter by int + input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')", + output: []string{ + "begin", + "insert into dst5(id1,val) values (1,'abc')", + "insert into dst5(id1,val) values (4,'abc')", + "/update _vt.vreplication set pos=", + "commit", + }, + table: "dst5", + data: [][]string{{"1", "abc"}, {"4", "abc"}}, }} for _, tcase := range testcases { diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index bc5083f5217..901d8064cf5 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -35,10 +35,33 @@ import ( // Plan represents the plan for a table. type Plan struct { Table *Table + // ColExprs is the list of column expressions to be sent // in the stream. ColExprs []ColExpr + // Filters is the list of filters to be applied to the columns + // of the table. + Filters []Filter +} + +// Opcode enumerates the operators supported in a where clause +type Opcode int + +const ( + // Equal is used to filter an integer column on a specific value + Equal = Opcode(iota) + // VindexMatch is used for an in_keyrange() construct + VindexMatch +) + +// Filter contains opcodes for filtering. +type Filter struct { + Opcode Opcode + ColNum int + Value sqltypes.Value + + // Parameters for VindexMatch. // Vindex, VindexColumns and KeyRange, if set, will be used // to filter the row. // VindexColumns contains the column numbers of the table, @@ -89,13 +112,24 @@ func (plan *Plan) fields() []*querypb.Field { // filter filters the row against the plan. It returns false if the row did not match. // If the row matched, it returns the columns to be sent. func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error) { - if plan.Vindex != nil { - ksid, err := getKeyspaceID(values, plan.Vindex, plan.VindexColumns) - if err != nil { - return false, nil, err - } - if !key.KeyRangeContains(plan.KeyRange, ksid) { - return false, nil, nil + for _, filter := range plan.Filters { + switch filter.Opcode { + case Equal: + result, err := sqltypes.NullsafeCompare(values[filter.ColNum], filter.Value) + if err != nil { + return false, nil, err + } + if result != 0 { + return false, nil, nil + } + case VindexMatch: + ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns) + if err != nil { + return false, nil, err + } + if !key.KeyRangeContains(filter.KeyRange, ksid) { + return false, nil, nil + } } } @@ -241,8 +275,11 @@ func buildREPlan(ti *Table, vschema *localVSchema, filter string) (*Plan, error) if err != nil { return nil, err } - plan.Vindex = cv.Vindex - plan.VindexColumns, err = buildVindexColumns(plan.Table, cv.Columns) + whereFilter := Filter{ + Opcode: VindexMatch, + Vindex: cv.Vindex, + } + whereFilter.VindexColumns, err = buildVindexColumns(plan.Table, cv.Columns) if err != nil { return nil, err } @@ -255,7 +292,8 @@ func buildREPlan(ti *Table, vschema *localVSchema, filter string) (*Plan, error) if len(keyranges) != 1 { return nil, fmt.Errorf("error parsing keyrange: %v", filter) } - plan.KeyRange = keyranges[0] + whereFilter.KeyRange = keyranges[0] + plan.Filters = append(plan.Filters, whereFilter) return plan, nil } @@ -273,6 +311,9 @@ func buildTablePlan(ti *Table, vschema *localVSchema, query string) (*Plan, erro plan := &Plan{ Table: ti, } + if err := plan.analyzeWhere(vschema, sel.Where); err != nil { + return nil, err + } if err := plan.analyzeExprs(vschema, sel.SelectExprs); err != nil { return nil, err } @@ -281,16 +322,6 @@ func buildTablePlan(ti *Table, vschema *localVSchema, query string) (*Plan, erro return plan, nil } - funcExpr, ok := sel.Where.Expr.(*sqlparser.FuncExpr) - if !ok { - return nil, fmt.Errorf("unsupported where clause: %v", sqlparser.String(sel.Where)) - } - if !funcExpr.Name.EqualString("in_keyrange") { - return nil, fmt.Errorf("unsupported where clause: %v", sqlparser.String(sel.Where)) - } - if err := plan.analyzeInKeyRange(vschema, funcExpr.Exprs); err != nil { - return nil, err - } return plan, nil } @@ -317,6 +348,81 @@ func analyzeSelect(query string) (sel *sqlparser.Select, fromTable sqlparser.Tab return sel, fromTable, nil } +func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) error { + if where == nil { + return nil + } + exprs := splitAndExpression(nil, where.Expr) + for _, expr := range exprs { + switch expr := expr.(type) { + case *sqlparser.ComparisonExpr: + qualifiedName, ok := expr.Left.(*sqlparser.ColName) + if !ok { + return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + if !qualifiedName.Qualifier.IsEmpty() { + return fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(qualifiedName)) + } + colnum, err := findColumn(plan.Table, qualifiedName.Name) + if err != nil { + return err + } + val, ok := expr.Right.(*sqlparser.SQLVal) + if !ok { + return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + //StrVal is varbinary, we do not support varchar since we would have to implement all collation types + if val.Type != sqlparser.IntVal && val.Type != sqlparser.StrVal { + return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + pv, err := sqlparser.NewPlanValue(val) + if err != nil { + return err + } + resolved, err := pv.ResolveValue(nil) + if err != nil { + return err + } + plan.Filters = append(plan.Filters, Filter{ + Opcode: Equal, + ColNum: colnum, + Value: resolved, + }) + case *sqlparser.FuncExpr: + if !expr.Name.EqualString("in_keyrange") { + return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) + } + if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil { + return err + } + default: + return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) + } + } + return nil +} + +// splitAndExpression breaks up the Expr into AND-separated conditions +// and appends them to filters, which can be shuffled and recombined +// as needed. +func splitAndExpression(filters []sqlparser.Expr, node sqlparser.Expr) []sqlparser.Expr { + if node == nil { + return filters + } + switch node := node.(type) { + case *sqlparser.AndExpr: + filters = splitAndExpression(filters, node.Left) + return splitAndExpression(filters, node.Right) + case *sqlparser.ParenExpr: + // If the inner expression is AndExpr, then we can remove + // the parenthesis because they are unnecessary. + if node, ok := node.Expr.(*sqlparser.AndExpr); ok { + return splitAndExpression(filters, node) + } + } + return append(filters, node) +} + func (plan *Plan) analyzeExprs(vschema *localVSchema, selExprs sqlparser.SelectExprs) error { if _, ok := selExprs[0].(*sqlparser.StarExpr); !ok { for _, expr := range selExprs { @@ -395,6 +501,9 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp func (plan *Plan) analyzeInKeyRange(vschema *localVSchema, exprs sqlparser.SelectExprs) error { var colnames []sqlparser.ColIdent var krExpr sqlparser.SelectExpr + whereFilter := Filter{ + Opcode: VindexMatch, + } switch { case len(exprs) == 1: cv, err := vschema.FindColVindex(plan.Table.Name) @@ -402,7 +511,7 @@ func (plan *Plan) analyzeInKeyRange(vschema *localVSchema, exprs sqlparser.Selec return err } colnames = cv.Columns - plan.Vindex = cv.Vindex + whereFilter.Vindex = cv.Vindex krExpr = exprs[0] case len(exprs) >= 3: for _, expr := range exprs[:len(exprs)-2] { @@ -424,11 +533,11 @@ func (plan *Plan) analyzeInKeyRange(vschema *localVSchema, exprs sqlparser.Selec if err != nil { return err } - plan.Vindex, err = vschema.FindOrCreateVindex(vtype) + whereFilter.Vindex, err = vschema.FindOrCreateVindex(vtype) if err != nil { return err } - if !plan.Vindex.IsUnique() { + if !whereFilter.Vindex.IsUnique() { return fmt.Errorf("vindex must be Unique to be used for VReplication: %s", vtype) } @@ -437,7 +546,7 @@ func (plan *Plan) analyzeInKeyRange(vschema *localVSchema, exprs sqlparser.Selec return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(exprs)) } var err error - plan.VindexColumns, err = buildVindexColumns(plan.Table, colnames) + whereFilter.VindexColumns, err = buildVindexColumns(plan.Table, colnames) if err != nil { return err } @@ -452,7 +561,8 @@ func (plan *Plan) analyzeInKeyRange(vschema *localVSchema, exprs sqlparser.Selec if len(keyranges) != 1 { return fmt.Errorf("unexpected in_keyrange parameter: %v", sqlparser.String(krExpr)) } - plan.KeyRange = keyranges[0] + whereFilter.KeyRange = keyranges[0] + plan.Filters = append(plan.Filters, whereFilter) return nil } diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go index b49f6ca1151..05f7acca0b7 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go @@ -245,7 +245,14 @@ func TestPlanbuilder(t *testing.T) { Alias: sqlparser.NewColIdent("val"), Type: sqltypes.VarBinary, }}, - VindexColumns: []int{0}, + Filters: []Filter{{ + Opcode: VindexMatch, + ColNum: 0, + Value: sqltypes.NULL, + Vindex: nil, + VindexColumns: []int{0}, + KeyRange: nil, + }}, }, }, { inTable: t1, @@ -302,7 +309,14 @@ func TestPlanbuilder(t *testing.T) { Alias: sqlparser.NewColIdent("id"), Type: sqltypes.Int64, }}, - VindexColumns: []int{0}, + Filters: []Filter{{ + Opcode: VindexMatch, + ColNum: 0, + Value: sqltypes.NULL, + Vindex: nil, + VindexColumns: []int{0}, + KeyRange: nil, + }}, }, }, { inTable: t1, @@ -317,7 +331,36 @@ func TestPlanbuilder(t *testing.T) { Alias: sqlparser.NewColIdent("id"), Type: sqltypes.Int64, }}, - VindexColumns: []int{0}, + Filters: []Filter{{ + Opcode: VindexMatch, + ColNum: 0, + Value: sqltypes.NULL, + Vindex: nil, + VindexColumns: []int{0}, + KeyRange: nil, + }}, + }, + }, { + inTable: t1, + inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1 where id = 1"}, + outPlan: &Plan{ + ColExprs: []ColExpr{{ + ColNum: 1, + Alias: sqlparser.NewColIdent("val"), + Type: sqltypes.VarBinary, + }, { + ColNum: 0, + Alias: sqlparser.NewColIdent("id"), + Type: sqltypes.Int64, + }}, + Filters: []Filter{{ + Opcode: Equal, + ColNum: 0, + Value: sqltypes.NewInt64(1), + Vindex: nil, + VindexColumns: nil, + KeyRange: nil, + }}, }, }, { inTable: t2, @@ -335,7 +378,14 @@ func TestPlanbuilder(t *testing.T) { Alias: sqlparser.NewColIdent("id"), Type: sqltypes.Int64, }}, - VindexColumns: []int{0, 1}, + Filters: []Filter{{ + Opcode: VindexMatch, + ColNum: 0, + Value: sqltypes.NULL, + Vindex: nil, + VindexColumns: []int{0, 1}, + KeyRange: nil, + }}, }, }, { inTable: regional, @@ -400,14 +450,10 @@ func TestPlanbuilder(t *testing.T) { inTable: t1, inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select *, id from t1"}, outErr: `unsupported: *, id`, - }, { - inTable: t1, - inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where id=1"}, - outErr: `unsupported where clause: where id = 1`, }, { inTable: t1, inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where max(id)"}, - outErr: `unsupported where clause: where max(id)`, + outErr: `unsupported constraint: max(id)`, }, { inTable: t1, inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(id)"}, @@ -463,15 +509,20 @@ func TestPlanbuilder(t *testing.T) { inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(id, 1+1, '-80')"}, outErr: `unsupported: 1 + 1`, }} - for _, tcase := range testcases { plan, err := buildPlan(tcase.inTable, testLocalVSchema, &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{tcase.inRule}, }) if plan != nil { plan.Table = nil - plan.Vindex = nil - plan.KeyRange = nil + for ind := range plan.Filters { + plan.Filters[ind].KeyRange = nil + if plan.Filters[ind]. + Opcode == VindexMatch { + plan.Filters[ind].Value = sqltypes.NULL + } + plan.Filters[ind].Vindex = nil + } if !reflect.DeepEqual(tcase.outPlan, plan) { t.Errorf("Plan(%v, %v):\n%v, want\n%v", tcase.inTable, tcase.inRule, plan, tcase.outPlan) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index ab294562dde..01056c893e3 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -210,6 +210,64 @@ func TestStreamRowsKeyRange(t *testing.T) { checkStream(t, "select * from t1 where in_keyrange('-80')", nil, wantQuery, wantStream) } +func TestStreamRowsFilterInt(t *testing.T) { + if testing.Short() { + t.Skip() + } + + if err := env.SetVSchema(shardedVSchema); err != nil { + t.Fatal(err) + } + defer env.SetVSchema("{}") + + execStatements(t, []string{ + "create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))", + "insert into t1 values (1, 100, 'aaa'), (2, 200, 'bbb'), (3, 200, 'ccc'), (4, 100, 'ddd'), (5, 200, 'eee')", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + engine.se.Reload(context.Background()) + + time.Sleep(1 * time.Second) + + wantStream := []string{ + `fields: fields: pkfields: `, + `rows: rows: lastpk: `, + } + wantQuery := "select id1, id2, val from t1 order by id1" + checkStream(t, "select id1, val from t1 where id2 = 100", nil, wantQuery, wantStream) +} + +func TestStreamRowsFilterVarBinary(t *testing.T) { + if testing.Short() { + t.Skip() + } + + if err := env.SetVSchema(shardedVSchema); err != nil { + t.Fatal(err) + } + defer env.SetVSchema("{}") + + execStatements(t, []string{ + "create table t1(id1 int, val varbinary(128), primary key(id1))", + "insert into t1 values (1,'kepler'), (2, 'newton'), (3, 'newton'), (4, 'kepler'), (5, 'newton'), (6, 'kepler')", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + engine.se.Reload(context.Background()) + + time.Sleep(1 * time.Second) + + wantStream := []string{ + `fields: fields: pkfields: `, + `rows: rows: rows: lastpk: `, + } + wantQuery := "select id1, val from t1 order by id1" + checkStream(t, "select id1, val from t1 where val = 'newton'", nil, wantQuery, wantStream) +} + func TestStreamRowsMultiPacket(t *testing.T) { if testing.Short() { t.Skip() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 5b9edf50860..98cb8e2ddc1 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -37,6 +37,112 @@ type testcase struct { output [][]string } +func TestFilteredVarBinary(t *testing.T) { + if testing.Short() { + t.Skip() + } + + execStatements(t, []string{ + "create table t1(id1 int, val varbinary(128), primary key(id1))", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + engine.se.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select id1, val from t1 where val = 'newton'", + }}, + } + + testcases := []testcase{{ + input: []string{ + "begin", + "insert into t1 values (1, 'kepler')", + "insert into t1 values (2, 'newton')", + "insert into t1 values (3, 'newton')", + "insert into t1 values (4, 'kepler')", + "insert into t1 values (5, 'newton')", + "update t1 set val = 'newton' where id1 = 1", + "update t1 set val = 'kepler' where id1 = 2", + "update t1 set val = 'newton' where id1 = 2", + "update t1 set val = 'kepler' where id1 = 1", + "delete from t1 where id1 in (2,3)", + "commit", + }, + output: [][]string{{ + `begin`, + `type:FIELD field_event: fields: > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > row_changes: > > `, + `gtid`, + `commit`, + }}, + }} + runCases(t, filter, testcases, "") +} + +func TestFilteredInt(t *testing.T) { + if testing.Short() { + t.Skip() + } + + execStatements(t, []string{ + "create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + engine.se.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select id1, val from t1 where id2 = 200", + }}, + } + + testcases := []testcase{{ + input: []string{ + "begin", + "insert into t1 values (1, 100, 'aaa')", + "insert into t1 values (2, 200, 'bbb')", + "insert into t1 values (3, 100, 'ccc')", + "insert into t1 values (4, 200, 'ddd')", + "insert into t1 values (5, 200, 'eee')", + "update t1 set val = 'newddd' where id1 = 4", + "update t1 set id2 = 200 where id1 = 1", + "update t1 set id2 = 100 where id1 = 2", + "update t1 set id2 = 100 where id1 = 1", + "update t1 set id2 = 200 where id1 = 2", + "commit", + }, + output: [][]string{{ + `begin`, + `type:FIELD field_event: fields: > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: after: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `gtid`, + `commit`, + }}, + }} + runCases(t, filter, testcases, "") +} + func TestStatements(t *testing.T) { if testing.Short() { t.Skip() @@ -1135,6 +1241,52 @@ func TestNoFutureGTID(t *testing.T) { } } +func TestFilteredMultipleWhere(t *testing.T) { + if testing.Short() { + t.Skip() + } + + execStatements(t, []string{ + "create table t1(id1 int, id2 int, id3 int, val varbinary(128), primary key(id1))", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + engine.se.Reload(context.Background()) + + setVSchema(t, shardedVSchema) + defer env.SetVSchema("{}") + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select id1, val from t1 where in_keyrange('-80') and id2 = 200 and id3 = 1000 and val = 'newton'", + }}, + } + + testcases := []testcase{{ + input: []string{ + "begin", + "insert into t1 values (1, 100, 1000, 'kepler')", + "insert into t1 values (2, 200, 1000, 'newton')", + "insert into t1 values (3, 100, 2000, 'kepler')", + "insert into t1 values (128, 200, 1000, 'newton')", + "insert into t1 values (5, 200, 2000, 'kepler')", + "insert into t1 values (129, 200, 1000, 'kepler')", + "commit", + }, + output: [][]string{{ + `begin`, + `type:FIELD field_event: fields: > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `gtid`, + `commit`, + }}, + }} + runCases(t, filter, testcases, "") +} + func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string) { t.Helper() ctx, cancel := context.WithCancel(context.Background())