diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index 3e3dd6e88f9..eaf0eaee270 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -119,7 +119,7 @@ func TestMysql56ParsePosition(t *testing.T) { set = set.AddGTID(Mysql56GTID{Server: sid, Sequence: 2}) want := Position{GTIDSet: set} - got, err := ParsePosition(mysql56FlavorID, input) + got, err := ParsePosition(Mysql56FlavorID, input) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/go/mysql/filepos_gtid.go b/go/mysql/filepos_gtid.go index 07606a4fc69..26d3ebdcfa1 100644 --- a/go/mysql/filepos_gtid.go +++ b/go/mysql/filepos_gtid.go @@ -22,7 +22,8 @@ import ( "strings" ) -const filePosFlavorID = "FilePos" +// FilePosFlavorID is the string identifier for the filePos flavor. +const FilePosFlavorID = "FilePos" // parsefilePosGTID is registered as a GTID parser. func parseFilePosGTID(s string) (GTID, error) { @@ -65,7 +66,7 @@ func (gtid filePosGTID) String() string { // Flavor implements GTID.Flavor(). func (gtid filePosGTID) Flavor() string { - return filePosFlavorID + return FilePosFlavorID } // SequenceDomain implements GTID.SequenceDomain(). @@ -147,7 +148,7 @@ func (gtid filePosGTID) Union(other GTIDSet) GTIDSet { } func init() { - gtidParsers[filePosFlavorID] = parseFilePosGTID - gtidSetParsers[filePosFlavorID] = parseFilePosGTIDSet - flavors[filePosFlavorID] = newFilePosFlavor + gtidParsers[FilePosFlavorID] = parseFilePosGTID + gtidSetParsers[FilePosFlavorID] = parseFilePosGTIDSet + flavors[FilePosFlavorID] = newFilePosFlavor } diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index c18ec95a97d..fd2479ff5f8 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -163,6 +163,18 @@ func (c *Conn) MasterPosition() (Position, error) { }, nil } +// MasterFilePosition returns the current master's file based replication position. +func (c *Conn) MasterFilePosition() (Position, error) { + filePosFlavor := filePosFlavor{} + gtidSet, err := filePosFlavor.masterGTIDSet(c) + if err != nil { + return Position{}, err + } + return Position{ + GTIDSet: gtidSet, + }, nil +} + // StartReplicationCommand returns the command to start the replication. func (c *Conn) StartReplicationCommand() string { return c.flavor.startReplicationCommand() @@ -317,6 +329,15 @@ func (c *Conn) WaitUntilPositionCommand(ctx context.Context, pos Position) (stri return c.flavor.waitUntilPositionCommand(ctx, pos) } +// WaitUntilFilePositionCommand returns the SQL command to issue +// to wait until the given position, until the context +// expires for the file position flavor. The command returns -1 if it times out. It +// returns NULL if GTIDs are not enabled. +func (c *Conn) WaitUntilFilePositionCommand(ctx context.Context, pos Position) (string, error) { + filePosFlavor := filePosFlavor{} + return filePosFlavor.waitUntilPositionCommand(ctx, pos) +} + // EnableBinlogPlaybackCommand returns a command to run to enable // binlog playback. func (c *Conn) EnableBinlogPlaybackCommand() string { diff --git a/go/mysql/mariadb_gtid.go b/go/mysql/mariadb_gtid.go index 57cca52bc78..cf3856a0cdb 100644 --- a/go/mysql/mariadb_gtid.go +++ b/go/mysql/mariadb_gtid.go @@ -26,7 +26,8 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) -const mariadbFlavorID = "MariaDB" +// MariadbFlavorID is the string identifier for the MariaDB flavor. +const MariadbFlavorID = "MariaDB" // parseMariadbGTID is registered as a GTID parser. func parseMariadbGTID(s string) (GTID, error) { @@ -93,7 +94,7 @@ func (gtid MariadbGTID) String() string { // Flavor implements GTID.Flavor(). func (gtid MariadbGTID) Flavor() string { - return mariadbFlavorID + return MariadbFlavorID } // SequenceDomain implements GTID.SequenceDomain(). @@ -140,7 +141,7 @@ func (gtidSet MariadbGTIDSet) String() string { // Flavor implements GTIDSet.Flavor() func (gtidSet MariadbGTIDSet) Flavor() string { - return mariadbFlavorID + return MariadbFlavorID } // ContainsGTID implements GTIDSet.ContainsGTID(). @@ -251,6 +252,6 @@ func (gtidSet MariadbGTIDSet) addGTID(otherGTID MariadbGTID) { } func init() { - gtidParsers[mariadbFlavorID] = parseMariadbGTID - gtidSetParsers[mariadbFlavorID] = parseMariadbGTIDSet + gtidParsers[MariadbFlavorID] = parseMariadbGTID + gtidSetParsers[MariadbFlavorID] = parseMariadbGTIDSet } diff --git a/go/mysql/mysql56_gtid.go b/go/mysql/mysql56_gtid.go index efe51b88545..ea8c03cd8e6 100644 --- a/go/mysql/mysql56_gtid.go +++ b/go/mysql/mysql56_gtid.go @@ -26,7 +26,8 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) -const mysql56FlavorID = "MySQL56" +// Mysql56FlavorID is the string identifier for the Mysql56 flavor. +const Mysql56FlavorID = "MySQL56" // parseMysql56GTID is registered as a GTID parser. func parseMysql56GTID(s string) (GTID, error) { @@ -101,7 +102,7 @@ func (gtid Mysql56GTID) String() string { // Flavor implements GTID.Flavor(). func (gtid Mysql56GTID) Flavor() string { - return mysql56FlavorID + return Mysql56FlavorID } // SequenceDomain implements GTID.SequenceDomain(). @@ -125,5 +126,5 @@ func (gtid Mysql56GTID) GTIDSet() GTIDSet { } func init() { - gtidParsers[mysql56FlavorID] = parseMysql56GTID + gtidParsers[Mysql56FlavorID] = parseMysql56GTID } diff --git a/go/mysql/mysql56_gtid_set.go b/go/mysql/mysql56_gtid_set.go index 422974fa9fa..a43a2959b30 100644 --- a/go/mysql/mysql56_gtid_set.go +++ b/go/mysql/mysql56_gtid_set.go @@ -172,7 +172,7 @@ func (set Mysql56GTIDSet) String() string { } // Flavor implements GTIDSet. -func (Mysql56GTIDSet) Flavor() string { return mysql56FlavorID } +func (Mysql56GTIDSet) Flavor() string { return Mysql56FlavorID } // ContainsGTID implements GTIDSet. func (set Mysql56GTIDSet) ContainsGTID(gtid GTID) bool { @@ -435,6 +435,8 @@ func (set Mysql56GTIDSet) SIDBlock() []byte { return buf.Bytes() } +// Difference will supply the difference between the receiver and supplied Mysql56GTIDSets, and supply the result +// as a Mysql56GTIDSet. func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet { if other == nil || set == nil { return set @@ -625,5 +627,5 @@ func popInterval(dst *interval, s1, s2 *[]interval) bool { } func init() { - gtidSetParsers[mysql56FlavorID] = parseMysql56GTIDSet + gtidSetParsers[Mysql56FlavorID] = parseMysql56GTIDSet } diff --git a/go/mysql/replication_position.go b/go/mysql/replication_position.go index cf98865a67c..3ac83e88807 100644 --- a/go/mysql/replication_position.go +++ b/go/mysql/replication_position.go @@ -170,6 +170,23 @@ func (rp *Position) UnmarshalJSON(buf []byte) error { return nil } +// MatchesFlavor will take a flavor string, and return whether the positions GTIDSet matches the supplied flavor. +// The caller should use the constants Mysql56FlavorID, MariadbFlavorID, or FilePosFlavorID when supplying the flavor string. +func (rp *Position) MatchesFlavor(flavor string) bool { + switch flavor { + case Mysql56FlavorID: + _, matches := rp.GTIDSet.(Mysql56GTIDSet) + return matches + case MariadbFlavorID: + _, matches := rp.GTIDSet.(MariadbGTIDSet) + return matches + case FilePosFlavorID: + _, matches := rp.GTIDSet.(filePosGTID) + return matches + } + return false +} + // Comparable returns whether the receiver is comparable to the supplied position, based on whether one // of the two positions contains the other. func (rp *Position) Comparable(other Position) bool { diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index fc3e7e682e7..6f3893596ac 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -204,32 +204,58 @@ func (mysqld *Mysqld) WaitMasterPos(ctx context.Context, targetPos mysql.Positio } defer conn.Recycle() - // If we are the master, WaitUntilPositionCommand will fail. - // But position is most likely reached. So, check the position - // first. - mpos, err := conn.MasterPosition() - if err != nil { - return fmt.Errorf("WaitMasterPos: MasterPosition failed: %v", err) - } - if mpos.AtLeast(targetPos) { - return nil - } + // First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection, + // unless that flavor is also filePos. + waitCommandName := "WaitUntilPositionCommand" + var query string + if targetPos.MatchesFlavor(mysql.FilePosFlavorID) { + // If we are the master, WaitUntilFilePositionCommand will fail. + // But position is most likely reached. So, check the position + // first. + mpos, err := conn.MasterFilePosition() + if err != nil { + return fmt.Errorf("WaitMasterPos: MasterFilePosition failed: %v", err) + } + if mpos.AtLeast(targetPos) { + return nil + } - // Find the query to run, run it. - query, err := conn.WaitUntilPositionCommand(ctx, targetPos) - if err != nil { - return err + // Find the query to run, run it. + query, err = conn.WaitUntilFilePositionCommand(ctx, targetPos) + if err != nil { + return err + } + waitCommandName = "WaitUntilFilePositionCommand" + } else { + // If we are the master, WaitUntilPositionCommand will fail. + // But position is most likely reached. So, check the position + // first. + mpos, err := conn.MasterPosition() + if err != nil { + return fmt.Errorf("WaitMasterPos: MasterPosition failed: %v", err) + } + if mpos.AtLeast(targetPos) { + return nil + } + + // Find the query to run, run it. + query, err = conn.WaitUntilPositionCommand(ctx, targetPos) + if err != nil { + return err + } } + qr, err := mysqld.FetchSuperQuery(ctx, query) if err != nil { - return fmt.Errorf("WaitUntilPositionCommand(%v) failed: %v", query, err) + return fmt.Errorf("%v(%v) failed: %v", waitCommandName, query, err) } + if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return fmt.Errorf("unexpected result format from WaitUntilPositionCommand(%v): %#v", query, qr) + return fmt.Errorf("unexpected result format from %v(%v): %#v", waitCommandName, query, qr) } result := qr.Rows[0][0] if result.IsNull() { - return fmt.Errorf("WaitUntilPositionCommand(%v) failed: replication is probably stopped", query) + return fmt.Errorf("%v(%v) failed: replication is probably stopped", waitCommandName, query) } if result.ToString() == "-1" { return fmt.Errorf("timed out waiting for position %v", targetPos)