Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 28 additions & 50 deletions pkg/autoscaler/metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ var (
// ErrNoData denotes that the collector could not calculate data.
ErrNoData = errors.New("no data available")

// ErrNotScraping denotes that the collector is not collecting metrics for the given resource.
ErrNotScraping = errors.New("the requested resource is not being scraped")
// ErrNotCollecting denotes that the collector is not collecting metrics for the given resource.
ErrNotCollecting = errors.New("no metrics are being collected for the requested resource")
)

// StatsScraperFactory creates a StatsScraper for a given Metric.
Expand Down Expand Up @@ -134,27 +134,17 @@ func NewMetricCollector(statsScraperFactory StatsScraperFactory, logger *zap.Sug

// CreateOrUpdate either creates a collection for the given metric or update it, should
// it already exist.
// Map access optimized via double-checked locking.
func (c *MetricCollector) CreateOrUpdate(metric *av1alpha1.Metric) error {
scraper, err := c.statsScraperFactory(metric, c.logger)
if err != nil {
return err
}
key := types.NamespacedName{Namespace: metric.Namespace, Name: metric.Name}

c.collectionsMutex.RLock()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before y'all yell at me: The maximum concurrency at this piece of code is 2. I think I only put this in here because doubly-checked locking was my favorite toy at the time. 😅

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😂

collection, exists := c.collections[key]
c.collectionsMutex.RUnlock()
if exists {
collection.updateScraper(scraper)
collection.updateMetric(metric)
return collection.lastError()
}

c.collectionsMutex.Lock()
defer c.collectionsMutex.Unlock()

collection, exists = c.collections[key]
collection, exists := c.collections[key]
if exists {
collection.updateScraper(scraper)
collection.updateMetric(metric)
Expand Down Expand Up @@ -217,14 +207,15 @@ func (c *MetricCollector) StableAndPanicConcurrency(key types.NamespacedName, no

collection, exists := c.collections[key]
if !exists {
return 0, 0, ErrNotScraping
return 0, 0, ErrNotCollecting
}

s, p, noData := collection.stableAndPanicConcurrency(now)
if noData && collection.currentMetric().Spec.ScrapeTarget != "" {
if collection.concurrencyBuckets.IsEmpty(now) && collection.currentMetric().Spec.ScrapeTarget != "" {
return 0, 0, ErrNoData
}
return s, p, nil
return collection.concurrencyBuckets.WindowAverage(now),
collection.concurrencyPanicBuckets.WindowAverage(now),
nil
}

// StableAndPanicRPS returns both the stable and the panic RPS.
Expand All @@ -235,33 +226,35 @@ func (c *MetricCollector) StableAndPanicRPS(key types.NamespacedName, now time.T

collection, exists := c.collections[key]
if !exists {
return 0, 0, ErrNotScraping
return 0, 0, ErrNotCollecting
}

s, p, noData := collection.stableAndPanicRPS(now)
if noData && collection.currentMetric().Spec.ScrapeTarget != "" {
if collection.rpsBuckets.IsEmpty(now) && collection.currentMetric().Spec.ScrapeTarget != "" {
return 0, 0, ErrNoData
}
return s, p, nil
return collection.rpsBuckets.WindowAverage(now),
collection.rpsPanicBuckets.WindowAverage(now),
nil
}

// collection represents the collection of metrics for one specific entity.
type collection struct {
metricMutex sync.RWMutex
metric *av1alpha1.Metric

errMutex sync.RWMutex
lastErr error

scraperMutex sync.RWMutex
scraper StatsScraper
// Fields relevant to metric collection in general.
concurrencyBuckets *aggregation.TimedFloat64Buckets
concurrencyPanicBuckets *aggregation.TimedFloat64Buckets
rpsBuckets *aggregation.TimedFloat64Buckets
rpsPanicBuckets *aggregation.TimedFloat64Buckets

grp sync.WaitGroup
stopCh chan struct{}
// Fields relevant for metric scraping specifically.
scraperMutex sync.RWMutex
scraper StatsScraper
errMutex sync.RWMutex
lastErr error
grp sync.WaitGroup
stopCh chan struct{}
}

func (c *collection) updateScraper(ss StatsScraper) {
Expand Down Expand Up @@ -303,10 +296,10 @@ func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, tickFactory f
defer c.grp.Done()

scrapeTicker := tickFactory(scrapeTickInterval)
defer scrapeTicker.Stop()
for {
select {
case <-c.stopCh:
scrapeTicker.Stop()
return
case <-scrapeTicker.C:
currentMetric := c.currentMetric()
Expand Down Expand Up @@ -334,6 +327,12 @@ func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, tickFactory f
return c
}

// close stops the collection: it signals shutdown by closing stopCh and then
// blocks until every goroutine tracked in grp (the scrape loop) has exited.
// Safe to call only once; a second call would panic on the closed channel.
func (c *collection) close() {
	close(c.stopCh)
	c.grp.Wait()
}

// updateMetric safely updates the metric stored in the collection.
func (c *collection) updateMetric(metric *av1alpha1.Metric) {
c.metricMutex.Lock()
Expand Down Expand Up @@ -386,27 +385,6 @@ func (c *collection) record(stat Stat) {
c.rpsPanicBuckets.Record(stat.Time, rps)
}

// stableAndPanicConcurrency reports the stable and panic concurrency window
// averages at time now, plus a flag indicating whether the stable-window
// buckets held no data at that instant.
func (c *collection) stableAndPanicConcurrency(now time.Time) (float64, float64, bool) {
	stable := c.concurrencyBuckets.WindowAverage(now)
	panicAvg := c.concurrencyPanicBuckets.WindowAverage(now)
	noData := c.concurrencyBuckets.IsEmpty(now)
	return stable, panicAvg, noData
}

// stableAndPanicRPS reports the stable and panic requests-per-second window
// averages at time now, plus a flag indicating whether the stable-window
// buckets held no data at that instant.
func (c *collection) stableAndPanicRPS(now time.Time) (float64, float64, bool) {
	stable := c.rpsBuckets.WindowAverage(now)
	panicAvg := c.rpsPanicBuckets.WindowAverage(now)
	noData := c.rpsBuckets.IsEmpty(now)
	return stable, panicAvg, noData
}

// close stops the collection: it signals shutdown by closing stopCh and then
// blocks until every goroutine tracked in grp (the scrape loop) has exited.
// Safe to call only once; a second call would panic on the closed channel.
func (c *collection) close() {
	close(c.stopCh)
	c.grp.Wait()
}

// add adds the stats from `src` to `dst`.
func (dst *Stat) add(src Stat) {
dst.AverageConcurrentRequests += src.AverageConcurrentRequests
Expand Down
27 changes: 18 additions & 9 deletions pkg/autoscaler/metrics/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,12 @@ func TestMetricCollectorScraper(t *testing.T) {
// Deleting the metric should cause a calculation error.
coll.Delete(defaultNamespace, defaultName)
_, _, err = coll.StableAndPanicConcurrency(metricKey, now)
if err != ErrNotScraping {
t.Errorf("StableAndPanicConcurrency() = %v, want %v", err, ErrNotScraping)
if err != ErrNotCollecting {
t.Errorf("StableAndPanicConcurrency() = %v, want %v", err, ErrNotCollecting)
}
_, _, err = coll.StableAndPanicRPS(metricKey, now)
if err != ErrNotScraping {
t.Errorf("StableAndPanicRPS() = %v, want %v", err, ErrNotScraping)
if err != ErrNotCollecting {
t.Errorf("StableAndPanicRPS() = %v, want %v", err, ErrNotCollecting)
}
}

Expand Down Expand Up @@ -437,15 +437,23 @@ func TestMetricCollectorError(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
factory := scraperFactory(test.scraper, nil)
mtp := &fake.ManualTickProvider{
Channel: make(chan time.Time),
}
coll := NewMetricCollector(factory, logger)
coll.tickProvider = mtp.NewTicker

watchCh := make(chan types.NamespacedName)
coll.Watch(func(key types.NamespacedName) {
watchCh <- key
})

// Create a collection and immediately tick.
coll.CreateOrUpdate(testMetric)
key := types.NamespacedName{Namespace: testMetric.Namespace, Name: testMetric.Name}
mtp.Channel <- time.Now()

// Expect an event to be propagated because we're erroring.
key := types.NamespacedName{Namespace: testMetric.Namespace, Name: testMetric.Name}
event := <-watchCh
if event != key {
t.Fatalf("Event = %v, want %v", event, key)
Expand Down Expand Up @@ -497,14 +505,15 @@ func TestMetricCollectorAggregate(t *testing.T) {
}
c.record(stat)
}
st, pan, noData := c.stableAndPanicConcurrency(now.Add(time.Duration(9) * time.Second))
if noData {

now = now.Add(time.Duration(9) * time.Second)
if c.concurrencyBuckets.IsEmpty(now) {
t.Fatal("Unexpected NoData error")
}
if got, want := st, 11.5; got != want {
if got, want := c.concurrencyBuckets.WindowAverage(now), 11.5; got != want {
t.Errorf("Stable Concurrency = %f, want: %f", got, want)
}
if got, want := pan, 13.5; got != want {
if got, want := c.concurrencyPanicBuckets.WindowAverage(now), 13.5; got != want {
t.Errorf("Stable Concurrency = %f, want: %f", got, want)
}
}