diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index e965c7faf1b..84ab17d71ce 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -122,6 +122,9 @@ type BinlogEvent interface { // IsPseudo is for custom implementations of GTID. IsPseudo() bool + + // IsCompressed returns true if a compressed event is found (binlog_transaction_compression=ON) + IsCompressed() bool } // BinlogFormat contains relevant data from the FORMAT_DESCRIPTION_EVENT. diff --git a/go/mysql/binlog_event_common.go b/go/mysql/binlog_event_common.go index e9f022e0b28..8abfd9ac953 100644 --- a/go/mysql/binlog_event_common.go +++ b/go/mysql/binlog_event_common.go @@ -167,6 +167,11 @@ func (ev binlogEvent) IsPseudo() bool { return false } +// IsCompressed returns true if a compressed event is found (binlog_transaction_compression=ON) +func (ev binlogEvent) IsCompressed() bool { + return ev.Type() == eCompressedEvent +} + // Format implements BinlogEvent.Format(). // // Expected format (L = total length of event data): diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index dfec653081e..2f6bbb5bbfa 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -212,6 +212,10 @@ func (ev filePosFakeEvent) IsPseudo() bool { return false } +func (ev filePosFakeEvent) IsCompressed() bool { + return false +} + //---------------------------------------------------------------------------- // filePosGTIDEvent is a fake GTID event for filePos. diff --git a/go/mysql/replication_constants.go b/go/mysql/replication_constants.go index 53d51c944eb..096bd902a04 100644 --- a/go/mysql/replication_constants.go +++ b/go/mysql/replication_constants.go @@ -207,6 +207,9 @@ const ( //eViewChangeEvent = 37 //eXAPrepareLogEvent = 38 + // Transaction_payload_event when binlog compression is turned on + eCompressedEvent = 40 + // MariaDB specific values. They start at 160. //eMariaAnnotateRowsEvent = 160 // Unused diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 1a54c75fd3d..32e142828a6 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -567,6 +567,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e if err != nil { return nil, err } + case ev.IsCompressed(): + log.Errorf("VReplication does not handle binlog compression") + return nil, fmt.Errorf("VReplication does not handle binlog compression") } for _, vevent := range vevents { vevent.Timestamp = int64(ev.Timestamp())