Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/orchestrator/inst/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type ReplicationAnalysis struct {
AnalyzedInstanceDataCenter string
AnalyzedInstanceRegion string
AnalyzedKeyspace string
AnalyzedShard string
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the name of the shard that was analyzed too now that we want to restrict the number of tablets we want to refresh

AnalyzedInstancePhysicalEnvironment string
AnalyzedInstanceBinlogCoordinates BinlogCoordinates
IsPrimary bool
Expand Down
2 changes: 2 additions & 0 deletions go/vt/orchestrator/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
vitess_tablet.port,
vitess_tablet.tablet_type,
vitess_tablet.primary_timestamp,
vitess_tablet.shard AS shard,
vitess_keyspace.keyspace AS keyspace,
vitess_keyspace.keyspace_type AS keyspace_type,
vitess_keyspace.durability_policy AS durability_policy,
Expand Down Expand Up @@ -377,6 +378,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)

a.TabletType = tablet.Type
a.AnalyzedKeyspace = m.GetString("keyspace")
a.AnalyzedShard = m.GetString("shard")
a.PrimaryTimeStamp = m.GetTime("primary_timestamp")

if keyspaceType := topodatapb.KeyspaceType(m.GetInt("keyspace_type")); keyspaceType == topodatapb.KeyspaceType_SNAPSHOT {
Expand Down
80 changes: 59 additions & 21 deletions go/vt/orchestrator/inst/analysis_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (

func TestGetReplicationAnalysis(t *testing.T) {
tests := []struct {
name string
info []*test.InfoForRecoveryAnalysis
codeWanted AnalysisCode
wantErr string
name string
info []*test.InfoForRecoveryAnalysis
codeWanted AnalysisCode
shardWanted string
keyspaceWanted string
wantErr string
}{
{
name: "ClusterHasNoPrimary",
Expand All @@ -49,7 +51,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
DurabilityPolicy: "none",
LastCheckValid: 1,
}},
codeWanted: ClusterHasNoPrimary,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: ClusterHasNoPrimary,
}, {
name: "DeadPrimary",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -69,7 +73,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
CountValidReplicatingReplicas: 0,
IsPrimary: 1,
}},
codeWanted: DeadPrimary,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: DeadPrimary,
}, {
name: "DeadPrimaryWithoutReplicas",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -87,7 +93,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
CountReplicas: 0,
IsPrimary: 1,
}},
codeWanted: DeadPrimaryWithoutReplicas,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: DeadPrimaryWithoutReplicas,
}, {
name: "DeadPrimaryAndReplicas",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -105,7 +113,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
CountReplicas: 3,
IsPrimary: 1,
}},
codeWanted: DeadPrimaryAndReplicas,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: DeadPrimaryAndReplicas,
}, {
name: "DeadPrimaryAndSomeReplicas",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -125,7 +135,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
CountValidReplicatingReplicas: 0,
IsPrimary: 1,
}},
codeWanted: DeadPrimaryAndSomeReplicas,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: DeadPrimaryAndSomeReplicas,
}, {
name: "PrimaryHasPrimary",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -144,7 +156,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
CountValidReplicas: 4,
IsPrimary: 0,
}},
codeWanted: PrimaryHasPrimary,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: PrimaryHasPrimary,
}, {
name: "PrimaryIsReadOnly",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -164,7 +178,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
IsPrimary: 1,
ReadOnly: 1,
}},
codeWanted: PrimaryIsReadOnly,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: PrimaryIsReadOnly,
}, {
name: "PrimarySemiSyncMustNotBeSet",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -184,7 +200,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
IsPrimary: 1,
SemiSyncPrimaryEnabled: 1,
}},
codeWanted: PrimarySemiSyncMustNotBeSet,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: PrimarySemiSyncMustNotBeSet,
}, {
name: "PrimarySemiSyncMustBeSet",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -204,7 +222,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
IsPrimary: 1,
SemiSyncPrimaryEnabled: 0,
}},
codeWanted: PrimarySemiSyncMustBeSet,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: PrimarySemiSyncMustBeSet,
}, {
name: "NotConnectedToPrimary",
info: []*test.InfoForRecoveryAnalysis{{
Expand Down Expand Up @@ -239,7 +259,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
ReadOnly: 1,
IsPrimary: 1,
}},
codeWanted: NotConnectedToPrimary,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: NotConnectedToPrimary,
}, {
name: "ReplicaIsWritable",
info: []*test.InfoForRecoveryAnalysis{{
Expand Down Expand Up @@ -276,7 +298,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
LastCheckValid: 1,
ReadOnly: 0,
}},
codeWanted: ReplicaIsWritable,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: ReplicaIsWritable,
}, {
name: "ConnectedToWrongPrimary",
info: []*test.InfoForRecoveryAnalysis{{
Expand Down Expand Up @@ -313,7 +337,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
LastCheckValid: 1,
ReadOnly: 1,
}},
codeWanted: ConnectedToWrongPrimary,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: ConnectedToWrongPrimary,
}, {
name: "ReplicationStopped",
info: []*test.InfoForRecoveryAnalysis{{
Expand Down Expand Up @@ -351,7 +377,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
ReadOnly: 1,
ReplicationStopped: 1,
}},
codeWanted: ReplicationStopped,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: ReplicationStopped,
},
{
name: "ReplicaSemiSyncMustBeSet",
Expand Down Expand Up @@ -400,7 +428,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
ReadOnly: 1,
SemiSyncReplicaEnabled: 0,
}},
codeWanted: ReplicaSemiSyncMustBeSet,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: ReplicaSemiSyncMustBeSet,
}, {
name: "ReplicaSemiSyncMustNotBeSet",
info: []*test.InfoForRecoveryAnalysis{{
Expand Down Expand Up @@ -447,7 +477,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
ReadOnly: 1,
SemiSyncReplicaEnabled: 1,
}},
codeWanted: ReplicaSemiSyncMustNotBeSet,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: ReplicaSemiSyncMustNotBeSet,
}, {
name: "SnapshotKeyspace",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -465,7 +497,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
DurabilityPolicy: "none",
LastCheckValid: 1,
}},
codeWanted: NoProblem,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: NoProblem,
}, {
name: "EmptyDurabilityPolicy",
info: []*test.InfoForRecoveryAnalysis{{
Expand All @@ -481,7 +515,9 @@ func TestGetReplicationAnalysis(t *testing.T) {
LastCheckValid: 1,
}},
// We will ignore these keyspaces too until the durability policy is set in the topo server
codeWanted: NoProblem,
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: NoProblem,
},
}
for _, tt := range tests {
Expand All @@ -505,6 +541,8 @@ func TestGetReplicationAnalysis(t *testing.T) {
}
require.Len(t, got, 1)
require.Equal(t, tt.codeWanted, got[0].Analysis)
require.Equal(t, tt.keyspaceWanted, got[0].AnalyzedKeyspace)
require.Equal(t, tt.shardWanted, got[0].AnalyzedShard)
})
}
}
6 changes: 5 additions & 1 deletion go/vt/orchestrator/logic/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func DiscoverInstance(instanceKey inst.InstanceKey, forceDiscovery bool) {
return
}

