Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/vt/vtorc/discovery/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (

// queueItem represents an item in the discovery.Queue.
type queueItem struct {
PushedAt time.Time
Key string
PushedAt time.Time
}

// Queue is an ordered queue with deduplication.
Expand Down Expand Up @@ -82,8 +82,8 @@ func (q *Queue) Push(key string) {
return
}
q.queue <- queueItem{
PushedAt: time.Now(),
Key: key,
PushedAt: time.Now(),
}
}

Expand Down
27 changes: 20 additions & 7 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@ var snapshotDiscoveryKeys chan string
var snapshotDiscoveryKeysMutex sync.Mutex
var hasReceivedSIGTERM int32

var discoveriesCounter = stats.NewCounter("DiscoveriesAttempt", "Number of discoveries attempted")
var failedDiscoveriesCounter = stats.NewCounter("DiscoveriesFail", "Number of failed discoveries")
var instancePollSecondsExceededCounter = stats.NewCounter("DiscoveriesInstancePollSecondsExceeded", "Number of instances that took longer than InstancePollSeconds to poll")
var discoveryQueueLengthGauge = stats.NewGauge("DiscoveriesQueueLength", "Length of the discovery queue")
var discoveryRecentCountGauge = stats.NewGauge("DiscoveriesRecentCount", "Number of recent discoveries")
var (
discoveriesCounter = stats.NewCounter("DiscoveriesAttempt", "Number of discoveries attempted")
failedDiscoveriesCounter = stats.NewCounter("DiscoveriesFail", "Number of failed discoveries")
instancePollSecondsExceededCounter = stats.NewCounter("DiscoveriesInstancePollSecondsExceeded", "Number of instances that took longer than InstancePollSeconds to poll")
discoveryQueueLengthGauge = stats.NewGauge("DiscoveriesQueueLength", "Length of the discovery queue")
discoveryRecentCountGauge = stats.NewGauge("DiscoveriesRecentCount", "Number of recent discoveries")
discoveryWorkersGauge = stats.NewGauge("DiscoveryWorkers", "Number of discovery workers")
discoveryWorkersActiveGauge = stats.NewGauge("DiscoveryWorkersActive", "Number of discovery workers actively discovering tablets")
)

var discoveryMetrics = collection.CreateOrReturnCollection(DiscoveryMetricsName)

var recentDiscoveryOperationKeys *cache.Cache
Expand Down Expand Up @@ -111,11 +116,19 @@ func handleDiscoveryRequests() {
discoveryQueue = discovery.NewQueue()
// create a pool of discovery workers
for i := uint(0); i < config.GetDiscoveryWorkers(); i++ {
discoveryWorkersGauge.Add(1)
go func() {
for {
// .Consume() blocks until there is a new key to process.
// We are not "active" until we got a tablet alias.
tabletAlias := discoveryQueue.Consume()
DiscoverInstance(tabletAlias, false /* forceDiscovery */)
discoveryQueue.Release(tabletAlias)
func() {
discoveryWorkersActiveGauge.Add(1)
defer discoveryWorkersActiveGauge.Add(-1)

DiscoverInstance(tabletAlias, false /* forceDiscovery */)
discoveryQueue.Release(tabletAlias)
}()
}
}()
}
Expand Down
Loading