From 0efd023e46ed4ab150938740090a3eae9a3a4e69 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 27 Apr 2021 19:42:30 +0200 Subject: [PATCH 1/3] Pad binlog values for binary() columns to match the value returned by a select query. This also ensures that if such columns are used as sharding keys we get the same keyspace_id Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/config.go | 22 ++++++++++++++----- go/test/endtoend/vreplication/helper.go | 3 +++ .../vreplication/unsharded_init_data.sql | 4 ++++ .../vreplication/vreplication_test.go | 7 ++++-- .../vreplication/vreplication_test_env.go | 16 ++++++++------ .../tabletserver/vstreamer/planbuilder.go | 22 ++++++++++++++----- .../tabletserver/vstreamer/vstreamer.go | 2 ++ 7 files changed, 57 insertions(+), 19 deletions(-) diff --git a/go/test/endtoend/vreplication/config.go b/go/test/endtoend/vreplication/config.go index 6d73e4db255..214876a2050 100644 --- a/go/test/endtoend/vreplication/config.go +++ b/go/test/endtoend/vreplication/config.go @@ -10,6 +10,7 @@ create table orders(oid int, cid int, pid int, mname varchar(128), price int, pr create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; create table customer2(cid int, name varbinary(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid)); create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; +create table tenant(tenant_id binary(16), name varbinary(16), primary key (tenant_id)); ` initialProductVSchema = ` @@ -28,7 +29,8 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id) }, "order_seq": { "type": "sequence" - } + }, + "tenant": {} } } ` @@ -39,9 +41,12 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id) "vindexes": { "reverse_bits": { "type": "reverse_bits" - } + }, + "binary_md5": { + "type": "binary_md5" + } }, - "tables": { + "tables": { "customer": { "column_vindexes": [ { @@ -65,9 +70,16 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id) "column": "cid", "sequence": "customer_seq2" } - } + }, + "tenant": { + "column_vindexes": [ + { + "column": "tenant_id", + "name": "binary_md5" + } + ] + } } - } ` merchantVSchema = ` diff --git a/go/test/endtoend/vreplication/helper.go b/go/test/endtoend/vreplication/helper.go index 0fea60f0d83..15726e2dd36 100644 --- a/go/test/endtoend/vreplication/helper.go +++ b/go/test/endtoend/vreplication/helper.go @@ -24,6 +24,9 @@ import ( func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines string) { queries := strings.Split(lines, "\n") for _, query := range queries { + if strings.HasPrefix(query, "--") { + continue + } execVtgateQuery(t, conn, database, string(query)) } } diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql index 15e2405eb0a..1b58404cfb7 100644 --- a/go/test/endtoend/vreplication/unsharded_init_data.sql +++ b/go/test/endtoend/vreplication/unsharded_init_data.sql @@ -11,3 +11,7 @@ insert into orders(oid, cid, mname, pid, price) values(3, 2, 'monoprice', 2, 20) insert into customer2(cid, name, typ, sport) values(1, 'john',1,'football,baseball'); insert into customer2(cid, name, typ, sport) values(2, 'paul','soho','cricket'); insert into customer2(cid, name, typ, sport) values(3, 'ringo','enterprise',''); +-- for testing edge case where inserted binary value is 15 bytes, field is 16, mysql adds a null while storing but binlog returns 15 bytes +insert into tenant(tenant_id, name) values (x'02BD00987932461E8820C908E84BAE', 'abc'); + + diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 8fb71792d38..f4998cd3d79 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -108,7 +108,9 @@ func TestBasicVreplicationWorkflow(t *testing.T) { materializeRollup(t) shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName) - + // the tenant table was to test a specific case with binary sharding keys. Drop it now so that we don't + // have to update the rest of the tests + execVtgateQuery(t, vtgateConn, "customer", "drop table tenant") validateRollupReplicates(t) shardOrders(t) shardMerchant(t) @@ -249,7 +251,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl t.Fatal(err) } - tables := "customer" + tables := "customer,tenant" moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables) // Assume we are operating on first cell @@ -267,6 +269,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')" matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1, :vtg2)" require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1)) + execVtgateQuery(t, vtgateConn, "product", "update tenant set name='xyz'") vdiff(t, ksWorkflow, "") switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard) switchReads(t, allCellNames, ksWorkflow) diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index e618432a7ef..4c53f5104b8 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -19,14 +19,14 @@ package vreplication var dryRunResultsSwitchWritesCustomerShard = []string{ "Lock keyspace product", "Lock keyspace customer", - "Stop writes on keyspace product, tables [customer]:", + "Stop writes on keyspace product, tables [customer,tenant]:", "/ Keyspace product, Shard 0 at Position", "Wait for VReplication on stopped streams to catchup for upto 30s", "Create reverse replication workflow p2c_reverse", "Create journal entries on source databases", - "Enable writes on keyspace customer tables [customer]", + "Enable writes on keyspace customer tables [customer,tenant]", "Switch routing from keyspace product to keyspace customer", - "Routing rules for tables [customer] will be updated", + "Routing rules for tables [customer,tenant] will be updated", "SwitchWrites completed, freeze and delete vreplication streams on:", " tablet 200 ", " tablet 300 ", @@ -41,8 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ var dryRunResultsReadCustomerShard = []string{ "Lock keyspace product", - "Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA,RDONLY]", - "Routing rules for tables [customer] will be updated", + "Switch reads for tables [customer,tenant] to keyspace customer for tablet types [REPLICA,RDONLY]", + "Routing rules for tables [customer,tenant] will be updated", "Unlock keyspace product", } @@ -91,7 +91,8 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{ "Lock keyspace customer", "Dropping these tables from the database and removing them from the vschema for keyspace product:", " Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer", - "Blacklisted tables [customer] will be removed from:", + " Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant", + "Blacklisted tables [customer,tenant] will be removed from:", " Keyspace product Shard 0 Tablet 100", "Delete reverse vreplication streams on source:", " Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100", @@ -108,7 +109,8 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{ "Lock keyspace customer", "Renaming these tables from the database and removing them from the vschema for keyspace product:", " Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer", - "Blacklisted tables [customer] will be removed from:", + " Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant", + "Blacklisted tables [customer,tenant] will be removed from:", " Keyspace product Shard 0 Tablet 100", "Delete reverse vreplication streams on source:", " Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100", diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 10afd1e83e4..6da6cf94307 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -130,7 +130,7 @@ func (plan *Plan) filter(values, result []sqltypes.Value) (bool, error) { return false, nil } case VindexMatch: - ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns) + ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns, plan.Table.Fields) if err != nil { return false, err } @@ -150,7 +150,7 @@ func (plan *Plan) filter(values, result []sqltypes.Value) (bool, error) { if colExpr.Vindex == nil { result[i] = values[colExpr.ColNum] } else { - ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns) + ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns, plan.Table.Fields) if err != nil { return false, err } @@ -160,10 +160,22 @@ func (plan *Plan) filter(values, result []sqltypes.Value) (bool, error) { return true, nil } -func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int) (key.DestinationKeyspaceID, error) { +func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int, fields []*querypb.Field) (key.DestinationKeyspaceID, error) { vindexValues := make([]sqltypes.Value, 0, len(vindexColumns)) - for _, col := range vindexColumns { - vindexValues = append(vindexValues, values[col]) + for colNum, col := range vindexColumns { + // For binary(n) column types, mysql pads the data on the right with nulls. However the binlog event contains + // the data without this padding. In particular, this causes an issue if a binary(n) column is part of the + // sharding key: the keyspace_id() returned during the copy phase (where the value is the result of a mysql query) + // is different from the one during replication (where the value is the one from the binlogs) + // Hence we need to add the padding here + value := values[col] + if fields[colNum].Type == querypb.Type_BINARY { + newValueBytes := make([]byte, int(fields[colNum].ColumnLength)) + copy(newValueBytes[:value.Len()], value.Raw()) + value = sqltypes.MakeTrusted(fields[colNum].Type, newValueBytes) + } + + vindexValues = append(vindexValues, value) } destinations, err := vindexes.Map(vindex, nil, [][]sqltypes.Value{vindexValues}) if err != nil { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index c1bb26c7987..1ad46f7e61f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -871,7 +871,9 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo return false, nil, err } pos += l + values[colNum] = value + valueIndex++ } filtered := make([]sqltypes.Value, len(plan.ColExprs)) From 91357c62a492785508c964b3da49c553fd73945a Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 3 May 2021 23:15:17 +0200 Subject: [PATCH 2/3] Pad binary() values in the binlog reader directly so that all consumers see the padded value instead of doing it later in vstreamer or doint it just for keyspace id computation Signed-off-by: Rohit Nayak --- go/mysql/binlog_event_rbr.go | 12 ++++++++++ .../vreplication/table_plan_builder.go | 14 ++---------- .../vreplication/vcopier_test.go | 2 +- .../vreplication/vplayer_flaky_test.go | 12 +++++----- .../tabletserver/vstreamer/planbuilder.go | 16 ++------------ .../tabletserver/vstreamer/vstreamer.go | 1 - .../tabletserver/vstreamer/vstreamer_test.go | 22 +++++++++---------- 7 files changed, 34 insertions(+), 45 deletions(-) diff --git a/go/mysql/binlog_event_rbr.go b/go/mysql/binlog_event_rbr.go index 7c2341a322a..906ccba5c39 100644 --- a/go/mysql/binlog_event_rbr.go +++ b/go/mysql/binlog_event_rbr.go @@ -895,6 +895,18 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ l := int(data[pos]) mdata := data[pos+1 : pos+1+l] if sqltypes.IsBinary(styp) { + // For binary(n) column types, mysql pads the data on the right with nulls. However the binlog event contains + // the data without this padding. This causes several issues: + // * if a binary(n) column is part of the sharding key, the keyspace_id() returned during the copy phase + // (where the value is the result of a mysql query) is different from the one during replication + // (where the value is the one from the binlogs) + // * mysql where clause comparisons do not do the right thing without padding + // So for fixed length binary() columns we right-pad it with nulls if necessary + if l < max { + paddedData := make([]byte, max) + copy(paddedData[:l], mdata) + mdata = paddedData + } return sqltypes.MakeTrusted(querypb.Type_BINARY, mdata), l + 1, nil } return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 9a9b9871106..66c57ca09d0 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -685,16 +685,6 @@ func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery { return buf.ParsedQuery() } -// For binary(n) column types, the value in the where clause needs to be padded with nulls upto the length of the column -// for MySQL comparison to work properly. This is achieved by casting it to the column type -func castIfNecessary(buf *sqlparser.TrackedBuffer, cexpr *colExpr) { - if cexpr.dataType == "binary" { - buf.Myprintf("cast(%v as %s)", cexpr.expr, cexpr.columnType) - return - } - buf.Myprintf("%v", cexpr.expr) -} - func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) { buf.WriteString(" where ") bvf.mode = bvBefore @@ -702,11 +692,11 @@ func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bi for _, cexpr := range tpb.pkCols { if _, ok := cexpr.expr.(*sqlparser.ColName); ok { buf.Myprintf("%s%v=", separator, cexpr.colName) - castIfNecessary(buf, cexpr) + buf.Myprintf("%v", cexpr.expr) } else { // Parenthesize non-trivial expressions. buf.Myprintf("%s%v=(", separator, cexpr.colName) - castIfNecessary(buf, cexpr) + buf.Myprintf("%v", cexpr.expr) buf.Myprintf(")") } separator = " and " diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 95a15a4de08..bd68ed60046 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -119,7 +119,7 @@ func TestPlayerCopyCharPK(t *testing.T) { "/update _vt.vreplication set state='Copying'", "insert into dst(idc,val) values ('a\\0',1)", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - `update dst set val=3 where idc=cast('a' as binary(2)) and ('a') <= ('a\0')`, + `update dst set val=3 where idc='a\0' and ('a\0') <= ('a\0')`, "insert into dst(idc,val) values ('c\\0',2)", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, "/delete from _vt.copy_state.*dst", diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index a30a609d8ee..89a3d29a94d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -182,14 +182,14 @@ func TestCharPK(t *testing.T) { data [][]string }{{ //binary(2) input: "insert into t1 values(1, 'a')", - output: "insert into t1(id,val) values (1,'a')", + output: "insert into t1(id,val) values (1,'a\\0')", table: "t1", data: [][]string{ {"1", "a\000"}, }, }, { input: "update t1 set id = 2 where val = 'a\000'", - output: "update t1 set id=2 where val=cast('a' as binary(2))", + output: "update t1 set id=2 where val='a\\0'", table: "t1", data: [][]string{ {"2", "a\000"}, @@ -1321,16 +1321,16 @@ func TestPlayerTypes(t *testing.T) { fmt.Sprintf("create table %s.vitess_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, y year, primary key(tiny))", vrepldb), "create table vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))", fmt.Sprintf("create table %s.vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))", vrepldb), - "create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", - fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb), + "create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", + fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb), "create table vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))", fmt.Sprintf("create table %s.vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))", vrepldb), "create table vitess_null(id int, val varbinary(128), primary key(id))", fmt.Sprintf("create table %s.vitess_null(id int, val varbinary(128), primary key(id))", vrepldb), "create table src1(id int, val varbinary(128), primary key(id))", fmt.Sprintf("create table %s.src1(id int, val varbinary(128), primary key(id))", vrepldb), - "create table binary_pk(b binary(4), val varbinary(4), primary key(b))", - fmt.Sprintf("create table %s.binary_pk(b binary(4), val varbinary(4), primary key(b))", vrepldb), + "create table binary_pk(b binary(7), val varbinary(4), primary key(b))", + fmt.Sprintf("create table %s.binary_pk(b binary(7), val varbinary(4), primary key(b))", vrepldb), }) defer execStatements(t, []string{ "drop table vitess_ints", diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 6da6cf94307..eed93df1106 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -162,20 +162,8 @@ func (plan *Plan) filter(values, result []sqltypes.Value) (bool, error) { func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int, fields []*querypb.Field) (key.DestinationKeyspaceID, error) { vindexValues := make([]sqltypes.Value, 0, len(vindexColumns)) - for colNum, col := range vindexColumns { - // For binary(n) column types, mysql pads the data on the right with nulls. However the binlog event contains - // the data without this padding. In particular, this causes an issue if a binary(n) column is part of the - // sharding key: the keyspace_id() returned during the copy phase (where the value is the result of a mysql query) - // is different from the one during replication (where the value is the one from the binlogs) - // Hence we need to add the padding here - value := values[col] - if fields[colNum].Type == querypb.Type_BINARY { - newValueBytes := make([]byte, int(fields[colNum].ColumnLength)) - copy(newValueBytes[:value.Len()], value.Raw()) - value = sqltypes.MakeTrusted(fields[colNum].Type, newValueBytes) - } - - vindexValues = append(vindexValues, value) + for _, col := range vindexColumns { + vindexValues = append(vindexValues, values[col]) } destinations, err := vindexes.Map(vindex, nil, [][]sqltypes.Value{vindexValues}) if err != nil { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 1ad46f7e61f..a0c72010f21 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -873,7 +873,6 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo pos += l values[colNum] = value - valueIndex++ } filtered := make([]sqltypes.Value, len(plan.ColExprs)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index d15c4670716..90f9e851359 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -110,9 +110,9 @@ func TestSetAndEnum(t *testing.T) { output: [][]string{{ `begin`, fe.String(), - `type:ROW row_event: > > `, - `type:ROW row_event: > > `, - `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, `gtid`, `commit`, }}, @@ -133,8 +133,8 @@ func TestCellValuePadding(t *testing.T) { engine.se.Reload(context.Background()) queries := []string{ "begin", - "insert into t1 values (1, 'aaa')", - "insert into t1 values (2, 'bbb')", + "insert into t1 values (1, 'aaa\000')", + "insert into t1 values (2, 'bbb\000')", "update t1 set id = 11 where val = 'aaa\000'", "insert into t2 values (1, 'aaa')", "insert into t2 values (2, 'bbb')", @@ -147,9 +147,9 @@ func TestCellValuePadding(t *testing.T) { output: [][]string{{ `begin`, `type:FIELD field_event: fields: > `, - `type:ROW row_event: > > `, - `type:ROW row_event: > > `, - `type:ROW row_event: after: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: after: > > `, `type:FIELD field_event: fields: > `, `type:ROW row_event: > > `, `type:ROW row_event: > > `, @@ -1560,13 +1560,13 @@ func TestTypes(t *testing.T) { }, { // TODO(sougou): validate that binary and char data generate correct DMLs on the other end. input: []string{ - "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')", + "insert into vitess_strings values('a', 'b', 'c', 'd\000\000\000', 'e', 'f', 'g', 'h', 'a', 'a,b')", }, output: [][]string{{ `begin`, `type:FIELD field_event: fields: fields: fields: fields: fields: fields: fields: fields: fields: > `, - `type:ROW row_event: > > `, + `type:ROW row_event: > > `, `gtid`, `commit`, }}, From 9f8baac4099d023a5873d74330b335067d25fb0d Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 4 May 2021 11:52:38 +0200 Subject: [PATCH 3/3] Fix tests Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/framework_test.go | 5 ++++- .../vreplication/vplayer_flaky_test.go | 14 +++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index f8a09bf3a41..a539c4deb03 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -121,12 +121,15 @@ func TestMain(m *testing.M) { playerEngine = NewTestEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, realDBClientFactory, vrepldb, externalConfig) playerEngine.Open(context.Background()) defer playerEngine.Close() - if err := env.Mysqld.ExecuteSuperQueryList(context.Background(), binlogplayer.CreateVReplicationTable()); err != nil { fmt.Fprintf(os.Stderr, "%v", err) return 1 } + for _, query := range binlogplayer.AlterVReplicationTable { + env.Mysqld.ExecuteSuperQuery(context.Background(), query) + } + if err := env.Mysqld.ExecuteSuperQuery(context.Background(), createCopyState); err != nil { fmt.Fprintf(os.Stderr, "%v", err) return 1 diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 89a3d29a94d..31498ad2a9c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1329,8 +1329,8 @@ func TestPlayerTypes(t *testing.T) { fmt.Sprintf("create table %s.vitess_null(id int, val varbinary(128), primary key(id))", vrepldb), "create table src1(id int, val varbinary(128), primary key(id))", fmt.Sprintf("create table %s.src1(id int, val varbinary(128), primary key(id))", vrepldb), - "create table binary_pk(b binary(7), val varbinary(4), primary key(b))", - fmt.Sprintf("create table %s.binary_pk(b binary(7), val varbinary(4), primary key(b))", vrepldb), + "create table binary_pk(b binary(4), val varbinary(4), primary key(b))", + fmt.Sprintf("create table %s.binary_pk(b binary(4), val varbinary(4), primary key(b))", vrepldb), }) defer execStatements(t, []string{ "drop table vitess_ints", @@ -1394,10 +1394,10 @@ func TestPlayerTypes(t *testing.T) { }, }, { input: "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')", - output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d','e','f','g','h','1','3')", + output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d\\0\\0\\0\\0','e','f','g','h','1','3')", table: "vitess_strings", data: [][]string{ - {"a", "b", "c", "d\000\000\000", "e", "f", "g", "h", "a", "a,b"}, + {"a", "b", "c", "d\000\000\000\000", "e", "f", "g", "h", "a", "a,b"}, }, }, { input: "insert into vitess_misc values(1, '\x01', '2012-01-01', '2012-01-01 15:45:45', '15:45:45', point(1, 2))", @@ -1415,7 +1415,7 @@ func TestPlayerTypes(t *testing.T) { }, }, { input: "insert into binary_pk values('a', 'aaa')", - output: "insert into binary_pk(b,val) values ('a','aaa')", + output: "insert into binary_pk(b,val) values ('a\\0\\0\\0','aaa')", table: "binary_pk", data: [][]string{ {"a\000\000\000", "aaa"}, @@ -1423,10 +1423,10 @@ func TestPlayerTypes(t *testing.T) { }, { // Binary pk is a special case: https://github.com/vitessio/vitess/issues/3984 input: "update binary_pk set val='bbb' where b='a\\0\\0\\0'", - output: "update binary_pk set val='bbb' where b=cast('a' as binary(4))", + output: "update binary_pk set val='bbb' where b='a\\0\\0\\0'", table: "binary_pk", data: [][]string{ - {"a\x00\x00\x00", "bbb"}, + {"a\000\000\000", "bbb"}, }, }} if enableJSONColumnTesting {