Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 50 additions & 19 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ const (

// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
producers []Producer
interval time.Duration
timeout time.Duration
maxExportBatchSize int
producers []Producer
}

// newPeriodicReaderConfig returns a periodicReaderConfig configured with
Expand Down Expand Up @@ -96,6 +97,18 @@ func WithInterval(d time.Duration) PeriodicReaderOption {
})
}

// WithMaxExportBatchSize returns a PeriodicReaderOption that configures
// the maximum export batch size allowed for a PeriodicReader.
//
// Non-positive sizes are ignored and leave the configuration unchanged.
func WithMaxExportBatchSize(size int) PeriodicReaderOption {
	return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
		// Only accept a valid (positive) limit; otherwise keep the
		// existing configuration as-is.
		if size > 0 {
			conf.maxExportBatchSize = size
		}
		return conf
	})
}

// NewPeriodicReader returns a Reader that collects and exports metric data to
// the exporter at a defined interval. By default, the returned Reader will
// collect and export data every 60 seconds, and will cancel any attempts that
Expand All @@ -109,12 +122,13 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
conf := newPeriodicReaderConfig(options)
ctx, cancel := context.WithCancel(context.Background())
r := &PeriodicReader{
interval: conf.interval,
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
interval: conf.interval,
timeout: conf.timeout,
maxExportBatchSize: conf.maxExportBatchSize,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
rmPool: sync.Pool{
New: func() any {
return &metricdata.ResourceMetrics{}
Expand Down Expand Up @@ -157,10 +171,11 @@ type PeriodicReader struct {
isShutdown bool
externalProducers atomic.Value

interval time.Duration
timeout time.Duration
exporter Exporter
flushCh chan chan error
interval time.Duration
timeout time.Duration
maxExportBatchSize int
exporter Exporter
flushCh chan chan error

done chan struct{}
cancel context.CancelFunc
Expand Down Expand Up @@ -223,14 +238,19 @@ func (r *PeriodicReader) aggregation(
// collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter.
func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect and export timeout"))
defer cancel()

// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
err := r.Collect(ctx, rm)
var err error
err = r.Collect(ctx, rm)
if err == nil {
err = r.export(ctx, rm)
if r.maxExportBatchSize > 0 {
for resourceMetricsDPC(rm) > r.maxExportBatchSize {
batch := &metricdata.ResourceMetrics{}
splitMetrics(r.maxExportBatchSize, rm, batch)
err = errors.Join(err, r.export(ctx, batch))
}
}
err = errors.Join(err, r.export(ctx, rm))
}
r.rmPool.Put(rm)
return err
Expand All @@ -256,6 +276,8 @@ func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet

// collect unwraps p as a produceHolder and returns its produce results.
func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.ResourceMetrics) error {
ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect timeout"))
defer cancel()
var err error
if r.inst != nil {
cp := r.inst.CollectMetrics(ctx)
Expand Down Expand Up @@ -296,6 +318,8 @@ func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.Reso

// export exports metric data m using r's exporter. The call is bounded by
// r's configured timeout; if the deadline is exceeded, the context cause
// reports a reader export timeout.
func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
	cause := errors.New("reader export timeout")
	ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, cause)
	defer cancel()
	return r.exporter.Export(ctx, m)
}

Expand Down Expand Up @@ -357,7 +381,14 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error {
m := r.rmPool.Get().(*metricdata.ResourceMetrics)
err = r.collect(ctx, ph, m)
if err == nil {
err = r.export(ctx, m)
if r.maxExportBatchSize > 0 {
for resourceMetricsDPC(m) > r.maxExportBatchSize {
batch := &metricdata.ResourceMetrics{}
splitMetrics(r.maxExportBatchSize, m, batch)
err = errors.Join(err, r.export(ctx, batch))
}
}
err = errors.Join(err, r.export(ctx, m))
}
r.rmPool.Put(m)
}
Expand Down
96 changes: 91 additions & 5 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import (
"context"
"errors"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -216,11 +217,11 @@
}

func (ts *periodicReaderTestSuite) TestForceFlushPropagated() {
ts.Equal(assert.AnError, ts.ErrReader.ForceFlush(context.Background()))
ts.ErrorIs(ts.ErrReader.ForceFlush(context.Background()), assert.AnError)
}

func (ts *periodicReaderTestSuite) TestShutdownPropagated() {
ts.Equal(assert.AnError, ts.ErrReader.Shutdown(context.Background()))
ts.ErrorIs(ts.ErrReader.Shutdown(context.Background()), assert.AnError)
}

func TestPeriodicReader(t *testing.T) {
Expand Down Expand Up @@ -291,7 +292,92 @@
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
trigger <- time.Now()
assert.Equal(t, assert.AnError, <-eh.Err)
assert.ErrorIs(t, <-eh.Err, assert.AnError)

// Ensure Reader is allowed clean up attempt.
_ = r.Shutdown(t.Context())
}

func TestPeriodicReaderBatching(t *testing.T) {
trigger := triggerTicker(t)

// Register an error handler to validate export errors are passed to
// otel.Handle.
defer func(orig otel.ErrorHandler) {
otel.SetErrorHandler(orig)
}(otel.GetErrorHandler())
eh := newChErrorHandler()
otel.SetErrorHandler(eh)

expectations := []metricdata.ResourceMetrics{
testResourceMetricsAB,
testResourceMetricsC1,
testResourceMetricsC2,
}

expectationIdx := 0
exp := &fnExporter{
exportFunc: func(_ context.Context, m *metricdata.ResourceMetrics) error {
// collectAndExport is potentially called multiple times, so just
// make sure batches are split correctly and are in order.
expect := expectations[expectationIdx%len(expectations)]
// The testSDKProducer produces three batches of metrics.
assert.Equal(t, expect, *m, fmt.Sprintf("expectations[%d] not equal", expectationIdx))

Check failure on line 325 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

formatter: remove unnecessary fmt.Sprintf (testifylint)
expectationIdx++
return assert.AnError
},
}

r := NewPeriodicReader(
exp,
WithMaxExportBatchSize(2),
WithProducer(testExternalProducer{}),
WithProducer(testExternalProducer{
produceFunc: func(context.Context) ([]metricdata.ScopeMetrics, error) {
// Splitting modifies the batch, so we need to create a new one each time.
return []metricdata.ScopeMetrics{metricdata.ScopeMetrics{

Check failure on line 338 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofumpt)
Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/internal"},
Metrics: []metricdata.Metrics{{
Name: "metric1",
Description: "first of multiple metrics",
Unit: "ms",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{{
Attributes: attribute.NewSet(attribute.String("user", "david")),
StartTime: ts1,
Time: ts1.Add(time.Second),
Value: 1,
}},
},
}, {
Name: "metric2",
Description: "second of multiple metrics",
Unit: "ms",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("user", "tyler")),
StartTime: ts2,
Time: ts2.Add(time.Second),
Value: 10,
},
{
Attributes: attribute.NewSet(attribute.String("user", "robert")),
StartTime: ts3,
Time: ts3.Add(time.Second),
Value: 100,
},
},
},
}},
}}, nil
},
}))
r.register(testSDKProducer{})
trigger <- time.Now()
assert.Equal(t, <-eh.Err, errors.Join(errors.Join(errors.Join(assert.AnError), assert.AnError), assert.AnError))
trigger <- time.Now()
assert.Equal(t, <-eh.Err, errors.Join(errors.Join(errors.Join(assert.AnError), assert.AnError), assert.AnError))

