diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index 3f8aca6dbce..ea99b78d3f3 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -205,12 +205,36 @@ type TableMap struct { // - If the metadata is one byte, only the lower 8 bits are used. // - If the metadata is two bytes, all 16 bits are used. Metadata []uint16 + + // OptionalColumnNames contains optional metadata on column names for + // the table this TableMap describes. This metadata should only be set + // when @@binlog_row_metadata is set to FULL. + OptionalColumnNames []string + + // OptionalColumnCollations contains optional metadata on column collations + // for the table this TableMap describes. This metadata should only be set + // when @@binlog_row_metadata is set to FULL. + OptionalColumnCollations []uint64 + + // OptionalEnumValues contains one []string for each enum field in a table's + // schema, where that []string contains the string values of the enum. This + // metadata should only be set when @@binlog_row_metadata is set to FULL. + OptionalEnumValues [][]string + + // OptionalSetValues contains one []string for each set field in a table's + // schema, where that []string contains the string values of the set. This + // metadata should only be set when @@binlog_row_metadata is set to FULL. + OptionalSetValues [][]string + + // OptionalEnumAndSetCollations contains one entry for each set and enum + // field in the table's schema, indicating the set or enum's collation. + OptionalEnumAndSetCollations []uint64 } // String implements the Stringer interface func (t *TableMap) String() string { - return fmt.Sprintf("{Flags: %v, Database: %q, Name: %q, Types: %v, CanBeNull: %v, Metadata: %v}", - t.Flags, t.Database, t.Name, t.Types, t.CanBeNull, t.Metadata) + return fmt.Sprintf("{Flags: %v, Database: %q, Name: %q, Types: %v, CanBeNull: %v, Metadata: %v, OptionalColumnNames: %v, OptionalColumnCollations: %v, OptionalEnumValues: %v, OptionalSetValues: %v, OptionalEnumAndSetCollations: %v}", + t.Flags, t.Database, t.Name, t.Types, t.CanBeNull, t.Metadata, t.OptionalColumnNames, t.OptionalColumnCollations, t.OptionalEnumValues, t.OptionalSetValues, t.OptionalEnumAndSetCollations) } // Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT. diff --git a/go/mysql/binlog_event_make.go b/go/mysql/binlog_event_make.go index 49ec14d8a75..df6367f5c78 100644 --- a/go/mysql/binlog_event_make.go +++ b/go/mysql/binlog_event_make.go @@ -18,6 +18,7 @@ package mysql import ( "encoding/binary" + "fmt" "hash/crc32" ) @@ -364,9 +365,143 @@ func NewMySQLGTIDEvent(f BinlogFormat, m BinlogEventMetadata, gtid Mysql56GTID, return NewMysql56BinlogEvent(ev) } -// NewTableMapEvent returns a TableMap event. +// TableMap optional metadata field types. These identifiers are used in the wire protocol to indicate +// different sections of data in the optional table map metadata. +const ( + TableMapOptMetaColumnCharset = 0x03 + TableMapOptMetaColumnName = 0x04 + TableMapOptMetaSetValues = 0x05 + TableMapOptMetaEnumValues = 0x06 + TableMapOptMetaEnumSetCharset = 0x0b +) + +// buildOptionalTableMapMetadata takes the optional metadata (e.g. column names, column collation IDs, enum and +// set names, and enum and set collation IDs) from |tableMap| and encodes it into the binary format needed to +// transmit this metadata to a remote MySQL replica. +func buildOptionalTableMapMetadata(tableMap *TableMap) ([]byte, error) { + columnNames := tableMap.OptionalColumnNames + columnCollationIds := tableMap.OptionalColumnCollations + + if len(columnNames) == 0 || len(columnCollationIds) == 0 { + return nil, nil + } + + if len(columnNames) != len(columnCollationIds) { + return nil, fmt.Errorf("len mismatch: %d columnNames vs %d collationIDs", len(columnNames), len(columnCollationIds)) + } + + // 1) Build COLUMN_CHARSET payload + var charsetPayload []byte + for _, coll := range columnCollationIds { + charsetPayload = append(charsetPayload, encodeLenEncInt(coll)...) + } + + // 2) Build COLUMN_NAME payload + var namePayload []byte + for _, n := range columnNames { + if len(n) > 255 { + return nil, fmt.Errorf("column name too long (>255): %q", n) + } + namePayload = append(namePayload, byte(len(n))) + namePayload = append(namePayload, []byte(n)...) + } + + // 3) Build ENUM VALUES payload + var enumValuesPayload []byte + for _, enumValues := range tableMap.OptionalEnumValues { + enumValuesPayload = append(enumValuesPayload, encodeLenEncInt(uint64(len(enumValues)))...) + for _, enumValue := range enumValues { + enumValuesPayload = append(enumValuesPayload, encodeLenEncInt(uint64(len(enumValue)))...) + enumValuesPayload = append(enumValuesPayload, []byte(enumValue)...) + } + } + + // 4) Build SET VALUES payload + var setValuesPayload []byte + for _, setValues := range tableMap.OptionalSetValues { + setValuesPayload = append(setValuesPayload, encodeLenEncInt(uint64(len(setValues)))...) + for _, setValue := range setValues { + setValuesPayload = append(setValuesPayload, encodeLenEncInt(uint64(len(setValue)))...) + setValuesPayload = append(setValuesPayload, []byte(setValue)...) + } + } + + // 5) Build ENUM/SET CHARSET payload + var enumSetCharsetPayload []byte + for _, coll := range tableMap.OptionalEnumAndSetCollations { + enumSetCharsetPayload = append(enumSetCharsetPayload, encodeLenEncInt(coll)...) + } + + // 6) Wrap each payload as an optional metadata block: [type][lenenc-int][payload] + var out []byte + out = append(out, buildOptMetaBlock(TableMapOptMetaColumnCharset, charsetPayload)...) + out = append(out, buildOptMetaBlock(TableMapOptMetaColumnName, namePayload)...) + out = append(out, buildOptMetaBlock(TableMapOptMetaEnumValues, enumValuesPayload)...) + out = append(out, buildOptMetaBlock(TableMapOptMetaSetValues, setValuesPayload)...) + out = append(out, buildOptMetaBlock(TableMapOptMetaEnumSetCharset, enumSetCharsetPayload)...) + + return out, nil +} + +// buildOptMetaBlock constructs a single optional metadata block in the MySQL +// binlog row event format. +// +// The block is encoded as: +// +// [type][length][payload] +// +// where: +// - type is a one-byte identifier indicating the metadata subtype, +// - length is a length-encoded integer representing the size of payload in bytes, +// - payload is the raw metadata content for that subtype. +// +// The returned byte slice contains the fully encoded metadata block and is suitable +// for concatenation with other optional metadata blocks when building the +// optional_metadata section of a binlog event. +func buildOptMetaBlock(typ byte, payload []byte) []byte { + var b []byte + b = append(b, typ) + b = append(b, encodeLenEncInt(uint64(len(payload)))...) + b = append(b, payload...) + return b +} + +// encodeLenEncInt encodes MySQL "length-encoded integer" (a.k.a. lenenc-int). +// +// Encoding: +// - < 251: 1 byte +// - < 2^16: 0xFC + 2 bytes little-endian +// - < 2^24: 0xFD + 3 bytes little-endian +// - else: 0xFE + 8 bytes little-endian +func encodeLenEncInt(x uint64) []byte { + switch { + case x < 251: + return []byte{byte(x)} + case x < 1<<16: + b := make([]byte, 3) + b[0] = 0xFC + binary.LittleEndian.PutUint16(b[1:], uint16(x)) + return b + case x < 1<<24: + // 0xFD + 3 bytes little endian + return []byte{0xFD, byte(x), byte(x >> 8), byte(x >> 16)} + default: + b := make([]byte, 9) + b[0] = 0xFE + binary.LittleEndian.PutUint64(b[1:], x) + return b + } +} + +// NewTableMapEvent returns a TableMap event. If any errors are encountered while building the +// event bytes, an error is returned. // Only works with post_header_length=8. -func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm *TableMap) BinlogEvent { +func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm *TableMap) (BinlogEvent, error) { + optionalMetadata, err := buildOptionalTableMapMetadata(tm) + if err != nil { + return nil, err + } + if f.HeaderSize(eTableMapEvent) != 8 { panic("Not implemented, post_header_length!=8") } @@ -385,7 +520,9 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm len(tm.Types) + lenEncIntSize(uint64(metadataLength)) + // lenenc-str column-meta-def metadataLength + - len(tm.CanBeNull.data) + len(tm.CanBeNull.data) + + len(optionalMetadata) + data := make([]byte, length) data[0] = byte(tableID) @@ -397,9 +534,11 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm data[6] = byte(tm.Flags) data[7] = byte(tm.Flags >> 8) data[8] = byte(len(tm.Database)) + pos := 6 + 2 + 1 + copy(data[9:], tm.Database) data[pos] = 0 pos++ + data[pos] = byte(len(tm.Name)) pos += 1 + copy(data[pos+1:], tm.Name) data[pos] = 0 @@ -414,12 +553,15 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm } pos += copy(data[pos:], tm.CanBeNull.data) + pos += copy(data[pos:], optionalMetadata) + if pos != len(data) { - panic("bad encoding") + return nil, fmt.Errorf("bad table map encoding; calculated position (%v) "+ + "does not match length of data (%v)", pos, len(data)) } ev := packetize(f, eTableMapEvent, 0, data, m) - return NewMariadbBinlogEvent(ev) + return NewMariadbBinlogEvent(ev), nil } // NewWriteRowsEvent returns a WriteRows event. Uses v2. diff --git a/go/mysql/binlog_event_make_test.go b/go/mysql/binlog_event_make_test.go index d9b1f056baa..a5fc7ac47d9 100644 --- a/go/mysql/binlog_event_make_test.go +++ b/go/mysql/binlog_event_make_test.go @@ -326,7 +326,8 @@ func TestTableMapEvent(t *testing.T) { tm.CanBeNull.Set(5, true) tm.CanBeNull.Set(9, true) - event := NewTableMapEvent(f, m, 0x102030405060, tm) + event, err := NewTableMapEvent(f, m, 0x102030405060, tm) + require.NoError(t, err) if !event.IsValid() { t.Fatalf("NewTableMapEvent().IsValid() is false") } @@ -334,7 +335,7 @@ func TestTableMapEvent(t *testing.T) { t.Fatalf("NewTableMapEvent().IsTableMap() if false") } - event, _, err := event.StripChecksum(f) + event, _, err = event.StripChecksum(f) if err != nil { t.Fatalf("StripChecksum failed: %v", err) } @@ -352,6 +353,71 @@ func TestTableMapEvent(t *testing.T) { } } +// Test serialization of TableMap events that contain optional metadata (e.g. column names, enum values). +func TestTableMapEventWithOptionalMetadata(t *testing.T) { + f := NewMySQL56BinlogFormat() + m := NewTestBinlogMetadata() + + tm := &TableMap{ + Flags: 0x8090, + Database: "my_database", + Name: "my_table", + Types: []byte{ + TypeLongLong, + TypeLongLong, + TypeLongLong, + }, + CanBeNull: NewServerBitmap(10), + Metadata: []uint16{ + 0, + 0, + 0, + }, + OptionalEnumValues: [][]string{{"apple", "orange"}, {"red", "green"}}, + OptionalSetValues: [][]string{{"one", "two", "three"}}, + OptionalColumnNames: []string{"foo", "bar", "baz"}, + OptionalColumnCollations: []uint64{0, 0, 0}, + OptionalEnumAndSetCollations: []uint64{0, 0, 0}, + } + tm.CanBeNull.Set(1, true) + tm.CanBeNull.Set(2, true) + tm.CanBeNull.Set(5, true) + tm.CanBeNull.Set(9, true) + + event, err := NewTableMapEvent(f, m, 0x102030405060, tm) + require.NoError(t, err) + if !event.IsValid() { + t.Fatalf("NewTableMapEvent().IsValid() is false") + } + if !event.IsTableMap() { + t.Fatalf("NewTableMapEvent().IsTableMap() if false") + } + + event, _, err = event.StripChecksum(f) + if err != nil { + t.Fatalf("StripChecksum failed: %v", err) + } + + tableID := event.TableID(f) + if tableID != 0x102030405060 { + t.Fatalf("NewTableMapEvent().TableID returned %x", tableID) + } + + // NOTE: Vitess doesn't currently include support for deserializing optional table map data (only serializing it) + // so instead of doing round-trip testing of the values, we use static expected bytes that we know have been + // serialized and work correctly with replication clients. + var expectedBytes = []byte{ + 0x98, 0x68, 0xe9, 0x53, 0x13, 0x1, 0x0, 0x0, 0x0, 0x81, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x60, + 0x50, 0x40, 0x30, 0x20, 0x10, 0x90, 0x80, 0xb, 0x6d, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, + 0x65, 0x0, 0x8, 0x6d, 0x79, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x0, 0x3, 0x8, 0x8, 0x8, 0x0, 0x26, 0x2, + 0x3, 0x3, 0x0, 0x0, 0x0, 0x4, 0xc, 0x3, 0x66, 0x6f, 0x6f, 0x3, 0x62, 0x61, 0x72, 0x3, 0x62, 0x61, 0x7a, + 0x6, 0x19, 0x2, 0x5, 0x61, 0x70, 0x70, 0x6c, 0x65, 0x6, 0x6f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x2, 0x3, + 0x72, 0x65, 0x64, 0x5, 0x67, 0x72, 0x65, 0x65, 0x6e, 0x5, 0xf, 0x3, 0x3, 0x6f, 0x6e, 0x65, 0x3, 0x74, + 0x77, 0x6f, 0x5, 0x74, 0x68, 0x72, 0x65, 0x65, 0xb, 0x3, 0x0, 0x0, 0x0, + } + require.Equal(t, expectedBytes, event.Bytes()) +} + func TestRowsEvent(t *testing.T) { f := NewMySQL56BinlogFormat() m := NewTestBinlogMetadata()