diff --git a/go/mysql/binlog_event_rbr.go b/go/mysql/binlog_event_rbr.go index 7c2341a322a..906ccba5c39 100644 --- a/go/mysql/binlog_event_rbr.go +++ b/go/mysql/binlog_event_rbr.go @@ -895,6 +895,18 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ l := int(data[pos]) mdata := data[pos+1 : pos+1+l] if sqltypes.IsBinary(styp) { + // For binary(n) column types, mysql pads the data on the right with nulls. However the binlog event contains + // the data without this padding. This causes several issues: + // * if a binary(n) column is part of the sharding key, the keyspace_id() returned during the copy phase + // (where the value is the result of a mysql query) is different from the one during replication + // (where the value is the one from the binlogs) + // * mysql where clause comparisons do not do the right thing without padding + // So for fixed length binary() columns we right-pad it with nulls if necessary + if l < max { + paddedData := make([]byte, max) + copy(paddedData[:l], mdata) + mdata = paddedData + } return sqltypes.MakeTrusted(querypb.Type_BINARY, mdata), l + 1, nil } return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil diff --git a/go/test/endtoend/vreplication/config.go b/go/test/endtoend/vreplication/config.go index 6d73e4db255..214876a2050 100644 --- a/go/test/endtoend/vreplication/config.go +++ b/go/test/endtoend/vreplication/config.go @@ -10,6 +10,7 @@ create table orders(oid int, cid int, pid int, mname varchar(128), price int, pr create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; create table customer2(cid int, name varbinary(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid)); create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; +create table tenant(tenant_id binary(16), name varbinary(16), primary key (tenant_id)); ` initialProductVSchema = ` @@ -28,7 +29,8 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id) }, "order_seq": { "type": "sequence" - } + }, + "tenant": {} } } ` @@ -39,9 +41,12 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id) "vindexes": { "reverse_bits": { "type": "reverse_bits" - } + }, + "binary_md5": { + "type": "binary_md5" + } }, - "tables": { + "tables": { "customer": { "column_vindexes": [ { @@ -65,9 +70,16 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id) "column": "cid", "sequence": "customer_seq2" } - } + }, + "tenant": { + "column_vindexes": [ + { + "column": "tenant_id", + "name": "binary_md5" + } + ] + } } - } ` merchantVSchema = ` diff --git a/go/test/endtoend/vreplication/helper.go b/go/test/endtoend/vreplication/helper.go index 0fea60f0d83..15726e2dd36 100644 --- a/go/test/endtoend/vreplication/helper.go +++ b/go/test/endtoend/vreplication/helper.go @@ -24,6 +24,9 @@ import ( func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines string) { queries := strings.Split(lines, "\n") for _, query := range queries { + if strings.HasPrefix(query, "--") { + continue + } execVtgateQuery(t, conn, database, string(query)) } } diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql index 15e2405eb0a..1b58404cfb7 100644 --- a/go/test/endtoend/vreplication/unsharded_init_data.sql +++ b/go/test/endtoend/vreplication/unsharded_init_data.sql @@ -11,3 +11,7 @@ insert into orders(oid, cid, mname, pid, price) values(3, 2, 'monoprice', 2, 20) insert into customer2(cid, name, typ, sport) values(1, 'john',1,'football,baseball'); insert into customer2(cid, name, typ, sport) values(2, 'paul','soho','cricket'); insert into customer2(cid, name, typ, sport) values(3, 'ringo','enterprise',''); +-- for testing edge case where inserted binary value is 15 bytes, field is 16, mysql adds a null while storing but binlog returns 15 bytes +insert into tenant(tenant_id, name) values (x'02BD00987932461E8820C908E84BAE', 'abc'); + + diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 8fb71792d38..f4998cd3d79 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -108,7 +108,9 @@ func TestBasicVreplicationWorkflow(t *testing.T) { materializeRollup(t) shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName) - + // the tenant table was to test a specific case with binary sharding keys. Drop it now so that we don't + // have to update the rest of the tests + execVtgateQuery(t, vtgateConn, "customer", "drop table tenant") validateRollupReplicates(t) shardOrders(t) shardMerchant(t) @@ -249,7 +251,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl t.Fatal(err) } - tables := "customer" + tables := "customer,tenant" moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables) // Assume we are operating on first cell @@ -267,6 +269,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')" matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1, :vtg2)" require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1)) + execVtgateQuery(t, vtgateConn, "product", "update tenant set name='xyz'") vdiff(t, ksWorkflow, "") switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard) switchReads(t, allCellNames, ksWorkflow) diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index e618432a7ef..4c53f5104b8 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -19,14 +19,14 @@ package vreplication var dryRunResultsSwitchWritesCustomerShard = []string{ "Lock keyspace product", "Lock keyspace customer", - "Stop writes on keyspace product, tables [customer]:", + "Stop writes on keyspace product, tables [customer,tenant]:", "/ Keyspace product, Shard 0 at Position", "Wait for VReplication on stopped streams to catchup for upto 30s", "Create reverse replication workflow p2c_reverse", "Create journal entries on source databases", - "Enable writes on keyspace customer tables [customer]", + "Enable writes on keyspace customer tables [customer,tenant]", "Switch routing from keyspace product to keyspace customer", - "Routing rules for tables [customer] will be updated", + "Routing rules for tables [customer,tenant] will be updated", "SwitchWrites completed, freeze and delete vreplication streams on:", " tablet 200 ", " tablet 300 ", @@ -41,8 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ var dryRunResultsReadCustomerShard = []string{ "Lock keyspace product", - "Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA,RDONLY]", - "Routing rules for tables [customer] will be updated", + "Switch reads for tables [customer,tenant] to keyspace customer for tablet types [REPLICA,RDONLY]", + "Routing rules for tables [customer,tenant] will be updated", "Unlock keyspace product", } @@ -91,7 +91,8 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{ "Lock keyspace customer", "Dropping these tables from the database and removing them from the vschema for keyspace product:", " Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer", - "Blacklisted tables [customer] will be removed from:", + " Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant", + "Blacklisted tables [customer,tenant] will be removed from:", " Keyspace product Shard 0 Tablet 100", "Delete reverse vreplication streams on source:", " Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100", @@ -108,7 +109,8 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{ "Lock keyspace customer", "Renaming these tables from the database and removing them from the vschema for keyspace product:", " Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer", - "Blacklisted tables [customer] will be removed from:", + " Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant", + "Blacklisted tables [customer,tenant] will be removed from:", " Keyspace product Shard 0 Tablet 100", "Delete reverse vreplication streams on source:", " Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100", diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index f8a09bf3a41..a539c4deb03 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -121,12 +121,15 @@ func TestMain(m *testing.M) { playerEngine = NewTestEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, realDBClientFactory, vrepldb, externalConfig) playerEngine.Open(context.Background()) defer playerEngine.Close() - if err := env.Mysqld.ExecuteSuperQueryList(context.Background(), binlogplayer.CreateVReplicationTable()); err != nil { fmt.Fprintf(os.Stderr, "%v", err) return 1 } + for _, query := range binlogplayer.AlterVReplicationTable { + env.Mysqld.ExecuteSuperQuery(context.Background(), query) + } + if err := env.Mysqld.ExecuteSuperQuery(context.Background(), createCopyState); err != nil { fmt.Fprintf(os.Stderr, "%v", err) return 1 diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 9a9b9871106..66c57ca09d0 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -685,16 +685,6 @@ func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery { return buf.ParsedQuery() } -// For binary(n) column types, the value in the where clause needs to be padded with nulls upto the length of the column -// for MySQL comparison to work properly. This is achieved by casting it to the column type -func castIfNecessary(buf *sqlparser.TrackedBuffer, cexpr *colExpr) { - if cexpr.dataType == "binary" { - buf.Myprintf("cast(%v as %s)", cexpr.expr, cexpr.columnType) - return - } - buf.Myprintf("%v", cexpr.expr) -} - func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) { buf.WriteString(" where ") bvf.mode = bvBefore @@ -702,11 +692,11 @@ func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bi for _, cexpr := range tpb.pkCols { if _, ok := cexpr.expr.(*sqlparser.ColName); ok { buf.Myprintf("%s%v=", separator, cexpr.colName) - castIfNecessary(buf, cexpr) + buf.Myprintf("%v", cexpr.expr) } else { // Parenthesize non-trivial expressions. buf.Myprintf("%s%v=(", separator, cexpr.colName) - castIfNecessary(buf, cexpr) + buf.Myprintf("%v", cexpr.expr) buf.Myprintf(")") } separator = " and " diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 95a15a4de08..bd68ed60046 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -119,7 +119,7 @@ func TestPlayerCopyCharPK(t *testing.T) { "/update _vt.vreplication set state='Copying'", "insert into dst(idc,val) values ('a\\0',1)", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - `update dst set val=3 where idc=cast('a' as binary(2)) and ('a') <= ('a\0')`, + `update dst set val=3 where idc='a\0' and ('a\0') <= ('a\0')`, "insert into dst(idc,val) values ('c\\0',2)", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, "/delete from _vt.copy_state.*dst", diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index a30a609d8ee..31498ad2a9c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -182,14 +182,14 @@ func TestCharPK(t *testing.T) { data [][]string }{{ //binary(2) input: "insert into t1 values(1, 'a')", - output: "insert into t1(id,val) values (1,'a')", + output: "insert into t1(id,val) values (1,'a\\0')", table: "t1", data: [][]string{ {"1", "a\000"}, }, }, { input: "update t1 set id = 2 where val = 'a\000'", - output: "update t1 set id=2 where val=cast('a' as binary(2))", + output: "update t1 set id=2 where val='a\\0'", table: "t1", data: [][]string{ {"2", "a\000"}, @@ -1321,8 +1321,8 @@ func TestPlayerTypes(t *testing.T) { fmt.Sprintf("create table %s.vitess_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, y year, primary key(tiny))", vrepldb), "create table vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))", fmt.Sprintf("create table %s.vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))", vrepldb), - "create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", - fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb), + "create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", + fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb), "create table vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))", fmt.Sprintf("create table %s.vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))", vrepldb), "create table vitess_null(id int, val varbinary(128), primary key(id))", @@ -1394,10 +1394,10 @@ func TestPlayerTypes(t *testing.T) { }, }, { input: "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')", - output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d','e','f','g','h','1','3')", + output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d\\0\\0\\0\\0','e','f','g','h','1','3')", table: "vitess_strings", data: [][]string{ - {"a", "b", "c", "d\000\000\000", "e", "f", "g", "h", "a", "a,b"}, + {"a", "b", "c", "d\000\000\000\000", "e", "f", "g", "h", "a", "a,b"}, }, }, { input: "insert into vitess_misc values(1, '\x01', '2012-01-01', '2012-01-01 15:45:45', '15:45:45', point(1, 2))", @@ -1415,7 +1415,7 @@ func TestPlayerTypes(t *testing.T) { }, }, { input: "insert into binary_pk values('a', 'aaa')", - output: "insert into binary_pk(b,val) values ('a','aaa')", + output: "insert into binary_pk(b,val) values ('a\\0\\0\\0','aaa')", table: "binary_pk", data: [][]string{ {"a\000\000\000", "aaa"}, @@ -1423,10 +1423,10 @@ func TestPlayerTypes(t *testing.T) { }, { // Binary pk is a special case: https://github.com/vitessio/vitess/issues/3984 input: "update binary_pk set val='bbb' where b='a\\0\\0\\0'", - output: "update binary_pk set val='bbb' where b=cast('a' as binary(4))", + output: "update binary_pk set val='bbb' where b='a\\0\\0\\0'", table: "binary_pk", data: [][]string{ - {"a\x00\x00\x00", "bbb"}, + {"a\000\000\000", "bbb"}, }, }} if enableJSONColumnTesting { diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 10afd1e83e4..eed93df1106 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -130,7 +130,7 @@ func (plan *Plan) filter(values, result []sqltypes.Value) (bool, error) { return false, nil } case VindexMatch: - ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns) + ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns, plan.Table.Fields) if err != nil { return false, err } @@ -150,7 +150,7 @@ func (plan *Plan) filter(values, result []sqltypes.Value) (bool, error) { if colExpr.Vindex == nil { result[i] = values[colExpr.ColNum] } else { - ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns) + ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns, plan.Table.Fields) if err != nil { return false, err } @@ -160,7 +160,7 @@ func (plan *Plan) filter(values, result []sqltypes.Value) (bool, error) { return true, nil } -func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int) (key.DestinationKeyspaceID, error) { +func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int, fields []*querypb.Field) (key.DestinationKeyspaceID, error) { vindexValues := make([]sqltypes.Value, 0, len(vindexColumns)) for _, col := range vindexColumns { vindexValues = append(vindexValues, values[col]) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index c1bb26c7987..a0c72010f21 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -871,6 +871,7 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo return false, nil, err } pos += l + values[colNum] = value valueIndex++ } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index d15c4670716..90f9e851359 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -110,9 +110,9 @@ func TestSetAndEnum(t *testing.T) { output: [][]string{{ `begin`, fe.String(), - `type:ROW row_event: > > `, - `type:ROW row_event: > > `, - `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, `gtid`, `commit`, }}, @@ -133,8 +133,8 @@ func TestCellValuePadding(t *testing.T) { engine.se.Reload(context.Background()) queries := []string{ "begin", - "insert into t1 values (1, 'aaa')", - "insert into t1 values (2, 'bbb')", + "insert into t1 values (1, 'aaa\000')", + "insert into t1 values (2, 'bbb\000')", "update t1 set id = 11 where val = 'aaa\000'", "insert into t2 values (1, 'aaa')", "insert into t2 values (2, 'bbb')", @@ -147,9 +147,9 @@ func TestCellValuePadding(t *testing.T) { output: [][]string{{ `begin`, `type:FIELD field_event: fields: > `, - `type:ROW row_event: > > `, - `type:ROW row_event: > > `, - `type:ROW row_event: after: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: after: > > `, `type:FIELD field_event: fields: > `, `type:ROW row_event: > > `, `type:ROW row_event: > > `, @@ -1560,13 +1560,13 @@ func TestTypes(t *testing.T) { }, { // TODO(sougou): validate that binary and char data generate correct DMLs on the other end. input: []string{ - "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')", + "insert into vitess_strings values('a', 'b', 'c', 'd\000\000\000', 'e', 'f', 'g', 'h', 'a', 'a,b')", }, output: [][]string{{ `begin`, `type:FIELD field_event: fields: fields: fields: fields: fields: fields: fields: fields: fields: > `, - `type:ROW row_event: > > `, + `type:ROW row_event: > > `, `gtid`, `commit`, }},