Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1160 from grafana/perOrgActiveMetrics
Browse files Browse the repository at this point in the history
support per-org metrics_active for scraping by prometheus
  • Loading branch information
Dieterbe authored Dec 14, 2018
2 parents 60eb98e + 0388fad commit 8d7e53e
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 27 deletions.
93 changes: 68 additions & 25 deletions mdata/aggmetrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mdata

import (
"strconv"
"sync"
"time"

Expand All @@ -18,7 +19,7 @@ type AggMetrics struct {
cachePusher cache.CachePusher
dropFirstChunk bool
sync.RWMutex
Metrics map[schema.MKey]*AggMetric
Metrics map[uint32]map[schema.Key]*AggMetric
chunkMaxStale uint32
metricMaxStale uint32
gcInterval time.Duration
Expand All @@ -29,7 +30,7 @@ func NewAggMetrics(store Store, cachePusher cache.CachePusher, dropFirstChunk bo
store: store,
cachePusher: cachePusher,
dropFirstChunk: dropFirstChunk,
Metrics: make(map[schema.MKey]*AggMetric),
Metrics: make(map[uint32]map[schema.Key]*AggMetric),
chunkMaxStale: chunkMaxStale,
metricMaxStale: metricMaxStale,
gcInterval: gcInterval,
Expand All @@ -54,44 +55,83 @@ func (ms *AggMetrics) GC() {
metricMinTs := now - uint32(ms.metricMaxStale)

// as this is the only goroutine that can delete from ms.Metrics
// we only need to lock long enough to get the list of actives metrics.
// it doesn't matter if new metrics are added while we iterate this list.
// we only need to lock long enough to get the list of orgs, then for each org
// get the list of active metrics.
// It doesn't matter if new orgs or metrics are added while we iterate these lists.
ms.RLock()
keys := make([]schema.MKey, 0, len(ms.Metrics))
for k := range ms.Metrics {
keys = append(keys, k)
orgs := make([]uint32, 0, len(ms.Metrics))
for o := range ms.Metrics {
orgs = append(orgs, o)
}
ms.RUnlock()
for _, key := range keys {
gcMetric.Inc()
for _, org := range orgs {
orgActiveMetrics := promActiveMetrics.WithLabelValues(strconv.Itoa(int(org)))
keys := make([]schema.Key, 0, len(ms.Metrics[org]))
ms.RLock()
a := ms.Metrics[key]
for k := range ms.Metrics[org] {
keys = append(keys, k)
}
ms.RUnlock()
for _, key := range keys {
gcMetric.Inc()
ms.RLock()
a := ms.Metrics[org][key]
ms.RUnlock()
if a.GC(now, chunkMinTs, metricMinTs) {
log.Debugf("metric %s is stale. Purging data from memory.", key)
ms.Lock()
delete(ms.Metrics[org], key)
orgActiveMetrics.Set(float64(len(ms.Metrics[org])))
ms.Unlock()
}
}
ms.RLock()
orgActive := len(ms.Metrics[org])
orgActiveMetrics.Set(float64(orgActive))
ms.RUnlock()
if a.GC(now, chunkMinTs, metricMinTs) {
log.Debugf("metric %s is stale. Purging data from memory.", key)

// If this org has no keys, then delete the org from the map
if orgActive == 0 {
// To prevent races, we need to check that there are still no metrics for the org while holding a write lock
ms.Lock()
delete(ms.Metrics, key)
metricsActive.Set(len(ms.Metrics))
promActiveMetrics.Set(float64(len(ms.Metrics)))
orgActive = len(ms.Metrics[org])
if orgActive == 0 {
delete(ms.Metrics, org)
}
ms.Unlock()
}
}

// Get the totalActive across all orgs.
totalActive := 0
ms.RLock()
for o := range ms.Metrics {
totalActive += len(ms.Metrics[o])
}
ms.RUnlock()
metricsActive.Set(totalActive)
}
}

func (ms *AggMetrics) Get(key schema.MKey) (Metric, bool) {
var m *AggMetric
ms.RLock()
m, ok := ms.Metrics[key]
_, ok := ms.Metrics[key.Org]
if ok {
m, ok = ms.Metrics[key.Org][key.Key]
}
ms.RUnlock()
return m, ok
}

func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metric {

var m *AggMetric
// in the most common case, it's already there and an Rlock is all we need
ms.RLock()
m, ok := ms.Metrics[key]
_, ok := ms.Metrics[key.Org]
if ok {
m, ok = ms.Metrics[key.Org][key.Key]
}
ms.RUnlock()
if ok {
return m
Expand All @@ -102,22 +142,25 @@ func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metri
}

agg := Aggregations.Get(aggId)
schema := Schemas.Get(schemaId)
confSchema := Schemas.Get(schemaId)

// if it wasn't there, get the write lock and prepare to add it
// but first we need to check again if someone has added it in
// the meantime (quite rare, but anyway)
ms.Lock()
m, ok = ms.Metrics[key]
if _, ok := ms.Metrics[key.Org]; !ok {
ms.Metrics[key.Org] = make(map[schema.Key]*AggMetric)
}
m, ok = ms.Metrics[key.Org][key.Key]
if ok {
ms.Unlock()
return m
}
m = NewAggMetric(ms.store, ms.cachePusher, k, schema.Retentions, schema.ReorderWindow, &agg, ms.dropFirstChunk)
ms.Metrics[key] = m
active := len(ms.Metrics)
m = NewAggMetric(ms.store, ms.cachePusher, k, confSchema.Retentions, confSchema.ReorderWindow, &agg, ms.dropFirstChunk)
ms.Metrics[key.Org][key.Key] = m
active := len(ms.Metrics[key.Org])
ms.Unlock()
metricsActive.Set(active)
promActiveMetrics.Set(float64(active))
metricsActive.Inc()
promActiveMetrics.WithLabelValues(strconv.Itoa(int(key.Org))).Set(float64(active))
return m
}
4 changes: 2 additions & 2 deletions mdata/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ var (
schemasFile = "/etc/metrictank/storage-schemas.conf"
aggFile = "/etc/metrictank/storage-aggregation.conf"

promActiveMetrics = promauto.NewGauge(prometheus.GaugeOpts{
promActiveMetrics = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "metrictank",
Name: "metrics_active",
Help: "Current # of active metrics",
})
}, []string{"org"})
)

func ConfigSetup() {
Expand Down

0 comments on commit 8d7e53e

Please sign in to comment.