diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index ba500a3732b..18b5e236f74 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -34,6 +34,7 @@ Flags: --config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s) --config-type string Config file type (omit to infer config type from file extension). --consul_auth_static_file string JSON File to read the topos/tokens from. + --discovery-workers int Number of workers used for tablet discovery (default 300) --emit_stats If set, emit stats to push-based monitoring and stats backends --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. --grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy diff --git a/go/vt/vtorc/collection/collection.go b/go/vt/vtorc/collection/collection.go index 0ef9a71b9a3..c70ef148b7f 100644 --- a/go/vt/vtorc/collection/collection.go +++ b/go/vt/vtorc/collection/collection.go @@ -128,13 +128,6 @@ func (c *Collection) SetExpirePeriod(duration time.Duration) { c.expirePeriod = duration } -// ExpirePeriod returns the currently configured expiration period -func (c *Collection) ExpirePeriod() time.Duration { - c.Lock() - defer c.Unlock() - return c.expirePeriod -} - // StopAutoExpiration prepares to stop by terminating the auto-expiration process func (c *Collection) StopAutoExpiration() { if c == nil { @@ -181,20 +174,6 @@ func (c *Collection) StartAutoExpiration() { } } -// Metrics returns a slice containing all the metric values -func (c *Collection) Metrics() []Metric { - if c == nil { - return nil - } - c.Lock() - defer c.Unlock() - - if len(c.collection) == 0 { - return nil // nothing to return - } - return c.collection -} - // Since returns the Metrics on or after the given time. 
We assume // the metrics are stored in ascending time. // Iterate backwards until we reach the first value before the given time diff --git a/go/vt/vtorc/collection/collection_test.go b/go/vt/vtorc/collection/collection_test.go index 23679245c26..c10d402769c 100644 --- a/go/vt/vtorc/collection/collection_test.go +++ b/go/vt/vtorc/collection/collection_test.go @@ -19,6 +19,8 @@ package collection import ( "testing" "time" + + "github.com/stretchr/testify/assert" ) var randomString = []string{ @@ -58,27 +60,6 @@ func TestCreateOrReturnCollection(t *testing.T) { } } -// TestExpirePeriod checks that the set expire period is returned -func TestExpirePeriod(t *testing.T) { - oneSecond := time.Second - twoSeconds := 2 * oneSecond - - // create a new collection - c := &Collection{} - - // check if we change it we get back the value we provided - c.SetExpirePeriod(oneSecond) - if c.ExpirePeriod() != oneSecond { - t.Errorf("TestExpirePeriod: did not get back oneSecond") - } - - // change the period and check again - c.SetExpirePeriod(twoSeconds) - if c.ExpirePeriod() != twoSeconds { - t.Errorf("TestExpirePeriod: did not get back twoSeconds") - } -} - // dummy structure for testing type testMetric struct { } @@ -90,15 +71,8 @@ func (tm *testMetric) When() time.Time { // check that Append() works as expected func TestAppend(t *testing.T) { c := &Collection{} - - if len(c.Metrics()) != 0 { - t.Errorf("TestAppend: len(Metrics) = %d, expecting %d", len(c.Metrics()), 0) - } - for _, v := range []int{1, 2, 3} { - tm := &testMetric{} - _ = c.Append(tm) - if len(c.Metrics()) != v { - t.Errorf("TestExpirePeriod: len(Metrics) = %d, expecting %d", len(c.Metrics()), v) - } - } + // Test for nil metric + err := c.Append(nil) + assert.Error(t, err) + assert.Equal(t, err.Error(), "Collection.Append: m == nil") } diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index ba3c41ddc61..5c020631748 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go 
@@ -39,7 +39,6 @@ const (
 	AuditPageSize                         = 20
 	DebugMetricsIntervalSeconds           = 10
 	StaleInstanceCoordinatesExpireSeconds = 60
-	DiscoveryMaxConcurrency               = 300 // Number of goroutines doing hosts discovery
 	DiscoveryQueueCapacity                = 100000
 	DiscoveryQueueMaxStatisticsSize       = 120
 	DiscoveryCollectionRetentionSeconds   = 120
@@ -48,6 +47,7 @@ const (
 )
 
 var (
+	discoveryWorkers         = 300
 	sqliteDataFile           = "file::memory:?mode=memory&cache=shared"
 	instancePollTime         = 5 * time.Second
 	snapshotTopologyInterval = 0 * time.Hour
@@ -68,6 +68,7 @@ var (
 
 // RegisterFlags registers the flags required by VTOrc
 func RegisterFlags(fs *pflag.FlagSet) {
+	fs.IntVar(&discoveryWorkers, "discovery-workers", discoveryWorkers, "Number of workers used for tablet discovery")
 	fs.StringVar(&sqliteDataFile, "sqlite-data-file", sqliteDataFile, "SQLite Datafile to use as VTOrc's database")
 	fs.DurationVar(&instancePollTime, "instance-poll-time", instancePollTime, "Timer duration on which VTOrc refreshes MySQL information")
 	fs.DurationVar(&snapshotTopologyInterval, "snapshot-topology-interval", snapshotTopologyInterval, "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours")
@@ -91,6 +92,7 @@ func RegisterFlags(fs *pflag.FlagSet) {
 // strictly expected from user.
 // TODO(sougou): change this to yaml parsing, and possible merge with tabletenv.
 type Configuration struct {
+	DiscoveryWorkers                            uint   // Number of workers used for tablet discovery
 	SQLite3DataFile                             string // full path to sqlite3 datafile
 	InstancePollSeconds                         uint   // Number of seconds between instance reads
 	SnapshotTopologiesIntervalHours             uint   // Interval in hour between snapshot-topologies invocation. Default: 0 (disabled)
@@ -120,6 +122,7 @@ var readFileNames []string
 
 // UpdateConfigValuesFromFlags is used to update the config values from the flags defined.
 // This is done before we read any configuration files from the user. So the config files take precedence.
func UpdateConfigValuesFromFlags() { + Config.DiscoveryWorkers = uint(discoveryWorkers) Config.SQLite3DataFile = sqliteDataFile Config.InstancePollSeconds = uint(instancePollTime / time.Second) Config.InstancePollSeconds = uint(instancePollTime / time.Second) @@ -165,6 +168,7 @@ func LogConfigValues() { func newConfiguration() *Configuration { return &Configuration{ + DiscoveryWorkers: 300, SQLite3DataFile: "file::memory:?mode=memory&cache=shared", InstancePollSeconds: 5, SnapshotTopologiesIntervalHours: 0, diff --git a/go/vt/vtorc/db/db.go b/go/vt/vtorc/db/db.go index 688cc2039b8..64143477645 100644 --- a/go/vt/vtorc/db/db.go +++ b/go/vt/vtorc/db/db.go @@ -41,17 +41,6 @@ func (m *vtorcDB) QueryVTOrc(query string, argsArray []any, onRow func(sqlutils. return QueryVTOrc(query, argsArray, onRow) } -type DummySQLResult struct { -} - -func (dummyRes DummySQLResult) LastInsertId() (int64, error) { - return 0, nil -} - -func (dummyRes DummySQLResult) RowsAffected() (int64, error) { - return 1, nil -} - // OpenTopology returns the DB instance for the vtorc backed database func OpenVTOrc() (db *sql.DB, err error) { var fromCache bool diff --git a/go/vt/vtorc/discovery/funcs.go b/go/vt/vtorc/discovery/funcs.go index e468d10a420..eeafe2e20a4 100644 --- a/go/vt/vtorc/discovery/funcs.go +++ b/go/vt/vtorc/discovery/funcs.go @@ -47,15 +47,6 @@ func max(values stats.Float64Data) float64 { return s } -// internal routine to return the minimum value or 9e9 -func min(values stats.Float64Data) float64 { - s, err := stats.Min(values) - if err != nil { - return 9e9 // a large number (should use something better than this but it's ok for now) - } - return s -} - // internal routine to return the median or 0 func median(values stats.Float64Data) float64 { s, err := stats.Median(values) diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index 95751c6ae25..4caffe0d121 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -33,142 
+33,78 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" ) -// QueueMetric contains the queue's active and queued sizes -type QueueMetric struct { - Active int - Queued int +// queueItem represents an item in the discovery.Queue. +type queueItem struct { + Key string + PushedAt time.Time } -// Queue contains information for managing discovery requests +// Queue is an ordered queue with deduplication. type Queue struct { - sync.Mutex - - name string - done chan struct{} - queue chan string - queuedKeys map[string]time.Time - consumedKeys map[string]time.Time - metrics []QueueMetric + mu sync.Mutex + enqueued map[string]struct{} + queue chan queueItem } -// DiscoveryQueue contains the discovery queue which can then be accessed via an API call for monitoring. -// Currently this is accessed by ContinuousDiscovery() but also from http api calls. -// I may need to protect this better? -var discoveryQueue map[string](*Queue) -var dcLock sync.Mutex - -func init() { - discoveryQueue = make(map[string](*Queue)) -} - -// CreateOrReturnQueue allows for creation of a new discovery queue or -// returning a pointer to an existing one given the name. -func CreateOrReturnQueue(name string) *Queue { - dcLock.Lock() - defer dcLock.Unlock() - if q, found := discoveryQueue[name]; found { - return q - } - - q := &Queue{ - name: name, - queuedKeys: make(map[string]time.Time), - consumedKeys: make(map[string]time.Time), - queue: make(chan string, config.DiscoveryQueueCapacity), - } - go q.startMonitoring() - - discoveryQueue[name] = q - - return q -} - -// monitoring queue sizes until we are told to stop -func (q *Queue) startMonitoring() { - log.Infof("Queue.startMonitoring(%s)", q.name) - ticker := time.NewTicker(time.Second) // hard-coded at every second - - for { - select { - case <-ticker.C: // do the periodic expiry - q.collectStatistics() - case <-q.done: - return - } +// NewQueue creates a new queue. 
+func NewQueue() *Queue { + return &Queue{ + enqueued: make(map[string]struct{}), + queue: make(chan queueItem, config.DiscoveryQueueCapacity), } } -// do a check of the entries in the queue, both those active and queued -func (q *Queue) collectStatistics() { - q.Lock() - defer q.Unlock() +// setKeyCheckEnqueued returns true if a key is already enqueued, if +// not the key will be marked as enqueued and false is returned. +func (q *Queue) setKeyCheckEnqueued(key string) (alreadyEnqueued bool) { + q.mu.Lock() + defer q.mu.Unlock() - q.metrics = append(q.metrics, QueueMetric{Queued: len(q.queuedKeys), Active: len(q.consumedKeys)}) - - // remove old entries if we get too big - if len(q.metrics) > config.DiscoveryQueueMaxStatisticsSize { - q.metrics = q.metrics[len(q.metrics)-config.DiscoveryQueueMaxStatisticsSize:] + _, alreadyEnqueued = q.enqueued[key] + if !alreadyEnqueued { + q.enqueued[key] = struct{}{} } + return alreadyEnqueued } -// QueueLen returns the length of the queue (channel size + queued size) +// QueueLen returns the length of the queue. func (q *Queue) QueueLen() int { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() - return len(q.queue) + len(q.queuedKeys) + return len(q.enqueued) } // Push enqueues a key if it is not on a queue and is not being // processed; silently returns otherwise. func (q *Queue) Push(key string) { - q.Lock() - defer q.Unlock() - - // is it enqueued already? - if _, found := q.queuedKeys[key]; found { + if q.setKeyCheckEnqueued(key) { return } - - // is it being processed now? - if _, found := q.consumedKeys[key]; found { - return + q.queue <- queueItem{ + Key: key, + PushedAt: time.Now(), } - - q.queuedKeys[key] = time.Now() - q.queue <- key } // Consume fetches a key to process; blocks if queue is empty. // Release must be called once after Consume. 
func (q *Queue) Consume() string { - q.Lock() - queue := q.queue - q.Unlock() + item := <-q.queue - key := <-queue - - q.Lock() - defer q.Unlock() - - // alarm if have been waiting for too long - timeOnQueue := time.Since(q.queuedKeys[key]) + timeOnQueue := time.Since(item.PushedAt) if timeOnQueue > time.Duration(config.Config.InstancePollSeconds)*time.Second { - log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", key, timeOnQueue.Seconds()) + log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds()) } - q.consumedKeys[key] = q.queuedKeys[key] - - delete(q.queuedKeys, key) - - return key + return item.Key } // Release removes a key from a list of being processed keys // which allows that key to be pushed into the queue again. func (q *Queue) Release(key string) { - q.Lock() - defer q.Unlock() + q.mu.Lock() + defer q.mu.Unlock() - delete(q.consumedKeys, key) + delete(q.enqueued, key) } diff --git a/go/vt/vtorc/discovery/queue_aggregated_stats.go b/go/vt/vtorc/discovery/queue_aggregated_stats.go deleted file mode 100644 index 79f2e310a58..00000000000 --- a/go/vt/vtorc/discovery/queue_aggregated_stats.go +++ /dev/null @@ -1,95 +0,0 @@ -/* - Copyright 2017 Simon J Mudd - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
-*/ - -package discovery - -import ( - "github.com/montanaflynn/stats" - - "vitess.io/vitess/go/vt/log" -) - -// AggregatedQueueMetrics contains aggregate information some part queue metrics -type AggregatedQueueMetrics struct { - ActiveMinEntries float64 - ActiveMeanEntries float64 - ActiveMedianEntries float64 - ActiveP95Entries float64 - ActiveMaxEntries float64 - QueuedMinEntries float64 - QueuedMeanEntries float64 - QueuedMedianEntries float64 - QueuedP95Entries float64 - QueuedMaxEntries float64 -} - -// we pull out values in ints so convert to float64 for metric calculations -func intSliceToFloat64Slice(someInts []int) stats.Float64Data { - var slice stats.Float64Data - - for _, v := range someInts { - slice = append(slice, float64(v)) - } - - return slice -} - -// DiscoveryQueueMetrics returns some raw queue metrics based on the -// period (last N entries) requested. -func (q *Queue) DiscoveryQueueMetrics(period int) []QueueMetric { - q.Lock() - defer q.Unlock() - - // adjust period in case we ask for something that's too long - if period > len(q.metrics) { - log.Infof("DiscoveryQueueMetrics: wanted: %d, adjusting period to %d", period, len(q.metrics)) - period = len(q.metrics) - } - - a := q.metrics[len(q.metrics)-period:] - log.Infof("DiscoveryQueueMetrics: returning values: %+v", a) - return a -} - -// AggregatedDiscoveryQueueMetrics Returns some aggregate statistics -// based on the period (last N entries) requested. We store up to -// config.Config.DiscoveryQueueMaxStatisticsSize values and collect once -// a second so we expect period to be a smaller value. 
-func (q *Queue) AggregatedDiscoveryQueueMetrics(period int) *AggregatedQueueMetrics { - wanted := q.DiscoveryQueueMetrics(period) - - var activeEntries, queuedEntries []int - // fill vars - for i := range wanted { - activeEntries = append(activeEntries, wanted[i].Active) - queuedEntries = append(queuedEntries, wanted[i].Queued) - } - - a := &AggregatedQueueMetrics{ - ActiveMinEntries: min(intSliceToFloat64Slice(activeEntries)), - ActiveMeanEntries: mean(intSliceToFloat64Slice(activeEntries)), - ActiveMedianEntries: median(intSliceToFloat64Slice(activeEntries)), - ActiveP95Entries: percentile(intSliceToFloat64Slice(activeEntries), 95), - ActiveMaxEntries: max(intSliceToFloat64Slice(activeEntries)), - QueuedMinEntries: min(intSliceToFloat64Slice(queuedEntries)), - QueuedMeanEntries: mean(intSliceToFloat64Slice(queuedEntries)), - QueuedMedianEntries: median(intSliceToFloat64Slice(queuedEntries)), - QueuedP95Entries: percentile(intSliceToFloat64Slice(queuedEntries), 95), - QueuedMaxEntries: max(intSliceToFloat64Slice(queuedEntries)), - } - log.Infof("AggregatedDiscoveryQueueMetrics: returning values: %+v", a) - return a -} diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go new file mode 100644 index 00000000000..fa3e8c16c59 --- /dev/null +++ b/go/vt/vtorc/discovery/queue_test.go @@ -0,0 +1,81 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package discovery + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQueue(t *testing.T) { + q := NewQueue() + require.Zero(t, q.QueueLen()) + + // Push + q.Push(t.Name()) + require.Equal(t, 1, q.QueueLen()) + _, found := q.enqueued[t.Name()] + require.True(t, found) + + // Push duplicate + q.Push(t.Name()) + require.Equal(t, 1, q.QueueLen()) + + // Consume + require.Equal(t, t.Name(), q.Consume()) + require.Equal(t, 1, q.QueueLen()) + _, found = q.enqueued[t.Name()] + require.True(t, found) + + // Release + q.Release(t.Name()) + require.Zero(t, q.QueueLen()) + _, found = q.enqueued[t.Name()] + require.False(t, found) +} + +type testQueue interface { + QueueLen() int + Push(string) + Consume() string + Release(string) +} + +func BenchmarkQueues(b *testing.B) { + tests := []struct { + name string + queue testQueue + }{ + {"Current", NewQueue()}, + } + for _, test := range tests { + q := test.queue + b.Run(test.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + for i := 0; i < 1000; i++ { + q.Push(b.Name() + strconv.Itoa(i)) + } + q.QueueLen() + for i := 0; i < 1000; i++ { + q.Release(q.Consume()) + } + } + }) + } +} diff --git a/go/vt/vtorc/inst/instance_utils.go b/go/vt/vtorc/inst/instance_utils.go index f6bde729822..01302f00b4c 100644 --- a/go/vt/vtorc/inst/instance_utils.go +++ b/go/vt/vtorc/inst/instance_utils.go @@ -17,7 +17,6 @@ package inst import ( - "regexp" "strings" ) @@ -29,13 +28,3 @@ func MajorVersion(version string) []string { } return tokens[:2] } - -// RegexpMatchPatterns returns true if s matches any of the provided regexpPatterns -func RegexpMatchPatterns(s string, regexpPatterns []string) bool { - for _, filter := range regexpPatterns { - if matched, err := regexp.MatchString(filter, s); err == nil && matched { - return true - } - } - return false -} diff --git a/go/vt/vtorc/inst/instance_utils_test.go b/go/vt/vtorc/inst/instance_utils_test.go deleted file mode 100644 index 
f6247d5d6d0..00000000000 --- a/go/vt/vtorc/inst/instance_utils_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package inst - -import ( - "testing" -) - -type testPatterns struct { - s string - patterns []string - expected bool -} - -func TestRegexpMatchPatterns(t *testing.T) { - patterns := []testPatterns{ - {"hostname", []string{}, false}, - {"hostname", []string{"blah"}, false}, - {"hostname", []string{"blah", "blah"}, false}, - {"hostname", []string{"host", "blah"}, true}, - {"hostname", []string{"blah", "host"}, true}, - {"hostname", []string{"ho.tname"}, true}, - {"hostname", []string{"ho.tname2"}, false}, - {"hostname", []string{"ho.*me"}, true}, - } - - for _, p := range patterns { - if match := RegexpMatchPatterns(p.s, p.patterns); match != p.expected { - t.Errorf("RegexpMatchPatterns failed with: %q, %+v, got: %+v, expected: %+v", p.s, p.patterns, match, p.expected) - } - } -} diff --git a/go/vt/vtorc/inst/oracle_gtid_set.go b/go/vt/vtorc/inst/oracle_gtid_set.go index 0ddab05ef55..711232692f8 100644 --- a/go/vt/vtorc/inst/oracle_gtid_set.go +++ b/go/vt/vtorc/inst/oracle_gtid_set.go @@ -69,53 +69,6 @@ func (oracleGTIDSet *OracleGtidSet) RemoveUUID(uuid string) (removed bool) { return removed } -// RetainUUID retains only entries that belong to given UUID. -func (oracleGTIDSet *OracleGtidSet) RetainUUID(uuid string) (anythingRemoved bool) { - return oracleGTIDSet.RetainUUIDs([]string{uuid}) -} - -// RetainUUIDs retains only entries that belong to given UUIDs. 
-func (oracleGTIDSet *OracleGtidSet) RetainUUIDs(uuids []string) (anythingRemoved bool) { - retainUUIDs := map[string]bool{} - for _, uuid := range uuids { - retainUUIDs[uuid] = true - } - var filteredEntries []*OracleGtidSetEntry - for _, entry := range oracleGTIDSet.GtidEntries { - if retainUUIDs[entry.UUID] { - filteredEntries = append(filteredEntries, entry) - } else { - anythingRemoved = true - } - } - if anythingRemoved { - oracleGTIDSet.GtidEntries = filteredEntries - } - return anythingRemoved -} - -// SharedUUIDs returns UUIDs (range-less) that are shared between the two sets -func (oracleGTIDSet *OracleGtidSet) SharedUUIDs(other *OracleGtidSet) (shared []string) { - thisUUIDs := map[string]bool{} - for _, entry := range oracleGTIDSet.GtidEntries { - thisUUIDs[entry.UUID] = true - } - for _, entry := range other.GtidEntries { - if thisUUIDs[entry.UUID] { - shared = append(shared, entry.UUID) - } - } - return shared -} - -// Explode returns a user-friendly string representation of this entry -func (oracleGTIDSet *OracleGtidSet) Explode() (result []*OracleGtidSetEntry) { - for _, entries := range oracleGTIDSet.GtidEntries { - result = append(result, entries.Explode()...) 
- } - return result -} - func (oracleGTIDSet *OracleGtidSet) String() string { var tokens []string for _, entry := range oracleGTIDSet.GtidEntries { diff --git a/go/vt/vtorc/inst/oracle_gtid_set_entry.go b/go/vt/vtorc/inst/oracle_gtid_set_entry.go index 3affd326735..704b38760ef 100644 --- a/go/vt/vtorc/inst/oracle_gtid_set_entry.go +++ b/go/vt/vtorc/inst/oracle_gtid_set_entry.go @@ -18,16 +18,9 @@ package inst import ( "fmt" - "regexp" - "strconv" "strings" ) -var ( - singleValueInterval = regexp.MustCompile("^([0-9]+)$") - multiValueInterval = regexp.MustCompile("^([0-9]+)[-]([0-9]+)$") -) - // OracleGtidSetEntry represents an entry in a set of GTID ranges, // for example, the entry: "316d193c-70e5-11e5-adb2-ecf4bb2262ff:1-8935:8984-6124596" (may include gaps) type OracleGtidSetEntry struct { @@ -56,20 +49,3 @@ func NewOracleGtidSetEntry(gtidRangeString string) (*OracleGtidSetEntry, error) func (oracleGTIDSetEntry *OracleGtidSetEntry) String() string { return fmt.Sprintf("%s:%s", oracleGTIDSetEntry.UUID, oracleGTIDSetEntry.Ranges) } - -// String returns a user-friendly string representation of this entry -func (oracleGTIDSetEntry *OracleGtidSetEntry) Explode() (result [](*OracleGtidSetEntry)) { - intervals := strings.Split(oracleGTIDSetEntry.Ranges, ":") - for _, interval := range intervals { - if submatch := multiValueInterval.FindStringSubmatch(interval); submatch != nil { - intervalStart, _ := strconv.Atoi(submatch[1]) - intervalEnd, _ := strconv.Atoi(submatch[2]) - for i := intervalStart; i <= intervalEnd; i++ { - result = append(result, &OracleGtidSetEntry{UUID: oracleGTIDSetEntry.UUID, Ranges: fmt.Sprintf("%d", i)}) - } - } else if submatch := singleValueInterval.FindStringSubmatch(interval); submatch != nil { - result = append(result, &OracleGtidSetEntry{UUID: oracleGTIDSetEntry.UUID, Ranges: interval}) - } - } - return result -} diff --git a/go/vt/vtorc/inst/oracle_gtid_set_test.go b/go/vt/vtorc/inst/oracle_gtid_set_test.go index b62f9475696..7e5b61fe448 
100644 --- a/go/vt/vtorc/inst/oracle_gtid_set_test.go +++ b/go/vt/vtorc/inst/oracle_gtid_set_test.go @@ -28,54 +28,6 @@ func TestNewOracleGtidSetEntry(t *testing.T) { } } -func TestExplode(t *testing.T) { - { - uuidSet := "00020194-3333-3333-3333-333333333333:7" - entry, err := NewOracleGtidSetEntry(uuidSet) - require.NoError(t, err) - - exploded := entry.Explode() - require.Equal(t, len(exploded), 1) - require.Equal(t, exploded[0].String(), "00020194-3333-3333-3333-333333333333:7") - } - { - uuidSet := "00020194-3333-3333-3333-333333333333:1-3" - entry, err := NewOracleGtidSetEntry(uuidSet) - require.NoError(t, err) - - exploded := entry.Explode() - require.Equal(t, len(exploded), 3) - require.Equal(t, exploded[0].String(), "00020194-3333-3333-3333-333333333333:1") - require.Equal(t, exploded[1].String(), "00020194-3333-3333-3333-333333333333:2") - require.Equal(t, exploded[2].String(), "00020194-3333-3333-3333-333333333333:3") - } - { - uuidSet := "00020194-3333-3333-3333-333333333333:1-3:6-7" - entry, err := NewOracleGtidSetEntry(uuidSet) - require.NoError(t, err) - - exploded := entry.Explode() - require.Equal(t, len(exploded), 5) - require.Equal(t, exploded[0].String(), "00020194-3333-3333-3333-333333333333:1") - require.Equal(t, exploded[1].String(), "00020194-3333-3333-3333-333333333333:2") - require.Equal(t, exploded[2].String(), "00020194-3333-3333-3333-333333333333:3") - require.Equal(t, exploded[3].String(), "00020194-3333-3333-3333-333333333333:6") - require.Equal(t, exploded[4].String(), "00020194-3333-3333-3333-333333333333:7") - } - { - gtidSetVal := "00020192-1111-1111-1111-111111111111:29-30, 00020194-3333-3333-3333-333333333333:7-8" - gtidSet, err := NewOracleGtidSet(gtidSetVal) - require.NoError(t, err) - - exploded := gtidSet.Explode() - require.Equal(t, len(exploded), 4) - require.Equal(t, exploded[0].String(), "00020192-1111-1111-1111-111111111111:29") - require.Equal(t, exploded[1].String(), "00020192-1111-1111-1111-111111111111:30") - 
require.Equal(t, exploded[2].String(), "00020194-3333-3333-3333-333333333333:7") - require.Equal(t, exploded[3].String(), "00020194-3333-3333-3333-333333333333:8") - } -} - func TestNewOracleGtidSet(t *testing.T) { { gtidSetVal := "00020192-1111-1111-1111-111111111111:20-30, 00020194-3333-3333-3333-333333333333:7-8" @@ -135,93 +87,3 @@ func TestRemoveUUID(t *testing.T) { require.True(t, gtidSet.IsEmpty()) } } - -func TestRetainUUID(t *testing.T) { - gtidSetVal := "00020192-1111-1111-1111-111111111111:20-30, 00020194-3333-3333-3333-333333333333:7-8" - { - gtidSet, err := NewOracleGtidSet(gtidSetVal) - require.NoError(t, err) - - require.Equal(t, len(gtidSet.GtidEntries), 2) - removed := gtidSet.RetainUUID("00020194-3333-3333-3333-333333333333") - require.True(t, removed) - require.Equal(t, len(gtidSet.GtidEntries), 1) - require.Equal(t, gtidSet.GtidEntries[0].String(), "00020194-3333-3333-3333-333333333333:7-8") - - removed = gtidSet.RetainUUID("00020194-3333-3333-3333-333333333333") - require.False(t, removed) - require.Equal(t, len(gtidSet.GtidEntries), 1) - require.Equal(t, gtidSet.GtidEntries[0].String(), "00020194-3333-3333-3333-333333333333:7-8") - - removed = gtidSet.RetainUUID("230ea8ea-81e3-11e4-972a-e25ec4bd140a") - require.True(t, removed) - require.Equal(t, len(gtidSet.GtidEntries), 0) - } -} - -func TestRetainUUIDs(t *testing.T) { - gtidSetVal := "00020192-1111-1111-1111-111111111111:20-30, 00020194-3333-3333-3333-333333333333:7-8" - { - gtidSet, err := NewOracleGtidSet(gtidSetVal) - require.NoError(t, err) - - require.Equal(t, len(gtidSet.GtidEntries), 2) - removed := gtidSet.RetainUUIDs([]string{"00020194-3333-3333-3333-333333333333", "00020194-5555-5555-5555-333333333333"}) - require.True(t, removed) - require.Equal(t, len(gtidSet.GtidEntries), 1) - require.Equal(t, gtidSet.GtidEntries[0].String(), "00020194-3333-3333-3333-333333333333:7-8") - - removed = gtidSet.RetainUUIDs([]string{"00020194-3333-3333-3333-333333333333", 
"00020194-5555-5555-5555-333333333333"}) - require.False(t, removed) - require.Equal(t, len(gtidSet.GtidEntries), 1) - require.Equal(t, gtidSet.GtidEntries[0].String(), "00020194-3333-3333-3333-333333333333:7-8") - - removed = gtidSet.RetainUUIDs([]string{"230ea8ea-81e3-11e4-972a-e25ec4bd140a"}) - require.True(t, removed) - require.Equal(t, len(gtidSet.GtidEntries), 0) - } -} - -func TestSharedUUIDs(t *testing.T) { - gtidSetVal := "00020192-1111-1111-1111-111111111111:20-30, 00020194-3333-3333-3333-333333333333:7-8" - gtidSet, err := NewOracleGtidSet(gtidSetVal) - require.NoError(t, err) - { - otherSet, err := NewOracleGtidSet("00020194-3333-3333-3333-333333333333:7-8,230ea8ea-81e3-11e4-972a-e25ec4bd140a:1-2") - require.NoError(t, err) - { - shared := gtidSet.SharedUUIDs(otherSet) - require.Equal(t, len(shared), 1) - require.Equal(t, shared[0], "00020194-3333-3333-3333-333333333333") - } - { - shared := otherSet.SharedUUIDs(gtidSet) - require.Equal(t, len(shared), 1) - require.Equal(t, shared[0], "00020194-3333-3333-3333-333333333333") - } - } - { - otherSet, err := NewOracleGtidSet("00020194-4444-4444-4444-333333333333:7-8,230ea8ea-81e3-11e4-972a-e25ec4bd140a:1-2") - require.NoError(t, err) - { - shared := gtidSet.SharedUUIDs(otherSet) - require.Equal(t, len(shared), 0) - } - { - shared := otherSet.SharedUUIDs(gtidSet) - require.Equal(t, len(shared), 0) - } - } - { - otherSet, err := NewOracleGtidSet("00020194-3333-3333-3333-333333333333:7-8,00020192-1111-1111-1111-111111111111:1-2") - require.NoError(t, err) - { - shared := gtidSet.SharedUUIDs(otherSet) - require.Equal(t, len(shared), 2) - } - { - shared := otherSet.SharedUUIDs(gtidSet) - require.Equal(t, len(shared), 2) - } - } -} diff --git a/go/vt/vtorc/inst/tablet_dao.go b/go/vt/vtorc/inst/tablet_dao.go index 5beccc09a29..a25b09edccb 100644 --- a/go/vt/vtorc/inst/tablet_dao.go +++ b/go/vt/vtorc/inst/tablet_dao.go @@ -74,6 +74,25 @@ func ReadTablet(tabletAlias string) (*topodatapb.Tablet, error) { return 
tablet, nil } +// ReadTabletCountsByCell returns the count of tablets watched by cell. +// The backend query uses an index by "cell": cell_idx_vitess_tablet. +func ReadTabletCountsByCell() (map[string]int64, error) { + tabletCounts := make(map[string]int64) + query := `SELECT + cell, + COUNT() AS count + FROM + vitess_tablet + GROUP BY + cell` + err := db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error { + cell := row.GetString("cell") + tabletCounts[cell] = row.GetInt64("count") + return nil + }) + return tabletCounts, err +} + // SaveTablet saves the tablet record against the instanceKey. func SaveTablet(tablet *topodatapb.Tablet) error { tabletp, err := prototext.Marshal(tablet) diff --git a/go/vt/vtorc/inst/tablet_dao_test.go b/go/vt/vtorc/inst/tablet_dao_test.go index a876d857ace..67fdf1a3227 100644 --- a/go/vt/vtorc/inst/tablet_dao_test.go +++ b/go/vt/vtorc/inst/tablet_dao_test.go @@ -91,3 +91,24 @@ func TestSaveAndReadTablet(t *testing.T) { }) } } + +func TestReadTabletCountsByCell(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. 
+ defer func() { + db.ClearVTOrcDatabase() + }() + + for i := 0; i < 100; i++ { + require.NoError(t, SaveTablet(&topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "cell1", + Uid: uint32(i), + }, + Keyspace: "test", + Shard: "-", + })) + } + tabletCounts, err := ReadTabletCountsByCell() + require.NoError(t, err) + require.Equal(t, map[string]int64{"cell1": 100}, tabletCounts) +} diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index 38903124b6d..953cac243a6 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -21,6 +21,7 @@ import ( "sync" "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/key" @@ -77,7 +78,8 @@ func RefreshAllKeyspacesAndShards(ctx context.Context) error { refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() - var wg sync.WaitGroup + + eg, _ := errgroup.WithContext(ctx) for idx, keyspace := range keyspaces { // Check if the current keyspace name is the same as the last one. // If it is, then we know we have already refreshed its information. @@ -85,19 +87,16 @@ func RefreshAllKeyspacesAndShards(ctx context.Context) error { if idx != 0 && keyspace == keyspaces[idx-1] { continue } - wg.Add(2) - go func(keyspace string) { - defer wg.Done() - _ = refreshKeyspaceHelper(refreshCtx, keyspace) - }(keyspace) - go func(keyspace string) { - defer wg.Done() - _ = refreshAllShards(refreshCtx, keyspace) - }(keyspace) - } - wg.Wait() - return nil + eg.Go(func() error { + return refreshKeyspaceHelper(refreshCtx, keyspace) + }) + + eg.Go(func() error { + return refreshAllShards(refreshCtx, keyspace) + }) + } + return eg.Wait() } // RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard. 
diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 91869bd9835..71851bf12fd 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -31,6 +31,7 @@ import ( "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" @@ -55,10 +56,30 @@ var ( // This is populated by parsing `--clusters_to_watch` flag. shardsToWatch map[string][]*topodatapb.KeyRange + statsTabletsWatched = stats.NewGaugesFuncWithMultiLabels( + "TabletsWatched", + "Number of tablets watched by cell", + []string{"cell"}, + getTabletsWatchedByCell, + ) + // ErrNoPrimaryTablet is a fixed error message. ErrNoPrimaryTablet = errors.New("no primary tablet found") ) +func getTabletsWatchedByCell() map[string]int64 { + tabletsWatchedByCell := make(map[string]int64) + tabletsByCell, err := inst.ReadTabletCountsByCell() + if err != nil { + log.Errorf("Failed to read tablet counts by cell: %+v", err) + return tabletsWatchedByCell + } + for cell, tabletCount := range tabletsByCell { + tabletsWatchedByCell[cell] = tabletCount + } + return tabletsWatchedByCell +} + // RegisterFlags registers the flags required by VTOrc func RegisterFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/keyranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") @@ -324,22 +345,20 @@ func getLockAction(analysedInstance string, code inst.AnalysisCode) string { } // LockShard locks the keyspace-shard preventing others from performing conflicting actions.
-func LockShard(ctx context.Context, tabletAlias string, lockAction string) (context.Context, func(*error), error) { - if tabletAlias == "" { - return nil, nil, errors.New("can't lock shard: instance is unspecified") +func LockShard(ctx context.Context, keyspace, shard string, lockAction string) (context.Context, func(*error), error) { + if keyspace == "" { + return nil, nil, errors.New("can't lock shard: keyspace is unspecified") + } + if shard == "" { + return nil, nil, errors.New("can't lock shard: shard name is unspecified") } val := atomic.LoadInt32(&hasReceivedSIGTERM) if val > 0 { return nil, nil, errors.New("can't lock shard: SIGTERM received") } - tablet, err := inst.ReadTablet(tabletAlias) - if err != nil { - return nil, nil, err - } - atomic.AddInt32(&shardsLockCounter, 1) - ctx, unlock, err := ts.TryLockShard(ctx, tablet.Keyspace, tablet.Shard, lockAction) + ctx, unlock, err := ts.TryLockShard(ctx, keyspace, shard, lockAction) if err != nil { atomic.AddInt32(&shardsLockCounter, -1) return nil, nil, err diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index d6ac04d9414..a9ef6b1a794 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -657,7 +657,9 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er } // We lock the shard here and then refresh the tablets information - ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedInstanceAlias, getLockAction(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis)) + ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, + getLockAction(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis), + ) if err != nil { logger.Errorf("Failed to lock shard, aborting recovery: %v", err) return err diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 947150b9907..103ff4ab287 100644 --- 
a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -53,14 +53,19 @@ var snapshotDiscoveryKeys chan string var snapshotDiscoveryKeysMutex sync.Mutex var hasReceivedSIGTERM int32 -var discoveriesCounter = stats.NewCounter("discoveries.attempt", "Number of discoveries attempted") -var failedDiscoveriesCounter = stats.NewCounter("discoveries.fail", "Number of failed discoveries") -var instancePollSecondsExceededCounter = stats.NewCounter("discoveries.instance_poll_seconds_exceeded", "Number of instances that took longer than InstancePollSeconds to poll") -var discoveryQueueLengthGauge = stats.NewGauge("discoveries.queue_length", "Length of the discovery queue") -var discoveryRecentCountGauge = stats.NewGauge("discoveries.recent_count", "Number of recent discoveries") +var ( + discoveriesCounter = stats.NewCounter("discoveries.attempt", "Number of discoveries attempted") + failedDiscoveriesCounter = stats.NewCounter("discoveries.fail", "Number of failed discoveries") + instancePollSecondsExceededCounter = stats.NewCounter("discoveries.instance_poll_seconds_exceeded", "Number of instances that took longer than InstancePollSeconds to poll") + discoveryQueueLengthGauge = stats.NewGauge("discoveries.queue_length", "Length of the discovery queue") + discoveryRecentCountGauge = stats.NewGauge("discoveries.recent_count", "Number of recent discoveries") + discoveryWorkersGauge = stats.NewGauge("DiscoveryWorkers", "Number of discovery workers") + discoveryWorkersActiveGauge = stats.NewGauge("DiscoveryWorkersActive", "Number of discovery workers actively discovering tablets") + isElectedGauge = stats.NewGauge("elect.is_elected", "Elected state") + isHealthyGauge = stats.NewGauge("health.is_healthy", "Healthy state") +) + var discoveryMetrics = collection.CreateOrReturnCollection(DiscoveryMetricsName) -var isElectedGauge = stats.NewGauge("elect.is_elected", "Elected state") -var isHealthyGauge = stats.NewGauge("health.is_healthy", "Healthy state") var isElectedNode 
int64 @@ -148,12 +153,16 @@ func waitForLocksRelease() { // handleDiscoveryRequests iterates the discoveryQueue channel and calls upon // instance discovery per entry. func handleDiscoveryRequests() { - discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT") + discoveryQueue = discovery.NewQueue() // create a pool of discovery workers - for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ { + for i := uint(0); i < config.Config.DiscoveryWorkers; i++ { + discoveryWorkersGauge.Add(1) go func() { for { + // .Consume() blocks until there is a new key to process. + // We are not "active" until we got a tablet alias. tabletAlias := discoveryQueue.Consume() + // Possibly this used to be the elected node, but has // been demoted, while still the queue is full. if !IsLeaderOrActive() { @@ -163,8 +172,13 @@ func handleDiscoveryRequests() { continue } - DiscoverInstance(tabletAlias, false /* forceDiscovery */) - discoveryQueue.Release(tabletAlias) + func() { + discoveryWorkersActiveGauge.Add(1) + defer discoveryWorkersActiveGauge.Add(-1) + + DiscoverInstance(tabletAlias, false /* forceDiscovery */) + discoveryQueue.Release(tabletAlias) + }() } }() } diff --git a/go/vt/vtorc/metrics/query/aggregated.go b/go/vt/vtorc/metrics/query/aggregated.go deleted file mode 100644 index a284ca6f74d..00000000000 --- a/go/vt/vtorc/metrics/query/aggregated.go +++ /dev/null @@ -1,76 +0,0 @@ -// Package query provides query metrics with this file providing -// aggregated metrics based on the underlying values. 
-package query - -import ( - "time" - - "github.com/montanaflynn/stats" - - "vitess.io/vitess/go/vt/vtorc/collection" -) - -type AggregatedQueryMetrics struct { - // fill me in here - Count int - MaxLatencySeconds float64 - MeanLatencySeconds float64 - MedianLatencySeconds float64 - P95LatencySeconds float64 - MaxWaitSeconds float64 - MeanWaitSeconds float64 - MedianWaitSeconds float64 - P95WaitSeconds float64 -} - -// AggregatedSince returns the aggregated query metrics for the period -// given from the values provided. -func AggregatedSince(c *collection.Collection, t time.Time) AggregatedQueryMetrics { - - // Initialise timing metrics - var waitTimings []float64 - var queryTimings []float64 - - // Retrieve values since the time specified - values, err := c.Since(t) - a := AggregatedQueryMetrics{} - if err != nil { - return a // empty data - } - - // generate the metrics - for _, v := range values { - waitTimings = append(waitTimings, v.(*Metric).WaitLatency.Seconds()) - queryTimings = append(queryTimings, v.(*Metric).ExecuteLatency.Seconds()) - } - - a.Count = len(waitTimings) - - // generate aggregate values - if s, err := stats.Max(stats.Float64Data(waitTimings)); err == nil { - a.MaxWaitSeconds = s - } - if s, err := stats.Mean(stats.Float64Data(waitTimings)); err == nil { - a.MeanWaitSeconds = s - } - if s, err := stats.Median(stats.Float64Data(waitTimings)); err == nil { - a.MedianWaitSeconds = s - } - if s, err := stats.Percentile(stats.Float64Data(waitTimings), 95); err == nil { - a.P95WaitSeconds = s - } - if s, err := stats.Max(stats.Float64Data(queryTimings)); err == nil { - a.MaxLatencySeconds = s - } - if s, err := stats.Mean(stats.Float64Data(queryTimings)); err == nil { - a.MeanLatencySeconds = s - } - if s, err := stats.Median(stats.Float64Data(queryTimings)); err == nil { - a.MedianLatencySeconds = s - } - if s, err := stats.Percentile(stats.Float64Data(queryTimings), 95); err == nil { - a.P95LatencySeconds = s - } - - return a -} diff --git 
a/go/vt/vtorc/process/election_dao.go b/go/vt/vtorc/process/election_dao.go index 29ee42add1b..d5a133d786c 100644 --- a/go/vt/vtorc/process/election_dao.go +++ b/go/vt/vtorc/process/election_dao.go @@ -107,32 +107,6 @@ func AttemptElection() (bool, error) { return false, nil } -// GrabElection forcibly grabs leadership. Use with care!! -func GrabElection() error { - _, err := db.ExecVTOrc(` - replace into active_node ( - anchor, hostname, token, first_seen_active, last_seen_active - ) values ( - 1, ?, ?, datetime('now'), datetime('now') - ) - `, - ThisHostname, util.ProcessToken.Hash, - ) - if err != nil { - log.Error(err) - } - return err -} - -// Reelect clears the way for re-elections. Active node is immediately demoted. -func Reelect() error { - _, err := db.ExecVTOrc(`delete from active_node where anchor = 1`) - if err != nil { - log.Error(err) - } - return err -} - // ElectedNode returns the details of the elected node, as well as answering the question "is this process the elected one"? func ElectedNode() (node *NodeHealth, isElected bool, err error) { node = &NodeHealth{} diff --git a/go/vt/vtorc/util/token.go b/go/vt/vtorc/util/token.go index ff60e3e18ea..940f7a44698 100644 --- a/go/vt/vtorc/util/token.go +++ b/go/vt/vtorc/util/token.go @@ -24,10 +24,6 @@ import ( "time" ) -const ( - shortTokenLength = 8 -) - func toHash(input []byte) string { hasher := sha256.New() hasher.Write(input) @@ -50,13 +46,6 @@ type Token struct { Hash string } -func (token *Token) Short() string { - if len(token.Hash) <= shortTokenLength { - return token.Hash - } - return token.Hash[0:shortTokenLength] -} - var ProcessToken = NewToken() func NewToken() *Token {