Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle subevents in transaction payload event #827

Merged
merged 4 commits into from
Oct 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,17 @@ func (c *Canal) runSyncBinlog() error {
// we only focus row based event
err = c.handleRowsEvent(ev)
if err != nil {
e := errors.Cause(err)
// if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal
if e != ErrExcludedTable &&
e != schema.ErrTableNotExist &&
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the old code, the error may come from any return branch in handleRowsEvent, but in your change we only checked the last return. I made a quick check that the three errors of

		if e != ErrExcludedTable &&
			e != schema.ErrTableNotExist &&
			e != schema.ErrMissingTableMeta {

seems only come from c.GetTable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. In this case, I think the old code means to ignore three errors: ErrExcludedTable,schema.ErrTableNotExist, schema.ErrMissingTableMeta. So we can do this just after GetTable in handleRowsEvent

e != schema.ErrMissingTableMeta {
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err)
}
continue
case *replication.TransactionPayloadEvent:
// handle subevent row by row
ev := ev.Event.(*replication.TransactionPayloadEvent)
for _, subEvent := range ev.Events {
err = c.handleRowsEvent(subEvent)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
c.cfg.Logger.Errorf("handle transaction payload rows event at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -232,11 +237,17 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
ev := e.Event.(*replication.RowsEvent)

// Caveat: table may be altered at runtime.
schema := string(ev.Table.Schema)
table := string(ev.Table.Table)
schemaName := string(ev.Table.Schema)
tableName := string(ev.Table.Table)

t, err := c.GetTable(schema, table)
t, err := c.GetTable(schemaName, tableName)
if err != nil {
e := errors.Cause(err)
// ignore errors below
if e == ErrExcludedTable || e == schema.ErrTableNotExist || e == schema.ErrMissingTableMeta {
err = nil
}

return err
}
var action string
Expand Down
Loading