diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 3a4bfd1fff4..0ad83989004 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -352,6 +352,15 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil { return err } + case binlogdatapb.VEventType_OTHER: + // Just update the position. + posReached, err := vp.updatePos(event.Timestamp) + if err != nil { + return err + } + if posReached { + return io.EOF + } case binlogdatapb.VEventType_DDL: if vp.vr.dbClient.InTransaction { return fmt.Errorf("unexpected state: DDL encountered in the middle of a transaction: %v", event.Ddl) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go index 10f66487e05..8b46dae3671 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -864,6 +864,8 @@ func TestPlayerDDL(t *testing.T) { expectDBClientQueries(t, []string{ "alter table t1 add column val1 varchar(128)", "/update _vt.vreplication set pos=", + // The apply of the DDL on target generates an "other" event. + "/update _vt.vreplication set pos=", }) execStatements(t, []string{"alter table t1 add column val2 varchar(128)"}) expectDBClientQueries(t, []string{ @@ -884,6 +886,8 @@ func TestPlayerDDL(t *testing.T) { expectDBClientQueries(t, []string{ "alter table t1 add column val1 varchar(128)", "/update _vt.vreplication set pos=", + // The apply of the DDL on target generates an "other" event. + "/update _vt.vreplication set pos=", }) execStatements(t, []string{"alter table t1 add column val2 varchar(128)"}) expectDBClientQueries(t, []string{ @@ -1304,6 +1308,9 @@ func TestPlayerBatching(t *testing.T) { "/update _vt.vreplication set pos=", "alter table t1 drop column val2", "/update _vt.vreplication set pos=", + // The apply of the DDLs on target generates two "other" event. + "/update _vt.vreplication set pos=", + "/update _vt.vreplication set pos=", }) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 791ee38e757..633d0133c2e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -145,8 +145,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD: // We never have to send GTID, BEGIN or FIELD events on their own. bufferedEvents = append(bufferedEvents, vevent) - case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_HEARTBEAT: - // COMMIT, DDL and HEARTBEAT must be immediately sent. + case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_HEARTBEAT: + // COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent. bufferedEvents = append(bufferedEvents, vevent) vevents := bufferedEvents bufferedEvents = nil @@ -310,13 +310,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e Ddl: q.SQL, }) } else { - vevents = append(vevents, - &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_BEGIN, - }, - &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_COMMIT, - }) + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_OTHER, + }) } // Proactively reload schema. // If the DDL adds a column, comparing with an older snapshot of the @@ -324,6 +320,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e vs.se.Reload(vs.ctx) case sqlparser.StmtOther: // These are DBA statements like REPAIR that can be ignored. + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_OTHER, + }) default: return nil, fmt.Errorf("unexpected statement type %s in row-based replication: %q", cat, q.SQL) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index efe84b69573..eb4621840f2 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -114,19 +114,29 @@ func TestStatements(t *testing.T) { }, { // repair, optimize and analyze show up in binlog stream, but ignored by vitess. input: "repair table stream2", + output: [][]string{{ + `gtid`, + `type:OTHER `, + }}, }, { input: "optimize table stream2", + output: [][]string{{ + `gtid`, + `type:OTHER `, + }}, }, { input: "analyze table stream2", + output: [][]string{{ + `gtid`, + `type:OTHER `, + }}, }, { - // select, set, show, analyze and describe don't get logged. + // select, set, show and describe don't get logged. input: "select * from stream1", }, { input: "set @val=1", }, { input: "show tables", - }, { - input: "analyze table stream1", }, { input: "describe stream1", }} @@ -435,9 +445,8 @@ func TestUnsentDDL(t *testing.T) { }, // An unsent DDL is sent as an empty transaction. output: [][]string{{ - `gtid|begin`, - `gtid|begin`, - `commit`, + `gtid`, + `type:OTHER `, }}, }}