// Ensure Reader is allowed clean up attempt.
_ = r.Shutdown(t.Context())
Expand All @@ -318,7 +404,7 @@
exp, called := expFunc(t)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(t.Context()), "export error not returned")
assert.ErrorIs(t, r.ForceFlush(t.Context()), assert.AnError, "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")

// Ensure Reader is allowed clean up attempt.
Expand Down Expand Up @@ -374,7 +460,7 @@
exp, called := expFunc(t)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(t.Context()), "export error not returned")
assert.ErrorIs(t, r.Shutdown(t.Context()), assert.AnError, "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})

Expand Down
66 changes: 66 additions & 0 deletions sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,62 @@ var testScopeMetricsB = metricdata.ScopeMetrics{
}},
}

// ts1, ts2, and ts3 anchor the data points in the batching test fixtures
// below. They are captured in sequence, so each is no later than the next.
var ts1 = time.Now()
var ts2 = time.Now()
var ts3 = time.Now()

// testScopeMetricsC1 is the first piece of the multi-batch test data used by
// TestPeriodicReaderBatching: metric1's only data point plus metric2's first
// data point (two data points total).
var testScopeMetricsC1 = metricdata.ScopeMetrics{
	Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/internal"},
	Metrics: []metricdata.Metrics{{
		Name:        "metric1",
		Description: "first of multiple metrics",
		Unit:        "ms",
		Data: metricdata.Gauge[int64]{
			DataPoints: []metricdata.DataPoint[int64]{{
				Attributes: attribute.NewSet(attribute.String("user", "david")),
				StartTime:  ts1,
				Time:       ts1.Add(time.Second),
				Value:      1,
			}},
		},
	}, {
		Name:        "metric2",
		Description: "second of multiple metrics",
		Unit:        "ms",
		Data: metricdata.Gauge[int64]{
			DataPoints: []metricdata.DataPoint[int64]{
				{
					Attributes: attribute.NewSet(attribute.String("user", "tyler")),
					StartTime:  ts2,
					Time:       ts2.Add(time.Second),
					Value:      10,
				},
			},
		},
	}},
}

// testScopeMetricsC2 is the remaining piece of the multi-batch test data used
// by TestPeriodicReaderBatching: metric2's last data point, left over after
// testScopeMetricsC1.
var testScopeMetricsC2 = metricdata.ScopeMetrics{
	Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/internal"},
	Metrics: []metricdata.Metrics{{
		Name:        "metric2",
		Description: "second of multiple metrics",
		Unit:        "ms",
		Data: metricdata.Gauge[int64]{
			DataPoints: []metricdata.DataPoint[int64]{
				{
					Attributes: attribute.NewSet(attribute.String("user", "robert")),
					StartTime:  ts3,
					Time:       ts3.Add(time.Second),
					Value:      100,
				},
			},
		},
	}},
}

var testResourceMetricsA = metricdata.ResourceMetrics{
Resource: resource.NewSchemaless(attribute.String("test", "Reader")),
ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA},
Expand All @@ -247,6 +303,16 @@ var testResourceMetricsAB = metricdata.ResourceMetrics{
ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA, testScopeMetricsB},
}

// testResourceMetricsC1 wraps testScopeMetricsC1 with the test Reader
// resource; it is an expected export batch in TestPeriodicReaderBatching.
var testResourceMetricsC1 = metricdata.ResourceMetrics{
	Resource:     resource.NewSchemaless(attribute.String("test", "Reader")),
	ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsC1},
}

// testResourceMetricsC2 wraps testScopeMetricsC2 with the test Reader
// resource; it is an expected export batch in TestPeriodicReaderBatching.
var testResourceMetricsC2 = metricdata.ResourceMetrics{
	Resource:     resource.NewSchemaless(attribute.String("test", "Reader")),
	ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsC2},
}

type testSDKProducer struct {
produceFunc func(context.Context, *metricdata.ResourceMetrics) error
}
Expand Down
Loading
Loading