diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index 26adb83f145..72cd460d7da 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -76,6 +76,9 @@ type flavor interface { // stopIOThreadCommand returns the command to stop the replica's io thread only. stopIOThreadCommand() string + // startSQLThreadCommand returns the command to start the replica's sql thread only. + startSQLThreadCommand() string + // sendBinlogDumpCommand sends the packet required to start // dumping binlogs from the specified location. sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error @@ -224,6 +227,11 @@ func (c *Conn) StopIOThreadCommand() string { return c.flavor.stopIOThreadCommand() } +// StartSQLThreadCommand returns the command to start the replica's SQL thread. +func (c *Conn) StartSQLThreadCommand() string { + return c.flavor.startSQLThreadCommand() +} + // SendBinlogDumpCommand sends the flavor-specific version of // the COM_BINLOG_DUMP command to start dumping raw binlog // events over a server connection, starting at a given GTID. diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index fcae889c7f1..d5d7fd59c0e 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -78,6 +78,10 @@ func (flv *filePosFlavor) stopIOThreadCommand() string { return "unsupported" } +func (flv *filePosFlavor) startSQLThreadCommand() string { + return "unsupported" +} + // sendBinlogDumpCommand is part of the Flavor interface. 
func (flv *filePosFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error { rpos, ok := startPos.GTIDSet.(filePosGTID) diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index cf53e6e6521..abca919f6d8 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -77,6 +77,10 @@ func (mariadbFlavor) stopIOThreadCommand() string { return "STOP SLAVE IO_THREAD" } +func (mariadbFlavor) startSQLThreadCommand() string { + return "START SLAVE SQL_THREAD" +} + // sendBinlogDumpCommand is part of the Flavor interface. func (mariadbFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error { // Tell the server that we understand GTIDs by setting diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index 0e23f889cc8..fcc25b14380 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -80,6 +80,10 @@ func (mysqlFlavor) stopIOThreadCommand() string { return "STOP SLAVE IO_THREAD" } +func (mysqlFlavor) startSQLThreadCommand() string { + return "START SLAVE SQL_THREAD" +} + // sendBinlogDumpCommand is part of the Flavor interface. 
func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, startPos Position) error { gtidSet, ok := startPos.GTIDSet.(Mysql56GTIDSet) diff --git a/go/mysql/flavor_mysqlgr.go b/go/mysql/flavor_mysqlgr.go index 196c632fb7c..4e8ed7c7dd3 100644 --- a/go/mysql/flavor_mysqlgr.go +++ b/go/mysql/flavor_mysqlgr.go @@ -73,6 +73,11 @@ func (mysqlGRFlavor) stopIOThreadCommand() string { return "" } +// startSQLThreadCommand is disabled in mysqlGRFlavor +func (mysqlGRFlavor) startSQLThreadCommand() string { + return "" +} + // resetReplicationCommands is disabled in mysqlGRFlavor func (mysqlGRFlavor) resetReplicationCommands(c *Conn) []string { return []string{} diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 2dbebccfc5d..d14104a1ad0 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -26,6 +26,7 @@ import ( "os/exec" "os/signal" "path" + "regexp" "strconv" "sync" "syscall" @@ -72,6 +73,10 @@ type LocalProcessCluster struct { VtgateGrpcPort int VtctldHTTPPort int + // major version numbers + VtTabletMajorVersion int + VtctlMajorVersion int + // standalone executable VtctlclientProcess VtctlClientProcess VtctlProcess VtctlProcess @@ -212,6 +217,7 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) { log.Error(err) return } + cluster.VtctlProcess.LogDir = cluster.TmpDirectory } cluster.VtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(), @@ -627,10 +633,42 @@ func NewCluster(cell string, hostname string) *LocalProcessCluster { _ = os.Setenv("VTDATAROOT", cluster.CurrentVTDATAROOT) log.Infof("Created cluster on %s. 
ReusingVTDATAROOT=%v", cluster.CurrentVTDATAROOT, cluster.ReusingVTDATAROOT) + err := cluster.populateVersionInfo() + if err != nil { + log.Errorf("Error populating version information - %v", err) + } + rand.Seed(time.Now().UTC().UnixNano()) return cluster } +// populateVersionInfo is used to populate the version information for the binaries used to setup the cluster. +func (cluster *LocalProcessCluster) populateVersionInfo() error { + var err error + cluster.VtTabletMajorVersion, err = getMajorVersion("vttablet") + if err != nil { + return err + } + cluster.VtctlMajorVersion, err = getMajorVersion("vtctl") + return err +} + +func getMajorVersion(binaryName string) (int, error) { + version, err := exec.Command(binaryName, "--version").Output() + if err != nil { + return 0, err + } + versionRegex := regexp.MustCompile(`Version: ([0-9]+)\.([0-9]+)\.([0-9]+)`) + v := versionRegex.FindStringSubmatch(string(version)) + if len(v) != 4 { + return 0, fmt.Errorf("could not parse server version from: %s", version) + } + // err is necessarily nil here: a command failure and a parse failure were + // both handled above, so convert the matched major version directly. + // strconv.Atoi reports any remaining conversion error to the caller. + return strconv.Atoi(v[1]) +} + // RestartVtgate starts vtgate with updated configs func (cluster *LocalProcessCluster) RestartVtgate() (err error) { err = cluster.VtgateProcess.TearDown() diff --git a/go/test/endtoend/cluster/vtctl_process.go b/go/test/endtoend/cluster/vtctl_process.go index b27cdea9911..c47219eb98e 100644 --- a/go/test/endtoend/cluster/vtctl_process.go +++ b/go/test/endtoend/cluster/vtctl_process.go @@ -29,6 +29,7 @@ import ( type VtctlProcess struct { Name string Binary string + LogDir string TopoImplementation string TopoGlobalAddress string TopoGlobalRoot string @@ -68,6 +69,7 @@ func (vtctl *VtctlProcess) CreateKeyspace(keyspace string) (err error) { // ExecuteCommandWithOutput executes any vtctlclient command and returns output func (vtctl *VtctlProcess) ExecuteCommandWithOutput(args ...string) (result string, err error) { args = 
append([]string{ + "-log_dir", vtctl.LogDir, "-enable_queries", "-topo_implementation", vtctl.TopoImplementation, "-topo_global_server_address", vtctl.TopoGlobalAddress, diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index ee68bbb167f..393da0f6e70 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -218,9 +218,10 @@ func TestPullFromRdonly(t *testing.T) { require.NoError(t, err) } -// TestTwoReplicasNoReplicationStatus checks that ERS is able to fix -// two replicas which do not have any replication status -func TestTwoReplicasNoReplicationStatus(t *testing.T) { +// TestNoReplicationStatusAndReplicationStopped checks that ERS is able to fix +// replicas which do not have any replication status and also succeeds if the replication +// is stopped on the primary elect. +func TestNoReplicationStatusAndReplicationStopped(t *testing.T) { defer cluster.PanicHandler(t) clusterInstance := utils.SetupReparentCluster(t) defer utils.TeardownCluster(clusterInstance) @@ -229,11 +230,22 @@ func TestTwoReplicasNoReplicationStatus(t *testing.T) { err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE; RESET SLAVE ALL`) require.NoError(t, err) - err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[2].Alias, `STOP SLAVE; RESET SLAVE ALL`) + err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[2].Alias, `STOP SLAVE;`) require.NoError(t, err) - + err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[3].Alias, `STOP SLAVE SQL_THREAD;`) + require.NoError(t, err) + // Run an additional command in the current primary which will only be acked by tablets[3] and be in its relay log. 
+ insertedVal := utils.ConfirmReplication(t, tablets[0], nil) + // Failover to tablets[3] out, err := utils.Ers(clusterInstance, tablets[3], "60s", "30s") require.NoError(t, err, out) + // Verify that the tablet has the inserted value + err = utils.CheckInsertedValues(context.Background(), t, tablets[3], insertedVal) + require.NoError(t, err) + // Confirm that replication is setup correctly from tablets[3] to tablets[0] + utils.ConfirmReplication(t, tablets[3], tablets[:1]) + // Confirm that tablets[2] which had replication stopped initially still has its replication stopped + utils.CheckReplicationStatus(context.Background(), t, tablets[2], false, false) } // TestERSForInitialization tests whether calling ERS in the beginning sets up the cluster properly or not diff --git a/go/test/endtoend/reparent/plannedreparent/reparent_test.go b/go/test/endtoend/reparent/plannedreparent/reparent_test.go index c6800d66507..fcbd9bc523b 100644 --- a/go/test/endtoend/reparent/plannedreparent/reparent_test.go +++ b/go/test/endtoend/reparent/plannedreparent/reparent_test.go @@ -287,6 +287,11 @@ func TestReparentWithDownReplica(t *testing.T) { _, err = utils.Prs(t, clusterInstance, tablets[1]) require.NoError(t, err) + // We have to StartReplication on tablets[2] since the MySQL instance is restarted and does not have replication running + // We earlier used to rely on replicationManager to fix this but we have disabled it in our testing environment for latest versions of vttablet and vtctl. 
+ err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", tablets[2].Alias) + require.NoError(t, err) + // wait until it gets the data err = utils.CheckInsertedValues(ctx, t, tablets[2], insertVal) require.NoError(t, err) diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 326e17e7cc6..dd9202c980e 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -128,6 +128,15 @@ func setupCluster(ctx context.Context, t *testing.T, shardName string, cells []s // DemotePrimary rpc is stalled! "-queryserver_enable_online_ddl=false", } + if clusterInstance.VtTabletMajorVersion >= 13 && clusterInstance.VtctlMajorVersion >= 13 { + // disabling active reparents on the tablet since we don't want the replication manager + // to fix replication if it is stopped. Some tests deliberately do that. Also, we don't want + // the replication manager to silently fix the replication in case ERS or PRS mess up. All the + // tests in this test suite should work irrespective of this flag. Each run of ERS, PRS should be + // setting up the replication correctly. + // However, due to the bugs in old vitess components we can only do this for version >= 13. 
+ clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "-disable_active_reparents") + } // Initialize Cluster err = clusterInstance.SetupCluster(keyspace, []cluster.Shard{*shard}) @@ -607,3 +616,19 @@ func SetReplicationSourceFailed(tablet *cluster.Vttablet, prsOut string) bool { } return strings.Contains(prsOut, fmt.Sprintf("tablet %s failed to SetMaster", tablet.Alias)) } + +// CheckReplicationStatus checks that the replication for sql and io threads is setup as expected +func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, sqlThreadRunning bool, ioThreadRunning bool) { + res := RunSQL(ctx, t, "show slave status;", tablet) + if ioThreadRunning { + require.Equal(t, "Yes", res.Rows[0][10].ToString()) + } else { + require.Equal(t, "No", res.Rows[0][10].ToString()) + } + + if sqlThreadRunning { + require.Equal(t, "Yes", res.Rows[0][11].ToString()) + } else { + require.Equal(t, "No", res.Rows[0][11].ToString()) + } +} diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 61f24fdfc5b..844700909de 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -244,6 +244,14 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio return nil } + // Start the SQL Thread before waiting for position to be reached, since the replicas + // can only make forward progress if the SQL thread is started and we have already verified + // that the replica is not already as advanced as we want it to be + err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()}) + if err != nil { + return err + } + // Find the query to run, run it. query, err = conn.WaitUntilPositionCommand(ctx, targetPos) if err != nil {