Skip to content

Commit 14a17b3

Browse files
dashpoleMrAlias
andauthored
Add Metric Producer as a new interface, which returns scope metrics (#3524)
* add RegisterProducer method and metric.Producer interface * rename testProducer to testSDKProducer * rename testMetrics to testResourceMetrics * add testExternalProducer for testing bridges * add test data for testing external producers * clean up help text * unit tests for external Producer * changelog * improve test coverage * Update CHANGELOG.md Co-authored-by: Tyler Yahn <[email protected]> * support partial errors * fix lint * add additional test * unallocate producers on shutdown * don't register Producers after shutdown Co-authored-by: Tyler Yahn <[email protected]>
1 parent 4e76347 commit 14a17b3

8 files changed

+267
-80
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1212

1313
- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)
1414

15+
### Added
16+
17+
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)
18+
1519
## [1.11.2/0.34.0] 2022-12-05
1620

1721
### Added

sdk/metric/config.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,19 @@ func unify(funcs []func(context.Context) error) func(context.Context) error {
5454
errs = append(errs, err)
5555
}
5656
}
57-
switch len(errs) {
58-
case 0:
59-
return nil
60-
case 1:
61-
return errs[0]
62-
default:
63-
return fmt.Errorf("%v", errs)
64-
}
57+
return unifyErrors(errs)
58+
}
59+
}
60+
61+
// unifyErrors combines multiple errors into a single error.
62+
func unifyErrors(errs []error) error {
63+
switch len(errs) {
64+
case 0:
65+
return nil
66+
case 1:
67+
return errs[0]
68+
default:
69+
return fmt.Errorf("%v", errs)
6570
}
6671
}
6772

sdk/metric/config_test.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import (
2828
)
2929

3030
type reader struct {
31-
producer producer
32-
temporalityFunc TemporalitySelector
33-
aggregationFunc AggregationSelector
34-
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
35-
forceFlushFunc func(context.Context) error
36-
shutdownFunc func(context.Context) error
31+
producer sdkProducer
32+
externalProducers []Producer
33+
temporalityFunc TemporalitySelector
34+
aggregationFunc AggregationSelector
35+
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
36+
forceFlushFunc func(context.Context) error
37+
shutdownFunc func(context.Context) error
3738
}
3839

3940
var _ Reader = (*reader)(nil)
@@ -42,7 +43,8 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n
4243
return r.aggregationFunc(kind)
4344
}
4445

45-
func (r *reader) register(p producer) { r.producer = p }
46+
func (r *reader) register(p sdkProducer) { r.producer = p }
47+
func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.externalProducers, p) }
4648
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
4749
return r.temporalityFunc(kind)
4850
}

sdk/metric/manual_reader.go

