Skip to content

Commit

Permalink
Add function to parse 'extradata’ in rows event (#817)
Browse files Browse the repository at this point in the history
* feature - parse extra data

* fixed - parse extradata

* fix typos and add comments

* fix according to coding style

* Fix typos and change to formatting

* Fix Incorrect Byte Parsing in NDB Info from Extra Data

* Add Test Code to Validate 'extradata' Parsing in 'rows event'

* fixed conding style

* Add event type to rows event data parsing for partition ID

* Add event type to check partition Id in rows event
  • Loading branch information
Chungeun Choi authored Sep 8, 2023
1 parent 09a16f2 commit ad9a6e6
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 3 deletions.
5 changes: 5 additions & 0 deletions replication/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,8 @@ const (
LAST_INSERT_ID
INSERT_ID
)

const (
ENUM_EXTRA_ROW_INFO_TYPECODE_NDB byte = iota
ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION
)
39 changes: 36 additions & 3 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,12 @@ type RowsEvent struct {
Flags uint16

// if version == 2
ExtraData []byte
// Use when DataLen value is greater than 2
NdbFormat byte
NdbData []byte

PartitionId uint16
SourcePartitionId uint16

// lenenc_int
ColumnCount uint64
Expand Down Expand Up @@ -955,8 +960,12 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) {
if e.Version == 2 {
dataLen := binary.LittleEndian.Uint16(data[pos:])
pos += 2

e.ExtraData = data[pos : pos+int(dataLen-2)]
if dataLen > 2 {
err := e.decodeExtraData(data[pos:])
if err != nil {
return 0, err
}
}
pos += int(dataLen - 2)
}

Expand Down Expand Up @@ -985,6 +994,29 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) {
return pos, nil
}

func (e *RowsEvent) decodeExtraData(data []byte) (err2 error) {
pos := 0
extraDataType := data[pos]
pos += 1
switch extraDataType {
case ENUM_EXTRA_ROW_INFO_TYPECODE_NDB:
var ndbLength int = int(data[pos])
pos += 1
e.NdbFormat = data[pos]
pos += 1
e.NdbData = data[pos : pos+ndbLength-2]
case ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION:
if e.eventType == UPDATE_ROWS_EVENTv1 || e.eventType == UPDATE_ROWS_EVENTv2 || e.eventType == PARTIAL_UPDATE_ROWS_EVENT {
e.PartitionId = binary.LittleEndian.Uint16(data[pos:])
pos += 2
e.SourcePartitionId = binary.LittleEndian.Uint16(data[pos:])
} else {
e.PartitionId = binary.LittleEndian.Uint16(data[pos:])
}
}
return nil
}

