Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .chloggen/fix-prometheus-exporter-metric-family-leak.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: exporter/prometheus

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix unbounded memory growth when metrics are no longer being scraped.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [41123]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Expired metric families now get cleaned up even when no Prometheus scraper is actively collecting,
preventing memory from growing indefinitely.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
19 changes: 19 additions & 0 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type accumulator interface {
// Collect returns a slice with relevant aggregated metrics and their resource attributes.
// The number or metrics and attributes returned will be the same.
Collect() (metrics []pmetric.Metric, resourceAttrs []pcommon.Map, scopeNames, scopeVersions, scopeSchemaURLs []string, scopeAttributes []pcommon.Map)
// cleanupExpired removes registered metrics whose last update exceeds the
// configured expiration, independently of Collect.
cleanupExpired()
}

// LastValueAccumulator keeps last value for accumulated metrics
Expand Down Expand Up @@ -425,6 +428,22 @@ func (a *lastValueAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map, []str
return metrics, resourceAttrs, scopeNames, scopeVersions, scopeSchemaURLs, scopeAttributes
}

// cleanupExpired removes registered metrics that have exceeded the expiration
// time. This is the same expiration logic that Collect() performs inline, but
// can be called independently so that stale time series are evicted even when
// no Prometheus scrape is active.
func (a *lastValueAccumulator) cleanupExpired() {
expirationTime := time.Now().Add(-a.metricExpiration)
a.registeredMetrics.Range(func(key, value any) bool {
v := value.(*accumulatedValue)
if expirationTime.After(v.updated) {
a.logger.Debug(fmt.Sprintf("metric expired: %s", v.value.Name()))
a.registeredMetrics.Delete(key)
}
return true
})
}

func timeseriesSignature(scopeName, scopeVersion, scopeSchemaURL string, scopeAttributes pcommon.Map, metric pmetric.Metric, attributes, resourceAttrs pcommon.Map) string {
// Get a string builder from the pool
sb := stringBuilderPool.Get().(*strings.Builder)
Expand Down
2 changes: 2 additions & 0 deletions exporter/prometheusexporter/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func (*mockAccumulator) Accumulate(pmetric.ResourceMetrics) (n int) {
return 0
}

func (*mockAccumulator) cleanupExpired() {}

func (a *mockAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map, []string, []string, []string, []pcommon.Map) {
rAttrs := make([]pcommon.Map, len(a.metrics))
scopeNames := make([]string, len(a.metrics))
Expand Down
27 changes: 27 additions & 0 deletions exporter/prometheusexporter/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"net/http"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -25,6 +26,7 @@ type prometheusExporter struct {
collector *collector
registry *prometheus.Registry
settings component.TelemetrySettings
stopCh chan struct{} // signals the background metric cleanup goroutine to stop
}

var errBlankPrometheusAddress = errors.New("expecting a non-blank address to run the Prometheus metrics handler")
Expand Down Expand Up @@ -83,6 +85,27 @@ func (pe *prometheusExporter) Start(ctx context.Context, host component.Host) er
_ = srv.Serve(ln)
}()

// Start a background goroutine that periodically evicts expired metric families.
// Without this, cleanup only happens during Collect() (i.e. when Prometheus scrapes).
// If no scraper is active, stale entries in metricFamilies accumulate indefinitely,
// causing unbounded memory growth. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/41123
if pe.collector.metricExpiration > 0 {
pe.stopCh = make(chan struct{})
go func(stopCh chan struct{}) {
ticker := time.NewTicker(pe.collector.metricExpiration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pe.collector.accumulator.cleanupExpired()
pe.collector.cleanupMetricFamilies()
case <-stopCh:
return
}
}
}(pe.stopCh)
}
Comment thread
dashpole marked this conversation as resolved.

return nil
}

Expand All @@ -97,5 +120,9 @@ func (pe *prometheusExporter) ConsumeMetrics(_ context.Context, md pmetric.Metri
}

func (pe *prometheusExporter) Shutdown(ctx context.Context) error {
if pe.stopCh != nil {
close(pe.stopCh)
Comment thread
dashpole marked this conversation as resolved.
pe.stopCh = nil
}
return pe.shutdownFunc(ctx)
}
76 changes: 76 additions & 0 deletions exporter/prometheusexporter/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"io"
"net/http"
"testing"
"testing/synctest"
"time"

io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
Expand All @@ -20,6 +22,8 @@ import (
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
Expand Down Expand Up @@ -720,3 +724,75 @@ this_one_there_where_{arch="x86",instance="test-instance",job="test-service",os=
})
}
}

func TestPrometheusExporter_BackgroundCleanup(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
expiration := 5 * time.Minute
c := newCollector(&Config{MetricExpiration: expiration}, zap.NewNop())
a := c.accumulator.(*lastValueAccumulator)

staleMetric := pmetric.NewMetric()
staleMetric.SetName("stale_accumulated")
a.registeredMetrics.Store("stale_acc_key", &accumulatedValue{
value: staleMetric,
resourceAttrs: pcommon.NewMap(),
scopeAttributes: pcommon.NewMap(),
updated: time.Now().Add(-10 * time.Minute),
})
freshMetric := pmetric.NewMetric()
freshMetric.SetName("fresh_accumulated")
a.registeredMetrics.Store("fresh_acc_key", &accumulatedValue{
value: freshMetric,
resourceAttrs: pcommon.NewMap(),
scopeAttributes: pcommon.NewMap(),
updated: time.Now().Add(time.Hour),
})

gaugeType := io_prometheus_client.MetricType_GAUGE
c.metricFamilies.Store("stale_metric", metricFamily{
lastSeen: time.Now().Add(-10 * time.Minute),
mf: &io_prometheus_client.MetricFamily{
Name: proto.String("stale_metric"),
Help: proto.String("should be cleaned up"),
Type: &gaugeType,
},
})
c.metricFamilies.Store("fresh_metric", metricFamily{
lastSeen: time.Now().Add(time.Hour),
mf: &io_prometheus_client.MetricFamily{
Name: proto.String("fresh_metric"),
Help: proto.String("should remain"),
Type: &gaugeType,
},
})

stopCh := make(chan struct{})
go func() {
ticker := time.NewTicker(expiration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
a.cleanupExpired()
c.cleanupMetricFamilies()
case <-stopCh:
return
}
}
}()
defer close(stopCh)

time.Sleep(expiration + time.Second)
synctest.Wait()

_, mfFound := c.metricFamilies.Load("stale_metric")
assert.False(t, mfFound, "stale_metric should have been evicted")
_, accFound := a.registeredMetrics.Load("stale_acc_key")
assert.False(t, accFound, "stale_accumulated should have been evicted")

_, ok := c.metricFamilies.Load("fresh_metric")
assert.True(t, ok, "fresh_metric should not have been evicted")
_, ok = a.registeredMetrics.Load("fresh_acc_key")
assert.True(t, ok, "fresh_accumulated should not have been evicted")
})
}
Loading