if forceDiscovery {
log.Infof("Force discovered - %+v", instance)
}
Comment on lines +259 to +261
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This addition of logging is intentional. Until we have a metrics page where we export the internal database information of VTOrc, this is going to be very useful in debugging. I had it in my mind to add this log and I am just piggy-backing on this PR.


discoveryMetrics.Append(&discovery.Metric{
Timestamp: time.Now(),
InstanceKey: instanceKey,
Expand Down Expand Up @@ -452,7 +456,7 @@ func ContinuousDiscovery() {
}()
case <-tabletTopoTick:
go RefreshAllKeyspaces()
go RefreshTablets(false /* forceRefresh */)
go refreshAllTablets()
}
}
}
76 changes: 55 additions & 21 deletions go/vt/orchestrator/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"flag"
"fmt"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -48,6 +47,8 @@ var (
clustersToWatch = flag.String("clusters_to_watch", "", "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
shutdownWaitTime = flag.Duration("shutdown_wait_time", 30*time.Second, "maximum time to wait for vtorc to release all the locks that it is holding before shutting down on SIGTERM")
shardsLockCounter int32
// ErrNoPrimaryTablet is a fixed error message.
ErrNoPrimaryTablet = errors.New("no primary tablet found")
)

// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
Expand All @@ -69,11 +70,11 @@ func OpenTabletDiscovery() <-chan time.Time {
return time.Tick(15 * time.Second) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// RefreshTablets reloads the tablets from topo.
func RefreshTablets(forceRefresh bool) {
// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
func refreshAllTablets() {
refreshTabletsUsing(func(instanceKey *inst.InstanceKey) {
DiscoverInstance(*instanceKey, forceRefresh)
}, forceRefresh)
DiscoverInstance(*instanceKey, false /* forceDiscovery */)
}, false /* forceRefresh */)
}

func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey), forceRefresh bool) {
Expand Down Expand Up @@ -157,6 +158,28 @@ func refreshTabletsInCell(ctx context.Context, cell string, loader func(instance
refreshTablets(tablets, query, args, loader, forceRefresh)
}

// forceRefreshAllTabletsInShard is used to refresh all the tablet's information (both MySQL information and topo records)
// for a given shard. This function is meant to be called before or after a cluster-wide operation that we know will
// change the replication information for the entire cluster drastically enough to warrant a full forceful refresh
func forceRefreshAllTabletsInShard(ctx context.Context, keyspace, shard string) {
log.Infof("force refresh of all tablets in shard - %v/%v", keyspace, shard)
refreshCtx, refreshCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer refreshCancel()
refreshTabletsInKeyspaceShard(refreshCtx, keyspace, shard, func(instanceKey *inst.InstanceKey) {
DiscoverInstance(*instanceKey, true)
}, true)
}

// refreshTabletInfoOfShard only refreshes the tablet records from the topo-server for all the tablets
// of the given keyspace-shard.
func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) {
log.Infof("refresh of tablet records of shard - %v/%v", keyspace, shard)
refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(instanceKey *inst.InstanceKey) {
// No-op
// We only want to refresh the tablet information for the given shard
}, false)
}