+49-11
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@ import (
2828
// manualReader is a simple Reader that allows an application to
2929
// read metrics on demand.
3030
type manualReader struct {
31-
producer atomic.Value
31+
sdkProducer atomic.Value
3232
shutdownOnce sync.Once
3333

34+
mu sync.Mutex
35+
isShutdown bool
36+
externalProducers atomic.Value
37+
3438
temporalitySelector TemporalitySelector
3539
aggregationSelector AggregationSelector
3640
}
@@ -41,22 +45,39 @@ var _ = map[Reader]struct{}{&manualReader{}: {}}
4145
// NewManualReader returns a Reader which is directly called to collect metrics.
4246
func NewManualReader(opts ...ManualReaderOption) Reader {
4347
cfg := newManualReaderConfig(opts)
44-
return &manualReader{
48+
r := &manualReader{
4549
temporalitySelector: cfg.temporalitySelector,
4650
aggregationSelector: cfg.aggregationSelector,
4751
}
52+
r.externalProducers.Store([]Producer{})
53+
return r
4854
}
4955

50-
// register stores the Producer which enables the caller to read
51-
// metrics on demand.
52-
func (mr *manualReader) register(p producer) {
56+
// register stores the sdkProducer which enables the caller
57+
// to read metrics from the SDK on demand.
58+
func (mr *manualReader) register(p sdkProducer) {
5359
// Only register once. If producer is already set, do nothing.
54-
if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
60+
if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
5561
msg := "did not register manual reader"
5662
global.Error(errDuplicateRegister, msg)
5763
}
5864
}
5965

66+
// RegisterProducer stores the external Producer which enables the caller
67+
// to read metrics on demand.
68+
func (mr *manualReader) RegisterProducer(p Producer) {
69+
mr.mu.Lock()
70+
defer mr.mu.Unlock()
71+
if mr.isShutdown {
72+
return
73+
}
74+
currentProducers := mr.externalProducers.Load().([]Producer)
75+
newProducers := []Producer{}
76+
newProducers = append(newProducers, currentProducers...)
77+
newProducers = append(newProducers, p)
78+
mr.externalProducers.Store(newProducers)
79+
}
80+
6081
// temporality reports the Temporality for the instrument kind provided.
6182
func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality {
6283
return mr.temporalitySelector(kind)
@@ -77,18 +98,23 @@ func (mr *manualReader) Shutdown(context.Context) error {
7798
err := ErrReaderShutdown
7899
mr.shutdownOnce.Do(func() {
79100
// Any future call to Collect will now return ErrReaderShutdown.
80-
mr.producer.Store(produceHolder{
101+
mr.sdkProducer.Store(produceHolder{
81102
produce: shutdownProducer{}.produce,
82103
})
104+
mr.mu.Lock()
105+
defer mr.mu.Unlock()
106+
mr.isShutdown = true
107+
// release references to Producer(s)
108+
mr.externalProducers.Store([]Producer{})
83109
err = nil
84110
})
85111
return err
86112
}
87113

88-
// Collect gathers all metrics from the SDK, calling any callbacks necessary.
89-
// Collect will return an error if called after shutdown.
114+
// Collect gathers all metrics from the SDK and other Producers, calling any
115+
// callbacks necessary. Collect will return an error if called after shutdown.
90116
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
91-
p := mr.producer.Load()
117+
p := mr.sdkProducer.Load()
92118
if p == nil {
93119
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
94120
}
@@ -103,7 +129,19 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
103129
return metricdata.ResourceMetrics{}, err
104130
}
105131

106-
return ph.produce(ctx)
132+
rm, err := ph.produce(ctx)
133+
if err != nil {
134+
return metricdata.ResourceMetrics{}, err
135+
}
136+
var errs []error
137+
for _, producer := range mr.externalProducers.Load().([]Producer) {
138+
externalMetrics, err := producer.Produce(ctx)
139+
if err != nil {
140+
errs = append(errs, err)
141+
}
142+
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
143+
}
144+
return rm, unifyErrors(errs)
107145
}
108146

109147
// manualReaderConfig contains configuration options for a ManualReader.

sdk/metric/periodic_reader.go

+47-8
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
114114
cancel: cancel,
115115
done: make(chan struct{}),
116116
}
117+
r.externalProducers.Store([]Producer{})
117118

118119
go func() {
119120
defer func() { close(r.done) }()
@@ -126,7 +127,11 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
126127
// periodicReader is a Reader that continuously collects and exports metric
127128
// data at a set interval.
128129
type periodicReader struct {
129-
producer atomic.Value
130+
sdkProducer atomic.Value
131+
132+
mu sync.Mutex
133+
isShutdown bool
134+
externalProducers atomic.Value
130135

131136
timeout time.Duration
132137
exporter Exporter
@@ -166,14 +171,28 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
166171
}
167172

168173
// register registers p as the producer of this reader.
169-
func (r *periodicReader) register(p producer) {
174+
func (r *periodicReader) register(p sdkProducer) {
170175
// Only register once. If producer is already set, do nothing.
171-
if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
176+
if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
172177
msg := "did not register periodic reader"
173178
global.Error(errDuplicateRegister, msg)
174179
}
175180
}
176181

182+
// RegisterProducer registers p as an external Producer of this reader.
183+
func (r *periodicReader) RegisterProducer(p Producer) {
184+
r.mu.Lock()
185+
defer r.mu.Unlock()
186+
if r.isShutdown {
187+
return
188+
}
189+
currentProducers := r.externalProducers.Load().([]Producer)
190+
newProducers := []Producer{}
191+
newProducers = append(newProducers, currentProducers...)
192+
newProducers = append(newProducers, p)
193+
r.externalProducers.Store(newProducers)
194+
}
195+
177196
// temporality reports the Temporality for the instrument kind provided.
178197
func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
179198
return r.exporter.Temporality(kind)
@@ -195,12 +214,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
195214
}
196215

197216
// Collect gathers and returns all metric data related to the Reader from
198-
// the SDK. The returned metric data is not exported to the configured
199-
// exporter, it is left to the caller to handle that if desired.
217+
// the SDK and other Producers. The returned metric data is not exported
218+
// to the configured exporter, it is left to the caller to handle that if
219+
// desired.
200220
//
201221
// An error is returned if this is called after Shutdown.
202222
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
203-
return r.collect(ctx, r.producer.Load())
223+
return r.collect(ctx, r.sdkProducer.Load())
204224
}
205225

206226
// collect unwraps p as a produceHolder and returns its produce results.
@@ -218,7 +238,20 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
218238
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
219239
return metricdata.ResourceMetrics{}, err
220240
}
221-
return ph.produce(ctx)
241+
242+
rm, err := ph.produce(ctx)
243+
if err != nil {
244+
return metricdata.ResourceMetrics{}, err
245+
}
246+
var errs []error
247+
for _, producer := range r.externalProducers.Load().([]Producer) {
248+
externalMetrics, err := producer.Produce(ctx)
249+
if err != nil {
250+
errs = append(errs, err)
251+
}
252+
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
253+
}
254+
return rm, unifyErrors(errs)
222255
}
223256

