diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index 049c8ad37b1..c56827b5899 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -35,8 +35,8 @@ import ( // queueItem represents an item in the discovery.Queue. type queueItem struct { - PushedAt time.Time Key string + PushedAt time.Time } // Queue is an ordered queue with deduplication. @@ -82,8 +82,8 @@ func (q *Queue) Push(key string) { return } q.queue <- queueItem{ - PushedAt: time.Now(), Key: key, + PushedAt: time.Now(), } } diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 4379fd73d6f..bb3868d4686 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -50,11 +50,16 @@ var snapshotDiscoveryKeys chan string var snapshotDiscoveryKeysMutex sync.Mutex var hasReceivedSIGTERM int32 -var discoveriesCounter = stats.NewCounter("DiscoveriesAttempt", "Number of discoveries attempted") -var failedDiscoveriesCounter = stats.NewCounter("DiscoveriesFail", "Number of failed discoveries") -var instancePollSecondsExceededCounter = stats.NewCounter("DiscoveriesInstancePollSecondsExceeded", "Number of instances that took longer than InstancePollSeconds to poll") -var discoveryQueueLengthGauge = stats.NewGauge("DiscoveriesQueueLength", "Length of the discovery queue") -var discoveryRecentCountGauge = stats.NewGauge("DiscoveriesRecentCount", "Number of recent discoveries") +var ( + discoveriesCounter = stats.NewCounter("DiscoveriesAttempt", "Number of discoveries attempted") + failedDiscoveriesCounter = stats.NewCounter("DiscoveriesFail", "Number of failed discoveries") + instancePollSecondsExceededCounter = stats.NewCounter("DiscoveriesInstancePollSecondsExceeded", "Number of instances that took longer than InstancePollSeconds to poll") + discoveryQueueLengthGauge = stats.NewGauge("DiscoveriesQueueLength", "Length of the discovery queue") + discoveryRecentCountGauge = stats.NewGauge("DiscoveriesRecentCount", "Number of recent discoveries") + discoveryWorkersGauge = stats.NewGauge("DiscoveryWorkers", "Number of discovery workers") + discoveryWorkersActiveGauge = stats.NewGauge("DiscoveryWorkersActive", "Number of discovery workers actively discovering tablets") +) + var discoveryMetrics = collection.CreateOrReturnCollection(DiscoveryMetricsName) var recentDiscoveryOperationKeys *cache.Cache @@ -111,11 +116,19 @@ func handleDiscoveryRequests() { discoveryQueue = discovery.NewQueue() // create a pool of discovery workers for i := uint(0); i < config.GetDiscoveryWorkers(); i++ { + discoveryWorkersGauge.Add(1) go func() { for { + // .Consume() blocks until there is a new key to process. + // We are not "active" until we got a tablet alias. tabletAlias := discoveryQueue.Consume() - DiscoverInstance(tabletAlias, false /* forceDiscovery */) - discoveryQueue.Release(tabletAlias) + func() { + discoveryWorkersActiveGauge.Add(1) + defer discoveryWorkersActiveGauge.Add(-1) + + DiscoverInstance(tabletAlias, false /* forceDiscovery */) + discoveryQueue.Release(tabletAlias) + }() } }() }