diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index b90231e20cc..60244c2cd21 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -1060,6 +1060,7 @@ func (cluster *LocalProcessCluster) NewVTOrcProcess(config VTOrcConfiguration) * LogDir: cluster.TmpDirectory, Config: config, WebPort: cluster.GetAndReservePort(), + Port: cluster.GetAndReservePort(), } } diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index d8563243a66..bd1e6fc7a59 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -20,6 +20,8 @@ package cluster import ( "encoding/json" "fmt" + "io" + "net/http" "os" "os/exec" "path" @@ -34,6 +36,7 @@ import ( // vtorc as a separate process for testing type VTOrcProcess struct { VtctlProcess + Port int LogDir string ExtraArgs []string ConfigPath string @@ -107,6 +110,7 @@ func (orc *VTOrcProcess) Setup() (err error) { "--topo_global_server_address", orc.TopoGlobalAddress, "--topo_global_root", orc.TopoGlobalRoot, "--config", orc.ConfigPath, + "--port", fmt.Sprintf("%d", orc.Port), "--orc_web_dir", path.Join(os.Getenv("VTROOT"), "web", "vtorc"), ) if *isCoverage { @@ -157,3 +161,24 @@ func (orc *VTOrcProcess) TearDown() error { return <-orc.exit } } + +// GetVars gets the variables exported on the /debug/vars page of VTOrc +func (orc *VTOrcProcess) GetVars() map[string]any { + varsURL := fmt.Sprintf("http://localhost:%d/debug/vars", orc.Port) + resp, err := http.Get(varsURL) + if err != nil { + return nil + } + defer resp.Body.Close() + + if resp.StatusCode == 200 { + resultMap := make(map[string]any) + respByte, _ := io.ReadAll(resp.Body) + err := json.Unmarshal(respByte, &resultMap) + if err != nil { + return nil + } + return resultMap + } + return nil +} diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go 
b/go/test/endtoend/vtorc/general/vtorc_test.go index 57f5a86a0ef..4254606dd94 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/vtorc/utils" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vtorc/logic" ) // Cases to test: @@ -72,6 +73,7 @@ func TestSingleKeyspace(t *testing.T) { utils.CheckPrimaryTablet(t, clusterInfo, shard0.Vttablets[0], true) utils.CheckReplication(t, clusterInfo, shard0.Vttablets[0], shard0.Vttablets[1:], 10*time.Second) + utils.WaitForSuccessfulRecoveryCount(t, clusterInfo.ClusterInstance.VTOrcProcesses[0], logic.ElectNewPrimaryRecoveryName, 1) } // Cases to test: @@ -88,6 +90,7 @@ func TestKeyspaceShard(t *testing.T) { utils.CheckPrimaryTablet(t, clusterInfo, shard0.Vttablets[0], true) utils.CheckReplication(t, clusterInfo, shard0.Vttablets[0], shard0.Vttablets[1:], 10*time.Second) + utils.WaitForSuccessfulRecoveryCount(t, clusterInfo.ClusterInstance.VTOrcProcesses[0], logic.ElectNewPrimaryRecoveryName, 1) } // Cases to test: @@ -107,6 +110,8 @@ func TestVTOrcRepairs(t *testing.T) { // find primary from topo curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) assert.NotNil(t, curPrimary, "should have elected a primary") + vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0] + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1) var replica, otherReplica *cluster.Vttablet for _, tablet := range shard0.Vttablets { @@ -133,6 +138,7 @@ func TestVTOrcRepairs(t *testing.T) { // wait for repair match := utils.WaitForReadOnlyValue(t, curPrimary, 0) require.True(t, match) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixPrimaryRecoveryName, 1) }) t.Run("ReplicaReadWrite", func(t *testing.T) { @@ -143,6 +149,7 @@ func TestVTOrcRepairs(t *testing.T) { // wait for repair match := 
utils.WaitForReadOnlyValue(t, replica, 1) require.True(t, match) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 1) }) t.Run("StopReplication", func(t *testing.T) { @@ -152,6 +159,7 @@ func TestVTOrcRepairs(t *testing.T) { // check replication is setup correctly utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 2) // Stop just the IO thread on the replica _, err = utils.RunSQL(t, "STOP SLAVE IO_THREAD", replica, "") @@ -159,6 +167,7 @@ func TestVTOrcRepairs(t *testing.T) { // check replication is setup correctly utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 3) // Stop just the SQL thread on the replica _, err = utils.RunSQL(t, "STOP SLAVE SQL_THREAD", replica, "") @@ -166,6 +175,7 @@ func TestVTOrcRepairs(t *testing.T) { // check replication is setup correctly utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 4) }) t.Run("ReplicationFromOtherReplica", func(t *testing.T) { @@ -177,6 +187,7 @@ func TestVTOrcRepairs(t *testing.T) { // wait until the source port is set back correctly by vtorc utils.CheckSourcePort(t, replica, curPrimary, 15*time.Second) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 5) // check that writes succeed utils.VerifyWritesSucceed(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second) @@ -196,6 +207,7 @@ func TestVTOrcRepairs(t *testing.T) { // wait for repair err = utils.WaitForReplicationToStop(t, curPrimary) require.NoError(t, err) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, 
logic.RecoverPrimaryHasPrimaryRecoveryName, 1) // check that the writes still succeed utils.VerifyWritesSucceed(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 10*time.Second) }) @@ -314,6 +326,8 @@ func TestVTOrcWithPrs(t *testing.T) { // find primary from topo curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) assert.NotNil(t, curPrimary, "should have elected a primary") + vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0] + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1) // find any replica tablet other than the current primary var replica *cluster.Vttablet @@ -339,6 +353,12 @@ func TestVTOrcWithPrs(t *testing.T) { // check that the replica gets promoted utils.CheckPrimaryTablet(t, clusterInfo, replica, true) + // Verify that VTOrc didn't run any other recovery + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 0) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixPrimaryRecoveryName, 0) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 0) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverPrimaryHasPrimaryRecoveryName, 0) utils.VerifyWritesSucceed(t, clusterInfo, replica, shard0.Vttablets, 10*time.Second) } diff --git a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go index 0ec6a958ad3..01bf01782e7 100644 --- a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go +++ b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go @@ -21,6 +21,7 @@ import ( "time" "vitess.io/vitess/go/test/endtoend/vtorc/utils" + "vitess.io/vitess/go/vt/vtorc/logic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -40,6 +41,8 @@ func TestDownPrimary(t *testing.T) { 
// find primary from topo curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) assert.NotNil(t, curPrimary, "should have elected a primary") + vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0] + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1) // find the replica and rdonly tablets var replica, rdonly *cluster.Vttablet @@ -70,6 +73,7 @@ func TestDownPrimary(t *testing.T) { utils.CheckPrimaryTablet(t, clusterInfo, replica, true) // also check that the replication is working correctly after failover utils.VerifyWritesSucceed(t, clusterInfo, replica, []*cluster.Vttablet{rdonly}, 10*time.Second) + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 1) } // Failover should not be cross data centers, according to the configuration file diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index e558a1188ea..c2a6beaed90 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -194,8 +194,8 @@ func createVttablets(clusterInstance *cluster.LocalProcessCluster, cellInfos []* // shutdownVttablets shuts down all the vttablets and removes them from the topology func shutdownVttablets(clusterInfo *VTOrcClusterInfo) error { - // demote the primary tablet if there is - err := demotePrimaryTablet(clusterInfo.Ts) + // reset the shard primary + err := resetShardPrimary(clusterInfo.Ts) if err != nil { return err } @@ -203,11 +203,6 @@ func shutdownVttablets(clusterInfo *VTOrcClusterInfo) error { for _, vttablet := range clusterInfo.ClusterInstance.Keyspaces[0].Shards[0].Vttablets { // we need to stop a vttablet only if it is not shutdown if !vttablet.VttabletProcess.IsShutdown() { - // wait for primary tablet to demote. 
For all others, it will not wait - err = vttablet.VttabletProcess.WaitForTabletTypes([]string{vttablet.Type}) - if err != nil { - return err - } // Stop the vttablets err := vttablet.VttabletProcess.TearDown() if err != nil { @@ -224,10 +219,10 @@ func shutdownVttablets(clusterInfo *VTOrcClusterInfo) error { return nil } -// demotePrimaryTablet demotes the primary tablet for our shard -func demotePrimaryTablet(ts *topo.Server) (err error) { +// resetShardPrimary resets the shard's primary +func resetShardPrimary(ts *topo.Server) (err error) { // lock the shard - ctx, unlock, lockErr := ts.LockShard(context.Background(), keyspaceName, shardName, "demotePrimaryTablet-vtorc-endtoend-test") + ctx, unlock, lockErr := ts.LockShard(context.Background(), keyspaceName, shardName, "resetShardPrimary-vtorc-endtoend-test") if lockErr != nil { return lockErr } @@ -236,7 +231,6 @@ func demotePrimaryTablet(ts *topo.Server) (err error) { // update the shard record's primary if _, err = ts.UpdateShardFields(ctx, keyspaceName, shardName, func(si *topo.ShardInfo) error { si.PrimaryAlias = nil - si.SetPrimaryTermStartTime(time.Now()) return nil }); err != nil { return err @@ -339,6 +333,9 @@ func cleanAndStartVttablet(t *testing.T, clusterInfo *VTOrcClusterInfo, vttablet // reset the binlog _, err = RunSQL(t, "RESET MASTER", vttablet, "") require.NoError(t, err) + // set read-only to true + _, err = RunSQL(t, "SET GLOBAL read_only = ON", vttablet, "") + require.NoError(t, err) // start the vttablet err = vttablet.VttabletProcess.Setup() @@ -913,3 +910,23 @@ func WaitForReadOnlyValue(t *testing.T, curPrimary *cluster.Vttablet, expectValu } return false } + +// WaitForSuccessfulRecoveryCount waits until the given recovery name's count of successful runs matches the count expected +func WaitForSuccessfulRecoveryCount(t *testing.T, vtorcInstance *cluster.VTOrcProcess, recoveryName string, countExpected int) { + t.Helper() + timeout := 15 * time.Second + startTime := time.Now() + for 
time.Since(startTime) < timeout { vars := vtorcInstance.GetVars() successfulRecoveriesMap, _ := vars["SuccessfulRecoveries"].(map[string]interface{}) successCount := successfulRecoveriesMap[recoveryName] if successCount == float64(countExpected) { return } time.Sleep(time.Second) } vars := vtorcInstance.GetVars() successfulRecoveriesMap, _ := vars["SuccessfulRecoveries"].(map[string]interface{}) successCount := successfulRecoveriesMap[recoveryName] assert.EqualValues(t, countExpected, successCount) } diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index c710f85fbbd..bdc558f28ad 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -23,40 +23,63 @@ import ( "math/rand" goos "os" "strings" - "sync/atomic" "time" - "vitess.io/vitess/go/vt/log" - - "vitess.io/vitess/go/vt/topo/topoproto" - "github.com/patrickmn/go-cache" - "github.com/rcrowley/go-metrics" logutilpb "vitess.io/vitess/go/vt/proto/logutil" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtctl/reparentutil" "vitess.io/vitess/go/vt/vtctl/reparentutil/promotionrule" "vitess.io/vitess/go/vt/vtorc/attributes" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/inst" - ometrics "vitess.io/vitess/go/vt/vtorc/metrics" "vitess.io/vitess/go/vt/vtorc/os" "vitess.io/vitess/go/vt/vtorc/process" "vitess.io/vitess/go/vt/vtorc/util" "vitess.io/vitess/go/vt/vttablet/tmclient" ) -var countPendingRecoveries int64 - type RecoveryType string const ( PrimaryRecovery RecoveryType = "PrimaryRecovery" CoPrimaryRecovery RecoveryType = "CoPrimaryRecovery" IntermediatePrimaryRecovery RecoveryType = "IntermediatePrimaryRecovery" + + CheckAndRecoverGenericProblemRecoveryName string = "CheckAndRecoverGenericProblem" + RecoverDeadPrimaryRecoveryName string = 
"RecoverDeadPrimary" + RecoverPrimaryHasPrimaryRecoveryName string = "RecoverPrimaryHasPrimary" + CheckAndRecoverLockedSemiSyncPrimaryRecoveryName string = "CheckAndRecoverLockedSemiSyncPrimary" + ElectNewPrimaryRecoveryName string = "ElectNewPrimary" + FixPrimaryRecoveryName string = "FixPrimary" + FixReplicaRecoveryName string = "FixReplica" +) + +var ( + actionableRecoveriesNames = []string{ + RecoverDeadPrimaryRecoveryName, + RecoverPrimaryHasPrimaryRecoveryName, + ElectNewPrimaryRecoveryName, + FixPrimaryRecoveryName, + FixReplicaRecoveryName, + } + + countPendingRecoveries = stats.NewGauge("PendingRecoveries", "Count of the number of pending recoveries") + + // recoveriesCounter counts the number of recoveries that VTOrc has performed + recoveriesCounter = stats.NewCountersWithSingleLabel("RecoveriesCount", "Count of the different recoveries performed", "RecoveryType", actionableRecoveriesNames...) + + // recoveriesSuccessfulCounter counts the number of successful recoveries that VTOrc has performed + recoveriesSuccessfulCounter = stats.NewCountersWithSingleLabel("SuccessfulRecoveries", "Count of the different successful recoveries performed", "RecoveryType", actionableRecoveriesNames...) + + // recoveriesFailureCounter counts the number of failed recoveries that VTOrc has performed + recoveriesFailureCounter = stats.NewCountersWithSingleLabel("FailedRecoveries", "Count of the different failed recoveries performed", "RecoveryType", actionableRecoveriesNames...) 
) // recoveryFunction is the code of the recovery function to be used @@ -201,32 +224,8 @@ func (instancesByCountReplicas InstancesByCountReplicas) Less(i, j int) bool { return len(instancesByCountReplicas[i].Replicas) < len(instancesByCountReplicas[j].Replicas) } -var recoverDeadIntermediatePrimaryCounter = metrics.NewCounter() -var recoverDeadIntermediatePrimarySuccessCounter = metrics.NewCounter() -var recoverDeadIntermediatePrimaryFailureCounter = metrics.NewCounter() -var recoverDeadCoPrimaryCounter = metrics.NewCounter() -var recoverDeadCoPrimarySuccessCounter = metrics.NewCounter() -var recoverDeadCoPrimaryFailureCounter = metrics.NewCounter() -var countPendingRecoveriesGauge = metrics.NewGauge() - func init() { - _ = metrics.Register("recover.dead_intermediate_primary.start", recoverDeadIntermediatePrimaryCounter) - _ = metrics.Register("recover.dead_intermediate_primary.success", recoverDeadIntermediatePrimarySuccessCounter) - _ = metrics.Register("recover.dead_intermediate_primary.fail", recoverDeadIntermediatePrimaryFailureCounter) - _ = metrics.Register("recover.dead_co_primary.start", recoverDeadCoPrimaryCounter) - _ = metrics.Register("recover.dead_co_primary.success", recoverDeadCoPrimarySuccessCounter) - _ = metrics.Register("recover.dead_co_primary.fail", recoverDeadCoPrimaryFailureCounter) - _ = metrics.Register("recover.pending", countPendingRecoveriesGauge) - go initializeTopologyRecoveryPostConfiguration() - - ometrics.OnMetricsTick(func() { - countPendingRecoveriesGauge.Update(getCountPendingRecoveries()) - }) -} - -func getCountPendingRecoveries() int64 { - return atomic.LoadInt64(&countPendingRecoveries) } func initializeTopologyRecoveryPostConfiguration() { @@ -907,6 +906,31 @@ func getCheckAndRecoverFunction(recoveryFunctionCode recoveryFunction) ( } } +// getRecoverFunctionName gets the recovery function name for the given code. 
+// This name is used for metrics +func getRecoverFunctionName(recoveryFunctionCode recoveryFunction) string { + switch recoveryFunctionCode { + case noRecoveryFunc: + return "" + case recoverGenericProblemFunc: + return CheckAndRecoverGenericProblemRecoveryName + case recoverDeadPrimaryFunc: + return RecoverDeadPrimaryRecoveryName + case recoverPrimaryHasPrimaryFunc: + return RecoverPrimaryHasPrimaryRecoveryName + case recoverLockedSemiSyncPrimaryFunc: + return CheckAndRecoverLockedSemiSyncPrimaryRecoveryName + case electNewPrimaryFunc: + return ElectNewPrimaryRecoveryName + case fixPrimaryFunc: + return FixPrimaryRecoveryName + case fixReplicaFunc: + return FixReplicaRecoveryName + default: + return "" + } +} + // isClusterWideRecovery returns whether the given recovery is a cluster-wide recovery or not func isClusterWideRecovery(recoveryFunctionCode recoveryFunction) bool { switch recoveryFunctionCode { @@ -946,8 +970,8 @@ func runEmergentOperations(analysisEntry *inst.ReplicationAnalysis) { // executeCheckAndRecoverFunction will choose the correct check & recovery function based on analysis. 
// It executes the function synchronuously func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - atomic.AddInt64(&countPendingRecoveries, 1) - defer atomic.AddInt64(&countPendingRecoveries, -1) + countPendingRecoveries.Add(1) + defer countPendingRecoveries.Add(-1) checkAndRecoverFunctionCode := getCheckAndRecoverFunctionCode(analysisEntry.Analysis, &analysisEntry.AnalyzedInstanceKey) isActionableRecovery := hasActionableRecovery(checkAndRecoverFunctionCode) @@ -1069,6 +1093,13 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, cand if !recoveryAttempted { return recoveryAttempted, topologyRecovery, err } + recoveryName := getRecoverFunctionName(checkAndRecoverFunctionCode) + recoveriesCounter.Add(recoveryName, 1) + if err != nil { + recoveriesFailureCounter.Add(recoveryName, 1) + } else { + recoveriesSuccessfulCounter.Add(recoveryName, 1) + } if topologyRecovery == nil { return recoveryAttempted, topologyRecovery, err }