@@ -0,0 +1 @@
MODIFY `t1` varchar(128) CHARACTER SET utf8mb4 NOT NULL, MODIFY `t2` varchar(128) CHARACTER SET latin2 NOT NULL, MODIFY `tutf8` varchar(128) CHARACTER SET latin1 NOT NULL
@@ -0,0 +1,19 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id int auto_increment,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (null, md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (null, 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (null, 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');
insert into onlineddl_test values (null, 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (null, 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (null, 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);

drop event if exists onlineddl_test;
@@ -0,0 +1 @@
(5.5|5.6|5.7)
@@ -0,0 +1,30 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id varchar(128) charset latin1 collate latin1_swedish_ci,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');

drop event if exists onlineddl_test;
delimiter ;;
create event onlineddl_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);
end ;;
@@ -0,0 +1 @@
(5.5|5.6|5.7)
15 changes: 9 additions & 6 deletions go/vt/vttablet/onlineddl/vrepl.go
@@ -289,6 +289,9 @@ func (v *VRepl) generateFilterQuery() error {
sb.WriteString(fmt.Sprintf("CONCAT(%s)", escapeName(name)))
case sourceCol.Type() == "json":
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
Comment on lines 290 to 291

mattlord (Member) commented on Aug 14, 2024:

@dbussink do you think this is still needed? I don't think so anymore, now that we have native JSON type support.

mattlord (Member) added on Aug 14, 2024:

(against a v21 vtgate here) 

❯ mysql commerce -e "create table json_test (id int not null primary key, j1 json); insert into json_test values (1, '{\"name\":\"Matt\"}')"
❯ mysql commerce -e "insert into json_test select id+10, j1 from json_test"
❯ mysql commerce -e "select * from json_test" --column-type-info
Field   1:  `id`
Catalog:    `def`
Database:   `commerce`
Table:      `json_test`
Org_table:  `json_test`
Type:       LONG
Collation:  binary (63)
Length:     11
Max_length: 2
Decimals:   0
Flags:      NOT_NULL PRI_KEY NO_DEFAULT_VALUE NUM PART_KEY

Field   2:  `j1`
Catalog:    `def`
Database:   `commerce`
Table:      `json_test`
Org_table:  `json_test`
Type:       JSON
Collation:  binary (63)
Length:     4294967295
Max_length: 16
Decimals:   0
Flags:      BLOB BINARY


+----+------------------+
| id | j1               |
+----+------------------+
|  1 | {"name": "Matt"} |
| 11 | {"name": "Matt"} |
+----+------------------+

I expect this to be bytes we pass on to MySQL "on the other side" and they are interpreted there as either a JSON field or serialized as a utf8mb4 string if some other type on the target.

mattlord (Member) added on Aug 14, 2024:

Either way, I don't think it's a major deal on the source/vcopier side as the primary problems we've seen there are when these CONVERT calls then preclude us from using the desired index in the rowstreamer query and you can't add indexes directly on JSON columns anyway.
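
To illustrate the index concern (hypothetical queries against the test table above, not output from this PR): once a key column is wrapped in CONVERT(), comparisons on it become non-sargable, so MySQL can no longer range-scan the index.

-- can use the PRIMARY KEY for a range scan:
EXPLAIN SELECT * FROM onlineddl_test WHERE id > 'átesting-00';
-- wrapping the key column in CONVERT() typically forces a full table scan:
EXPLAIN SELECT * FROM onlineddl_test WHERE CONVERT(id USING utf8mb4) > 'átesting-00';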

The PR author (Contributor) replied:

Let's leave it like so for now.

A member replied:

JSON is a bit special anyway, since we can't use the direct textual representation, but we turn it into a sql expression using JSON_OBJECT so we lose as little type information as possible.
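
As a sketch of that idea (illustrative SQL only, not the exact expression vreplication builds): serializing through JSON_OBJECT() keeps scalar types, whereas casting the document to text collapses everything into one string.

SELECT JSON_OBJECT('name', 'Matt', 'id', 11);      -- {"id": 11, "name": "Matt"}: the number stays a number
SELECT CAST('{"name": "Matt", "id": 11}' AS CHAR); -- one opaque string, type information gone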

case targetCol.Type() == "json": // we already know the source col is not JSON, per the above `case` condition
// Convert any type to JSON: encode the type as utf8mb4 text
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
case sourceCol.IsTextual():
// Check source and target charset/encoding. If needed, create
// a binlogdatapb.CharsetConversion entry (later written to vreplication)
@@ -301,19 +304,19 @@ func (v *VRepl) generateFilterQuery() error {
if targetCol.IsTextual() && toCollation == collations.Unknown {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", targetCol.Charset(), targetCol.Name())
}

if trivialCharset(fromCollation) && trivialCharset(toCollation) && targetCol.Type() != "json" {
if trivialCharset(fromCollation) && trivialCharset(toCollation) {
sb.WriteString(escapeName(name))
} else if fromCollation == toCollation {
// No need for charset conversions as both have the same collation.
sb.WriteString(escapeName(name))
} else {
// Charset conversion required:
v.convertCharset[targetName] = &binlogdatapb.CharsetConversion{
FromCharset: sourceCol.Charset(),
ToCharset: targetCol.Charset(),
}
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
sb.WriteString(escapeName(name))
}
case targetCol.Type() == "json" && sourceCol.Type() != "json":
// Convert any type to JSON: encode the type as utf8mb4 text
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
default:
sb.WriteString(escapeName(name))
}
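
To make the effect of this change concrete, a hypothetical before/after of the generated filter for the MODIFY `t1` ... CHARACTER SET utf8mb4 alter above (a sketch, not captured output):

-- before: SELECT convert(`t1` using utf8mb4) AS `t1`, ... FROM `onlineddl_test`
-- after:  SELECT `t1`, ... FROM `onlineddl_test`  -- conversion now happens on the apply side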
66 changes: 51 additions & 15 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
@@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/mysql/collations/charset"
"vitess.io/vitess/go/mysql/collations/colldata"
vjson "vitess.io/vitess/go/mysql/json"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/sqlparser"
@@ -257,7 +258,7 @@ func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows []*querypb.R
if i > 0 {
sqlbuffer.WriteString(", ")
}
if err := appendFromRow(tp.BulkInsertValues, sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil {
if err := tp.appendFromRow(sqlbuffer, row); err != nil {
return nil, err
}
}
@@ -312,6 +313,30 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
return false
}

// convertStringCharset does a charset conversion given raw data and an applicable conversion rule.
// In case of a conversion error, it returns an equivalent of MySQL error 1366, which is what you'd
// get in a failed `CONVERT()` function, e.g.:
//
// > create table tascii(v varchar(100) charset ascii);
// > insert into tascii values ('€');
// ERROR 1366 (HY000): Incorrect string value: '\xE2\x82\xAC' for column 'v' at row 1
func (tp *TablePlan) convertStringCharset(raw []byte, conversion *binlogdatapb.CharsetConversion, fieldName string) ([]byte, error) {
fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset)
if fromCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.FromCharset, fieldName)
}
toCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.ToCharset)
if toCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.ToCharset, fieldName)
}

out, err := charset.Convert(nil, colldata.Lookup(toCollation).Charset(), raw, colldata.Lookup(fromCollation).Charset())
if err != nil {
return nil, sqlerror.NewSQLError(sqlerror.ERTruncatedWrongValueForField, sqlerror.SSUnknownSQLState, "Incorrect string value: %s", err.Error())
}
return out, nil
}
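
A quick illustration of the conversion this helper now performs in-process (hypothetical values, expressed in SQL rather than through the Go API): the raw latin1 bytes are re-encoded to the target charset on the apply side, instead of shipping a CONVERT() expression to the source.

SELECT HEX(CONVERT(_latin1 X'E1' USING utf8mb4)); -- C3A1: latin1 'á' re-encoded as utf8mb4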

// bindFieldVal returns a bind variable based on given field and value.
// Most values will just bind directly. But some values may need manipulation:
// - text values with charset conversion
@@ -320,11 +345,7 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) {
if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() {
// Non-null string value, for which we have a charset conversion instruction
fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset)
if fromCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", conversion.FromCharset, field.Name)
}
out, err := charset.Convert(nil, charset.Charset_utf8mb4{}, val.Raw(), colldata.Lookup(fromCollation).Charset())
out, err := tp.convertStringCharset(val.Raw(), conversion, field.Name)
if err != nil {
return nil, err
}
@@ -590,28 +611,30 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
// note: there can be more fields than bind locations since extra columns might be requested from the source if not all
// primary key columns are present in the target table, for example. Also, some values in the row may not correspond to
// values from the database on the source: sum/count for aggregation queries, for example
func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row, skipFields map[string]bool) error {
bindLocations := pq.BindLocations()
if len(fields) < len(bindLocations) {
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
len(fields), len(bindLocations))
len(tp.Fields), len(bindLocations))
}

type colInfo struct {
typ querypb.Type
length int64
offset int64
field *querypb.Field
}
rowInfo := make([]*colInfo, 0)

offset := int64(0)
for i, field := range fields { // collect info required for fields to be bound
for i, field := range tp.Fields { // collect info required for fields to be bound
length := row.Lengths[i]
if !skipFields[strings.ToLower(field.Name)] {
if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
rowInfo = append(rowInfo, &colInfo{
typ: field.Type,
length: length,
offset: offset,
field: field,
})
}
if length > 0 {
@@ -623,7 +646,7 @@ func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*quer
var offsetQuery int
for i, loc := range bindLocations {
col := rowInfo[i]
buf.WriteString(pq.Query[offsetQuery:loc.Offset])
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
typ := col.typ

switch typ {
@@ -645,12 +668,25 @@ func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*quer
// -1 means a null variable; serialize it directly
buf.WriteString(sqltypes.NullStr)
} else {
vv := sqltypes.MakeTrusted(typ, row.Values[col.offset:col.offset+col.length])
raw := row.Values[col.offset : col.offset+col.length]
var vv sqltypes.Value

if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
// Non-null string value, for which we have a charset conversion instruction
out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
if err != nil {
return err
}
vv = sqltypes.MakeTrusted(typ, out)
} else {
vv = sqltypes.MakeTrusted(typ, raw)
}

vv.EncodeSQLBytes2(buf)
}
}
offsetQuery = loc.Offset + loc.Length
}
buf.WriteString(pq.Query[offsetQuery:])
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
return nil
}
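
Taken together, an assumed end-to-end trace of the new behavior (bytes shown as hex for clarity; appendFromRow actually emits quoted, escaped strings via EncodeSQLBytes2): the filter now ships the latin1 bytes unchanged, and the apply side re-encodes them while building the bulk insert.

-- source row value (latin1):        t1 = X'E174657374696E67'   -- 'átesting'
-- value written by the bulk INSERT: t1 = X'C3A174657374696E67' -- valid utf8mb4 on the target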