Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,24 @@ type flavor interface {
// restartReplicationCommands returns the commands to stop, reset and start the replication.
restartReplicationCommands() []string

// startReplicationUntilAfter will start replication, but only allow it
// to run until `pos` is reached. After reaching pos, replication will be stopped again.
startReplicationUntilAfter(pos Position) string

// startSQLThreadUntilAfter will start replication's sql thread(s), but only allow it
// to run until `pos` is reached. After reaching pos, it will be stopped again
startSQLThreadUntilAfter(pos Position) string

// stopReplicationCommand returns the command to stop the replication.
stopReplicationCommand() string

// stopIOThreadCommand returns the command to stop the replica's IO thread only.
stopIOThreadCommand() string

// stopSQLThreadCommand returns the command to stop the replica's SQL thread(s) only.
stopSQLThreadCommand() string

// startSQLThreadCommand returns the command to start the replica's SQL thread only.
startSQLThreadCommand() string

// sendBinlogDumpCommand sends the packet required to start
Expand Down Expand Up @@ -202,21 +209,28 @@ func (c *Conn) PrimaryFilePosition() (Position, error) {
}, nil
}

// StartReplicationCommand returns the command to start replication.
// The statement is supplied by the connection's flavor.
func (c *Conn) StartReplicationCommand() string {
return c.flavor.startReplicationCommand()
}

// RestartReplicationCommands returns the commands to stop, reset and start replication.
// The statements are supplied by the connection's flavor.
func (c *Conn) RestartReplicationCommands() []string {
return c.flavor.restartReplicationCommands()
}

// StartReplicationUntilAfterCommand returns the command to start replication
// and have it run until it has reached the given position, at which point
// replication will be stopped again. The statement is supplied by the
// connection's flavor.
func (c *Conn) StartReplicationUntilAfterCommand(pos Position) string {
return c.flavor.startReplicationUntilAfter(pos)
}

// StartSQLThreadUntilAfterCommand returns the command to start the replica's SQL
// thread(s) and have it run until it has reached the given position, at which point
// it will stop. The statement is supplied by the connection's flavor.
func (c *Conn) StartSQLThreadUntilAfterCommand(pos Position) string {
return c.flavor.startSQLThreadUntilAfter(pos)
}

