diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 11022f8e2a7..93f3bab12dc 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -33,6 +33,7 @@ Flags: --config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s) --config-type string Config file type (omit to infer config type from file extension). --consul_auth_static_file string JSON File to read the topos/tokens from. + --discovery-workers int Number of workers used for tablet discovery (default 300) --emit_stats If set, emit stats to push-based monitoring and stats backends --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. 
diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index 2d21e377cb6..9f52205a846 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -34,7 +34,6 @@ const ( AuditPageSize = 20 DebugMetricsIntervalSeconds = 10 StaleInstanceCoordinatesExpireSeconds = 60 - DiscoveryMaxConcurrency = 300 // Number of goroutines doing hosts discovery DiscoveryQueueCapacity = 100000 DiscoveryQueueMaxStatisticsSize = 120 DiscoveryCollectionRetentionSeconds = 120 @@ -42,6 +41,7 @@ const ( ) var ( + discoveryWorkers = 300 sqliteDataFile = "file::memory:?mode=memory&cache=shared" instancePollTime = 5 * time.Second snapshotTopologyInterval = 0 * time.Hour @@ -62,6 +62,7 @@ var ( // RegisterFlags registers the flags required by VTOrc func RegisterFlags(fs *pflag.FlagSet) { + fs.IntVar(&discoveryWorkers, "discovery-workers", discoveryWorkers, "Number of workers used for tablet discovery") fs.StringVar(&sqliteDataFile, "sqlite-data-file", sqliteDataFile, "SQLite Datafile to use as VTOrc's database") fs.DurationVar(&instancePollTime, "instance-poll-time", instancePollTime, "Timer duration on which VTOrc refreshes MySQL information") fs.DurationVar(&snapshotTopologyInterval, "snapshot-topology-interval", snapshotTopologyInterval, "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours") @@ -86,6 +87,7 @@ func RegisterFlags(fs *pflag.FlagSet) { // strictly expected from user. // TODO(sougou): change this to yaml parsing, and possible merge with tabletenv. type Configuration struct { + DiscoveryWorkers uint // Number of workers used for tablet discovery SQLite3DataFile string // full path to sqlite3 datafile InstancePollSeconds uint // Number of seconds between instance reads SnapshotTopologiesIntervalHours uint // Interval in hour between snapshot-topologies invocation. 
Default: 0 (disabled) @@ -115,6 +117,7 @@ var readFileNames []string // UpdateConfigValuesFromFlags is used to update the config values from the flags defined. // This is done before we read any configuration files from the user. So the config files take precedence. func UpdateConfigValuesFromFlags() { + Config.DiscoveryWorkers = uint(discoveryWorkers) Config.SQLite3DataFile = sqliteDataFile Config.InstancePollSeconds = uint(instancePollTime / time.Second) Config.InstancePollSeconds = uint(instancePollTime / time.Second) @@ -160,6 +163,7 @@ func LogConfigValues() { func newConfiguration() *Configuration { return &Configuration{ + DiscoveryWorkers: 300, SQLite3DataFile: "file::memory:?mode=memory&cache=shared", InstancePollSeconds: 5, SnapshotTopologiesIntervalHours: 0, diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index f997dc6ac0a..85338f526e5 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -116,6 +116,9 @@ CREATE INDEX last_checked_idx_database_instance ON database_instance(last_checke CREATE INDEX last_seen_idx_database_instance ON database_instance(last_seen) `, ` +CREATE INDEX hostname_port_database_instance ON database_instance(hostname, port) + `, + ` DROP TABLE IF EXISTS audit `, ` diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index 95751c6ae25..4caffe0d121 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -33,142 +33,78 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" ) -// QueueMetric contains the queue's active and queued sizes -type QueueMetric struct { - Active int - Queued int +// queueItem represents an item in the discovery.Queue. +type queueItem struct { + Key string + PushedAt time.Time } -// Queue contains information for managing discovery requests +// Queue is an ordered queue with deduplication. 
type Queue struct { - sync.Mutex - - name string - done chan struct{} - queue chan string - queuedKeys map[string]time.Time - consumedKeys map[string]time.Time - metrics []QueueMetric + mu sync.Mutex + enqueued map[string]struct{} + queue chan queueItem } -// DiscoveryQueue contains the discovery queue which can then be accessed via an API call for monitoring. -// Currently this is accessed by ContinuousDiscovery() but also from http api calls. -// I may need to protect this better? -var discoveryQueue map[string](*Queue) -var dcLock sync.Mutex - -func init() { - discoveryQueue = make(map[string](*Queue)) -} - -// CreateOrReturnQueue allows for creation of a new discovery queue or -// returning a pointer to an existing one given the name. -func CreateOrReturnQueue(name string) *Queue { - dcLock.Lock() - defer dcLock.Unlock() - if q, found := discoveryQueue[name]; found { - return q - } - - q := &Queue{ - name: name, - queuedKeys: make(map[string]time.Time), - consumedKeys: make(map[string]time.Time), - queue: make(chan string, config.DiscoveryQueueCapacity), - } - go q.startMonitoring() - - discoveryQueue[name] = q - - return q -} - -// monitoring queue sizes until we are told to stop -func (q *Queue) startMonitoring() { - log.Infof("Queue.startMonitoring(%s)", q.name) - ticker := time.NewTicker(time.Second) // hard-coded at every second - - for { - select { - case <-ticker.C: // do the periodic expiry - q.collectStatistics() - case <-q.done: - return - } +// NewQueue creates a new queue. +func NewQueue() *Queue { + return &Queue{ + enqueued: make(map[string]struct{}), + queue: make(chan queueItem, config.DiscoveryQueueCapacity), } } -// do a check of the entries in the queue, both those active and queued -func (q *Queue) collectStatistics() { - q.Lock() - defer q.Unlock() +// setKeyCheckEnqueued returns true if a key is already enqueued, if +// not the key will be marked as enqueued and false is returned. 
+func (q *Queue) setKeyCheckEnqueued(key string) (alreadyEnqueued bool) { + q.mu.Lock() + defer q.mu.Unlock() - q.metrics = append(q.metrics, QueueMetric{Queued: len(q.queuedKeys), Active: len(q.consumedKeys)}) - - // remove old entries if we get too big - if len(q.metrics) > config.DiscoveryQueueMaxStatisticsSize { - q.metrics = q.metrics[len(q.metrics)-config.DiscoveryQueueMaxStatisticsSize:] + _, alreadyEnqueued = q.enqueued[key] + if !alreadyEnqueued { + q.enqueued[key] = struct{}{} } + return alreadyEnqueued } -// QueueLen returns the length of the queue (channel size + queued size) +// QueueLen returns the length of the queue. func (q *Queue) QueueLen() int { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() - return len(q.queue) + len(q.queuedKeys) + return len(q.enqueued) } // Push enqueues a key if it is not on a queue and is not being // processed; silently returns otherwise. func (q *Queue) Push(key string) { - q.Lock() - defer q.Unlock() - - // is it enqueued already? - if _, found := q.queuedKeys[key]; found { + if q.setKeyCheckEnqueued(key) { return } - - // is it being processed now? - if _, found := q.consumedKeys[key]; found { - return + q.queue <- queueItem{ + Key: key, + PushedAt: time.Now(), } - - q.queuedKeys[key] = time.Now() - q.queue <- key } // Consume fetches a key to process; blocks if queue is empty. // Release must be called once after Consume. 
func (q *Queue) Consume() string { - q.Lock() - queue := q.queue - q.Unlock() + item := <-q.queue - key := <-queue - - q.Lock() - defer q.Unlock() - - // alarm if have been waiting for too long - timeOnQueue := time.Since(q.queuedKeys[key]) + timeOnQueue := time.Since(item.PushedAt) if timeOnQueue > time.Duration(config.Config.InstancePollSeconds)*time.Second { - log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", key, timeOnQueue.Seconds()) + log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds()) } - q.consumedKeys[key] = q.queuedKeys[key] - - delete(q.queuedKeys, key) - - return key + return item.Key } // Release removes a key from a list of being processed keys // which allows that key to be pushed into the queue again. func (q *Queue) Release(key string) { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() - delete(q.consumedKeys, key) + delete(q.enqueued, key) } diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go new file mode 100644 index 00000000000..fa3e8c16c59 --- /dev/null +++ b/go/vt/vtorc/discovery/queue_test.go @@ -0,0 +1,81 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package discovery + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQueue(t *testing.T) { + q := NewQueue() + require.Zero(t, q.QueueLen()) + + // Push + q.Push(t.Name()) + require.Equal(t, 1, q.QueueLen()) + _, found := q.enqueued[t.Name()] + require.True(t, found) + + // Push duplicate + q.Push(t.Name()) + require.Equal(t, 1, q.QueueLen()) + + // Consume + require.Equal(t, t.Name(), q.Consume()) + require.Equal(t, 1, q.QueueLen()) + _, found = q.enqueued[t.Name()] + require.True(t, found) + + // Release + q.Release(t.Name()) + require.Zero(t, q.QueueLen()) + _, found = q.enqueued[t.Name()] + require.False(t, found) +} + +type testQueue interface { + QueueLen() int + Push(string) + Consume() string + Release(string) +} + +func BenchmarkQueues(b *testing.B) { + tests := []struct { + name string + queue testQueue + }{ + {"Current", NewQueue()}, + } + for _, test := range tests { + q := test.queue + b.Run(test.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + for i := 0; i < 1000; i++ { + q.Push(b.Name() + strconv.Itoa(i)) + } + q.QueueLen() + for i := 0; i < 1000; i++ { + q.Release(q.Consume()) + } + } + }) + } +} diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 1ab13fb7cfa..54ad6a3cbba 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -198,7 +198,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named goto Cleanup } - fs, err = fullStatus(tabletAlias) + fs, err = fullStatus(tablet) if err != nil { goto Cleanup } diff --git a/go/vt/vtorc/inst/shard_dao.go b/go/vt/vtorc/inst/shard_dao.go index 5501ba5d7b9..7fe8df110f9 100644 --- a/go/vt/vtorc/inst/shard_dao.go +++ b/go/vt/vtorc/inst/shard_dao.go @@ -41,18 +41,6 @@ func ReadShardNames(keyspaceName string) (shardNames []string, err error) { return shardNames, err } -// ReadAllShardNames reads the names of all vitess shards by keyspace. 
-func ReadAllShardNames() (shardNames map[string][]string, err error) { - shardNames = make(map[string][]string) - query := `select keyspace, shard from vitess_shard` - err = db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error { - ks := row.GetString("keyspace") - shardNames[ks] = append(shardNames[ks], row.GetString("shard")) - return nil - }) - return shardNames, err -} - // ReadShardPrimaryInformation reads the vitess shard record and gets the shard primary alias and timestamp. func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias string, primaryTimestamp string, err error) { if err = topo.ValidateKeyspaceName(keyspaceName); err != nil { diff --git a/go/vt/vtorc/inst/shard_dao_test.go b/go/vt/vtorc/inst/shard_dao_test.go index 8cf67f10ee6..0077b3d64af 100644 --- a/go/vt/vtorc/inst/shard_dao_test.go +++ b/go/vt/vtorc/inst/shard_dao_test.go @@ -108,13 +108,6 @@ func TestSaveReadAndDeleteShard(t *testing.T) { require.NoError(t, err) require.Equal(t, []string{tt.shardName}, shardNames) - // ReadAllShardNames - allShardNames, err := ReadAllShardNames() - require.NoError(t, err) - ksShards, found := allShardNames[tt.keyspaceName] - require.True(t, found) - require.Equal(t, []string{tt.shardName}, ksShards) - // DeleteShard require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName)) _, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName) diff --git a/go/vt/vtorc/inst/tablet_dao.go b/go/vt/vtorc/inst/tablet_dao.go index af304292a70..8e6dbe03163 100644 --- a/go/vt/vtorc/inst/tablet_dao.go +++ b/go/vt/vtorc/inst/tablet_dao.go @@ -44,11 +44,7 @@ func InitializeTMC() tmclient.TabletManagerClient { } // fullStatus gets the full status of the MySQL running in vttablet. 
-func fullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) { - tablet, err := ReadTablet(tabletAlias) - if err != nil { - return nil, err - } +func fullStatus(tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error) { tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer tmcCancel() return tmc.FullStatus(tmcCtx, tablet) @@ -78,6 +74,50 @@ func ReadTablet(tabletAlias string) (*topodatapb.Tablet, error) { return tablet, nil } +// ReadTabletCountsByCell returns the count of tablets watched by cell. +// The backend query uses an index by "cell": cell_idx_vitess_tablet. +func ReadTabletCountsByCell() (map[string]int64, error) { + tabletCounts := make(map[string]int64) + query := `SELECT + cell, + COUNT() AS count + FROM + vitess_tablet + GROUP BY + cell` + err := db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error { + cell := row.GetString("cell") + tabletCounts[cell] = row.GetInt64("count") + return nil + }) + return tabletCounts, err +} + +// ReadTabletCountsByKeyspaceShard returns the count of tablets watched by keyspace/shard. +// The backend query uses an index by "keyspace, shard": ks_idx_vitess_tablet. +func ReadTabletCountsByKeyspaceShard() (map[string]map[string]int64, error) { + tabletCounts := make(map[string]map[string]int64) + query := `SELECT + keyspace, + shard, + COUNT() AS count + FROM + vitess_tablet + GROUP BY + keyspace, + shard` + err := db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error { + keyspace := row.GetString("keyspace") + shard := row.GetString("shard") + if _, found := tabletCounts[keyspace]; !found { + tabletCounts[keyspace] = make(map[string]int64) + } + tabletCounts[keyspace][shard] = row.GetInt64("count") + return nil + }) + return tabletCounts, err +} + // SaveTablet saves the tablet record against the instanceKey. 
func SaveTablet(tablet *topodatapb.Tablet) error { tabletp, err := prototext.Marshal(tablet) diff --git a/go/vt/vtorc/inst/tablet_dao_test.go b/go/vt/vtorc/inst/tablet_dao_test.go index a876d857ace..d5de0eb268b 100644 --- a/go/vt/vtorc/inst/tablet_dao_test.go +++ b/go/vt/vtorc/inst/tablet_dao_test.go @@ -91,3 +91,56 @@ func TestSaveAndReadTablet(t *testing.T) { }) } } + +func TestReadTabletCountsByCell(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + + for i := 0; i < 100; i++ { + require.NoError(t, SaveTablet(&topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "cell1", + Uid: uint32(i), + }, + Keyspace: "test", + Shard: "-", + })) + } + tabletCounts, err := ReadTabletCountsByCell() + require.NoError(t, err) + require.Equal(t, map[string]int64{"cell1": 100}, tabletCounts) +} + +func TestReadTabletCountsByKeyspaceShard(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. 
+ defer func() { + db.ClearVTOrcDatabase() + }() + + var uid uint32 + for _, shard := range []string{"-40", "40-80", "80-c0", "c0-"} { + for i := 0; i < 100; i++ { + require.NoError(t, SaveTablet(&topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "cell1", + Uid: uid, + }, + Keyspace: "test", + Shard: shard, + })) + uid++ + } + } + tabletCounts, err := ReadTabletCountsByKeyspaceShard() + require.NoError(t, err) + require.Equal(t, map[string]map[string]int64{ + "test": { + "-40": 100, + "40-80": 100, + "80-c0": 100, + "c0-": 100, + }, + }, tabletCounts) +} diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index 77f3930be1e..d2e7b8ed92e 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -21,35 +21,15 @@ import ( "sync" "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" - "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/inst" ) -var statsShardsWatched = stats.NewGaugesFuncWithMultiLabels("ShardsWatched", - "Keyspace/shards currently watched", - []string{"Keyspace", "Shard"}, - getShardsWatchedStats) - -// getShardsWatchedStats returns the keyspace/shards watched in a format for stats. -func getShardsWatchedStats() map[string]int64 { - shardsWatched := make(map[string]int64) - allShardNames, err := inst.ReadAllShardNames() - if err != nil { - log.Errorf("Failed to read all shard names: %+v", err) - return shardsWatched - } - for ks, shards := range allShardNames { - for _, shard := range shards { - shardsWatched[ks+"."+shard] = 1 - } - } - return shardsWatched -} - // refreshAllKeyspacesAndShardsMu ensures RefreshAllKeyspacesAndShards // is not executed concurrently. 
var refreshAllKeyspacesAndShardsMu sync.Mutex @@ -76,7 +56,8 @@ func RefreshAllKeyspacesAndShards(ctx context.Context) error { refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() - var wg sync.WaitGroup + + eg, _ := errgroup.WithContext(ctx) for idx, keyspace := range keyspaces { // Check if the current keyspace name is the same as the last one. // If it is, then we know we have already refreshed its information. @@ -84,19 +65,16 @@ if idx != 0 && keyspace == keyspaces[idx-1] { continue } - wg.Add(2) - go func(keyspace string) { - defer wg.Done() - _ = refreshKeyspaceHelper(refreshCtx, keyspace) - }(keyspace) - go func(keyspace string) { - defer wg.Done() - _ = refreshAllShards(refreshCtx, keyspace) - }(keyspace) - } - wg.Wait() - return nil + eg.Go(func() error { + return refreshKeyspaceHelper(refreshCtx, keyspace) + }) + + eg.Go(func() error { + return refreshAllShards(refreshCtx, keyspace) + }) + } + return eg.Wait() } // RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard. @@ -108,6 +86,26 @@ func RefreshKeyspaceAndShard(keyspaceName string, shardName string) error { return refreshShard(keyspaceName, shardName) } +// shouldWatchShard returns true if a shard is within the shardsToWatch +// ranges for its keyspace. 
+func shouldWatchShard(shard *topo.ShardInfo) bool { + if len(shardsToWatch) == 0 { + return true + } + + watchRanges, found := shardsToWatch[shard.Keyspace()] + if !found { + return false + } + + for _, keyRange := range watchRanges { + if key.KeyRangeContainsKeyRange(keyRange, shard.GetKeyRange()) { + return true + } + } + return false +} + // refreshKeyspace refreshes the keyspace's information for the given keyspace from the topo func refreshKeyspace(keyspaceName string) error { refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) @@ -149,20 +147,24 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error { log.Error(err) return err } + + // save shards that should be watched. savedShards := make(map[string]bool, len(shardInfos)) for _, shardInfo := range shardInfos { - err = inst.SaveShard(shardInfo) - if err != nil { + if !shouldWatchShard(shardInfo) { + continue + } + if err = inst.SaveShard(shardInfo); err != nil { log.Error(err) return err } savedShards[shardInfo.ShardName()] = true } - // delete shards that were not returned by ts.FindAllShardsInKeyspace(...), - // indicating they are stale. + // delete shards that were not saved, indicating they are stale. 
shards, err := inst.ReadShardNames(keyspaceName) if err != nil { + log.Error(err) return err } for _, shard := range shards { @@ -171,8 +173,7 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error { } shardName := topoproto.KeyspaceShardString(keyspaceName, shard) log.Infof("Forgetting shard: %s", shardName) - err = inst.DeleteShard(keyspaceName, shard) - if err != nil { + if err = inst.DeleteShard(keyspaceName, shard); err != nil { log.Errorf("Failed to delete shard %s: %+v", shardName, err) return err } diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index 20e32d99470..10a1577039b 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -313,28 +313,41 @@ func verifyPrimaryAlias(t *testing.T, keyspaceName, shardName string, primaryAli func TestRefreshAllShards(t *testing.T) { // Store the old flags and restore on test completion + oldClustersToWatch := clustersToWatch oldTs := ts defer func() { + clustersToWatch = oldClustersToWatch ts = oldTs db.ClearVTOrcDatabase() }() ctx := context.Background() ts = memorytopo.NewServer(ctx, "zone1") + require.NoError(t, initializeShardsToWatch()) require.NoError(t, ts.CreateKeyspace(ctx, "ks1", keyspaceDurabilityNone)) - - shards := []string{"-80", "80-"} + shards := []string{"-40", "40-80", "80-c0", "c0-"} for _, shard := range shards { require.NoError(t, ts.CreateShard(ctx, "ks1", shard)) } + + // test shard refresh require.NoError(t, refreshAllShards(ctx, "ks1")) shardNames, err := inst.ReadShardNames("ks1") require.NoError(t, err) - require.Equal(t, []string{"-80", "80-"}, shardNames) + require.Equal(t, []string{"-40", "40-80", "80-c0", "c0-"}, shardNames) + + // test topo shard delete propagates + require.NoError(t, ts.DeleteShard(ctx, "ks1", "c0-")) + require.NoError(t, refreshAllShards(ctx, "ks1")) + shardNames, err = inst.ReadShardNames("ks1") + require.NoError(t, 
err) + require.Equal(t, []string{"-40", "40-80", "80-c0"}, shardNames) - require.NoError(t, ts.DeleteShard(ctx, "ks1", "80-")) + // test clustersToWatch filters what shards are saved + clustersToWatch = []string{"ks1/-80"} + require.NoError(t, initializeShardsToWatch()) require.NoError(t, refreshAllShards(ctx, "ks1")) shardNames, err = inst.ReadShardNames("ks1") require.NoError(t, err) - require.Equal(t, []string{"-80"}, shardNames) + require.Equal(t, []string{"-40", "40-80"}, shardNames) } diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 37bea1a8733..aa41ed7e64e 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -31,6 +31,7 @@ import ( "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" @@ -54,10 +55,48 @@ var ( // This is populated by parsing `--clusters_to_watch` flag. shardsToWatch map[string][]*topodatapb.KeyRange + // tablet stats + statsTabletsWatchedByCell = stats.NewGaugesFuncWithMultiLabels( + "TabletsWatchedByCell", + "Number of tablets watched by cell", + []string{"Cell"}, + getTabletsWatchedByCellStats, + ) + statsTabletsWatchedByShard = stats.NewGaugesFuncWithMultiLabels( + "TabletsWatchedByShard", + "Number of tablets watched by keyspace/shard", + []string{"Keyspace", "Shard"}, + getTabletsWatchedByShardStats, + ) + // ErrNoPrimaryTablet is a fixed error message. ErrNoPrimaryTablet = errors.New("no primary tablet found") ) +// getTabletsWatchedByCellStats returns the number of tablets watched by cell in stats format. 
+func getTabletsWatchedByCellStats() map[string]int64 { + tabletCountsByCell, err := inst.ReadTabletCountsByCell() + if err != nil { + log.Errorf("Failed to read tablet counts by cell: %+v", err) + } + return tabletCountsByCell +} + +// getTabletsWatchedByShardStats returns the number of tablets watched by keyspace/shard in stats format. +func getTabletsWatchedByShardStats() map[string]int64 { + tabletsWatchedByShard := make(map[string]int64) + tabletCountsByKS, err := inst.ReadTabletCountsByKeyspaceShard() + if err != nil { + log.Errorf("Failed to read tablet counts by shard: %+v", err) + } + for keyspace, countsByShard := range tabletCountsByKS { + for shard, tabletCount := range countsByShard { + tabletsWatchedByShard[keyspace+"."+shard] = tabletCount + } + } + return tabletsWatchedByShard +} + // RegisterFlags registers the flags required by VTOrc func RegisterFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/keyranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") @@ -315,22 +354,20 @@ func getLockAction(analysedInstance string, code inst.AnalysisCode) string { } // LockShard locks the keyspace-shard preventing others from performing conflicting actions. 
-func LockShard(ctx context.Context, tabletAlias string, lockAction string) (context.Context, func(*error), error) { - if tabletAlias == "" { - return nil, nil, errors.New("can't lock shard: instance is unspecified") +func LockShard(ctx context.Context, keyspace, shard, lockAction string) (context.Context, func(*error), error) { + if keyspace == "" { + return nil, nil, errors.New("can't lock shard: keyspace is unspecified") + } + if shard == "" { + return nil, nil, errors.New("can't lock shard: shard name is unspecified") } val := atomic.LoadInt32(&hasReceivedSIGTERM) if val > 0 { return nil, nil, errors.New("can't lock shard: SIGTERM received") } - tablet, err := inst.ReadTablet(tabletAlias) - if err != nil { - return nil, nil, err - } - atomic.AddInt32(&shardsLockCounter, 1) - ctx, unlock, err := ts.TryLockShard(ctx, tablet.Keyspace, tablet.Shard, lockAction) + ctx, unlock, err := ts.TryLockShard(ctx, keyspace, shard, lockAction) if err != nil { atomic.AddInt32(&shardsLockCounter, -1) return nil, nil, err diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index f82aad0a0c7..e6105c0691d 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -499,7 +499,9 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er } // We lock the shard here and then refresh the tablets information - ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedInstanceAlias, getLockAction(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis)) + ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, + getLockAction(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis), + ) if err != nil { logger.Errorf("Failed to lock shard, aborting recovery: %v", err) return err diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 6c6430ea6b3..b369324c593 100644 --- 
a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -53,11 +53,16 @@ var snapshotDiscoveryKeys chan string var snapshotDiscoveryKeysMutex sync.Mutex var hasReceivedSIGTERM int32 -var discoveriesCounter = stats.NewCounter("DiscoveriesAttempt", "Number of discoveries attempted") -var failedDiscoveriesCounter = stats.NewCounter("DiscoveriesFail", "Number of failed discoveries") -var instancePollSecondsExceededCounter = stats.NewCounter("DiscoveriesInstancePollSecondsExceeded", "Number of instances that took longer than InstancePollSeconds to poll") -var discoveryQueueLengthGauge = stats.NewGauge("DiscoveriesQueueLength", "Length of the discovery queue") -var discoveryRecentCountGauge = stats.NewGauge("DiscoveriesRecentCount", "Number of recent discoveries") +var ( + discoveriesCounter = stats.NewCounter("DiscoveriesAttempt", "Number of discoveries attempted") + failedDiscoveriesCounter = stats.NewCounter("DiscoveriesFail", "Number of failed discoveries") + instancePollSecondsExceededCounter = stats.NewCounter("DiscoveriesInstancePollSecondsExceeded", "Number of instances that took longer than InstancePollSeconds to poll") + discoveryQueueLengthGauge = stats.NewGauge("DiscoveriesQueueLength", "Length of the discovery queue") + discoveryRecentCountGauge = stats.NewGauge("DiscoveriesRecentCount", "Number of recent discoveries") + discoveryWorkersGauge = stats.NewGauge("DiscoveryWorkers", "Number of discovery workers") + discoveryWorkersActiveGauge = stats.NewGauge("DiscoveryWorkersActive", "Number of discovery workers actively discovering tablets") +) + var discoveryMetrics = collection.CreateOrReturnCollection(DiscoveryMetricsName) var recentDiscoveryOperationKeys *cache.Cache @@ -131,14 +136,22 @@ func waitForLocksRelease() { // handleDiscoveryRequests iterates the discoveryQueue channel and calls upon // instance discovery per entry. 
func handleDiscoveryRequests() { - discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT") + discoveryQueue = discovery.NewQueue() // create a pool of discovery workers - for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ { + for i := uint(0); i < config.Config.DiscoveryWorkers; i++ { + discoveryWorkersGauge.Add(1) go func() { for { + // .Consume() blocks until there is a new key to process. + // We are not "active" until we got a tablet alias. tabletAlias := discoveryQueue.Consume() - DiscoverInstance(tabletAlias, false /* forceDiscovery */) - discoveryQueue.Release(tabletAlias) + func() { + discoveryWorkersActiveGauge.Add(1) + defer discoveryWorkersActiveGauge.Add(-1) + + DiscoverInstance(tabletAlias, false /* forceDiscovery */) + discoveryQueue.Release(tabletAlias) + }() } }() }