diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 57eb907cf4d..ae3d86bd35c 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -32,6 +32,7 @@ Flags: --config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s) --config-type string Config file type (omit to infer config type from file extension). --consul_auth_static_file string JSON File to read the topos/tokens from. + --discovery-workers int Number of workers used for tablet discovery (default 300) --emit_stats If set, emit stats to push-based monitoring and stats backends --enable-primary-disk-stalled-recovery Whether VTOrc should detect a stalled disk on the primary and failover --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index db367673aeb..b25086cf6ad 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -32,7 +32,6 @@ const ( AuditPageSize = 20 DebugMetricsIntervalSeconds = 10 StaleInstanceCoordinatesExpireSeconds = 60 - DiscoveryMaxConcurrency = 300 // Number of goroutines doing hosts discovery DiscoveryQueueCapacity = 100000 DiscoveryQueueMaxStatisticsSize = 120 DiscoveryCollectionRetentionSeconds = 120 @@ -58,6 +57,15 @@ var ( }, ) + discoveryWorkers = viperutil.Configure( + "discovery-workers", + viperutil.Options[int]{ + FlagName: "discovery-workers", + Default: 300, + Dynamic: false, + }, + ) + sqliteDataFile = viperutil.Configure( "sqlite-data-file", viperutil.Options[string]{ @@ -191,6 +199,7 @@ func init() { // registerFlags registers the flags required by VTOrc func registerFlags(fs *pflag.FlagSet) { + fs.Int("discovery-workers", discoveryWorkers.Default(), "Number of workers used for tablet discovery") fs.String("sqlite-data-file", sqliteDataFile.Default(), "SQLite Datafile to use as VTOrc's database") fs.Duration("instance-poll-time", instancePollTime.Default(), "Timer duration on which VTOrc refreshes MySQL information") fs.Duration("snapshot-topology-interval", snapshotTopologyInterval.Default(), "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours") @@ -211,6 +220,7 @@ func registerFlags(fs *pflag.FlagSet) { viperutil.BindFlags(fs, instancePollTime, preventCrossCellFailover, + discoveryWorkers, sqliteDataFile, snapshotTopologyInterval, reasonableReplicationLag, @@ -248,6 +258,11 @@ func GetPreventCrossCellFailover() bool { return preventCrossCellFailover.Get() } +// GetDiscoveryWorkers is a getter function. +func GetDiscoveryWorkers() uint { + return uint(discoveryWorkers.Get()) +} + // GetSQLiteDataFile is a getter function. func GetSQLiteDataFile() string { return sqliteDataFile.Get() diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index bf279b781f2..049c8ad37b1 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -33,85 +33,78 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" ) -// Queue contains information for managing discovery requests -type Queue struct { - sync.Mutex +// queueItem represents an item in the discovery.Queue. +type queueItem struct { + PushedAt time.Time + Key string +} - name string - done chan struct{} - queue chan string - queuedKeys map[string]time.Time - consumedKeys map[string]time.Time +// Queue is an ordered queue with deduplication. +type Queue struct { + mu sync.Mutex + enqueued map[string]struct{} + queue chan queueItem } -// CreateQueue allows for creation of a new discovery queue -func CreateQueue(name string) *Queue { +// NewQueue creates a new queue. +func NewQueue() *Queue { return &Queue{ - name: name, - queuedKeys: make(map[string]time.Time), - consumedKeys: make(map[string]time.Time), - queue: make(chan string, config.DiscoveryQueueCapacity), + enqueued: make(map[string]struct{}), + queue: make(chan queueItem, config.DiscoveryQueueCapacity), + } +} + +// setKeyCheckEnqueued returns true if a key is already enqueued, if +// not the key will be marked as enqueued and false is returned. +func (q *Queue) setKeyCheckEnqueued(key string) (alreadyEnqueued bool) { + q.mu.Lock() + defer q.mu.Unlock() + + _, alreadyEnqueued = q.enqueued[key] + if !alreadyEnqueued { + q.enqueued[key] = struct{}{} } + return alreadyEnqueued } -// QueueLen returns the length of the queue (channel size + queued size) +// QueueLen returns the length of the queue. func (q *Queue) QueueLen() int { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() - return len(q.queue) + len(q.queuedKeys) + return len(q.enqueued) } // Push enqueues a key if it is not on a queue and is not being // processed; silently returns otherwise. func (q *Queue) Push(key string) { - q.Lock() - defer q.Unlock() - - // is it enqueued already? - if _, found := q.queuedKeys[key]; found { + if q.setKeyCheckEnqueued(key) { return } - - // is it being processed now? - if _, found := q.consumedKeys[key]; found { - return + q.queue <- queueItem{ + PushedAt: time.Now(), + Key: key, } - - q.queuedKeys[key] = time.Now() - q.queue <- key } // Consume fetches a key to process; blocks if queue is empty. // Release must be called once after Consume. func (q *Queue) Consume() string { - q.Lock() - queue := q.queue - q.Unlock() - - key := <-queue + item := <-q.queue - q.Lock() - defer q.Unlock() - - // alarm if have been waiting for too long - timeOnQueue := time.Since(q.queuedKeys[key]) + timeOnQueue := time.Since(item.PushedAt) if timeOnQueue > config.GetInstancePollTime() { - log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", key, timeOnQueue.Seconds()) + log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds()) } - q.consumedKeys[key] = q.queuedKeys[key] - - delete(q.queuedKeys, key) - - return key + return item.Key } // Release removes a key from a list of being processed keys // which allows that key to be pushed into the queue again. func (q *Queue) Release(key string) { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() - delete(q.consumedKeys, key) + delete(q.enqueued, key) } diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go new file mode 100644 index 00000000000..fa3e8c16c59 --- /dev/null +++ b/go/vt/vtorc/discovery/queue_test.go @@ -0,0 +1,81 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQueue(t *testing.T) { + q := NewQueue() + require.Zero(t, q.QueueLen()) + + // Push + q.Push(t.Name()) + require.Equal(t, 1, q.QueueLen()) + _, found := q.enqueued[t.Name()] + require.True(t, found) + + // Push duplicate + q.Push(t.Name()) + require.Equal(t, 1, q.QueueLen()) + + // Consume + require.Equal(t, t.Name(), q.Consume()) + require.Equal(t, 1, q.QueueLen()) + _, found = q.enqueued[t.Name()] + require.True(t, found) + + // Release + q.Release(t.Name()) + require.Zero(t, q.QueueLen()) + _, found = q.enqueued[t.Name()] + require.False(t, found) +} + +type testQueue interface { + QueueLen() int + Push(string) + Consume() string + Release(string) +} + +func BenchmarkQueues(b *testing.B) { + tests := []struct { + name string + queue testQueue + }{ + {"Current", NewQueue()}, + } + for _, test := range tests { + q := test.queue + b.Run(test.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + for i := 0; i < 1000; i++ { + q.Push(b.Name() + strconv.Itoa(i)) + } + q.QueueLen() + for i := 0; i < 1000; i++ { + q.Release(q.Consume()) + } + } + }) + } +} diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 5ac5af50d47..4379fd73d6f 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -108,9 +108,9 @@ func waitForLocksRelease() { // handleDiscoveryRequests iterates the discoveryQueue channel and calls upon // instance discovery per entry. func handleDiscoveryRequests() { - discoveryQueue = discovery.CreateQueue("DEFAULT") + discoveryQueue = discovery.NewQueue() // create a pool of discovery workers - for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ { + for i := uint(0); i < config.GetDiscoveryWorkers(); i++ { go func() { for { tabletAlias := discoveryQueue.Consume()