From f25a3a6e238269a1d71563e5c3c364e5b0e0cca7 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Mon, 11 Nov 2019 17:57:24 -0800 Subject: [PATCH] vrepl: handle "other" events While working on filePos flavor, I encountered this issue where an "other" statement will cause the next GTID to not be immediately sent. This can cause delays if the target waits for that event. This is pretty rare for GTID mode. But will likely be more pronounced for filePos. So, I'm proactively making this fix to make sure we don't delay sending of GTIDs, even in the case of non-relevant events. Because of this change, this also means that we don't have to generate pseudo-gtids. Generating GTIDs outside of transactions, coupled with OTHER event will make the right thing happen. Signed-off-by: Sugu Sougoumarane --- .../tabletmanager/vreplication/vplayer.go | 9 ++++++++ .../vreplication/vplayer_test.go | 7 +++++++ .../tabletserver/vstreamer/vstreamer.go | 17 +++++++-------- .../tabletserver/vstreamer/vstreamer_test.go | 21 +++++++++++++------ 4 files changed, 39 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 3a4bfd1fff4..0ad83989004 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -352,6 +352,15 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil { return err } + case binlogdatapb.VEventType_OTHER: + // Just update the position. + posReached, err := vp.updatePos(event.Timestamp) + if err != nil { + return err + } + if posReached { + return io.EOF + } case binlogdatapb.VEventType_DDL: if vp.vr.dbClient.InTransaction { return fmt.Errorf("unexpected state: DDL encountered in the middle of a transaction: %v", event.Ddl) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go index 10f66487e05..8b46dae3671 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -864,6 +864,8 @@ func TestPlayerDDL(t *testing.T) { expectDBClientQueries(t, []string{ "alter table t1 add column val1 varchar(128)", "/update _vt.vreplication set pos=", + // The apply of the DDL on target generates an "other" event. + "/update _vt.vreplication set pos=", }) execStatements(t, []string{"alter table t1 add column val2 varchar(128)"}) expectDBClientQueries(t, []string{ @@ -884,6 +886,8 @@ func TestPlayerDDL(t *testing.T) { expectDBClientQueries(t, []string{ "alter table t1 add column val1 varchar(128)", "/update _vt.vreplication set pos=", + // The apply of the DDL on target generates an "other" event. + "/update _vt.vreplication set pos=", }) execStatements(t, []string{"alter table t1 add column val2 varchar(128)"}) expectDBClientQueries(t, []string{ @@ -1304,6 +1308,9 @@ func TestPlayerBatching(t *testing.T) { "/update _vt.vreplication set pos=", "alter table t1 drop column val2", "/update _vt.vreplication set pos=", + // The apply of the DDLs on target generates two "other" event. + "/update _vt.vreplication set pos=", + "/update _vt.vreplication set pos=", }) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 791ee38e757..633d0133c2e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -145,8 +145,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD: // We never have to send GTID, BEGIN or FIELD events on their own. bufferedEvents = append(bufferedEvents, vevent) - case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_HEARTBEAT: - // COMMIT, DDL and HEARTBEAT must be immediately sent. + case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_HEARTBEAT: + // COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent. bufferedEvents = append(bufferedEvents, vevent) vevents := bufferedEvents bufferedEvents = nil @@ -310,13 +310,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e Ddl: q.SQL, }) } else { - vevents = append(vevents, - &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_BEGIN, - }, - &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_COMMIT, - }) + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_OTHER, + }) } // Proactively reload schema. // If the DDL adds a column, comparing with an older snapshot of the @@ -324,6 +320,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e vs.se.Reload(vs.ctx) case sqlparser.StmtOther: // These are DBA statements like REPAIR that can be ignored. + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_OTHER, + }) default: return nil, fmt.Errorf("unexpected statement type %s in row-based replication: %q", cat, q.SQL) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index efe84b69573..eb4621840f2 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -114,19 +114,29 @@ func TestStatements(t *testing.T) { }, { // repair, optimize and analyze show up in binlog stream, but ignored by vitess. input: "repair table stream2", + output: [][]string{{ + `gtid`, + `type:OTHER `, + }}, }, { input: "optimize table stream2", + output: [][]string{{ + `gtid`, + `type:OTHER `, + }}, }, { input: "analyze table stream2", + output: [][]string{{ + `gtid`, + `type:OTHER `, + }}, }, { - // select, set, show, analyze and describe don't get logged. + // select, set, show and describe don't get logged. input: "select * from stream1", }, { input: "set @val=1", }, { input: "show tables", - }, { - input: "analyze table stream1", }, { input: "describe stream1", }} @@ -435,9 +445,8 @@ func TestUnsentDDL(t *testing.T) { }, // An unsent DDL is sent as an empty transaction. output: [][]string{{ - `gtid|begin`, - `gtid|begin`, - `commit`, + `gtid`, + `type:OTHER `, }}, }}