Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtutils "vitess.io/vitess/go/vt/utils"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/logic"
)
Expand Down Expand Up @@ -402,6 +403,76 @@ func TestRepairAfterTER(t *testing.T) {
utils.CheckReplication(t, clusterInfo, newPrimary, []*cluster.Vttablet{curPrimary}, 15*time.Second)
}

// TestStalePrimary tests that an old primary that remains writable and of tablet type PRIMARY in the topo
// is properly demoted to a read-only replica by VTOrc.
func TestStalePrimary(t *testing.T) {
ctx := t.Context()

defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance)
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 4, 0, []string{"--topo-information-refresh-duration", "1s"}, cluster.VTOrcConfiguration{
PreventCrossCellFailover: true,
}, cluster.DefaultVtorcsByCell, policy.DurabilitySemiSync)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
assert.NotNil(t, curPrimary, "should have elected a primary")
utils.CheckPrimaryTablet(t, clusterInfo, curPrimary, true)

var badPrimary, healthyReplica *cluster.Vttablet
for _, tablet := range shard0.Vttablets {
if tablet.Alias == curPrimary.Alias {
continue
}

if badPrimary == nil {
badPrimary = tablet
continue
}

healthyReplica = tablet
}

utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{badPrimary, healthyReplica}, 15*time.Second)

curPrimaryTopo, err := clusterInfo.Ts.GetTablet(ctx, curPrimary.GetAlias())
require.NoError(t, err, "expected to read current primary topo record")

curPrimaryTermStart := protoutil.TimeFromProto(curPrimaryTopo.PrimaryTermStartTime)
require.False(t, curPrimaryTermStart.IsZero(), "expected current primary term start time to be set")

err = utils.RunSQLs(t, []string{"SET GLOBAL read_only = OFF"}, badPrimary, "")
require.NoError(t, err)
require.True(t, utils.WaitForReadOnlyValue(t, badPrimary, 0))

// We set the tablet's type in the topology to PRIMARY. This mimics the situation where during a demotion
// in a hypothetical ERS, the old primary starts running as a replica, but fails before updating the topology
// accordingly.
_, err = clusterInfo.Ts.UpdateTabletFields(ctx, badPrimary.GetAlias(), func(tablet *topodatapb.Tablet) error {
tablet.Type = topodatapb.TabletType_PRIMARY
tablet.PrimaryTermStartTime = protoutil.TimeToProto(curPrimaryTermStart.Add(-1 * time.Minute))
return nil
})
require.NoError(t, err)

// Expect VTOrc to demote the stale primary to a read-only replica.
require.Eventuallyf(t, func() bool {
topoTablet, topoErr := clusterInfo.Ts.GetTablet(ctx, badPrimary.GetAlias())
if topoErr != nil {
t.Logf("stale primary probe: topo error=%v", topoErr)
return false
}

readOnly, readErr := badPrimary.VttabletProcess.GetDBVar("read_only", "")
if readErr != nil {
t.Logf("stale primary probe: alias=%s topo=%v read_only error=%v", badPrimary.Alias, topoTablet.Type, readErr)
return false
}

return topoTablet.Type == topodatapb.TabletType_REPLICA && readOnly == "ON"
}, 30*time.Second, time.Second, "expected demotion to REPLICA with read_only=ON")
}

