Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Flags:
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
--consul_auth_static_file string JSON File to read the topos/tokens from.
--discovery-workers int Number of workers used for tablet discovery (default 300)
--emit_stats If set, emit stats to push-based monitoring and stats backends
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vtorc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ const (
AuditPageSize = 20
DebugMetricsIntervalSeconds = 10
StaleInstanceCoordinatesExpireSeconds = 60
DiscoveryMaxConcurrency = 300 // Number of goroutines doing hosts discovery
DiscoveryQueueCapacity = 100000
DiscoveryQueueMaxStatisticsSize = 120
DiscoveryCollectionRetentionSeconds = 120
UnseenInstanceForgetHours = 240 // Number of hours after which an unseen instance is forgotten
)

var (
discoveryWorkers = 300
sqliteDataFile = "file::memory:?mode=memory&cache=shared"
instancePollTime = 5 * time.Second
snapshotTopologyInterval = 0 * time.Hour
Expand All @@ -62,6 +62,7 @@ var (

// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
fs.IntVar(&discoveryWorkers, "discovery-workers", discoveryWorkers, "Number of workers used for tablet discovery")
fs.StringVar(&sqliteDataFile, "sqlite-data-file", sqliteDataFile, "SQLite Datafile to use as VTOrc's database")
fs.DurationVar(&instancePollTime, "instance-poll-time", instancePollTime, "Timer duration on which VTOrc refreshes MySQL information")
fs.DurationVar(&snapshotTopologyInterval, "snapshot-topology-interval", snapshotTopologyInterval, "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours")
Expand All @@ -86,6 +87,7 @@ func RegisterFlags(fs *pflag.FlagSet) {
// strictly expected from user.
// TODO(sougou): change this to yaml parsing, and possible merge with tabletenv.
type Configuration struct {
DiscoveryWorkers uint // Number of workers used for tablet discovery
SQLite3DataFile string // full path to sqlite3 datafile
InstancePollSeconds uint // Number of seconds between instance reads
SnapshotTopologiesIntervalHours uint // Interval in hour between snapshot-topologies invocation. Default: 0 (disabled)
Expand Down Expand Up @@ -115,6 +117,7 @@ var readFileNames []string
// UpdateConfigValuesFromFlags is used to update the config values from the flags defined.
// This is done before we read any configuration files from the user. So the config files take precedence.
func UpdateConfigValuesFromFlags() {
Config.DiscoveryWorkers = uint(discoveryWorkers)
Config.SQLite3DataFile = sqliteDataFile
Config.InstancePollSeconds = uint(instancePollTime / time.Second)
Config.InstancePollSeconds = uint(instancePollTime / time.Second)
Expand Down Expand Up @@ -160,6 +163,7 @@ func LogConfigValues() {

func newConfiguration() *Configuration {
return &Configuration{
DiscoveryWorkers: 300,
SQLite3DataFile: "file::memory:?mode=memory&cache=shared",
InstancePollSeconds: 5,
SnapshotTopologiesIntervalHours: 0,
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtorc/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ CREATE INDEX last_checked_idx_database_instance ON database_instance(last_checke
CREATE INDEX last_seen_idx_database_instance ON database_instance(last_seen)
`,
`
CREATE INDEX hostname_port_database_instance ON database_instance(hostname, port)
`,
`
DROP TABLE IF EXISTS audit
`,
`
Expand Down
138 changes: 37 additions & 101 deletions go/vt/vtorc/discovery/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,142 +33,78 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
)

// QueueMetric contains the queue's active and queued sizes
type QueueMetric struct {
Active int
Queued int
// queueItem represents an item in the discovery.Queue.
type queueItem struct {
Key string
PushedAt time.Time
}

// Queue contains information for managing discovery requests
// Queue is an ordered queue with deduplication.
type Queue struct {
sync.Mutex

name string
done chan struct{}
queue chan string
queuedKeys map[string]time.Time
consumedKeys map[string]time.Time
metrics []QueueMetric
mu sync.Mutex
enqueued map[string]struct{}
queue chan queueItem
}

// DiscoveryQueue contains the discovery queue which can then be accessed via an API call for monitoring.
// Currently this is accessed by ContinuousDiscovery() but also from http api calls.
// I may need to protect this better?
var discoveryQueue map[string](*Queue)
var dcLock sync.Mutex

func init() {
discoveryQueue = make(map[string](*Queue))
}

// CreateOrReturnQueue allows for creation of a new discovery queue or
// returning a pointer to an existing one given the name.
func CreateOrReturnQueue(name string) *Queue {
dcLock.Lock()
defer dcLock.Unlock()
if q, found := discoveryQueue[name]; found {
return q
}

q := &Queue{
name: name,
queuedKeys: make(map[string]time.Time),
consumedKeys: make(map[string]time.Time),
queue: make(chan string, config.DiscoveryQueueCapacity),
}
go q.startMonitoring()

discoveryQueue[name] = q

return q
}

// monitoring queue sizes until we are told to stop
func (q *Queue) startMonitoring() {
log.Infof("Queue.startMonitoring(%s)", q.name)
ticker := time.NewTicker(time.Second) // hard-coded at every second

for {
select {
case <-ticker.C: // do the periodic expiry
q.collectStatistics()
case <-q.done:
return
}
// NewQueue creates a new queue.
func NewQueue() *Queue {
return &Queue{
enqueued: make(map[string]struct{}),
queue: make(chan queueItem, config.DiscoveryQueueCapacity),
}
}

// do a check of the entries in the queue, both those active and queued
func (q *Queue) collectStatistics() {
q.Lock()
defer q.Unlock()
// setKeyCheckEnqueued returns true if a key is already enqueued, if
// not the key will be marked as enqueued and false is returned.
func (q *Queue) setKeyCheckEnqueued(key string) (alreadyEnqueued bool) {
q.mu.Lock()
defer q.mu.Unlock()

q.metrics = append(q.metrics, QueueMetric{Queued: len(q.queuedKeys), Active: len(q.consumedKeys)})

// remove old entries if we get too big
if len(q.metrics) > config.DiscoveryQueueMaxStatisticsSize {
q.metrics = q.metrics[len(q.metrics)-config.DiscoveryQueueMaxStatisticsSize:]
_, alreadyEnqueued = q.enqueued[key]
if !alreadyEnqueued {
q.enqueued[key] = struct{}{}
}
return alreadyEnqueued
}

// QueueLen returns the length of the queue (channel size + queued size)
// QueueLen returns the length of the queue.
func (q *Queue) QueueLen() int {
q.Lock()
defer q.Unlock()
q.mu.Lock()
defer q.mu.Unlock()

return len(q.queue) + len(q.queuedKeys)
return len(q.enqueued)
}

// Push enqueues a key if it is not on a queue and is not being
// processed; silently returns otherwise.
func (q *Queue) Push(key string) {
q.Lock()
defer q.Unlock()

// is it enqueued already?
if _, found := q.queuedKeys[key]; found {
if q.setKeyCheckEnqueued(key) {
return
}

// is it being processed now?
if _, found := q.consumedKeys[key]; found {
return
q.queue <- queueItem{
Key: key,
PushedAt: time.Now(),
}

q.queuedKeys[key] = time.Now()
q.queue <- key
}

// Consume fetches a key to process; blocks if queue is empty.
// Release must be called once after Consume.
func (q *Queue) Consume() string {
q.Lock()
queue := q.queue
q.Unlock()
item := <-q.queue

key := <-queue

q.Lock()
defer q.Unlock()

// alarm if have been waiting for too long
timeOnQueue := time.Since(q.queuedKeys[key])
timeOnQueue := time.Since(item.PushedAt)
if timeOnQueue > time.Duration(config.Config.InstancePollSeconds)*time.Second {
log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", key, timeOnQueue.Seconds())
log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds())
}

q.consumedKeys[key] = q.queuedKeys[key]

delete(q.queuedKeys, key)

return key
return item.Key
}

// Release removes a key from a list of being processed keys
// which allows that key to be pushed into the queue again.
func (q *Queue) Release(key string) {
q.Lock()
defer q.Unlock()
q.mu.Lock()
defer q.mu.Unlock()

delete(q.consumedKeys, key)
delete(q.enqueued, key)
}
81 changes: 81 additions & 0 deletions go/vt/vtorc/discovery/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright 2025 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"strconv"
"testing"

"github.com/stretchr/testify/require"
)

func TestQueue(t *testing.T) {
q := NewQueue()
require.Zero(t, q.QueueLen())

// Push
q.Push(t.Name())
require.Equal(t, 1, q.QueueLen())
_, found := q.enqueued[t.Name()]
require.True(t, found)

// Push duplicate
q.Push(t.Name())
require.Equal(t, 1, q.QueueLen())

// Consume
require.Equal(t, t.Name(), q.Consume())
require.Equal(t, 1, q.QueueLen())
_, found = q.enqueued[t.Name()]
require.True(t, found)

// Release
q.Release(t.Name())
require.Zero(t, q.QueueLen())
_, found = q.enqueued[t.Name()]
require.False(t, found)
}

type testQueue interface {
QueueLen() int
Push(string)
Consume() string
Release(string)
}

func BenchmarkQueues(b *testing.B) {
tests := []struct {
name string
queue testQueue
}{
{"Current", NewQueue()},
}
for _, test := range tests {
q := test.queue
b.Run(test.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
for i := 0; i < 1000; i++ {
q.Push(b.Name() + strconv.Itoa(i))
}
q.QueueLen()
for i := 0; i < 1000; i++ {
q.Release(q.Consume())
}
}
})
}
}
2 changes: 1 addition & 1 deletion go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
goto Cleanup
}

fs, err = fullStatus(tabletAlias)
fs, err = fullStatus(tablet)
if err != nil {
goto Cleanup
}
Expand Down
12 changes: 0 additions & 12 deletions go/vt/vtorc/inst/shard_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,6 @@ func ReadShardNames(keyspaceName string) (shardNames []string, err error) {
return shardNames, err
}

// ReadAllShardNames reads the names of all vitess shards by keyspace.
func ReadAllShardNames() (shardNames map[string][]string, err error) {
shardNames = make(map[string][]string)
query := `select keyspace, shard from vitess_shard`
err = db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error {
ks := row.GetString("keyspace")
shardNames[ks] = append(shardNames[ks], row.GetString("shard"))
return nil
})
return shardNames, err
}

// ReadShardPrimaryInformation reads the vitess shard record and gets the shard primary alias and timestamp.
func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias string, primaryTimestamp string, err error) {
if err = topo.ValidateKeyspaceName(keyspaceName); err != nil {
Expand Down
7 changes: 0 additions & 7 deletions go/vt/vtorc/inst/shard_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,6 @@ func TestSaveReadAndDeleteShard(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{tt.shardName}, shardNames)

// ReadAllShardNames
allShardNames, err := ReadAllShardNames()
require.NoError(t, err)
ksShards, found := allShardNames[tt.keyspaceName]
require.True(t, found)
require.Equal(t, []string{tt.shardName}, ksShards)

// DeleteShard
require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName))
_, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName)
Expand Down
Loading
Loading