Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

support per-org metrics_active for scraping by prometheus #1160

Merged
merged 3 commits into from
Dec 14, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 51 additions & 30 deletions mdata/aggmetrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mdata

import (
"strconv"
"sync"
"time"

Expand All @@ -18,7 +19,7 @@ type AggMetrics struct {
cachePusher cache.CachePusher
dropFirstChunk bool
sync.RWMutex
Metrics map[schema.MKey]*AggMetric
Metrics map[uint32]map[schema.MKey]*AggMetric
chunkMaxStale uint32
metricMaxStale uint32
gcInterval time.Duration
Expand All @@ -29,7 +30,7 @@ func NewAggMetrics(store Store, cachePusher cache.CachePusher, dropFirstChunk bo
store: store,
cachePusher: cachePusher,
dropFirstChunk: dropFirstChunk,
Metrics: make(map[schema.MKey]*AggMetric),
Metrics: make(map[uint32]map[schema.MKey]*AggMetric),
chunkMaxStale: chunkMaxStale,
metricMaxStale: metricMaxStale,
gcInterval: gcInterval,
Expand All @@ -53,45 +54,62 @@ func (ms *AggMetrics) GC() {
chunkMinTs := now - uint32(ms.chunkMaxStale)
metricMinTs := now - uint32(ms.metricMaxStale)

// as this is the only goroutine that can delete from ms.Metrics
// we only need to lock long enough to get the list of actives metrics.
// it doesn't matter if new metrics are added while we iterate this list.
ms.RLock()
keys := make([]schema.MKey, 0, len(ms.Metrics))
for k := range ms.Metrics {
keys = append(keys, k)
orgs := make([]uint32, 0, len(ms.Metrics))
for o := range ms.Metrics {
orgs = append(orgs, o)
}
ms.RUnlock()
for _, key := range keys {
gcMetric.Inc()
ms.RLock()
a := ms.Metrics[key]
totalActive := 0
for _, org := range orgs {
// as this is the only goroutine that can delete from ms.Metrics
// we only need to lock long enough to get the list of active metrics.
// it doesn't matter if new metrics are added while we iterate this list.
keys := make([]schema.MKey, 0, len(ms.Metrics[org]))
for k := range ms.Metrics[org] {
keys = append(keys, k)
}
ms.RUnlock()
if a.GC(now, chunkMinTs, metricMinTs) {
log.Debugf("metric %s is stale. Purging data from memory.", key)
ms.Lock()
delete(ms.Metrics, key)
metricsActive.Set(len(ms.Metrics))
promActiveMetrics.Set(float64(len(ms.Metrics)))
ms.Unlock()
for _, key := range keys {
gcMetric.Inc()
ms.RLock()
a := ms.Metrics[org][key]
ms.RUnlock()
if a.GC(now, chunkMinTs, metricMinTs) {
log.Debugf("metric %s is stale. Purging data from memory.", key)
ms.Lock()
delete(ms.Metrics[org], key)
ms.Unlock()
}
}
ms.RLock()
totalActive += len(ms.Metrics[org])
promActiveMetrics.WithLabelValues(strconv.Itoa(int(org))).Set(float64(len(ms.Metrics[org])))
ms.RUnlock()
}

metricsActive.Set(totalActive)
}
}

func (ms *AggMetrics) Get(key schema.MKey) (Metric, bool) {
var m *AggMetric
ms.RLock()
m, ok := ms.Metrics[key]
_, ok := ms.Metrics[key.Org]
if ok {
m, ok = ms.Metrics[key.Org][key]
}
ms.RUnlock()
return m, ok
}

func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metric {

var m *AggMetric
// in the most common case, it's already there and an Rlock is all we need
ms.RLock()
m, ok := ms.Metrics[key]
_, ok := ms.Metrics[key.Org]
if ok {
m, ok = ms.Metrics[key.Org][key]
}
ms.RUnlock()
if ok {
return m
Expand All @@ -102,22 +120,25 @@ func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metri
}

agg := Aggregations.Get(aggId)
schema := Schemas.Get(schemaId)
confSchema := Schemas.Get(schemaId)

// if it wasn't there, get the write lock and prepare to add it
// but first we need to check again if someone has added it in
// the meantime (quite rare, but anyway)
ms.Lock()
m, ok = ms.Metrics[key]
if _, ok := ms.Metrics[key.Org]; !ok {
ms.Metrics[key.Org] = make(map[schema.MKey]*AggMetric)
}
m, ok = ms.Metrics[key.Org][key]
if ok {
ms.Unlock()
return m
}
m = NewAggMetric(ms.store, ms.cachePusher, k, schema.Retentions, schema.ReorderWindow, &agg, ms.dropFirstChunk)
ms.Metrics[key] = m
active := len(ms.Metrics)
m = NewAggMetric(ms.store, ms.cachePusher, k, confSchema.Retentions, confSchema.ReorderWindow, &agg, ms.dropFirstChunk)
ms.Metrics[key.Org][key] = m
active := len(ms.Metrics[key.Org])
ms.Unlock()
metricsActive.Set(active)
promActiveMetrics.Set(float64(active))
metricsActive.Inc()
promActiveMetrics.WithLabelValues(strconv.Itoa(int(key.Org))).Set(float64(active))
return m
}
4 changes: 2 additions & 2 deletions mdata/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ var (
schemasFile = "/etc/metrictank/storage-schemas.conf"
aggFile = "/etc/metrictank/storage-aggregation.conf"

promActiveMetrics = promauto.NewGauge(prometheus.GaugeOpts{
promActiveMetrics = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "metrictank",
Name: "metrics_active",
Help: "Current # of active metrics",
})
}, []string{"org"})
)

func ConfigSetup() {
Expand Down