// TestSemiSync tests that semi-sync is setup correctly by vtorc if it is incorrectly set
func TestSemiSync(t *testing.T) {
// stop any vtorc instance running due to a previous test.
Expand Down
5 changes: 5 additions & 0 deletions go/vt/external/golib/sqlutils/sqlutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (rm *RowMap) GetTime(key string) time.Time {
if t, err := time.Parse(DateTimeFormat, rm.GetString(key)); err == nil {
return t
}

if t, err := time.Parse(time.RFC3339Nano, rm.GetString(key)); err == nil {
return t
}
Comment on lines +139 to +141
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was needed because we seem to be storing the timestamp in RFC3339Nano format, but then reading it as DateTimeFormat (ISO8601). I find this hard to believe because somehow that means we haven't needed to read timestamps back out at all? But it seems like that's the case 🤷

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use that format for histograms buckets in the stats package. I'm not aware of any other usage. Where was this needed here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When vtorc is reading the primary term start time, I was getting parsing errors leading to it being parsed as the zero value.


return time.Time{}
}

Expand Down
17 changes: 13 additions & 4 deletions go/vt/vtorc/inst/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ const (
PrimarySemiSyncBlocked AnalysisCode = "PrimarySemiSyncBlocked"
ErrantGTIDDetected AnalysisCode = "ErrantGTIDDetected"
PrimaryDiskStalled AnalysisCode = "PrimaryDiskStalled"

// StaleTopoPrimary describes when a tablet still has the type PRIMARY in the topology when a newer primary
// has been elected. VTOrc should demote this primary to a replica.
StaleTopoPrimary AnalysisCode = "StaleTopoPrimary"
)

type StructureAnalysisCode string
Expand Down Expand Up @@ -87,10 +91,15 @@ type DetectionAnalysisHints struct {

// DetectionAnalysis represents an analysis of a detected problem.
type DetectionAnalysis struct {
AnalyzedInstanceAlias string
AnalyzedInstancePrimaryAlias string
TabletType topodatapb.TabletType
CurrentTabletType topodatapb.TabletType
AnalyzedInstanceAlias string
AnalyzedInstancePrimaryAlias string

// TabletType is the tablet's type as seen in the topology.
TabletType topodatapb.TabletType

// CurrentTabletType is the type this tablet is currently running as.
CurrentTabletType topodatapb.TabletType

PrimaryTimeStamp time.Time
AnalyzedKeyspace string
AnalyzedShard string
Expand Down
21 changes: 20 additions & 1 deletion go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ type clusterAnalysis struct {
hasShardWideAction bool
totalTablets int
primaryAlias string
durability policy.Durabler

// primaryTimestamp is the most recent primary term start time observed for the shard.
primaryTimestamp time.Time

// durability is the shard's current durability policy.
durability policy.Durabler
}

// GetDetectionAnalysis will check for detected problems (dead primary; unreachable primary; etc)
Expand Down Expand Up @@ -381,6 +386,7 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
if a.TabletType == topodatapb.TabletType_PRIMARY {
a.IsClusterPrimary = true
clusters[keyspaceShard].primaryAlias = a.AnalyzedInstanceAlias
clusters[keyspaceShard].primaryTimestamp = a.PrimaryTimeStamp
}
durabilityPolicy := m.GetString("durability_policy")
if durabilityPolicy == "" {
Expand Down Expand Up @@ -458,6 +464,9 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
case a.IsClusterPrimary && a.CurrentTabletType != topodatapb.TabletType_UNKNOWN && a.CurrentTabletType != topodatapb.TabletType_PRIMARY:
a.Analysis = PrimaryCurrentTypeMismatch
a.Description = "Primary tablet's current type is not PRIMARY"
case isStaleTopoPrimary(a, ca):
a.Analysis = StaleTopoPrimary
a.Description = "Primary tablet is stale, older than current primary"
case topo.IsReplicaType(a.TabletType) && a.ErrantGTID != "":
a.Analysis = ErrantGTIDDetected
a.Description = "Tablet has errant GTIDs"
Expand Down Expand Up @@ -611,6 +620,16 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
return result, err
}

// isStaleTopoPrimary returns true when a tablet has type PRIMARY in the topology and has an older primary term
// start time than the shard's current primary.
func isStaleTopoPrimary(tablet *DetectionAnalysis, cluster *clusterAnalysis) bool {
if tablet.TabletType != topodatapb.TabletType_PRIMARY {
return false
}

return tablet.PrimaryTimeStamp.Before(cluster.primaryTimestamp)
}

// postProcessAnalyses is used to update different analyses based on the information gleaned from looking at all the analyses together instead of individual data.
func postProcessAnalyses(result []*DetectionAnalysis, clusters map[string]*clusterAnalysis) []*DetectionAnalysis {
for {
Expand Down
100 changes: 100 additions & 0 deletions go/vt/vtorc/inst/analysis_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,106 @@ func TestGetDetectionAnalysisDecision(t *testing.T) {
}
}

// TestStalePrimary tests that an old primary that remains writable and is of tablet type PRIMARY
// in the topo is demoted to a read-only replica by VTOrc.
func TestStalePrimary(t *testing.T) {
oldDB := db.Db
defer func() {
db.Db = oldDB
}()

currentPrimaryTimestamp := time.Now().UTC().Truncate(time.Microsecond)
stalePrimaryTimestamp := currentPrimaryTimestamp.Add(-1 * time.Minute)
shardPrimaryTermTimestamp := currentPrimaryTimestamp.Format(sqlutils.DateTimeFormat)

// We set up a real primary and replica, and then a stale primary running as REPLICA but with
// tablet type PRIMARY in the topology.
info := []*test.InfoForRecoveryAnalysis{
{
TabletInfo: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101},
Hostname: "localhost",
Keyspace: "ks",
Shard: "0",
Type: topodatapb.TabletType_PRIMARY,
MysqlHostname: "localhost",
MysqlPort: 6708,
},
DurabilityPolicy: policy.DurabilitySemiSync,
LastCheckValid: 1,
CountReplicas: 1,
CountValidReplicas: 1,
CountValidReplicatingReplicas: 1,
IsPrimary: 1,
SemiSyncPrimaryEnabled: 1,
SemiSyncPrimaryStatus: 1,
SemiSyncPrimaryWaitForReplicaCount: 1,
SemiSyncPrimaryClients: 1,
CurrentTabletType: int(topodatapb.TabletType_PRIMARY),
PrimaryTimestamp: &currentPrimaryTimestamp,
ShardPrimaryTermTimestamp: shardPrimaryTermTimestamp,
},
{
TabletInfo: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100},
Hostname: "localhost",
Keyspace: "ks",
Shard: "0",
Type: topodatapb.TabletType_REPLICA,
MysqlHostname: "localhost",
MysqlPort: 6709,
},
DurabilityPolicy: policy.DurabilitySemiSync,
PrimaryTabletInfo: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101},
},
LastCheckValid: 1,
ReadOnly: 1,
SemiSyncReplicaEnabled: 1,
ShardPrimaryTermTimestamp: shardPrimaryTermTimestamp,
},
{
TabletInfo: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 102},
Hostname: "localhost",
Keyspace: "ks",
Shard: "0",
Type: topodatapb.TabletType_PRIMARY,
MysqlHostname: "localhost",
MysqlPort: 6710,
},
DurabilityPolicy: policy.DurabilitySemiSync,
LastCheckValid: 1,
IsPrimary: 1,
ReadOnly: 0,
SemiSyncPrimaryEnabled: 1,
SemiSyncPrimaryStatus: 1,
SemiSyncPrimaryWaitForReplicaCount: 2,
SemiSyncPrimaryClients: 1,
CurrentTabletType: int(topodatapb.TabletType_REPLICA),
PrimaryTimestamp: &stalePrimaryTimestamp,
},
}

