Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
* [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452
* [BUGFIX] Fixed float64 precision stability when aggregating metrics before exposing them. This could have led to false counter resets when querying some metrics exposed by Cortex. #3506

## Blocksconvert

Expand Down
27 changes: 4 additions & 23 deletions pkg/alertmanager/alertmanager_metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package alertmanager

import (
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -11,9 +9,7 @@ import (
// This struct aggregates metrics exported by Alertmanager
// and re-exports those aggregates as Cortex metrics.
type alertmanagerMetrics struct {
// Maps userID -> registry
regsMu sync.Mutex
regs map[string]*prometheus.Registry
regs *util.UserRegistries

// exported metrics, gathered from Alertmanager API
alertsReceived *prometheus.Desc
Expand Down Expand Up @@ -52,8 +48,7 @@ type alertmanagerMetrics struct {

func newAlertmanagerMetrics() *alertmanagerMetrics {
return &alertmanagerMetrics{
regs: map[string]*prometheus.Registry{},
regsMu: sync.Mutex{},
regs: util.NewUserRegistries(),
alertsReceived: prometheus.NewDesc(
"cortex_alertmanager_alerts_received_total",
"The total number of received alerts.",
Expand Down Expand Up @@ -146,21 +141,7 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
}

func (m *alertmanagerMetrics) addUserRegistry(user string, reg *prometheus.Registry) {
m.regsMu.Lock()
m.regs[user] = reg
m.regsMu.Unlock()
}

func (m *alertmanagerMetrics) registries() map[string]*prometheus.Registry {
regs := map[string]*prometheus.Registry{}

m.regsMu.Lock()
defer m.regsMu.Unlock()
for uid, r := range m.regs {
regs[uid] = r
}

return regs
m.regs.AddUserRegistry(user, reg)
}

func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -189,7 +170,7 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
}

func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.registries())
data := m.regs.BuildMetricFamiliesPerUser()

data.SendSumOfCountersPerUser(out, m.alertsReceived, "alertmanager_alerts_received_total")
data.SendSumOfCountersPerUser(out, m.alertsInvalid, "alertmanager_alerts_invalid_total")
Expand Down
25 changes: 4 additions & 21 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package ingester

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand Down Expand Up @@ -256,13 +254,12 @@ type tsdbMetrics struct {
memSeriesCreatedTotal *prometheus.Desc
memSeriesRemovedTotal *prometheus.Desc

regsMu sync.RWMutex // custom mutex for shipper registry, to avoid blocking main user state mutex on collection
regs map[string]*prometheus.Registry // One prometheus registry per tenant
regs *util.UserRegistries
}

func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
m := &tsdbMetrics{
regs: make(map[string]*prometheus.Registry),
regs: util.NewUserRegistries(),

dirSyncs: prometheus.NewDesc(
"cortex_ingester_shipper_dir_syncs_total",
Expand Down Expand Up @@ -419,7 +416,7 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
}

func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
data := util.BuildMetricFamiliesPerUserFromUserRegistries(sm.registries())
data := sm.regs.BuildMetricFamiliesPerUser()

// OK, we have it all. Let's build results.
data.SendSumOfCounters(out, sm.dirSyncs, "thanos_shipper_dir_syncs_total")
Expand Down Expand Up @@ -455,20 +452,6 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfCountersPerUser(out, sm.memSeriesRemovedTotal, "prometheus_tsdb_head_series_removed_total")
}

// make a copy of the map, so that metrics can be gathered while the new registry is being added.
func (sm *tsdbMetrics) registries() map[string]*prometheus.Registry {
sm.regsMu.RLock()
defer sm.regsMu.RUnlock()

regs := make(map[string]*prometheus.Registry, len(sm.regs))
for u, r := range sm.regs {
regs[u] = r
}
return regs
}

func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Registry) {
sm.regsMu.Lock()
sm.regs[userID] = registry
sm.regsMu.Unlock()
sm.regs.AddUserRegistry(userID, registry)
}
2 changes: 1 addition & 1 deletion pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
r.lastReloadSuccessful.DeleteLabelValues(userID)
r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID)
r.configUpdatesTotal.DeleteLabelValues(userID)
r.userManagerMetrics.DeleteUserRegistry(userID)
r.userManagerMetrics.RemoveUserRegistry(userID)
level.Info(r.logger).Log("msg", "deleting rule manager", "user", userID)
}
}
Expand Down
42 changes: 9 additions & 33 deletions pkg/ruler/manager_metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package ruler

import (
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -11,9 +9,7 @@ import (
// ManagerMetrics aggregates metrics exported by the Prometheus
// rules package and returns them as Cortex metrics
type ManagerMetrics struct {
// Maps userID -> registry
regsMu sync.Mutex
regs map[string]*prometheus.Registry
regs *util.UserRegistries

EvalDuration *prometheus.Desc
IterationDuration *prometheus.Desc
Expand All @@ -30,8 +26,7 @@ type ManagerMetrics struct {
// NewManagerMetrics returns a ManagerMetrics struct
func NewManagerMetrics() *ManagerMetrics {
return &ManagerMetrics{
regs: map[string]*prometheus.Registry{},
regsMu: sync.Mutex{},
regs: util.NewUserRegistries(),

EvalDuration: prometheus.NewDesc(
"cortex_prometheus_rule_evaluation_duration_seconds",
Expand Down Expand Up @@ -96,33 +91,14 @@ func NewManagerMetrics() *ManagerMetrics {
}
}

// AddUserRegistry adds a Prometheus registry to the struct
// AddUserRegistry adds a user-specific Prometheus registry.
func (m *ManagerMetrics) AddUserRegistry(user string, reg *prometheus.Registry) {
m.regsMu.Lock()
defer m.regsMu.Unlock()

m.regs[user] = reg
}

// DeleteUserRegistry removes user-specific Prometheus registry.
func (m *ManagerMetrics) DeleteUserRegistry(user string) {
m.regsMu.Lock()
defer m.regsMu.Unlock()

delete(m.regs, user)
m.regs.AddUserRegistry(user, reg)
}

// Registries returns a map of prometheus registries managed by the struct
func (m *ManagerMetrics) Registries() map[string]*prometheus.Registry {
regs := map[string]*prometheus.Registry{}

m.regsMu.Lock()
defer m.regsMu.Unlock()
for uid, r := range m.regs {
regs[uid] = r
}

return regs
// RemoveUserRegistry removes a user-specific Prometheus registry.
func (m *ManagerMetrics) RemoveUserRegistry(user string) {
m.regs.RemoveUserRegistry(user)
}

// Describe implements the Collector interface
Expand All @@ -141,10 +117,10 @@ func (m *ManagerMetrics) Describe(out chan<- *prometheus.Desc) {

// Collect implements the Collector interface
func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) {
data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.Registries())
data := m.regs.BuildMetricFamiliesPerUser()

// WARNING: It is important that all metrics generated in this method are "Per User".
// Thanks to that we can actually *remove* metrics for given user (see DeleteUserRegistry).
// Thanks to that we can actually *remove* metrics for given user (see RemoveUserRegistry).
// If same user is later re-added, all metrics will start from 0, which is fine.

data.SendSumOfSummariesPerUser(out, m.EvalDuration, "prometheus_rule_evaluation_duration_seconds")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/manager_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestManagerMetrics(t *testing.T) {
managerMetrics.AddUserRegistry("user3", populateManager(100))

managerMetrics.AddUserRegistry("user4", populateManager(1000))
managerMetrics.DeleteUserRegistry("user4")
managerMetrics.RemoveUserRegistry("user4")

//noinspection ALL
err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
Expand Down
26 changes: 4 additions & 22 deletions pkg/storegateway/bucket_store_metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package storegateway

import (
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -11,9 +9,7 @@ import (
// BucketStoreMetrics aggregates metrics exported by Thanos Bucket Store
// and re-exports those aggregates as Cortex metrics.
type BucketStoreMetrics struct {
// Maps userID -> registry
regsMu sync.Mutex
regs map[string]*prometheus.Registry
regs *util.UserRegistries

// exported metrics, gathered from Thanos BucketStore
blockLoads *prometheus.Desc
Expand Down Expand Up @@ -50,7 +46,7 @@ type BucketStoreMetrics struct {

func NewBucketStoreMetrics() *BucketStoreMetrics {
return &BucketStoreMetrics{
regs: map[string]*prometheus.Registry{},
regs: util.NewUserRegistries(),

blockLoads: prometheus.NewDesc(
"cortex_bucket_store_block_loads_total",
Expand Down Expand Up @@ -168,21 +164,7 @@ func NewBucketStoreMetrics() *BucketStoreMetrics {
}

func (m *BucketStoreMetrics) AddUserRegistry(user string, reg *prometheus.Registry) {
m.regsMu.Lock()
m.regs[user] = reg
m.regsMu.Unlock()
}

func (m *BucketStoreMetrics) registries() map[string]*prometheus.Registry {
regs := map[string]*prometheus.Registry{}

m.regsMu.Lock()
defer m.regsMu.Unlock()
for uid, r := range m.regs {
regs[uid] = r
}

return regs
m.regs.AddUserRegistry(user, reg)
}

func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -219,7 +201,7 @@ func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc) {
}

func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric) {
data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.registries())
data := m.regs.BuildMetricFamiliesPerUser()

data.SendSumOfCounters(out, m.blockLoads, "thanos_bucket_store_block_loads_total")
data.SendSumOfCounters(out, m.blockLoadFailures, "thanos_bucket_store_block_load_failures_total")
Expand Down
14 changes: 8 additions & 6 deletions pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) {
// Start the configured number of gateways.
var gateways []*StoreGateway
var gatewayIds []string
registries := map[string]*prometheus.Registry{}
registries := util.NewUserRegistries()

for i := 1; i <= testData.numGateways; i++ {
instanceID := fmt.Sprintf("gateway-%d", i)
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) {

gateways = append(gateways, g)
gatewayIds = append(gatewayIds, instanceID)
registries[instanceID] = reg
registries.AddUserRegistry(instanceID, reg)
}

// Wait until the ring client of each gateway has synced (to avoid flaky tests on subsequent assertions).
Expand All @@ -356,7 +356,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) {
}

// Assert on the number of blocks loaded extracting this information from metrics.
metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(registries)
metrics := registries.BuildMetricFamiliesPerUser()
assert.Equal(t, float64(testData.expectedBlocksLoaded), metrics.GetSumOfGauges("cortex_bucket_store_blocks_loaded"))
assert.Equal(t, float64(2*testData.numGateways), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_discovered"))

Expand Down Expand Up @@ -550,7 +550,9 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) {
defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck

// Assert on the initial state.
metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{"test": reg})
regs := util.NewUserRegistries()
regs.AddUserRegistry("test", reg)
metrics := regs.BuildMetricFamiliesPerUser()
assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_storegateway_bucket_sync_total"))

// Change the ring topology.
Expand All @@ -563,14 +565,14 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) {
// Assert whether the sync triggered or not.
if testData.expectedSync {
test.Poll(t, time.Second, float64(2), func() interface{} {
metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{"test": reg})
metrics := regs.BuildMetricFamiliesPerUser()
return metrics.GetSumOfCounters("cortex_storegateway_bucket_sync_total")
})
} else {
// Give some time to the store-gateway to trigger the sync (if any).
time.Sleep(250 * time.Millisecond)

metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{"test": reg})
metrics := regs.BuildMetricFamiliesPerUser()
assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_storegateway_bucket_sync_total"))
}
})
Expand Down
Loading