Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Flags:
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
--consul_auth_static_file string JSON File to read the topos/tokens from.
--discovery-workers int Number of workers used for tablet discovery (default 300)
--emit_stats If set, emit stats to push-based monitoring and stats backends
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
--grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy
Expand Down
21 changes: 0 additions & 21 deletions go/vt/vtorc/collection/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,6 @@ func (c *Collection) SetExpirePeriod(duration time.Duration) {
c.expirePeriod = duration
}

// ExpirePeriod returns the currently configured expiration period.
func (c *Collection) ExpirePeriod() time.Duration {
	c.Lock()
	period := c.expirePeriod
	c.Unlock()
	return period
}

// StopAutoExpiration prepares to stop by terminating the auto-expiration process
func (c *Collection) StopAutoExpiration() {
if c == nil {
Expand Down Expand Up @@ -181,20 +174,6 @@ func (c *Collection) StartAutoExpiration() {
}
}

// Metrics returns a slice containing all the metric values.
// A nil receiver or an empty collection yields nil.
func (c *Collection) Metrics() []Metric {
	if c == nil {
		return nil
	}
	c.Lock()
	defer c.Unlock()

	if len(c.collection) > 0 {
		return c.collection
	}
	return nil // nothing to return
}

// Since returns the Metrics on or after the given time. We assume
// the metrics are stored in ascending time.
// Iterate backwards until we reach the first value before the given time
Expand Down
38 changes: 6 additions & 32 deletions go/vt/vtorc/collection/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package collection
import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

var randomString = []string{
Expand Down Expand Up @@ -58,27 +60,6 @@ func TestCreateOrReturnCollection(t *testing.T) {
}
}

// TestExpirePeriod checks that the set expire period is returned.
func TestExpirePeriod(t *testing.T) {
	c := &Collection{}

	// Table of periods to set, with the message to emit on mismatch.
	cases := []struct {
		period time.Duration
		msg    string
	}{
		{time.Second, "TestExpirePeriod: did not get back oneSecond"},
		{2 * time.Second, "TestExpirePeriod: did not get back twoSeconds"},
	}
	for _, tc := range cases {
		c.SetExpirePeriod(tc.period)
		if c.ExpirePeriod() != tc.period {
			t.Error(tc.msg)
		}
	}
}

// testMetric is a dummy Metric implementation used only in these tests.
type testMetric struct {
}
Expand All @@ -90,15 +71,8 @@ func (tm *testMetric) When() time.Time {
// check that Append() works as expected
func TestAppend(t *testing.T) {
c := &Collection{}

if len(c.Metrics()) != 0 {
t.Errorf("TestAppend: len(Metrics) = %d, expecting %d", len(c.Metrics()), 0)
}
for _, v := range []int{1, 2, 3} {
tm := &testMetric{}
_ = c.Append(tm)
if len(c.Metrics()) != v {
t.Errorf("TestExpirePeriod: len(Metrics) = %d, expecting %d", len(c.Metrics()), v)
}
}
// Test for nil metric
err := c.Append(nil)
assert.Error(t, err)
assert.Equal(t, err.Error(), "Collection.Append: m == nil")
}
6 changes: 5 additions & 1 deletion go/vt/vtorc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ const (
AuditPageSize = 20
DebugMetricsIntervalSeconds = 10
StaleInstanceCoordinatesExpireSeconds = 60
DiscoveryMaxConcurrency = 300 // Number of goroutines doing hosts discovery
DiscoveryQueueCapacity = 100000
DiscoveryQueueMaxStatisticsSize = 120
DiscoveryCollectionRetentionSeconds = 120
Expand All @@ -48,6 +47,7 @@ const (
)

var (
discoveryWorkers = 300
sqliteDataFile = "file::memory:?mode=memory&cache=shared"
instancePollTime = 5 * time.Second
snapshotTopologyInterval = 0 * time.Hour
Expand All @@ -68,6 +68,7 @@ var (

// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
fs.Int("discovery-workers", discoveryWorkers, "Number of workers used for tablet discovery")
fs.StringVar(&sqliteDataFile, "sqlite-data-file", sqliteDataFile, "SQLite Datafile to use as VTOrc's database")
fs.DurationVar(&instancePollTime, "instance-poll-time", instancePollTime, "Timer duration on which VTOrc refreshes MySQL information")
fs.DurationVar(&snapshotTopologyInterval, "snapshot-topology-interval", snapshotTopologyInterval, "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours")
Expand All @@ -91,6 +92,7 @@ func RegisterFlags(fs *pflag.FlagSet) {
// strictly expected from user.
// TODO(sougou): change this to yaml parsing, and possible merge with tabletenv.
type Configuration struct {
DiscoveryWorkers uint // Number of workers used for tablet discovery
SQLite3DataFile string // full path to sqlite3 datafile
InstancePollSeconds uint // Number of seconds between instance reads
SnapshotTopologiesIntervalHours uint // Interval in hour between snapshot-topologies invocation. Default: 0 (disabled)
Expand Down Expand Up @@ -120,6 +122,7 @@ var readFileNames []string
// UpdateConfigValuesFromFlags is used to update the config values from the flags defined.
// This is done before we read any configuration files from the user. So the config files take precedence.
func UpdateConfigValuesFromFlags() {
Config.DiscoveryWorkers = uint(discoveryWorkers)
Config.SQLite3DataFile = sqliteDataFile
Config.InstancePollSeconds = uint(instancePollTime / time.Second)
Config.InstancePollSeconds = uint(instancePollTime / time.Second)
Expand Down Expand Up @@ -165,6 +168,7 @@ func LogConfigValues() {

func newConfiguration() *Configuration {
return &Configuration{
DiscoveryWorkers: 300,
SQLite3DataFile: "file::memory:?mode=memory&cache=shared",
InstancePollSeconds: 5,
SnapshotTopologiesIntervalHours: 0,
Expand Down
11 changes: 0 additions & 11 deletions go/vt/vtorc/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,6 @@ func (m *vtorcDB) QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.
return QueryVTOrc(query, argsArray, onRow)
}

// DummySQLResult is a placeholder result for SQL operations:
// it reports no last-insert ID and exactly one affected row.
type DummySQLResult struct{}

// LastInsertId always returns 0 with no error.
func (DummySQLResult) LastInsertId() (int64, error) {
	return 0, nil
}

// RowsAffected always returns 1 with no error.
func (DummySQLResult) RowsAffected() (int64, error) {
	return 1, nil
}

// OpenTopology returns the DB instance for the vtorc backed database
func OpenVTOrc() (db *sql.DB, err error) {
var fromCache bool
Expand Down
9 changes: 0 additions & 9 deletions go/vt/vtorc/discovery/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ func max(values stats.Float64Data) float64 {
return s
}

// internal routine to return the minimum value or 9e9
func min(values stats.Float64Data) float64 {
	if m, err := stats.Min(values); err == nil {
		return m
	}
	return 9e9 // a large number (should use something better than this but it's ok for now)
}

// internal routine to return the median or 0
func median(values stats.Float64Data) float64 {
s, err := stats.Median(values)
Expand Down
138 changes: 37 additions & 101 deletions go/vt/vtorc/discovery/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,142 +33,78 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
)

// QueueMetric contains the queue's active and queued sizes
type QueueMetric struct {
Active int
Queued int
// queueItem represents an item in the discovery.Queue.
type queueItem struct {
	// Key identifies the queued entry; presumably a tablet alias — verify against Push callers.
	Key string
	// PushedAt records when the key was enqueued; Consume uses it to warn about long queue waits.
	PushedAt time.Time
}

// Queue contains information for managing discovery requests
// Queue is an ordered queue with deduplication.
type Queue struct {
sync.Mutex

name string
done chan struct{}
queue chan string
queuedKeys map[string]time.Time
consumedKeys map[string]time.Time
metrics []QueueMetric
mu sync.Mutex
enqueued map[string]struct{}
queue chan queueItem
}

// DiscoveryQueue contains the discovery queue which can then be accessed via an API call for monitoring.
// Currently this is accessed by ContinuousDiscovery() but also from http api calls.
// I may need to protect this better?
var discoveryQueue map[string](*Queue)
var dcLock sync.Mutex

func init() {
discoveryQueue = make(map[string](*Queue))
}

// CreateOrReturnQueue allows for creation of a new discovery queue or
// returning a pointer to an existing one given the name.
func CreateOrReturnQueue(name string) *Queue {
dcLock.Lock()
defer dcLock.Unlock()
if q, found := discoveryQueue[name]; found {
return q
}

q := &Queue{
name: name,
queuedKeys: make(map[string]time.Time),
consumedKeys: make(map[string]time.Time),
queue: make(chan string, config.DiscoveryQueueCapacity),
}
go q.startMonitoring()

discoveryQueue[name] = q

return q
}

// monitoring queue sizes until we are told to stop
func (q *Queue) startMonitoring() {
log.Infof("Queue.startMonitoring(%s)", q.name)
ticker := time.NewTicker(time.Second) // hard-coded at every second

for {
select {
case <-ticker.C: // do the periodic expiry
q.collectStatistics()
case <-q.done:
return
}
// NewQueue creates a new queue.
func NewQueue() *Queue {
	q := &Queue{
		enqueued: make(map[string]struct{}),
	}
	// Buffered channel bounds the number of keys awaiting discovery.
	q.queue = make(chan queueItem, config.DiscoveryQueueCapacity)
	return q
}

// do a check of the entries in the queue, both those active and queued
func (q *Queue) collectStatistics() {
q.Lock()
defer q.Unlock()
// setKeyCheckEnqueued returns true if a key is already enqueued, if
// not the key will be marked as enqueued and false is returned.
func (q *Queue) setKeyCheckEnqueued(key string) (alreadyEnqueued bool) {
q.mu.Lock()
defer q.mu.Unlock()

q.metrics = append(q.metrics, QueueMetric{Queued: len(q.queuedKeys), Active: len(q.consumedKeys)})

// remove old entries if we get too big
if len(q.metrics) > config.DiscoveryQueueMaxStatisticsSize {
q.metrics = q.metrics[len(q.metrics)-config.DiscoveryQueueMaxStatisticsSize:]
_, alreadyEnqueued = q.enqueued[key]
if !alreadyEnqueued {
q.enqueued[key] = struct{}{}
}
return alreadyEnqueued
}

// QueueLen returns the length of the queue (channel size + queued size)
// QueueLen returns the length of the queue.
func (q *Queue) QueueLen() int {
q.Lock()
defer q.Unlock()
q.mu.Lock()
defer q.mu.Unlock()

return len(q.queue) + len(q.queuedKeys)
return len(q.enqueued)
}

// Push enqueues a key if it is not on a queue and is not being
// processed; silently returns otherwise.
func (q *Queue) Push(key string) {
q.Lock()
defer q.Unlock()

// is it enqueued already?
if _, found := q.queuedKeys[key]; found {
if q.setKeyCheckEnqueued(key) {
return
}

// is it being processed now?
if _, found := q.consumedKeys[key]; found {
return
q.queue <- queueItem{
Key: key,
PushedAt: time.Now(),
}

q.queuedKeys[key] = time.Now()
q.queue <- key
}

// Consume fetches a key to process; blocks if queue is empty.
// Release must be called once after Consume.
func (q *Queue) Consume() string {
q.Lock()
queue := q.queue
q.Unlock()
item := <-q.queue

key := <-queue

q.Lock()
defer q.Unlock()

// alarm if have been waiting for too long
timeOnQueue := time.Since(q.queuedKeys[key])
timeOnQueue := time.Since(item.PushedAt)
if timeOnQueue > time.Duration(config.Config.InstancePollSeconds)*time.Second {
log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", key, timeOnQueue.Seconds())
log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds())
}

q.consumedKeys[key] = q.queuedKeys[key]

delete(q.queuedKeys, key)

return key
return item.Key
}

// Release removes a key from a list of being processed keys
// which allows that key to be pushed into the queue again.
func (q *Queue) Release(key string) {
q.Lock()
defer q.Unlock()
q.mu.Lock()
defer q.mu.Unlock()

delete(q.consumedKeys, key)
delete(q.enqueued, key)
}
Loading
Loading