Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,36 @@ type TableMap struct {
// - If the metadata is one byte, only the lower 8 bits are used.
// - If the metadata is two bytes, all 16 bits are used.
Metadata []uint16

// OptionalColumnNames contains optional metadata on column names for
// the table this TableMap describes. This metadata should only be set
// when @@binlog_row_metadata is set to FULL.
OptionalColumnNames []string

// OptionalColumnCollations contains optional metadata on column collations
// for the table this TableMap describes. This metadata should only be set
// when @@binlog_row_metadata is set to FULL.
OptionalColumnCollations []uint64

// OptionalEnumValues contains one []string for each enum field in a table's
// schema, where that []string contains the string values of the enum. This
// metadata should only be set when @@binlog_row_metadata is set to FULL.
OptionalEnumValues [][]string

// OptionalSetValues contains one []string for each set field in a table's
// schema, where that []string contains the string values of the set. This
// metadata should only be set when @@binlog_row_metadata is set to FULL.
OptionalSetValues [][]string

// OptionalEnumAndSetCollations contains one entry for each set and enum
// field in the table's schema, indicating the set or enum's collation.
OptionalEnumAndSetCollations []uint64
}

// String implements the Stringer interface
func (t *TableMap) String() string {
return fmt.Sprintf("{Flags: %v, Database: %q, Name: %q, Types: %v, CanBeNull: %v, Metadata: %v}",
t.Flags, t.Database, t.Name, t.Types, t.CanBeNull, t.Metadata)
return fmt.Sprintf("{Flags: %v, Database: %q, Name: %q, Types: %v, CanBeNull: %v, Metadata: %v, OptionalColumnNames: %v, OptionalColumnCollations: %v, OptionalEnumValues: %v, OptionalSetValues: %v, OptionalEnumAndSetCollations: %v}",
t.Flags, t.Database, t.Name, t.Types, t.CanBeNull, t.Metadata, t.OptionalColumnNames, t.OptionalColumnCollations, t.OptionalEnumValues, t.OptionalSetValues, t.OptionalEnumAndSetCollations)
}

// Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT.
Expand Down
152 changes: 147 additions & 5 deletions go/mysql/binlog_event_make.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mysql

import (
"encoding/binary"
"fmt"
"hash/crc32"
)

Expand Down Expand Up @@ -364,9 +365,143 @@ func NewMySQLGTIDEvent(f BinlogFormat, m BinlogEventMetadata, gtid Mysql56GTID,
return NewMysql56BinlogEvent(ev)
}

// NewTableMapEvent returns a TableMap event.
// TableMap optional metadata field types. These identifiers are used in the wire protocol to indicate
// different sections of data in the optional table map metadata.
const (
TableMapOptMetaColumnCharset = 0x03
TableMapOptMetaColumnName = 0x04
TableMapOptMetaSetValues = 0x05
TableMapOptMetaEnumValues = 0x06
TableMapOptMetaEnumSetCharset = 0x0b
)

// buildOptionalTableMapMetadata takes the optional metadata (e.g. column names, column collation IDs, enum and
// set names, and enum and set collation IDs) from |tableMap| and encodes it into the binary format needed to
// transmit this metadata to a remote MySQL replica.
func buildOptionalTableMapMetadata(tableMap *TableMap) ([]byte, error) {
columnNames := tableMap.OptionalColumnNames
columnCollationIds := tableMap.OptionalColumnCollations

if len(columnNames) == 0 || len(columnCollationIds) == 0 {
return nil, nil
}

if len(columnNames) != len(columnCollationIds) {
return nil, fmt.Errorf("len mismatch: %d columnNames vs %d collationIDs", len(columnNames), len(columnCollationIds))
}

// 1) Build COLUMN_CHARSET payload
var charsetPayload []byte
for _, coll := range columnCollationIds {
charsetPayload = append(charsetPayload, encodeLenEncInt(coll)...)
}

// 2) Build COLUMN_NAME payload
var namePayload []byte
for _, n := range columnNames {
if len(n) > 255 {
return nil, fmt.Errorf("column name too long (>255): %q", n)
}
namePayload = append(namePayload, byte(len(n)))
namePayload = append(namePayload, []byte(n)...)
}

// 3) Build ENUM VALUES payload
var enumValuesPayload []byte
for _, enumValues := range tableMap.OptionalEnumValues {
enumValuesPayload = append(enumValuesPayload, encodeLenEncInt(uint64(len(enumValues)))...)
for _, enumValue := range enumValues {
enumValuesPayload = append(enumValuesPayload, encodeLenEncInt(uint64(len(enumValue)))...)
enumValuesPayload = append(enumValuesPayload, []byte(enumValue)...)
}
}

// 4) Build SET VALUES payload
var setValuesPayload []byte
for _, setValues := range tableMap.OptionalSetValues {
setValuesPayload = append(setValuesPayload, encodeLenEncInt(uint64(len(setValues)))...)
for _, setValue := range setValues {
setValuesPayload = append(setValuesPayload, encodeLenEncInt(uint64(len(setValue)))...)
setValuesPayload = append(setValuesPayload, []byte(setValue)...)
}
}

// 5) Build ENUM/SET CHARSET payload
var enumSetCharsetPayload []byte
for _, coll := range tableMap.OptionalEnumAndSetCollations {
enumSetCharsetPayload = append(enumSetCharsetPayload, encodeLenEncInt(coll)...)
}

// 6) Wrap each payload as an optional metadata block: [type][lenenc-int][payload]
var out []byte
out = append(out, buildOptMetaBlock(TableMapOptMetaColumnCharset, charsetPayload)...)
out = append(out, buildOptMetaBlock(TableMapOptMetaColumnName, namePayload)...)
out = append(out, buildOptMetaBlock(TableMapOptMetaEnumValues, enumValuesPayload)...)
out = append(out, buildOptMetaBlock(TableMapOptMetaSetValues, setValuesPayload)...)
out = append(out, buildOptMetaBlock(TableMapOptMetaEnumSetCharset, enumSetCharsetPayload)...)

return out, nil
}

