diff --git a/CHANGELOG.md b/CHANGELOG.md index e175f445b38f..f675a2e66b22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808) +### Fixed + +- Fix `go.opentelemetry.io/otel/sdk/metric` to cache instruments to avoid leaking memory when the same instrument is created multiple times. (#4820) + ## [1.23.0-rc.1] 2024-01-18 This is a release candidate for the v1.23.0 release. diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index a4cfcbb95f13..11c776fd5277 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -265,9 +265,16 @@ var ( ) func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string) float64Observable { - return float64Observable{ - observable: newObservable[float64](m, kind, name, desc, u), - } + return m.float64ObservableInsts.Lookup(instID{ + Name: name, + Description: desc, + Unit: u, + Kind: kind, + }, func() float64Observable { + return float64Observable{ + observable: newObservable[float64](m, kind, name, desc, u), + } + }) } type int64Observable struct { @@ -286,9 +293,16 @@ var ( ) func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string) int64Observable { - return int64Observable{ - observable: newObservable[int64](m, kind, name, desc, u), - } + return m.int64ObservableInsts.Lookup(instID{ + Name: name, + Description: desc, + Unit: u, + Kind: kind, + }, func() int64Observable { + return int64Observable{ + observable: newObservable[int64](m, kind, name, desc, u), + } + }) } type observable[N int64 | float64] struct { diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 76f1e70a3d16..5fc58eee7163 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -41,6 +41,11 @@ type meter struct { scope instrumentation.Scope pipes pipelines + int64Insts *cache[instID, int64InstVal] + float64Insts *cache[instID, float64InstVal] + int64ObservableInsts *cache[instID, int64Observable] + float64ObservableInsts *cache[instID, float64Observable] + int64Resolver resolver[int64] float64Resolver resolver[float64] } @@ -50,11 +55,20 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter { // meter is asked to create are logged to the user. var viewCache cache[string, instID] + var int64Insts cache[instID, int64InstVal] + var float64Insts cache[instID, float64InstVal] + var int64ObservableInsts cache[instID, int64Observable] + var float64ObservableInsts cache[instID, float64Observable] + return &meter{ - scope: s, - pipes: p, - int64Resolver: newResolver[int64](p, &viewCache), - float64Resolver: newResolver[float64](p, &viewCache), + scope: s, + pipes: p, + int64Insts: &int64Insts, + float64Insts: &float64Insts, + int64ObservableInsts: &int64ObservableInsts, + float64ObservableInsts: &float64ObservableInsts, + int64Resolver: newResolver[int64](p, &viewCache), + float64Resolver: newResolver[float64](p, &viewCache), } } @@ -109,6 +123,9 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti // It registers callbacks for each reader's pipeline. func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) { inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit) + // If we are re-using the instrument, measures have already been appended + // and we don't need to append them again. + shouldAppendMeasures := len(inst.measures) == 0 for _, insert := range m.int64Resolver.inserters { // Connect the measure functions for instruments in this pipeline with the // callbacks for this pipeline. @@ -121,7 +138,9 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6 inst.dropAggregation = true continue } - inst.appendMeasures(in) + if shouldAppendMeasures { + inst.appendMeasures(in) + } for _, cback := range callbacks { inst := int64Observer{measures: in} insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) @@ -226,6 +245,9 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram // It registers callbacks for each reader's pipeline. func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) { inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit) + // If we are re-using the instrument, measures have already been appended + // and we don't need to append them again. + shouldAppendMeasures := len(inst.measures) == 0 for _, insert := range m.float64Resolver.inserters { // Connect the measure functions for instruments in this pipeline with the // callbacks for this pipeline. @@ -238,7 +260,9 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl inst.dropAggregation = true continue } - inst.appendMeasures(in) + if shouldAppendMeasures { + inst.appendMeasures(in) + } for _, cback := range callbacks { inst := float64Observer{measures: in} insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) @@ -498,6 +522,12 @@ func (noopRegister) Unregister() error { // int64InstProvider provides int64 OpenTelemetry instruments. type int64InstProvider struct{ *meter } +// int64InstVal is the cached value in an int64 instrument cache. +type int64InstVal struct { + instrument *int64Inst + err error +} + func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) { inst := Instrument{ Name: name, @@ -529,19 +559,41 @@ func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramC // lookup returns the resolved instrumentImpl. func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { - aggs, err := p.aggs(kind, name, desc, u) - return &int64Inst{measures: aggs}, err + val := p.meter.int64Insts.Lookup(instID{ + Name: name, + Description: desc, + Unit: u, + Kind: kind, + }, func() int64InstVal { + aggs, err := p.aggs(kind, name, desc, u) + return int64InstVal{instrument: &int64Inst{measures: aggs}, err: err} + }) + return val.instrument, val.err } // lookupHistogram returns the resolved instrumentImpl. func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) { - aggs, err := p.histogramAggs(name, cfg) - return &int64Inst{measures: aggs}, err + val := p.meter.int64Insts.Lookup(instID{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindHistogram, + }, func() int64InstVal { + aggs, err := p.histogramAggs(name, cfg) + return int64InstVal{instrument: &int64Inst{measures: aggs}, err: err} + }) + return val.instrument, val.err } // float64InstProvider provides float64 OpenTelemetry instruments. type float64InstProvider struct{ *meter } +// float64InstVal is the cached value in an instrument cache. +type float64InstVal struct { + instrument *float64Inst + err error +} + func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) { inst := Instrument{ Name: name, @@ -573,14 +625,30 @@ func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64Histog // lookup returns the resolved instrumentImpl. func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { - aggs, err := p.aggs(kind, name, desc, u) - return &float64Inst{measures: aggs}, err + val := p.meter.float64Insts.Lookup(instID{ + Name: name, + Description: desc, + Unit: u, + Kind: kind, + }, func() float64InstVal { + aggs, err := p.aggs(kind, name, desc, u) + return float64InstVal{instrument: &float64Inst{measures: aggs}, err: err} + }) + return val.instrument, val.err } // lookupHistogram returns the resolved instrumentImpl. func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) { - aggs, err := p.histogramAggs(name, cfg) - return &float64Inst{measures: aggs}, err + val := p.meter.float64Insts.Lookup(instID{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindHistogram, + }, func() float64InstVal { + aggs, err := p.histogramAggs(name, cfg) + return float64InstVal{instrument: &float64Inst{measures: aggs}, err: err} + }) + return val.instrument, val.err } type int64Observer struct { diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index d068ecd4badd..037f0cad5552 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -2272,3 +2272,112 @@ func TestObservableDropAggregation(t *testing.T) { }) } } + +func TestDuplicateInstrumentCreation(t *testing.T) { + for _, tt := range []struct { + desc string + createInstrument func(metric.Meter) error + }{ + { + desc: "Int64ObservableCounter", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Int64ObservableCounter("observable.int64.counter") + return err + }, + }, + { + desc: "Int64ObservableUpDownCounter", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Int64ObservableUpDownCounter("observable.int64.up.down.counter") + return err + }, + }, + { + desc: "Int64ObservableGauge", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Int64ObservableGauge("observable.int64.gauge") + return err + }, + }, + { + desc: "Float64ObservableCounter", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Float64ObservableCounter("observable.float64.counter") + return err + }, + }, + { + desc: "Float64ObservableUpDownCounter", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Float64ObservableUpDownCounter("observable.float64.up.down.counter") + return err + }, + }, + { + desc: "Float64ObservableGauge", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Float64ObservableGauge("observable.float64.gauge") + return err + }, + }, + { + desc: "Int64Counter", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Int64Counter("sync.int64.counter") + return err + }, + }, + { + desc: "Int64UpDownCounter", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Int64UpDownCounter("sync.int64.up.down.counter") + return err + }, + }, + { + desc: "Int64Histogram", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Int64Histogram("sync.int64.histogram") + return err + }, + }, + { + desc: "Float64Counter", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Float64Counter("sync.float64.counter") + return err + }, + }, + { + desc: "Float64UpDownCounter", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Float64UpDownCounter("sync.float64.up.down.counter") + return err + }, + }, + { + desc: "Float64Histogram", + createInstrument: func(meter metric.Meter) error { + _, err := meter.Float64Histogram("sync.float64.histogram") + return err + }, + }, + } { + t.Run(tt.desc, func(t *testing.T) { + reader := NewManualReader() + defer func() { + require.NoError(t, reader.Shutdown(context.Background())) + }() + + m := NewMeterProvider(WithReader(reader)).Meter("TestDuplicateInstrumentCreation") + for i := 0; i < 3; i++ { + require.NoError(t, tt.createInstrument(m)) + } + internalMeter, ok := m.(*meter) + require.True(t, ok) + // check that multiple calls to create the same instrument only create 1 instrument + numInstruments := len(internalMeter.int64Insts.data) + len(internalMeter.float64Insts.data) + len(internalMeter.int64ObservableInsts.data) + len(internalMeter.float64ObservableInsts.data) + require.Equal(t, 1, numInstruments) + }) + } +}