From ef675bfbe354bc274f1de6ac8f917676b90f068b Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 8 Dec 2022 20:02:39 +0000 Subject: [PATCH 01/15] add RegisterProducer method and metric.Producer interface --- sdk/metric/config_test.go | 16 ++++++----- sdk/metric/manual_reader.go | 50 +++++++++++++++++++++++++++-------- sdk/metric/periodic_reader.go | 44 ++++++++++++++++++++++++------ sdk/metric/reader.go | 19 ++++++++++--- 4 files changed, 100 insertions(+), 29 deletions(-) diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index a924d879d00..dc5eff2eee2 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -28,12 +28,13 @@ import ( ) type reader struct { - producer producer - temporalityFunc TemporalitySelector - aggregationFunc AggregationSelector - collectFunc func(context.Context) (metricdata.ResourceMetrics, error) - forceFlushFunc func(context.Context) error - shutdownFunc func(context.Context) error + producer sdkProducer + externalProducers []Producer + temporalityFunc TemporalitySelector + aggregationFunc AggregationSelector + collectFunc func(context.Context) (metricdata.ResourceMetrics, error) + forceFlushFunc func(context.Context) error + shutdownFunc func(context.Context) error } var _ Reader = (*reader)(nil) @@ -42,7 +43,8 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n return r.aggregationFunc(kind) } -func (r *reader) register(p producer) { r.producer = p } +func (r *reader) register(p sdkProducer) { r.producer = p } +func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.externalProducers, p) } func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality { return r.temporalityFunc(kind) } diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 0ebfadf33a3..9223caf264b 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -28,9 +28,12 @@ import ( // manualReader is a simple Reader that allows an application to // read metrics on demand. type manualReader struct { - producer atomic.Value + sdkProducer atomic.Value shutdownOnce sync.Once + mu sync.Mutex + externalProducers atomic.Value + temporalitySelector TemporalitySelector aggregationSelector AggregationSelector } @@ -41,22 +44,36 @@ var _ = map[Reader]struct{}{&manualReader{}: {}} // NewManualReader returns a Reader which is directly called to collect metrics. func NewManualReader(opts ...ManualReaderOption) Reader { cfg := newManualReaderConfig(opts) - return &manualReader{ + r := &manualReader{ temporalitySelector: cfg.temporalitySelector, aggregationSelector: cfg.aggregationSelector, } + r.externalProducers.Store([]Producer{}) + return r } -// register stores the Producer which enables the caller to read -// metrics on demand. -func (mr *manualReader) register(p producer) { +// register stores the sdkProducer which enables the caller +// to read metrics from the SDK on demand. +func (mr *manualReader) register(p sdkProducer) { // Only register once. If producer is already set, do nothing. - if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { msg := "did not register manual reader" global.Error(errDuplicateRegister, msg) } } +// RegisterProducer stores the external Producer which enables the caller +// to read metrics on demand. +func (mr *manualReader) RegisterProducer(p Producer) { + mr.mu.Lock() + defer mr.mu.Unlock() + currentProducers := mr.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + mr.externalProducers.Store(newProducers) +} + // temporality reports the Temporality for the instrument kind provided. func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality { return mr.temporalitySelector(kind) @@ -77,7 +94,7 @@ func (mr *manualReader) Shutdown(context.Context) error { err := ErrReaderShutdown mr.shutdownOnce.Do(func() { // Any future call to Collect will now return ErrReaderShutdown. - mr.producer.Store(produceHolder{ + mr.sdkProducer.Store(produceHolder{ produce: shutdownProducer{}.produce, }) err = nil @@ -85,10 +102,10 @@ func (mr *manualReader) Shutdown(context.Context) error { return err } -// Collect gathers all metrics from the SDK, calling any callbacks necessary. -// Collect will return an error if called after shutdown. +// Collect gathers all metrics from the SDK and other Producers, calling any +// callbacks necessary. Collect will return an error if called after shutdown. func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - p := mr.producer.Load() + p := mr.sdkProducer.Load() if p == nil { return metricdata.ResourceMetrics{}, ErrReaderNotRegistered } @@ -103,7 +120,18 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + for _, producer := range mr.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) + } + return rm, nil } // manualReaderConfig contains configuration options for a ManualReader. diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 00ba1305595..dd6c89307ba 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade cancel: cancel, done: make(chan struct{}), } + r.externalProducers.Store([]Producer{}) go func() { defer func() { close(r.done) }() @@ -126,7 +127,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade // periodicReader is a Reader that continuously collects and exports metric // data at a set interval. type periodicReader struct { - producer atomic.Value + sdkProducer atomic.Value + + mu sync.Mutex + externalProducers atomic.Value timeout time.Duration exporter Exporter @@ -166,14 +170,25 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) { } // register registers p as the producer of this reader. -func (r *periodicReader) register(p producer) { +func (r *periodicReader) register(p sdkProducer) { // Only register once. If producer is already set, do nothing. - if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { msg := "did not register periodic reader" global.Error(errDuplicateRegister, msg) } } +// RegisterProducer registers p as an external Producer of this reader. +func (r *periodicReader) RegisterProducer(p Producer) { + r.mu.Lock() + defer r.mu.Unlock() + currentProducers := r.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + r.externalProducers.Store(newProducers) +} + // temporality reports the Temporality for the instrument kind provided. func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality { return r.exporter.Temporality(kind) @@ -195,12 +210,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error { } // Collect gathers and returns all metric data related to the Reader from -// the SDK. The returned metric data is not exported to the configured -// exporter, it is left to the caller to handle that if desired. +// the SDK and other Producers. The returned metric data is not exported +// to the configured exporter, it is left to the caller to handle that if +// desired. // // An error is returned if this is called after Shutdown. func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - return r.collect(ctx, r.producer.Load()) + return r.collect(ctx, r.sdkProducer.Load()) } // collect unwraps p as a produceHolder and returns its produce results. @@ -218,7 +234,19 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata err := fmt.Errorf("periodic reader: invalid producer: %T", p) return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + for _, producer := range r.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) + } + return rm, nil } // export exports metric data m using r's exporter. @@ -259,7 +287,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { <-r.done // Any future call to Collect will now return ErrReaderShutdown. - ph := r.producer.Swap(produceHolder{ + ph := r.sdkProducer.Swap(produceHolder{ produce: shutdownProducer{}.produce, }) diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index aa9d50ef666..bc27e84c90c 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -51,7 +51,12 @@ type Reader interface { // register registers a Reader with a MeterProvider. // The producer argument allows the Reader to signal the sdk to collect // and send aggregated metric measurements. - register(producer) + register(sdkProducer) + + // RegisterProducer registers a Reader with an external Producer. + // The Producer argument allows the Reader to signal the Producer to + // collect and send aggregated metric measurements. + RegisterProducer(Producer) // temporality reports the Temporality for the instrument kind provided. temporality(InstrumentKind) metricdata.Temporality @@ -84,14 +89,22 @@ type Reader interface { Shutdown(context.Context) error } -// producer produces metrics for a Reader. -type producer interface { +// sdkProducer produces metrics for a Reader. +type sdkProducer interface { // produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. produce(context.Context) (metricdata.ResourceMetrics, error) } +// Producer produces metrics for a Reader from an external source. +type Producer interface { + // Produce returns aggregated metrics from an external source. + // + // This method should be safe to call concurrently. + Produce(context.Context) ([]metricdata.ScopeMetrics, error) +} + // produceHolder is used as an atomic.Value to wrap the non-concrete producer // type. type produceHolder struct { From f74934cde9e46234eaf4e4398cd80a15951c3ae4 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 8 Dec 2022 20:04:08 +0000 Subject: [PATCH 02/15] rename testProducer to testSDKProducer --- sdk/metric/periodic_reader_test.go | 12 ++++++------ sdk/metric/reader_test.go | 22 +++++++++++----------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index d48c1a7de8e..5312df947bb 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -114,7 +114,7 @@ func (ts *periodicReaderTestSuite) SetupTest() { } ts.ErrReader = NewPeriodicReader(e) - ts.ErrReader.register(testProducer{}) + ts.ErrReader.register(testSDKProducer{}) } func (ts *periodicReaderTestSuite) TearDownTest() { @@ -186,14 +186,14 @@ func TestPeriodicReaderRun(t *testing.T) { exp := &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testProducer produces testMetrics. + // The testSDKProducer produces testMetrics. assert.Equal(t, testMetrics, m) return assert.AnError }, } r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.register(testSDKProducer{}) trigger <- time.Now() assert.Equal(t, assert.AnError, <-eh.Err) @@ -210,7 +210,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { called = new(bool) return &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testProducer produces testMetrics. + // The testSDKProducer produces testMetrics. assert.Equal(t, testMetrics, m) *called = true return assert.AnError @@ -221,7 +221,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush", func(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.register(testSDKProducer{}) assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -232,7 +232,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown", func(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.register(testSDKProducer{}) assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") }) diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 28b249bd3e2..13267b1f8ba 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -58,7 +58,7 @@ func (ts *readerTestSuite) TestErrorForNotRegistered() { } func (ts *readerTestSuite) TestProducer() { - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) m, err := ts.Reader.Collect(context.Background()) ts.NoError(err) ts.Equal(testMetrics, m) @@ -66,7 +66,7 @@ func (ts *readerTestSuite) TestProducer() { func (ts *readerTestSuite) TestCollectAfterShutdown() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) m, err := ts.Reader.Collect(ctx) @@ -76,27 +76,27 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { func (ts *readerTestSuite) TestShutdownTwice() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown) } func (ts *readerTestSuite) TestMultipleForceFlush() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) ts.Require().NoError(ts.Reader.ForceFlush(ctx)) ts.NoError(ts.Reader.ForceFlush(ctx)) } func (ts *readerTestSuite) TestMultipleRegister() { - p0 := testProducer{ + p0 := testSDKProducer{ produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { // Differentiate this producer from the second by returning an // error. return testMetrics, assert.AnError }, } - p1 := testProducer{} + p1 := testSDKProducer{} ts.Reader.register(p0) // This should be ignored. @@ -110,7 +110,7 @@ func (ts *readerTestSuite) TestMethodConcurrency() { // Requires the race-detector (a default test option for the project). // All reader methods should be concurrent-safe. - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) ctx := context.Background() var wg sync.WaitGroup @@ -141,7 +141,7 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { ctx := context.Background() ts.Require().NoError(ts.Reader.Shutdown(ctx)) // Registering after shutdown should not revert the shutdown. - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) m, err := ts.Reader.Collect(ctx) ts.ErrorIs(err, ErrReaderShutdown) @@ -170,11 +170,11 @@ var testMetrics = metricdata.ResourceMetrics{ }}, } -type testProducer struct { +type testSDKProducer struct { produceFunc func(context.Context) (metricdata.ResourceMetrics, error) } -func (p testProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { +func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { if p.produceFunc != nil { return p.produceFunc(ctx) } @@ -183,7 +183,7 @@ func (p testProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, func benchReaderCollectFunc(r Reader) func(*testing.B) { ctx := context.Background() - r.register(testProducer{}) + r.register(testSDKProducer{}) // Store bechmark results in a closure to prevent the compiler from // inlining and skipping the function. From ff239aa17c7804c22b792a49b528cb3cb00733fa Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 8 Dec 2022 20:04:49 +0000 Subject: [PATCH 03/15] rename testMetrics to testResourceMetrics --- sdk/metric/periodic_reader_test.go | 8 ++++---- sdk/metric/reader_test.go | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 5312df947bb..4026768c2e9 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -186,8 +186,8 @@ func TestPeriodicReaderRun(t *testing.T) { exp := &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testSDKProducer produces testMetrics. - assert.Equal(t, testMetrics, m) + // The testSDKProducer produces testResourceMetrics. + assert.Equal(t, testResourceMetrics, m) return assert.AnError }, } @@ -210,8 +210,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { called = new(bool) return &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testSDKProducer produces testMetrics. - assert.Equal(t, testMetrics, m) + // The testSDKProducer produces testResourceMetrics. + assert.Equal(t, testResourceMetrics, m) *called = true return assert.AnError }, diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 13267b1f8ba..658f986d50e 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -61,7 +61,7 @@ func (ts *readerTestSuite) TestProducer() { ts.Reader.register(testSDKProducer{}) m, err := ts.Reader.Collect(context.Background()) ts.NoError(err) - ts.Equal(testMetrics, m) + ts.Equal(testResourceMetrics, m) } func (ts *readerTestSuite) TestCollectAfterShutdown() { @@ -93,7 +93,7 @@ func (ts *readerTestSuite) TestMultipleRegister() { produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { // Differentiate this producer from the second by returning an // error. - return testMetrics, assert.AnError + return testResourceMetrics, assert.AnError }, } p1 := testSDKProducer{} @@ -148,7 +148,7 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { ts.Equal(metricdata.ResourceMetrics{}, m) } -var testMetrics = metricdata.ResourceMetrics{ +var testResourceMetrics = metricdata.ResourceMetrics{ Resource: resource.NewSchemaless(attribute.String("test", "Reader")), ScopeMetrics: []metricdata.ScopeMetrics{{ Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"}, @@ -178,7 +178,7 @@ func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetric if p.produceFunc != nil { return p.produceFunc(ctx) } - return testMetrics, nil + return testResourceMetrics, nil } func benchReaderCollectFunc(r Reader) func(*testing.B) { @@ -198,7 +198,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) { for n := 0; n < b.N; n++ { collectedMetrics, err = r.Collect(ctx) - assert.Equalf(b, testMetrics, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) + assert.Equalf(b, testResourceMetrics, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) } } } From bd35c885a41e707f0d72fba2b4969d3a0ce29f9a Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 8 Dec 2022 20:11:33 +0000 Subject: [PATCH 04/15] add testExternalProducer for testing bridges --- sdk/metric/reader_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 658f986d50e..c96daeae45d 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -181,6 +181,34 @@ func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetric return testResourceMetrics, nil } +var testScopeMetrics = []metricdata.ScopeMetrics{{ + Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/external"}, + Metrics: []metricdata.Metrics{{ + Name: "fake scope data", + Description: "Data used to test a Producer reader", + Unit: unit.Milliseconds, + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{ + Attributes: attribute.NewSet(attribute.String("user", "ben")), + StartTime: time.Now(), + Time: time.Now().Add(time.Second), + Value: 10, + }}, + }, + }}, +}} + +type testExternalProducer struct { + produceFunc func(context.Context) ([]metricdata.ScopeMetrics, error) +} + +func (p testExternalProducer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + if p.produceFunc != nil { + return p.produceFunc(ctx) + } + return testScopeMetrics, nil +} + func benchReaderCollectFunc(r Reader) func(*testing.B) { ctx := context.Background() r.register(testSDKProducer{}) From 4cac786ced287a7f3600e036bf676660bc83ca9e Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 8 Dec 2022 20:18:52 +0000 Subject: [PATCH 05/15] add test data for testing external producers --- sdk/metric/periodic_reader_test.go | 8 +-- sdk/metric/reader_test.go | 79 ++++++++++++++++-------------- 2 files changed, 47 insertions(+), 40 deletions(-) diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 4026768c2e9..f0ded121c1f 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -186,8 +186,8 @@ func TestPeriodicReaderRun(t *testing.T) { exp := &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testSDKProducer produces testResourceMetrics. - assert.Equal(t, testResourceMetrics, m) + // The testSDKProducer produces testResourceMetricsA. + assert.Equal(t, testResourceMetricsA, m) return assert.AnError }, } @@ -210,8 +210,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { called = new(bool) return &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testSDKProducer produces testResourceMetrics. - assert.Equal(t, testResourceMetrics, m) + // The testSDKProducer produces testResourceMetricsA. + assert.Equal(t, testResourceMetricsA, m) *called = true return assert.AnError }, diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index c96daeae45d..24dd73beadf 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -61,7 +61,7 @@ func (ts *readerTestSuite) TestProducer() { ts.Reader.register(testSDKProducer{}) m, err := ts.Reader.Collect(context.Background()) ts.NoError(err) - ts.Equal(testResourceMetrics, m) + ts.Equal(testResourceMetricsA, m) } func (ts *readerTestSuite) TestCollectAfterShutdown() { @@ -93,7 +93,7 @@ func (ts *readerTestSuite) TestMultipleRegister() { produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { // Differentiate this producer from the second by returning an // error. - return testResourceMetrics, assert.AnError + return testResourceMetricsA, assert.AnError }, } p1 := testSDKProducer{} @@ -148,40 +148,26 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { ts.Equal(metricdata.ResourceMetrics{}, m) } -var testResourceMetrics = metricdata.ResourceMetrics{ - Resource: resource.NewSchemaless(attribute.String("test", "Reader")), - ScopeMetrics: []metricdata.ScopeMetrics{{ - Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"}, - Metrics: []metricdata.Metrics{{ - Name: "fake data", - Description: "Data used to test a reader", - Unit: unit.Dimensionless, - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{{ - Attributes: attribute.NewSet(attribute.String("user", "alice")), - StartTime: time.Now(), - Time: time.Now().Add(time.Second), - Value: -1, - }}, - }, - }}, +var testScopeMetricsA = metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"}, + Metrics: []metricdata.Metrics{{ + Name: "fake data", + Description: "Data used to test a reader", + Unit: unit.Dimensionless, + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{{ + Attributes: attribute.NewSet(attribute.String("user", "alice")), + StartTime: time.Now(), + Time: time.Now().Add(time.Second), + Value: -1, + }}, + }, }}, } -type testSDKProducer struct { - produceFunc func(context.Context) (metricdata.ResourceMetrics, error) -} - -func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { - if p.produceFunc != nil { - return p.produceFunc(ctx) - } - return testResourceMetrics, nil -} - -var testScopeMetrics = []metricdata.ScopeMetrics{{ +var testScopeMetricsB = metricdata.ScopeMetrics{ Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/external"}, Metrics: []metricdata.Metrics{{ Name: "fake scope data", @@ -196,7 +182,28 @@ var testScopeMetrics = []metricdata.ScopeMetrics{{ }}, }, }}, -}} +} + +var testResourceMetricsA = metricdata.ResourceMetrics{ + Resource: resource.NewSchemaless(attribute.String("test", "Reader")), + ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA}, +} + +var testResourceMetricsAB = metricdata.ResourceMetrics{ + Resource: resource.NewSchemaless(attribute.String("test", "Reader")), + ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA, testScopeMetricsB}, +} + +type testSDKProducer struct { + produceFunc func(context.Context) (metricdata.ResourceMetrics, error) +} + +func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { + if p.produceFunc != nil { + return p.produceFunc(ctx) + } + return testResourceMetricsA, nil +} type testExternalProducer struct { produceFunc func(context.Context) ([]metricdata.ScopeMetrics, error) @@ -206,7 +213,7 @@ func (p testExternalProducer) Produce(ctx context.Context) ([]metricdata.ScopeMe if p.produceFunc != nil { return p.produceFunc(ctx) } - return testScopeMetrics, nil + return []metricdata.ScopeMetrics{testScopeMetricsB}, nil } func benchReaderCollectFunc(r Reader) func(*testing.B) { @@ -226,7 +233,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) { for n := 0; n < b.N; n++ { collectedMetrics, err = r.Collect(ctx) - assert.Equalf(b, testResourceMetrics, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) + assert.Equalf(b, testResourceMetricsA, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) } } } From 83bf73f777fb7e474ebbc99a05ca73f1f2c551ad Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 9 Dec 2022 16:04:53 +0000 Subject: [PATCH 06/15] clean up help text --- sdk/metric/reader.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index bc27e84c90c..c52cc58dff2 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -53,9 +53,9 @@ type Reader interface { // and send aggregated metric measurements. register(sdkProducer) - // RegisterProducer registers a Reader with an external Producer. - // The Producer argument allows the Reader to signal the Producer to - // collect and send aggregated metric measurements. + // RegisterProducer registers a an external Producer with this Reader. + // The Producer is used as a source of aggregated metric data which is + // incorporated into metrics collected from the SDK. RegisterProducer(Producer) // temporality reports the Temporality for the instrument kind provided. From fe17bae556aac6471f7f154c846d0a1a90cd28e3 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 9 Dec 2022 16:33:05 +0000 Subject: [PATCH 07/15] unit tests for external Producer --- sdk/metric/periodic_reader_test.go | 10 +++++++--- sdk/metric/reader_test.go | 22 +++++++++++++++------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index f0ded121c1f..138aae48944 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -115,6 +115,7 @@ func (ts *periodicReaderTestSuite) SetupTest() { ts.ErrReader = NewPeriodicReader(e) ts.ErrReader.register(testSDKProducer{}) + ts.ErrReader.RegisterProducer(testExternalProducer{}) } func (ts *periodicReaderTestSuite) TearDownTest() { @@ -186,14 +187,15 @@ func TestPeriodicReaderRun(t *testing.T) { exp := &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testSDKProducer produces testResourceMetricsA. - assert.Equal(t, testResourceMetricsA, m) + // The testSDKProducer produces testResourceMetricsAB. + assert.Equal(t, testResourceMetricsAB, m) return assert.AnError }, } r := NewPeriodicReader(exp) r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) trigger <- time.Now() assert.Equal(t, assert.AnError, <-eh.Err) @@ -211,7 +213,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { return &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { // The testSDKProducer produces testResourceMetricsA. - assert.Equal(t, testResourceMetricsA, m) + assert.Equal(t, testResourceMetricsAB, m) *called = true return assert.AnError }, @@ -222,6 +224,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -233,6 +236,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") }) diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 24dd73beadf..b3a75287466 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -57,16 +57,25 @@ func (ts *readerTestSuite) TestErrorForNotRegistered() { ts.ErrorIs(err, ErrReaderNotRegistered) } -func (ts *readerTestSuite) TestProducer() { +func (ts *readerTestSuite) TestSDKProducer() { ts.Reader.register(testSDKProducer{}) m, err := ts.Reader.Collect(context.Background()) ts.NoError(err) ts.Equal(testResourceMetricsA, m) } +func (ts *readerTestSuite) TestExternalProducer() { + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) + m, err := ts.Reader.Collect(context.Background()) + ts.NoError(err) + ts.Equal(testResourceMetricsAB, m) +} + func (ts *readerTestSuite) TestCollectAfterShutdown() { ctx := context.Background() ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) m, err := ts.Reader.Collect(ctx) @@ -77,6 +86,7 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { func (ts *readerTestSuite) TestShutdownTwice() { ctx := context.Background() ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown) } @@ -84,6 +94,7 @@ func (ts *readerTestSuite) TestShutdownTwice() { func (ts *readerTestSuite) TestMultipleForceFlush() { ctx := context.Background() ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.ForceFlush(ctx)) ts.NoError(ts.Reader.ForceFlush(ctx)) } @@ -111,6 +122,7 @@ func (ts *readerTestSuite) TestMethodConcurrency() { // All reader methods should be concurrent-safe. ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ctx := context.Background() var wg sync.WaitGroup @@ -142,6 +154,7 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { ts.Require().NoError(ts.Reader.Shutdown(ctx)) // Registering after shutdown should not revert the shutdown. ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) m, err := ts.Reader.Collect(ctx) ts.ErrorIs(err, ErrReaderShutdown) @@ -205,14 +218,9 @@ func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetric return testResourceMetricsA, nil } -type testExternalProducer struct { - produceFunc func(context.Context) ([]metricdata.ScopeMetrics, error) -} +type testExternalProducer struct{} func (p testExternalProducer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { - if p.produceFunc != nil { - return p.produceFunc(ctx) - } return []metricdata.ScopeMetrics{testScopeMetricsB}, nil } From 8a0d68bd0f6cf4128a66136dd87280e49c920db4 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 9 Dec 2022 16:36:04 +0000 Subject: [PATCH 08/15] changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cb2b592746..48de509cc9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520) +### Added + +- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers (#3524) + ## [1.11.2/0.34.0] 2022-12-05 ### Added From 49462f3191a13012cd6e74a10398675f20c8cc05 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 9 Dec 2022 17:00:39 +0000 Subject: [PATCH 09/15] improve test coverage --- sdk/metric/reader_test.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index b3a75287466..f1148f015ca 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -117,6 +117,20 @@ func (ts *readerTestSuite) TestMultipleRegister() { ts.Equal(assert.AnError, err) } +func (ts *readerTestSuite) TestExternalProducerError() { + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer( + testExternalProducer{ + produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + return []metricdata.ScopeMetrics{testScopeMetricsB}, assert.AnError + }, + }, + ) + + _, err := ts.Reader.Collect(context.Background()) + ts.Equal(assert.AnError, err) +} + func (ts *readerTestSuite) TestMethodConcurrency() { // Requires the race-detector (a default test option for the project). @@ -218,9 +232,14 @@ func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetric return testResourceMetricsA, nil } -type testExternalProducer struct{} +type testExternalProducer struct { + produceFunc func(context.Context) ([]metricdata.ScopeMetrics, error) +} func (p testExternalProducer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + if p.produceFunc != nil { + return p.produceFunc(ctx) + } return []metricdata.ScopeMetrics{testScopeMetricsB}, nil } From e8b2d0c9398d037bafa667383a64d039cf804fcf Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 9 Dec 2022 12:30:22 -0500 Subject: [PATCH 10/15] Update CHANGELOG.md Co-authored-by: Tyler Yahn --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48de509cc9d..9f0e0943382 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added -- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers (#3524) +- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524) ## [1.11.2/0.34.0] 2022-12-05 From f070af9d612657850be541244ae73c115413b594 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 9 Dec 2022 20:01:59 +0000 Subject: [PATCH 11/15] support partial errors --- sdk/metric/config.go | 21 +++++++++++++-------- sdk/metric/manual_reader.go | 5 +++-- sdk/metric/periodic_reader.go | 5 +++-- sdk/metric/reader_test.go | 14 +++++++++++--- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index c78b0416415..86f04e6b416 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -54,14 +54,19 @@ func unify(funcs []func(context.Context) error) func(context.Context) error { errs = append(errs, err) } } - switch len(errs) { - case 0: - return nil - case 1: - return errs[0] - default: - return fmt.Errorf("%v", errs) - } + return unifyErrors(errs) + } +} + +// unifyErrors combines multiple errors into a single error +func unifyErrors(errs []error) error { + switch len(errs) { + case 0: + return nil + case 1: + return errs[0] + default: + return fmt.Errorf("%v", errs) } } diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 9223caf264b..4d88380afc4 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -124,14 +124,15 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics if err != nil { return metricdata.ResourceMetrics{}, err } + var errs []error for _, producer := range mr.externalProducers.Load().([]Producer) { externalMetrics, err := producer.Produce(ctx) if err != nil { - return metricdata.ResourceMetrics{}, err + errs = append(errs, err) } rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) } - return rm, nil + return rm, unifyErrors(errs) } // manualReaderConfig contains configuration options for a ManualReader. diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index dd6c89307ba..201a57f6683 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -239,14 +239,15 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata if err != nil { return metricdata.ResourceMetrics{}, err } + var errs []error for _, producer := range r.externalProducers.Load().([]Producer) { externalMetrics, err := producer.Produce(ctx) if err != nil { - return metricdata.ResourceMetrics{}, err + errs = append(errs, err) } rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) } - return rm, nil + return rm, unifyErrors(errs) } // export exports metric data m using r's exporter. diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index f1148f015ca..a0b20ba28dc 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -117,18 +117,26 @@ func (ts *readerTestSuite) TestMultipleRegister() { ts.Equal(assert.AnError, err) } -func (ts *readerTestSuite) TestExternalProducerError() { +func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { ts.Reader.register(testSDKProducer{}) ts.Reader.RegisterProducer( testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { - return []metricdata.ScopeMetrics{testScopeMetricsB}, assert.AnError + return []metricdata.ScopeMetrics{}, assert.AnError + }, + }, + ) + ts.Reader.RegisterProducer( + testExternalProducer{ + produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + return []metricdata.ScopeMetrics{testScopeMetricsB}, nil }, }, ) - _, err := ts.Reader.Collect(context.Background()) + m, err := ts.Reader.Collect(context.Background()) ts.Equal(assert.AnError, err) + ts.Equal(testResourceMetricsAB, m) } func (ts *readerTestSuite) TestMethodConcurrency() { From 31799adafb339cca0711c7040a7c4b1e1bba25d6 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 12 Dec 2022 17:38:41 +0000 Subject: [PATCH 12/15] fix lint --- sdk/metric/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 86f04e6b416..c837df8b76f 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -58,7 +58,7 @@ func unify(funcs []func(context.Context) error) func(context.Context) error { } } -// unifyErrors combines multiple errors into a single error +// unifyErrors combines multiple errors into a single error. func unifyErrors(errs []error) error { switch len(errs) { case 0: From ff4a1c242f32cf81a67cc5aabcfebb7bb9ff51d7 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 12 Dec 2022 17:39:47 +0000 Subject: [PATCH 13/15] add additional test --- sdk/metric/reader_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index a0b20ba28dc..191ab39945b 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -139,6 +139,18 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { ts.Equal(testResourceMetricsAB, m) } +func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { + ts.Reader.register(testSDKProducer{ + produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { + return metricdata.ResourceMetrics{}, assert.AnError + }}) + ts.Reader.RegisterProducer(testExternalProducer{}) + + m, err := ts.Reader.Collect(context.Background()) + ts.Equal(assert.AnError, err) + ts.Equal(metricdata.ResourceMetrics{}, m) +} + func (ts *readerTestSuite) TestMethodConcurrency() { // Requires the race-detector (a default test option for the project). From 0a881975426bb2d58f09f7ad19634b187bff2b5a Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 12 Dec 2022 18:28:56 +0000 Subject: [PATCH 14/15] unallocate producers on shutdown --- sdk/metric/manual_reader.go | 2 ++ sdk/metric/periodic_reader.go | 3 +++ 2 files changed, 5 insertions(+) diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 4d88380afc4..2a00606eafd 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -97,6 +97,8 @@ func (mr *manualReader) Shutdown(context.Context) error { mr.sdkProducer.Store(produceHolder{ produce: shutdownProducer{}.produce, }) + // release references to Producer(s) + mr.externalProducers.Store([]Producer{}) err = nil }) return err diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 201a57f6683..26dd694e9c7 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -301,6 +301,9 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { } } + // release references to Producer(s) + r.externalProducers.Store([]Producer{}) + sErr := r.exporter.Shutdown(ctx) if err == nil || err == ErrReaderShutdown { err = sErr From 09eef4408304b88d673c153ffa25c4a940fe3800 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 13 Dec 2022 19:18:51 +0000 Subject: [PATCH 15/15] don't register Producers after shutdown --- sdk/metric/manual_reader.go | 7 +++++++ sdk/metric/periodic_reader.go | 13 ++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 2a00606eafd..48a8b291e77 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -32,6 +32,7 @@ type manualReader struct { shutdownOnce sync.Once mu sync.Mutex + isShutdown bool externalProducers atomic.Value temporalitySelector TemporalitySelector @@ -67,6 +68,9 @@ func (mr *manualReader) register(p sdkProducer) { func (mr *manualReader) RegisterProducer(p Producer) { mr.mu.Lock() defer mr.mu.Unlock() + if mr.isShutdown { + return + } currentProducers := mr.externalProducers.Load().([]Producer) newProducers := []Producer{} newProducers = append(newProducers, currentProducers...) @@ -97,6 +101,9 @@ func (mr *manualReader) Shutdown(context.Context) error { mr.sdkProducer.Store(produceHolder{ produce: shutdownProducer{}.produce, }) + mr.mu.Lock() + defer mr.mu.Unlock() + mr.isShutdown = true // release references to Producer(s) mr.externalProducers.Store([]Producer{}) err = nil diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 26dd694e9c7..8425e42e16a 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -130,6 +130,7 @@ type periodicReader struct { sdkProducer atomic.Value mu sync.Mutex + isShutdown bool externalProducers atomic.Value timeout time.Duration @@ -182,6 +183,9 @@ func (r *periodicReader) register(p sdkProducer) { func (r *periodicReader) RegisterProducer(p Producer) { r.mu.Lock() defer r.mu.Unlock() + if r.isShutdown { + return + } currentProducers := r.externalProducers.Load().([]Producer) newProducers := []Producer{} newProducers = append(newProducers, currentProducers...) @@ -301,13 +305,16 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { } } - // release references to Producer(s) - r.externalProducers.Store([]Producer{}) - sErr := r.exporter.Shutdown(ctx) if err == nil || err == ErrReaderShutdown { err = sErr } + + r.mu.Lock() + defer r.mu.Unlock() + r.isShutdown = true + // release references to Producer(s) + r.externalProducers.Store([]Producer{}) }) return err }