Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,18 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
l := int(data[pos])
mdata := data[pos+1 : pos+1+l]
if sqltypes.IsBinary(styp) {
// For binary(n) column types, mysql pads the data on the right with nulls. However the binlog event contains
// the data without this padding. This causes several issues:
// * if a binary(n) column is part of the sharding key, the keyspace_id() returned during the copy phase
// (where the value is the result of a mysql query) is different from the one during replication
// (where the value is the one from the binlogs)
// * mysql where clause comparisons do not do the right thing without padding
// So for fixed length binary() columns we right-pad it with nulls if necessary
if l < max {
paddedData := make([]byte, max)
copy(paddedData[:l], mdata)
mdata = paddedData
}
return sqltypes.MakeTrusted(querypb.Type_BINARY, mdata), l + 1, nil
}
return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil
Expand Down
22 changes: 17 additions & 5 deletions go/test/endtoend/vreplication/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ create table orders(oid int, cid int, pid int, mname varchar(128), price int, pr
create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table customer2(cid int, name varbinary(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid));
create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table tenant(tenant_id binary(16), name varbinary(16), primary key (tenant_id));
`

initialProductVSchema = `
Expand All @@ -28,7 +29,8 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)
},
"order_seq": {
"type": "sequence"
}
},
"tenant": {}
}
}
`
Expand All @@ -39,9 +41,12 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)
"vindexes": {
"reverse_bits": {
"type": "reverse_bits"
}
},
"binary_md5": {
"type": "binary_md5"
}
},
"tables": {
"tables": {
"customer": {
"column_vindexes": [
{
Expand All @@ -65,9 +70,16 @@ create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)
"column": "cid",
"sequence": "customer_seq2"
}
}
},
"tenant": {
"column_vindexes": [
{
"column": "tenant_id",
"name": "binary_md5"
}
]
}
}

}
`
merchantVSchema = `
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vreplication/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines string) {
queries := strings.Split(lines, "\n")
for _, query := range queries {
if strings.HasPrefix(query, "--") {
continue
}
execVtgateQuery(t, conn, database, string(query))
}
}
Expand Down
4 changes: 4 additions & 0 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ insert into orders(oid, cid, mname, pid, price) values(3, 2, 'monoprice', 2, 20)
insert into customer2(cid, name, typ, sport) values(1, 'john',1,'football,baseball');
insert into customer2(cid, name, typ, sport) values(2, 'paul','soho','cricket');
insert into customer2(cid, name, typ, sport) values(3, 'ringo','enterprise','');
-- for testing edge case where inserted binary value is 15 bytes, field is 16, mysql adds a null while storing but binlog returns 15 bytes
insert into tenant(tenant_id, name) values (x'02BD00987932461E8820C908E84BAE', 'abc');


7 changes: 5 additions & 2 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func TestBasicVreplicationWorkflow(t *testing.T) {
materializeRollup(t)

shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName)

// the tenant table was to test a specific case with binary sharding keys. Drop it now so that we don't
// have to update the rest of the tests
execVtgateQuery(t, vtgateConn, "customer", "drop table tenant")
validateRollupReplicates(t)
shardOrders(t)
shardMerchant(t)
Expand Down Expand Up @@ -249,7 +251,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
t.Fatal(err)
}

tables := "customer"
tables := "customer,tenant"
moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables)

// Assume we are operating on first cell
Expand All @@ -267,6 +269,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')"
matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1, :vtg2)"
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1))
execVtgateQuery(t, vtgateConn, "product", "update tenant set name='xyz'")
vdiff(t, ksWorkflow, "")
switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard)
switchReads(t, allCellNames, ksWorkflow)
Expand Down
16 changes: 9 additions & 7 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package vreplication
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Stop writes on keyspace product, tables [customer]:",
"Stop writes on keyspace product, tables [customer,tenant]:",
"/ Keyspace product, Shard 0 at Position",
"Wait for VReplication on stopped streams to catchup for upto 30s",
"Create reverse replication workflow p2c_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace customer tables [customer]",
"Enable writes on keyspace customer tables [customer,tenant]",
"Switch routing from keyspace product to keyspace customer",
"Routing rules for tables [customer] will be updated",
"Routing rules for tables [customer,tenant] will be updated",
"SwitchWrites completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
Expand All @@ -41,8 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA,RDONLY]",
"Routing rules for tables [customer] will be updated",
"Switch reads for tables [customer,tenant] to keyspace customer for tablet types [REPLICA,RDONLY]",
"Routing rules for tables [customer,tenant] will be updated",
"Unlock keyspace product",
}

Expand Down Expand Up @@ -91,7 +91,8 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{
"Lock keyspace customer",
"Dropping these tables from the database and removing them from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
"Blacklisted tables [customer] will be removed from:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant",
"Blacklisted tables [customer,tenant] will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
Expand All @@ -108,7 +109,8 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{
"Lock keyspace customer",
"Renaming these tables from the database and removing them from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
"Blacklisted tables [customer] will be removed from:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table tenant",
"Blacklisted tables [customer,tenant] will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,15 @@ func TestMain(m *testing.M) {
playerEngine = NewTestEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, vrepldb, externalConfig)
playerEngine.Open(context.Background())
defer playerEngine.Close()

if err := env.Mysqld.ExecuteSuperQueryList(context.Background(), binlogplayer.CreateVReplicationTable()); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
}

for _, query := range binlogplayer.AlterVReplicationTable {
env.Mysqld.ExecuteSuperQuery(context.Background(), query)
}

if err := env.Mysqld.ExecuteSuperQuery(context.Background(), createCopyState); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
Expand Down
14 changes: 2 additions & 12 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,28 +678,18 @@ func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery {
return buf.ParsedQuery()
}

// For binary(n) column types, the value in the where clause needs to be padded with nulls upto the length of the column
// for MySQL comparison to work properly. This is achieved by casting it to the column type
func castIfNecessary(buf *sqlparser.TrackedBuffer, cexpr *colExpr) {
if cexpr.dataType == "binary" {
buf.Myprintf("cast(%v as %s)", cexpr.expr, cexpr.columnType)
return
}
buf.Myprintf("%v", cexpr.expr)
}

func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) {
buf.WriteString(" where ")
bvf.mode = bvBefore
separator := ""
for _, cexpr := range tpb.pkCols {
if _, ok := cexpr.expr.(*sqlparser.ColName); ok {
buf.Myprintf("%s%v=", separator, cexpr.colName)
castIfNecessary(buf, cexpr)
buf.Myprintf("%v", cexpr.expr)
} else {
// Parenthesize non-trivial expressions.
buf.Myprintf("%s%v=(", separator, cexpr.colName)
castIfNecessary(buf, cexpr)
buf.Myprintf("%v", cexpr.expr)
buf.Myprintf(")")
}
separator = " and "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestPlayerCopyCharPK(t *testing.T) {
"/update _vt.vreplication set state='Copying'",
"insert into dst(idc,val) values ('a\\0',1)",
`/update _vt.copy_state set lastpk='fields:<name:\\"idc\\" type:BINARY > rows:<lengths:2 values:\\"a\\\\000\\" > ' where vrepl_id=.*`,
`update dst set val=3 where idc=cast('a' as binary(2)) and ('a') <= ('a\0')`,
`update dst set val=3 where idc='a\0' and ('a\0') <= ('a\0')`,
"insert into dst(idc,val) values ('c\\0',2)",
`/update _vt.copy_state set lastpk='fields:<name:\\"idc\\" type:BINARY > rows:<lengths:2 values:\\"c\\\\000\\" > ' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ func TestCharPK(t *testing.T) {
data [][]string
}{{ //binary(2)
input: "insert into t1 values(1, 'a')",
output: "insert into t1(id,val) values (1,'a')",
output: "insert into t1(id,val) values (1,'a\\0')",
table: "t1",
data: [][]string{
{"1", "a\000"},
},
}, {
input: "update t1 set id = 2 where val = 'a\000'",
output: "update t1 set id=2 where val=cast('a' as binary(2))",
output: "update t1 set id=2 where val='a\\0'",
table: "t1",
data: [][]string{
{"2", "a\000"},
Expand Down Expand Up @@ -1321,8 +1321,8 @@ func TestPlayerTypes(t *testing.T) {
fmt.Sprintf("create table %s.vitess_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, y year, primary key(tiny))", vrepldb),
"create table vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))",
fmt.Sprintf("create table %s.vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))", vrepldb),
"create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))",
fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb),
"create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))",
fmt.Sprintf("create table %s.vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(5), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))", vrepldb),
"create table vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))",
fmt.Sprintf("create table %s.vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))", vrepldb),
"create table vitess_null(id int, val varbinary(128), primary key(id))",
Expand Down Expand Up @@ -1394,10 +1394,10 @@ func TestPlayerTypes(t *testing.T) {
},
}, {
input: "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')",
output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d','e','f','g','h','1','3')",
output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d\\0\\0\\0\\0','e','f','g','h','1','3')",
table: "vitess_strings",
data: [][]string{
{"a", "b", "c", "d\000\000\000", "e", "f", "g", "h", "a", "a,b"},
{"a", "b", "c", "d\000\000\000\000", "e", "f", "g", "h", "a", "a,b"},
},
}, {
input: "insert into vitess_misc values(1, '\x01', '2012-01-01', '2012-01-01 15:45:45', '15:45:45', point(1, 2))",
Expand All @@ -1415,18 +1415,18 @@ func TestPlayerTypes(t *testing.T) {
},
}, {
input: "insert into binary_pk values('a', 'aaa')",
output: "insert into binary_pk(b,val) values ('a','aaa')",
output: "insert into binary_pk(b,val) values ('a\\0\\0\\0','aaa')",
table: "binary_pk",
data: [][]string{
{"a\000\000\000", "aaa"},
},
}, {
// Binary pk is a special case: https://github.com/vitessio/vitess/issues/3984
input: "update binary_pk set val='bbb' where b='a\\0\\0\\0'",
output: "update binary_pk set val='bbb' where b=cast('a' as binary(4))",
output: "update binary_pk set val='bbb' where b='a\\0\\0\\0'",
table: "binary_pk",
data: [][]string{
{"a\x00\x00\x00", "bbb"},
{"a\000\000\000", "bbb"},
},
}}
if enableJSONColumnTesting {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error
return false, nil, nil
}
case VindexMatch:
ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns)
ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns, plan.Table.Fields)
if err != nil {
return false, nil, err
}
Expand All @@ -144,7 +144,7 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error
if colExpr.Vindex == nil {
result[i] = values[colExpr.ColNum]
} else {
ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns)
ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns, plan.Table.Fields)
if err != nil {
return false, nil, err
}
Expand All @@ -154,7 +154,7 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error
return true, result, nil
}

func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int) (key.DestinationKeyspaceID, error) {
func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int, fields []*querypb.Field) (key.DestinationKeyspaceID, error) {
vindexValues := make([]sqltypes.Value, 0, len(vindexColumns))
for _, col := range vindexColumns {
vindexValues = append(vindexValues, values[col])
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
return false, nil, err
}
pos += l

values[colNum] = value
valueIndex++
}
Expand Down
Loading