From 8b403afb416f1a80b02e575d9c138bc5448df57f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 15 Apr 2022 18:21:45 -0400 Subject: [PATCH 1/2] Only start SQL thread to WaitForPosition if it's not already started It will then be stopped again after we've reached the desired position. This was the table repair will function normally and start replication again if it's a replica table and the IO and SQL threads are stopped. Signed-off-by: Matt Lord --- go/mysql/flavor.go | 9 +++++++++ go/mysql/flavor_filepos.go | 4 ++++ go/mysql/flavor_mariadb.go | 4 ++++ go/mysql/flavor_mysql.go | 4 ++++ go/mysql/flavor_mysqlgr.go | 5 +++++ go/vt/mysqlctl/replication.go | 35 +++++++++++++++++++++++++++++------ 6 files changed, 55 insertions(+), 6 deletions(-) diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index d9bc7f31a63..9e6c9562e92 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -70,6 +70,10 @@ type flavor interface { // to run until `pos` is reached. After reaching pos, replication will be stopped again startReplicationUntilAfter(pos Position) string + // startReplicationSQLUntilAfter will restart replication's SQL_thread, but only allow it + // to run until `pos` is reached. After reaching pos, it will be stopped again + startReplicationSQLUntilAfter(pos Position) string + // stopReplicationCommand returns the command to stop the replication. stopReplicationCommand() string @@ -251,6 +255,11 @@ func (c *Conn) StartReplicationUntilAfterCommand(pos Position) string { return c.flavor.startReplicationUntilAfter(pos) } +// StartReplicationSQLUntilAfterCommand returns the command to start the replication's SQL thread. +func (c *Conn) StartReplicationSQLUntilAfterCommand(pos Position) string { + return c.flavor.startReplicationSQLUntilAfter(pos) +} + // StopReplicationCommand returns the command to stop the replication. func (c *Conn) StopReplicationCommand() string { return c.flavor.stopReplicationCommand() diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index 6636061d6cc..a76646f7d80 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -269,6 +269,10 @@ func (*filePosFlavor) startReplicationUntilAfter(pos Position) string { return "unsupported" } +func (*filePosFlavor) startReplicationSQLUntilAfter(pos Position) string { + return "unsupported" +} + // enableBinlogPlaybackCommand is part of the Flavor interface. func (*filePosFlavor) enableBinlogPlaybackCommand() string { return "" diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 2d6fc6cc996..2d46a1f7e43 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -57,6 +57,10 @@ func (mariadbFlavor) startReplicationUntilAfter(pos Position) string { return fmt.Sprintf("START SLAVE UNTIL master_gtid_pos = \"%s\"", pos) } +func (mariadbFlavor) startReplicationSQLUntilAfter(pos Position) string { + return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL master_gtid_pos = \"%s\"", pos) +} + func (mariadbFlavor) startReplicationCommand() string { return "START SLAVE" } diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index 4726e009e31..7cfe26475b9 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -72,6 +72,10 @@ func (mysqlFlavor) startReplicationUntilAfter(pos Position) string { return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos) } +func (mysqlFlavor) startReplicationSQLUntilAfter(pos Position) string { + return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos) +} + func (mysqlFlavor) stopReplicationCommand() string { return "STOP SLAVE" } diff --git a/go/mysql/flavor_mysqlgr.go b/go/mysql/flavor_mysqlgr.go index db65ff2f7b3..675d05f0dea 100644 --- a/go/mysql/flavor_mysqlgr.go +++ b/go/mysql/flavor_mysqlgr.go @@ -61,6 +61,11 @@ func (mysqlGRFlavor) startReplicationUntilAfter(pos Position) string { return "" } +// startReplicationSQLUntilAfter is disabled in mysqlGRFlavor +func (mysqlGRFlavor) startReplicationSQLUntilAfter(pos Position) string { + return "" +} + // stopReplicationCommand returns the command to stop the replication. // we return empty here since `STOP GROUP_REPLICATION` should be called by // the external orchestrator diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index d2d8976500b..f5fc751ce63 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -27,6 +27,7 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/evalengine" "context" @@ -97,6 +98,19 @@ func (mysqld *Mysqld) StartReplicationUntilAfter(ctx context.Context, targetPos return mysqld.executeSuperQueryListConn(ctx, conn, queries) } +// StartReplicationSQLUntilAfter starts replication until replication has come to `targetPos`, then it stops replication +func (mysqld *Mysqld) StartReplicationSQLUntilAfter(ctx context.Context, targetPos mysql.Position) error { + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) + if err != nil { + return err + } + defer conn.Recycle() + + queries := []string{conn.StartReplicationSQLUntilAfterCommand(targetPos)} + + return mysqld.executeSuperQueryListConn(ctx, conn, queries) +} + // StopReplication stops replication. func (mysqld *Mysqld) StopReplication(hookExtraEnv map[string]string) error { h := hook.NewSimpleHook("preflight_stop_slave") @@ -210,6 +224,11 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio } defer conn.Recycle() + replicationStatus, err := conn.ShowReplicationStatus() + if err != nil { + return err + } + // First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection, // unless that flavor is also filePos. waitCommandName := "WaitUntilPositionCommand" @@ -244,12 +263,16 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio return nil } - // Start the SQL Thread before waiting for position to be reached, since the replicas - // can only make forward progress if the SQL thread is started and we have already verified - // that the replica is not already as advanced as we want it to be - err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()}) - if err != nil { - return err + // If the SQL_thread is not already running -- e.g. in the case of EmergencyReparentShard where the + // instance is transitioning to PRIMARY but we need it to catch up by executing all of the queued + // binary log events before it starts serving traffic for the shard to minimize potential data + // loss -- then we try to start the SQL Thread before waiting for position to be reached, since the + // replicas can only make forward progress if the SQL thread is started and we have already + // verified that the replica is not already as advanced as we want it to be + if !replicationStatus.SQLHealthy() { + if err = mysqld.StartReplicationSQLUntilAfter(ctx, targetPos); err != nil { + return vterrors.Wrap(err, "the SQL_thread was stopped and we could not force start") + } } // Find the query to run, run it. From 259c8d076abd6e69cc70c096469174ab00b9a469 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 15 Apr 2022 18:38:18 -0400 Subject: [PATCH 2/2] Check if the mysql istance is a replica Signed-off-by: Matt Lord --- go/vt/mysqlctl/replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index f5fc751ce63..71721c6f15d 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -225,7 +225,7 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio defer conn.Recycle() replicationStatus, err := conn.ShowReplicationStatus() - if err != nil { + if err != nil && !errors.Is(err, mysql.ErrNotReplica) { return err }