Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type flavor interface {
// to run until `pos` is reached. After reaching pos, replication will be stopped again
startReplicationUntilAfter(pos Position) string

// startReplicationSQLUntilAfter will restart replication's SQL_thread, but only allow it
// to run until `pos` is reached. After reaching pos, it will be stopped again
startReplicationSQLUntilAfter(pos Position) string

// stopReplicationCommand returns the command to stop the replication.
stopReplicationCommand() string

Expand Down Expand Up @@ -251,6 +255,11 @@ func (c *Conn) StartReplicationUntilAfterCommand(pos Position) string {
return c.flavor.startReplicationUntilAfter(pos)
}

// StartReplicationSQLUntilAfterCommand returns the command that starts only the
// replication SQL thread and lets it run until `pos` is reached. The statement
// text itself is produced by the connection's flavor.
func (c *Conn) StartReplicationSQLUntilAfterCommand(pos Position) string {
	command := c.flavor.startReplicationSQLUntilAfter(pos)
	return command
}

// StopReplicationCommand returns the command to stop the replication.
func (c *Conn) StopReplicationCommand() string {
return c.flavor.stopReplicationCommand()
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ func (*filePosFlavor) startReplicationUntilAfter(pos Position) string {
return "unsupported"
}

// startReplicationSQLUntilAfter is part of the Flavor interface. For the
// filePos flavor it ignores `pos` and returns the literal "unsupported"
// rather than a SQL statement.
func (*filePosFlavor) startReplicationSQLUntilAfter(pos Position) string {
	const unsupported = "unsupported"
	return unsupported
}

// enableBinlogPlaybackCommand is part of the Flavor interface.
func (*filePosFlavor) enableBinlogPlaybackCommand() string {
return ""
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (mariadbFlavor) startReplicationUntilAfter(pos Position) string {
return fmt.Sprintf("START SLAVE UNTIL master_gtid_pos = \"%s\"", pos)
}

// startReplicationSQLUntilAfter is part of the Flavor interface. It builds a
// START SLAVE statement restricted to SQL_THREAD, with an UNTIL clause on
// master_gtid_pos formatted from `pos`.
func (mariadbFlavor) startReplicationSQLUntilAfter(pos Position) string {
	const format = `START SLAVE SQL_THREAD UNTIL master_gtid_pos = "%s"`
	return fmt.Sprintf(format, pos)
}

// startReplicationCommand returns the statement used to start replication.
func (mariadbFlavor) startReplicationCommand() string {
	const statement = "START SLAVE"
	return statement
}
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (mysqlFlavor) startReplicationUntilAfter(pos Position) string {
return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos)
}

// startReplicationSQLUntilAfter is part of the Flavor interface. It builds a
// START SLAVE statement restricted to SQL_THREAD, with an UNTIL
// SQL_AFTER_GTIDS clause formatted from `pos`.
func (mysqlFlavor) startReplicationSQLUntilAfter(pos Position) string {
	const format = "START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'"
	return fmt.Sprintf(format, pos)
}

// stopReplicationCommand returns the statement used to stop replication.
func (mysqlFlavor) stopReplicationCommand() string {
	const statement = "STOP SLAVE"
	return statement
}
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_mysqlgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (mysqlGRFlavor) startReplicationUntilAfter(pos Position) string {
return ""
}

// startReplicationSQLUntilAfter is disabled in mysqlGRFlavor: it ignores `pos`
// and always returns an empty command string.
func (mysqlGRFlavor) startReplicationSQLUntilAfter(pos Position) string {
	const noCommand = ""
	return noCommand
}

// stopReplicationCommand returns the command to stop the replication.
// we return empty here since `STOP GROUP_REPLICATION` should be called by
// the external orchestrator
Expand Down
35 changes: 29 additions & 6 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"
"time"

"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"context"
Expand Down Expand Up @@ -97,6 +98,19 @@ func (mysqld *Mysqld) StartReplicationUntilAfter(ctx context.Context, targetPos
return mysqld.executeSuperQueryListConn(ctx, conn, queries)
}

// StartReplicationSQLUntilAfter runs the flavor-specific command that starts
// the replication SQL thread and lets it run until `targetPos` is reached.
//
// The command is executed on a connection taken from the dba pool; that
// connection is recycled before returning. Any error from obtaining the
// connection or from executing the command is returned unchanged.
func (mysqld *Mysqld) StartReplicationSQLUntilAfter(ctx context.Context, targetPos mysql.Position) error {
	dbaConn, err := getPoolReconnect(ctx, mysqld.dbaPool)
	if err != nil {
		return err
	}
	defer dbaConn.Recycle()

	// The statement text depends on the flavor of the connection we were handed.
	startCmd := dbaConn.StartReplicationSQLUntilAfterCommand(targetPos)
	return mysqld.executeSuperQueryListConn(ctx, dbaConn, []string{startCmd})
}

// StopReplication stops replication.
func (mysqld *Mysqld) StopReplication(hookExtraEnv map[string]string) error {
h := hook.NewSimpleHook("preflight_stop_slave")
Expand Down Expand Up @@ -210,6 +224,11 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio
}
defer conn.Recycle()

replicationStatus, err := conn.ShowReplicationStatus()
if err != nil && !errors.Is(err, mysql.ErrNotReplica) {
return err
}

// First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection,
// unless that flavor is also filePos.
waitCommandName := "WaitUntilPositionCommand"
Expand Down Expand Up @@ -244,12 +263,16 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio
return nil
}

// Start the SQL Thread before waiting for position to be reached, since the replicas
// can only make forward progress if the SQL thread is started and we have already verified
// that the replica is not already as advanced as we want it to be
err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()})
if err != nil {
return err
// If the SQL_thread is not already running -- e.g. in the case of EmergencyReparentShard where the
// instance is transitioning to PRIMARY but we need it to catch up by executing all of the queued
// binary log events before it starts serving traffic for the shard to minimize potential data
// loss -- then we try to start the SQL Thread before waiting for position to be reached, since the
// replicas can only make forward progress if the SQL thread is started and we have already
// verified that the replica is not already as advanced as we want it to be
if !replicationStatus.SQLHealthy() {
if err = mysqld.StartReplicationSQLUntilAfter(ctx, targetPos); err != nil {
return vterrors.Wrap(err, "the SQL_thread was stopped and we could not force start")
}
}

// Find the query to run, run it.
Expand Down