Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestMysql56ParsePosition(t *testing.T) {
set = set.AddGTID(Mysql56GTID{Server: sid, Sequence: 2})
want := Position{GTIDSet: set}

got, err := ParsePosition(mysql56FlavorID, input)
got, err := ParsePosition(Mysql56FlavorID, input)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
11 changes: 6 additions & 5 deletions go/mysql/filepos_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"strings"
)

const filePosFlavorID = "FilePos"
// FilePosFlavorID is the string identifier for the filePos flavor.
const FilePosFlavorID = "FilePos"

// parsefilePosGTID is registered as a GTID parser.
func parseFilePosGTID(s string) (GTID, error) {
Expand Down Expand Up @@ -65,7 +66,7 @@ func (gtid filePosGTID) String() string {

// Flavor implements GTID.Flavor().
func (gtid filePosGTID) Flavor() string {
return filePosFlavorID
return FilePosFlavorID
}

// SequenceDomain implements GTID.SequenceDomain().
Expand Down Expand Up @@ -147,7 +148,7 @@ func (gtid filePosGTID) Union(other GTIDSet) GTIDSet {
}

func init() {
gtidParsers[filePosFlavorID] = parseFilePosGTID
gtidSetParsers[filePosFlavorID] = parseFilePosGTIDSet
flavors[filePosFlavorID] = newFilePosFlavor
gtidParsers[FilePosFlavorID] = parseFilePosGTID
gtidSetParsers[FilePosFlavorID] = parseFilePosGTIDSet
flavors[FilePosFlavorID] = newFilePosFlavor
}
21 changes: 21 additions & 0 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ func (c *Conn) MasterPosition() (Position, error) {
}, nil
}

// MasterFilePosition returns the current master's file based replication position.
func (c *Conn) MasterFilePosition() (Position, error) {
filePosFlavor := filePosFlavor{}
gtidSet, err := filePosFlavor.masterGTIDSet(c)
if err != nil {
return Position{}, err
}
return Position{
GTIDSet: gtidSet,
}, nil
Comment on lines +170 to +175
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merely a coding style comment, feel free to ignore. What I usually do is:

Suggested change
if err != nil {
return Position{}, err
}
return Position{
GTIDSet: gtidSet,
}, nil
return Position{
GTIDSet: gtidSet,
}, err

it's the caller's responsibility to check if err != nil and if so, to ignore the GTIDSet value.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the pattern established in the rest of the codebase but you bring up a good point. I'm somewhat split on this. I agree with you that it's the caller's responsibility to check if err != nil but I'm also used to writing in languages where if a result is errorful, it's not even possible to access the successful result.

In this case though if err is not nil, then gtidSet would be nil anyway, so I think your version would actually return the same result with less code - and I like that. I suppose it's a question of how others feel regarding established patterns that already exist in the Vitess codebase, and if we should follow those patterns or break them when it's sensible? @enisoc @deepthi @systay @sougou?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the less compact version because it is easier to read.

}

// StartReplicationCommand returns the command to start the replication.
func (c *Conn) StartReplicationCommand() string {
return c.flavor.startReplicationCommand()
Expand Down Expand Up @@ -317,6 +329,15 @@ func (c *Conn) WaitUntilPositionCommand(ctx context.Context, pos Position) (stri
return c.flavor.waitUntilPositionCommand(ctx, pos)
}

// WaitUntilFilePositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires for the file position flavor. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
func (c *Conn) WaitUntilFilePositionCommand(ctx context.Context, pos Position) (string, error) {
filePosFlavor := filePosFlavor{}
return filePosFlavor.waitUntilPositionCommand(ctx, pos)
}

// EnableBinlogPlaybackCommand returns a command to run to enable
// binlog playback.
func (c *Conn) EnableBinlogPlaybackCommand() string {
Expand Down
11 changes: 6 additions & 5 deletions go/mysql/mariadb_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)

const mariadbFlavorID = "MariaDB"
// MariadbFlavorID is the string identifier for the MariaDB flavor.
const MariadbFlavorID = "MariaDB"

// parseMariadbGTID is registered as a GTID parser.
func parseMariadbGTID(s string) (GTID, error) {
Expand Down Expand Up @@ -93,7 +94,7 @@ func (gtid MariadbGTID) String() string {

// Flavor implements GTID.Flavor().
func (gtid MariadbGTID) Flavor() string {
return mariadbFlavorID
return MariadbFlavorID
}

// SequenceDomain implements GTID.SequenceDomain().
Expand Down Expand Up @@ -140,7 +141,7 @@ func (gtidSet MariadbGTIDSet) String() string {

// Flavor implements GTIDSet.Flavor()
func (gtidSet MariadbGTIDSet) Flavor() string {
return mariadbFlavorID
return MariadbFlavorID
}

// ContainsGTID implements GTIDSet.ContainsGTID().
Expand Down Expand Up @@ -251,6 +252,6 @@ func (gtidSet MariadbGTIDSet) addGTID(otherGTID MariadbGTID) {
}

func init() {
gtidParsers[mariadbFlavorID] = parseMariadbGTID
gtidSetParsers[mariadbFlavorID] = parseMariadbGTIDSet
gtidParsers[MariadbFlavorID] = parseMariadbGTID
gtidSetParsers[MariadbFlavorID] = parseMariadbGTIDSet
}
7 changes: 4 additions & 3 deletions go/mysql/mysql56_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)

const mysql56FlavorID = "MySQL56"
// Mysql56FlavorID is the string identifier for the Mysql56 flavor.
const Mysql56FlavorID = "MySQL56"

// parseMysql56GTID is registered as a GTID parser.
func parseMysql56GTID(s string) (GTID, error) {
Expand Down Expand Up @@ -101,7 +102,7 @@ func (gtid Mysql56GTID) String() string {

// Flavor implements GTID.Flavor().
func (gtid Mysql56GTID) Flavor() string {
return mysql56FlavorID
return Mysql56FlavorID
}

// SequenceDomain implements GTID.SequenceDomain().
Expand All @@ -125,5 +126,5 @@ func (gtid Mysql56GTID) GTIDSet() GTIDSet {
}

func init() {
gtidParsers[mysql56FlavorID] = parseMysql56GTID
gtidParsers[Mysql56FlavorID] = parseMysql56GTID
}
6 changes: 4 additions & 2 deletions go/mysql/mysql56_gtid_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (set Mysql56GTIDSet) String() string {
}

// Flavor implements GTIDSet.
func (Mysql56GTIDSet) Flavor() string { return mysql56FlavorID }
func (Mysql56GTIDSet) Flavor() string { return Mysql56FlavorID }

// ContainsGTID implements GTIDSet.
func (set Mysql56GTIDSet) ContainsGTID(gtid GTID) bool {
Expand Down Expand Up @@ -435,6 +435,8 @@ func (set Mysql56GTIDSet) SIDBlock() []byte {
return buf.Bytes()
}

// Difference will supply the difference between the receiver and supplied Mysql56GTIDSets, and supply the result
// as a Mysql56GTIDSet.
func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet {
if other == nil || set == nil {
return set
Expand Down Expand Up @@ -625,5 +627,5 @@ func popInterval(dst *interval, s1, s2 *[]interval) bool {
}

func init() {
gtidSetParsers[mysql56FlavorID] = parseMysql56GTIDSet
gtidSetParsers[Mysql56FlavorID] = parseMysql56GTIDSet
}
17 changes: 17 additions & 0 deletions go/mysql/replication_position.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,23 @@ func (rp *Position) UnmarshalJSON(buf []byte) error {
return nil
}

// MatchesFlavor will take a flavor string, and return whether the positions GTIDSet matches the supplied flavor.
// The caller should use the constants Mysql56FlavorID, MariadbFlavorID, or FilePosFlavorID when supplying the flavor string.
func (rp *Position) MatchesFlavor(flavor string) bool {
switch flavor {
case Mysql56FlavorID:
_, matches := rp.GTIDSet.(Mysql56GTIDSet)
return matches
case MariadbFlavorID:
_, matches := rp.GTIDSet.(MariadbGTIDSet)
return matches
case FilePosFlavorID:
_, matches := rp.GTIDSet.(filePosGTID)
return matches
}
return false
}

// Comparable returns whether the receiver is comparable to the supplied position, based on whether one
// of the two positions contains the other.
func (rp *Position) Comparable(other Position) bool {
Expand Down
60 changes: 43 additions & 17 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,32 +204,58 @@ func (mysqld *Mysqld) WaitMasterPos(ctx context.Context, targetPos mysql.Positio
}
defer conn.Recycle()

// If we are the master, WaitUntilPositionCommand will fail.
// But position is most likely reached. So, check the position
// first.
mpos, err := conn.MasterPosition()
if err != nil {
return fmt.Errorf("WaitMasterPos: MasterPosition failed: %v", err)
}
if mpos.AtLeast(targetPos) {
return nil
}
// First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection,
// unless that flavor is also filePos.
waitCommandName := "WaitUntilPositionCommand"
var query string
if targetPos.MatchesFlavor(mysql.FilePosFlavorID) {
// If we are the master, WaitUntilFilePositionCommand will fail.
// But position is most likely reached. So, check the position
// first.
mpos, err := conn.MasterFilePosition()
if err != nil {
return fmt.Errorf("WaitMasterPos: MasterFilePosition failed: %v", err)
}
if mpos.AtLeast(targetPos) {
return nil
}

// Find the query to run, run it.
query, err := conn.WaitUntilPositionCommand(ctx, targetPos)
if err != nil {
return err
// Find the query to run, run it.
query, err = conn.WaitUntilFilePositionCommand(ctx, targetPos)
if err != nil {
return err
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a need to conn.Recycle()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For mysql.Connect there is no Recycle(). Recycle is a method available to a connection coming from a connection pool. In this case, we are making a direct connection to override the flavor.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like this code is going away anyway, but for future reference, we would need to defer conn.Close() in this case.

waitCommandName = "WaitUntilFilePositionCommand"
} else {
// If we are the master, WaitUntilPositionCommand will fail.
// But position is most likely reached. So, check the position
// first.
mpos, err := conn.MasterPosition()
if err != nil {
return fmt.Errorf("WaitMasterPos: MasterPosition failed: %v", err)
}
if mpos.AtLeast(targetPos) {
return nil
}

// Find the query to run, run it.
query, err = conn.WaitUntilPositionCommand(ctx, targetPos)
if err != nil {
return err
}
}

qr, err := mysqld.FetchSuperQuery(ctx, query)
if err != nil {
return fmt.Errorf("WaitUntilPositionCommand(%v) failed: %v", query, err)
return fmt.Errorf("%v(%v) failed: %v", waitCommandName, query, err)
}

if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return fmt.Errorf("unexpected result format from WaitUntilPositionCommand(%v): %#v", query, qr)
return fmt.Errorf("unexpected result format from %v(%v): %#v", waitCommandName, query, qr)
}
result := qr.Rows[0][0]
if result.IsNull() {
return fmt.Errorf("WaitUntilPositionCommand(%v) failed: replication is probably stopped", query)
return fmt.Errorf("%v(%v) failed: replication is probably stopped", waitCommandName, query)
}
if result.ToString() == "-1" {
return fmt.Errorf("timed out waiting for position %v", targetPos)
Expand Down