diff --git a/go/mysql/binlog/rbr.go b/go/mysql/binlog/rbr.go index 7512413f606..8a985c32a65 100644 --- a/go/mysql/binlog/rbr.go +++ b/go/mysql/binlog/rbr.go @@ -164,11 +164,9 @@ func printTimestamp(v uint32) *bytes.Buffer { } t := time.Unix(int64(v), 0).UTC() - year, month, day := t.Date() - hour, minute, second := t.Clock() result := &bytes.Buffer{} - fmt.Fprintf(result, "%04d-%02d-%02d %02d:%02d:%02d", year, int(month), day, hour, minute, second) + result.Write(t.AppendFormat(nil, sqltypes.TimestampFormat)) return result } diff --git a/go/mysql/binlog/rbr_test.go b/go/mysql/binlog/rbr_test.go index ce49e587e6c..8b7bd06c9e9 100644 --- a/go/mysql/binlog/rbr_test.go +++ b/go/mysql/binlog/rbr_test.go @@ -23,6 +23,8 @@ import ( "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" + + "github.com/stretchr/testify/assert" ) func TestCellLengthAndData(t *testing.T) { @@ -557,3 +559,10 @@ func TestCellLengthAndData(t *testing.T) { } } } + +func TestPrintTimestamp(t *testing.T) { + var timestamp uint32 = 1741794544 + + result := printTimestamp(timestamp).String() + assert.Equal(t, "2025-03-12 15:49:04", result) +} diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index 90c2d7b9668..979f7537ada 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -111,7 +111,7 @@ type BinlogEvent interface { // GTID returns the GTID from the event, and if this event // also serves as a BEGIN statement. // This is only valid if IsGTID() returns true. - GTID(BinlogFormat) (replication.GTID, bool, error) + GTID(BinlogFormat) (gtid replication.GTID, hasBegin bool, lastCommitted int64, sequenceNumber int64, err error) // Query returns a Query struct representing data from a QUERY_EVENT. // This is only valid if IsQuery() returns true. Query(BinlogFormat) (Query, error) diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index 8c60956faf1..d32a8c6d2c5 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -40,8 +40,8 @@ func newFilePosBinlogEvent(buf []byte) *filePosBinlogEvent { return &filePosBinlogEvent{binlogEvent: binlogEvent(buf)} } -func (*filePosBinlogEvent) GTID(BinlogFormat) (replication.GTID, bool, error) { - return nil, false, nil +func (*filePosBinlogEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) { + return nil, false, 0, 0, nil } // IsSemiSyncAckRequested implements BinlogEvent.IsSemiSyncAckRequested(). @@ -224,8 +224,8 @@ func (ev filePosFakeEvent) Format() (BinlogFormat, error) { return BinlogFormat{}, nil } -func (ev filePosFakeEvent) GTID(BinlogFormat) (replication.GTID, bool, error) { - return nil, false, nil +func (ev filePosFakeEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) { + return nil, false, 0, 0, nil } func (ev filePosFakeEvent) Query(BinlogFormat) (Query, error) { @@ -304,6 +304,6 @@ func (ev filePosGTIDEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte, e return ev, nil, nil } -func (ev filePosGTIDEvent) GTID(BinlogFormat) (replication.GTID, bool, error) { - return ev.gtid, false, nil +func (ev filePosGTIDEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) { + return ev.gtid, false, 0, 0, nil } diff --git a/go/mysql/binlog_event_make_test.go b/go/mysql/binlog_event_make_test.go index 84535213cd9..9eef1a7108e 100644 --- a/go/mysql/binlog_event_make_test.go +++ b/go/mysql/binlog_event_make_test.go @@ -159,7 +159,7 @@ func TestMariadDBGTIDEVent(t *testing.T) { event, _, err := event.StripChecksum(f) require.NoError(t, err, "StripChecksum failed: %v", err) - gtid, hasBegin, err := event.GTID(f) + gtid, hasBegin, _, _, err := event.GTID(f) require.NoError(t, err, "NewMariaDBGTIDEvent().GTID() returned error: %v", err) require.True(t, hasBegin, "NewMariaDBGTIDEvent() didn't store hasBegin properly.") @@ -178,7 +178,7 @@ func TestMariadDBGTIDEVent(t *testing.T) { event, _, err = event.StripChecksum(f) require.NoError(t, err, "StripChecksum failed: %v", err) - gtid, hasBegin, err = event.GTID(f) + gtid, hasBegin, _, _, err = event.GTID(f) require.NoError(t, err, "NewMariaDBGTIDEvent().GTID() returned error: %v", err) require.False(t, hasBegin, "NewMariaDBGTIDEvent() didn't store hasBegin properly.") diff --git a/go/mysql/binlog_event_mariadb.go b/go/mysql/binlog_event_mariadb.go index f2c0ec8f369..f35edc0d9c7 100644 --- a/go/mysql/binlog_event_mariadb.go +++ b/go/mysql/binlog_event_mariadb.go @@ -60,17 +60,18 @@ func (ev mariadbBinlogEvent) IsGTID() bool { // 8 sequence number // 4 domain ID // 1 flags2 -func (ev mariadbBinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) { +func (ev mariadbBinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, int64, int64, error) { const FLStandalone = 1 data := ev.Bytes()[f.HeaderLength:] flags2 := data[8+4] - return replication.MariadbGTID{ + gtid := replication.MariadbGTID{ Sequence: binary.LittleEndian.Uint64(data[:8]), Domain: binary.LittleEndian.Uint32(data[8 : 8+4]), Server: ev.ServerID(), - }, flags2&FLStandalone == 0, nil + } + return gtid, flags2&FLStandalone == 0, 0, 0, nil } // PreviousGTIDs implements BinlogEvent.PreviousGTIDs(). diff --git a/go/mysql/binlog_event_mariadb_test.go b/go/mysql/binlog_event_mariadb_test.go index 83015e4389d..fc9cdf6f7ce 100644 --- a/go/mysql/binlog_event_mariadb_test.go +++ b/go/mysql/binlog_event_mariadb_test.go @@ -60,7 +60,7 @@ func TestMariadbNotBeginGTID(t *testing.T) { require.NoError(t, err) input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbStandaloneGTIDEvent)} - _, got, err := input.GTID(f) + _, got, _, _, err := input.GTID(f) require.NoError(t, err) assert.False(t, got, "%#v", input) } @@ -70,7 +70,7 @@ func TestMariadbIsBeginGTID(t *testing.T) { require.NoError(t, err) input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbBeginGTIDEvent)} - _, got, err := input.GTID(f) + _, got, _, _, err := input.GTID(f) require.NoError(t, err) assert.True(t, got, "%#v", input) } @@ -81,7 +81,7 @@ func TestMariadbStandaloneBinlogEventGTID(t *testing.T) { input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbStandaloneGTIDEvent)} want := replication.MariadbGTID{Domain: 0, Server: 62344, Sequence: 9} - got, hasBegin, err := input.GTID(f) + got, hasBegin, _, _, err := input.GTID(f) assert.NoError(t, err) assert.False(t, hasBegin) assert.Equal(t, want, got) @@ -93,7 +93,7 @@ func TestMariadbBinlogEventGTID(t *testing.T) { input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbBeginGTIDEvent)} want := replication.MariadbGTID{Domain: 0, Server: 62344, Sequence: 10} - got, hasBegin, err := input.GTID(f) + got, hasBegin, _, _, err := input.GTID(f) assert.NoError(t, err) assert.True(t, hasBegin) assert.Equal(t, want, got) diff --git a/go/mysql/binlog_event_mysql56.go b/go/mysql/binlog_event_mysql56.go index 3f931310ba9..7529df664ac 100644 --- a/go/mysql/binlog_event_mysql56.go +++ b/go/mysql/binlog_event_mysql56.go @@ -60,12 +60,27 @@ func (ev mysql56BinlogEvent) IsGTID() bool { // 1 flags // 16 SID (server UUID) // 8 GNO (sequence number, signed int) -func (ev mysql56BinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) { +// 1 lt_type +// 8 last_committed +// 8 sequence_number +func (ev mysql56BinlogEvent) GTID(f BinlogFormat) (gtid replication.GTID, hasBegin bool, lastCommitted int64, sequenceNumber int64, err error) { data := ev.Bytes()[f.HeaderLength:] var sid replication.SID - copy(sid[:], data[1:1+16]) - gno := int64(binary.LittleEndian.Uint64(data[1+16 : 1+16+8])) - return replication.Mysql56GTID{Server: sid, Sequence: gno}, false /* hasBegin */, nil + pos := 1 + copy(sid[:], data[pos:pos+16]) + pos += 16 // end of SID + gno := int64(binary.LittleEndian.Uint64(data[pos : pos+8])) + pos += 8 // end of GNO + pos += 1 // end of lt_type + if len(data) >= pos+8 { + lastCommitted = int64(binary.LittleEndian.Uint64(data[pos : pos+8])) + } + pos += 8 // end of last_committed + if len(data) >= pos+8 { + sequenceNumber = int64(binary.LittleEndian.Uint64(data[pos : pos+8])) + } + // pos += 8 // end of sequence_number + return replication.Mysql56GTID{Server: sid, Sequence: gno}, false /* hasBegin */, lastCommitted, sequenceNumber, nil } // PreviousGTIDs implements BinlogEvent.PreviousGTIDs(). diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index ede2abece99..8a6a6edf1aa 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -34,7 +34,7 @@ import ( // Sample event data for MySQL 5.6. var ( mysql56FormatEvent = NewMysql56BinlogEvent([]byte{0x78, 0x4e, 0x49, 0x55, 0xf, 0x64, 0x0, 0x0, 0x0, 0x74, 0x0, 0x0, 0x0, 0x78, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x36, 0x2e, 0x32, 0x34, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x78, 0x4e, 0x49, 0x55, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0, 0x1, 0x18, 0x4a, 0xf, 0xca}) - mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27}) + mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 /* lt_type: */, 0x0 /* last_committed: */, 0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 /* sequence_number: */, 0x9, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27}) mysql56QueryEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3}) mysql56SemiSyncNoAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x00, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3}) mysql56SemiSyncAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x01, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3}) @@ -90,10 +90,12 @@ func TestMysql56GTID(t *testing.T) { Server: replication.SID{0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a}, Sequence: 4, } - got, hasBegin, err := input.GTID(format) + got, hasBegin, lastCommitted, sequenceNumber, err := input.GTID(format) require.NoError(t, err, "GTID() error: %v", err) assert.False(t, hasBegin, "GTID() returned hasBegin") assert.Equal(t, want, got, "GTID() = %#v, want %#v", got, want) + assert.Equal(t, int64(7), lastCommitted) + assert.Equal(t, int64(9), sequenceNumber) } func TestMysql56DecodeTransactionPayload(t *testing.T) { diff --git a/go/mysql/endtoend/replication_test.go b/go/mysql/endtoend/replication_test.go index a04f75c6b43..4c0e3f5864a 100644 --- a/go/mysql/endtoend/replication_test.go +++ b/go/mysql/endtoend/replication_test.go @@ -230,7 +230,7 @@ func TestRowReplicationWithRealDatabase(t *testing.T) { switch { case be.IsGTID(): // We expect one of these at least. - gtid, hasBegin, err := be.GTID(f) + gtid, hasBegin, _, _, err := be.GTID(f) if err != nil { t.Fatalf("GTID event is broken: %v", err) } diff --git a/go/mysql/replication/filepos_gtid.go b/go/mysql/replication/filepos_gtid.go index 95c7efcd3b1..1584c6db33a 100644 --- a/go/mysql/replication/filepos_gtid.go +++ b/go/mysql/replication/filepos_gtid.go @@ -69,6 +69,10 @@ func (gtid FilePosGTID) Flavor() string { return FilePosFlavorID } +func (gtid FilePosGTID) Empty() bool { + return gtid.File == "" +} + // SequenceDomain implements GTID.SequenceDomain(). func (gtid FilePosGTID) SequenceDomain() any { return nil @@ -137,6 +141,11 @@ func (gtid FilePosGTID) AddGTID(other GTID) GTIDSet { return filePosOther } +// AddGTIDInPlace implements GTIDSet.AddGTIDInPlace(). +func (gtid FilePosGTID) AddGTIDInPlace(other GTID) GTIDSet { + return gtid.AddGTID(other) +} + // Union implements GTIDSet.Union(). func (gtid FilePosGTID) Union(other GTIDSet) GTIDSet { filePosOther, ok := other.(FilePosGTID) @@ -147,6 +156,13 @@ func (gtid FilePosGTID) Union(other GTIDSet) GTIDSet { return filePosOther } +// Union implements GTIDSet.Union(). +func (gtid FilePosGTID) UnionInPlace(other GTIDSet) GTIDSet { + gtid = gtid.Union(other).(FilePosGTID) + + return gtid +} + // Last returns last filePosition // For filePos based GTID we have only one position // here we will just return the current filePos diff --git a/go/mysql/replication/gtid_set.go b/go/mysql/replication/gtid_set.go index 1e4ca29b42e..e3f11a6c55c 100644 --- a/go/mysql/replication/gtid_set.go +++ b/go/mysql/replication/gtid_set.go @@ -34,6 +34,9 @@ type GTIDSet interface { // registered in the transactionSetParsers map. Flavor() string + // Empty returns true when the GTID has no entries + Empty() bool + // ContainsGTID returns true if the set contains the specified transaction. ContainsGTID(GTID) bool @@ -47,9 +50,15 @@ type GTIDSet interface { // AddGTID returns a new GTIDSet that is expanded to contain the given GTID. AddGTID(GTID) GTIDSet + // AddGTID returns a new GTIDSet that is expanded to contain the given GTID, modifying the receiver GTID set + AddGTIDInPlace(GTID) GTIDSet + // Union returns a union of the receiver GTIDSet and the supplied GTIDSet. Union(GTIDSet) GTIDSet + // UnionInPlace returns a union of the receiver GTIDSet and the supplied GTIDSet, modifying the receiver GTID set + UnionInPlace(GTIDSet) GTIDSet + // Union returns a union of the receiver GTIDSet and the supplied GTIDSet. Last() string } diff --git a/go/mysql/replication/gtid_test.go b/go/mysql/replication/gtid_test.go index 8713f94b115..c4d39edd081 100644 --- a/go/mysql/replication/gtid_test.go +++ b/go/mysql/replication/gtid_test.go @@ -177,14 +177,16 @@ type fakeGTID struct { func (f fakeGTID) String() string { return f.value } func (f fakeGTID) Last() string { panic("not implemented") } func (f fakeGTID) Flavor() string { return f.flavor } +func (f fakeGTID) Empty() bool { return false } func (fakeGTID) SourceServer() any { return int(1) } func (fakeGTID) SequenceNumber() any { return int(1) } func (fakeGTID) SequenceDomain() any { return int(1) } func (f fakeGTID) GTIDSet() GTIDSet { return nil } -func (fakeGTID) ContainsGTID(GTID) bool { return false } -func (fakeGTID) Contains(GTIDSet) bool { return false } -func (f fakeGTID) Union(GTIDSet) GTIDSet { return f } +func (fakeGTID) ContainsGTID(GTID) bool { return false } +func (fakeGTID) Contains(GTIDSet) bool { return false } +func (f fakeGTID) Union(GTIDSet) GTIDSet { return f } +func (f fakeGTID) UnionInPlace(GTIDSet) GTIDSet { return f } func (f fakeGTID) Equal(other GTIDSet) bool { otherFake, ok := other.(fakeGTID) if !ok { @@ -192,4 +194,5 @@ func (f fakeGTID) Equal(other GTIDSet) bool { } return f == otherFake } -func (fakeGTID) AddGTID(GTID) GTIDSet { return nil } +func (fakeGTID) AddGTID(GTID) GTIDSet { return nil } +func (fakeGTID) AddGTIDInPlace(GTID) GTIDSet { return nil } diff --git a/go/mysql/replication/mariadb_gtid.go b/go/mysql/replication/mariadb_gtid.go index ff63964bbf1..7f01ec0ca60 100644 --- a/go/mysql/replication/mariadb_gtid.go +++ b/go/mysql/replication/mariadb_gtid.go @@ -148,6 +148,10 @@ func (gtidSet MariadbGTIDSet) Flavor() string { return MariadbFlavorID } +func (gtidSet MariadbGTIDSet) Empty() bool { + return len(gtidSet) == 0 +} + // ContainsGTID implements GTIDSet.ContainsGTID(). func (gtidSet MariadbGTIDSet) ContainsGTID(other GTID) bool { if other == nil { @@ -216,6 +220,19 @@ func (gtidSet MariadbGTIDSet) AddGTID(other GTID) GTIDSet { return newSet } +// AddGTID implements GTIDSet.AddGTID(). +func (gtidSet MariadbGTIDSet) AddGTIDInPlace(other GTID) GTIDSet { + if other == nil { + return gtidSet + } + mdbOther, ok := other.(MariadbGTID) + if !ok { + return gtidSet + } + gtidSet.addGTID(mdbOther) + return gtidSet +} + // Union implements GTIDSet.Union(). This is a pure method, and does not mutate the receiver. func (gtidSet MariadbGTIDSet) Union(other GTIDSet) GTIDSet { if gtidSet == nil && other != nil { @@ -237,6 +254,13 @@ func (gtidSet MariadbGTIDSet) Union(other GTIDSet) GTIDSet { return newSet } +// Union implements GTIDSet.Union(). +func (gtid MariadbGTIDSet) UnionInPlace(other GTIDSet) GTIDSet { + gtid = gtid.Union(other).(MariadbGTIDSet) + + return gtid +} + // Last returns the last gtid func (gtidSet MariadbGTIDSet) Last() string { // Sort domains so the string format is deterministic. diff --git a/go/mysql/replication/mysql56_gtid.go b/go/mysql/replication/mysql56_gtid.go index dd55caf1949..4e42b70708e 100644 --- a/go/mysql/replication/mysql56_gtid.go +++ b/go/mysql/replication/mysql56_gtid.go @@ -33,6 +33,10 @@ var ( ErrExpectMysql56Flavor = vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "expected MySQL GTID position but found a different or invalid format.") ) +func ParseMysql56GTID(s string) (GTID, error) { + return parseMysql56GTID(s) +} + // parseMysql56GTID is registered as a GTID parser. func parseMysql56GTID(s string) (GTID, error) { // Split into parts. @@ -94,8 +98,7 @@ func ParseSID(s string) (sid SID, err error) { type Mysql56GTID struct { // Server is the SID of the server that originally committed the transaction. Server SID - // Sequence is the sequence number of the transaction within a given Server's - // scope. + // Sequence is the sequence number of the transaction within a given Server's scope. Sequence int64 } @@ -126,7 +129,9 @@ func (gtid Mysql56GTID) SequenceNumber() any { // GTIDSet implements GTID.GTIDSet(). func (gtid Mysql56GTID) GTIDSet() GTIDSet { - return Mysql56GTIDSet{}.AddGTID(gtid) + return Mysql56GTIDSet{ + gtid.Server: []interval{{start: gtid.Sequence, end: gtid.Sequence}}, + } } func init() { diff --git a/go/mysql/replication/mysql56_gtid_set.go b/go/mysql/replication/mysql56_gtid_set.go index 77142272c6a..37087fa5cdf 100644 --- a/go/mysql/replication/mysql56_gtid_set.go +++ b/go/mysql/replication/mysql56_gtid_set.go @@ -206,6 +206,10 @@ func (set Mysql56GTIDSet) Last() string { // Flavor implements GTIDSet. func (Mysql56GTIDSet) Flavor() string { return Mysql56FlavorID } +func (set Mysql56GTIDSet) Empty() bool { + return len(set) == 0 +} + // ContainsGTID implements GTIDSet. func (set Mysql56GTIDSet) ContainsGTID(gtid GTID) bool { gtid56, ok := gtid.(Mysql56GTID) @@ -241,7 +245,11 @@ func (set Mysql56GTIDSet) Contains(other GTIDSet) bool { // Check each SID in the other set. for sid, otherIntervals := range other56 { i := 0 - intervals := set[sid] + intervals, ok := set[sid] + if !ok { + // other56 has a SID that `set` doesn't have. + return false + } count := len(intervals) // Check each interval for this SID in the other set. @@ -370,6 +378,65 @@ func (set Mysql56GTIDSet) AddGTID(gtid GTID) GTIDSet { return newSet } +// AddGTID implements GTIDSet. +func (set Mysql56GTIDSet) AddGTIDInPlace(gtid GTID) GTIDSet { + gtid56, ok := gtid.(Mysql56GTID) + if !ok { + return set + } + + added := false + intervals, ok := set[gtid56.Server] + if !ok { + set[gtid56.Server] = []interval{{start: gtid56.Sequence, end: gtid56.Sequence}} + return set + } + + var newIntervals []interval + // Look for the right place to add this GTID. + for _, iv := range intervals { + if gtid56.Sequence >= iv.start && gtid56.Sequence <= iv.end { + // GTID already exists in the set. + return set + } + if !added { + switch { + case gtid56.Sequence == iv.start-1: + // Expand the interval at the beginning. + iv.start = gtid56.Sequence + added = true + case gtid56.Sequence == iv.end+1: + // Expand the interval at the end. + iv.end = gtid56.Sequence + added = true + case gtid56.Sequence < iv.start-1: + // The next interval is beyond the new GTID, but it can't + // be expanded, so we have to insert a new interval. + newIntervals = append(newIntervals, interval{start: gtid56.Sequence, end: gtid56.Sequence}) + added = true + } + } + // Check if this interval can be merged with the previous one. + count := len(newIntervals) + if count != 0 && iv.start == newIntervals[count-1].end+1 { + // Merge instead of appending. + newIntervals[count-1].end = iv.end + } else { + // Can't be merged. + newIntervals = append(newIntervals, iv) + } + set[gtid56.Server] = newIntervals + } + + if !added { + // There wasn't any place to insert the new GTID, so just append it + // as a new interval. + set[gtid56.Server] = append(set[gtid56.Server], interval{start: gtid56.Sequence, end: gtid56.Sequence}) + } + + return set +} + // Union implements GTIDSet.Union(). func (set Mysql56GTIDSet) Union(other GTIDSet) GTIDSet { if set == nil && other != nil { @@ -439,6 +506,63 @@ func (set Mysql56GTIDSet) Union(other GTIDSet) GTIDSet { return newSet } +// Union implements GTIDSet.Union(). +func (set Mysql56GTIDSet) UnionInPlace(other GTIDSet) GTIDSet { + if other == nil { + return set + } + mydbOther, ok := other.(Mysql56GTIDSet) + if !ok { + return set + } + if set == nil { + set = mydbOther + return set + } + + var nextInterval interval + for otherSID, otherIntervals := range mydbOther { + intervals, ok := set[otherSID] + if !ok { + // No matching server id, so we must add it from other set. + set[otherSID] = otherIntervals + continue + } + + // Found server id match between sets, so now we need to add each interval. + s1 := intervals + s2 := otherIntervals + + var newIntervals = make([]interval, 0, len(s1)+len(s2)) // pre-allocation saves computation time later on + for popInterval(&nextInterval, &s1, &s2) { + if len(newIntervals) == 0 { + newIntervals = append(newIntervals, nextInterval) + continue + } + + activeInterval := &newIntervals[len(newIntervals)-1] + + if nextInterval.end <= activeInterval.end { + // We hit an interval whose start was after or equal to the previous interval's start, but whose + // end is prior to the active intervals end. Skip to next interval. + continue + } + + if nextInterval.start > activeInterval.end+1 { + // We found a gap, so we need to start a new interval. + newIntervals = append(newIntervals, nextInterval) + continue + } + + // Extend our active interval. + activeInterval.end = nextInterval.end + } + set[otherSID] = newIntervals + } + + return set +} + // SIDBlock returns the binary encoding of a MySQL 5.6 GTID set as expected // by internal commands that refer to an "SID block". // diff --git a/go/mysql/replication/mysql56_gtid_set_test.go b/go/mysql/replication/mysql56_gtid_set_test.go index bf87a992d5d..6d315c4efe2 100644 --- a/go/mysql/replication/mysql56_gtid_set_test.go +++ b/go/mysql/replication/mysql56_gtid_set_test.go @@ -18,6 +18,7 @@ package replication import ( "fmt" + "maps" "reflect" "strings" "testing" @@ -371,70 +372,117 @@ func TestMysql56GTIDSetAddGTID(t *testing.T) { sid3 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 17} // The set to test against. - set := Mysql56GTIDSet{ - sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, - sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, - } - - table := map[GTID]Mysql56GTIDSet{ - // Adding wrong flavor is a no-op. - fakeGTID{}: set, - - // Adding GTIDs that are already in the set - Mysql56GTID{Server: sid1, Sequence: 20}: { - sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, - sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + tcases := []struct { + name string + add GTID + expect Mysql56GTIDSet + }{ + { + "Adding wrong flavor is a no-op.", + fakeGTID{}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + }, }, - Mysql56GTID{Server: sid1, Sequence: 30}: { - sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, - sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + { + "Adding GTIDs that are already in the set", + Mysql56GTID{Server: sid1, Sequence: 20}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + }, }, - Mysql56GTID{Server: sid1, Sequence: 25}: { - sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, - sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + { + "Adding GTIDs that are already in the set 2", + Mysql56GTID{Server: sid1, Sequence: 30}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + }, }, - // New interval beginning - Mysql56GTID{Server: sid1, Sequence: 1}: { - sid1: []interval{{1, 1}, {20, 30}, {35, 40}, {42, 45}}, - sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + { + "Adding GTIDs that are already in the set 3", + Mysql56GTID{Server: sid1, Sequence: 25}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + }, }, - // New interval middle - Mysql56GTID{Server: sid1, Sequence: 32}: { - sid1: []interval{{20, 30}, {32, 32}, {35, 40}, {42, 45}}, - sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + { + "New interval beginning", + Mysql56GTID{Server: sid1, Sequence: 1}, + Mysql56GTIDSet{ + sid1: []interval{{1, 1}, {20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + }, }, - // New interval end - Mysql56GTID{Server: sid1, Sequence: 50}: { - sid1: []interval{{20, 30}, {35, 40}, {42, 45}, {50, 50}}, - sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + { + "New interval middle", + Mysql56GTID{Server: sid1, Sequence: 32}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {32, 32}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + }, }, - // Extend interval start - Mysql56GTID{Server: sid2, Sequence: 49}: { - sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, - sid2: []interval{{1, 5}, {49, 50}, {60, 70}}, + { + "New interval end", + Mysql56GTID{Server: sid1, Sequence: 50}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}, {50, 50}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + }, }, - // Extend interval end - Mysql56GTID{Server: sid2, Sequence: 51}: { - sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, - sid2: []interval{{1, 5}, {50, 51}, {60, 70}}, + { + "Extend interval start", + Mysql56GTID{Server: sid2, Sequence: 49}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {49, 50}, {60, 70}}, + }, }, - // Merge intervals - Mysql56GTID{Server: sid1, Sequence: 41}: { - sid1: []interval{{20, 30}, {35, 45}}, - sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + { + "Extend interval end", + Mysql56GTID{Server: sid2, Sequence: 51}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {50, 51}, {60, 70}}, + }, }, - // Different SID - Mysql56GTID{Server: sid3, Sequence: 1}: { - sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, - sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, - sid3: []interval{{1, 1}}, + { + "Merge intervals", + Mysql56GTID{Server: sid1, Sequence: 41}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 45}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + }, + }, + { + "Different SID", + Mysql56GTID{Server: sid3, Sequence: 1}, + Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + sid3: []interval{{1, 1}}, + }, }, } + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + set := Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {50, 50}, {60, 70}}, + } - for input, want := range table { - if got := set.AddGTID(input); !got.Equal(want) { - t.Errorf("AddGTID(%#v) = %#v, want %#v", input, got, want) - } + originalSet := maps.Clone(set) + got := set.AddGTID(tcase.add) + assert.Equal(t, originalSet, set) // ensure immutable + assert.Equal(t, tcase.expect, got) + + gotInplace := set.AddGTIDInPlace(tcase.add) + assert.Equal(t, gotInplace, set) // ensure mutable + assert.Equal(t, tcase.expect, gotInplace) + }) } } @@ -465,6 +513,120 @@ func TestMysql56GTIDSetUnion(t *testing.T) { } +func TestMysql56GTIDSetInPlaceUnion(t *testing.T) { + sid1 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + sid2 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16} + sid3 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 17} + + set1 := Mysql56GTIDSet{ + sid1: []interval{{20, 30}, {35, 40}, {42, 45}}, + sid2: []interval{{1, 5}, {20, 50}, {60, 70}}, + } + + set2 := Mysql56GTIDSet{ + sid1: []interval{{20, 31}, {35, 37}, {41, 46}}, + sid2: []interval{{3, 6}, {22, 49}, {67, 72}}, + sid3: []interval{{1, 45}}, + } + + got := set1.UnionInPlace(set2) + + want := Mysql56GTIDSet{ + sid1: []interval{{20, 31}, {35, 46}}, + sid2: []interval{{1, 6}, {20, 50}, {60, 72}}, + sid3: []interval{{1, 45}}, + } + assert.Equal(t, set1, got) // Because this is in-place + assert.Equal(t, want, got) + assert.True(t, got.Equal(want), "set1: %#v, set1.Union(%#v) = %#v, want %#v", set1, set2, got, want) + +} + +func BenchmarkMysql56GTIDSetAdd(b *testing.B) { + base := "00010203-0405-0607-0809-0a0b0c0d0e0f:1-5" + gtidSet, err := ParseMysql56GTIDSet(base) + require.NoError(b, err) + pos := Position{GTIDSet: gtidSet} + + var Inputs = []string{ + "00010203-0405-0607-0809-0a0b0c0d0e0f:6", + "00010203-0405-0607-0809-0a0b0c0d0e0f:12", + "00010203-0405-0607-0809-0a0b0c0d0e0f:13", + "00010203-0405-0607-0809-0a0b0c0d0e0f:14", + "00010203-0405-0607-0809-0a0b0c0d0e0f:18", + "00010203-0405-0607-0809-0a0b0c0d0e0f:21", + "00010203-0405-0607-0809-0a0b0c0d0e0f:22", + } + gtids := make([]GTID, len(Inputs)) + for i, input := range Inputs { + gtid, err := parseMysql56GTID(input) + require.NoError(b, err) + gtids[i] = gtid + } + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + for _, gtid := range gtids { + pos.GTIDSet = pos.GTIDSet.AddGTID(gtid) + } + } +} + +func BenchmarkMysql56GTIDSetUnion(b *testing.B) { + var Inputs = []string{ + "00010203-0405-0607-0809-0a0b0c0d0e0f:1-5", + "00010203-0405-0607-0809-0a0b0c0d0e0f:12", + "00010203-0405-0607-0809-0a0b0c0d0e0f:1-5:10-20", + "00010203-0405-0607-0809-0a0b0c0d0e0f:10-20:1-5", + "00010203-0405-0607-0809-0a0b0c0d0e0f:8-7", + "00010203-0405-0607-0809-0a0b0c0d0e0f:1-5:8-7:10-20", + "00010203-0405-0607-0809-0a0b0c0d0e0f:1-5:10-20,00010203-0405-0607-0809-0a0b0c0d0eff:1-5:50", + "8aabbf4f-5074-11ed-b225-aa23ce7e3ba2:1-20443,a6f1bf40-5073-11ed-9c0f-12a3889dc912:1-343402", + } + positions := make([]Position, len(Inputs)) + for i, input := range Inputs { + gtid, err := ParseMysql56GTIDSet(input) + require.NoError(b, err) + positions[i] = Position{GTIDSet: gtid} + } + var pos Position + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + for _, p := range positions { + pos.GTIDSet = p.GTIDSet.UnionInPlace(pos.GTIDSet) + } + } +} + +func BenchmarkMysql56GTIDSetUnionHappyPath(b *testing.B) { + var Inputs = []string{ + "00010203-0405-0607-0809-0a0b0c0d0e0f:1-5", + "00010203-0405-0607-0809-0a0b0c0d0e0f:12", + "00010203-0405-0607-0809-0a0b0c0d0e0f:12-15:17-20", + "00010203-0405-0607-0809-0a0b0c0d0e0f:21-30:41-45", + "00010203-0405-0607-0809-0a0b0c0d0e0f:48", + "00010203-0405-0607-0809-0a0b0c0d0e0f:49-50:52-53", + } + positions := make([]Position, len(Inputs)) + for i, input := range Inputs { + gtid, err := ParseMysql56GTIDSet(input) + require.NoError(b, err) + positions[i] = Position{GTIDSet: gtid} + } + var pos = Position{GTIDSet: Mysql56GTIDSet{}} + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + for _, p := range positions { + pos.GTIDSet = pos.GTIDSet.UnionInPlace(p.GTIDSet) + } + } +} + func TestMysql56GTIDSetDifference(t *testing.T) { sid1 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} sid2 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16} diff --git a/go/mysql/replication/mysql56_gtid_test.go b/go/mysql/replication/mysql56_gtid_test.go index a8bffed72b9..392ce54c60f 100644 --- a/go/mysql/replication/mysql56_gtid_test.go +++ b/go/mysql/replication/mysql56_gtid_test.go @@ -57,9 +57,7 @@ func TestSIDString(t *testing.T) { input := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} want := "00010203-0405-0607-0809-0a0b0c0d0e0f" - if got := strings.ToLower(input.String()); got != want { - t.Errorf("%#v.String() = %#v, want %#v", input, got, want) - } + assert.Equal(t, want, input.String()) } func TestParseSID(t *testing.T) { @@ -137,9 +135,7 @@ func TestMysql56GTIDGTIDSet(t *testing.T) { sid1 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} input := Mysql56GTID{Server: sid1, Sequence: 5432} want := Mysql56GTIDSet{sid1: []interval{{5432, 5432}}} - if got := input.GTIDSet(); !got.Equal(want) { - t.Errorf("%#v.GTIDSet() = %#v, want %#v", input, got, want) - } + assert.Equal(t, want, input.GTIDSet()) } func TestMysql56ParseGTID(t *testing.T) { diff --git a/go/mysql/replication/replication_position.go b/go/mysql/replication/replication_position.go index a1a9fc2c9c1..c4388735cbf 100644 --- a/go/mysql/replication/replication_position.go +++ b/go/mysql/replication/replication_position.go @@ -94,7 +94,7 @@ func (rp Position) String() string { // IsZero returns true if this is the zero value, Position{}. func (rp Position) IsZero() bool { - return rp.GTIDSet == nil + return rp.GTIDSet == nil || rp.GTIDSet.Empty() } // AppendGTID returns a new Position that represents the position @@ -109,6 +109,44 @@ func AppendGTID(rp Position, gtid GTID) Position { return Position{GTIDSet: rp.GTIDSet.AddGTID(gtid)} } +// AppendGTID returns a new Position that represents the position +// after the given GTID is replicated. +func AppendGTIDInPlace(rp Position, gtid GTID) Position { + // If gtid is nil, treat it as a no-op and return the input Position. + if gtid == nil { + return rp + } + if rp.GTIDSet == nil { + rp.GTIDSet = gtid.GTIDSet() + } else { + rp.GTIDSet.AddGTIDInPlace(gtid) + } + return rp +} + +// AppendGTIDSet returns a new Position that represents the position +// after the given GTIDSet is replicated. +func AppendGTIDSet(rp Position, gtidSet GTIDSet) Position { + if gtidSet == nil { + return rp + } + return Position{GTIDSet: gtidSet.Union(rp.GTIDSet)} +} + +// AppendGTIDSet returns a new Position that represents the position +// after the given GTIDSet is replicated. +func AppendGTIDSetInPlace(rp Position, gtidSet GTIDSet) Position { + if gtidSet == nil { + return rp + } + + if rp.GTIDSet == nil { + return AppendGTIDSet(rp, gtidSet) + } + rp.GTIDSet = rp.GTIDSet.UnionInPlace(gtidSet) + return rp +} + // MustParsePosition calls ParsePosition and panics // on error. func MustParsePosition(flavor, value string) Position { diff --git a/go/mysql/replication/replication_position_test.go b/go/mysql/replication/replication_position_test.go index 125f5929bbe..deb9f9f9af3 100644 --- a/go/mysql/replication/replication_position_test.go +++ b/go/mysql/replication/replication_position_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestPositionEqual(t *testing.T) { @@ -181,33 +182,159 @@ func TestPositionIsNotZero(t *testing.T) { } func TestPositionAppend(t *testing.T) { - input1 := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234}}} - input2 := MariadbGTID{Domain: 3, Server: 5555, Sequence: 1235} - want := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1235}}} + t.Run("MariaDB", func(t *testing.T) { + input1 := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234}}} + input2 := MariadbGTID{Domain: 3, Server: 5555, Sequence: 1235} + want := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1235}}} - if got := AppendGTID(input1, input2); !got.Equal(want) { - t.Errorf("AppendGTID(%#v, %#v) = %#v, want %#v", input1, input2, got, want) - } + if got := AppendGTID(input1, input2); !got.Equal(want) { + t.Errorf("AppendGTID(%#v, %#v) = %#v, want %#v", input1, input2, got, want) + } + }) + t.Run("MySQL56", func(t *testing.T) { + gtidset, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615") + require.NoError(t, err) + gtid, err := parseMysql56GTID("16b1039f-22b6-11ed-b765-0a43f95f28a3:616") + require.NoError(t, err) + want, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-616") + require.NoError(t, err) + + pos := Position{GTIDSet: gtidset} + wantPos := Position{GTIDSet: want} + + gotPos := AppendGTID(pos, gtid) + assert.Equal(t, wantPos, gotPos, "got=%v", gotPos) + assert.NotEqual(t, pos, gotPos) + + gotPos = AppendGTIDInPlace(pos, gtid) + assert.Equal(t, wantPos, gotPos) + assert.Equal(t, wantPos, pos) + }) +} + +func TestPositionAppendBefore(t *testing.T) { + t.Run("MySQL56", func(t *testing.T) { + gtidset, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:10-615") + require.NoError(t, err) + gtid, err := parseMysql56GTID("16b1039f-22b6-11ed-b765-0a43f95f28a3:9") + require.NoError(t, err) + want, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:9-615") + require.NoError(t, err) + + pos := Position{GTIDSet: gtidset} + wantPos := Position{GTIDSet: want} + + gotPos := AppendGTID(pos, gtid) + assert.Equal(t, wantPos, gotPos, "got=%v", gotPos) + assert.NotEqual(t, pos, gotPos) + + gotPos = AppendGTIDInPlace(pos, gtid) + assert.Equal(t, wantPos, gotPos) + assert.Equal(t, wantPos, pos) + }) +} + +func TestPositionAppendNewInterval(t *testing.T) { + t.Run("MySQL56", func(t *testing.T) { + gtidset, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615") + require.NoError(t, err) + gtid, err := parseMysql56GTID("16b1039f-22b6-11ed-b765-0a43f95f28a3:620") + require.NoError(t, err) + want, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615:620") + require.NoError(t, err) + + pos := Position{GTIDSet: gtidset} + wantPos := Position{GTIDSet: want} + + gotPos := AppendGTID(pos, gtid) + assert.Equal(t, wantPos, gotPos, "got=%v", gotPos) + assert.NotEqual(t, pos, gotPos) + + gotPos = AppendGTIDInPlace(pos, gtid) + assert.Equal(t, wantPos, gotPos) + assert.Equal(t, wantPos, pos) + }) +} + +func TestPositionAppendContains(t *testing.T) { + t.Run("MySQL56", func(t *testing.T) { + gtidset, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615") + require.NoError(t, err) + gtid, err := parseMysql56GTID("16b1039f-22b6-11ed-b765-0a43f95f28a3:600") + require.NoError(t, err) + want, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615") + require.NoError(t, err) + + pos := Position{GTIDSet: gtidset} + wantPos := Position{GTIDSet: want} + + gotPos := AppendGTID(pos, gtid) + assert.Equal(t, wantPos, gotPos, "got=%v", gotPos) + assert.Equal(t, pos, gotPos) + + gotPos = AppendGTIDInPlace(pos, gtid) + assert.Equal(t, wantPos, gotPos) + assert.Equal(t, wantPos, pos) + }) } func TestPositionAppendNil(t *testing.T) { - input1 := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234}}} - input2 := GTID(nil) - want := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234}}} + t.Run("MariaDB", func(t *testing.T) { + input1 := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234}}} + input2 := GTID(nil) + want := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234}}} - if got := AppendGTID(input1, input2); !got.Equal(want) { - t.Errorf("AppendGTID(%#v, %#v) = %#v, want %#v", input1, input2, got, want) - } + if got := AppendGTID(input1, input2); !got.Equal(want) { + t.Errorf("AppendGTID(%#v, %#v) = %#v, want %#v", input1, input2, got, want) + } + }) + t.Run("MySQL56", func(t *testing.T) { + gtidset, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615") + require.NoError(t, err) + gtid := GTID(nil) + require.NoError(t, err) + want, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615") + require.NoError(t, err) + + pos := Position{GTIDSet: gtidset} + wantPos := Position{GTIDSet: want} + + gotPos := AppendGTID(pos, gtid) + assert.Equal(t, wantPos, gotPos, "got=%v", gotPos) + assert.Equal(t, pos, gotPos) + + gotPos = AppendGTIDInPlace(pos, gtid) + assert.Equal(t, wantPos, gotPos) + assert.Equal(t, wantPos, pos) + }) } func TestPositionAppendToZero(t *testing.T) { - input1 := Position{} - input2 := MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234} - want := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234}}} + t.Run("MariaDB", func(t *testing.T) { + input1 := Position{} + input2 := MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234} + want := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234}}} - if got := AppendGTID(input1, input2); !got.Equal(want) { - t.Errorf("AppendGTID(%#v, %#v) = %#v, want %#v", input1, input2, got, want) - } + if got := AppendGTID(input1, input2); !got.Equal(want) { + t.Errorf("AppendGTID(%#v, %#v) = %#v, want %#v", input1, input2, got, want) + } + }) + t.Run("MySQL56", func(t *testing.T) { + gtid, err := parseMysql56GTID("16b1039f-22b6-11ed-b765-0a43f95f28a3:616") + require.NoError(t, err) + want, err := ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:616") + require.NoError(t, err) + + pos := Position{} + wantPos := Position{GTIDSet: want} + + gotPos := AppendGTID(pos, gtid) + assert.Equal(t, wantPos, gotPos, "got=%v", gotPos) + assert.NotEqual(t, pos, gotPos) + + gotPos = AppendGTIDInPlace(pos, gtid) + assert.Equal(t, wantPos, gotPos) + }) } func TestMustParsePosition(t *testing.T) { diff --git a/go/vt/binlog/binlog_streamer.go b/go/vt/binlog/binlog_streamer.go index 08e06ec803c..1fdd0473f3e 100644 --- a/go/vt/binlog/binlog_streamer.go +++ b/go/vt/binlog/binlog_streamer.go @@ -344,7 +344,7 @@ func (bls *Streamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog switch { case ev.IsPseudo(): - gtid, _, err = ev.GTID(format) + gtid, _, _, _, err = ev.GTID(format) if err != nil { return pos, fmt.Errorf("can't get GTID from binlog event: %v, event data: %#v", err, ev) } @@ -360,7 +360,7 @@ func (bls *Streamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } case ev.IsGTID(): // GTID_EVENT: update current GTID, maybe BEGIN. var hasBegin bool - gtid, hasBegin, err = ev.GTID(format) + gtid, hasBegin, _, _, err = ev.GTID(format) if err != nil { return pos, fmt.Errorf("can't get GTID from binlog event: %v, event data: %#v", err, ev) } diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index fb848125a3d..161a5f84b75 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -304,6 +304,8 @@ const ( // If a client experiences some disruptions before receiving the event, // the client should restart the copy operation. VEventType_COPY_COMPLETED VEventType = 20 + // Indicates rotation into a new binary log + VEventType_PREVIOUS_GTIDS VEventType = 21 ) // Enum value maps for VEventType. @@ -330,6 +332,7 @@ var ( 18: "LASTPK", 19: "SAVEPOINT", 20: "COPY_COMPLETED", + 21: "PREVIOUS_GTIDS", } VEventType_value = map[string]int32{ "UNKNOWN": 0, @@ -353,6 +356,7 @@ var ( "LASTPK": 18, "SAVEPOINT": 19, "COPY_COMPLETED": 20, + "PREVIOUS_GTIDS": 21, } ) @@ -1866,7 +1870,7 @@ type VEvent struct { // Timestamp is the binlog timestamp in seconds. // The value should be ignored if 0. Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - // Gtid is set if the event type is GTID. + // Gtid is set if the event type is GTID. This is a combined GTIDs set of this event and previous events. Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"` // Statement is set if the event type is DDL, DML or SAVEPOINT. Statement string `protobuf:"bytes,4,opt,name=statement,proto3" json:"statement,omitempty"` @@ -1894,8 +1898,14 @@ type VEvent struct { Throttled bool `protobuf:"varint,24,opt,name=throttled,proto3" json:"throttled,omitempty"` // ThrottledReason is a human readable string that explains why the stream is throttled ThrottledReason string `protobuf:"bytes,25,opt,name=throttled_reason,json=throttledReason,proto3" json:"throttled_reason,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // For GTID events, the sequence number of the most recent transaction this event depends on. + CommitParent int64 `protobuf:"varint,26,opt,name=commit_parent,json=commitParent,proto3" json:"commit_parent,omitempty"` + // For GTID events, the sequence number (logical clock) value of this transaction. + SequenceNumber int64 `protobuf:"varint,27,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` + // EventGTID is decorated by VPlayer. It is the specific GTID (not the GTID set) for this event. + EventGtid string `protobuf:"bytes,28,opt,name=event_gtid,json=eventGtid,proto3" json:"event_gtid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *VEvent) Reset() { @@ -2033,6 +2043,27 @@ func (x *VEvent) GetThrottledReason() string { return "" } +func (x *VEvent) GetCommitParent() int64 { + if x != nil { + return x.CommitParent + } + return 0 +} + +func (x *VEvent) GetSequenceNumber() int64 { + if x != nil { + return x.SequenceNumber + } + return 0 +} + +func (x *VEvent) GetEventGtid() string { + if x != nil { + return x.EventGtid + } + return "" +} + type MinimalTable struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` @@ -3182,7 +3213,7 @@ const file_binlogdata_proto_rawDesc = "" + "\vshard_gtids\x18\x05 \x03(\v2\x15.binlogdata.ShardGtidR\n" + "shardGtids\x12=\n" + "\fparticipants\x18\x06 \x03(\v2\x19.binlogdata.KeyspaceShardR\fparticipants\x12)\n" + - "\x10source_workflows\x18\a \x03(\tR\x0fsourceWorkflows\"\xb6\x04\n" + + "\x10source_workflows\x18\a \x03(\tR\x0fsourceWorkflows\"\xa3\x05\n" + "\x06VEvent\x12*\n" + "\x04type\x18\x01 \x01(\x0e2\x16.binlogdata.VEventTypeR\x04type\x12\x1c\n" + "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12\x12\n" + @@ -3199,7 +3230,11 @@ const file_binlogdata_proto_rawDesc = "" + "\bkeyspace\x18\x16 \x01(\tR\bkeyspace\x12\x14\n" + "\x05shard\x18\x17 \x01(\tR\x05shard\x12\x1c\n" + "\tthrottled\x18\x18 \x01(\bR\tthrottled\x12)\n" + - "\x10throttled_reason\x18\x19 \x01(\tR\x0fthrottledReason\"\x8d\x01\n" + + "\x10throttled_reason\x18\x19 \x01(\tR\x0fthrottledReason\x12#\n" + + "\rcommit_parent\x18\x1a \x01(\x03R\fcommitParent\x12'\n" + + "\x0fsequence_number\x18\x1b \x01(\x03R\x0esequenceNumber\x12\x1d\n" + + "\n" + + "event_gtid\x18\x1c \x01(\tR\teventGtid\"\x8d\x01\n" + "\fMinimalTable\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12$\n" + "\x06fields\x18\x02 \x03(\v2\f.query.FieldR\x06fields\x12\x1e\n" + @@ -3300,7 +3335,7 @@ const file_binlogdata_proto_rawDesc = "" + "\aCopying\x10\x03\x12\v\n" + "\aRunning\x10\x04\x12\t\n" + "\x05Error\x10\x05\x12\v\n" + - "\aLagging\x10\x06*\x8d\x02\n" + + "\aLagging\x10\x06*\xa1\x02\n" + "\n" + "VEventType\x12\v\n" + "\aUNKNOWN\x10\x00\x12\b\n" + @@ -3329,7 +3364,8 @@ const file_binlogdata_proto_rawDesc = "" + "\n" + "\x06LASTPK\x10\x12\x12\r\n" + "\tSAVEPOINT\x10\x13\x12\x12\n" + - "\x0eCOPY_COMPLETED\x10\x14*'\n" + + "\x0eCOPY_COMPLETED\x10\x14\x12\x12\n" + + "\x0ePREVIOUS_GTIDS\x10\x15*'\n" + "\rMigrationType\x12\n" + "\n" + "\x06TABLES\x10\x00\x12\n" + diff --git a/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go b/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go index 74433a6d6b9..38c92276fa0 100644 --- a/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go +++ b/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go @@ -512,6 +512,9 @@ func (m *VEvent) CloneVT() *VEvent { r.Shard = m.Shard r.Throttled = m.Throttled r.ThrottledReason = m.ThrottledReason + r.CommitParent = m.CommitParent + r.SequenceNumber = m.SequenceNumber + r.EventGtid = m.EventGtid if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -2175,6 +2178,29 @@ func (m *VEvent) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.EventGtid) > 0 { + i -= len(m.EventGtid) + copy(dAtA[i:], m.EventGtid) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.EventGtid))) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xe2 + } + if m.SequenceNumber != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.SequenceNumber)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xd8 + } + if m.CommitParent != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.CommitParent)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xd0 + } if len(m.ThrottledReason) > 0 { i -= len(m.ThrottledReason) copy(dAtA[i:], m.ThrottledReason) @@ -3905,6 +3931,16 @@ func (m *VEvent) SizeVT() (n int) { if l > 0 { n += 2 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.CommitParent != 0 { + n += 2 + protohelpers.SizeOfVarint(uint64(m.CommitParent)) + } + if m.SequenceNumber != 0 { + n += 2 + protohelpers.SizeOfVarint(uint64(m.SequenceNumber)) + } + l = len(m.EventGtid) + if l > 0 { + n += 2 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -8265,6 +8301,76 @@ func (m *VEvent) UnmarshalVT(dAtA []byte) error { } m.ThrottledReason = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 26: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CommitParent", wireType) + } + m.CommitParent = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.CommitParent |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 27: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SequenceNumber", wireType) + } + m.SequenceNumber = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SequenceNumber |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 28: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field EventGtid", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.EventGtid = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 92980ff9e44..51c14b7515f 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -452,6 +452,9 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan [] evs[i].Timestamp = 0 evs[i].Keyspace = "" evs[i].Shard = "" + evs[i].SequenceNumber = 0 + evs[i].CommitParent = 0 + evs[i].EventGtid = "" if evs[i].Type == binlogdatapb.VEventType_FIELD { for j := range evs[i].FieldEvent.Fields { evs[i].FieldEvent.Fields[j].Flags = 0 diff --git a/go/vt/vttablet/tabletserver/vstreamer/main_test.go b/go/vt/vttablet/tabletserver/vstreamer/main_test.go index ce79b4ef5d0..4657c5501c8 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/main_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/main_test.go @@ -269,6 +269,10 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog if !testRowEventFlags && evs[i].Type == binlogdatapb.VEventType_ROW { evs[i].RowEvent.Flags = 0 } + // cleanup indeterministic fields: + evs[i].SequenceNumber = 0 + evs[i].CommitParent = 0 + evs[i].EventGtid = "" want = env.RemoveAnyDeprecatedDisplayWidths(want) if got := fmt.Sprintf("%v", evs[i]); got != want { log.Errorf("%v (%d): event:\n%q, want\n%q", input, i, got, want) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 46a2c4e38a8..9a9b2a169ad 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -80,9 +80,12 @@ type vstreamer struct { versionTableID uint64 // format and pos are updated by parseEvent. - format mysql.BinlogFormat - pos replication.Position - stopPos string + format mysql.BinlogFormat + pos replication.Position + stopPos string + commitParent int64 + sequenceNumber int64 + eventGTID replication.GTID phase string vse *Engine @@ -238,6 +241,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog vevent.Shard = vs.vse.shard switch vevent.Type { + case binlogdatapb.VEventType_PREVIOUS_GTIDS: + // At this time do nothing. Ideally we would issue a `bufferedEvents = append(bufferedEvents, vevent)`, + // ie merged into the `case` clause below. + // But at this time the tests will fail as this event is unexpected. This is a TODO for the earliest + // opportunity to work on this. case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD, binlogdatapb.VEventType_JOURNAL: // We never have to send GTID, BEGIN, FIELD events on their own. @@ -439,6 +447,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev if err != nil { return nil, fmt.Errorf("can't parse FORMAT_DESCRIPTION_EVENT: %v, event data: %#v", err, ev) } + vs.eventGTID = nil return nil, nil } @@ -460,19 +469,32 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev return nil, fmt.Errorf("can't strip checksum from binlog event: %v, event data: %#v", err, ev) } + timeNowUnixNano := time.Now().UnixNano() var vevents []*binlogdatapb.VEvent switch { + case ev.IsRotate(), ev.IsStop(): + vs.eventGTID = nil + case ev.IsPreviousGTIDs(): + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_PREVIOUS_GTIDS, + }) + vs.eventGTID = nil case ev.IsGTID(): - gtid, hasBegin, err := ev.GTID(vs.format) + gtid, hasBegin, commitParent, sequenceNumber, err := ev.GTID(vs.format) if err != nil { return nil, vterrors.Wrapf(err, "failed to get GTID from binlog event: %#v", ev) } if hasBegin { vevents = append(vevents, &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_BEGIN, + Type: binlogdatapb.VEventType_BEGIN, + CommitParent: commitParent, + SequenceNumber: sequenceNumber, }) } - vs.pos = replication.AppendGTID(vs.pos, gtid) + vs.pos = replication.AppendGTIDInPlace(vs.pos, gtid) + vs.commitParent = commitParent + vs.sequenceNumber = sequenceNumber + vs.eventGTID = gtid case ev.IsXID(): vevents = append(vevents, &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_GTID, @@ -695,7 +717,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev } for _, tpvevent := range tpvevents { tpvevent.Timestamp = int64(ev.Timestamp()) - tpvevent.CurrentTime = time.Now().UnixNano() + tpvevent.CurrentTime = timeNowUnixNano if err := bufferAndTransmit(tpvevent); err != nil { if err == io.EOF { return nil, nil @@ -710,9 +732,18 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev } vs.vse.vstreamerCompressedTransactionsDecoded.Add(1) } + vsEventGTIDString := "" + if vs.eventGTID != nil { + vsEventGTIDString = vs.eventGTID.String() + } for _, vevent := range vevents { vevent.Timestamp = int64(ev.Timestamp()) - vevent.CurrentTime = time.Now().UnixNano() + vevent.CurrentTime = timeNowUnixNano + vevent.SequenceNumber = vs.sequenceNumber + vevent.CommitParent = vs.commitParent + if vs.eventGTID != nil { + vevent.EventGtid = vsEventGTIDString + } } return vevents, nil } diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index 8fa9a6f4921..d4b0abecd94 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -319,6 +319,8 @@ enum VEventType { // If a client experiences some disruptions before receiving the event, // the client should restart the copy operation. COPY_COMPLETED = 20; + // Indicates rotation into a new binary log + PREVIOUS_GTIDS = 21; } @@ -450,7 +452,7 @@ message VEvent { // Timestamp is the binlog timestamp in seconds. // The value should be ignored if 0. int64 timestamp = 2; - // Gtid is set if the event type is GTID. + // Gtid is set if the event type is GTID. This is a combined GTIDs set of this event and previous events. string gtid = 3; // Statement is set if the event type is DDL, DML or SAVEPOINT. string statement = 4; @@ -478,6 +480,12 @@ message VEvent { bool throttled = 24; // ThrottledReason is a human readable string that explains why the stream is throttled string throttled_reason = 25; + // For GTID events, the sequence number of the most recent transaction this event depends on. + int64 commit_parent = 26; + // For GTID events, the sequence number (logical clock) value of this transaction. + int64 sequence_number = 27; + // EventGTID is decorated by VPlayer. It is the specific GTID (not the GTID set) for this event. + string event_gtid = 28; } message MinimalTable { diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 67e8dd462da..6da0ae9df46 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -38304,7 +38304,8 @@ export namespace binlogdata { VERSION = 17, LASTPK = 18, SAVEPOINT = 19, - COPY_COMPLETED = 20 + COPY_COMPLETED = 20, + PREVIOUS_GTIDS = 21 } /** Properties of a RowChange. */ @@ -39283,6 +39284,15 @@ export namespace binlogdata { /** VEvent throttled_reason */ throttled_reason?: (string|null); + + /** VEvent commit_parent */ + commit_parent?: (number|Long|null); + + /** VEvent sequence_number */ + sequence_number?: (number|Long|null); + + /** VEvent event_gtid */ + event_gtid?: (string|null); } /** Represents a VEvent. */ @@ -39339,6 +39349,15 @@ export namespace binlogdata { /** VEvent throttled_reason. */ public throttled_reason: string; + /** VEvent commit_parent. */ + public commit_parent: (number|Long); + + /** VEvent sequence_number. */ + public sequence_number: (number|Long); + + /** VEvent event_gtid. */ + public event_gtid: string; + /** * Creates a new VEvent instance using the specified properties. * @param [properties] Properties to set diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index 86d78c1c718..4339c153aa0 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -90017,6 +90017,7 @@ export const binlogdata = $root.binlogdata = (() => { * @property {number} LASTPK=18 LASTPK value * @property {number} SAVEPOINT=19 SAVEPOINT value * @property {number} COPY_COMPLETED=20 COPY_COMPLETED value + * @property {number} PREVIOUS_GTIDS=21 PREVIOUS_GTIDS value */ binlogdata.VEventType = (function() { const valuesById = {}, values = Object.create(valuesById); @@ -90041,6 +90042,7 @@ export const binlogdata = $root.binlogdata = (() => { values[valuesById[18] = "LASTPK"] = 18; values[valuesById[19] = "SAVEPOINT"] = 19; values[valuesById[20] = "COPY_COMPLETED"] = 20; + values[valuesById[21] = "PREVIOUS_GTIDS"] = 21; return values; })(); @@ -92501,6 +92503,9 @@ export const binlogdata = $root.binlogdata = (() => { * @property {string|null} [shard] VEvent shard * @property {boolean|null} [throttled] VEvent throttled * @property {string|null} [throttled_reason] VEvent throttled_reason + * @property {number|Long|null} [commit_parent] VEvent commit_parent + * @property {number|Long|null} [sequence_number] VEvent sequence_number + * @property {string|null} [event_gtid] VEvent event_gtid */ /** @@ -92638,6 +92643,30 @@ export const binlogdata = $root.binlogdata = (() => { */ VEvent.prototype.throttled_reason = ""; + /** + * VEvent commit_parent. + * @member {number|Long} commit_parent + * @memberof binlogdata.VEvent + * @instance + */ + VEvent.prototype.commit_parent = $util.Long ? $util.Long.fromBits(0,0,false) : 0; + + /** + * VEvent sequence_number. + * @member {number|Long} sequence_number + * @memberof binlogdata.VEvent + * @instance + */ + VEvent.prototype.sequence_number = $util.Long ? $util.Long.fromBits(0,0,false) : 0; + + /** + * VEvent event_gtid. + * @member {string} event_gtid + * @memberof binlogdata.VEvent + * @instance + */ + VEvent.prototype.event_gtid = ""; + /** * Creates a new VEvent instance using the specified properties. * @function create @@ -92692,6 +92721,12 @@ export const binlogdata = $root.binlogdata = (() => { writer.uint32(/* id 24, wireType 0 =*/192).bool(message.throttled); if (message.throttled_reason != null && Object.hasOwnProperty.call(message, "throttled_reason")) writer.uint32(/* id 25, wireType 2 =*/202).string(message.throttled_reason); + if (message.commit_parent != null && Object.hasOwnProperty.call(message, "commit_parent")) + writer.uint32(/* id 26, wireType 0 =*/208).int64(message.commit_parent); + if (message.sequence_number != null && Object.hasOwnProperty.call(message, "sequence_number")) + writer.uint32(/* id 27, wireType 0 =*/216).int64(message.sequence_number); + if (message.event_gtid != null && Object.hasOwnProperty.call(message, "event_gtid")) + writer.uint32(/* id 28, wireType 2 =*/226).string(message.event_gtid); return writer; }; @@ -92786,6 +92821,18 @@ export const binlogdata = $root.binlogdata = (() => { message.throttled_reason = reader.string(); break; } + case 26: { + message.commit_parent = reader.int64(); + break; + } + case 27: { + message.sequence_number = reader.int64(); + break; + } + case 28: { + message.event_gtid = reader.string(); + break; + } default: reader.skipType(tag & 7); break; @@ -92846,6 +92893,7 @@ export const binlogdata = $root.binlogdata = (() => { case 18: case 19: case 20: + case 21: break; } if (message.timestamp != null && message.hasOwnProperty("timestamp")) @@ -92900,6 +92948,15 @@ export const binlogdata = $root.binlogdata = (() => { if (message.throttled_reason != null && message.hasOwnProperty("throttled_reason")) if (!$util.isString(message.throttled_reason)) return "throttled_reason: string expected"; + if (message.commit_parent != null && message.hasOwnProperty("commit_parent")) + if (!$util.isInteger(message.commit_parent) && !(message.commit_parent && $util.isInteger(message.commit_parent.low) && $util.isInteger(message.commit_parent.high))) + return "commit_parent: integer|Long expected"; + if (message.sequence_number != null && message.hasOwnProperty("sequence_number")) + if (!$util.isInteger(message.sequence_number) && !(message.sequence_number && $util.isInteger(message.sequence_number.low) && $util.isInteger(message.sequence_number.high))) + return "sequence_number: integer|Long expected"; + if (message.event_gtid != null && message.hasOwnProperty("event_gtid")) + if (!$util.isString(message.event_gtid)) + return "event_gtid: string expected"; return null; }; @@ -93006,6 +93063,10 @@ export const binlogdata = $root.binlogdata = (() => { case 20: message.type = 20; break; + case "PREVIOUS_GTIDS": + case 21: + message.type = 21; + break; } if (object.timestamp != null) if ($util.Long) @@ -93064,6 +93125,26 @@ export const binlogdata = $root.binlogdata = (() => { message.throttled = Boolean(object.throttled); if (object.throttled_reason != null) message.throttled_reason = String(object.throttled_reason); + if (object.commit_parent != null) + if ($util.Long) + (message.commit_parent = $util.Long.fromValue(object.commit_parent)).unsigned = false; + else if (typeof object.commit_parent === "string") + message.commit_parent = parseInt(object.commit_parent, 10); + else if (typeof object.commit_parent === "number") + message.commit_parent = object.commit_parent; + else if (typeof object.commit_parent === "object") + message.commit_parent = new $util.LongBits(object.commit_parent.low >>> 0, object.commit_parent.high >>> 0).toNumber(); + if (object.sequence_number != null) + if ($util.Long) + (message.sequence_number = $util.Long.fromValue(object.sequence_number)).unsigned = false; + else if (typeof object.sequence_number === "string") + message.sequence_number = parseInt(object.sequence_number, 10); + else if (typeof object.sequence_number === "number") + message.sequence_number = object.sequence_number; + else if (typeof object.sequence_number === "object") + message.sequence_number = new $util.LongBits(object.sequence_number.low >>> 0, object.sequence_number.high >>> 0).toNumber(); + if (object.event_gtid != null) + message.event_gtid = String(object.event_gtid); return message; }; @@ -93104,6 +93185,17 @@ export const binlogdata = $root.binlogdata = (() => { object.shard = ""; object.throttled = false; object.throttled_reason = ""; + if ($util.Long) { + let long = new $util.Long(0, 0, false); + object.commit_parent = options.longs === String ? long.toString() : options.longs === Number ? long.toNumber() : long; + } else + object.commit_parent = options.longs === String ? "0" : 0; + if ($util.Long) { + let long = new $util.Long(0, 0, false); + object.sequence_number = options.longs === String ? long.toString() : options.longs === Number ? long.toNumber() : long; + } else + object.sequence_number = options.longs === String ? "0" : 0; + object.event_gtid = ""; } if (message.type != null && message.hasOwnProperty("type")) object.type = options.enums === String ? $root.binlogdata.VEventType[message.type] === undefined ? message.type : $root.binlogdata.VEventType[message.type] : message.type; @@ -93141,6 +93233,18 @@ export const binlogdata = $root.binlogdata = (() => { object.throttled = message.throttled; if (message.throttled_reason != null && message.hasOwnProperty("throttled_reason")) object.throttled_reason = message.throttled_reason; + if (message.commit_parent != null && message.hasOwnProperty("commit_parent")) + if (typeof message.commit_parent === "number") + object.commit_parent = options.longs === String ? String(message.commit_parent) : message.commit_parent; + else + object.commit_parent = options.longs === String ? $util.Long.prototype.toString.call(message.commit_parent) : options.longs === Number ? new $util.LongBits(message.commit_parent.low >>> 0, message.commit_parent.high >>> 0).toNumber() : message.commit_parent; + if (message.sequence_number != null && message.hasOwnProperty("sequence_number")) + if (typeof message.sequence_number === "number") + object.sequence_number = options.longs === String ? String(message.sequence_number) : message.sequence_number; + else + object.sequence_number = options.longs === String ? $util.Long.prototype.toString.call(message.sequence_number) : options.longs === Number ? new $util.LongBits(message.sequence_number.low >>> 0, message.sequence_number.high >>> 0).toNumber() : message.sequence_number; + if (message.event_gtid != null && message.hasOwnProperty("event_gtid")) + object.event_gtid = message.event_gtid; return object; };