From e35272c08e4213db8b6a1d28f96ff2c7e0eeac77 Mon Sep 17 00:00:00 2001 From: Lulu Sheng <31415382+froot@users.noreply.github.com> Date: Thu, 9 May 2024 19:15:03 -0700 Subject: [PATCH] Fix bug in handling sub events of replication.TransactionPayloadEvent (#875) --- canal/sync.go | 189 ++++++++++++++++++++++++++------------------------ 1 file changed, 98 insertions(+), 91 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index 14892eae0..d6ce44c6b 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -38,9 +38,6 @@ func (c *Canal) runSyncBinlog() error { return err } - savePos := false - force := false - for { ev, err := s.GetEvent(c.ctx) if err != nil { @@ -69,110 +66,120 @@ func (c *Canal) runSyncBinlog() error { } } - savePos = false - force = false - pos := c.master.Position() + err = c.handleEvent(ev) + if err != nil { + return err + } + } +} - curPos := pos.Pos +func (c *Canal) handleEvent(ev *replication.BinlogEvent) error { + savePos := false + force := false + pos := c.master.Position() + var err error - // next binlog pos - pos.Pos = ev.Header.LogPos + curPos := pos.Pos - // We only save position with RotateEvent and XIDEvent. - // For RowsEvent, we can't save the position until meeting XIDEvent - // which tells the whole transaction is over. - // TODO: If we meet any DDL query, we must save too. - switch e := ev.Event.(type) { - case *replication.RotateEvent: - pos.Name = string(e.NextLogName) - pos.Pos = uint32(e.Position) - c.cfg.Logger.Infof("rotate binlog to %s", pos) - savePos = true - force = true - if err = c.eventHandler.OnRotate(ev.Header, e); err != nil { - return errors.Trace(err) - } - case *replication.RowsEvent: - // we only focus row based event - err = c.handleRowsEvent(ev) + // next binlog pos + pos.Pos = ev.Header.LogPos + + // We only save position with RotateEvent and XIDEvent. + // For RowsEvent, we can't save the position until meeting XIDEvent + // which tells the whole transaction is over. + // TODO: If we meet any DDL query, we must save too. + switch e := ev.Event.(type) { + case *replication.RotateEvent: + pos.Name = string(e.NextLogName) + pos.Pos = uint32(e.Position) + c.cfg.Logger.Infof("rotate binlog to %s", pos) + savePos = true + force = true + if err = c.eventHandler.OnRotate(ev.Header, e); err != nil { + return errors.Trace(err) + } + case *replication.RowsEvent: + // we only focus row based event + err = c.handleRowsEvent(ev) + if err != nil { + c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) + return errors.Trace(err) + } + return nil + case *replication.TransactionPayloadEvent: + // handle subevent row by row + ev := ev.Event.(*replication.TransactionPayloadEvent) + for _, subEvent := range ev.Events { + err = c.handleEvent(subEvent) if err != nil { - c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) + c.cfg.Logger.Errorf("handle transaction payload subevent at (%s, %d) error %v", pos.Name, curPos, err) return errors.Trace(err) } - continue - case *replication.TransactionPayloadEvent: - // handle subevent row by row - ev := ev.Event.(*replication.TransactionPayloadEvent) - for _, subEvent := range ev.Events { - err = c.handleRowsEvent(subEvent) - if err != nil { - c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err) + } + return nil + case *replication.XIDEvent: + savePos = true + // try to save the position later + if err := c.eventHandler.OnXID(ev.Header, pos); err != nil { + return errors.Trace(err) + } + if e.GSet != nil { + c.master.UpdateGTIDSet(e.GSet) + } + case *replication.MariadbGTIDEvent: + if err := c.eventHandler.OnGTID(ev.Header, e); err != nil { + return errors.Trace(err) + } + case *replication.GTIDEvent: + if err := c.eventHandler.OnGTID(ev.Header, e); err != nil { + return errors.Trace(err) + } + case *replication.RowsQueryEvent: + if err := c.eventHandler.OnRowsQueryEvent(e); err != nil { + return errors.Trace(err) + } + case *replication.QueryEvent: + stmts, _, err := c.parser.Parse(string(e.Query), "", "") + if err != nil { + c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err) + return nil + } + for _, stmt := range stmts { + nodes := parseStmt(stmt) + for _, node := range nodes { + if node.db == "" { + node.db = string(e.Schema) + } + if err = c.updateTable(ev.Header, node.db, node.table); err != nil { return errors.Trace(err) } } - continue - case *replication.XIDEvent: - savePos = true - // try to save the position later - if err := c.eventHandler.OnXID(ev.Header, pos); err != nil { - return errors.Trace(err) - } - if e.GSet != nil { - c.master.UpdateGTIDSet(e.GSet) - } - case *replication.MariadbGTIDEvent: - if err := c.eventHandler.OnGTID(ev.Header, e); err != nil { - return errors.Trace(err) - } - case *replication.GTIDEvent: - if err := c.eventHandler.OnGTID(ev.Header, e); err != nil { - return errors.Trace(err) - } - case *replication.RowsQueryEvent: - if err := c.eventHandler.OnRowsQueryEvent(e); err != nil { - return errors.Trace(err) - } - case *replication.QueryEvent: - stmts, _, err := c.parser.Parse(string(e.Query), "", "") - if err != nil { - c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err) - continue - } - for _, stmt := range stmts { - nodes := parseStmt(stmt) - for _, node := range nodes { - if node.db == "" { - node.db = string(e.Schema) - } - if err = c.updateTable(ev.Header, node.db, node.table); err != nil { - return errors.Trace(err) - } - } - if len(nodes) > 0 { - savePos = true - force = true - // Now we only handle Table Changed DDL, maybe we will support more later. - if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil { - return errors.Trace(err) - } + if len(nodes) > 0 { + savePos = true + force = true + // Now we only handle Table Changed DDL, maybe we will support more later. + if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil { + return errors.Trace(err) } } - if savePos && e.GSet != nil { - c.master.UpdateGTIDSet(e.GSet) - } - default: - continue } + if savePos && e.GSet != nil { + c.master.UpdateGTIDSet(e.GSet) + } + default: + return nil + } - if savePos { - c.master.Update(pos) - c.master.UpdateTimestamp(ev.Header.Timestamp) + if savePos { + c.master.Update(pos) + c.master.UpdateTimestamp(ev.Header.Timestamp) - if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil { - return errors.Trace(err) - } + if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil { + return errors.Trace(err) } } + + return nil } type node struct {