// StopReplicationCommand returns the command to stop the replication.
func (c *Conn) StopReplicationCommand() string {
return c.flavor.stopReplicationCommand()
Expand All @@ -227,6 +241,11 @@ func (c *Conn) StopIOThreadCommand() string {
return c.flavor.stopIOThreadCommand()
}

// StopSQLThreadCommand returns the command to stop the replica's SQL thread(s) only.
// The statement is supplied by the connection's flavor.
func (c *Conn) StopSQLThreadCommand() string {
return c.flavor.stopSQLThreadCommand()
}

// StartSQLThreadCommand returns the command to start the replica's SQL thread.
func (c *Conn) StartSQLThreadCommand() string {
return c.flavor.startSQLThreadCommand()
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (flv *filePosFlavor) stopIOThreadCommand() string {
return "unsupported"
}

// stopSQLThreadCommand is part of the Flavor interface.
// For the filePos flavor it returns the literal string "unsupported".
func (flv *filePosFlavor) stopSQLThreadCommand() string {
return "unsupported"
}

// startSQLThreadCommand is part of the Flavor interface.
// For the filePos flavor it returns the literal string "unsupported".
func (flv *filePosFlavor) startSQLThreadCommand() string {
return "unsupported"
}
Expand Down Expand Up @@ -265,6 +269,10 @@ func (*filePosFlavor) startReplicationUntilAfter(pos Position) string {
return "unsupported"
}

// startSQLThreadUntilAfter is part of the Flavor interface.
// For the filePos flavor it ignores pos and returns the literal string "unsupported".
func (*filePosFlavor) startSQLThreadUntilAfter(pos Position) string {
return "unsupported"
}

// enableBinlogPlaybackCommand is part of the Flavor interface.
func (*filePosFlavor) enableBinlogPlaybackCommand() string {
return ""
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (mariadbFlavor) startReplicationUntilAfter(pos Position) string {
return fmt.Sprintf("START SLAVE UNTIL master_gtid_pos = \"%s\"", pos)
}

// startSQLThreadUntilAfter is part of the Flavor interface.
// It builds the statement that starts only the SQL thread and lets it run
// until the given position (master_gtid_pos) has been reached.
func (mariadbFlavor) startSQLThreadUntilAfter(pos Position) string {
	// A raw string keeps the embedded double quotes readable without escapes.
	const stmt = `START SLAVE SQL_THREAD UNTIL master_gtid_pos = "%s"`
	return fmt.Sprintf(stmt, pos)
}

// startReplicationCommand is part of the Flavor interface.
// It returns the statement that starts replication.
func (mariadbFlavor) startReplicationCommand() string {
return "START SLAVE"
}
Expand All @@ -77,6 +81,10 @@ func (mariadbFlavor) stopIOThreadCommand() string {
return "STOP SLAVE IO_THREAD"
}

// stopSQLThreadCommand is part of the Flavor interface.
// It returns the statement that stops only the replica's SQL thread.
func (mariadbFlavor) stopSQLThreadCommand() string {
return "STOP SLAVE SQL_THREAD"
}

// startSQLThreadCommand is part of the Flavor interface.
// It returns the statement that starts only the replica's SQL thread.
func (mariadbFlavor) startSQLThreadCommand() string {
return "START SLAVE SQL_THREAD"
}
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (mysqlFlavor) startReplicationUntilAfter(pos Position) string {
return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos)
}

// startSQLThreadUntilAfter is part of the Flavor interface.
// It builds the statement that starts only the SQL thread and lets it run
// until the given position (SQL_AFTER_GTIDS) has been reached.
func (mysqlFlavor) startSQLThreadUntilAfter(pos Position) string {
	const stmt = "START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'"
	return fmt.Sprintf(stmt, pos)
}

// stopReplicationCommand is part of the Flavor interface.
// It returns the statement that stops replication.
func (mysqlFlavor) stopReplicationCommand() string {
return "STOP SLAVE"
}
Expand All @@ -80,6 +84,10 @@ func (mysqlFlavor) stopIOThreadCommand() string {
return "STOP SLAVE IO_THREAD"
}

// stopSQLThreadCommand is part of the Flavor interface.
// It returns the statement that stops only the replica's SQL thread.
func (mysqlFlavor) stopSQLThreadCommand() string {
return "STOP SLAVE SQL_THREAD"
}

// startSQLThreadCommand is part of the Flavor interface.
// It returns the statement that starts only the replica's SQL thread.
func (mysqlFlavor) startSQLThreadCommand() string {
return "START SLAVE SQL_THREAD"
}
Expand Down
10 changes: 10 additions & 0 deletions go/mysql/flavor_mysqlgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (mysqlGRFlavor) startReplicationUntilAfter(pos Position) string {
return ""
}

// startSQLThreadUntilAfter is disabled in mysqlGRFlavor: it ignores pos and
// returns an empty string instead of a command.
func (mysqlGRFlavor) startSQLThreadUntilAfter(pos Position) string {
return ""
}

// stopReplicationCommand returns the command to stop the replication.
// we return empty here since `STOP GROUP_REPLICATION` should be called by
// the external orchestrator
Expand All @@ -73,6 +78,11 @@ func (mysqlGRFlavor) stopIOThreadCommand() string {
return ""
}

// stopSQLThreadCommand is disabled in mysqlGRFlavor: it returns an empty
// string instead of a command.
func (mysqlGRFlavor) stopSQLThreadCommand() string {
return ""
}

// startSQLThreadCommand is disabled in mysqlGRFlavor
func (mysqlGRFlavor) startSQLThreadCommand() string {
return ""
Expand Down
53 changes: 48 additions & 5 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"
"time"

"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"context"
Expand Down Expand Up @@ -97,6 +98,19 @@ func (mysqld *Mysqld) StartReplicationUntilAfter(ctx context.Context, targetPos
return mysqld.executeSuperQueryListConn(ctx, conn, queries)
}

// StartSQLThreadUntilAfter starts the replication SQL thread(s) and lets it run
// until `targetPos` has been reached, at which point it is stopped again.
// The command is executed on a connection taken from the dba pool, which is
// recycled before returning.
func (mysqld *Mysqld) StartSQLThreadUntilAfter(ctx context.Context, targetPos mysql.Position) error {
	dbaConn, connErr := getPoolReconnect(ctx, mysqld.dbaPool)
	if connErr != nil {
		return connErr
	}
	defer dbaConn.Recycle()

	// The flavor behind the connection decides the exact statement to run.
	startCmd := dbaConn.StartSQLThreadUntilAfterCommand(targetPos)
	return mysqld.executeSuperQueryListConn(ctx, dbaConn, []string{startCmd})
}

// StopReplication stops replication.
func (mysqld *Mysqld) StopReplication(hookExtraEnv map[string]string) error {
h := hook.NewSimpleHook("preflight_stop_slave")
Expand Down Expand Up @@ -125,6 +139,17 @@ func (mysqld *Mysqld) StopIOThread(ctx context.Context) error {
return mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StopIOThreadCommand()})
}

// StopSQLThread stops only the replica's SQL thread(s).
// The command is executed on a connection taken from the dba pool, which is
// recycled before returning.
func (mysqld *Mysqld) StopSQLThread(ctx context.Context) error {
	dbaConn, connErr := getPoolReconnect(ctx, mysqld.dbaPool)
	if connErr != nil {
		return connErr
	}
	defer dbaConn.Recycle()

	// The flavor behind the connection decides the exact statement to run.
	stopCmd := dbaConn.StopSQLThreadCommand()
	return mysqld.executeSuperQueryListConn(ctx, dbaConn, []string{stopCmd})
}

// RestartReplication stops, resets and starts replication.
func (mysqld *Mysqld) RestartReplication(hookExtraEnv map[string]string) error {
h := hook.NewSimpleHook("preflight_stop_slave")
Expand Down Expand Up @@ -244,14 +269,32 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio
return nil
}

// Start the SQL Thread before waiting for position to be reached, since the replicas
// can only make forward progress if the SQL thread is started and we have already verified
// that the replica is not already as advanced as we want it to be
err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()})
if err != nil {
replicationStatus, err := conn.ShowReplicationStatus()
if err != nil && !errors.Is(err, mysql.ErrNotReplica) {
return err
}

// The replica can only make forward progress if its SQL thread(s) is running, and we
// have already verified that it is not yet as advanced as we want it to be. If the SQL
// thread(s) is not already running -- e.g. in the case of EmergencyReparentShard, where
// the instance is transitioning to PRIMARY (elect) and we can no longer talk to the old
// PRIMARY -- we need it to catch up as much as possible by executing all of the locally
// queued binary log events (in the existing relay logs) before it starts serving traffic
// for the shard, to minimize potential data loss. So we try to start the SQL thread(s)
// before waiting for the position to be reached, at which point the SQL thread(s) will
// be stopped again.
if !replicationStatus.SQLThreadRunning {
// Let's ensure the replication state is put back to what it was when we started.
// Doing this in a deferred function ensures that we do so even if we timeout while waiting.
defer func() {
mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StopSQLThreadCommand()})
}()
if err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()}); err != nil {
return vterrors.Wrap(err,
fmt.Sprintf("the replication SQL thread(s) was stopped and we could not temporarily start it in order to wait for the target position of %v",
targetPos))
}
}

// Find the query to run, run it.
query, err = conn.WaitUntilPositionCommand(ctx, targetPos)
if err != nil {
Expand Down