var rowMaps []sqlutils.RowMap
for _, analysis := range info {
analysis.SetValuesFromTabletInfo()
rowMaps = append(rowMaps, analysis.ConvertToRowMap())
}
db.Db = test.NewTestDB([][]sqlutils.RowMap{rowMaps, rowMaps})

// Each sampling should yield the placeholder analysis that represents the future recovery behavior once
// the demotion logic is implemented, which makes this test fail until the actual fix is in place.
for range 2 {
got, err := GetDetectionAnalysis("", "", &DetectionAnalysisHints{})
require.NoError(t, err, "expected detection analysis to run without error")
require.Len(t, got, 1, "expected exactly one analysis entry for the shard")
require.Equal(t, AnalysisCode("StaleTopoPrimary"), got[0].Analysis, "expected stale primary analysis")
require.Equal(t, "ks", got[0].AnalyzedKeyspace, "expected analysis to target keyspace ks")
require.Equal(t, "0", got[0].AnalyzedShard, "expected analysis to target shard 0")
}
}

// TestGetDetectionAnalysis tests the entire GetDetectionAnalysis. It inserts data into the database and runs the function.
// The database is not faked. This is intended to give more test coverage. This test is more comprehensive but more expensive than TestGetDetectionAnalysisDecision.
// This test is somewhere between a unit test, and an end-to-end test. It is specifically useful for testing situations which are hard to come by in end-to-end test, but require
Expand Down
Loading
Loading