Skip to content

Commit

Permalink
Fix binlog end log pos over lost data (#1367)
Browse files Browse the repository at this point in the history
* fix(fix binlog end-log-pos over 4g lost data): fix binlog end-log-pos over 4g lost data

* fix(fix-binlog-end-log-pos-over-lost-data): fix-binlog-end-log-pos-over-lost-data

Calculate whether the binlog log_pos overflows beyond 4G using end_log_pos and event_size.

* fix(x): x

x

* test(fix-binlog-end-log-pos-over-lost-data): add unit test

* fix(fix-binlog-end-log-pos-over-lost-data): x

x

* Update binlog.go

change IsLogPosOverflowBeyond4Bytes comment.

* Update binlog.go

x

* fix(fix-binlog-end-log-pos-over-lost-data): fix doc

x

---------

Co-authored-by: shaohoukun <[email protected]>
Co-authored-by: Tim Vaillancourt <[email protected]>
Co-authored-by: meiji163 <[email protected]>
  • Loading branch information
4 people authored Mar 12, 2024
1 parent 9ca2499 commit 48b34bc
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
5 changes: 5 additions & 0 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate

// StreamEvents
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
if this.currentCoordinates.IsLogPosOverflowBeyond4Bytes(&this.LastAppliedRowsEventHint) {
return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates)
}

if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
this.migrationContext.Log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
return nil
Expand Down Expand Up @@ -141,6 +145,7 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
this.currentCoordinates.EventSize = int64(ev.Header.EventSize)
}()

switch binlogEvent := ev.Event.(type) {
Expand Down
29 changes: 27 additions & 2 deletions go/mysql/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import (

// BinlogCoordinates described binary log coordinates in the form of log file & log position.
type BinlogCoordinates struct {
LogFile string
LogPos int64
LogFile string
LogPos int64
EventSize int64
}

// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
Expand Down Expand Up @@ -74,3 +75,27 @@ func (this *BinlogCoordinates) SmallerThanOrEquals(other *BinlogCoordinates) boo
}
return this.LogFile == other.LogFile && this.LogPos == other.LogPos
}

// IsLogPosOverflowBeyond4Bytes returns true if the coordinate endpos is overflow beyond 4 bytes.
// The binlog event end_log_pos field type is defined as uint32, 4 bytes.
// https://github.com/go-mysql-org/go-mysql/blob/master/replication/event.go
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header
// Issue: https://github.com/github/gh-ost/issues/1366
func (this *BinlogCoordinates) IsLogPosOverflowBeyond4Bytes(preCoordinate *BinlogCoordinates) bool {
if preCoordinate == nil {
return false
}
if preCoordinate.IsEmpty() {
return false
}

if this.LogFile != preCoordinate.LogFile {
return false
}

if preCoordinate.LogPos+this.EventSize >= 1<<32 {
// Unexpected rows event, the previous binlog log_pos + current binlog event_size is overflow 4 bytes
return true
}
return false
}
44 changes: 44 additions & 0 deletions go/mysql/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package mysql

import (
"math"
"testing"

"github.com/openark/golib/log"
Expand Down Expand Up @@ -52,3 +53,46 @@ func TestBinlogCoordinatesAsKey(t *testing.T) {

test.S(t).ExpectEquals(len(m), 3)
}

func TestIsLogPosOverflowBeyond4Bytes(t *testing.T) {
{
var preCoordinates *BinlogCoordinates
curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 10321, EventSize: 1100}
test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates))
}
{
preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 1100, EventSize: 1100}
curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1100)), EventSize: 1100}
test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates))
}
{
preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00016", LogPos: 1100, EventSize: 1100}
curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1100)), EventSize: 1100}
test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates))
}
{
preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 1001, EventSize: 1000}
curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000}
test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates))
}
{
preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 1000, EventSize: 1000}
curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000}
test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates))
}
{
preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 999, EventSize: 1000}
curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000}
test.S(t).ExpectTrue(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates))
}
{
preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(math.MaxUint32 - 500)), EventSize: 1000}
curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000}
test.S(t).ExpectTrue(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates))
}
{
preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32, EventSize: 1000}
curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000}
test.S(t).ExpectTrue(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates))
}
}

0 comments on commit 48b34bc

Please sign in to comment.