// buildOptMetaBlock constructs a single optional metadata block in the MySQL
// binlog row event format.
//
// The block is encoded as:
//
// [type][length][payload]
//
// where:
// - type is a one-byte identifier indicating the metadata subtype,
// - length is a length-encoded integer representing the size of payload in bytes,
// - payload is the raw metadata content for that subtype.
//
// The returned byte slice contains the fully encoded metadata block and is suitable
// for concatenation with other optional metadata blocks when building the
// optional_metadata section of a binlog event.
func buildOptMetaBlock(typ byte, payload []byte) []byte {
var b []byte
b = append(b, typ)
b = append(b, encodeLenEncInt(uint64(len(payload)))...)
b = append(b, payload...)
return b
}

// encodeLenEncInt encodes MySQL "length-encoded integer" (a.k.a. lenenc-int).
//
// Encoding:
// - < 251: 1 byte
// - < 2^16: 0xFC + 2 bytes little-endian
// - < 2^24: 0xFD + 3 bytes little-endian
// - else: 0xFE + 8 bytes little-endian
func encodeLenEncInt(x uint64) []byte {
switch {
case x < 251:
return []byte{byte(x)}
case x < 1<<16:
b := make([]byte, 3)
b[0] = 0xFC
binary.LittleEndian.PutUint16(b[1:], uint16(x))
return b
case x < 1<<24:
// 0xFD + 3 bytes little endian
return []byte{0xFD, byte(x), byte(x >> 8), byte(x >> 16)}
default:
b := make([]byte, 9)
b[0] = 0xFE
binary.LittleEndian.PutUint64(b[1:], x)
return b
}
}

// NewTableMapEvent returns a TableMap event. If any errors are encountered while building the
// event bytes, an error is returned.
// Only works with post_header_length=8.
func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm *TableMap) BinlogEvent {
func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm *TableMap) (BinlogEvent, error) {
optionalMetadata, err := buildOptionalTableMapMetadata(tm)
if err != nil {
return nil, err
}

if f.HeaderSize(eTableMapEvent) != 8 {
panic("Not implemented, post_header_length!=8")
}
Expand All @@ -385,7 +520,9 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm
len(tm.Types) +
lenEncIntSize(uint64(metadataLength)) + // lenenc-str column-meta-def
metadataLength +
len(tm.CanBeNull.data)
len(tm.CanBeNull.data) +
len(optionalMetadata)

data := make([]byte, length)

data[0] = byte(tableID)
Expand All @@ -397,9 +534,11 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm
data[6] = byte(tm.Flags)
data[7] = byte(tm.Flags >> 8)
data[8] = byte(len(tm.Database))

pos := 6 + 2 + 1 + copy(data[9:], tm.Database)
data[pos] = 0
pos++

data[pos] = byte(len(tm.Name))
pos += 1 + copy(data[pos+1:], tm.Name)
data[pos] = 0
Expand All @@ -414,12 +553,15 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm
}

pos += copy(data[pos:], tm.CanBeNull.data)
pos += copy(data[pos:], optionalMetadata)

