Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions go/vt/vtorc/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,20 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
return err
}

// Prioritise primary recovery.
// If we are performing some other action, first ensure that it is not because of primary issues.
// This step is only meant to improve the time taken to detect and fix cluster wide recoveries, it does not impact correctness.
// If a VTOrc detects an issue on a replica like ReplicationStopped, the underlying cause could be a dead primary instead.
// So, we try to reload that primary's information before proceeding with the replication stopped fix. We do this before acquiring the shard lock
// to allow another VTOrc instance to proceed with the dead primary recovery if it is indeed the case and it detects it before us. If however, the primary
// is not dead, then we will proceed with the fix for the replica. Essentially, we are trading off speed in replica recoveries (by doing an additional primary tablet reload)
// for speed in cluster-wide recoveries (by not holding the shard lock before reloading the primary tablet information).
if !isClusterWideRecovery(checkAndRecoverFunctionCode) {
if err = recheckPrimaryHealth(analysisEntry, DiscoverInstance); err != nil {
return err
}
}

// We lock the shard here and then refresh the tablets information
ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard,
getLockAction(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis),
Expand Down Expand Up @@ -667,6 +681,36 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
return err
}

// recheckPrimaryHealth check the health of the primary node.
// It then checks whether, given the re-discovered primary health, the original recovery is still valid.
// If not valid then it will abort the current analysis.
func recheckPrimaryHealth(analysisEntry *inst.ReplicationAnalysis, discoveryFunc func(string, bool)) error {
originalAnalysisEntry := analysisEntry.Analysis
primaryTabletAlias := analysisEntry.AnalyzedInstancePrimaryAlias

// re-check if there are any mitigation required for the leader node.
// if the current problem is because of dead primary, this call will update the analysis entry
discoveryFunc(primaryTabletAlias, true)

// checking if the original analysis is valid even after the primary refresh.
recoveryRequired, err := checkIfAlreadyFixed(analysisEntry)
if err != nil {
log.Infof("recheckPrimaryHealth: Checking if recovery is required returned err: %v", err)
return err
}

// The original analysis for the tablet has changed.
// This could mean that either the original analysis has changed or some other Vtorc instance has already performing the mitigation.
// In either case, the original analysis is stale which can be safely aborted.
if recoveryRequired {
log.Infof("recheckPrimaryHealth: Primary recovery is required, Tablet alias: %v", primaryTabletAlias)
// original analysis is stale, abort.
return fmt.Errorf("aborting %s, primary mitigation is required", originalAnalysisEntry)
}

return nil
}

// checkIfAlreadyFixed checks whether the problem that the analysis entry represents has already been fixed by another agent or not
func checkIfAlreadyFixed(analysisEntry *inst.ReplicationAnalysis) (bool, error) {
// Run a replication analysis again. We will check if the problem persisted
Expand Down
108 changes: 108 additions & 0 deletions go/vt/vtorc/logic/topology_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/external/golib/sqlutils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/test"
_ "vitess.io/vitess/go/vt/vttablet/grpctmclient"
)

Expand Down Expand Up @@ -311,3 +314,108 @@ func TestGetCheckAndRecoverFunctionCode(t *testing.T) {
})
}
}

func TestRecheckPrimaryHealth(t *testing.T) {
tests := []struct {
name string
info []*test.InfoForRecoveryAnalysis
wantErr string
}{
{
name: "analysis change",
info: []*test.InfoForRecoveryAnalysis{{
TabletInfo: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100},
Hostname: "localhost",
Keyspace: "ks",
Shard: "0",
Type: topodatapb.TabletType_PRIMARY,
MysqlHostname: "localhost",
MysqlPort: 6709,
},
DurabilityPolicy: "none",
LastCheckValid: 0,
CountReplicas: 4,
CountValidReplicas: 4,
CountValidReplicatingReplicas: 0,
}},
wantErr: "aborting ReplicationStopped, primary mitigation is required",
},
{
name: "analysis did not change",
info: []*test.InfoForRecoveryAnalysis{{
TabletInfo: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101},
Hostname: "localhost",
Keyspace: "ks",
Shard: "0",
Type: topodatapb.TabletType_PRIMARY,
MysqlHostname: "localhost",
MysqlPort: 6708,
},
DurabilityPolicy: policy.DurabilityNone,
LastCheckValid: 1,
CountReplicas: 4,
CountValidReplicas: 4,
CountValidReplicatingReplicas: 3,
CountValidOracleGTIDReplicas: 4,
CountLoggingReplicas: 2,
IsPrimary: 1,
CurrentTabletType: int(topodatapb.TabletType_PRIMARY),
}, {
TabletInfo: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100},
Hostname: "localhost",
Keyspace: "ks",
Shard: "0",
Type: topodatapb.TabletType_REPLICA,
MysqlHostname: "localhost",
MysqlPort: 6709,
},
DurabilityPolicy: policy.DurabilityNone,
PrimaryTabletInfo: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101},
},
LastCheckValid: 1,
ReadOnly: 1,
ReplicationStopped: 1,
}},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// reset vtorc db after every test
oldDB := db.Db
defer func() {
db.Db = oldDB
}()

var rowMaps []sqlutils.RowMap
for _, analysis := range tt.info {
analysis.SetValuesFromTabletInfo()
rowMaps = append(rowMaps, analysis.ConvertToRowMap())
}

// set replication analysis in Vtorc DB.
db.Db = test.NewTestDB([][]sqlutils.RowMap{rowMaps})

err := recheckPrimaryHealth(&inst.ReplicationAnalysis{
AnalyzedInstanceAlias: "zon1-0000000100",
Analysis: inst.ReplicationStopped,
AnalyzedKeyspace: "ks",
AnalyzedShard: "0",
}, func(s string, b bool) {
// the implementation for DiscoverInstance is not required because we are mocking the db response.
})

if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr)
return
}

require.NoError(t, err)
})
}

}
Loading