Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the Reader.Collect Signature. #3732

Merged
merged 7 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Attribute `KeyValue` creations functions to `go.opentelemetry.io/otel/semconv/v1.17.0` for all non-enum semantic conventions.
These functions ensure semantic convention type correctness. (#3675)

### Changed

- The `Collect` method of the `"go.opentelemetry.io/otel/sdk/metric".Reader` interface is updated to accept the `metricdata.ResourceMetrics` value the collection will be made into.
This change is made to enable memory reuse by SDK users. (#3732)
### Fixed

- Removed the `http.target` attribute from being added by `ServerRequest` in the following packages. (#3687)
Expand Down
4 changes: 3 additions & 1 deletion exporters/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {

// Collect implements prometheus.Collector.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
metrics, err := c.reader.Collect(context.TODO())
// TODO (#3047): Use a sync.Pool instead of allocating metrics every Collect.
metrics := metricdata.ResourceMetrics{}
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
err := c.reader.Collect(context.TODO(), &metrics)
if err != nil {
otel.Handle(err)
if err == metric.ErrReaderNotRegistered {
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func BenchmarkCounterCollectOneAttr(b *testing.B) {
for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.Int("K", 1))

_, _ = rdr.Collect(ctx)
_ = rdr.Collect(ctx, nil)
}
}

Expand All @@ -104,7 +104,7 @@ func BenchmarkCounterCollectTenAttrs(b *testing.B) {
for j := 0; j < 10; j++ {
cntr.Add(ctx, 1, attribute.Int("K", j))
}
_, _ = rdr.Collect(ctx)
_ = rdr.Collect(ctx, nil)
}
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func benchCollectHistograms(count int) func(*testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
collectedMetrics, _ = r.Collect(ctx)
_ = r.Collect(ctx, &collectedMetrics)
if len(collectedMetrics.ScopeMetrics[0].Metrics) != count {
b.Fatalf("got %d metrics, want %d", len(collectedMetrics.ScopeMetrics[0].Metrics), count)
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type reader struct {
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
collectFunc func(context.Context, *metricdata.ResourceMetrics) error
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
Expand All @@ -48,8 +48,8 @@ func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.e
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
func (r *reader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collectFunc(ctx)
func (r *reader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
return r.collectFunc(ctx, rm)
}
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }
Expand Down
24 changes: 16 additions & 8 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -112,11 +113,17 @@ func (mr *manualReader) Shutdown(context.Context) error {
}

// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
// callbacks necessary and stores the result in rm.
//
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
}
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
return ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
Expand All @@ -126,12 +133,13 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
return err
}

rm, err := ph.produce(ctx)
// TODO (#3047): When produce is updated to accept output as param, pass rm. //TODO:
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
rmTemp, err := ph.produce(ctx)
*rm = rmTemp
if err != nil {
return metricdata.ResourceMetrics{}, err
return err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
Expand All @@ -141,7 +149,7 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
return unifyErrors(errs)
}

// manualReaderConfig contains configuration options for a ManualReader.
Expand Down
24 changes: 15 additions & 9 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,8 @@ func TestMeterCreatesInstruments(t *testing.T) {

tt.fn(t, m)

rm, err := rdr.Collect(context.Background())
rm := metricdata.ResourceMetrics{}
err := rdr.Collect(context.Background(), &rm)
assert.NoError(t, err)

require.Len(t, rm.ScopeMetrics, 1)
Expand Down Expand Up @@ -566,7 +567,7 @@ func TestCallbackObserverNonRegistered(t *testing.T) {

var got metricdata.ResourceMetrics
assert.NotPanics(t, func() {
got, err = rdr.Collect(context.Background())
err = rdr.Collect(context.Background(), &got)
})

assert.NoError(t, err)
Expand Down Expand Up @@ -660,7 +661,8 @@ func TestGlobalInstRegisterCallback(t *testing.T) {
_, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr)
assert.NoError(t, err)

got, err := rdr.Collect(context.Background())
got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err)
assert.Lenf(t, l.messages, 0, "Warnings and errors logged:\n%s", l)
metricdatatest.AssertEqual(t, metricdata.ResourceMetrics{
Expand Down Expand Up @@ -772,7 +774,8 @@ func TestMetersProvideScope(t *testing.T) {
},
}

got, err := rdr.Collect(context.Background())
got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err)
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
}
Expand Down Expand Up @@ -816,14 +819,14 @@ func TestUnregisterUnregisters(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
_, err = r.Collect(ctx)
err = r.Collect(ctx, &metricdata.ResourceMetrics{})
require.NoError(t, err)
assert.True(t, called, "callback not called for registered callback")

called = false
require.NoError(t, reg.Unregister(), "unregister")

_, err = r.Collect(ctx)
err = r.Collect(ctx, &metricdata.ResourceMetrics{})
require.NoError(t, err)
assert.False(t, called, "callback called for unregistered callback")
}
Expand Down Expand Up @@ -869,7 +872,8 @@ func TestRegisterCallbackDropAggregations(t *testing.T) {
)
require.NoError(t, err)

data, err := r.Collect(context.Background())
data := metricdata.ResourceMetrics{}
err = r.Collect(context.Background(), &data)
require.NoError(t, err)

assert.False(t, called, "callback called for all drop instruments")
Expand Down Expand Up @@ -1238,7 +1242,8 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))

m, err := rdr.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := rdr.Collect(context.Background(), &m)
assert.NoError(t, err)

require.Len(t, m.ScopeMetrics, 1)
Expand Down Expand Up @@ -1331,7 +1336,8 @@ func TestAsynchronousExample(t *testing.T) {

collect := func(t *testing.T) {
t.Helper()
got, err := reader.Collect(context.Background())
got := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())
Expand Down
25 changes: 17 additions & 8 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -206,21 +207,29 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio
// collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
m, err := r.Collect(ctx)
// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := metricdata.ResourceMetrics{}
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
err := r.Collect(ctx, &rm)
if err == nil {
err = r.export(ctx, m)
err = r.export(ctx, rm)
}
return err
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK and other Producers. The returned metric data is not exported
// to the configured exporter, it is left to the caller to handle that if
// desired.
// the SDK and other Producers and stores the result in rm. The returned metric
// data is not exported to the configured exporter, it is left to the caller to
// handle that if desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collect(ctx, r.sdkProducer.Load())
// An error is returned if this is called after Shutdown. An error is return if rm is nil.
func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
}
// TODO (#3047): When collect is updated to accept output as param, pass rm.
rmTemp, err := r.collect(ctx, r.sdkProducer.Load())
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
*rm = rmTemp
return err
}

// collect unwraps p as a produceHolder and returns its produce results.
Expand Down
5 changes: 3 additions & 2 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type Reader interface {
aggregation(InstrumentKind) aggregation.Aggregation // nolint:revive // import-shadow for method scoped by type.

// Collect gathers and returns all metric data related to the Reader from
// the SDK. An error is returned if this is called after Shutdown.
Collect(context.Context) (metricdata.ResourceMetrics, error)
// the SDK and stores it in out. An error is returned if this is called
// after Shutdown or if out is nil.
Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error

// ForceFlush flushes all metric measurements held in an export pipeline.
//
Expand Down
31 changes: 21 additions & 10 deletions sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,23 @@ func (ts *readerTestSuite) TearDownTest() {
}

func (ts *readerTestSuite) TestErrorForNotRegistered() {
_, err := ts.Reader.Collect(context.Background())
err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{})
ts.ErrorIs(err, ErrReaderNotRegistered)
}

func (ts *readerTestSuite) TestSDKProducer() {
ts.Reader.register(testSDKProducer{})
m, err := ts.Reader.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.NoError(err)
ts.Equal(testResourceMetricsA, m)
}

func (ts *readerTestSuite) TestExternalProducer() {
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
m, err := ts.Reader.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.NoError(err)
ts.Equal(testResourceMetricsAB, m)
}
Expand All @@ -78,7 +80,8 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() {
ts.Reader.RegisterProducer(testExternalProducer{})
ts.Require().NoError(ts.Reader.Shutdown(ctx))

m, err := ts.Reader.Collect(ctx)
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(ctx, &m)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(metricdata.ResourceMetrics{}, m)
}
Expand Down Expand Up @@ -113,7 +116,7 @@ func (ts *readerTestSuite) TestMultipleRegister() {
// This should be ignored.
ts.Reader.register(p1)

_, err := ts.Reader.Collect(context.Background())
err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{})
ts.Equal(assert.AnError, err)
}

Expand All @@ -134,7 +137,8 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() {
},
)

