Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go/vt/vtorc/inst/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ type ReplicationAnalysis struct {
TabletType topodatapb.TabletType
CurrentTabletType topodatapb.TabletType
PrimaryTimeStamp time.Time
ClusterDetails ClusterInfo
AnalyzedKeyspace string
AnalyzedShard string
// ShardPrimaryTermTimestamp is the primary term start time stored in the shard record.
Expand Down
14 changes: 6 additions & 8 deletions go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
Type: BinaryLog,
}
isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates")
a.ClusterDetails.Keyspace = m.GetString("keyspace")
a.ClusterDetails.Shard = m.GetString("shard")
a.GTIDMode = m.GetString("gtid_mode")
a.LastCheckValid = m.GetBool("is_last_check_valid")
a.LastCheckPartialSuccess = m.GetBool("last_check_partial_success")
Expand Down Expand Up @@ -367,13 +365,13 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna

if !a.LastCheckValid {
analysisMessage := fmt.Sprintf("analysis: Alias: %+v, Keyspace: %+v, Shard: %+v, IsPrimary: %+v, LastCheckValid: %+v, LastCheckPartialSuccess: %+v, CountReplicas: %+v, CountValidReplicas: %+v, CountValidReplicatingReplicas: %+v, CountLaggingReplicas: %+v, CountDelayedReplicas: %+v",
a.AnalyzedInstanceAlias, a.ClusterDetails.Keyspace, a.ClusterDetails.Shard, a.IsPrimary, a.LastCheckValid, a.LastCheckPartialSuccess, a.CountReplicas, a.CountValidReplicas, a.CountValidReplicatingReplicas, a.CountLaggingReplicas, a.CountDelayedReplicas,
a.AnalyzedInstanceAlias, a.AnalyzedKeyspace, a.AnalyzedShard, a.IsPrimary, a.LastCheckValid, a.LastCheckPartialSuccess, a.CountReplicas, a.CountValidReplicas, a.CountValidReplicatingReplicas, a.CountLaggingReplicas, a.CountDelayedReplicas,
)
if util.ClearToLog("analysis_dao", analysisMessage) {
log.Infof(analysisMessage)
}
}
keyspaceShard := getKeyspaceShardName(a.ClusterDetails.Keyspace, a.ClusterDetails.Shard)
keyspaceShard := getKeyspaceShardName(a.AnalyzedKeyspace, a.AnalyzedShard)
if clusters[keyspaceShard] == nil {
clusters[keyspaceShard] = &clusterAnalysis{}
if a.TabletType == topodatapb.TabletType_PRIMARY {
Expand Down Expand Up @@ -614,14 +612,14 @@ func postProcessAnalyses(result []*ReplicationAnalysis, clusters map[string]*clu
// If one of them is an InvalidPrimary, then we see if all the other tablets in this keyspace shard are
// unable to replicate or not.
if analysis.Analysis == InvalidPrimary {
keyspaceName := analysis.ClusterDetails.Keyspace
shardName := analysis.ClusterDetails.Shard
keyspaceName := analysis.AnalyzedKeyspace
shardName := analysis.AnalyzedShard
keyspaceShard := getKeyspaceShardName(keyspaceName, shardName)
totalReplicas := clusters[keyspaceShard].totalTablets - 1
var notReplicatingReplicas []int
for idx, replicaAnalysis := range result {
if replicaAnalysis.ClusterDetails.Keyspace == keyspaceName &&
replicaAnalysis.ClusterDetails.Shard == shardName && topo.IsReplicaType(replicaAnalysis.TabletType) {
if replicaAnalysis.AnalyzedKeyspace == keyspaceName &&
replicaAnalysis.AnalyzedShard == shardName && topo.IsReplicaType(replicaAnalysis.TabletType) {
// If the replica's last check is invalid or its replication is stopped, then we consider as not replicating.
if !replicaAnalysis.LastCheckValid || replicaAnalysis.ReplicationStopped {
notReplicatingReplicas = append(notReplicatingReplicas, idx)
Expand Down
84 changes: 48 additions & 36 deletions go/vt/vtorc/inst/analysis_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,19 +1147,14 @@ func TestAuditInstanceAnalysisInChangelog(t *testing.T) {

// TestPostProcessAnalyses tests the functionality of the postProcessAnalyses function.
func TestPostProcessAnalyses(t *testing.T) {
ks0 := ClusterInfo{
Keyspace: "ks",
Shard: "0",
}
ks80 := ClusterInfo{
Keyspace: "ks",
Shard: "80-",
}
keyspace := "ks"
shard0 := "0"
shard80 := "80-"
clusters := map[string]*clusterAnalysis{
getKeyspaceShardName(ks0.Keyspace, ks0.Shard): {
getKeyspaceShardName(keyspace, shard0): {
totalTablets: 4,
},
getKeyspaceShardName(ks80.Keyspace, ks80.Shard): {
getKeyspaceShardName(keyspace, shard80): {
totalTablets: 3,
},
}
Expand All @@ -1173,20 +1168,23 @@ func TestPostProcessAnalyses(t *testing.T) {
name: "No processing needed",
analyses: []*ReplicationAnalysis{
{
Analysis: ReplicationStopped,
TabletType: topodatapb.TabletType_REPLICA,
LastCheckValid: true,
ClusterDetails: ks0,
Analysis: ReplicationStopped,
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
TabletType: topodatapb.TabletType_REPLICA,
LastCheckValid: true,
}, {
Analysis: ReplicaSemiSyncMustBeSet,
LastCheckValid: true,
TabletType: topodatapb.TabletType_REPLICA,
ClusterDetails: ks0,
Analysis: ReplicaSemiSyncMustBeSet,
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
LastCheckValid: true,
TabletType: topodatapb.TabletType_REPLICA,
}, {
Analysis: PrimaryHasPrimary,
LastCheckValid: true,
TabletType: topodatapb.TabletType_REPLICA,
ClusterDetails: ks0,
Analysis: PrimaryHasPrimary,
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
LastCheckValid: true,
TabletType: topodatapb.TabletType_REPLICA,
},
},
}, {
Expand All @@ -1195,60 +1193,69 @@ func TestPostProcessAnalyses(t *testing.T) {
{
Analysis: InvalidPrimary,
AnalyzedInstanceAlias: "zone1-100",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
TabletType: topodatapb.TabletType_PRIMARY,
ClusterDetails: ks0,
}, {
Analysis: NoProblem,
LastCheckValid: true,
AnalyzedInstanceAlias: "zone1-202",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard80,
TabletType: topodatapb.TabletType_RDONLY,
ClusterDetails: ks80,
}, {
Analysis: ConnectedToWrongPrimary,
LastCheckValid: true,
AnalyzedInstanceAlias: "zone1-101",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
TabletType: topodatapb.TabletType_REPLICA,
ReplicationStopped: true,
ClusterDetails: ks0,
}, {
Analysis: ReplicationStopped,
LastCheckValid: true,
AnalyzedInstanceAlias: "zone1-102",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
TabletType: topodatapb.TabletType_RDONLY,
ReplicationStopped: true,
ClusterDetails: ks0,
}, {
Analysis: InvalidReplica,
AnalyzedInstanceAlias: "zone1-108",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
TabletType: topodatapb.TabletType_REPLICA,
LastCheckValid: false,
ClusterDetails: ks0,
}, {
Analysis: NoProblem,
AnalyzedInstanceAlias: "zone1-302",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard80,
LastCheckValid: true,
TabletType: topodatapb.TabletType_REPLICA,
ClusterDetails: ks80,
},
},
want: []*ReplicationAnalysis{
{
Analysis: DeadPrimary,
AnalyzedInstanceAlias: "zone1-100",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
TabletType: topodatapb.TabletType_PRIMARY,
ClusterDetails: ks0,
}, {
Analysis: NoProblem,
LastCheckValid: true,
AnalyzedInstanceAlias: "zone1-202",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard80,
TabletType: topodatapb.TabletType_RDONLY,
ClusterDetails: ks80,
}, {
Analysis: NoProblem,
LastCheckValid: true,
AnalyzedInstanceAlias: "zone1-302",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard80,
TabletType: topodatapb.TabletType_REPLICA,
ClusterDetails: ks80,
},
},
},
Expand All @@ -1258,33 +1265,38 @@ func TestPostProcessAnalyses(t *testing.T) {
{
Analysis: InvalidPrimary,
AnalyzedInstanceAlias: "zone1-100",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
TabletType: topodatapb.TabletType_PRIMARY,
ClusterDetails: ks0,
}, {
Analysis: NoProblem,
AnalyzedInstanceAlias: "zone1-202",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard80,
LastCheckValid: true,
TabletType: topodatapb.TabletType_RDONLY,
ClusterDetails: ks80,
}, {
Analysis: NoProblem,
LastCheckValid: true,
AnalyzedInstanceAlias: "zone1-101",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
TabletType: topodatapb.TabletType_REPLICA,
ClusterDetails: ks0,
}, {
Analysis: ReplicationStopped,
LastCheckValid: true,
AnalyzedInstanceAlias: "zone1-102",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard0,
TabletType: topodatapb.TabletType_RDONLY,
ReplicationStopped: true,
ClusterDetails: ks0,
}, {
Analysis: NoProblem,
LastCheckValid: true,
AnalyzedInstanceAlias: "zone1-302",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard80,
TabletType: topodatapb.TabletType_REPLICA,
ClusterDetails: ks80,
},
},
},
Expand Down
23 changes: 0 additions & 23 deletions go/vt/vtorc/inst/cluster.go

This file was deleted.

4 changes: 2 additions & 2 deletions go/vt/vtorc/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func recheckPrimaryHealth(analysisEntry *inst.ReplicationAnalysis, discoveryFunc
// checkIfAlreadyFixed checks whether the problem that the analysis entry represents has already been fixed by another agent or not
func checkIfAlreadyFixed(analysisEntry *inst.ReplicationAnalysis) (bool, error) {
// Run a replication analysis again. We will check if the problem persisted
analysisEntries, err := inst.GetReplicationAnalysis(analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, &inst.ReplicationAnalysisHints{})
analysisEntries, err := inst.GetReplicationAnalysis(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, &inst.ReplicationAnalysisHints{})
if err != nil {
return false, err
}
Expand Down Expand Up @@ -799,7 +799,7 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi
_ = AuditTopologyRecovery(topologyRecovery, message)
return false, nil, err
}
logger.Infof("Analysis: %v, will elect a new primary for %v:%v", analysisEntry.Analysis, analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard)
logger.Infof("Analysis: %v, will elect a new primary for %v:%v", analysisEntry.Analysis, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard)

var promotedReplica *inst.Instance
// This has to be done in the end; whether successful or not, we should mark that the recovery is done.
Expand Down
16 changes: 8 additions & 8 deletions go/vt/vtorc/logic/topology_recovery_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error {
)`,
analysisEntry.AnalyzedInstanceAlias,
string(analysisEntry.Analysis),
analysisEntry.ClusterDetails.Keyspace,
analysisEntry.ClusterDetails.Shard,
analysisEntry.AnalyzedKeyspace,
analysisEntry.AnalyzedShard,
)
if err != nil {
log.Error(err)
Expand Down Expand Up @@ -85,8 +85,8 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover
sqlutils.NilIfZero(topologyRecovery.ID),
analysisEntry.AnalyzedInstanceAlias,
string(analysisEntry.Analysis),
analysisEntry.ClusterDetails.Keyspace,
analysisEntry.ClusterDetails.Shard,
analysisEntry.AnalyzedKeyspace,
analysisEntry.AnalyzedShard,
analysisEntry.AnalyzedInstanceAlias,
analysisEntry.RecoveryId,
)
Expand All @@ -111,13 +111,13 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover
// AttemptRecoveryRegistration tries to add a recovery entry; if this fails that means recovery is already in place.
func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis) (*TopologyRecovery, error) {
// Check if there is an active recovery in progress for the cluster of the given instance.
recoveries, err := ReadActiveClusterRecoveries(analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard)
recoveries, err := ReadActiveClusterRecoveries(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard)
if err != nil {
log.Error(err)
return nil, err
}
if len(recoveries) > 0 {
errMsg := fmt.Sprintf("AttemptRecoveryRegistration: Active recovery (id:%v) in the cluster %s:%s for %s", recoveries[0].ID, analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, recoveries[0].AnalysisEntry.Analysis)
errMsg := fmt.Sprintf("AttemptRecoveryRegistration: Active recovery (id:%v) in the cluster %s:%s for %s", recoveries[0].ID, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, recoveries[0].AnalysisEntry.Analysis)
log.Errorf(errMsg)
return nil, errors.New(errMsg)
}
Expand Down Expand Up @@ -189,8 +189,8 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog

topologyRecovery.AnalysisEntry.AnalyzedInstanceAlias = m.GetString("alias")
topologyRecovery.AnalysisEntry.Analysis = inst.AnalysisCode(m.GetString("analysis"))
topologyRecovery.AnalysisEntry.ClusterDetails.Keyspace = m.GetString("keyspace")
topologyRecovery.AnalysisEntry.ClusterDetails.Shard = m.GetString("shard")
topologyRecovery.AnalysisEntry.AnalyzedKeyspace = m.GetString("keyspace")
topologyRecovery.AnalysisEntry.AnalyzedShard = m.GetString("shard")

topologyRecovery.SuccessorAlias = m.GetString("successor_alias")

Expand Down
18 changes: 6 additions & 12 deletions go/vt/vtorc/logic/topology_recovery_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,10 @@ func TestTopologyRecovery(t *testing.T) {
replicationAnalysis := inst.ReplicationAnalysis{
AnalyzedInstanceAlias: "zone1-0000000101",
TabletType: tab101.Type,
ClusterDetails: inst.ClusterInfo{
Keyspace: keyspace,
Shard: shard,
},
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard,
Analysis: inst.ReplicaIsWritable,
IsReadOnly: false,
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard,
Analysis: inst.ReplicaIsWritable,
IsReadOnly: false,
}
topologyRecovery := NewTopologyRecovery(replicationAnalysis)

Expand Down Expand Up @@ -144,11 +140,9 @@ func TestInsertRecoveryDetection(t *testing.T) {
}()
ra := &inst.ReplicationAnalysis{
AnalyzedInstanceAlias: "alias-1",
AnalyzedKeyspace: keyspace,
AnalyzedShard: shard,
Analysis: inst.ClusterHasNoPrimary,
ClusterDetails: inst.ClusterInfo{
Keyspace: keyspace,
Shard: shard,
},
}
err := InsertRecoveryDetection(ra)
require.NoError(t, err)
Expand Down
Loading