From cb33eb8bb30f5728cdfe53b031e882e60fe85cec Mon Sep 17 00:00:00 2001 From: Bulat Aykaev Date: Mon, 23 Sep 2019 03:56:47 +0300 Subject: [PATCH] fix(canal): Preserve binlog filename received in the fake rotate event (#428) * fix(canal): Preserve binlog filename received in the fake rotate event #406 --- canal/sync.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index b0bb6dae6..89bc0159d 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -43,6 +43,11 @@ func (c *Canal) runSyncBinlog() error { savePos := false force := false + + // The name of the binlog file received in the fake rotate event. + // It must be preserved until the new position is saved. + fakeRotateLogName := "" + for { ev, err := s.GetEvent(c.ctx) if err != nil { @@ -51,11 +56,21 @@ func (c *Canal) runSyncBinlog() error { // Update the delay between the Canal and the Master before the handler hooks are called c.updateReplicationDelay(ev) - // if log pos equal zero ,it is a fake rotate event,ignore it. - // see https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899 + + // If log pos equals zero then the received event is a fake rotate event and + // contains only a name of the next binlog file + // See https://github.com/mysql/mysql-server/blob/8e797a5d6eb3a87f16498edcb7261a75897babae/sql/rpl_binlog_sender.h#L235 + // and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899 if ev.Header.LogPos == 0 { + switch e := ev.Event.(type) { + case *replication.RotateEvent: + fakeRotateLogName = string(e.NextLogName) + log.Infof("received fake rotate event, next log name is %s", e.NextLogName) + } + continue } + savePos = false force = false pos := c.master.Position() @@ -64,6 +79,11 @@ func (c *Canal) runSyncBinlog() error { // next binlog pos pos.Pos = ev.Header.LogPos + // new file name received in the fake rotate event + if fakeRotateLogName != "" { + pos.Name = fakeRotateLogName + } + // We only save position with RotateEvent and XIDEvent. // For RowsEvent, we can't save the position until meeting XIDEvent // which tells the whole transaction is over. @@ -154,6 +174,8 @@ func (c *Canal) runSyncBinlog() error { if savePos { c.master.Update(pos) c.master.UpdateTimestamp(ev.Header.Timestamp) + fakeRotateLogName = "" + if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil { return errors.Trace(err) }