func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(instanceKey *inst.InstanceKey), forceRefresh bool) {
tablets, err := ts.GetTabletMapForShard(ctx, keyspace, shard)
if err != nil {
Expand Down Expand Up @@ -285,21 +308,32 @@ func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, prima
return tmc.SetReplicationSource(ctx, replica, primary.Alias, 0, "", true, semiSync)
}

// shardPrimary finds the primary of the given keyspace-shard by reading the topo server
func shardPrimary(ctx context.Context, keyspace string, shard string) (primary *topodatapb.Tablet, err error) {
si, err := ts.GetShard(ctx, keyspace, shard)
if err != nil {
return nil, err
}
if !si.HasPrimary() {
return nil, fmt.Errorf("no primary tablet for shard %v/%v", keyspace, shard)
}
// TODO(GuptaManan100): Instead of another topo call, use the local information by calling
// ReadTablet. Currently this isn't possible since we only have the primary alias and not the source host and port
// This should be fixed once the tablet alias is changed to be the primary key of the table
primaryInfo, err := ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
return nil, err
// shardPrimary finds the primary of the given keyspace-shard by reading the orchestrator backend
func shardPrimary(keyspace string, shard string) (primary *topodatapb.Tablet, err error) {
query := `SELECT
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

general comment: should we not put all query execution in some retryable template function?

Copy link
Copy Markdown
Contributor Author

@GuptaManan100 GuptaManan100 Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would be a good addition, but so far we have not really needed, because even if the read fails, we just fail the recovery and then retry later.

info,
hostname,
port,
tablet_type,
primary_timestamp
FROM
vitess_tablet
WHERE
keyspace = ? AND shard = ?
AND tablet_type = ?
ORDER BY
primary_timestamp DESC
LIMIT 1
`
err = db.Db.QueryOrchestrator(query, sqlutils.Args(keyspace, shard, topodatapb.TabletType_PRIMARY), func(m sqlutils.RowMap) error {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of cleaning up, we should rename some of these functions. QueryOrchestrator is not the right name for this function. Similarly we have OpenOrchestrator too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I had it in my mind to get rid of the Orchestrator references everywhere, from Parameters, flags, package names, function names, etc. I'll do in a follow-up PR so that it is easier to review

if primary == nil {
primary = &topodatapb.Tablet{}
return prototext.Unmarshal([]byte(m.GetString("info")), primary)
}
return nil
})
if primary == nil && err == nil {
err = ErrNoPrimaryTablet
}
return primaryInfo.Tablet, nil
return primary, err
}
Loading