if pos != len(data) {
panic("bad encoding")
return nil, fmt.Errorf("bad table map encoding; calculated position (%v) "+
"does not match length of data (%v)", pos, len(data))
}

ev := packetize(f, eTableMapEvent, 0, data, m)
return NewMariadbBinlogEvent(ev)
return NewMariadbBinlogEvent(ev), nil
}

// NewWriteRowsEvent returns a WriteRows event. Uses v2.
Expand Down
70 changes: 68 additions & 2 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,16 @@ func TestTableMapEvent(t *testing.T) {
tm.CanBeNull.Set(5, true)
tm.CanBeNull.Set(9, true)

event := NewTableMapEvent(f, m, 0x102030405060, tm)
event, err := NewTableMapEvent(f, m, 0x102030405060, tm)
require.NoError(t, err)
if !event.IsValid() {
t.Fatalf("NewTableMapEvent().IsValid() is false")
}
if !event.IsTableMap() {
t.Fatalf("NewTableMapEvent().IsTableMap() if false")
}

event, _, err := event.StripChecksum(f)
event, _, err = event.StripChecksum(f)
if err != nil {
t.Fatalf("StripChecksum failed: %v", err)
}
Expand All @@ -352,6 +353,71 @@ func TestTableMapEvent(t *testing.T) {
}
}

// Test serialization of TableMap events that contain optional metadata (e.g. column names, enum values).
func TestTableMapEventWithOptionalMetadata(t *testing.T) {
f := NewMySQL56BinlogFormat()
m := NewTestBinlogMetadata()

tm := &TableMap{
Flags: 0x8090,
Database: "my_database",
Name: "my_table",
Types: []byte{
TypeLongLong,
TypeLongLong,
TypeLongLong,
},
CanBeNull: NewServerBitmap(10),
Metadata: []uint16{
0,
0,
0,
},
OptionalEnumValues: [][]string{{"apple", "orange"}, {"red", "green"}},
OptionalSetValues: [][]string{{"one", "two", "three"}},
OptionalColumnNames: []string{"foo", "bar", "baz"},
OptionalColumnCollations: []uint64{0, 0, 0},
OptionalEnumAndSetCollations: []uint64{0, 0, 0},
}
tm.CanBeNull.Set(1, true)
tm.CanBeNull.Set(2, true)
tm.CanBeNull.Set(5, true)
tm.CanBeNull.Set(9, true)

event, err := NewTableMapEvent(f, m, 0x102030405060, tm)
require.NoError(t, err)
if !event.IsValid() {
t.Fatalf("NewTableMapEvent().IsValid() is false")
}
if !event.IsTableMap() {
t.Fatalf("NewTableMapEvent().IsTableMap() if false")
}

event, _, err = event.StripChecksum(f)
if err != nil {
t.Fatalf("StripChecksum failed: %v", err)
}

tableID := event.TableID(f)
if tableID != 0x102030405060 {
t.Fatalf("NewTableMapEvent().TableID returned %x", tableID)
}

// NOTE: Vitess doesn't currently include support for deserializing optional table map data (only serializing it)
// so instead of doing round-trip testing of the values, we use static expected bytes that we know have been
// serialized and work correctly with replication clients.
var expectedBytes = []byte{
0x98, 0x68, 0xe9, 0x53, 0x13, 0x1, 0x0, 0x0, 0x0, 0x81, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x60,
0x50, 0x40, 0x30, 0x20, 0x10, 0x90, 0x80, 0xb, 0x6d, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73,
0x65, 0x0, 0x8, 0x6d, 0x79, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x0, 0x3, 0x8, 0x8, 0x8, 0x0, 0x26, 0x2,
0x3, 0x3, 0x0, 0x0, 0x0, 0x4, 0xc, 0x3, 0x66, 0x6f, 0x6f, 0x3, 0x62, 0x61, 0x72, 0x3, 0x62, 0x61, 0x7a,
0x6, 0x19, 0x2, 0x5, 0x61, 0x70, 0x70, 0x6c, 0x65, 0x6, 0x6f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x2, 0x3,
0x72, 0x65, 0x64, 0x5, 0x67, 0x72, 0x65, 0x65, 0x6e, 0x5, 0xf, 0x3, 0x3, 0x6f, 0x6e, 0x65, 0x3, 0x74,
0x77, 0x6f, 0x5, 0x74, 0x68, 0x72, 0x65, 0x65, 0xb, 0x3, 0x0, 0x0, 0x0,
}
require.Equal(t, expectedBytes, event.Bytes())
}

func TestRowsEvent(t *testing.T) {
f := NewMySQL56BinlogFormat()
m := NewTestBinlogMetadata()
Expand Down