From 908cf6b4106aa84b4ec99fe6c7fb315a9acc1d19 Mon Sep 17 00:00:00 2001 From: Rajneesh180 Date: Wed, 25 Mar 2026 17:16:15 +0530 Subject: [PATCH 1/3] fix(exporter/prometheus): add background cleanup for expired metric families Previously, stale metric families were only evicted during Collect() calls triggered by Prometheus scrapes. If no scraper was active, expired entries accumulated in the metricFamilies sync.Map indefinitely, causing unbounded memory growth. Add a background goroutine in Start() that ticks at the configured MetricExpiration interval and calls cleanupMetricFamilies() independently of scraping. The goroutine is stopped on Shutdown() via a stop channel. Fixes #41123 Signed-off-by: Rajneesh180 --- ...rometheus-exporter-metric-family-leak.yaml | 29 +++++++ exporter/prometheusexporter/accumulator.go | 19 +++++ exporter/prometheusexporter/collector_test.go | 2 + exporter/prometheusexporter/prometheus.go | 27 ++++++ .../prometheusexporter/prometheus_test.go | 84 +++++++++++++++++++ 5 files changed, 161 insertions(+) create mode 100644 .chloggen/fix-prometheus-exporter-metric-family-leak.yaml diff --git a/.chloggen/fix-prometheus-exporter-metric-family-leak.yaml b/.chloggen/fix-prometheus-exporter-metric-family-leak.yaml new file mode 100644 index 0000000000000..a11815e537acb --- /dev/null +++ b/.chloggen/fix-prometheus-exporter-metric-family-leak.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: exporter/prometheus + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix unbounded memory growth when metrics are no longer being scraped. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [41123] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Expired metric families now get cleaned up even when no Prometheus scraper is actively collecting, + preventing memory from growing indefinitely. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/prometheusexporter/accumulator.go b/exporter/prometheusexporter/accumulator.go index 7f458af625ce6..d85f215898e32 100644 --- a/exporter/prometheusexporter/accumulator.go +++ b/exporter/prometheusexporter/accumulator.go @@ -62,6 +62,9 @@ type accumulator interface { // Collect returns a slice with relevant aggregated metrics and their resource attributes. // The number or metrics and attributes returned will be the same. Collect() (metrics []pmetric.Metric, resourceAttrs []pcommon.Map, scopeNames, scopeVersions, scopeSchemaURLs []string, scopeAttributes []pcommon.Map) + // cleanupExpired removes registered metrics whose last update exceeds the + // configured expiration, independently of Collect. + cleanupExpired() } // LastValueAccumulator keeps last value for accumulated metrics @@ -425,6 +428,22 @@ func (a *lastValueAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map, []str return metrics, resourceAttrs, scopeNames, scopeVersions, scopeSchemaURLs, scopeAttributes } +// cleanupExpired removes registered metrics that have exceeded the expiration +// time. This is the same expiration logic that Collect() performs inline, but +// can be called independently so that stale time series are evicted even when +// no Prometheus scrape is active. +func (a *lastValueAccumulator) cleanupExpired() { + expirationTime := time.Now().Add(-a.metricExpiration) + a.registeredMetrics.Range(func(key, value any) bool { + v := value.(*accumulatedValue) + if expirationTime.After(v.updated) { + a.logger.Debug(fmt.Sprintf("metric expired: %s", v.value.Name())) + a.registeredMetrics.Delete(key) + } + return true + }) +} + func timeseriesSignature(scopeName, scopeVersion, scopeSchemaURL string, scopeAttributes pcommon.Map, metric pmetric.Metric, attributes, resourceAttrs pcommon.Map) string { // Get a string builder from the pool sb := stringBuilderPool.Get().(*strings.Builder) diff --git a/exporter/prometheusexporter/collector_test.go b/exporter/prometheusexporter/collector_test.go index ac9d3444b95b1..322930309e84d 100644 --- a/exporter/prometheusexporter/collector_test.go +++ b/exporter/prometheusexporter/collector_test.go @@ -36,6 +36,8 @@ func (*mockAccumulator) Accumulate(pmetric.ResourceMetrics) (n int) { return 0 } +func (*mockAccumulator) cleanupExpired() {} + func (a *mockAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map, []string, []string, []string, []pcommon.Map) { rAttrs := make([]pcommon.Map, len(a.metrics)) scopeNames := make([]string, len(a.metrics)) diff --git a/exporter/prometheusexporter/prometheus.go b/exporter/prometheusexporter/prometheus.go index 1fec8a4415ef2..f4933b72db2e5 100644 --- a/exporter/prometheusexporter/prometheus.go +++ b/exporter/prometheusexporter/prometheus.go @@ -8,6 +8,7 @@ import ( "errors" "net/http" "strings" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -25,6 +26,7 @@ type prometheusExporter struct { collector *collector registry *prometheus.Registry settings component.TelemetrySettings + stopCh chan struct{} // signals the background metric cleanup goroutine to stop } var errBlankPrometheusAddress = errors.New("expecting a non-blank address to run the Prometheus metrics handler") @@ -83,6 +85,27 @@ func (pe *prometheusExporter) Start(ctx context.Context, host component.Host) er _ = srv.Serve(ln) }() + // Start a background goroutine that periodically evicts expired metric families. + // Without this, cleanup only happens during Collect() (i.e. when Prometheus scrapes). + // If no scraper is active, stale entries in metricFamilies accumulate indefinitely, + // causing unbounded memory growth. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/41123 + if pe.collector.metricExpiration > 0 { + pe.stopCh = make(chan struct{}) + go func(stopCh chan struct{}) { + ticker := time.NewTicker(pe.collector.metricExpiration) + defer ticker.Stop() + for { + select { + case <-ticker.C: + pe.collector.accumulator.cleanupExpired() + pe.collector.cleanupMetricFamilies() + case <-stopCh: + return + } + } + }(pe.stopCh) + } + return nil } @@ -97,5 +120,9 @@ func (pe *prometheusExporter) ConsumeMetrics(_ context.Context, md pmetric.Metri } func (pe *prometheusExporter) Shutdown(ctx context.Context) error { + if pe.stopCh != nil { + close(pe.stopCh) + pe.stopCh = nil + } return pe.shutdownFunc(ctx) } diff --git a/exporter/prometheusexporter/prometheus_test.go b/exporter/prometheusexporter/prometheus_test.go index dd8bfeaa53f74..33e89a3fef2a3 100644 --- a/exporter/prometheusexporter/prometheus_test.go +++ b/exporter/prometheusexporter/prometheus_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -20,6 +21,7 @@ import ( "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" @@ -720,3 +722,85 @@ this_one_there_where_{arch="x86",instance="test-instance",job="test-service",os= }) } } + +// TestPrometheusExporter_BackgroundCleanup verifies that expired entries in both +// the accumulator's registeredMetrics and the collector's metricFamilies map are +// evicted by the background goroutine even when no Prometheus scrape occurs. +// This covers the memory leak described in +// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/41123 +func TestPrometheusExporter_BackgroundCleanup(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + cfg := &Config{ + ServerConfig: confighttp.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Transport: "tcp", + Endpoint: addr, + }, + }, + MetricExpiration: 50 * time.Millisecond, + } + + factory := NewFactory() + set := exportertest.NewNopSettings(metadata.Type) + exp, err := factory.CreateMetrics(t.Context(), set, cfg) + require.NoError(t, err) + + require.NoError(t, exp.Start(t.Context(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, exp.Shutdown(t.Context())) + }() + + c := exp.(*wrapMetricsExporter).exporter.collector + a := c.accumulator.(*lastValueAccumulator) + + // --- Seed the accumulator with a stale and a fresh time series --- + staleMetric := pmetric.NewMetric() + staleMetric.SetName("stale_accumulated") + a.registeredMetrics.Store("stale_acc_key", &accumulatedValue{ + value: staleMetric, + resourceAttrs: pcommon.NewMap(), + scopeAttributes: pcommon.NewMap(), + updated: time.Now().Add(-10 * time.Minute), + }) + freshMetric := pmetric.NewMetric() + freshMetric.SetName("fresh_accumulated") + a.registeredMetrics.Store("fresh_acc_key", &accumulatedValue{ + value: freshMetric, + resourceAttrs: pcommon.NewMap(), + scopeAttributes: pcommon.NewMap(), + updated: time.Now().Add(time.Hour), + }) + + // --- Seed the metricFamilies map with a stale and a fresh entry --- + gaugeType := io_prometheus_client.MetricType_GAUGE + c.metricFamilies.Store("stale_metric", metricFamily{ + lastSeen: time.Now().Add(-10 * time.Minute), + mf: &io_prometheus_client.MetricFamily{ + Name: proto.String("stale_metric"), + Help: proto.String("should be cleaned up"), + Type: &gaugeType, + }, + }) + c.metricFamilies.Store("fresh_metric", metricFamily{ + lastSeen: time.Now().Add(time.Hour), + mf: &io_prometheus_client.MetricFamily{ + Name: proto.String("fresh_metric"), + Help: proto.String("should remain"), + Type: &gaugeType, + }, + }) + + // Wait for the background ticker to fire. We intentionally do NOT scrape + // /metrics — the whole point is that cleanup happens independently of Collect(). + require.Eventually(t, func() bool { + _, mfFound := c.metricFamilies.Load("stale_metric") + _, accFound := a.registeredMetrics.Load("stale_acc_key") + return !mfFound && !accFound + }, 2*time.Second, 25*time.Millisecond, "stale entries were not removed by background cleanup") + + // Fresh entries must survive. + _, ok := c.metricFamilies.Load("fresh_metric") + assert.True(t, ok, "fresh_metric should not have been evicted") + _, ok = a.registeredMetrics.Load("fresh_acc_key") + assert.True(t, ok, "fresh_accumulated should not have been evicted") +} From ca699cea22b093571da1c5da641fe65eadc26662 Mon Sep 17 00:00:00 2001 From: Rajneesh180 Date: Tue, 31 Mar 2026 13:57:32 +0530 Subject: [PATCH 2/3] fix(exporter/prometheus): use synctest for background cleanup test Replace time-dependent require.Eventually with testing/synctest for deterministic fake-clock testing. Moves TestPrometheusExporter_BackgroundCleanup to a separate file with //go:build goexperiment.synctest build tag, matching the pattern used in deltatocumulativeprocessor/stale_test.go. Signed-off-by: Rajneesh Rana Signed-off-by: Rajneesh180 --- .../prometheus_synctest_test.go | 102 ++++++++++++++++++ .../prometheusexporter/prometheus_test.go | 84 --------------- 2 files changed, 102 insertions(+), 84 deletions(-) create mode 100644 exporter/prometheusexporter/prometheus_synctest_test.go diff --git a/exporter/prometheusexporter/prometheus_synctest_test.go b/exporter/prometheusexporter/prometheus_synctest_test.go new file mode 100644 index 0000000000000..e663f463c97e4 --- /dev/null +++ b/exporter/prometheusexporter/prometheus_synctest_test.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build goexperiment.synctest + +package prometheusexporter + +import ( + "testing" + "testing/synctest" + "time" + + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" +) + +// TestPrometheusExporter_BackgroundCleanup verifies that expired entries in both +// the accumulator's registeredMetrics and the collector's metricFamilies map are +// evicted by the background cleanup goroutine even when no Prometheus scrape occurs. +// Uses synctest for deterministic fake-clock testing instead of require.Eventually. +// Covers https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/41123 +func TestPrometheusExporter_BackgroundCleanup(t *testing.T) { + synctest.Run(func() { + expiration := 5 * time.Minute + c := newCollector(&Config{MetricExpiration: expiration}, zap.NewNop()) + a := c.accumulator.(*lastValueAccumulator) + + // Seed the accumulator with a stale and a fresh time series. + staleMetric := pmetric.NewMetric() + staleMetric.SetName("stale_accumulated") + a.registeredMetrics.Store("stale_acc_key", &accumulatedValue{ + value: staleMetric, + resourceAttrs: pcommon.NewMap(), + scopeAttributes: pcommon.NewMap(), + updated: time.Now().Add(-10 * time.Minute), + }) + freshMetric := pmetric.NewMetric() + freshMetric.SetName("fresh_accumulated") + a.registeredMetrics.Store("fresh_acc_key", &accumulatedValue{ + value: freshMetric, + resourceAttrs: pcommon.NewMap(), + scopeAttributes: pcommon.NewMap(), + updated: time.Now().Add(time.Hour), + }) + + // Seed the metricFamilies map with a stale and a fresh entry. + gaugeType := io_prometheus_client.MetricType_GAUGE + c.metricFamilies.Store("stale_metric", metricFamily{ + lastSeen: time.Now().Add(-10 * time.Minute), + mf: &io_prometheus_client.MetricFamily{ + Name: proto.String("stale_metric"), + Help: proto.String("should be cleaned up"), + Type: &gaugeType, + }, + }) + c.metricFamilies.Store("fresh_metric", metricFamily{ + lastSeen: time.Now().Add(time.Hour), + mf: &io_prometheus_client.MetricFamily{ + Name: proto.String("fresh_metric"), + Help: proto.String("should remain"), + Type: &gaugeType, + }, + }) + + // Start the background cleanup goroutine (same logic as prometheusExporter.Start). + stopCh := make(chan struct{}) + go func() { + ticker := time.NewTicker(expiration) + defer ticker.Stop() + for { + select { + case <-ticker.C: + a.cleanupExpired() + c.cleanupMetricFamilies() + case <-stopCh: + return + } + } + }() + defer close(stopCh) + + // Advance fake clock past the ticker interval so the cleanup fires. + time.Sleep(expiration + time.Second) + synctest.Wait() + + // Stale entries must have been evicted. + _, mfFound := c.metricFamilies.Load("stale_metric") + assert.False(t, mfFound, "stale_metric should have been evicted") + _, accFound := a.registeredMetrics.Load("stale_acc_key") + assert.False(t, accFound, "stale_accumulated should have been evicted") + + // Fresh entries must survive. + _, ok := c.metricFamilies.Load("fresh_metric") + assert.True(t, ok, "fresh_metric should not have been evicted") + _, ok = a.registeredMetrics.Load("fresh_acc_key") + assert.True(t, ok, "fresh_accumulated should not have been evicted") + }) +} diff --git a/exporter/prometheusexporter/prometheus_test.go b/exporter/prometheusexporter/prometheus_test.go index 33e89a3fef2a3..dd8bfeaa53f74 100644 --- a/exporter/prometheusexporter/prometheus_test.go +++ b/exporter/prometheusexporter/prometheus_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -21,7 +20,6 @@ import ( "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" - "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" @@ -722,85 +720,3 @@ this_one_there_where_{arch="x86",instance="test-instance",job="test-service",os= }) } } - -// TestPrometheusExporter_BackgroundCleanup verifies that expired entries in both -// the accumulator's registeredMetrics and the collector's metricFamilies map are -// evicted by the background goroutine even when no Prometheus scrape occurs. -// This covers the memory leak described in -// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/41123 -func TestPrometheusExporter_BackgroundCleanup(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - cfg := &Config{ - ServerConfig: confighttp.ServerConfig{ - NetAddr: confignet.AddrConfig{ - Transport: "tcp", - Endpoint: addr, - }, - }, - MetricExpiration: 50 * time.Millisecond, - } - - factory := NewFactory() - set := exportertest.NewNopSettings(metadata.Type) - exp, err := factory.CreateMetrics(t.Context(), set, cfg) - require.NoError(t, err) - - require.NoError(t, exp.Start(t.Context(), componenttest.NewNopHost())) - defer func() { - require.NoError(t, exp.Shutdown(t.Context())) - }() - - c := exp.(*wrapMetricsExporter).exporter.collector - a := c.accumulator.(*lastValueAccumulator) - - // --- Seed the accumulator with a stale and a fresh time series --- - staleMetric := pmetric.NewMetric() - staleMetric.SetName("stale_accumulated") - a.registeredMetrics.Store("stale_acc_key", &accumulatedValue{ - value: staleMetric, - resourceAttrs: pcommon.NewMap(), - scopeAttributes: pcommon.NewMap(), - updated: time.Now().Add(-10 * time.Minute), - }) - freshMetric := pmetric.NewMetric() - freshMetric.SetName("fresh_accumulated") - a.registeredMetrics.Store("fresh_acc_key", &accumulatedValue{ - value: freshMetric, - resourceAttrs: pcommon.NewMap(), - scopeAttributes: pcommon.NewMap(), - updated: time.Now().Add(time.Hour), - }) - - // --- Seed the metricFamilies map with a stale and a fresh entry --- - gaugeType := io_prometheus_client.MetricType_GAUGE - c.metricFamilies.Store("stale_metric", metricFamily{ - lastSeen: time.Now().Add(-10 * time.Minute), - mf: &io_prometheus_client.MetricFamily{ - Name: proto.String("stale_metric"), - Help: proto.String("should be cleaned up"), - Type: &gaugeType, - }, - }) - c.metricFamilies.Store("fresh_metric", metricFamily{ - lastSeen: time.Now().Add(time.Hour), - mf: &io_prometheus_client.MetricFamily{ - Name: proto.String("fresh_metric"), - Help: proto.String("should remain"), - Type: &gaugeType, - }, - }) - - // Wait for the background ticker to fire. We intentionally do NOT scrape - // /metrics — the whole point is that cleanup happens independently of Collect(). - require.Eventually(t, func() bool { - _, mfFound := c.metricFamilies.Load("stale_metric") - _, accFound := a.registeredMetrics.Load("stale_acc_key") - return !mfFound && !accFound - }, 2*time.Second, 25*time.Millisecond, "stale entries were not removed by background cleanup") - - // Fresh entries must survive. - _, ok := c.metricFamilies.Load("fresh_metric") - assert.True(t, ok, "fresh_metric should not have been evicted") - _, ok = a.registeredMetrics.Load("fresh_acc_key") - assert.True(t, ok, "fresh_accumulated should not have been evicted") -} From 2f27bbb5a9a03fae66c79dcfecdb41acc62e8493 Mon Sep 17 00:00:00 2001 From: Rajneesh180 Date: Wed, 1 Apr 2026 23:21:46 +0530 Subject: [PATCH 3/3] fix(exporter/prometheus): move synctest to stable API, drop build tag Merge background cleanup test into prometheus_test.go using synctest.Test (stable in Go 1.25+) instead of the experimental synctest.Run. Remove the separate test file and goexperiment build tag. Strip unnecessary comments. --- .../prometheus_synctest_test.go | 102 ------------------ .../prometheusexporter/prometheus_test.go | 76 +++++++++++++ 2 files changed, 76 insertions(+), 102 deletions(-) delete mode 100644 exporter/prometheusexporter/prometheus_synctest_test.go diff --git a/exporter/prometheusexporter/prometheus_synctest_test.go b/exporter/prometheusexporter/prometheus_synctest_test.go deleted file mode 100644 index e663f463c97e4..0000000000000 --- a/exporter/prometheusexporter/prometheus_synctest_test.go +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -//go:build goexperiment.synctest - -package prometheusexporter - -import ( - "testing" - "testing/synctest" - "time" - - io_prometheus_client "github.com/prometheus/client_model/go" - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.uber.org/zap" - "google.golang.org/protobuf/proto" -) - -// TestPrometheusExporter_BackgroundCleanup verifies that expired entries in both -// the accumulator's registeredMetrics and the collector's metricFamilies map are -// evicted by the background cleanup goroutine even when no Prometheus scrape occurs. -// Uses synctest for deterministic fake-clock testing instead of require.Eventually. -// Covers https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/41123 -func TestPrometheusExporter_BackgroundCleanup(t *testing.T) { - synctest.Run(func() { - expiration := 5 * time.Minute - c := newCollector(&Config{MetricExpiration: expiration}, zap.NewNop()) - a := c.accumulator.(*lastValueAccumulator) - - // Seed the accumulator with a stale and a fresh time series. - staleMetric := pmetric.NewMetric() - staleMetric.SetName("stale_accumulated") - a.registeredMetrics.Store("stale_acc_key", &accumulatedValue{ - value: staleMetric, - resourceAttrs: pcommon.NewMap(), - scopeAttributes: pcommon.NewMap(), - updated: time.Now().Add(-10 * time.Minute), - }) - freshMetric := pmetric.NewMetric() - freshMetric.SetName("fresh_accumulated") - a.registeredMetrics.Store("fresh_acc_key", &accumulatedValue{ - value: freshMetric, - resourceAttrs: pcommon.NewMap(), - scopeAttributes: pcommon.NewMap(), - updated: time.Now().Add(time.Hour), - }) - - // Seed the metricFamilies map with a stale and a fresh entry. - gaugeType := io_prometheus_client.MetricType_GAUGE - c.metricFamilies.Store("stale_metric", metricFamily{ - lastSeen: time.Now().Add(-10 * time.Minute), - mf: &io_prometheus_client.MetricFamily{ - Name: proto.String("stale_metric"), - Help: proto.String("should be cleaned up"), - Type: &gaugeType, - }, - }) - c.metricFamilies.Store("fresh_metric", metricFamily{ - lastSeen: time.Now().Add(time.Hour), - mf: &io_prometheus_client.MetricFamily{ - Name: proto.String("fresh_metric"), - Help: proto.String("should remain"), - Type: &gaugeType, - }, - }) - - // Start the background cleanup goroutine (same logic as prometheusExporter.Start). - stopCh := make(chan struct{}) - go func() { - ticker := time.NewTicker(expiration) - defer ticker.Stop() - for { - select { - case <-ticker.C: - a.cleanupExpired() - c.cleanupMetricFamilies() - case <-stopCh: - return - } - } - }() - defer close(stopCh) - - // Advance fake clock past the ticker interval so the cleanup fires. - time.Sleep(expiration + time.Second) - synctest.Wait() - - // Stale entries must have been evicted. - _, mfFound := c.metricFamilies.Load("stale_metric") - assert.False(t, mfFound, "stale_metric should have been evicted") - _, accFound := a.registeredMetrics.Load("stale_acc_key") - assert.False(t, accFound, "stale_accumulated should have been evicted") - - // Fresh entries must survive. - _, ok := c.metricFamilies.Load("fresh_metric") - assert.True(t, ok, "fresh_metric should not have been evicted") - _, ok = a.registeredMetrics.Load("fresh_acc_key") - assert.True(t, ok, "fresh_accumulated should not have been evicted") - }) -} diff --git a/exporter/prometheusexporter/prometheus_test.go b/exporter/prometheusexporter/prometheus_test.go index dd8bfeaa53f74..b1349a52a9f5a 100644 --- a/exporter/prometheusexporter/prometheus_test.go +++ b/exporter/prometheusexporter/prometheus_test.go @@ -8,8 +8,10 @@ import ( "io" "net/http" "testing" + "testing/synctest" "time" + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -20,6 +22,8 @@ import ( "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" @@ -720,3 +724,75 @@ this_one_there_where_{arch="x86",instance="test-instance",job="test-service",os= }) } } + +func TestPrometheusExporter_BackgroundCleanup(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + expiration := 5 * time.Minute + c := newCollector(&Config{MetricExpiration: expiration}, zap.NewNop()) + a := c.accumulator.(*lastValueAccumulator) + + staleMetric := pmetric.NewMetric() + staleMetric.SetName("stale_accumulated") + a.registeredMetrics.Store("stale_acc_key", &accumulatedValue{ + value: staleMetric, + resourceAttrs: pcommon.NewMap(), + scopeAttributes: pcommon.NewMap(), + updated: time.Now().Add(-10 * time.Minute), + }) + freshMetric := pmetric.NewMetric() + freshMetric.SetName("fresh_accumulated") + a.registeredMetrics.Store("fresh_acc_key", &accumulatedValue{ + value: freshMetric, + resourceAttrs: pcommon.NewMap(), + scopeAttributes: pcommon.NewMap(), + updated: time.Now().Add(time.Hour), + }) + + gaugeType := io_prometheus_client.MetricType_GAUGE + c.metricFamilies.Store("stale_metric", metricFamily{ + lastSeen: time.Now().Add(-10 * time.Minute), + mf: &io_prometheus_client.MetricFamily{ + Name: proto.String("stale_metric"), + Help: proto.String("should be cleaned up"), + Type: &gaugeType, + }, + }) + c.metricFamilies.Store("fresh_metric", metricFamily{ + lastSeen: time.Now().Add(time.Hour), + mf: &io_prometheus_client.MetricFamily{ + Name: proto.String("fresh_metric"), + Help: proto.String("should remain"), + Type: &gaugeType, + }, + }) + + stopCh := make(chan struct{}) + go func() { + ticker := time.NewTicker(expiration) + defer ticker.Stop() + for { + select { + case <-ticker.C: + a.cleanupExpired() + c.cleanupMetricFamilies() + case <-stopCh: + return + } + } + }() + defer close(stopCh) + + time.Sleep(expiration + time.Second) + synctest.Wait() + + _, mfFound := c.metricFamilies.Load("stale_metric") + assert.False(t, mfFound, "stale_metric should have been evicted") + _, accFound := a.registeredMetrics.Load("stale_acc_key") + assert.False(t, accFound, "stale_accumulated should have been evicted") + + _, ok := c.metricFamilies.Load("fresh_metric") + assert.True(t, ok, "fresh_metric should not have been evicted") + _, ok = a.registeredMetrics.Load("fresh_acc_key") + assert.True(t, ok, "fresh_accumulated should not have been evicted") + }) +}