Skip to content
8 changes: 8 additions & 0 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type flavor interface {
// stopIOThreadCommand returns the command to stop the replica's io thread only.
stopIOThreadCommand() string

// startSQLThreadCommand returns the command to start the replica's sql thread only.
startSQLThreadCommand() string

// sendBinlogDumpCommand sends the packet required to start
// dumping binlogs from the specified location.
sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error
Expand Down Expand Up @@ -224,6 +227,11 @@ func (c *Conn) StopIOThreadCommand() string {
return c.flavor.stopIOThreadCommand()
}

// StartSQLThreadCommand returns the command to start the replica's SQL thread.
// It delegates to the flavor-specific implementation for this connection;
// flavors that do not support the operation return a sentinel or empty string.
func (c *Conn) StartSQLThreadCommand() string {
	return c.flavor.startSQLThreadCommand()
}

// SendBinlogDumpCommand sends the flavor-specific version of
// the COM_BINLOG_DUMP command to start dumping raw binlog
// events over a server connection, starting at a given GTID.
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (flv *filePosFlavor) stopIOThreadCommand() string {
return "unsupported"
}

// startSQLThreadCommand is part of the Flavor interface.
// The file-position flavor returns the sentinel "unsupported", matching the
// other replication-thread commands in this flavor.
func (flv *filePosFlavor) startSQLThreadCommand() string {
	return "unsupported"
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (flv *filePosFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error {
rpos, ok := startPos.GTIDSet.(filePosGTID)
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (mariadbFlavor) stopIOThreadCommand() string {
return "STOP SLAVE IO_THREAD"
}

// startSQLThreadCommand is part of the Flavor interface.
func (mariadbFlavor) startSQLThreadCommand() string {
	return "START SLAVE SQL_THREAD"
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (mariadbFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error {
// Tell the server that we understand GTIDs by setting
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func (mysqlFlavor) stopIOThreadCommand() string {
return "STOP SLAVE IO_THREAD"
}

// startSQLThreadCommand is part of the Flavor interface.
func (mysqlFlavor) startSQLThreadCommand() string {
	return "START SLAVE SQL_THREAD"
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error {
gtidSet, ok := startPos.GTIDSet.(Mysql56GTIDSet)
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_mysqlgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (mysqlGRFlavor) stopIOThreadCommand() string {
return ""
}

// startSQLThreadCommand is disabled in mysqlGRFlavor and therefore returns
// an empty command string, consistent with the other disabled replication
// commands in this flavor.
func (mysqlGRFlavor) startSQLThreadCommand() string {
	return ""
}

// resetReplicationCommands is disabled in mysqlGRFlavor
func (mysqlGRFlavor) resetReplicationCommands(c *Conn) []string {
return []string{}
Expand Down
38 changes: 38 additions & 0 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os/exec"
"os/signal"
"path"
"regexp"
"strconv"
"sync"
"syscall"
Expand Down Expand Up @@ -72,6 +73,10 @@ type LocalProcessCluster struct {
VtgateGrpcPort int
VtctldHTTPPort int

// major version numbers
VtTabletMajorVersion int
VtctlMajorVersion int

// standalone executable
VtctlclientProcess VtctlClientProcess
VtctlProcess VtctlProcess
Expand Down Expand Up @@ -212,6 +217,7 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
log.Error(err)
return
}
cluster.VtctlProcess.LogDir = cluster.TmpDirectory
}

cluster.VtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(),
Expand Down Expand Up @@ -627,10 +633,42 @@ func NewCluster(cell string, hostname string) *LocalProcessCluster {
_ = os.Setenv("VTDATAROOT", cluster.CurrentVTDATAROOT)
log.Infof("Created cluster on %s. ReusingVTDATAROOT=%v", cluster.CurrentVTDATAROOT, cluster.ReusingVTDATAROOT)

err := cluster.populateVersionInfo()
if err != nil {
log.Errorf("Error populating version information - %v", err)
}

rand.Seed(time.Now().UTC().UnixNano())
return cluster
}

// populateVersionInfo records the major version numbers of the vttablet and
// vtctl binaries used to set up the cluster. It returns the first error
// encountered while querying either binary.
func (cluster *LocalProcessCluster) populateVersionInfo() error {
	tabletVersion, err := getMajorVersion("vttablet")
	if err != nil {
		return err
	}
	cluster.VtTabletMajorVersion = tabletVersion

	vtctlVersion, err := getMajorVersion("vtctl")
	if err != nil {
		return err
	}
	cluster.VtctlMajorVersion = vtctlVersion
	return nil
}

// getMajorVersion runs the named binary with --version and returns its major
// version number, parsed from a "Version: X.Y.Z" line in the output.
func getMajorVersion(binaryName string) (int, error) {
	version, err := exec.Command(binaryName, "--version").Output()
	if err != nil {
		return 0, err
	}
	// The binaries report a line such as "Version: 13.0.0 ..."; capture the
	// three numeric components and keep only the major one.
	versionRegex := regexp.MustCompile(`Version: ([0-9]+)\.([0-9]+)\.([0-9]+)`)
	v := versionRegex.FindStringSubmatch(string(version))
	if len(v) != 4 {
		return 0, fmt.Errorf("could not parse server version from: %s", version)
	}
	// NOTE: the original code re-checked err here, but err is provably nil at
	// this point (checked and returned above, never reassigned) — that dead
	// branch has been removed.
	return strconv.Atoi(v[1])
}

// RestartVtgate starts vtgate with updated configs
func (cluster *LocalProcessCluster) RestartVtgate() (err error) {
err = cluster.VtgateProcess.TearDown()
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/cluster/vtctl_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type VtctlProcess struct {
Name string
Binary string
LogDir string
TopoImplementation string
TopoGlobalAddress string
TopoGlobalRoot string
Expand Down Expand Up @@ -68,6 +69,7 @@ func (vtctl *VtctlProcess) CreateKeyspace(keyspace string) (err error) {
// ExecuteCommandWithOutput executes any vtctlclient command and returns output
func (vtctl *VtctlProcess) ExecuteCommandWithOutput(args ...string) (result string, err error) {
args = append([]string{
"-log_dir", vtctl.LogDir,
"-enable_queries",
"-topo_implementation", vtctl.TopoImplementation,
"-topo_global_server_address", vtctl.TopoGlobalAddress,
Expand Down
22 changes: 17 additions & 5 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,10 @@ func TestPullFromRdonly(t *testing.T) {
require.NoError(t, err)
}

// TestTwoReplicasNoReplicationStatus checks that ERS is able to fix
// two replicas which do not have any replication status
func TestTwoReplicasNoReplicationStatus(t *testing.T) {
// TestNoReplicationStatusAndReplicationStopped checks that ERS is able to fix
// replicas which do not have any replication status and also succeeds if the replication
// is stopped on the primary elect.
func TestNoReplicationStatusAndReplicationStopped(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupReparentCluster(t)
defer utils.TeardownCluster(clusterInstance)
Expand All @@ -229,11 +230,22 @@ func TestTwoReplicasNoReplicationStatus(t *testing.T) {

err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE; RESET SLAVE ALL`)
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[2].Alias, `STOP SLAVE; RESET SLAVE ALL`)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[2].Alias, `STOP SLAVE;`)
require.NoError(t, err)

err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[3].Alias, `STOP SLAVE SQL_THREAD;`)
require.NoError(t, err)
// Run an additional command in the current primary which will only be acked by tablets[3] and be in its relay log.
insertedVal := utils.ConfirmReplication(t, tablets[0], nil)
// Failover to tablets[3]
out, err := utils.Ers(clusterInstance, tablets[3], "60s", "30s")
require.NoError(t, err, out)
// Verify that the tablet has the inserted value
err = utils.CheckInsertedValues(context.Background(), t, tablets[3], insertedVal)
require.NoError(t, err)
// Confirm that replication is setup correctly from tablets[3] to tablets[0]
utils.ConfirmReplication(t, tablets[3], tablets[:1])
// Confirm that tablets[2] which had replication stopped initially still has its replication stopped
utils.CheckReplicationStatus(context.Background(), t, tablets[2], false, false)
}

// TestERSForInitialization tests whether calling ERS in the beginning sets up the cluster properly or not
Expand Down
5 changes: 5 additions & 0 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ func TestReparentWithDownReplica(t *testing.T) {
_, err = utils.Prs(t, clusterInstance, tablets[1])
require.NoError(t, err)

// We have to StartReplication on tablets[2] since the MySQL instance is restarted and does not have replication running
// We earlier used to rely on replicationManager to fix this but we have disabled it in our testing environment for latest versions of vttablet and vtctl.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", tablets[2].Alias)
require.NoError(t, err)

// wait until it gets the data
err = utils.CheckInsertedValues(ctx, t, tablets[2], insertVal)
require.NoError(t, err)
Expand Down
25 changes: 25 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ func setupCluster(ctx context.Context, t *testing.T, shardName string, cells []s
// DemotePrimary rpc is stalled!
"-queryserver_enable_online_ddl=false",
}
if clusterInstance.VtTabletMajorVersion >= 13 && clusterInstance.VtctlMajorVersion >= 13 {
// disabling active reparents on the tablet since we don't want the replication manager
// to fix replication if it is stopped. Some tests deliberately do that. Also, we don't want
// the replication manager to silently fix the replication in case ERS or PRS mess up. All the
// tests in this test suite should work irrespective of this flag. Each run of ERS, PRS should be
// setting up the replication correctly.
// However, due to the bugs in old vitess components we can only do this for version >= 13.
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "-disable_active_reparents")
}

// Initialize Cluster
err = clusterInstance.SetupCluster(keyspace, []cluster.Shard{*shard})
Expand Down Expand Up @@ -607,3 +616,19 @@ func SetReplicationSourceFailed(tablet *cluster.Vttablet, prsOut string) bool {
}
return strings.Contains(prsOut, fmt.Sprintf("tablet %s failed to SetMaster", tablet.Alias))
}

// CheckReplicationStatus checks that the replication io and sql threads on the
// given tablet are in the expected running state, as reported by
// "show slave status".
func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, sqlThreadRunning bool, ioThreadRunning bool) {
	expected := func(running bool) string {
		if running {
			return "Yes"
		}
		return "No"
	}

	res := RunSQL(ctx, t, "show slave status;", tablet)
	// Column 10 holds the io-thread state and column 11 the sql-thread state
	// in the "show slave status" result row.
	require.Equal(t, expected(ioThreadRunning), res.Rows[0][10].ToString())
	require.Equal(t, expected(sqlThreadRunning), res.Rows[0][11].ToString())
}
8 changes: 8 additions & 0 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,14 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio
return nil
}

// Start the SQL Thread before waiting for position to be reached, since the replicas
// can only make forward progress if the SQL thread is started and we have already verified
// that the replica is not already as advanced as we want it to be
err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()})
if err != nil {
return err
}

// Find the query to run, run it.
query, err = conn.WaitUntilPositionCommand(ctx, targetPos)
if err != nil {
Expand Down