func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {
// Rows_log_event::print_verbose()

Expand Down Expand Up @@ -1742,6 +1774,7 @@ func (e *RowsEvent) Dump(w io.Writer) {
fmt.Fprintf(w, "TableID: %d\n", e.TableID)
fmt.Fprintf(w, "Flags: %d\n", e.Flags)
fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount)
fmt.Fprintf(w, "NDB data: %s\n", e.NdbData)

fmt.Fprintf(w, "Values:\n")
for _, rows := range e.Rows {
Expand Down
120 changes: 120 additions & 0 deletions replication/row_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,126 @@ func TestTableMapOptMetaVisibility(t *testing.T) {
}
}

func TestRowsDataExtraData(t *testing.T) {
// Only after mysql 8.0.16 version can be parsed from extradata to 'partition info' and 'ndb info'
testcases := []struct {
data []byte
tableData []byte
eventType EventType
expectPartitionId uint16
expectSourcePartitionId uint16
expectNdbFormat byte
expectNdbData []byte
}{
/*
mysql-cluster 8.0.32
+-------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+-----+---------+-------+
| p | int | NO | PRI | NULL | |
| c | int | YES | UNI | NULL | |
+-------+------+------+-----+---------+-------+
CREATE TABLE t (
p INT PRIMARY KEY,
c INT,
UNIQUE KEY u (c)
) ENGINE NDB;
INSERT INTO t VALUES (1,1), (2,2), (3,3), (4,4), (5,5);
*/
{
data: []byte("s\x00\x00\x00\x00\x00\x01\x00\x0f\x00\x00\f\x00\x01\x00\x00\x04\x80\x00\x04\x00\x00\x00\x02\xff\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x02\x00\x00\x00\x02\x00\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00\x03\x00\x00\x00\x03\x00\x00\x00\x00\x05\x00\x00\x00\x05\x00\x00\x00"),
tableData: []byte("s\x00\x00\x00\x00\x00\x01\x00\abdteste\x00\x01t\x00\x02\x03\x03\x00\x02\x01\x01\x00"),
eventType: WRITE_ROWS_EVENTv2,
expectPartitionId: 0x0,
expectSourcePartitionId: 0x0,
expectNdbFormat: 0x0,
expectNdbData: []byte("\x01\x00\x00\x04\x80\x00\x04\x00\x00\x00"),
},
/*
mysql 8.0.16
+-------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+-----+---------+-------+
| id | int | YES | | NULL | |
+-------+------+------+-----+---------+-------+
CREATE TABLE test (id INTEGER)
PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (1),
PARTITION p1 VALUES LESS THAN (2),
PARTITION p2 VALUES LESS THAN (3),
PARTITION p3 VALUES LESS THAN (4),
PARTITION p4 VALUES LESS THAN (5)
);
INSERT INTO test (id) VALUES(3);
UPDATE test set id = 1 WHERE id = 3;
*/
{
data: []byte("p\x03\x00\x00\x00\x00\x01\x00\x05\x00\x01\x03\x00\x01\xff\x00\x03\x00\x00\x00"),
tableData: []byte("p\x03\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01\x01\x01\x00"),
eventType: WRITE_ROWS_EVENTv2,
expectPartitionId: 0x3,
expectSourcePartitionId: 0x0,
expectNdbFormat: 0x0,
expectNdbData: []byte(nil),
},
{
data: []byte("p\x03\x00\x00\x00\x00\x01\x00\a\x00\x01\x01\x00\x03\x00\x01\xff\xff\x00\x03\x00\x00\x00\x00\x01\x00\x00\x00"),
tableData: []byte("p\x03\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01\x01\x01\x00"),
eventType: UPDATE_ROWS_EVENTv2,
expectPartitionId: 0x1,
expectSourcePartitionId: 0x3,
expectNdbFormat: 0x0,
expectNdbData: []byte(nil),
},
// mysql 5.7 and mariadb 14(15) does not surpot extra data
{
data: []byte("m\x00\x00\x00\x00\x00\x01\x00\x02\x00\x01\xff\xfe\x03\x00\x00\x00"),
tableData: []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01"),
eventType: WRITE_ROWS_EVENTv2,
expectPartitionId: 0x0,
expectSourcePartitionId: 0x0,
expectNdbFormat: 0x0,
expectNdbData: []byte(nil),
},
{
data: []byte("m\x00\x00\x00\x00\x00\x01\x00\x02\x00\x01\xff\xff\xfe\x03\x00\x00\x00\xfe\x01\x00\x00\x00"),
tableData: []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01"),
eventType: UPDATE_ROWS_EVENTv2,
expectPartitionId: 0x0,
expectSourcePartitionId: 0x0,
expectNdbFormat: 0x0,
expectNdbData: []byte(nil),
},
}

for _, tc := range testcases {
tableMapEvent := new(TableMapEvent)
tableMapEvent.tableIDSize = 6
err := tableMapEvent.Decode(tc.tableData)
require.NoError(t, err)

rowsEvent := new(RowsEvent)
rowsEvent.tableIDSize = 6
rowsEvent.tables = make(map[uint64]*TableMapEvent)
rowsEvent.tables[tableMapEvent.TableID] = tableMapEvent
rowsEvent.Version = 2
rowsEvent.eventType = tc.eventType

err = rowsEvent.Decode(tc.data)
require.NoError(t, err)
require.Equal(t, tc.expectPartitionId, rowsEvent.PartitionId)
require.Equal(t, tc.expectSourcePartitionId, rowsEvent.SourcePartitionId)
require.Equal(t, tc.expectNdbFormat, rowsEvent.NdbFormat)
require.Equal(t, tc.expectNdbData, rowsEvent.NdbData)
}
}

func TestTableMapHelperMaps(t *testing.T) {
/*
CREATE TABLE `_types` (
Expand Down

0 comments on commit ad9a6e6

Please sign in to comment.