diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index 72cd460d7da..3f36689f162 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -66,17 +66,24 @@ type flavor interface { // restartReplicationCommands returns the commands to stop, reset and start the replication. restartReplicationCommands() []string - // startReplicationUntilAfter will restart replication, but only allow it + // startReplicationUntilAfter will start replication, but only allow it // to run until `pos` is reached. After reaching pos, replication will be stopped again startReplicationUntilAfter(pos Position) string + // startSQLThreadUntilAfter will start replication's SQL thread(s), but only allow it + // to run until `pos` is reached. After reaching pos, it will be stopped again + startSQLThreadUntilAfter(pos Position) string + // stopReplicationCommand returns the command to stop the replication. stopReplicationCommand() string - // stopIOThreadCommand returns the command to stop the replica's io thread only. + // stopIOThreadCommand returns the command to stop the replica's IO thread only. stopIOThreadCommand() string - // startSQLThreadCommand returns the command to start the replica's sql thread only. + // stopSQLThreadCommand returns the command to stop the replica's SQL thread(s) only. + stopSQLThreadCommand() string + + // startSQLThreadCommand returns the command to start the replica's SQL thread only. startSQLThreadCommand() string // sendBinlogDumpCommand sends the packet required to start @@ -202,21 +209,28 @@ func (c *Conn) PrimaryFilePosition() (Position, error) { }, nil } -// StartReplicationCommand returns the command to start the replication. +// StartReplicationCommand returns the command to start replication. func (c *Conn) StartReplicationCommand() string { return c.flavor.startReplicationCommand() } -// RestartReplicationCommands returns the commands to stop, reset and start the replication. 
+// RestartReplicationCommands returns the commands to stop, reset and start replication. func (c *Conn) RestartReplicationCommands() []string { return c.flavor.restartReplicationCommands() } -// StartReplicationUntilAfterCommand returns the command to start the replication. +// StartReplicationUntilAfterCommand returns the command to start replication and have it run until it has reached the given position, at which point it will stop. func (c *Conn) StartReplicationUntilAfterCommand(pos Position) string { return c.flavor.startReplicationUntilAfter(pos) } +// StartSQLThreadUntilAfterCommand returns the command to start the replica's SQL +// thread(s) and have it run until it has reached the given position, at which point +// it will stop. +func (c *Conn) StartSQLThreadUntilAfterCommand(pos Position) string { + return c.flavor.startSQLThreadUntilAfter(pos) +} + // StopReplicationCommand returns the command to stop the replication. func (c *Conn) StopReplicationCommand() string { return c.flavor.stopReplicationCommand() @@ -227,6 +241,11 @@ func (c *Conn) StopIOThreadCommand() string { return c.flavor.stopIOThreadCommand() } +// StopSQLThreadCommand returns the command to stop the replica's SQL thread(s). +func (c *Conn) StopSQLThreadCommand() string { + return c.flavor.stopSQLThreadCommand() +} + // StartSQLThreadCommand returns the command to start the replica's SQL thread. 
func (c *Conn) StartSQLThreadCommand() string { return c.flavor.startSQLThreadCommand() diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index d5d7fd59c0e..d1613ace1cf 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -78,6 +78,10 @@ func (flv *filePosFlavor) stopIOThreadCommand() string { return "unsupported" } +func (flv *filePosFlavor) stopSQLThreadCommand() string { + return "unsupported" +} + func (flv *filePosFlavor) startSQLThreadCommand() string { return "unsupported" } @@ -265,6 +269,10 @@ func (*filePosFlavor) startReplicationUntilAfter(pos Position) string { return "unsupported" } +func (*filePosFlavor) startSQLThreadUntilAfter(pos Position) string { + return "unsupported" +} + // enableBinlogPlaybackCommand is part of the Flavor interface. func (*filePosFlavor) enableBinlogPlaybackCommand() string { return "" diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index abca919f6d8..5c5453d66e6 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -57,6 +57,10 @@ func (mariadbFlavor) startReplicationUntilAfter(pos Position) string { return fmt.Sprintf("START SLAVE UNTIL master_gtid_pos = \"%s\"", pos) } +func (mariadbFlavor) startSQLThreadUntilAfter(pos Position) string { + return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL master_gtid_pos = \"%s\"", pos) +} + func (mariadbFlavor) startReplicationCommand() string { return "START SLAVE" } @@ -77,6 +81,10 @@ func (mariadbFlavor) stopIOThreadCommand() string { return "STOP SLAVE IO_THREAD" } +func (mariadbFlavor) stopSQLThreadCommand() string { + return "STOP SLAVE SQL_THREAD" +} + func (mariadbFlavor) startSQLThreadCommand() string { return "START SLAVE SQL_THREAD" } diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index fcc25b14380..4cd8e9540d0 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -72,6 +72,10 @@ func (mysqlFlavor) startReplicationUntilAfter(pos Position) string { return 
fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos) } +func (mysqlFlavor) startSQLThreadUntilAfter(pos Position) string { + return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos) +} + func (mysqlFlavor) stopReplicationCommand() string { return "STOP SLAVE" } @@ -80,6 +84,10 @@ func (mysqlFlavor) stopIOThreadCommand() string { return "STOP SLAVE IO_THREAD" } +func (mysqlFlavor) stopSQLThreadCommand() string { + return "STOP SLAVE SQL_THREAD" +} + func (mysqlFlavor) startSQLThreadCommand() string { return "START SLAVE SQL_THREAD" } diff --git a/go/mysql/flavor_mysqlgr.go b/go/mysql/flavor_mysqlgr.go index 4e8ed7c7dd3..b479e9d1189 100644 --- a/go/mysql/flavor_mysqlgr.go +++ b/go/mysql/flavor_mysqlgr.go @@ -61,6 +61,11 @@ func (mysqlGRFlavor) startReplicationUntilAfter(pos Position) string { return "" } +// startSQLThreadUntilAfter is disabled in mysqlGRFlavor +func (mysqlGRFlavor) startSQLThreadUntilAfter(pos Position) string { + return "" +} + // stopReplicationCommand returns the command to stop the replication. 
// we return empty here since `STOP GROUP_REPLICATION` should be called by // the external orchestrator @@ -73,6 +78,11 @@ func (mysqlGRFlavor) stopIOThreadCommand() string { return "" } +// stopSQLThreadCommand is disabled in mysqlGRFlavor +func (mysqlGRFlavor) stopSQLThreadCommand() string { + return "" +} + // startSQLThreadCommand is disabled in mysqlGRFlavor func (mysqlGRFlavor) startSQLThreadCommand() string { return "" diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 844700909de..8a27e3db811 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -27,6 +27,7 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/evalengine" "context" @@ -97,6 +98,19 @@ func (mysqld *Mysqld) StartReplicationUntilAfter(ctx context.Context, targetPos return mysqld.executeSuperQueryListConn(ctx, conn, queries) } +// StartSQLThreadUntilAfter starts replication's SQL thread(s) until replication has come to `targetPos`, then it stops it +func (mysqld *Mysqld) StartSQLThreadUntilAfter(ctx context.Context, targetPos mysql.Position) error { + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) + if err != nil { + return err + } + defer conn.Recycle() + + queries := []string{conn.StartSQLThreadUntilAfterCommand(targetPos)} + + return mysqld.executeSuperQueryListConn(ctx, conn, queries) +} + // StopReplication stops replication. func (mysqld *Mysqld) StopReplication(hookExtraEnv map[string]string) error { h := hook.NewSimpleHook("preflight_stop_slave") @@ -125,6 +139,17 @@ func (mysqld *Mysqld) StopIOThread(ctx context.Context) error { return mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StopIOThreadCommand()}) } +// StopSQLThread stops a replica's SQL thread(s) only. 
+func (mysqld *Mysqld) StopSQLThread(ctx context.Context) error { + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) + if err != nil { + return err + } + defer conn.Recycle() + + return mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StopSQLThreadCommand()}) +} + // RestartReplication stops, resets and starts replication. func (mysqld *Mysqld) RestartReplication(hookExtraEnv map[string]string) error { h := hook.NewSimpleHook("preflight_stop_slave") @@ -244,14 +269,32 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio return nil } - // Start the SQL Thread before waiting for position to be reached, since the replicas - // can only make forward progress if the SQL thread is started and we have already verified - // that the replica is not already as advanced as we want it to be - err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()}) - if err != nil { + replicationStatus, err := conn.ShowReplicationStatus() + if err != nil && !errors.Is(err, mysql.ErrNotReplica) { return err } + // The SQL thread(s) may not be running here. For example, during EmergencyReparentShard the + // instance is transitioning to PRIMARY (elect) and we can no longer talk to the old PRIMARY, so we + // need it to catch up as much as possible by executing all of the locally queued binary log events + // (in the existing relay logs) before it starts serving traffic for the shard, to minimize potential + // data loss. In that case we try to start the SQL thread(s) before waiting for the position to be + // reached, since the replica can only make forward progress while the SQL thread(s) is running and + // we have already verified that the replica is not already as advanced as we want it to be. The SQL + // thread(s) is then stopped again when this function returns. + if !replicationStatus.SQLThreadRunning { + // Let's ensure the replication state is put back to what it was when we started. 
+ // Doing this in a deferred function ensures that we do so even if we time out while waiting. + defer func() { + mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StopSQLThreadCommand()}) + }() + if err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()}); err != nil { + return vterrors.Wrap(err, + fmt.Sprintf("the replication SQL thread(s) was stopped and we could not temporarily start it in order to wait for the target position of %v", + targetPos)) + } + } + // Find the query to run, run it. query, err = conn.WaitUntilPositionCommand(ctx, targetPos) if err != nil {