m, err := ts.Reader.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.Equal(assert.AnError, err)
ts.Equal(testResourceMetricsAB, m)
}
Expand All @@ -146,7 +150,8 @@ func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() {
}})
ts.Reader.RegisterProducer(testExternalProducer{})

m, err := ts.Reader.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.Equal(assert.AnError, err)
ts.Equal(metricdata.ResourceMetrics{}, m)
}
Expand All @@ -165,7 +170,7 @@ func (ts *readerTestSuite) TestMethodConcurrency() {
wg.Add(1)
go func() {
defer wg.Done()
_, _ = ts.Reader.Collect(ctx)
_ = ts.Reader.Collect(ctx, nil)
}()

wg.Add(1)
Expand All @@ -190,11 +195,17 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() {
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})

m, err := ts.Reader.Collect(ctx)
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(ctx, &m)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(metricdata.ResourceMetrics{}, m)
}

func (ts *readerTestSuite) TestCollectNilResourceMetricError() {
ctx := context.Background()
ts.Assert().Error(ts.Reader.Collect(ctx, nil))
}

var testScopeMetricsA = metricdata.ScopeMetrics{
Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"},
Metrics: []metricdata.Metrics{{
Expand Down Expand Up @@ -279,7 +290,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
collectedMetrics, err = r.Collect(ctx)
err = r.Collect(ctx, &collectedMetrics)
assert.Equalf(b, testResourceMetricsA, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err)
}
}
Expand Down