diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index cccae8209cf..fe768051a2d 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -32,6 +32,7 @@ import ( "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtutils "vitess.io/vitess/go/vt/utils" + "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vtorc/logic" ) @@ -402,6 +403,76 @@ func TestRepairAfterTER(t *testing.T) { utils.CheckReplication(t, clusterInfo, newPrimary, []*cluster.Vttablet{curPrimary}, 15*time.Second) } +// TestStalePrimary tests that an old primary that remains writable and of tablet type PRIMARY in the topo +// is properly demoted to a read-only replica by VTOrc. +func TestStalePrimary(t *testing.T) { + ctx := t.Context() + + defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance) + utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 4, 0, []string{"--topo-information-refresh-duration", "1s"}, cluster.VTOrcConfiguration{ + PreventCrossCellFailover: true, + }, cluster.DefaultVtorcsByCell, policy.DurabilitySemiSync) + keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + + curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) + assert.NotNil(t, curPrimary, "should have elected a primary") + utils.CheckPrimaryTablet(t, clusterInfo, curPrimary, true) + + var badPrimary, healthyReplica *cluster.Vttablet + for _, tablet := range shard0.Vttablets { + if tablet.Alias == curPrimary.Alias { + continue + } + + if badPrimary == nil { + badPrimary = tablet + continue + } + + healthyReplica = tablet + } + + utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{badPrimary, healthyReplica}, 15*time.Second) + + curPrimaryTopo, err := clusterInfo.Ts.GetTablet(ctx, curPrimary.GetAlias()) + require.NoError(t, err, "expected to read current primary topo record") + + curPrimaryTermStart := protoutil.TimeFromProto(curPrimaryTopo.PrimaryTermStartTime) + require.False(t, curPrimaryTermStart.IsZero(), "expected current primary term start time to be set") + + err = utils.RunSQLs(t, []string{"SET GLOBAL read_only = OFF"}, badPrimary, "") + require.NoError(t, err) + require.True(t, utils.WaitForReadOnlyValue(t, badPrimary, 0)) + + // We set the tablet's type in the topology to PRIMARY. This mimics the situation where during a demotion + // in a hypothetical ERS, the old primary starts running as a replica, but fails before updating the topology + // accordingly. + _, err = clusterInfo.Ts.UpdateTabletFields(ctx, badPrimary.GetAlias(), func(tablet *topodatapb.Tablet) error { + tablet.Type = topodatapb.TabletType_PRIMARY + tablet.PrimaryTermStartTime = protoutil.TimeToProto(curPrimaryTermStart.Add(-1 * time.Minute)) + return nil + }) + require.NoError(t, err) + + // Expect VTOrc to demote the stale primary to a read-only replica. + require.Eventuallyf(t, func() bool { + topoTablet, topoErr := clusterInfo.Ts.GetTablet(ctx, badPrimary.GetAlias()) + if topoErr != nil { + t.Logf("stale primary probe: topo error=%v", topoErr) + return false + } + + readOnly, readErr := badPrimary.VttabletProcess.GetDBVar("read_only", "") + if readErr != nil { + t.Logf("stale primary probe: alias=%s topo=%v read_only error=%v", badPrimary.Alias, topoTablet.Type, readErr) + return false + } + + return topoTablet.Type == topodatapb.TabletType_REPLICA && readOnly == "ON" + }, 30*time.Second, time.Second, "expected demotion to REPLICA with read_only=ON") +} + // TestSemiSync tests that semi-sync is setup correctly by vtorc if it is incorrectly set func TestSemiSync(t *testing.T) { // stop any vtorc instance running due to a previous test. diff --git a/go/vt/external/golib/sqlutils/sqlutils.go b/go/vt/external/golib/sqlutils/sqlutils.go index 4e8303cd47a..bdd34d7a116 100644 --- a/go/vt/external/golib/sqlutils/sqlutils.go +++ b/go/vt/external/golib/sqlutils/sqlutils.go @@ -135,6 +135,11 @@ func (rm *RowMap) GetTime(key string) time.Time { if t, err := time.Parse(DateTimeFormat, rm.GetString(key)); err == nil { return t } + + if t, err := time.Parse(time.RFC3339Nano, rm.GetString(key)); err == nil { + return t + } + return time.Time{} } diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go index c2f5e8ae85a..6bc6101581b 100644 --- a/go/vt/vtorc/inst/analysis.go +++ b/go/vt/vtorc/inst/analysis.go @@ -60,6 +60,10 @@ const ( PrimarySemiSyncBlocked AnalysisCode = "PrimarySemiSyncBlocked" ErrantGTIDDetected AnalysisCode = "ErrantGTIDDetected" PrimaryDiskStalled AnalysisCode = "PrimaryDiskStalled" + + // StaleTopoPrimary describes when a tablet still has the type PRIMARY in the topology when a newer primary + // has been elected. VTOrc should demote this primary to a replica. + StaleTopoPrimary AnalysisCode = "StaleTopoPrimary" ) type StructureAnalysisCode string @@ -87,10 +91,15 @@ type DetectionAnalysisHints struct { // DetectionAnalysis represents an analysis of a detected problem. type DetectionAnalysis struct { - AnalyzedInstanceAlias string - AnalyzedInstancePrimaryAlias string - TabletType topodatapb.TabletType - CurrentTabletType topodatapb.TabletType + AnalyzedInstanceAlias string + AnalyzedInstancePrimaryAlias string + + // TabletType is the tablet's type as seen in the topology. + TabletType topodatapb.TabletType + + // CurrentTabletType is the type this tablet is currently running as. + CurrentTabletType topodatapb.TabletType + PrimaryTimeStamp time.Time AnalyzedKeyspace string AnalyzedShard string diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 19560a69b07..9b457bee8c2 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -54,7 +54,12 @@ type clusterAnalysis struct { hasShardWideAction bool totalTablets int primaryAlias string - durability policy.Durabler + + // primaryTimestamp is the most recent primary term start time observed for the shard. + primaryTimestamp time.Time + + // durability is the shard's current durability policy. + durability policy.Durabler } // GetDetectionAnalysis will check for detected problems (dead primary; unreachable primary; etc) @@ -381,6 +386,7 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi if a.TabletType == topodatapb.TabletType_PRIMARY { a.IsClusterPrimary = true clusters[keyspaceShard].primaryAlias = a.AnalyzedInstanceAlias + clusters[keyspaceShard].primaryTimestamp = a.PrimaryTimeStamp } durabilityPolicy := m.GetString("durability_policy") if durabilityPolicy == "" { @@ -458,6 +464,9 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi case a.IsClusterPrimary && a.CurrentTabletType != topodatapb.TabletType_UNKNOWN && a.CurrentTabletType != topodatapb.TabletType_PRIMARY: a.Analysis = PrimaryCurrentTypeMismatch a.Description = "Primary tablet's current type is not PRIMARY" + case isStaleTopoPrimary(a, ca): + a.Analysis = StaleTopoPrimary + a.Description = "Primary tablet is stale, older than current primary" case topo.IsReplicaType(a.TabletType) && a.ErrantGTID != "": a.Analysis = ErrantGTIDDetected a.Description = "Tablet has errant GTIDs" @@ -611,6 +620,16 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi return result, err } +// isStaleTopoPrimary returns true when a tablet has type PRIMARY in the topology and has an older primary term +// start time than the shard's current primary. +func isStaleTopoPrimary(tablet *DetectionAnalysis, cluster *clusterAnalysis) bool { + if tablet.TabletType != topodatapb.TabletType_PRIMARY { + return false + } + + return tablet.PrimaryTimeStamp.Before(cluster.primaryTimestamp) +} + // postProcessAnalyses is used to update different analyses based on the information gleaned from looking at all the analyses together instead of individual data. func postProcessAnalyses(result []*DetectionAnalysis, clusters map[string]*clusterAnalysis) []*DetectionAnalysis { for { diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index bd0fba77dfb..27a7efab8c0 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -1019,6 +1019,106 @@ func TestGetDetectionAnalysisDecision(t *testing.T) { } } +// TestStalePrimary tests that an old primary that remains writable and is of tablet type PRIMARY +// in the topo is demoted to a read-only replica by VTOrc. +func TestStalePrimary(t *testing.T) { + oldDB := db.Db + defer func() { + db.Db = oldDB + }() + + currentPrimaryTimestamp := time.Now().UTC().Truncate(time.Microsecond) + stalePrimaryTimestamp := currentPrimaryTimestamp.Add(-1 * time.Minute) + shardPrimaryTermTimestamp := currentPrimaryTimestamp.Format(sqlutils.DateTimeFormat) + + // We set up a real primary and replica, and then a stale primary running as REPLICA but with + // tablet type PRIMARY in the topology. + info := []*test.InfoForRecoveryAnalysis{ + { + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_PRIMARY, + MysqlHostname: "localhost", + MysqlPort: 6708, + }, + DurabilityPolicy: policy.DurabilitySemiSync, + LastCheckValid: 1, + CountReplicas: 1, + CountValidReplicas: 1, + CountValidReplicatingReplicas: 1, + IsPrimary: 1, + SemiSyncPrimaryEnabled: 1, + SemiSyncPrimaryStatus: 1, + SemiSyncPrimaryWaitForReplicaCount: 1, + SemiSyncPrimaryClients: 1, + CurrentTabletType: int(topodatapb.TabletType_PRIMARY), + PrimaryTimestamp: ¤tPrimaryTimestamp, + ShardPrimaryTermTimestamp: shardPrimaryTermTimestamp, + }, + { + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_REPLICA, + MysqlHostname: "localhost", + MysqlPort: 6709, + }, + DurabilityPolicy: policy.DurabilitySemiSync, + PrimaryTabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101}, + }, + LastCheckValid: 1, + ReadOnly: 1, + SemiSyncReplicaEnabled: 1, + ShardPrimaryTermTimestamp: shardPrimaryTermTimestamp, + }, + { + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 102}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_PRIMARY, + MysqlHostname: "localhost", + MysqlPort: 6710, + }, + DurabilityPolicy: policy.DurabilitySemiSync, + LastCheckValid: 1, + IsPrimary: 1, + ReadOnly: 0, + SemiSyncPrimaryEnabled: 1, + SemiSyncPrimaryStatus: 1, + SemiSyncPrimaryWaitForReplicaCount: 2, + SemiSyncPrimaryClients: 1, + CurrentTabletType: int(topodatapb.TabletType_REPLICA), + PrimaryTimestamp: &stalePrimaryTimestamp, + }, + } + + var rowMaps []sqlutils.RowMap + for _, analysis := range info { + analysis.SetValuesFromTabletInfo() + rowMaps = append(rowMaps, analysis.ConvertToRowMap()) + } + db.Db = test.NewTestDB([][]sqlutils.RowMap{rowMaps, rowMaps}) + + // Each sampling should yield the placeholder analysis that represents the future recovery behavior once + // the demotion logic is implemented, which makes this test fail until the actual fix is in place. + for range 2 { + got, err := GetDetectionAnalysis("", "", &DetectionAnalysisHints{}) + require.NoError(t, err, "expected detection analysis to run without error") + require.Len(t, got, 1, "expected exactly one analysis entry for the shard") + require.Equal(t, AnalysisCode("StaleTopoPrimary"), got[0].Analysis, "expected stale primary analysis") + require.Equal(t, "ks", got[0].AnalyzedKeyspace, "expected analysis to target keyspace ks") + require.Equal(t, "0", got[0].AnalyzedShard, "expected analysis to target shard 0") + } +} + // TestGetDetectionAnalysis tests the entire GetDetectionAnalysis. It inserts data into the database and runs the function. // The database is not faked. This is intended to give more test coverage. This test is more comprehensive but more expensive than TestGetDetectionAnalysisDecision. // This test is somewhere between a unit test, and an end-to-end test. It is specifically useful for testing situations which are hard to come by in end-to-end test, but require diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index e05462e0a4f..75ba11b5931 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -53,6 +53,10 @@ const ( FixPrimaryRecoveryName string = "FixPrimary" FixReplicaRecoveryName string = "FixReplica" RecoverErrantGTIDDetectedName string = "RecoverErrantGTIDDetected" + + // DemoteStaleTopoPrimaryRecoveryName is a recovery for tablets that have a stale type of PRIMARY + // in the topology but a newer primary has been elected. + DemoteStaleTopoPrimaryRecoveryName string = "DemoteStaleTopoPrimary" ) // RecoverySkipCode represents the reason for a skipped recovery. @@ -145,6 +149,10 @@ const ( fixPrimaryFunc fixReplicaFunc recoverErrantGTIDDetectedFunc + + // demoteStaleTopoPrimaryFunc is the recovery function for when a tablet has a stale type of + // PRIMARY in the topology and should be demoted. + demoteStaleTopoPrimaryFunc ) // TopologyRecovery represents an entry in the topology_recovery table @@ -562,6 +570,8 @@ func getCheckAndRecoverFunctionCode(analysisEntry *inst.DetectionAnalysis) (reco recoveryFunc = electNewPrimaryFunc case inst.PrimaryIsReadOnly, inst.PrimarySemiSyncMustBeSet, inst.PrimarySemiSyncMustNotBeSet, inst.PrimaryCurrentTypeMismatch: recoveryFunc = fixPrimaryFunc + case inst.StaleTopoPrimary: + recoveryFunc = demoteStaleTopoPrimaryFunc // replica case inst.NotConnectedToPrimary, inst.ConnectedToWrongPrimary, inst.ReplicationStopped, inst.ReplicaIsWritable, inst.ReplicaSemiSyncMustBeSet, inst.ReplicaSemiSyncMustNotBeSet, inst.ReplicaMisconfigured: @@ -617,6 +627,8 @@ func hasActionableRecovery(recoveryFunctionCode recoveryFunction) bool { return true case recoverErrantGTIDDetectedFunc: return true + case demoteStaleTopoPrimaryFunc: + return true default: return false } @@ -651,6 +663,8 @@ func getCheckAndRecoverFunction(recoveryFunctionCode recoveryFunction) ( return fixReplica case recoverErrantGTIDDetectedFunc: return recoverErrantGTIDDetected + case demoteStaleTopoPrimaryFunc: + return demoteStaleTopoPrimary default: return nil } @@ -684,6 +698,8 @@ func getRecoverFunctionName(recoveryFunctionCode recoveryFunction) string { return FixReplicaRecoveryName case recoverErrantGTIDDetectedFunc: return RecoverErrantGTIDDetectedName + case demoteStaleTopoPrimaryFunc: + return DemoteStaleTopoPrimaryRecoveryName default: return "" } @@ -1130,6 +1146,67 @@ func fixReplica(ctx context.Context, analysisEntry *inst.DetectionAnalysis, logg return true, topologyRecovery, err } +// demoteStaleTopoPrimary demotes a tablet that has a stale type of PRIMARY in the topology when a newer primary has +// been elected. It demotes the tablet, updates its type to REPLICA in the topology, and sets its replication source +// to the current primary. +func demoteStaleTopoPrimary(ctx context.Context, analysisEntry *inst.DetectionAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + // Register the recovery before touching topology so multiple VTOrc instances do not race the demotion. + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) + if topologyRecovery == nil { + message := fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another demoteStaleTopoPrimary.", analysisEntry.AnalyzedInstanceAlias) + logger.Warning(message) + _ = AuditTopologyRecovery(topologyRecovery, message) + return false, nil, err + } + + logger.Infof("Analysis: %v, will demote stale topo primary %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) + // This has to be done in the end; whether successful or not, we should mark that the recovery is done. + // So that after the active period passes, we are able to run other recoveries. + defer func() { + _ = resolveRecovery(topologyRecovery, nil) + }() + + analyzedTablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias) + if err != nil { + logger.Errorf("Failed to read instance %q, aborting recovery", analysisEntry.AnalyzedInstanceAlias) + return false, topologyRecovery, fmt.Errorf("failed to read instance: %w", err) + } + + primaryTablet, err := shardPrimary(analyzedTablet.Keyspace, analyzedTablet.Shard) + if err != nil { + logger.Infof("Could not compute primary for %s/%s", analyzedTablet.Keyspace, analyzedTablet.Shard) + return false, topologyRecovery, fmt.Errorf("failed to find primary for shard: %w", err) + } + + durabilityPolicy, err := inst.GetDurabilityPolicy(analyzedTablet.Keyspace) + if err != nil { + logger.Infof("Could not read the durability policy for %s/%s", analyzedTablet.Keyspace, analyzedTablet.Shard) + return false, topologyRecovery, fmt.Errorf("failed to read the durability policy for the keyspace: %w", err) + } + + // Demote the tablet, forcing it to drop any pending transactions that are waiting for an ack. + _, err = tmc.DemotePrimary(ctx, analyzedTablet, true) + if err != nil { + return true, topologyRecovery, fmt.Errorf("failed to demote stale primary: %w", err) + } + logger.Info("Successfully demoted the stale primary " + analysisEntry.AnalyzedInstanceAlias) + + // Set tablet to REPLICA in topology. + semiSync := policy.IsReplicaSemiSync(durabilityPolicy, primaryTablet, analyzedTablet) + err = changeTabletType(ctx, analyzedTablet, topodatapb.TabletType_REPLICA, semiSync) + if err != nil { + return true, topologyRecovery, fmt.Errorf("failed to set tablet type to REPLICA in topology: %w", err) + } + + // Set the instance's replication source to the current primary. + err = setReplicationSource(ctx, analyzedTablet, primaryTablet, semiSync, float64(analysisEntry.ReplicaNetTimeout)/2) + if err != nil { + return true, topologyRecovery, fmt.Errorf("failed to repoint replication to primary: %w", err) + } + + return true, topologyRecovery, err +} + // recoverErrantGTIDDetected changes the tablet type of a replica tablet that has errant GTIDs. func recoverErrantGTIDDetected(ctx context.Context, analysisEntry *inst.DetectionAnalysis, logger *log.PrefixedLogger) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry)