Skip to content

Commit

Permalink
fix(canal): Preserve binlog filename received in the fake rotate event (
Browse files Browse the repository at this point in the history
#428)

* fix(canal): Preserve binlog filename received in the fake rotate event #406
  • Loading branch information
mefcorvi authored and siddontang committed Sep 23, 2019
1 parent 87265c0 commit cb33eb8
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func (c *Canal) runSyncBinlog() error {

savePos := false
force := false

// The name of the binlog file received in the fake rotate event.
// It must be preserved until the new position is saved.
fakeRotateLogName := ""

for {
ev, err := s.GetEvent(c.ctx)
if err != nil {
Expand All @@ -51,11 +56,21 @@ func (c *Canal) runSyncBinlog() error {

// Update the delay between the Canal and the Master before the handler hooks are called
c.updateReplicationDelay(ev)
// if log pos equal zero ,it is a fake rotate event,ignore it.
// see https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899

// If log pos equals zero then the received event is a fake rotate event and
// contains only a name of the next binlog file
// See https://github.com/mysql/mysql-server/blob/8e797a5d6eb3a87f16498edcb7261a75897babae/sql/rpl_binlog_sender.h#L235
// and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899
if ev.Header.LogPos == 0 {
switch e := ev.Event.(type) {
case *replication.RotateEvent:
fakeRotateLogName = string(e.NextLogName)
log.Infof("received fake rotate event, next log name is %s", e.NextLogName)
}

continue
}

savePos = false
force = false
pos := c.master.Position()
Expand All @@ -64,6 +79,11 @@ func (c *Canal) runSyncBinlog() error {
// next binlog pos
pos.Pos = ev.Header.LogPos

// new file name received in the fake rotate event
if fakeRotateLogName != "" {
pos.Name = fakeRotateLogName
}

// We only save position with RotateEvent and XIDEvent.
// For RowsEvent, we can't save the position until meeting XIDEvent
// which tells the whole transaction is over.
Expand Down Expand Up @@ -154,6 +174,8 @@ func (c *Canal) runSyncBinlog() error {
if savePos {
c.master.Update(pos)
c.master.UpdateTimestamp(ev.Header.Timestamp)
fakeRotateLogName = ""

if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit cb33eb8

Please sign in to comment.