224257
// export exports metric data m using r's exporter.
@@ -259,7 +292,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
259292
<-r.done
260293

261294
// Any future call to Collect will now return ErrReaderShutdown.
262-
ph := r.producer.Swap(produceHolder{
295+
ph := r.sdkProducer.Swap(produceHolder{
263296
produce: shutdownProducer{}.produce,
264297
})
265298

@@ -276,6 +309,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
276309
if err == nil || err == ErrReaderShutdown {
277310
err = sErr
278311
}
312+
313+
r.mu.Lock()
314+
defer r.mu.Unlock()
315+
r.isShutdown = true
316+
// release references to Producer(s)
317+
r.externalProducers.Store([]Producer{})
279318
})
280319
return err
281320
}

sdk/metric/periodic_reader_test.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ func (ts *periodicReaderTestSuite) SetupTest() {
114114
}
115115

116116
ts.ErrReader = NewPeriodicReader(e)
117-
ts.ErrReader.register(testProducer{})
117+
ts.ErrReader.register(testSDKProducer{})
118+
ts.ErrReader.RegisterProducer(testExternalProducer{})
118119
}
119120

120121
func (ts *periodicReaderTestSuite) TearDownTest() {
@@ -186,14 +187,15 @@ func TestPeriodicReaderRun(t *testing.T) {
186187

187188
exp := &fnExporter{
188189
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
189-
// The testProducer produces testMetrics.
190-
assert.Equal(t, testMetrics, m)
190+
// The testSDKProducer produces testResourceMetricsAB.
191+
assert.Equal(t, testResourceMetricsAB, m)
191192
return assert.AnError
192193
},
193194
}
194195

195196
r := NewPeriodicReader(exp)
196-
r.register(testProducer{})
197+
r.register(testSDKProducer{})
198+
r.RegisterProducer(testExternalProducer{})
197199
trigger <- time.Now()
198200
assert.Equal(t, assert.AnError, <-eh.Err)
199201

@@ -210,8 +212,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
210212
called = new(bool)
211213
return &fnExporter{
212214
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
213-
// The testProducer produces testMetrics.
214-
assert.Equal(t, testMetrics, m)
215+
// The testSDKProducer produces testResourceMetricsA.
216+
assert.Equal(t, testResourceMetricsAB, m)
215217
*called = true
216218
return assert.AnError
217219
},
@@ -221,7 +223,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
221223
t.Run("ForceFlush", func(t *testing.T) {
222224
exp, called := expFunc(t)
223225
r := NewPeriodicReader(exp)
224-
r.register(testProducer{})
226+
r.register(testSDKProducer{})
227+
r.RegisterProducer(testExternalProducer{})
225228
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
226229
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
227230

@@ -232,7 +235,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
232235
t.Run("Shutdown", func(t *testing.T) {
233236
exp, called := expFunc(t)
234237
r := NewPeriodicReader(exp)
235-
r.register(testProducer{})
238+
r.register(testSDKProducer{})
239+
r.RegisterProducer(testExternalProducer{})
236240
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
237241
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
238242
})

sdk/metric/reader.go

+16-3
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ type Reader interface {
5151
// register registers a Reader with a MeterProvider.
5252
// The producer argument allows the Reader to signal the sdk to collect
5353
// and send aggregated metric measurements.
54-
register(producer)
54+
register(sdkProducer)
55+
56+
// RegisterProducer registers a an external Producer with this Reader.
57+
// The Producer is used as a source of aggregated metric data which is
58+
// incorporated into metrics collected from the SDK.
59+
RegisterProducer(Producer)
5560

5661
// temporality reports the Temporality for the instrument kind provided.
5762
temporality(InstrumentKind) metricdata.Temporality
@@ -84,14 +89,22 @@ type Reader interface {
8489
Shutdown(context.Context) error
8590
}
8691

87-
// producer produces metrics for a Reader.
88-
type producer interface {
92+
// sdkProducer produces metrics for a Reader.
93+
type sdkProducer interface {
8994
// produce returns aggregated metrics from a single collection.
9095
//
9196
// This method is safe to call concurrently.
9297
produce(context.Context) (metricdata.ResourceMetrics, error)
9398
}
9499

100+
// Producer produces metrics for a Reader from an external source.
101+
type Producer interface {
102+
// Produce returns aggregated metrics from an external source.
103+
//
104+
// This method should be safe to call concurrently.
105+
Produce(context.Context) ([]metricdata.ScopeMetrics, error)
106+
}
107+
95108
// produceHolder is used as an atomic.Value to wrap the non-concrete producer
96109
// type.
97110
type produceHolder struct {

0 commit comments

Comments
 (0)