diff --git a/go/vt/orchestrator/inst/analysis.go b/go/vt/orchestrator/inst/analysis.go index efb6a8a0fcd..c65e86d14e9 100644 --- a/go/vt/orchestrator/inst/analysis.go +++ b/go/vt/orchestrator/inst/analysis.go @@ -122,6 +122,7 @@ type ReplicationAnalysis struct { AnalyzedInstanceDataCenter string AnalyzedInstanceRegion string AnalyzedKeyspace string + AnalyzedShard string AnalyzedInstancePhysicalEnvironment string AnalyzedInstanceBinlogCoordinates BinlogCoordinates IsPrimary bool diff --git a/go/vt/orchestrator/inst/analysis_dao.go b/go/vt/orchestrator/inst/analysis_dao.go index cebcb09ffc0..8244a349812 100644 --- a/go/vt/orchestrator/inst/analysis_dao.go +++ b/go/vt/orchestrator/inst/analysis_dao.go @@ -76,6 +76,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) vitess_tablet.port, vitess_tablet.tablet_type, vitess_tablet.primary_timestamp, + vitess_tablet.shard AS shard, vitess_keyspace.keyspace AS keyspace, vitess_keyspace.keyspace_type AS keyspace_type, vitess_keyspace.durability_policy AS durability_policy, @@ -377,6 +378,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) a.TabletType = tablet.Type a.AnalyzedKeyspace = m.GetString("keyspace") + a.AnalyzedShard = m.GetString("shard") a.PrimaryTimeStamp = m.GetTime("primary_timestamp") if keyspaceType := topodatapb.KeyspaceType(m.GetInt("keyspace_type")); keyspaceType == topodatapb.KeyspaceType_SNAPSHOT { diff --git a/go/vt/orchestrator/inst/analysis_dao_test.go b/go/vt/orchestrator/inst/analysis_dao_test.go index 7d9ad70291e..9ebe6f42348 100644 --- a/go/vt/orchestrator/inst/analysis_dao_test.go +++ b/go/vt/orchestrator/inst/analysis_dao_test.go @@ -29,10 +29,12 @@ import ( func TestGetReplicationAnalysis(t *testing.T) { tests := []struct { - name string - info []*test.InfoForRecoveryAnalysis - codeWanted AnalysisCode - wantErr string + name string + info []*test.InfoForRecoveryAnalysis + codeWanted AnalysisCode + shardWanted string + keyspaceWanted string + wantErr string }{ { name: "ClusterHasNoPrimary", @@ -49,7 +51,9 @@ func TestGetReplicationAnalysis(t *testing.T) { DurabilityPolicy: "none", LastCheckValid: 1, }}, - codeWanted: ClusterHasNoPrimary, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: ClusterHasNoPrimary, }, { name: "DeadPrimary", info: []*test.InfoForRecoveryAnalysis{{ @@ -69,7 +73,9 @@ func TestGetReplicationAnalysis(t *testing.T) { CountValidReplicatingReplicas: 0, IsPrimary: 1, }}, - codeWanted: DeadPrimary, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: DeadPrimary, }, { name: "DeadPrimaryWithoutReplicas", info: []*test.InfoForRecoveryAnalysis{{ @@ -87,7 +93,9 @@ func TestGetReplicationAnalysis(t *testing.T) { CountReplicas: 0, IsPrimary: 1, }}, - codeWanted: DeadPrimaryWithoutReplicas, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: DeadPrimaryWithoutReplicas, }, { name: "DeadPrimaryAndReplicas", info: []*test.InfoForRecoveryAnalysis{{ @@ -105,7 +113,9 @@ func TestGetReplicationAnalysis(t *testing.T) { CountReplicas: 3, IsPrimary: 1, }}, - codeWanted: DeadPrimaryAndReplicas, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: DeadPrimaryAndReplicas, }, { name: "DeadPrimaryAndSomeReplicas", info: []*test.InfoForRecoveryAnalysis{{ @@ -125,7 +135,9 @@ func TestGetReplicationAnalysis(t *testing.T) { CountValidReplicatingReplicas: 0, IsPrimary: 1, }}, - codeWanted: DeadPrimaryAndSomeReplicas, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: DeadPrimaryAndSomeReplicas, }, { name: "PrimaryHasPrimary", info: []*test.InfoForRecoveryAnalysis{{ @@ -144,7 +156,9 @@ func TestGetReplicationAnalysis(t *testing.T) { CountValidReplicas: 4, IsPrimary: 0, }}, - codeWanted: PrimaryHasPrimary, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: PrimaryHasPrimary, }, { name: "PrimaryIsReadOnly", info: []*test.InfoForRecoveryAnalysis{{ @@ -164,7 +178,9 @@ func TestGetReplicationAnalysis(t *testing.T) { IsPrimary: 1, ReadOnly: 1, }}, - codeWanted: PrimaryIsReadOnly, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: PrimaryIsReadOnly, }, { name: "PrimarySemiSyncMustNotBeSet", info: []*test.InfoForRecoveryAnalysis{{ @@ -184,7 +200,9 @@ func TestGetReplicationAnalysis(t *testing.T) { IsPrimary: 1, SemiSyncPrimaryEnabled: 1, }}, - codeWanted: PrimarySemiSyncMustNotBeSet, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: PrimarySemiSyncMustNotBeSet, }, { name: "PrimarySemiSyncMustBeSet", info: []*test.InfoForRecoveryAnalysis{{ @@ -204,7 +222,9 @@ func TestGetReplicationAnalysis(t *testing.T) { IsPrimary: 1, SemiSyncPrimaryEnabled: 0, }}, - codeWanted: PrimarySemiSyncMustBeSet, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: PrimarySemiSyncMustBeSet, }, { name: "NotConnectedToPrimary", info: []*test.InfoForRecoveryAnalysis{{ @@ -239,7 +259,9 @@ func TestGetReplicationAnalysis(t *testing.T) { ReadOnly: 1, IsPrimary: 1, }}, - codeWanted: NotConnectedToPrimary, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: NotConnectedToPrimary, }, { name: "ReplicaIsWritable", info: []*test.InfoForRecoveryAnalysis{{ @@ -276,7 +298,9 @@ func TestGetReplicationAnalysis(t *testing.T) { LastCheckValid: 1, ReadOnly: 0, }}, - codeWanted: ReplicaIsWritable, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: ReplicaIsWritable, }, { name: "ConnectedToWrongPrimary", info: []*test.InfoForRecoveryAnalysis{{ @@ -313,7 +337,9 @@ func TestGetReplicationAnalysis(t *testing.T) { LastCheckValid: 1, ReadOnly: 1, }}, - codeWanted: ConnectedToWrongPrimary, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: ConnectedToWrongPrimary, }, { name: "ReplicationStopped", info: []*test.InfoForRecoveryAnalysis{{ @@ -351,7 +377,9 @@ func TestGetReplicationAnalysis(t *testing.T) { ReadOnly: 1, ReplicationStopped: 1, }}, - codeWanted: ReplicationStopped, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: ReplicationStopped, }, { name: "ReplicaSemiSyncMustBeSet", @@ -400,7 +428,9 @@ func TestGetReplicationAnalysis(t *testing.T) { ReadOnly: 1, SemiSyncReplicaEnabled: 0, }}, - codeWanted: ReplicaSemiSyncMustBeSet, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: ReplicaSemiSyncMustBeSet, }, { name: "ReplicaSemiSyncMustNotBeSet", info: []*test.InfoForRecoveryAnalysis{{ @@ -447,7 +477,9 @@ func TestGetReplicationAnalysis(t *testing.T) { ReadOnly: 1, SemiSyncReplicaEnabled: 1, }}, - codeWanted: ReplicaSemiSyncMustNotBeSet, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: ReplicaSemiSyncMustNotBeSet, }, { name: "SnapshotKeyspace", info: []*test.InfoForRecoveryAnalysis{{ @@ -465,7 +497,9 @@ func TestGetReplicationAnalysis(t *testing.T) { DurabilityPolicy: "none", LastCheckValid: 1, }}, - codeWanted: NoProblem, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: NoProblem, }, { name: "EmptyDurabilityPolicy", info: []*test.InfoForRecoveryAnalysis{{ @@ -481,7 +515,9 @@ func TestGetReplicationAnalysis(t *testing.T) { LastCheckValid: 1, }}, // We will ignore these keyspaces too until the durability policy is set in the topo server - codeWanted: NoProblem, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: NoProblem, }, } for _, tt := range tests { @@ -505,6 +541,8 @@ func TestGetReplicationAnalysis(t *testing.T) { } require.Len(t, got, 1) require.Equal(t, tt.codeWanted, got[0].Analysis) + require.Equal(t, tt.keyspaceWanted, got[0].AnalyzedKeyspace) + require.Equal(t, tt.shardWanted, got[0].AnalyzedShard) }) } } diff --git a/go/vt/orchestrator/logic/orchestrator.go b/go/vt/orchestrator/logic/orchestrator.go index daa3c46da0e..7942a2bd169 100644 --- a/go/vt/orchestrator/logic/orchestrator.go +++ b/go/vt/orchestrator/logic/orchestrator.go @@ -256,6 +256,10 @@ func DiscoverInstance(instanceKey inst.InstanceKey, forceDiscovery bool) { return } + if forceDiscovery { + log.Infof("Force discovered - %+v", instance) + } + discoveryMetrics.Append(&discovery.Metric{ Timestamp: time.Now(), InstanceKey: instanceKey, @@ -452,7 +456,7 @@ func ContinuousDiscovery() { }() case <-tabletTopoTick: go RefreshAllKeyspaces() - go RefreshTablets(false /* forceRefresh */) + go refreshAllTablets() } } } diff --git a/go/vt/orchestrator/logic/tablet_discovery.go b/go/vt/orchestrator/logic/tablet_discovery.go index 3937ab253bb..3d31a52b8b1 100644 --- a/go/vt/orchestrator/logic/tablet_discovery.go +++ b/go/vt/orchestrator/logic/tablet_discovery.go @@ -20,7 +20,6 @@ import ( "context" "errors" "flag" - "fmt" "strings" "sync" "sync/atomic" @@ -48,6 +47,8 @@ var ( clustersToWatch = flag.String("clusters_to_watch", "", "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") shutdownWaitTime = flag.Duration("shutdown_wait_time", 30*time.Second, "maximum time to wait for vtorc to release all the locks that it is holding before shutting down on SIGTERM") shardsLockCounter int32 + // ErrNoPrimaryTablet is a fixed error message. + ErrNoPrimaryTablet = errors.New("no primary tablet found") ) // OpenTabletDiscovery opens the vitess topo if enables and returns a ticker @@ -69,11 +70,11 @@ func OpenTabletDiscovery() <-chan time.Time { return time.Tick(15 * time.Second) //nolint SA1015: using time.Tick leaks the underlying ticker } -// RefreshTablets reloads the tablets from topo. -func RefreshTablets(forceRefresh bool) { +// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while +func refreshAllTablets() { refreshTabletsUsing(func(instanceKey *inst.InstanceKey) { - DiscoverInstance(*instanceKey, forceRefresh) - }, forceRefresh) + DiscoverInstance(*instanceKey, false /* forceDiscovery */) + }, false /* forceRefresh */) } func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey), forceRefresh bool) { @@ -157,6 +158,28 @@ func refreshTabletsInCell(ctx context.Context, cell string, loader func(instance refreshTablets(tablets, query, args, loader, forceRefresh) } +// forceRefreshAllTabletsInShard is used to refresh all the tablet's information (both MySQL information and topo records) +// for a given shard. This function is meant to be called before or after a cluster-wide operation that we know will +// change the replication information for the entire cluster drastically enough to warrant a full forceful refresh +func forceRefreshAllTabletsInShard(ctx context.Context, keyspace, shard string) { + log.Infof("force refresh of all tablets in shard - %v/%v", keyspace, shard) + refreshCtx, refreshCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer refreshCancel() + refreshTabletsInKeyspaceShard(refreshCtx, keyspace, shard, func(instanceKey *inst.InstanceKey) { + DiscoverInstance(*instanceKey, true) + }, true) +} + +// refreshTabletInfoOfShard only refreshes the tablet records from the topo-server for all the tablets +// of the given keyspace-shard. +func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) { + log.Infof("refresh of tablet records of shard - %v/%v", keyspace, shard) + refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(instanceKey *inst.InstanceKey) { + // No-op + // We only want to refresh the tablet information for the given shard + }, false) +} + func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(instanceKey *inst.InstanceKey), forceRefresh bool) { tablets, err := ts.GetTabletMapForShard(ctx, keyspace, shard) if err != nil { @@ -285,21 +308,32 @@ func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, prima return tmc.SetReplicationSource(ctx, replica, primary.Alias, 0, "", true, semiSync) } -// shardPrimary finds the primary of the given keyspace-shard by reading the topo server -func shardPrimary(ctx context.Context, keyspace string, shard string) (primary *topodatapb.Tablet, err error) { - si, err := ts.GetShard(ctx, keyspace, shard) - if err != nil { - return nil, err - } - if !si.HasPrimary() { - return nil, fmt.Errorf("no primary tablet for shard %v/%v", keyspace, shard) - } - // TODO(GuptaManan100): Instead of another topo call, use the local information by calling - // ReadTablet. Currently this isn't possible since we only have the primary alias and not the source host and port - // This should be fixed once the tablet alias is changed to be the primary key of the table - primaryInfo, err := ts.GetTablet(ctx, si.PrimaryAlias) - if err != nil { - return nil, err +// shardPrimary finds the primary of the given keyspace-shard by reading the orchestrator backend +func shardPrimary(keyspace string, shard string) (primary *topodatapb.Tablet, err error) { + query := `SELECT + info, + hostname, + port, + tablet_type, + primary_timestamp + FROM + vitess_tablet + WHERE + keyspace = ? AND shard = ? + AND tablet_type = ? + ORDER BY + primary_timestamp DESC + LIMIT 1 +` + err = db.Db.QueryOrchestrator(query, sqlutils.Args(keyspace, shard, topodatapb.TabletType_PRIMARY), func(m sqlutils.RowMap) error { + if primary == nil { + primary = &topodatapb.Tablet{} + return prototext.Unmarshal([]byte(m.GetString("info")), primary) + } + return nil + }) + if primary == nil && err == nil { + err = ErrNoPrimaryTablet } - return primaryInfo.Tablet, nil + return primary, err } diff --git a/go/vt/orchestrator/logic/tablet_discovery_test.go b/go/vt/orchestrator/logic/tablet_discovery_test.go new file mode 100644 index 00000000000..5c48db45bd8 --- /dev/null +++ b/go/vt/orchestrator/logic/tablet_discovery_test.go @@ -0,0 +1,256 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logic + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "vitess.io/vitess/go/vt/orchestrator/db" + "vitess.io/vitess/go/vt/orchestrator/inst" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vttime" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +var ( + keyspace = "ks" + shard = "0" + hostname = "localhost" + cell1 = "zone-1" + tab100 = &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell1, + Uid: 100, + }, + Hostname: hostname, + Keyspace: keyspace, + Shard: shard, + Type: topodatapb.TabletType_PRIMARY, + MysqlHostname: hostname, + MysqlPort: 100, + PrimaryTermStartTime: &vttime.Time{ + Seconds: 15, + }, + } + tab101 = &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell1, + Uid: 101, + }, + Hostname: hostname, + Keyspace: keyspace, + Shard: shard, + Type: topodatapb.TabletType_REPLICA, + MysqlHostname: hostname, + MysqlPort: 101, + } + tab102 = &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell1, + Uid: 102, + }, + Hostname: hostname, + Keyspace: keyspace, + Shard: shard, + Type: topodatapb.TabletType_RDONLY, + MysqlHostname: hostname, + MysqlPort: 102, + } + tab103 = &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell1, + Uid: 103, + }, + Hostname: hostname, + Keyspace: keyspace, + Shard: shard, + Type: topodatapb.TabletType_PRIMARY, + MysqlHostname: hostname, + MysqlPort: 103, + PrimaryTermStartTime: &vttime.Time{ + // Higher time than tab100 + Seconds: 3500, + }, + } +) + +func TestRefreshTabletsInKeyspaceShard(t *testing.T) { + // Store the old flags and restore on test completion + oldTs := ts + defer func() { + ts = oldTs + }() + + // Open the orchestrator + // After the test completes delete everything from the vitess_tablet table + orcDb, err := db.OpenOrchestrator() + require.NoError(t, err) + defer func() { + _, err = orcDb.Exec("delete from vitess_tablet") + require.NoError(t, err) + }() + + // Create a memory topo-server and create the keyspace and shard records + ts = memorytopo.NewServer(cell1) + _, err = ts.GetOrCreateShard(context.Background(), keyspace, shard) + require.NoError(t, err) + + // Add tablets to the topo-server + tablets := []*topodatapb.Tablet{tab100, tab101, tab102} + for _, tablet := range tablets { + err := ts.CreateTablet(context.Background(), tablet) + require.NoError(t, err) + } + + t.Run("initial call to refreshTabletsInKeyspaceShard", func(t *testing.T) { + // We expect all 3 tablets to be refreshed since they are being discovered for the first time + verifyRefreshTabletsInKeyspaceShard(t, false, 3, tablets) + }) + + t.Run("call refreshTabletsInKeyspaceShard again - no force refresh", func(t *testing.T) { + // We expect no tablets to be refreshed since they are all already upto date + verifyRefreshTabletsInKeyspaceShard(t, false, 0, tablets) + }) + + t.Run("call refreshTabletsInKeyspaceShard again - force refresh", func(t *testing.T) { + // We expect all 3 tablets to be refreshed since we requested force refresh + verifyRefreshTabletsInKeyspaceShard(t, true, 3, tablets) + }) + + t.Run("change a tablet and call refreshTabletsInKeyspaceShard again", func(t *testing.T) { + startTimeInitially := tab100.PrimaryTermStartTime.Seconds + defer func() { + tab100.PrimaryTermStartTime.Seconds = startTimeInitially + }() + tab100.PrimaryTermStartTime.Seconds = 1000 + ts.UpdateTabletFields(context.Background(), tab100.Alias, func(tablet *topodatapb.Tablet) error { + tablet.PrimaryTermStartTime.Seconds = 1000 + return nil + }) + // We expect 1 tablet to be refreshed since that is the only one that has changed + verifyRefreshTabletsInKeyspaceShard(t, false, 1, tablets) + }) +} + +func TestShardPrimary(t *testing.T) { + testcases := []*struct { + name string + tablets []*topodatapb.Tablet + expectedPrimary *topodatapb.Tablet + expectedErr string + }{ + { + name: "One primary type tablet", + tablets: []*topodatapb.Tablet{tab100, tab101, tab102}, + expectedPrimary: tab100, + }, { + name: "Two primary type tablets", + tablets: []*topodatapb.Tablet{tab100, tab101, tab102, tab103}, + // In this case we expect the tablet with higher PrimaryTermStartTime to be the primary tablet + expectedPrimary: tab103, + }, { + name: "No primary type tablets", + tablets: []*topodatapb.Tablet{tab101, tab102}, + expectedErr: "no primary tablet found", + }, + } + + oldTs := ts + defer func() { + ts = oldTs + }() + + // Open the orchestrator + // After the test completes delete everything from the vitess_tablet table + orcDb, err := db.OpenOrchestrator() + require.NoError(t, err) + defer func() { + _, err = orcDb.Exec("delete from vitess_tablet") + require.NoError(t, err) + }() + + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + _, err = orcDb.Exec("delete from vitess_tablet") + + // Create a memory topo-server and create the keyspace and shard records + ts = memorytopo.NewServer(cell1) + _, err = ts.GetOrCreateShard(context.Background(), keyspace, shard) + require.NoError(t, err) + + // Add tablets to the topo-server + for _, tablet := range testcase.tablets { + err := ts.CreateTablet(context.Background(), tablet) + require.NoError(t, err) + } + + // refresh the tablet info so that they are stored in the orch backend + verifyRefreshTabletsInKeyspaceShard(t, false, len(testcase.tablets), testcase.tablets) + + primary, err := shardPrimary(keyspace, shard) + if testcase.expectedErr != "" { + assert.Contains(t, err.Error(), testcase.expectedErr) + assert.Nil(t, primary) + } else { + assert.NoError(t, err) + diff := cmp.Diff(primary, testcase.expectedPrimary, cmp.Comparer(proto.Equal)) + assert.Empty(t, diff) + } + }) + } +} + +// verifyRefreshTabletsInKeyspaceShard calls refreshTabletsInKeyspaceShard with the forceRefresh parameter provided and verifies that +// the number of instances refreshed matches the parameter and all the tablets match the ones provided +func verifyRefreshTabletsInKeyspaceShard(t *testing.T, forceRefresh bool, instanceRefreshRequired int, tablets []*topodatapb.Tablet) { + instancesRefreshed := 0 + // call refreshTabletsInKeyspaceShard while counting all the instances that are refreshed + refreshTabletsInKeyspaceShard(context.Background(), keyspace, shard, func(instanceKey *inst.InstanceKey) { + instancesRefreshed++ + }, forceRefresh) + // Verify that all the tablets are present in the database + for _, tablet := range tablets { + verifyTabletInfo(t, tablet, "") + } + // Verify that refresh as many tablets as expected + assert.EqualValues(t, instanceRefreshRequired, instancesRefreshed) +} + +// verifyTabletInfo verifies that the tablet information read from the orchestrator database +// is the same as the one provided or reading it gives the same error as expected +func verifyTabletInfo(t *testing.T, tabletWanted *topodatapb.Tablet, errString string) { + t.Helper() + tabletKey := inst.InstanceKey{ + Hostname: hostname, + Port: int(tabletWanted.MysqlPort), + } + tablet, err := inst.ReadTablet(tabletKey) + if errString != "" { + assert.EqualError(t, err, errString) + } else { + assert.NoError(t, err) + assert.EqualValues(t, tabletKey.Port, tablet.MysqlPort) + diff := cmp.Diff(tablet, tabletWanted, cmp.Comparer(proto.Equal)) + assert.Empty(t, diff) + } +} diff --git a/go/vt/orchestrator/logic/topology_recovery.go b/go/vt/orchestrator/logic/topology_recovery.go index d243d7df733..7bddbe75fcc 100644 --- a/go/vt/orchestrator/logic/topology_recovery.go +++ b/go/vt/orchestrator/logic/topology_recovery.go @@ -60,7 +60,8 @@ const ( ) // recoveryFunction is the code of the recovery function to be used -// this is returned from getCheckAndRecoverFunction to compare the functions returned +// this is returned from getCheckAndRecoverFunctionCode to compare the functions returned +// Each recoveryFunction is one to one mapped to a corresponding recovery. type recoveryFunction int const ( @@ -632,8 +633,6 @@ func recoverDeadPrimary(ctx context.Context, analysisEntry inst.ReplicationAnaly }, ) - // We should refresh the tablet information again to update our information. - RefreshTablets(true /* forceRefresh */) if ev != nil && ev.NewPrimary != nil { promotedReplica, _, _ = inst.ReadInstance(&inst.InstanceKey{ Hostname: ev.NewPrimary.MysqlHostname, @@ -810,60 +809,114 @@ func checkAndExecuteFailureDetectionProcesses(analysisEntry inst.ReplicationAnal return true, true, err } -// getCheckAndRecoverFunction gets the recovery function to use for the given analysis. -// It also returns a recoveryFunction which is supposed to be unique for each function that we return. -// It is used for checking the equality of the returned function. -func getCheckAndRecoverFunction(analysisCode inst.AnalysisCode, analyzedInstanceKey *inst.InstanceKey) ( - checkAndRecoverFunction func(ctx context.Context, analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error), - recoverFunctionCode recoveryFunction, - isActionableRecovery bool, -) { +// getCheckAndRecoverFunctionCode gets the recovery function code to use for the given analysis. +func getCheckAndRecoverFunctionCode(analysisCode inst.AnalysisCode, analyzedInstanceKey *inst.InstanceKey) recoveryFunction { switch analysisCode { // primary case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas: if isInEmergencyOperationGracefulPeriod(analyzedInstanceKey) { - return checkAndRecoverGenericProblem, recoverGenericProblemFunc, false + return recoverGenericProblemFunc } - return recoverDeadPrimary, recoverDeadPrimaryFunc, true + return recoverDeadPrimaryFunc case inst.PrimaryHasPrimary: - return recoverPrimaryHasPrimary, recoverPrimaryHasPrimaryFunc, true + return recoverPrimaryHasPrimaryFunc case inst.LockedSemiSyncPrimary: if isInEmergencyOperationGracefulPeriod(analyzedInstanceKey) { - return checkAndRecoverGenericProblem, recoverGenericProblemFunc, false + return recoverGenericProblemFunc } - return checkAndRecoverLockedSemiSyncPrimary, recoverLockedSemiSyncPrimaryFunc, true + return recoverLockedSemiSyncPrimaryFunc case inst.ClusterHasNoPrimary: - return electNewPrimary, electNewPrimaryFunc, true + return electNewPrimaryFunc case inst.PrimaryIsReadOnly, inst.PrimarySemiSyncMustBeSet, inst.PrimarySemiSyncMustNotBeSet: - return fixPrimary, fixPrimaryFunc, true + return fixPrimaryFunc // replica case inst.NotConnectedToPrimary, inst.ConnectedToWrongPrimary, inst.ReplicationStopped, inst.ReplicaIsWritable, inst.ReplicaSemiSyncMustBeSet, inst.ReplicaSemiSyncMustNotBeSet: - return fixReplica, fixReplicaFunc, true + return fixReplicaFunc // primary, non actionable case inst.DeadPrimaryAndReplicas: - return checkAndRecoverGenericProblem, recoverGenericProblemFunc, false + return recoverGenericProblemFunc case inst.UnreachablePrimary: - return checkAndRecoverGenericProblem, recoverGenericProblemFunc, false + return recoverGenericProblemFunc case inst.UnreachablePrimaryWithLaggingReplicas: - return checkAndRecoverGenericProblem, recoverGenericProblemFunc, false + return recoverGenericProblemFunc case inst.AllPrimaryReplicasNotReplicating: - return checkAndRecoverGenericProblem, recoverGenericProblemFunc, false + return recoverGenericProblemFunc case inst.AllPrimaryReplicasNotReplicatingOrDead: - return checkAndRecoverGenericProblem, recoverGenericProblemFunc, false + return recoverGenericProblemFunc } // Right now this is mostly causing noise with no clear action. // Will revisit this in the future. // case inst.AllPrimaryReplicasStale: - // return checkAndRecoverGenericProblem, recoverGenericProblemFunc, false + // return recoverGenericProblemFunc + + return noRecoveryFunc +} - return nil, noRecoveryFunc, false +// hasActionableRecovery tells if a recoveryFunction has an actionable recovery or not +func hasActionableRecovery(recoveryFunctionCode recoveryFunction) bool { + switch recoveryFunctionCode { + case noRecoveryFunc: + return false + case recoverGenericProblemFunc: + return false + case recoverDeadPrimaryFunc: + return true + case recoverPrimaryHasPrimaryFunc: + return true + case recoverLockedSemiSyncPrimaryFunc: + return true + case electNewPrimaryFunc: + return true + case fixPrimaryFunc: + return true + case fixReplicaFunc: + return true + default: + return false + } +} + +// getCheckAndRecoverFunction gets the recovery function for the given code. +func getCheckAndRecoverFunction(recoveryFunctionCode recoveryFunction) ( + checkAndRecoverFunction func(ctx context.Context, analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error), +) { + switch recoveryFunctionCode { + case noRecoveryFunc: + return nil + case recoverGenericProblemFunc: + return checkAndRecoverGenericProblem + case recoverDeadPrimaryFunc: + return recoverDeadPrimary + case recoverPrimaryHasPrimaryFunc: + return recoverPrimaryHasPrimary + case recoverLockedSemiSyncPrimaryFunc: + return checkAndRecoverLockedSemiSyncPrimary + case electNewPrimaryFunc: + return electNewPrimary + case fixPrimaryFunc: + return fixPrimary + case fixReplicaFunc: + return fixReplica + default: + return nil + } +} + +// isClusterWideRecovery returns whether the given recovery is a cluster-wide recovery or not +func isClusterWideRecovery(recoveryFunctionCode recoveryFunction) bool { + switch recoveryFunctionCode { + case recoverDeadPrimaryFunc, electNewPrimaryFunc: + return true + default: + return false + } } // analysisEntriesHaveSameRecovery tells whether the two analysis entries have the same recovery function or not func analysisEntriesHaveSameRecovery(prevAnalysis, newAnalysis inst.ReplicationAnalysis) bool { - _, prevRecoveryFunctionCode, _ := getCheckAndRecoverFunction(prevAnalysis.Analysis, &prevAnalysis.AnalyzedInstanceKey) - _, newRecoveryFunctionCode, _ := getCheckAndRecoverFunction(newAnalysis.Analysis, &newAnalysis.AnalyzedInstanceKey) + prevRecoveryFunctionCode := getCheckAndRecoverFunctionCode(prevAnalysis.Analysis, &prevAnalysis.AnalyzedInstanceKey) + newRecoveryFunctionCode := getCheckAndRecoverFunctionCode(newAnalysis.Analysis, &newAnalysis.AnalyzedInstanceKey) return prevRecoveryFunctionCode == newRecoveryFunctionCode } @@ -892,11 +945,12 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, cand atomic.AddInt64(&countPendingRecoveries, 1) defer atomic.AddInt64(&countPendingRecoveries, -1) - checkAndRecoverFunction, _, isActionableRecovery := getCheckAndRecoverFunction(analysisEntry.Analysis, &analysisEntry.AnalyzedInstanceKey) + checkAndRecoverFunctionCode := getCheckAndRecoverFunctionCode(analysisEntry.Analysis, &analysisEntry.AnalyzedInstanceKey) + isActionableRecovery := hasActionableRecovery(checkAndRecoverFunctionCode) analysisEntry.IsActionableRecovery = isActionableRecovery runEmergentOperations(&analysisEntry) - if checkAndRecoverFunction == nil { + if checkAndRecoverFunctionCode == noRecoveryFunc { // Unhandled problem type if analysisEntry.Analysis != inst.NoProblem { if util.ClearToLog("executeCheckAndRecoverFunction", analysisEntry.AnalyzedInstanceKey.StringCode()) { @@ -954,12 +1008,43 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, cand // changes, we should be checking that this failure is indeed needed to be fixed. We do this after locking the shard to be sure // that the data that we use now is up-to-date. if isActionableRecovery { + // The first step we have to do is refresh the keyspace information + // This is required to know if the durability policies have changed or not + // If they have, then recoveries like ReplicaSemiSyncMustNotBeSet, etc won't be valid anymore err := RefreshKeyspace(analysisEntry.AnalyzedKeyspace) if err != nil { return false, nil, err } - // TODO (@GuptaManan100): Refresh only the shard tablet information instead of all the tablets - RefreshTablets(true /* forceRefresh */) + // If we are about to run a cluster-wide recovery, it is imperative to first refresh all the tablets + // of a shard because a new tablet could have been promoted, and we need to have this visibility before we + // run a cluster operation of our own. + if isClusterWideRecovery(checkAndRecoverFunctionCode) { + forceRefreshAllTabletsInShard(ctx, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard) + } else { + // If we are not running a cluster-wide recovery, then it is only concerned with the specific tablet + // on which the failure occurred and the primary instance of the shard. + // For example, ConnectedToWrongPrimary analysis only cares for whom the current primary tablet is + // and the host-port set on the tablet in question. + // So, we only need to refresh the tablet info records (to know if the primary tablet has changed), + // and the replication data of the new primary and this tablet. + refreshTabletInfoOfShard(ctx, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard) + DiscoverInstance(analysisEntry.AnalyzedInstanceKey, true) + primaryTablet, err := shardPrimary(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard) + if err != nil { + log.Errorf("executeCheckAndRecoverFunction: Analysis: %+v, InstanceKey: %+v, candidateInstanceKey: %+v, "+"skipProcesses: %v: error while finding the shard primary: %v", + analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, candidateInstanceKey, skipProcesses, err) + return false, nil, err + } + primaryInstanceKey := inst.InstanceKey{ + Hostname: primaryTablet.MysqlHostname, + Port: int(primaryTablet.MysqlPort), + } + // We can skip the refresh if we know the tablet we are looking at is the primary tablet. + // This would be the case for PrimaryHasPrimary recovery. We don't need to refresh the same tablet twice. + if !analysisEntry.AnalyzedInstanceKey.Equals(&primaryInstanceKey) { + DiscoverInstance(primaryInstanceKey, true) + } + } alreadyFixed, err := checkIfAlreadyFixed(analysisEntry) if err != nil { log.Errorf("executeCheckAndRecoverFunction: Analysis: %+v, InstanceKey: %+v, candidateInstanceKey: %+v, "+"skipProcesses: %v: error while trying to find if the problem is already fixed: %v", @@ -976,7 +1061,7 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, cand if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: recovery", analysisEntry.AnalyzedInstanceKey.StringCode()) { log.Infof("executeCheckAndRecoverFunction: proceeding with %+v recovery on %+v; isRecoverable?: %+v; skipProcesses: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, isActionableRecovery, skipProcesses) } - recoveryAttempted, topologyRecovery, err = checkAndRecoverFunction(ctx, analysisEntry, candidateInstanceKey, forceInstanceRecovery, skipProcesses) + recoveryAttempted, topologyRecovery, err = getCheckAndRecoverFunction(checkAndRecoverFunctionCode)(ctx, analysisEntry, candidateInstanceKey, forceInstanceRecovery, skipProcesses) if !recoveryAttempted { return recoveryAttempted, topologyRecovery, err } @@ -988,6 +1073,18 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, cand } else { log.Infof("Topology recovery: %+v", topologyRecovery) } + // If we ran a cluster wide recovery and actually attemped it, then we know that the replication state for all the tablets in this cluster + // would have changed. So we can go ahead and pre-emptively refresh them. + // For this refresh we don't use the same context that we used for the recovery, since that context might have expired or could expire soon + // Instead we pass the background context. The call forceRefreshAllTabletsInShard handles adding a timeout to it for us. + if isClusterWideRecovery(checkAndRecoverFunctionCode) { + forceRefreshAllTabletsInShard(context.Background(), analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard) + } else { + // For all other recoveries, we would have changed the replication status of the analyzed tablet + // so it doesn't hurt to re-read the information of this tablet, otherwise we'll requeue the same recovery + // that we just completed because we would be using stale data. + DiscoverInstance(analysisEntry.AnalyzedInstanceKey, true) + } if !skipProcesses { if topologyRecovery.SuccessorKey == nil { // Execute general unsuccessful post failover processes @@ -1010,10 +1107,7 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, cand // checkIfAlreadyFixed checks whether the problem that the analysis entry represents has already been fixed by another agent or not func checkIfAlreadyFixed(analysisEntry inst.ReplicationAnalysis) (bool, error) { // Run a replication analysis again. We will check if the problem persisted - // TODO (@GuptaManan100): Use specific cluster name to filter the scope of replication analysis. - // Can't do this now since SuggestedClusterAlias, ClusterName, ClusterAlias aren't consistent - // and passing any one causes issues in some failures - analysisEntries, err := inst.GetReplicationAnalysis("", &inst.ReplicationAnalysisHints{}) + analysisEntries, err := inst.GetReplicationAnalysis(analysisEntry.ClusterDetails.ClusterName, &inst.ReplicationAnalysisHints{}) if err != nil { return false, err } @@ -1266,10 +1360,9 @@ func GracefulPrimaryTakeover(clusterName string, designatedKey *inst.InstanceKey }, ) - // here we need to forcefully refresh all the tablets otherwise old information is used and failover scenarios are spawned off which are not required - // For example, if we do not refresh the tablets forcefully and the new primary is found in the cache then its source key is not updated and this spawns off - // PrimaryHasPrimary analysis which runs ERS - RefreshTablets(true /* forceRefresh */) + // here we need to forcefully refresh all the tablets because we know we have made a cluster wide operation, + // and it affects the replication information for all the tablets + forceRefreshAllTabletsInShard(context.Background(), primaryTablet.Keyspace, primaryTablet.Shard) if ev != nil && ev.NewPrimary != nil { promotedReplica, _, _ = inst.ReadInstance(&inst.InstanceKey{ Hostname: ev.NewPrimary.MysqlHostname, @@ -1335,10 +1428,6 @@ func electNewPrimary(ctx context.Context, analysisEntry inst.ReplicationAnalysis }, ) - // here we need to forcefully refresh all the tablets otherwise old information is used and failover scenarios are spawned off which are not required - // For example, if we do not refresh the tablets forcefully and the new primary is found in the cache then its source key is not updated and this spawns off - // PrimaryHasPrimary analysis which runs ERS - RefreshTablets(true /* forceRefresh */) if ev != nil && ev.NewPrimary != nil { promotedReplica, _, _ = inst.ReadInstance(&inst.InstanceKey{ Hostname: ev.NewPrimary.MysqlHostname, @@ -1399,7 +1488,7 @@ func fixReplica(ctx context.Context, analysisEntry inst.ReplicationAnalysis, can return false, topologyRecovery, err } - primaryTablet, err := shardPrimary(ctx, analyzedTablet.Keyspace, analyzedTablet.Shard) + primaryTablet, err := shardPrimary(analyzedTablet.Keyspace, analyzedTablet.Shard) if err != nil { log.Info("Could not compute primary for %v/%v", analyzedTablet.Keyspace, analyzedTablet.Shard) return false, topologyRecovery, err diff --git a/go/vt/orchestrator/test/recovery_analysis.go b/go/vt/orchestrator/test/recovery_analysis.go index 8548374789b..e1e4d17a8f8 100644 --- a/go/vt/orchestrator/test/recovery_analysis.go +++ b/go/vt/orchestrator/test/recovery_analysis.go @@ -32,6 +32,7 @@ type InfoForRecoveryAnalysis struct { PrimaryTabletInfo *topodatapb.Tablet PrimaryTimestamp *time.Time Keyspace string + Shard string KeyspaceType int DurabilityPolicy string IsPrimary int @@ -126,6 +127,8 @@ func (info *InfoForRecoveryAnalysis) ConvertToRowMap() sqlutils.RowMap { rowMap["is_primary"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.IsPrimary), Valid: true} rowMap["is_stale_binlog_coordinates"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.IsStaleBinlogCoordinates), Valid: true} rowMap["keyspace_type"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.KeyspaceType), Valid: true} + rowMap["keyspace"] = sqlutils.CellData{String: info.Keyspace, Valid: true} + rowMap["shard"] = sqlutils.CellData{String: info.Shard, Valid: true} rowMap["last_check_partial_success"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.LastCheckPartialSuccess), Valid: true} rowMap["max_replica_gtid_errant"] = sqlutils.CellData{String: info.MaxReplicaGTIDErrant, Valid: true} rowMap["max_replica_gtid_mode"] = sqlutils.CellData{String: info.MaxReplicaGTIDMode, Valid: true} @@ -161,6 +164,7 @@ func (info *InfoForRecoveryAnalysis) SetValuesFromTabletInfo() { info.Port = int(info.TabletInfo.MysqlPort) info.DataCenter = info.TabletInfo.Alias.Cell info.Keyspace = info.TabletInfo.Keyspace + info.Shard = info.TabletInfo.Shard info.ClusterName = fmt.Sprintf("%v:%v", info.TabletInfo.Keyspace, info.TabletInfo.Shard) info.ClusterDomain = fmt.Sprintf("%v:%d", info.TabletInfo.MysqlHostname, info.TabletInfo.MysqlPort) }