diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index d9bc7f31a63..9e6c9562e92 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -70,6 +70,10 @@ type flavor interface { // to run until `pos` is reached. After reaching pos, replication will be stopped again startReplicationUntilAfter(pos Position) string + // startReplicationSQLUntilAfter will restart replication's SQL_thread, but only allow it + // to run until `pos` is reached. After reaching pos, it will be stopped again + startReplicationSQLUntilAfter(pos Position) string + // stopReplicationCommand returns the command to stop the replication. stopReplicationCommand() string @@ -251,6 +255,11 @@ func (c *Conn) StartReplicationUntilAfterCommand(pos Position) string { return c.flavor.startReplicationUntilAfter(pos) } +// StartReplicationSQLUntilAfterCommand returns the command to start the replication's SQL thread and let it run only until `pos` is reached. +func (c *Conn) StartReplicationSQLUntilAfterCommand(pos Position) string { + return c.flavor.startReplicationSQLUntilAfter(pos) +} + // StopReplicationCommand returns the command to stop the replication. func (c *Conn) StopReplicationCommand() string { return c.flavor.stopReplicationCommand() diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index 6636061d6cc..a76646f7d80 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -269,6 +269,10 @@ func (*filePosFlavor) startReplicationUntilAfter(pos Position) string { return "unsupported" } +func (*filePosFlavor) startReplicationSQLUntilAfter(pos Position) string { + return "unsupported" +} + // enableBinlogPlaybackCommand is part of the Flavor interface. 
func (*filePosFlavor) enableBinlogPlaybackCommand() string { return "" diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 2d6fc6cc996..2d46a1f7e43 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -57,6 +57,10 @@ func (mariadbFlavor) startReplicationUntilAfter(pos Position) string { return fmt.Sprintf("START SLAVE UNTIL master_gtid_pos = \"%s\"", pos) } +func (mariadbFlavor) startReplicationSQLUntilAfter(pos Position) string { + return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL master_gtid_pos = \"%s\"", pos) +} + func (mariadbFlavor) startReplicationCommand() string { return "START SLAVE" } diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index 4726e009e31..7cfe26475b9 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -72,6 +72,10 @@ func (mysqlFlavor) startReplicationUntilAfter(pos Position) string { return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos) } +func (mysqlFlavor) startReplicationSQLUntilAfter(pos Position) string { + return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos) +} + func (mysqlFlavor) stopReplicationCommand() string { return "STOP SLAVE" } diff --git a/go/mysql/flavor_mysqlgr.go b/go/mysql/flavor_mysqlgr.go index db65ff2f7b3..675d05f0dea 100644 --- a/go/mysql/flavor_mysqlgr.go +++ b/go/mysql/flavor_mysqlgr.go @@ -61,6 +61,11 @@ func (mysqlGRFlavor) startReplicationUntilAfter(pos Position) string { return "" } +// startReplicationSQLUntilAfter is disabled in mysqlGRFlavor +func (mysqlGRFlavor) startReplicationSQLUntilAfter(pos Position) string { + return "" +} + // stopReplicationCommand returns the command to stop the replication. 
// we return empty here since `STOP GROUP_REPLICATION` should be called by // the external orchestrator diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index d2d8976500b..71721c6f15d 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -27,6 +27,7 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/evalengine" "context" @@ -97,6 +98,19 @@ func (mysqld *Mysqld) StartReplicationUntilAfter(ctx context.Context, targetPos return mysqld.executeSuperQueryListConn(ctx, conn, queries) } +// StartReplicationSQLUntilAfter starts replication's SQL thread(s) until they have come to `targetPos`, then it stops them +func (mysqld *Mysqld) StartReplicationSQLUntilAfter(ctx context.Context, targetPos mysql.Position) error { + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) + if err != nil { + return err + } + defer conn.Recycle() + + queries := []string{conn.StartReplicationSQLUntilAfterCommand(targetPos)} + + return mysqld.executeSuperQueryListConn(ctx, conn, queries) +} + // StopReplication stops replication. func (mysqld *Mysqld) StopReplication(hookExtraEnv map[string]string) error { h := hook.NewSimpleHook("preflight_stop_slave") @@ -210,6 +224,11 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio } defer conn.Recycle() + replicationStatus, err := conn.ShowReplicationStatus() + if err != nil && !errors.Is(err, mysql.ErrNotReplica) { + return err + } + // First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection, // unless that flavor is also filePos. 
waitCommandName := "WaitUntilPositionCommand" @@ -244,12 +263,16 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio return nil } - // Start the SQL Thread before waiting for position to be reached, since the replicas - // can only make forward progress if the SQL thread is started and we have already verified - // that the replica is not already as advanced as we want it to be - err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()}) - if err != nil { - return err + // If the SQL_thread is not already running -- e.g. in the case of EmergencyReparentShard where the + // instance is transitioning to PRIMARY but we need it to catch up by executing all of the queued + // binary log events before it starts serving traffic for the shard to minimize potential data + // loss -- then we try to start the SQL Thread before waiting for position to be reached, since the + // replicas can only make forward progress if the SQL thread is started and we have already + // verified that the replica is not already as advanced as we want it to be + if !replicationStatus.SQLHealthy() { + if err = mysqld.StartReplicationSQLUntilAfter(ctx, targetPos); err != nil { + return vterrors.Wrap(err, "the SQL_thread was stopped and we could not force start") + } } // Find the query to run, run it.