Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,15 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil {
return err
}
case binlogdatapb.VEventType_OTHER:
// Just update the position.
posReached, err := vp.updatePos(event.Timestamp)
if err != nil {
return err
}
if posReached {
return io.EOF
}
case binlogdatapb.VEventType_DDL:
if vp.vr.dbClient.InTransaction {
return fmt.Errorf("unexpected state: DDL encountered in the middle of a transaction: %v", event.Ddl)
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,8 @@ func TestPlayerDDL(t *testing.T) {
expectDBClientQueries(t, []string{
"alter table t1 add column val1 varchar(128)",
"/update _vt.vreplication set pos=",
// The apply of the DDL on target generates an "other" event.
"/update _vt.vreplication set pos=",
})
execStatements(t, []string{"alter table t1 add column val2 varchar(128)"})
expectDBClientQueries(t, []string{
Expand All @@ -884,6 +886,8 @@ func TestPlayerDDL(t *testing.T) {
expectDBClientQueries(t, []string{
"alter table t1 add column val1 varchar(128)",
"/update _vt.vreplication set pos=",
// The apply of the DDL on target generates an "other" event.
"/update _vt.vreplication set pos=",
})
execStatements(t, []string{"alter table t1 add column val2 varchar(128)"})
expectDBClientQueries(t, []string{
Expand Down Expand Up @@ -1304,6 +1308,9 @@ func TestPlayerBatching(t *testing.T) {
"/update _vt.vreplication set pos=",
"alter table t1 drop column val2",
"/update _vt.vreplication set pos=",
// The apply of the DDLs on target generates two "other" event.
"/update _vt.vreplication set pos=",
"/update _vt.vreplication set pos=",
})
}

Expand Down
17 changes: 8 additions & 9 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD:
// We never have to send GTID, BEGIN or FIELD events on their own.
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_HEARTBEAT:
// COMMIT, DDL and HEARTBEAT must be immediately sent.
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_HEARTBEAT:
// COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent.
bufferedEvents = append(bufferedEvents, vevent)
vevents := bufferedEvents
bufferedEvents = nil
Expand Down Expand Up @@ -310,20 +310,19 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
Ddl: q.SQL,
})
} else {
vevents = append(vevents,
&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_BEGIN,
},
&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COMMIT,
})
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_OTHER,
})
}
// Proactively reload schema.
// If the DDL adds a column, comparing with an older snapshot of the
// schema will make us think that a column was dropped and error out.
vs.se.Reload(vs.ctx)
case sqlparser.StmtOther:
// These are DBA statements like REPAIR that can be ignored.
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_OTHER,
})
default:
return nil, fmt.Errorf("unexpected statement type %s in row-based replication: %q", cat, q.SQL)
}
Expand Down
21 changes: 15 additions & 6 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,29 @@ func TestStatements(t *testing.T) {
}, {
// repair, optimize and analyze show up in binlog stream, but ignored by vitess.
input: "repair table stream2",
output: [][]string{{
`gtid`,
`type:OTHER `,
}},
}, {
input: "optimize table stream2",
output: [][]string{{
`gtid`,
`type:OTHER `,
}},
}, {
input: "analyze table stream2",
output: [][]string{{
`gtid`,
`type:OTHER `,
}},
}, {
// select, set, show, analyze and describe don't get logged.
// select, set, show and describe don't get logged.
input: "select * from stream1",
}, {
input: "set @val=1",
}, {
input: "show tables",
}, {
input: "analyze table stream1",
}, {
input: "describe stream1",
}}
Expand Down Expand Up @@ -435,9 +445,8 @@ func TestUnsentDDL(t *testing.T) {
},
// An unsent DDL is sent as an empty transaction.
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`commit`,
`gtid`,
`type:OTHER `,
}},
}}

Expand Down