diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 391e98f2714..f6cec182f70 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -89,7 +89,7 @@ type LocalProcessCluster struct { VtgateProcess VtgateProcess VtworkerProcess VtworkerProcess VtbackupProcess VtbackupProcess - VtorcProcess *VtorcProcess + VtorcProcesses []*VtorcProcess nextPortForProcess int @@ -726,8 +726,8 @@ func (cluster *LocalProcessCluster) Teardown() { log.Errorf("Error in vtgate teardown: %v", err) } - if cluster.VtorcProcess != nil { - if err := cluster.VtorcProcess.TearDown(); err != nil { + for _, vtorcProcess := range cluster.VtorcProcesses { + if err := vtorcProcess.TearDown(); err != nil { log.Errorf("Error in vtorc teardown: %v", err) } } @@ -952,13 +952,14 @@ func (cluster *LocalProcessCluster) NewVttabletInstance(tabletType string, UID i } // NewOrcProcess creates a new VtorcProcess object -func (cluster *LocalProcessCluster) NewOrcProcess(configFile string) *VtorcProcess { +func (cluster *LocalProcessCluster) NewOrcProcess(config VtorcConfiguration) *VtorcProcess { base := VtctlProcessInstance(cluster.TopoProcess.Port, cluster.Hostname) base.Binary = "vtorc" return &VtorcProcess{ VtctlProcess: *base, LogDir: cluster.TmpDirectory, - Config: configFile, + Config: config, + WebPort: cluster.GetAndReservePort(), } } diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index fc5e6264f83..21f743f9756 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -18,6 +18,7 @@ limitations under the License. 
package cluster import ( + "encoding/json" "fmt" "os" "os/exec" @@ -33,16 +34,68 @@ import ( // vtorc as a separate process for testing type VtorcProcess struct { VtctlProcess - LogDir string - ExtraArgs []string - Config string - proc *exec.Cmd - exit chan error + LogDir string + ExtraArgs []string + ConfigPath string + Config VtorcConfiguration + WebPort int + proc *exec.Cmd + exit chan error +} + +type VtorcConfiguration struct { + Debug bool + ListenAddress string + MySQLTopologyUser string + MySQLTopologyPassword string + MySQLReplicaUser string + MySQLReplicaPassword string + RecoveryPeriodBlockSeconds int + InstancePollSeconds int + PreventCrossDataCenterPrimaryFailover bool `json:",omitempty"` + LockShardTimeoutSeconds int `json:",omitempty"` + Durability string `json:",omitempty"` + ReplicationLagQuery string `json:",omitempty"` + FailPrimaryPromotionOnLagMinutes int `json:",omitempty"` +} + +// ToJSONString will marshal this configuration as JSON +func (config *VtorcConfiguration) ToJSONString() string { + b, _ := json.MarshalIndent(config, "", "\t") + return string(b) +} + +func (config *VtorcConfiguration) AddDefaults(webPort int) { + config.Debug = true + config.MySQLTopologyUser = "orc_client_user" + config.MySQLTopologyPassword = "orc_client_user_password" + config.MySQLReplicaUser = "vt_repl" + config.MySQLReplicaPassword = "" + config.RecoveryPeriodBlockSeconds = 1 + config.InstancePollSeconds = 1 + config.ListenAddress = fmt.Sprintf(":%d", webPort) } // Setup starts orc process with required arguements func (orc *VtorcProcess) Setup() (err error) { + // create the configuration file + timeNow := time.Now().UnixNano() + configFile, _ := os.Create(path.Join(orc.LogDir, fmt.Sprintf("orc-config-%d.json", timeNow))) + orc.ConfigPath = configFile.Name() + + // Add the default configurations and print them out + orc.Config.AddDefaults(orc.WebPort) + log.Errorf("configuration - %v", orc.Config.ToJSONString()) + _, err = 
configFile.WriteString(orc.Config.ToJSONString()) + if err != nil { + return err + } + err = configFile.Close() + if err != nil { + return err + } + /* minimal command line arguments: $ vtorc -topo_implementation etcd2 -topo_global_server_address localhost:2379 -topo_global_root /vitess/global -config config/orchestrator/default.json -alsologtostderr http @@ -52,7 +105,7 @@ func (orc *VtorcProcess) Setup() (err error) { "--topo_implementation", orc.TopoImplementation, "--topo_global_server_address", orc.TopoGlobalAddress, "--topo_global_root", orc.TopoGlobalRoot, - "--config", orc.Config, + "--config", orc.ConfigPath, "--orc_web_dir", path.Join(os.Getenv("VTROOT"), "web", "orchestrator"), ) if *isCoverage { @@ -62,7 +115,7 @@ func (orc *VtorcProcess) Setup() (err error) { orc.proc.Args = append(orc.proc.Args, orc.ExtraArgs...) orc.proc.Args = append(orc.proc.Args, "--alsologtostderr", "http") - errFile, _ := os.Create(path.Join(orc.LogDir, fmt.Sprintf("orc-stderr-%d.txt", time.Now().UnixNano()))) + errFile, _ := os.Create(path.Join(orc.LogDir, fmt.Sprintf("orc-stderr-%d.txt", timeNow))) orc.proc.Stderr = errFile orc.proc.Env = append(orc.proc.Env, os.Environ()...) 
diff --git a/go/test/endtoend/vtorc/general/main_test.go b/go/test/endtoend/vtorc/general/main_test.go index 604d0e77ec7..018e6da21fa 100644 --- a/go/test/endtoend/vtorc/general/main_test.go +++ b/go/test/endtoend/vtorc/general/main_test.go @@ -52,8 +52,8 @@ func TestMain(m *testing.M) { if clusterInfo != nil { // stop vtorc first otherwise its logs get polluted // with instances being unreachable triggering unnecessary operations - if clusterInfo.ClusterInstance.VtorcProcess != nil { - _ = clusterInfo.ClusterInstance.VtorcProcess.TearDown() + for _, vtorcProcess := range clusterInfo.ClusterInstance.VtorcProcesses { + _ = vtorcProcess.TearDown() } for _, cellInfo := range clusterInfo.CellInfos { diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index 1d1f258bc7a..5a413aaa338 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -32,17 +32,31 @@ import ( ) // Cases to test: -// 1. create cluster with 1 replica and 1 rdonly, let orc choose primary +// 1. 
create cluster with 2 replicas and 1 rdonly, let orc choose primary // verify rdonly is not elected, only replica // verify replication is setup +// verify that with multiple vtorc instances, we still only have 1 PlannedReparentShard call func TestPrimaryElection(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 2) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - utils.CheckPrimaryTablet(t, clusterInfo, shard0.Vttablets[0], true) - utils.CheckReplication(t, clusterInfo, shard0.Vttablets[0], shard0.Vttablets[1:], 10*time.Second) + primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) + assert.NotNil(t, primary, "should have elected a primary") + utils.CheckReplication(t, clusterInfo, primary, shard0.Vttablets, 10*time.Second) + + for _, vttablet := range shard0.Vttablets { + if vttablet.Type == "rdonly" && primary.Alias == vttablet.Alias { + t.Errorf("Rdonly tablet promoted as primary - %v", primary.Alias) + } + } + + res, err := utils.RunSQL(t, "select * from reparent_journal", primary, "_vt") + require.NoError(t, err) + require.Len(t, res.Rows, 1, "There should only be 1 primary tablet which was elected") } // Cases to test: @@ -51,7 +65,9 @@ func TestPrimaryElection(t *testing.T) { // verify replication is setup func TestSingleKeyspace(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks"}, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks"}, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -65,7 +81,9 @@ func TestSingleKeyspace(t *testing.T) { 
// verify replication is setup func TestKeyspaceShard(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks/0"}, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks/0"}, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -95,7 +113,9 @@ func waitForReadOnlyValue(t *testing.T, curPrimary *cluster.Vttablet, expectValu // 3. make primary readonly, let orc repair func TestPrimaryReadOnly(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -115,7 +135,9 @@ func TestPrimaryReadOnly(t *testing.T) { // 4. make replica ReadWrite, let orc repair func TestReplicaReadWrite(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -143,7 +165,9 @@ func TestReplicaReadWrite(t *testing.T) { // 5. 
stop replication, let orc repair func TestStopReplication(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -175,7 +199,9 @@ func TestStopReplication(t *testing.T) { // 6. setup replication from non-primary, let orc repair func TestReplicationFromOtherReplica(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 3, 0, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 3, 0, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -221,7 +247,9 @@ func TestRepairAfterTER(t *testing.T) { // test fails intermittently on CI, skip until it can be fixed. t.SkipNow() defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -252,7 +280,9 @@ func TestRepairAfterTER(t *testing.T) { // 7. 
make instance A replicates from B and B from A, wait for repair func TestCircularReplication(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -292,11 +322,14 @@ func TestCircularReplication(t *testing.T) { // TestSemiSync tests that semi-sync is setup correctly by vtorc if it is incorrectly set func TestSemiSync(t *testing.T) { // stop any vtorc instance running due to a previous test. - utils.StopVtorc(t, clusterInfo) + utils.StopVtorcs(t, clusterInfo) newCluster := utils.SetupNewClusterSemiSync(t) - utils.StartVtorc(t, newCluster, nil, "test_config_semi_sync.json") + utils.StartVtorcs(t, newCluster, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + Durability: "semi_sync", + }, 1) defer func() { - utils.StopVtorc(t, newCluster) + utils.StopVtorcs(t, newCluster) newCluster.ClusterInstance.Teardown() }() keyspace := &newCluster.ClusterInstance.Keyspaces[0] @@ -353,3 +386,43 @@ func TestSemiSync(t *testing.T) { } } } + +// TestVtorcWithPrs tests that VTOrc works fine even when PRS is called from vtctld +func TestVtorcWithPrs(t *testing.T) { + defer cluster.PanicHandler(t) + utils.SetupVttabletsAndVtorc(t, clusterInfo, 4, 0, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) + keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + + // find primary from topo + curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) + assert.NotNil(t, curPrimary, "should have elected a primary") + + // find any replica tablet other than the current primary + var replica *cluster.Vttablet + for _, tablet := range shard0.Vttablets { + if tablet.Alias != curPrimary.Alias { 
+ replica = tablet + break + } + } + assert.NotNil(t, replica, "could not find any replica tablet") + + // check that the replication is setup correctly before we failover + utils.CheckReplication(t, clusterInfo, curPrimary, shard0.Vttablets, 10*time.Second) + + output, err := clusterInfo.ClusterInstance.VtctlclientProcess.ExecuteCommandWithOutput( + "PlannedReparentShard", + "-keyspace_shard", fmt.Sprintf("%s/%s", keyspace.Name, shard0.Name), + "-wait_replicas_timeout", "31s", + "-new_primary", replica.Alias) + require.NoError(t, err, "error in PlannedReparentShard output - %s", output) + + time.Sleep(40 * time.Second) + + // check that the replica gets promoted + utils.CheckPrimaryTablet(t, clusterInfo, replica, true) + utils.VerifyWritesSucceed(t, clusterInfo, replica, shard0.Vttablets, 10*time.Second) +} diff --git a/go/test/endtoend/vtorc/gracefultakeover/graceful_takeover_test.go b/go/test/endtoend/vtorc/gracefultakeover/graceful_takeover_test.go index 41b3a21bdf2..ddb6649d594 100644 --- a/go/test/endtoend/vtorc/gracefultakeover/graceful_takeover_test.go +++ b/go/test/endtoend/vtorc/gracefultakeover/graceful_takeover_test.go @@ -31,7 +31,9 @@ import ( // covers the test case graceful-master-takeover from orchestrator func TestGracefulPrimaryTakeover(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -52,7 +54,7 @@ func TestGracefulPrimaryTakeover(t *testing.T) { // check that the replication is setup correctly before we failover utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica}, 10*time.Second) - status, _ := utils.MakeAPICallUntilRegistered(t, 
fmt.Sprintf("http://localhost:3000/api/graceful-primary-takeover/localhost/%d/localhost/%d", curPrimary.MySQLPort, replica.MySQLPort)) + status, _ := utils.MakeAPICallUntilRegistered(t, fmt.Sprintf("http://localhost:%d/api/graceful-primary-takeover/localhost/%d/localhost/%d", clusterInfo.ClusterInstance.VtorcProcesses[0].WebPort, curPrimary.MySQLPort, replica.MySQLPort)) assert.Equal(t, 200, status) // check that the replica gets promoted @@ -65,7 +67,9 @@ func TestGracefulPrimaryTakeover(t *testing.T) { // orchestrator used to fail in this case, but for VtOrc, specifying no target makes it choose one on its own func TestGracefulPrimaryTakeoverNoTarget(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -86,7 +90,7 @@ func TestGracefulPrimaryTakeoverNoTarget(t *testing.T) { // check that the replication is setup correctly before we failover utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica}, 10*time.Second) - status, _ := utils.MakeAPICallUntilRegistered(t, fmt.Sprintf("http://localhost:3000/api/graceful-primary-takeover/localhost/%d/", curPrimary.MySQLPort)) + status, _ := utils.MakeAPICallUntilRegistered(t, fmt.Sprintf("http://localhost:%d/api/graceful-primary-takeover/localhost/%d/", clusterInfo.ClusterInstance.VtorcProcesses[0].WebPort, curPrimary.MySQLPort)) assert.Equal(t, 200, status) // check that the replica gets promoted @@ -98,7 +102,9 @@ func TestGracefulPrimaryTakeoverNoTarget(t *testing.T) { // covers the test case graceful-master-takeover-auto from orchestrator func TestGracefulPrimaryTakeoverAuto(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, "test_config.json") + 
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -123,14 +129,14 @@ func TestGracefulPrimaryTakeoverAuto(t *testing.T) { // check that the replication is setup correctly before we failover utils.CheckReplication(t, clusterInfo, primary, []*cluster.Vttablet{replica, rdonly}, 10*time.Second) - status, _ := utils.MakeAPICallUntilRegistered(t, fmt.Sprintf("http://localhost:3000/api/graceful-primary-takeover-auto/localhost/%d/localhost/%d", primary.MySQLPort, replica.MySQLPort)) + status, _ := utils.MakeAPICallUntilRegistered(t, fmt.Sprintf("http://localhost:%d/api/graceful-primary-takeover-auto/localhost/%d/localhost/%d", clusterInfo.ClusterInstance.VtorcProcesses[0].WebPort, primary.MySQLPort, replica.MySQLPort)) assert.Equal(t, 200, status) // check that the replica gets promoted utils.CheckPrimaryTablet(t, clusterInfo, replica, true) utils.VerifyWritesSucceed(t, clusterInfo, replica, []*cluster.Vttablet{primary, rdonly}, 10*time.Second) - status, _ = utils.MakeAPICallUntilRegistered(t, fmt.Sprintf("http://localhost:3000/api/graceful-primary-takeover-auto/localhost/%d/", replica.MySQLPort)) + status, _ = utils.MakeAPICallUntilRegistered(t, fmt.Sprintf("http://localhost:%d/api/graceful-primary-takeover-auto/localhost/%d/", clusterInfo.ClusterInstance.VtorcProcesses[0].WebPort, replica.MySQLPort)) assert.Equal(t, 200, status) // check that the primary gets promoted back @@ -142,7 +148,9 @@ func TestGracefulPrimaryTakeoverAuto(t *testing.T) { // covers the test case graceful-master-takeover-fail-cross-region from orchestrator func TestGracefulPrimaryTakeoverFailCrossCell(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, nil, cluster.VtorcConfiguration{ + 
PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] @@ -163,7 +171,7 @@ func TestGracefulPrimaryTakeoverFailCrossCell(t *testing.T) { // newly started tablet does not replicate from anyone yet, we will allow orchestrator to fix this too utils.CheckReplication(t, clusterInfo, primary, []*cluster.Vttablet{crossCellReplica1, rdonly}, 25*time.Second) - status, response := utils.MakeAPICallUntilRegistered(t, fmt.Sprintf("http://localhost:3000/api/graceful-primary-takeover/localhost/%d/localhost/%d", primary.MySQLPort, crossCellReplica1.MySQLPort)) + status, response := utils.MakeAPICallUntilRegistered(t, fmt.Sprintf("http://localhost:%d/api/graceful-primary-takeover/localhost/%d/localhost/%d", clusterInfo.ClusterInstance.VtorcProcesses[0].WebPort, primary.MySQLPort, crossCellReplica1.MySQLPort)) assert.Equal(t, 500, status) assert.Contains(t, response, "GracefulPrimaryTakeover: constraint failure") diff --git a/go/test/endtoend/vtorc/gracefultakeover/main_test.go b/go/test/endtoend/vtorc/gracefultakeover/main_test.go index 8af51b0fc31..d429e36a35b 100644 --- a/go/test/endtoend/vtorc/gracefultakeover/main_test.go +++ b/go/test/endtoend/vtorc/gracefultakeover/main_test.go @@ -58,8 +58,8 @@ func TestMain(m *testing.M) { if clusterInfo != nil { // stop vtorc first otherwise its logs get polluted // with instances being unreachable triggering unnecessary operations - if clusterInfo.ClusterInstance.VtorcProcess != nil { - _ = clusterInfo.ClusterInstance.VtorcProcess.TearDown() + for _, vtorcProcess := range clusterInfo.ClusterInstance.VtorcProcesses { + _ = vtorcProcess.TearDown() } for _, cellInfo := range clusterInfo.CellInfos { diff --git a/go/test/endtoend/vtorc/primaryfailure/main_test.go b/go/test/endtoend/vtorc/primaryfailure/main_test.go index 6761d4b188d..73a159ceda7 100644 --- a/go/test/endtoend/vtorc/primaryfailure/main_test.go +++ 
b/go/test/endtoend/vtorc/primaryfailure/main_test.go @@ -59,8 +59,8 @@ func TestMain(m *testing.M) { if clusterInfo != nil { // stop vtorc first otherwise its logs get polluted // with instances being unreachable triggering unnecessary operations - if clusterInfo.ClusterInstance.VtorcProcess != nil { - _ = clusterInfo.ClusterInstance.VtorcProcess.TearDown() + for _, vtorcProcess := range clusterInfo.ClusterInstance.VtorcProcesses { + _ = vtorcProcess.TearDown() } for _, cellInfo := range clusterInfo.CellInfos { diff --git a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go index 85ea91eed24..a56a27d69fe 100644 --- a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go +++ b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go @@ -32,7 +32,9 @@ import ( // covers the test case master-failover from orchestrator func TestDownPrimary(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] // find primary from topo @@ -74,7 +76,9 @@ func TestDownPrimary(t *testing.T) { // covers part of the test case master-failover-lost-replicas from orchestrator func TestCrossDataCenterFailure(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] // find primary from topo @@ -117,7 +121,9 @@ func TestCrossDataCenterFailure(t *testing.T) { // In case of no viable candidates, we should error out func 
TestCrossDataCenterFailureError(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] // find primary from topo @@ -161,7 +167,9 @@ func TestLostRdonlyOnPrimaryFailure(t *testing.T) { // were detected by vtorc and could be configured to have their sources detached t.Skip() defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 2, nil, "test_config.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 2, nil, cluster.VtorcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] // find primary from topo @@ -240,7 +248,10 @@ func TestLostRdonlyOnPrimaryFailure(t *testing.T) { // covers the test case master-failover-fail-promotion-lag-minutes-success from orchestrator func TestPromotionLagSuccess(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, "test_config_promotion_success.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{ + ReplicationLagQuery: "select 59", + FailPrimaryPromotionOnLagMinutes: 1, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] // find primary from topo @@ -286,7 +297,10 @@ func TestPromotionLagFailure(t *testing.T) { // was smaller than the configured value, otherwise it would fail the promotion t.Skip() defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 3, 1, nil, "test_config_promotion_failure.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 3, 1, nil, cluster.VtorcConfiguration{ + ReplicationLagQuery: "select 61", + FailPrimaryPromotionOnLagMinutes: 1, + }, 1) 
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] // find primary from topo @@ -335,7 +349,10 @@ func TestPromotionLagFailure(t *testing.T) { // That is the replica which should be promoted in case of primary failure func TestDownPrimaryPromotionRule(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, "test_config_crosscenter_prefer.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{ + LockShardTimeoutSeconds: 5, + Durability: "test", + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] // find primary from topo @@ -380,7 +397,10 @@ func TestDownPrimaryPromotionRule(t *testing.T) { // It should also be caught up when it is promoted func TestDownPrimaryPromotionRuleWithLag(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, "test_config_crosscenter_prefer.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{ + LockShardTimeoutSeconds: 5, + Durability: "test", + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] // find primary from topo @@ -457,7 +477,11 @@ func TestDownPrimaryPromotionRuleWithLag(t *testing.T) { // It should also be caught up when it is promoted func TestDownPrimaryPromotionRuleWithLagCrossCenter(t *testing.T) { defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, "test_config_crosscenter_prefer_prevent.json") + utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{ + LockShardTimeoutSeconds: 5, + Durability: "test", + PreventCrossDataCenterPrimaryFailover: true, + }, 1) keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] // find primary from topo diff --git a/go/test/endtoend/vtorc/utils/test_config.json b/go/test/endtoend/vtorc/utils/test_config.json deleted 
file mode 100644 index 8ba28850090..00000000000 --- a/go/test/endtoend/vtorc/utils/test_config.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "Debug": true, - "MySQLTopologyUser": "orc_client_user", - "MySQLTopologyPassword": "orc_client_user_password", - "MySQLReplicaUser": "vt_repl", - "MySQLReplicaPassword": "", - "RecoveryPeriodBlockSeconds": 1, - "InstancePollSeconds": 1, - "PreventCrossDataCenterPrimaryFailover": true -} diff --git a/go/test/endtoend/vtorc/utils/test_config_crosscenter_prefer.json b/go/test/endtoend/vtorc/utils/test_config_crosscenter_prefer.json deleted file mode 100644 index c9d09747364..00000000000 --- a/go/test/endtoend/vtorc/utils/test_config_crosscenter_prefer.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "Debug": true, - "MySQLTopologyUser": "orc_client_user", - "MySQLTopologyPassword": "orc_client_user_password", - "MySQLReplicaUser": "vt_repl", - "MySQLReplicaPassword": "", - "RecoveryPeriodBlockSeconds": 1, - "InstancePollSeconds": 1, - "LockShardTimeoutSeconds": 5, - "Durability": "test" -} diff --git a/go/test/endtoend/vtorc/utils/test_config_crosscenter_prefer_prevent.json b/go/test/endtoend/vtorc/utils/test_config_crosscenter_prefer_prevent.json deleted file mode 100644 index cb4ef5000f7..00000000000 --- a/go/test/endtoend/vtorc/utils/test_config_crosscenter_prefer_prevent.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "Debug": true, - "MySQLTopologyUser": "orc_client_user", - "MySQLTopologyPassword": "orc_client_user_password", - "MySQLReplicaUser": "vt_repl", - "MySQLReplicaPassword": "", - "RecoveryPeriodBlockSeconds": 1, - "InstancePollSeconds": 1, - "LockShardTimeoutSeconds": 5, - "Durability": "test", - "PreventCrossDataCenterPrimaryFailover": true -} diff --git a/go/test/endtoend/vtorc/utils/test_config_promotion_failure.json b/go/test/endtoend/vtorc/utils/test_config_promotion_failure.json deleted file mode 100644 index 94daa6a729f..00000000000 --- a/go/test/endtoend/vtorc/utils/test_config_promotion_failure.json +++ /dev/null @@ -1,11 
+0,0 @@ -{ - "Debug": true, - "MySQLTopologyUser": "orc_client_user", - "MySQLTopologyPassword": "orc_client_user_password", - "MySQLReplicaUser": "vt_repl", - "MySQLReplicaPassword": "", - "RecoveryPeriodBlockSeconds": 1, - "InstancePollSeconds": 1, - "ReplicationLagQuery": "select 61", - "FailPrimaryPromotionOnLagMinutes": 1 -} diff --git a/go/test/endtoend/vtorc/utils/test_config_promotion_success.json b/go/test/endtoend/vtorc/utils/test_config_promotion_success.json deleted file mode 100644 index 5084e4598ec..00000000000 --- a/go/test/endtoend/vtorc/utils/test_config_promotion_success.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "Debug": true, - "MySQLTopologyUser": "orc_client_user", - "MySQLTopologyPassword": "orc_client_user_password", - "MySQLReplicaUser": "vt_repl", - "MySQLReplicaPassword": "", - "RecoveryPeriodBlockSeconds": 1, - "InstancePollSeconds": 1, - "ReplicationLagQuery": "select 59", - "FailPrimaryPromotionOnLagMinutes": 1 -} diff --git a/go/test/endtoend/vtorc/utils/test_config_semi_sync.json b/go/test/endtoend/vtorc/utils/test_config_semi_sync.json deleted file mode 100644 index 8b531fdc6ac..00000000000 --- a/go/test/endtoend/vtorc/utils/test_config_semi_sync.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "Debug": true, - "MySQLTopologyUser": "orc_client_user", - "MySQLTopologyPassword": "orc_client_user_password", - "MySQLReplicaUser": "vt_repl", - "MySQLReplicaPassword": "", - "Durability": "semi_sync", - "RecoveryPeriodBlockSeconds": 1, - "InstancePollSeconds": 1, - "PreventCrossDataCenterPrimaryFailover": true -} diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 1e2d308a81b..4641a898bed 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -240,38 +240,35 @@ func demotePrimaryTablet(ts *topo.Server) (err error) { return } -// StartVtorc is used to start the orchestrator with the given extra arguments -func StartVtorc(t *testing.T, clusterInfo 
*VtOrcClusterInfo, orcExtraArgs []string, configFileName string) { +// StartVtorcs is used to start the orchestrator with the given extra arguments +func StartVtorcs(t *testing.T, clusterInfo *VtOrcClusterInfo, orcExtraArgs []string, config cluster.VtorcConfiguration, count int) { t.Helper() - workingDir := os.Getenv("PWD") - idx := strings.Index(workingDir, "vtorc") - if idx == -1 { - require.Fail(t, "SetupVttabletsAndVtorc should only be used from a package inside the vtorc directory") - } - - pathToConfig := path.Join(workingDir[:idx], "vtorc", "utils", configFileName) // Start vtorc - clusterInfo.ClusterInstance.VtorcProcess = clusterInfo.ClusterInstance.NewOrcProcess(pathToConfig) - clusterInfo.ClusterInstance.VtorcProcess.ExtraArgs = orcExtraArgs - err := clusterInfo.ClusterInstance.VtorcProcess.Setup() - require.NoError(t, err) + for i := 0; i < count; i++ { + vtorcProcess := clusterInfo.ClusterInstance.NewOrcProcess(config) + vtorcProcess.ExtraArgs = orcExtraArgs + err := vtorcProcess.Setup() + require.NoError(t, err) + clusterInfo.ClusterInstance.VtorcProcesses = append(clusterInfo.ClusterInstance.VtorcProcesses, vtorcProcess) + } } -// StopVtorc is used to stop the orchestrator -func StopVtorc(t *testing.T, clusterInfo *VtOrcClusterInfo) { +// StopVtorcs is used to stop the orchestrator +func StopVtorcs(t *testing.T, clusterInfo *VtOrcClusterInfo) { t.Helper() // Stop vtorc - if clusterInfo.ClusterInstance.VtorcProcess != nil { - err := clusterInfo.ClusterInstance.VtorcProcess.TearDown() - require.NoError(t, err) + for _, vtorcProcess := range clusterInfo.ClusterInstance.VtorcProcesses { + if err := vtorcProcess.TearDown(); err != nil { + log.Errorf("Error in vtorc teardown: %v", err) + } } - clusterInfo.ClusterInstance.VtorcProcess = nil + clusterInfo.ClusterInstance.VtorcProcesses = nil } // SetupVttabletsAndVtorc is used to setup the vttablets and start the orchestrator -func SetupVttabletsAndVtorc(t *testing.T, clusterInfo *VtOrcClusterInfo, 
numReplicasReqCell1, numRdonlyReqCell1 int, orcExtraArgs []string, configFileName string) { +func SetupVttabletsAndVtorc(t *testing.T, clusterInfo *VtOrcClusterInfo, numReplicasReqCell1, numRdonlyReqCell1 int, orcExtraArgs []string, config cluster.VtorcConfiguration, vtorcCount int) { // stop vtorc if it is running - StopVtorc(t, clusterInfo) + StopVtorcs(t, clusterInfo) // remove all the vttablets so that each test can add the amount that they require err := shutdownVttablets(clusterInfo) @@ -312,7 +309,7 @@ func SetupVttabletsAndVtorc(t *testing.T, clusterInfo *VtOrcClusterInfo, numRepl } // start vtorc - StartVtorc(t, clusterInfo, orcExtraArgs, configFileName) + StartVtorcs(t, clusterInfo, orcExtraArgs, config, vtorcCount) } // cleanAndStartVttablet cleans the MySQL instance underneath for running a new test. It also starts the vttablet. diff --git a/go/vt/orchestrator/logic/orchestrator.go b/go/vt/orchestrator/logic/orchestrator.go index d181e223bf1..1d9b4e6c740 100644 --- a/go/vt/orchestrator/logic/orchestrator.go +++ b/go/vt/orchestrator/logic/orchestrator.go @@ -51,8 +51,6 @@ var discoveryQueue *discovery.Queue var snapshotDiscoveryKeys chan inst.InstanceKey var snapshotDiscoveryKeysMutex sync.Mutex var hasReceivedSIGTERM int32 -var ersInProgressMutex sync.Mutex -var ersInProgress bool var discoveriesCounter = metrics.NewCounter() var failedDiscoveriesCounter = metrics.NewCounter() diff --git a/go/vt/orchestrator/logic/topology_recovery.go b/go/vt/orchestrator/logic/topology_recovery.go index 223852458e6..d5b77fbda18 100644 --- a/go/vt/orchestrator/logic/topology_recovery.go +++ b/go/vt/orchestrator/logic/topology_recovery.go @@ -19,7 +19,6 @@ package logic import ( "context" "encoding/json" - "errors" "fmt" "math/rand" goos "os" @@ -619,10 +618,44 @@ func checkAndRecoverDeadPrimary(analysisEntry inst.ReplicationAnalysis, candidat if !(forceInstanceRecovery || analysisEntry.ClusterDetails.HasAutomatedPrimaryRecovery) { return false, nil, nil } - tablet, err 
:= TabletRefresh(analysisEntry.AnalyzedInstanceKey) + + // We lock the shard here and then refresh the tablets information + ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedInstanceKey) if err != nil { return false, nil, err } + defer unlock(&err) + + // TODO (@GuptaManan100): Refresh only the shard tablet information instead of all the tablets + RefreshTablets(true /* forceRefresh */) + + // Run a replication analysis again. We need this because vtorc works on ephemeral data to find the failure scenarios. + // That data might be old, because of a cluster operation that was run through vtctld or some other vtorc. So before we do any + // changes, we should be checking that this failure is indeed needed to be fixed. We do this after locking the shard to be sure + // that the data that we use now is up-to-date. + analysisEntries, err := inst.GetReplicationAnalysis(analysisEntry.ClusterDetails.ClusterName, &inst.ReplicationAnalysisHints{}) + if err != nil { + return false, nil, err + } + + // The recovery is only required if the same instance key requires a DeadPrimary or DeadPrimaryAndSomeReplicas recovery. + recoveryRequired := false + for _, entry := range analysisEntries { + if entry.AnalyzedInstanceKey.Equals(&analysisEntry.AnalyzedInstanceKey) { + if entry.Analysis == inst.DeadPrimary || entry.Analysis == inst.DeadPrimaryAndSomeReplicas { + recoveryRequired = true + } + } + } + + // No recovery is required. Some other agent already fixed the issue. 
+ if !recoveryRequired { + log.Infof("Analysis: %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis) + return false, nil, nil + } + + // Read the tablet information from the database to find the shard and keyspace of the tablet + tablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceKey) var candidateTabletAlias *topodatapb.TabletAlias if candidateInstanceKey != nil { @@ -639,32 +672,6 @@ func checkAndRecoverDeadPrimary(analysisEntry inst.ReplicationAnalysis, candidat } log.Infof("Analysis: %v, deadprimary %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey) - // this check is needed because sometimes DeadPrimary code path is forcefully spawned off from other recoveries like PrimaryHasPrimary. - // So we need to check that we only run an ERS if the instance that we analyzed was actually a primary! Otherwise, we would end up running an ERS - // even when the cluster is fine or the problem can be fixed via some other recovery - if tablet.Type != topodatapb.TabletType_PRIMARY { - RefreshTablets(true /* forceRefresh */) - AuditTopologyRecovery(topologyRecovery, "another agent seems to have fixed the problem") - return false, topologyRecovery, nil - } - - // check if we have received SIGTERM, if we have, we should not continue with the recovery - val := atomic.LoadInt32(&hasReceivedSIGTERM) - if val > 0 { - return false, topologyRecovery, errors.New("Can't lock shard: SIGTERM received") - } - - // check if we have received an ERS in progress, if we do, we should not continue with the recovery - if checkAndSetIfERSInProgress() { - AuditTopologyRecovery(topologyRecovery, "an ERS is already in progress, not issuing another") - return false, topologyRecovery, nil - } - defer setERSCompleted() - - // add to the shard lock counter since ERS will lock the shard - atomic.AddInt32(&shardsLockCounter, 1) - defer atomic.AddInt32(&shardsLockCounter, -1) - ev, err := reparentutil.NewEmergencyReparenter(ts, 
tmclient.NewTabletManagerClient(), logutil.NewCallbackLogger(func(event *logutilpb.Event) { level := event.GetLevel() value := event.GetValue() @@ -676,7 +683,7 @@ func checkAndRecoverDeadPrimary(analysisEntry inst.ReplicationAnalysis, candidat log.Errorf("ERS - %s", value) } AuditTopologyRecovery(topologyRecovery, value) - })).ReparentShard(context.Background(), + })).ReparentShard(ctx, tablet.Keyspace, tablet.Shard, reparentutil.EmergencyReparentOptions{ @@ -687,9 +694,7 @@ func checkAndRecoverDeadPrimary(analysisEntry inst.ReplicationAnalysis, candidat }, ) - // here we need to forcefully refresh all the tablets otherwise old information is used and failover scenarios are spawned off which are not required - // For example, if we do not refresh the tablets forcefully and the new primary is found in the cache then its source key is not updated and this spawns off - // PrimaryHasPrimary analysis which runs another ERS + // We should refresh the tablet information again to update our information. RefreshTablets(true /* forceRefresh */) var promotedReplica *inst.Instance if ev.NewPrimary != nil { @@ -702,24 +707,6 @@ func checkAndRecoverDeadPrimary(analysisEntry inst.ReplicationAnalysis, candidat return true, topologyRecovery, err } -// checkAndSetIfERSInProgress checks if an ERS is already in progress. If it is not in progress, then we set it to be in progress. -func checkAndSetIfERSInProgress() bool { - ersInProgressMutex.Lock() - defer ersInProgressMutex.Unlock() - if ersInProgress { - return true - } - ersInProgress = true - return false -} - -// setERSCompleted sets the variable tracking if an ers is in progress to false. 
-func setERSCompleted() { - ersInProgressMutex.Lock() - defer ersInProgressMutex.Unlock() - ersInProgress = false -} - func postErsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.ReplicationAnalysis, skipProcesses bool, promotedReplica *inst.Instance) { if promotedReplica != nil { message := fmt.Sprintf("promoted replica: %+v", promotedReplica.Key) @@ -1770,6 +1757,41 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re // electNewPrimary elects a new primary while none were present before. func electNewPrimary(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + // We lock the shard here and then refresh the tablets information + ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedInstanceKey) + if err != nil { + return false, nil, err + } + defer unlock(&err) + + // TODO (@GuptaManan100): Refresh only the shard tablet information instead of all the tablets + RefreshTablets(true /* forceRefresh */) + + // Run a replication analysis again. We need this because vtorc works on ephemeral data to find the failure scenarios. + // That data might be old, because of a cluster operation that was run through vtctld or some other vtorc. So before we do any + // changes, we should be checking that this failure is indeed needed to be fixed. We do this after locking the shard to be sure + // that the data that we use now is up-to-date. + analysisEntries, err := inst.GetReplicationAnalysis(analysisEntry.ClusterDetails.ClusterName, &inst.ReplicationAnalysisHints{}) + if err != nil { + return false, nil, err + } + + // The recovery is only required if the same instance key requires a ClusterHasNoPrimary recovery. 
+ recoveryRequired := false + for _, entry := range analysisEntries { + if entry.AnalyzedInstanceKey.Equals(&analysisEntry.AnalyzedInstanceKey) { + if entry.Analysis == inst.ClusterHasNoPrimary { + recoveryRequired = true + } + } + } + + // No recovery is required. Some other agent already fixed the issue. + if !recoveryRequired { + log.Infof("Analysis: %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis) + return false, nil, nil + } + topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false /*failIfFailedInstanceInActiveRecovery*/, true /*failIfClusterInActiveRecovery*/) if topologyRecovery == nil || err != nil { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another electNewPrimary.", analysisEntry.AnalyzedInstanceKey)) @@ -1794,7 +1816,7 @@ func electNewPrimary(analysisEntry inst.ReplicationAnalysis, candidateInstanceKe log.Errorf("PRS - %s", value) } AuditTopologyRecovery(topologyRecovery, value) - })).ReparentShard(context.Background(), + })).ReparentShard(ctx, analyzedTablet.Keyspace, analyzedTablet.Shard, reparentutil.PlannedReparentOptions{ @@ -1819,6 +1841,46 @@ func electNewPrimary(analysisEntry inst.ReplicationAnalysis, candidateInstanceKe // fixClusterAndPrimary performs a traditional vitess PlannedReparentShard. 
func fixClusterAndPrimary(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + // We lock the shard here and then refresh the tablets information + _, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedInstanceKey) + if err != nil { + return false, nil, err + } + unlockFunctionCalled := false + defer func() { + if !unlockFunctionCalled { + unlock(&err) + } + }() + + // TODO (@GuptaManan100): Refresh only the shard tablet information instead of all the tablets + RefreshTablets(true /* forceRefresh */) + + // Run a replication analysis again. We need this because vtorc works on ephemeral data to find the failure scenarios. + // That data might be old, because of a cluster operation that was run through vtctld or some other vtorc. So before we do any + // changes, we should be checking that this failure is indeed needed to be fixed. We do this after locking the shard to be sure + // that the data that we use now is up-to-date. + analysisEntries, err := inst.GetReplicationAnalysis(analysisEntry.ClusterDetails.ClusterName, &inst.ReplicationAnalysisHints{}) + if err != nil { + return false, nil, err + } + + // The recovery is only required if the same instance key requires a PrimaryHasPrimary recovery. + recoveryRequired := false + for _, entry := range analysisEntries { + if entry.AnalyzedInstanceKey.Equals(&analysisEntry.AnalyzedInstanceKey) { + if entry.Analysis == inst.PrimaryHasPrimary { + recoveryRequired = true + } + } + } + + // No recovery is required. Some other agent already fixed the issue. 
+ if !recoveryRequired { + log.Infof("Analysis: %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis) + return false, nil, nil + } + topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true) if topologyRecovery == nil { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixClusterAndPrimary.", analysisEntry.AnalyzedInstanceKey)) @@ -1833,6 +1895,8 @@ func fixClusterAndPrimary(analysisEntry inst.ReplicationAnalysis, candidateInsta return false, topologyRecovery, err } + unlockFunctionCalled = true + unlock(&err) altAnalysis, err := forceAnalysisEntry(analysisEntry.ClusterDetails.ClusterName, inst.DeadPrimary, "", &analysisEntry.AnalyzedInstancePrimaryKey) if err != nil { return false, topologyRecovery, err diff --git a/go/vt/vtctl/reparentutil/emergency_reparenter.go b/go/vt/vtctl/reparentutil/emergency_reparenter.go index 46bea34edb1..1f0746cabf3 100644 --- a/go/vt/vtctl/reparentutil/emergency_reparenter.go +++ b/go/vt/vtctl/reparentutil/emergency_reparenter.go @@ -96,13 +96,17 @@ func NewEmergencyReparenter(ts *topo.Server, tmc tmclient.TabletManagerClient, l // ReparentShard performs the EmergencyReparentShard operation on the given // keyspace and shard. 
func (erp *EmergencyReparenter) ReparentShard(ctx context.Context, keyspace string, shard string, opts EmergencyReparentOptions) (*events.Reparent, error) { - // First step is to lock the shard for the given operation - opts.lockAction = erp.getLockAction(opts.NewPrimaryAlias) - ctx, unlock, err := erp.ts.LockShard(ctx, keyspace, shard, opts.lockAction) - if err != nil { - return nil, err + var err error + // First step is to lock the shard for the given operation, if not already locked + if err = topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + var unlock func(*error) + opts.lockAction = erp.getLockAction(opts.NewPrimaryAlias) + ctx, unlock, err = erp.ts.LockShard(ctx, keyspace, shard, opts.lockAction) + if err != nil { + return nil, err + } + defer unlock(&err) } - defer unlock(&err) // dispatch success or failure of ERS ev := &events.Reparent{} diff --git a/go/vt/vtctl/reparentutil/planned_reparenter.go b/go/vt/vtctl/reparentutil/planned_reparenter.go index 3b1dabeb05c..46e351a8db2 100644 --- a/go/vt/vtctl/reparentutil/planned_reparenter.go +++ b/go/vt/vtctl/reparentutil/planned_reparenter.go @@ -86,15 +86,17 @@ func NewPlannedReparenter(ts *topo.Server, tmc tmclient.TabletManagerClient, log // and shard. It will make the provided tablet the primary for the shard, when // both the current and desired primary are reachable and in a good state. 
func (pr *PlannedReparenter) ReparentShard(ctx context.Context, keyspace string, shard string, opts PlannedReparentOptions) (*events.Reparent, error) { - opts.lockAction = pr.getLockAction(opts) - - ctx, unlock, err := pr.ts.LockShard(ctx, keyspace, shard, opts.lockAction) - if err != nil { - return nil, err + var err error + if err = topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + var unlock func(*error) + opts.lockAction = pr.getLockAction(opts) + ctx, unlock, err = pr.ts.LockShard(ctx, keyspace, shard, opts.lockAction) + if err != nil { + return nil, err + } + defer unlock(&err) } - defer unlock(&err) - if opts.NewPrimaryAlias == nil && opts.AvoidPrimaryAlias == nil { shardInfo, err := pr.ts.GetShard(ctx, keyspace, shard) if err != nil { diff --git a/go/vt/vtctl/reparentutil/planned_reparenter_flaky_test.go b/go/vt/vtctl/reparentutil/planned_reparenter_flaky_test.go index 2c4091ffe4c..51ee27bb309 100644 --- a/go/vt/vtctl/reparentutil/planned_reparenter_flaky_test.go +++ b/go/vt/vtctl/reparentutil/planned_reparenter_flaky_test.go @@ -278,15 +278,44 @@ func TestPlannedReparenter_ReparentShard(t *testing.T) { }, }, { - name: "cannot lock shard", + name: "already locked shard", ts: memorytopo.NewServer("zone1"), - tmc: nil, + tmc: &testutil.TabletManagerClient{ + PrimaryPositionResults: map[string]struct { + Position string + Error error + }{ + "zone1-0000000100": { + Position: "position1", + Error: nil, + }, + }, + PopulateReparentJournalResults: map[string]error{ + "zone1-0000000100": nil, + }, + SetReplicationSourceResults: map[string]error{ + "zone1-0000000200": nil, + }, + SetReadWriteResults: map[string]error{ + "zone1-0000000100": nil, + }, + }, tablets: []*topodatapb.Tablet{ { Alias: &topodatapb.TabletAlias{ Cell: "zone1", Uid: 100, }, + Type: topodatapb.TabletType_PRIMARY, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 200, + }, + Type: topodatapb.TabletType_REPLICA, Keyspace: 
"testkeyspace", Shard: "-", }, @@ -295,10 +324,33 @@ func TestPlannedReparenter_ReparentShard(t *testing.T) { keyspace: "testkeyspace", shard: "-", - opts: PlannedReparentOptions{}, + opts: PlannedReparentOptions{ + NewPrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, - expectedEvent: nil, - shouldErr: true, + shouldErr: false, + expectedEvent: &events.Reparent{ + ShardInfo: *topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{ + PrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + KeyRange: &topodatapb.KeyRange{}, + IsPrimaryServing: true, + }, nil), + NewPrimary: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Type: topodatapb.TabletType_PRIMARY, + Keyspace: "testkeyspace", + Shard: "-", + }, + }, }, { // The simplest setup required to make an overall ReparentShard call