diff --git a/CHANGELOG.md b/CHANGELOG.md index cb056ab5039..58d572c58be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423 * [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422 * [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452 +* [BUGFIX] Fixed float64 precision stability when aggregating metrics before exposing them. This could have lead to false counters resets when querying some metrics exposed by Cortex. #3506 ## Blocksconvert diff --git a/pkg/alertmanager/alertmanager_metrics.go b/pkg/alertmanager/alertmanager_metrics.go index 62617bb7036..d4e2b6991aa 100644 --- a/pkg/alertmanager/alertmanager_metrics.go +++ b/pkg/alertmanager/alertmanager_metrics.go @@ -1,8 +1,6 @@ package alertmanager import ( - "sync" - "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/util" @@ -11,9 +9,7 @@ import ( // This struct aggregates metrics exported by Alertmanager // and re-exports those aggregates as Cortex metrics. type alertmanagerMetrics struct { - // Maps userID -> registry - regsMu sync.Mutex - regs map[string]*prometheus.Registry + regs *util.UserRegistries // exported metrics, gathered from Alertmanager API alertsReceived *prometheus.Desc @@ -52,8 +48,7 @@ type alertmanagerMetrics struct { func newAlertmanagerMetrics() *alertmanagerMetrics { return &alertmanagerMetrics{ - regs: map[string]*prometheus.Registry{}, - regsMu: sync.Mutex{}, + regs: util.NewUserRegistries(), alertsReceived: prometheus.NewDesc( "cortex_alertmanager_alerts_received_total", "The total number of received alerts.", @@ -146,21 +141,7 @@ func newAlertmanagerMetrics() *alertmanagerMetrics { } func (m *alertmanagerMetrics) addUserRegistry(user string, reg *prometheus.Registry) { - m.regsMu.Lock() - m.regs[user] = reg - m.regsMu.Unlock() -} - -func (m *alertmanagerMetrics) registries() map[string]*prometheus.Registry { - regs := map[string]*prometheus.Registry{} - - m.regsMu.Lock() - defer m.regsMu.Unlock() - for uid, r := range m.regs { - regs[uid] = r - } - - return regs + m.regs.AddUserRegistry(user, reg) } func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) { @@ -189,7 +170,7 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) { } func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) { - data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.registries()) + data := m.regs.BuildMetricFamiliesPerUser() data.SendSumOfCountersPerUser(out, m.alertsReceived, "alertmanager_alerts_received_total") data.SendSumOfCountersPerUser(out, m.alertsInvalid, "alertmanager_alerts_invalid_total") diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 6a0d31582af..117b5f26538 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -1,8 +1,6 @@ package ingester import ( - "sync" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -256,13 +254,12 @@ type tsdbMetrics struct { memSeriesCreatedTotal *prometheus.Desc memSeriesRemovedTotal *prometheus.Desc - regsMu sync.RWMutex // custom mutex for shipper registry, to avoid blocking main user state mutex on collection - regs map[string]*prometheus.Registry // One prometheus registry per tenant + regs *util.UserRegistries } func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics { m := &tsdbMetrics{ - regs: make(map[string]*prometheus.Registry), + regs: util.NewUserRegistries(), dirSyncs: prometheus.NewDesc( "cortex_ingester_shipper_dir_syncs_total", @@ -419,7 +416,7 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) { } func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) { - data := util.BuildMetricFamiliesPerUserFromUserRegistries(sm.registries()) + data := sm.regs.BuildMetricFamiliesPerUser() // OK, we have it all. Let's build results. data.SendSumOfCounters(out, sm.dirSyncs, "thanos_shipper_dir_syncs_total") @@ -455,20 +452,6 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfCountersPerUser(out, sm.memSeriesRemovedTotal, "prometheus_tsdb_head_series_removed_total") } -// make a copy of the map, so that metrics can be gathered while the new registry is being added. -func (sm *tsdbMetrics) registries() map[string]*prometheus.Registry { - sm.regsMu.RLock() - defer sm.regsMu.RUnlock() - - regs := make(map[string]*prometheus.Registry, len(sm.regs)) - for u, r := range sm.regs { - regs[u] = r - } - return regs -} - func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Registry) { - sm.regsMu.Lock() - sm.regs[userID] = registry - sm.regsMu.Unlock() + sm.regs.AddUserRegistry(userID, registry) } diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index ec7dd113fdd..5057215e014 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -109,7 +109,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou r.lastReloadSuccessful.DeleteLabelValues(userID) r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID) r.configUpdatesTotal.DeleteLabelValues(userID) - r.userManagerMetrics.DeleteUserRegistry(userID) + r.userManagerMetrics.RemoveUserRegistry(userID) level.Info(r.logger).Log("msg", "deleting rule manager", "user", userID) } } diff --git a/pkg/ruler/manager_metrics.go b/pkg/ruler/manager_metrics.go index f4c39422294..2787a695aec 100644 --- a/pkg/ruler/manager_metrics.go +++ b/pkg/ruler/manager_metrics.go @@ -1,8 +1,6 @@ package ruler import ( - "sync" - "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/util" @@ -11,9 +9,7 @@ import ( // ManagerMetrics aggregates metrics exported by the Prometheus // rules package and returns them as Cortex metrics type ManagerMetrics struct { - // Maps userID -> registry - regsMu sync.Mutex - regs map[string]*prometheus.Registry + regs *util.UserRegistries EvalDuration *prometheus.Desc IterationDuration *prometheus.Desc @@ -30,8 +26,7 @@ type ManagerMetrics struct { // NewManagerMetrics returns a ManagerMetrics struct func NewManagerMetrics() *ManagerMetrics { return &ManagerMetrics{ - regs: map[string]*prometheus.Registry{}, - regsMu: sync.Mutex{}, + regs: util.NewUserRegistries(), EvalDuration: prometheus.NewDesc( "cortex_prometheus_rule_evaluation_duration_seconds", @@ -96,33 +91,14 @@ func NewManagerMetrics() *ManagerMetrics { } } -// AddUserRegistry adds a Prometheus registry to the struct +// AddUserRegistry adds a user-specific Prometheus registry. func (m *ManagerMetrics) AddUserRegistry(user string, reg *prometheus.Registry) { - m.regsMu.Lock() - defer m.regsMu.Unlock() - - m.regs[user] = reg -} - -// DeleteUserRegistry removes user-specific Prometheus registry. -func (m *ManagerMetrics) DeleteUserRegistry(user string) { - m.regsMu.Lock() - defer m.regsMu.Unlock() - - delete(m.regs, user) + m.regs.AddUserRegistry(user, reg) } -// Registries returns a map of prometheus registries managed by the struct -func (m *ManagerMetrics) Registries() map[string]*prometheus.Registry { - regs := map[string]*prometheus.Registry{} - - m.regsMu.Lock() - defer m.regsMu.Unlock() - for uid, r := range m.regs { - regs[uid] = r - } - - return regs +// RemoveUserRegistry removes user-specific Prometheus registry. +func (m *ManagerMetrics) RemoveUserRegistry(user string) { + m.regs.RemoveUserRegistry(user) } // Describe implements the Collector interface @@ -141,10 +117,10 @@ func (m *ManagerMetrics) Describe(out chan<- *prometheus.Desc) { // Collect implements the Collector interface func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) { - data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.Registries()) + data := m.regs.BuildMetricFamiliesPerUser() // WARNING: It is important that all metrics generated in this method are "Per User". - // Thanks to that we can actually *remove* metrics for given user (see DeleteUserRegistry). + // Thanks to that we can actually *remove* metrics for given user (see RemoveUserRegistry). // If same user is later re-added, all metrics will start from 0, which is fine. data.SendSumOfSummariesPerUser(out, m.EvalDuration, "prometheus_rule_evaluation_duration_seconds") diff --git a/pkg/ruler/manager_metrics_test.go b/pkg/ruler/manager_metrics_test.go index 91149f59daf..07dac5c7a0d 100644 --- a/pkg/ruler/manager_metrics_test.go +++ b/pkg/ruler/manager_metrics_test.go @@ -22,7 +22,7 @@ func TestManagerMetrics(t *testing.T) { managerMetrics.AddUserRegistry("user3", populateManager(100)) managerMetrics.AddUserRegistry("user4", populateManager(1000)) - managerMetrics.DeleteUserRegistry("user4") + managerMetrics.RemoveUserRegistry("user4") //noinspection ALL err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(` diff --git a/pkg/storegateway/bucket_store_metrics.go b/pkg/storegateway/bucket_store_metrics.go index 9cd96e13d45..224101db878 100644 --- a/pkg/storegateway/bucket_store_metrics.go +++ b/pkg/storegateway/bucket_store_metrics.go @@ -1,8 +1,6 @@ package storegateway import ( - "sync" - "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/util" @@ -11,9 +9,7 @@ import ( // BucketStoreMetrics aggregates metrics exported by Thanos Bucket Store // and re-exports those aggregates as Cortex metrics. type BucketStoreMetrics struct { - // Maps userID -> registry - regsMu sync.Mutex - regs map[string]*prometheus.Registry + regs *util.UserRegistries // exported metrics, gathered from Thanos BucketStore blockLoads *prometheus.Desc @@ -50,7 +46,7 @@ type BucketStoreMetrics struct { func NewBucketStoreMetrics() *BucketStoreMetrics { return &BucketStoreMetrics{ - regs: map[string]*prometheus.Registry{}, + regs: util.NewUserRegistries(), blockLoads: prometheus.NewDesc( "cortex_bucket_store_block_loads_total", @@ -168,21 +164,7 @@ func NewBucketStoreMetrics() *BucketStoreMetrics { } func (m *BucketStoreMetrics) AddUserRegistry(user string, reg *prometheus.Registry) { - m.regsMu.Lock() - m.regs[user] = reg - m.regsMu.Unlock() -} - -func (m *BucketStoreMetrics) registries() map[string]*prometheus.Registry { - regs := map[string]*prometheus.Registry{} - - m.regsMu.Lock() - defer m.regsMu.Unlock() - for uid, r := range m.regs { - regs[uid] = r - } - - return regs + m.regs.AddUserRegistry(user, reg) } func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc) { @@ -219,7 +201,7 @@ func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc) { } func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric) { - data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.registries()) + data := m.regs.BuildMetricFamiliesPerUser() data.SendSumOfCounters(out, m.blockLoads, "thanos_bucket_store_block_loads_total") data.SendSumOfCounters(out, m.blockLoadFailures, "thanos_bucket_store_block_load_failures_total") diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index f6b0da2529d..ff2b0718052 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -301,7 +301,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { // Start the configure number of gateways. var gateways []*StoreGateway var gatewayIds []string - registries := map[string]*prometheus.Registry{} + registries := util.NewUserRegistries() for i := 1; i <= testData.numGateways; i++ { instanceID := fmt.Sprintf("gateway-%d", i) @@ -333,7 +333,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { gateways = append(gateways, g) gatewayIds = append(gatewayIds, instanceID) - registries[instanceID] = reg + registries.AddUserRegistry(instanceID, reg) } // Wait until the ring client of each gateway has synced (to avoid flaky tests on subsequent assertions). @@ -356,7 +356,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { } // Assert on the number of blocks loaded extracting this information from metrics. - metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(registries) + metrics := registries.BuildMetricFamiliesPerUser() assert.Equal(t, float64(testData.expectedBlocksLoaded), metrics.GetSumOfGauges("cortex_bucket_store_blocks_loaded")) assert.Equal(t, float64(2*testData.numGateways), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_discovered")) @@ -550,7 +550,9 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck // Assert on the initial state. - metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{"test": reg}) + regs := util.NewUserRegistries() + regs.AddUserRegistry("test", reg) + metrics := regs.BuildMetricFamiliesPerUser() assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_storegateway_bucket_sync_total")) // Change the ring topology. @@ -563,14 +565,14 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) { // Assert whether the sync triggered or not. if testData.expectedSync { test.Poll(t, time.Second, float64(2), func() interface{} { - metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{"test": reg}) + metrics := regs.BuildMetricFamiliesPerUser() return metrics.GetSumOfCounters("cortex_storegateway_bucket_sync_total") }) } else { // Give some time to the store-gateway to trigger the sync (if any). time.Sleep(250 * time.Millisecond) - metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{"test": reg}) + metrics := regs.BuildMetricFamiliesPerUser() assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_storegateway_bucket_sync_total")) } }) diff --git a/pkg/storegateway/metadata_fetcher_metrics.go b/pkg/storegateway/metadata_fetcher_metrics.go index 1d02d1e9961..113dd616235 100644 --- a/pkg/storegateway/metadata_fetcher_metrics.go +++ b/pkg/storegateway/metadata_fetcher_metrics.go @@ -1,8 +1,6 @@ package storegateway import ( - "sync" - "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/util" @@ -11,9 +9,7 @@ import ( // This struct aggregates metrics exported by Thanos MetaFetcher // and re-exports those aggregates as Cortex metrics. type MetadataFetcherMetrics struct { - // Maps userID -> registry - regsMu sync.Mutex - regs map[string]*prometheus.Registry + regs *util.UserRegistries // Exported metrics, gathered from Thanos MetaFetcher syncs *prometheus.Desc @@ -29,7 +25,7 @@ type MetadataFetcherMetrics struct { func NewMetadataFetcherMetrics() *MetadataFetcherMetrics { return &MetadataFetcherMetrics{ - regs: map[string]*prometheus.Registry{}, + regs: util.NewUserRegistries(), syncs: prometheus.NewDesc( "cortex_blocks_meta_syncs_total", @@ -55,25 +51,10 @@ func NewMetadataFetcherMetrics() *MetadataFetcherMetrics { } func (m *MetadataFetcherMetrics) AddUserRegistry(user string, reg *prometheus.Registry) { - m.regsMu.Lock() - m.regs[user] = reg - m.regsMu.Unlock() -} - -func (m *MetadataFetcherMetrics) registries() map[string]*prometheus.Registry { - regs := map[string]*prometheus.Registry{} - - m.regsMu.Lock() - defer m.regsMu.Unlock() - for uid, r := range m.regs { - regs[uid] = r - } - - return regs + m.regs.AddUserRegistry(user, reg) } func (m *MetadataFetcherMetrics) Describe(out chan<- *prometheus.Desc) { - out <- m.syncs out <- m.syncFailures out <- m.syncDuration @@ -82,7 +63,7 @@ func (m *MetadataFetcherMetrics) Describe(out chan<- *prometheus.Desc) { } func (m *MetadataFetcherMetrics) Collect(out chan<- prometheus.Metric) { - data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.registries()) + data := m.regs.BuildMetricFamiliesPerUser() data.SendSumOfCounters(out, m.syncs, "blocks_meta_syncs_total") data.SendSumOfCounters(out, m.syncFailures, "blocks_meta_sync_failures_total") diff --git a/pkg/util/metrics_helper.go b/pkg/util/metrics_helper.go index 86ead8f79cc..ed7d5bc5d7c 100644 --- a/pkg/util/metrics_helper.go +++ b/pkg/util/metrics_helper.go @@ -120,32 +120,15 @@ func (mfm MetricFamilyMap) sumOfSingleValuesWithLabels(metric string, labelNames // MetricFamiliesPerUser is a collection of metrics gathered via calling Gatherer.Gather() method on different // gatherers, one per user. -type MetricFamiliesPerUser map[string]MetricFamilyMap - -func BuildMetricFamiliesPerUserFromUserRegistries(regs map[string]*prometheus.Registry) MetricFamiliesPerUser { - data := MetricFamiliesPerUser{} - for userID, r := range regs { - m, err := r.Gather() - if err == nil { - var mfm MetricFamilyMap // := would shadow err from outer block, and single err check will not work - mfm, err = NewMetricFamilyMap(m) - if err == nil { - data[userID] = mfm - } - } - - if err != nil { - level.Warn(Logger).Log("msg", "failed to gather metrics from registry", "user", userID, "err", err) - continue - } - } - return data +type MetricFamiliesPerUser []struct { + user string + metrics MetricFamilyMap } func (d MetricFamiliesPerUser) GetSumOfCounters(counter string) float64 { result := float64(0) - for _, userMetrics := range d { - result += userMetrics.SumCounters(counter) + for _, userEntry := range d { + result += userEntry.metrics.SumCounters(counter) } return result } @@ -159,28 +142,28 @@ func (d MetricFamiliesPerUser) SendSumOfCountersWithLabels(out chan<- prometheus } func (d MetricFamiliesPerUser) SendSumOfCountersPerUser(out chan<- prometheus.Metric, desc *prometheus.Desc, counter string) { - for user, userMetrics := range d { - v := userMetrics.SumCounters(counter) + for _, userEntry := range d { + v := userEntry.metrics.SumCounters(counter) - out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, v, user) + out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, v, userEntry.user) } } // SendSumOfCountersPerUserWithLabels provides metrics with the provided label names on a per-user basis. This function assumes that `user` is the // first label on the provided metric Desc func (d MetricFamiliesPerUser) SendSumOfCountersPerUserWithLabels(out chan<- prometheus.Metric, desc *prometheus.Desc, metric string, labelNames ...string) { - for user, userMetrics := range d { + for _, userEntry := range d { result := singleValueWithLabelsMap{} - userMetrics.sumOfSingleValuesWithLabels(metric, labelNames, counterValue, result.aggregateFn) - result.prependUserLabelValue(user) + userEntry.metrics.sumOfSingleValuesWithLabels(metric, labelNames, counterValue, result.aggregateFn) + result.prependUserLabelValue(userEntry.user) result.WriteToMetricChannel(out, desc, prometheus.CounterValue) } } func (d MetricFamiliesPerUser) GetSumOfGauges(gauge string) float64 { result := float64(0) - for _, userMetrics := range d { - result += userMetrics.SumGauges(gauge) + for _, userEntry := range d { + result += userEntry.metrics.SumGauges(gauge) } return result } @@ -196,26 +179,26 @@ func (d MetricFamiliesPerUser) SendSumOfGaugesWithLabels(out chan<- prometheus.M // SendSumOfGaugesPerUserWithLabels provides metrics with the provided label names on a per-user basis. This function assumes that `user` is the // first label on the provided metric Desc func (d MetricFamiliesPerUser) SendSumOfGaugesPerUserWithLabels(out chan<- prometheus.Metric, desc *prometheus.Desc, metric string, labelNames ...string) { - for user, userMetrics := range d { + for _, userEntry := range d { result := singleValueWithLabelsMap{} - userMetrics.sumOfSingleValuesWithLabels(metric, labelNames, gaugeValue, result.aggregateFn) - result.prependUserLabelValue(user) + userEntry.metrics.sumOfSingleValuesWithLabels(metric, labelNames, gaugeValue, result.aggregateFn) + result.prependUserLabelValue(userEntry.user) result.WriteToMetricChannel(out, desc, prometheus.GaugeValue) } } func (d MetricFamiliesPerUser) sumOfSingleValuesWithLabels(metric string, fn func(*dto.Metric) float64, labelNames []string) singleValueWithLabelsMap { result := singleValueWithLabelsMap{} - for _, userMetrics := range d { - userMetrics.sumOfSingleValuesWithLabels(metric, labelNames, fn, result.aggregateFn) + for _, userEntry := range d { + userEntry.metrics.sumOfSingleValuesWithLabels(metric, labelNames, fn, result.aggregateFn) } return result } func (d MetricFamiliesPerUser) SendMaxOfGauges(out chan<- prometheus.Metric, desc *prometheus.Desc, gauge string) { result := math.NaN() - for _, userMetrics := range d { - if value := userMetrics.MaxGauges(gauge); math.IsNaN(result) || value > result { + for _, userEntry := range d { + if value := userEntry.metrics.MaxGauges(gauge); math.IsNaN(result) || value > result { result = value } } @@ -229,16 +212,16 @@ func (d MetricFamiliesPerUser) SendMaxOfGauges(out chan<- prometheus.Metric, des } func (d MetricFamiliesPerUser) SendMaxOfGaugesPerUser(out chan<- prometheus.Metric, desc *prometheus.Desc, gauge string) { - for user, userMetrics := range d { - result := userMetrics.MaxGauges(gauge) - out <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, result, user) + for _, userEntry := range d { + result := userEntry.metrics.MaxGauges(gauge) + out <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, result, userEntry.user) } } func (d MetricFamiliesPerUser) SendSumOfSummaries(out chan<- prometheus.Metric, desc *prometheus.Desc, summaryName string) { summaryData := SummaryData{} - for _, userMetrics := range d { - userMetrics.SumSummariesTo(summaryName, &summaryData) + for _, userEntry := range d { + userEntry.metrics.SumSummariesTo(summaryName, &summaryData) } out <- summaryData.Metric(desc) } @@ -251,8 +234,8 @@ func (d MetricFamiliesPerUser) SendSumOfSummariesWithLabels(out chan<- prometheu result := map[string]summaryResult{} - for _, userMetrics := range d { - metricsPerLabelValue := getMetricsWithLabelNames(userMetrics[summaryName], labelNames) + for _, mfm := range d { + metricsPerLabelValue := getMetricsWithLabelNames(mfm.metrics[summaryName], labelNames) for key, mwl := range metricsPerLabelValue { for _, m := range mwl.metrics { @@ -273,16 +256,16 @@ func (d MetricFamiliesPerUser) SendSumOfSummariesWithLabels(out chan<- prometheu } func (d MetricFamiliesPerUser) SendSumOfSummariesPerUser(out chan<- prometheus.Metric, desc *prometheus.Desc, summaryName string) { - for user, userMetrics := range d { - data := userMetrics.SumSummaries(summaryName) - out <- data.Metric(desc, user) + for _, userEntry := range d { + data := userEntry.metrics.SumSummaries(summaryName) + out <- data.Metric(desc, userEntry.user) } } func (d MetricFamiliesPerUser) SendSumOfHistograms(out chan<- prometheus.Metric, desc *prometheus.Desc, histogramName string) { hd := HistogramData{} - for _, userMetrics := range d { - userMetrics.SumHistogramsTo(histogramName, &hd) + for _, userEntry := range d { + userEntry.metrics.SumHistogramsTo(histogramName, &hd) } out <- hd.Metric(desc) } @@ -295,8 +278,8 @@ func (d MetricFamiliesPerUser) SendSumOfHistogramsWithLabels(out chan<- promethe result := map[string]histogramResult{} - for _, userMetrics := range d { - metricsPerLabelValue := getMetricsWithLabelNames(userMetrics[histogramName], labelNames) + for _, mfm := range d { + metricsPerLabelValue := getMetricsWithLabelNames(mfm.metrics[histogramName], labelNames) for key, mwl := range metricsPerLabelValue { for _, m := range mwl.metrics { @@ -505,3 +488,83 @@ func (h *HistogramDataCollector) Add(hd HistogramData) { h.data.AddHistogramData(hd) } + +// UserRegistry holds a Prometheus registry associated to a specific user. +type UserRegistry struct { + user string + reg *prometheus.Registry +} + +// UserRegistries holds Prometheus registries for multiple users, guaranteeing +// multi-thread safety and stable ordering. +type UserRegistries struct { + regsMu sync.Mutex + regs []UserRegistry +} + +// NewUserRegistries makes new UserRegistries. +func NewUserRegistries() *UserRegistries { + return &UserRegistries{} +} + +// AddUserRegistry adds an user registry. It allows to add multiple registries for +// the same user and no check is done on user uniqueness. +func (r *UserRegistries) AddUserRegistry(user string, reg *prometheus.Registry) { + r.regsMu.Lock() + defer r.regsMu.Unlock() + + // New registries must be added to the end of the list, to guarantee stability. + r.regs = append(r.regs, UserRegistry{ + user: user, + reg: reg, + }) +} + +// RemoveUserRegistry removes all Prometheus registries for a given user. +func (r *UserRegistries) RemoveUserRegistry(user string) { + r.regsMu.Lock() + defer r.regsMu.Unlock() + + for idx, entry := range r.regs { + if user == entry.user { + r.regs = append(r.regs[:idx], r.regs[idx+1:]...) + } + } +} + +// Registries returns a copy of the user registries list. +func (r *UserRegistries) Registries() []UserRegistry { + r.regsMu.Lock() + defer r.regsMu.Unlock() + + out := make([]UserRegistry, 0, len(r.regs)) + out = append(out, r.regs...) + + return out +} + +func (r *UserRegistries) BuildMetricFamiliesPerUser() MetricFamiliesPerUser { + data := MetricFamiliesPerUser{} + for _, entry := range r.Registries() { + m, err := entry.reg.Gather() + if err == nil { + var mfm MetricFamilyMap // := would shadow err from outer block, and single err check will not work + mfm, err = NewMetricFamilyMap(m) + if err == nil { + data = append(data, struct { + user string + metrics MetricFamilyMap + }{ + user: entry.user, + metrics: mfm, + }) + } + } + + if err != nil { + level.Warn(Logger).Log("msg", "failed to gather metrics from registry", "user", entry.user, "err", err) + continue + } + } + return data +} diff --git a/pkg/util/metrics_helper_test.go b/pkg/util/metrics_helper_test.go index 9d35a2743e3..d2cc3566813 100644 --- a/pkg/util/metrics_helper_test.go +++ b/pkg/util/metrics_helper_test.go @@ -1,7 +1,10 @@ package util import ( + "math/rand" + "strconv" "testing" + "time" "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" @@ -115,17 +118,16 @@ func TestSendSumOfGaugesPerUserWithLabels(t *testing.T) { user1Reg.MustRegister(user1Metric) user2Reg.MustRegister(user2Metric) - mf := BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{ - "user-1": user1Reg, - "user-2": user2Reg, - }) + regs := NewUserRegistries() + regs.AddUserRegistry("user-1", user1Reg) + regs.AddUserRegistry("user-2", user2Reg) + mf := regs.BuildMetricFamiliesPerUser() { desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_one"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfGaugesPerUserWithLabels(out, desc, "test_metric", "label_one") }) - require.NoError(t, err) expected := []*dto.Metric{ {Label: makeLabels("label_one", "a", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(180)}}, {Label: makeLabels("label_one", "a", "user", "user-2"), Gauge: &dto.Gauge{Value: proto.Float64(100)}}, @@ -135,10 +137,9 @@ func TestSendSumOfGaugesPerUserWithLabels(t *testing.T) { { desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_two"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfGaugesPerUserWithLabels(out, desc, "test_metric", "label_two") }) - require.NoError(t, err) expected := []*dto.Metric{ {Label: makeLabels("label_two", "b", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(100)}}, {Label: makeLabels("label_two", "c", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(80)}}, @@ -150,10 +151,9 @@ func TestSendSumOfGaugesPerUserWithLabels(t *testing.T) { { desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_one", "label_two"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfGaugesPerUserWithLabels(out, desc, "test_metric", "label_one", "label_two") }) - require.NoError(t, err) expected := []*dto.Metric{ {Label: makeLabels("label_one", "a", "label_two", "b", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(100)}}, {Label: makeLabels("label_one", "a", "label_two", "c", "user", "user-1"), Gauge: &dto.Gauge{Value: proto.Float64(80)}}, @@ -168,19 +168,18 @@ func TestSendMaxOfGauges(t *testing.T) { user1Reg := prometheus.NewRegistry() user2Reg := prometheus.NewRegistry() desc := prometheus.NewDesc("test_metric", "", nil, nil) + regs := NewUserRegistries() + regs.AddUserRegistry("user-1", user1Reg) + regs.AddUserRegistry("user-2", user2Reg) // No matching metric. - mf := BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{ - "user-1": user1Reg, - "user-2": user2Reg, - }) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + mf := regs.BuildMetricFamiliesPerUser() + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendMaxOfGauges(out, desc, "test_metric") }) expected := []*dto.Metric{ {Label: nil, Gauge: &dto.Gauge{Value: proto.Float64(0)}}, } - require.NoError(t, err) require.ElementsMatch(t, expected, actual) // Register a metric for each user. @@ -188,18 +187,14 @@ func TestSendMaxOfGauges(t *testing.T) { user2Metric := promauto.With(user2Reg).NewGauge(prometheus.GaugeOpts{Name: "test_metric"}) user1Metric.Set(100) user2Metric.Set(80) - mf = BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{ - "user-1": user1Reg, - "user-2": user2Reg, - }) + mf = regs.BuildMetricFamiliesPerUser() - actual, err = collectMetrics(func(out chan prometheus.Metric) { + actual = collectMetrics(t, func(out chan prometheus.Metric) { mf.SendMaxOfGauges(out, desc, "test_metric") }) expected = []*dto.Metric{ {Label: nil, Gauge: &dto.Gauge{Value: proto.Float64(100)}}, } - require.NoError(t, err) require.ElementsMatch(t, expected, actual) } @@ -217,17 +212,16 @@ func TestSendSumOfHistogramsWithLabels(t *testing.T) { user1Reg.MustRegister(user1Metric) user2Reg.MustRegister(user2Metric) - mf := BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{ - "user-1": user1Reg, - "user-2": user2Reg, - }) + regs := NewUserRegistries() + regs.AddUserRegistry("user-1", user1Reg) + regs.AddUserRegistry("user-2", user2Reg) + mf := regs.BuildMetricFamiliesPerUser() { desc := prometheus.NewDesc("test_metric", "", []string{"label_one"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfHistogramsWithLabels(out, desc, "test_metric", "label_one") }) - require.NoError(t, err) expected := []*dto.Metric{ {Label: makeLabels("label_one", "a"), Histogram: &dto.Histogram{SampleCount: uint64p(4), SampleSum: float64p(10), Bucket: []*dto.Bucket{ {UpperBound: float64p(1), CumulativeCount: uint64p(1)}, @@ -240,10 +234,9 @@ func TestSendSumOfHistogramsWithLabels(t *testing.T) { { desc := prometheus.NewDesc("test_metric", "", []string{"label_two"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfHistogramsWithLabels(out, desc, "test_metric", "label_two") }) - require.NoError(t, err) expected := []*dto.Metric{ {Label: makeLabels("label_two", "b"), Histogram: &dto.Histogram{SampleCount: uint64p(2), SampleSum: float64p(4), Bucket: []*dto.Bucket{ {UpperBound: float64p(1), CumulativeCount: uint64p(1)}, @@ -261,10 +254,9 @@ func TestSendSumOfHistogramsWithLabels(t *testing.T) { { desc := prometheus.NewDesc("test_metric", "", []string{"label_one", "label_two"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfHistogramsWithLabels(out, desc, "test_metric", "label_one", "label_two") }) - require.NoError(t, err) expected := []*dto.Metric{ {Label: makeLabels("label_one", "a", "label_two", "b"), Histogram: &dto.Histogram{SampleCount: uint64p(2), SampleSum: float64p(4), Bucket: []*dto.Bucket{ {UpperBound: float64p(1), CumulativeCount: uint64p(1)}, @@ -296,17 +288,16 @@ func TestSumOfCounterPerUserWithLabels(t *testing.T) { user1Reg.MustRegister(user1Metric) user2Reg.MustRegister(user2Metric) - mf := BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{ - "user-1": user1Reg, - "user-2": user2Reg, - }) + regs := NewUserRegistries() + regs.AddUserRegistry("user-1", user1Reg) + regs.AddUserRegistry("user-2", user2Reg) + mf := regs.BuildMetricFamiliesPerUser() { desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_one"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfCountersPerUserWithLabels(out, desc, "test_metric", "label_one") }) - require.NoError(t, err) expected := []*dto.Metric{ {Label: makeLabels("label_one", "a", "user", "user-1"), Counter: &dto.Counter{Value: proto.Float64(180)}}, {Label: makeLabels("label_one", "a", "user", "user-2"), Counter: &dto.Counter{Value: proto.Float64(100)}}, @@ -316,10 +307,9 @@ func TestSumOfCounterPerUserWithLabels(t *testing.T) { { desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_two"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfCountersPerUserWithLabels(out, desc, "test_metric", "label_two") }) - require.NoError(t, err) expected := []*dto.Metric{ {Label: makeLabels("label_two", "b", "user", "user-1"), Counter: &dto.Counter{Value: proto.Float64(100)}}, {Label: makeLabels("label_two", "c", "user", "user-1"), Counter: &dto.Counter{Value: proto.Float64(80)}}, @@ -331,10 +321,9 @@ func TestSumOfCounterPerUserWithLabels(t *testing.T) { { desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_one", "label_two"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfCountersPerUserWithLabels(out, desc, "test_metric", "label_one", "label_two") }) - require.NoError(t, err) expected := []*dto.Metric{ {Label: makeLabels("label_one", "a", "label_two", "b", "user", "user-1"), Counter: &dto.Counter{Value: proto.Float64(100)}}, {Label: makeLabels("label_one", "a", "label_two", "c", "user", "user-1"), Counter: &dto.Counter{Value: proto.Float64(80)}}, @@ -361,17 +350,16 @@ func TestSendSumOfSummariesPerUser(t *testing.T) { user1Reg.MustRegister(user1Metric) user2Reg.MustRegister(user2Metric) - mf := BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{ - "user-1": user1Reg, - "user-2": user2Reg, - }) + regs := NewUserRegistries() + regs.AddUserRegistry("user-1", user1Reg) + regs.AddUserRegistry("user-2", user2Reg) + mf := regs.BuildMetricFamiliesPerUser() { desc := prometheus.NewDesc("test_metric", "", []string{"user"}, nil) - actual, err := collectMetrics(func(out chan prometheus.Metric) { + actual := collectMetrics(t, func(out chan prometheus.Metric) { mf.SendSumOfSummariesPerUser(out, desc, "test_metric") }) - require.NoError(t, err) expected := []*dto.Metric{ { Label: makeLabels("user", "user-1"), @@ -420,7 +408,107 @@ func TestSendSumOfSummariesPerUser(t *testing.T) { } } -func collectMetrics(send func(out chan prometheus.Metric)) ([]*dto.Metric, error) { +func TestFloat64PrecisionStability(t *testing.T) { + const ( + numRuns = 100 + numRegistries = 100 + cardinality = 20 + ) + + // Randomise the seed but log it in case we need to reproduce the test on failure. + seed := time.Now().UnixNano() + rand.Seed(seed) + t.Log("random generator seed:", seed) + + // Generate a large number of registries with different metrics each. + registries := NewUserRegistries() + for userID := 1; userID <= numRegistries; userID++ { + reg := prometheus.NewRegistry() + labelNames := []string{"label_one", "label_two"} + + g := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{Name: "test_gauge"}, labelNames) + for i := 0; i < cardinality; i++ { + g.WithLabelValues("a", strconv.Itoa(i)).Set(rand.Float64()) + } + + c := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter"}, labelNames) + for i := 0; i < cardinality; i++ { + c.WithLabelValues("a", strconv.Itoa(i)).Add(rand.Float64()) + } + + h := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{Name: "test_histogram", Buckets: []float64{0.1, 0.5, 1}}, labelNames) + for i := 0; i < cardinality; i++ { + h.WithLabelValues("a", strconv.Itoa(i)).Observe(rand.Float64()) + } + + s := promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{Name: "test_summary"}, labelNames) + for i := 0; i < cardinality; i++ { + s.WithLabelValues("a", strconv.Itoa(i)).Observe(rand.Float64()) + } + + registries.AddUserRegistry(strconv.Itoa(userID), reg) + } + + // Ensure multiple runs always return the same exact results. + expected := map[string][]*dto.Metric{} + + for run := 0; run < numRuns; run++ { + mf := registries.BuildMetricFamiliesPerUser() + + gauge := collectMetrics(t, func(out chan prometheus.Metric) { + mf.SendSumOfGauges(out, prometheus.NewDesc("test_gauge", "", nil, nil), "test_gauge") + }) + gaugeWithLabels := collectMetrics(t, func(out chan prometheus.Metric) { + mf.SendSumOfGaugesWithLabels(out, prometheus.NewDesc("test_gauge", "", []string{"label_one"}, nil), "test_gauge", "label_one") + }) + + counter := collectMetrics(t, func(out chan prometheus.Metric) { + mf.SendSumOfCounters(out, prometheus.NewDesc("test_counter", "", nil, nil), "test_counter") + }) + counterWithLabels := collectMetrics(t, func(out chan prometheus.Metric) { + mf.SendSumOfCountersWithLabels(out, prometheus.NewDesc("test_counter", "", []string{"label_one"}, nil), "test_counter", "label_one") + }) + + histogram := collectMetrics(t, func(out chan prometheus.Metric) { + mf.SendSumOfHistograms(out, prometheus.NewDesc("test_histogram", "", nil, nil), "test_histogram") + }) + histogramWithLabels := collectMetrics(t, func(out chan prometheus.Metric) { + mf.SendSumOfHistogramsWithLabels(out, prometheus.NewDesc("test_histogram", "", []string{"label_one"}, nil), "test_histogram", "label_one") + }) + + summary := collectMetrics(t, func(out chan prometheus.Metric) { + mf.SendSumOfSummaries(out, prometheus.NewDesc("test_summary", "", nil, nil), "test_summary") + }) + summaryWithLabels := collectMetrics(t, func(out chan prometheus.Metric) { + mf.SendSumOfSummariesWithLabels(out, prometheus.NewDesc("test_summary", "", []string{"label_one"}, nil), "test_summary", "label_one") + }) + + // The first run we just store the expected value. + if run == 0 { + expected["gauge"] = gauge + expected["gauge_with_labels"] = gaugeWithLabels + expected["counter"] = counter + expected["counter_with_labels"] = counterWithLabels + expected["histogram"] = histogram + expected["histogram_with_labels"] = histogramWithLabels + expected["summary"] = summary + expected["summary_with_labels"] = summaryWithLabels + continue + } + + // All subsequent runs we assert the actual metric with the expected one. + require.Equal(t, expected["gauge"], gauge) + require.Equal(t, expected["gauge_with_labels"], gaugeWithLabels) + require.Equal(t, expected["counter"], counter) + require.Equal(t, expected["counter_with_labels"], counterWithLabels) + require.Equal(t, expected["histogram"], histogram) + require.Equal(t, expected["histogram_with_labels"], histogramWithLabels) + require.Equal(t, expected["summary"], summary) + require.Equal(t, expected["summary_with_labels"], summaryWithLabels) + } +} + +func collectMetrics(t *testing.T, send func(out chan prometheus.Metric)) []*dto.Metric { out := make(chan prometheus.Metric) go func() { @@ -428,18 +516,16 @@ func collectMetrics(send func(out chan prometheus.Metric)) ([]*dto.Metric, error close(out) }() - metrics := []*dto.Metric{} + var metrics []*dto.Metric for m := range out { collected := &dto.Metric{} err := m.Write(collected) - if err != nil { - return nil, err - } + require.NoError(t, err) metrics = append(metrics, collected) } - return metrics, nil + return metrics } func float64p(v float64) *float64 {