diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go
index ef40ef29a34..eb1265868a1 100644
--- a/sdk/metric/periodic_reader.go
+++ b/sdk/metric/periodic_reader.go
@@ -26,9 +26,10 @@ const (
 
 // periodicReaderConfig contains configuration options for a PeriodicReader.
 type periodicReaderConfig struct {
-	interval  time.Duration
-	timeout   time.Duration
-	producers []Producer
+	interval           time.Duration
+	timeout            time.Duration
+	maxExportBatchSize int
+	producers          []Producer
 }
 
 // newPeriodicReaderConfig returns a periodicReaderConfig configured with
@@ -96,6 +97,20 @@ func WithInterval(d time.Duration) PeriodicReaderOption {
 	})
 }
 
+// WithMaxExportBatchSize returns a PeriodicReaderOption that limits the
+// number of data points the PeriodicReader exports in a single batch;
+// larger collections are split across multiple export calls. Sizes less
+// than or equal to zero are ignored, leaving the default of no limit.
+func WithMaxExportBatchSize(size int) PeriodicReaderOption {
+	return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
+		if size <= 0 {
+			return conf
+		}
+		conf.maxExportBatchSize = size
+		return conf
+	})
+}
+
 // NewPeriodicReader returns a Reader that collects and exports metric data to
 // the exporter at a defined interval. By default, the returned Reader will
 // collect and export data every 60 seconds, and will cancel any attempts that
@@ -109,12 +124,13 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
 	conf := newPeriodicReaderConfig(options)
 	ctx, cancel := context.WithCancel(context.Background())
 	r := &PeriodicReader{
-		interval: conf.interval,
-		timeout:  conf.timeout,
-		exporter: exporter,
-		flushCh:  make(chan chan error),
-		cancel:   cancel,
-		done:     make(chan struct{}),
+		interval:           conf.interval,
+		timeout:            conf.timeout,
+		maxExportBatchSize: conf.maxExportBatchSize,
+		exporter:           exporter,
+		flushCh:            make(chan chan error),
+		cancel:             cancel,
+		done:               make(chan struct{}),
 		rmPool: sync.Pool{
 			New: func() any {
 				return &metricdata.ResourceMetrics{}
@@ -157,10 +173,11 @@ type PeriodicReader struct {
 	isShutdown        bool
 	externalProducers atomic.Value
 
-	interval time.Duration
-	timeout  time.Duration
-	exporter Exporter
-	flushCh  chan chan error
+	interval           time.Duration
+	timeout            time.Duration
+	maxExportBatchSize int
+	exporter           Exporter
+	flushCh            chan chan error
 
 	done   chan struct{}
 	cancel context.CancelFunc
@@ -223,14 +240,18 @@ func (r *PeriodicReader) aggregation(
 // collectAndExport gather all metric data related to the periodicReader r from
 // the SDK and exports it with r's exporter.
 func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
-	ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect and export timeout"))
-	defer cancel()
-
 	// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
 	rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
 	err := r.Collect(ctx, rm)
 	if err == nil {
-		err = r.export(ctx, rm)
+		if r.maxExportBatchSize > 0 {
+			for resourceMetricsDPC(rm) > r.maxExportBatchSize {
+				batch := &metricdata.ResourceMetrics{}
+				splitMetrics(r.maxExportBatchSize, rm, batch)
+				err = errors.Join(err, r.export(ctx, batch))
+			}
+		}
+		err = errors.Join(err, r.export(ctx, rm))
 	}
 	r.rmPool.Put(rm)
 	return err
@@ -256,6 +277,8 @@ func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet
 
 // collect unwraps p as a produceHolder and returns its produce results.
 func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.ResourceMetrics) error {
+	ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect timeout"))
+	defer cancel()
 	var err error
 	if r.inst != nil {
 		cp := r.inst.CollectMetrics(ctx)
@@ -296,6 +319,8 @@ func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.Reso
 
 // export exports metric data m using r's exporter.
 func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
+	ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader export timeout"))
+	defer cancel()
 	return r.exporter.Export(ctx, m)
 }
 
@@ -357,7 +382,14 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error {
 			m := r.rmPool.Get().(*metricdata.ResourceMetrics)
 			err = r.collect(ctx, ph, m)
 			if err == nil {
-				err = r.export(ctx, m)
+				if r.maxExportBatchSize > 0 {
+					for resourceMetricsDPC(m) > r.maxExportBatchSize {
+						batch := &metricdata.ResourceMetrics{}
+						splitMetrics(r.maxExportBatchSize, m, batch)
+						err = errors.Join(err, r.export(ctx, batch))
+					}
+				}
+				err = errors.Join(err, r.export(ctx, m))
 			}
 			r.rmPool.Put(m)
 		}
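For reviewers who want to try the new option: a minimal usage sketch, assuming the stdoutmetric exporter purely for illustration (any push Exporter works the same way):

package main

import (
	"context"

	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	exp, err := stdoutmetric.New()
	if err != nil {
		panic(err)
	}
	// Collections larger than 512 data points are split into multiple
	// Export calls; the default (option omitted) is a single call.
	reader := sdkmetric.NewPeriodicReader(exp, sdkmetric.WithMaxExportBatchSize(512))
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	defer func() { _ = mp.Shutdown(context.Background()) }()
}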
diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go
index 946bf4fb76c..b937ceda2be 100644
--- a/sdk/metric/periodic_reader_test.go
+++ b/sdk/metric/periodic_reader_test.go
@@ -5,6 +5,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strings"
 	"testing"
@@ -216,11 +217,11 @@ func (ts *periodicReaderTestSuite) TearDownTest() {
 }
 
 func (ts *periodicReaderTestSuite) TestForceFlushPropagated() {
-	ts.Equal(assert.AnError, ts.ErrReader.ForceFlush(context.Background()))
+	ts.ErrorIs(ts.ErrReader.ForceFlush(context.Background()), assert.AnError)
 }
 
 func (ts *periodicReaderTestSuite) TestShutdownPropagated() {
-	ts.Equal(assert.AnError, ts.ErrReader.Shutdown(context.Background()))
+	ts.ErrorIs(ts.ErrReader.Shutdown(context.Background()), assert.AnError)
 }
 
 func TestPeriodicReader(t *testing.T) {
@@ -291,7 +292,93 @@ func TestPeriodicReaderRun(t *testing.T) {
 	r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
 	r.register(testSDKProducer{})
 	trigger <- time.Now()
-	assert.Equal(t, assert.AnError, <-eh.Err)
+	assert.ErrorIs(t, <-eh.Err, assert.AnError)
+
+	// Ensure Reader is allowed clean up attempt.
+	_ = r.Shutdown(t.Context())
+}
+
+func TestPeriodicReaderBatching(t *testing.T) {
+	trigger := triggerTicker(t)
+
+	// Register an error handler to validate export errors are passed to
+	// otel.Handle.
+	defer func(orig otel.ErrorHandler) {
+		otel.SetErrorHandler(orig)
+	}(otel.GetErrorHandler())
+	eh := newChErrorHandler()
+	otel.SetErrorHandler(eh)
+
+	expectations := []metricdata.ResourceMetrics{
+		testResourceMetricsAB,
+		testResourceMetricsC1,
+		testResourceMetricsC2,
+	}
+
+	expectationIdx := 0
+	exp := &fnExporter{
+		exportFunc: func(_ context.Context, m *metricdata.ResourceMetrics) error {
+			// collectAndExport is potentially called multiple times, so just
+			// make sure batches are split correctly and arrive in order. Each
+			// collection splits into the same three batches.
+			expect := expectations[expectationIdx%len(expectations)]
+			assert.Equal(t, expect, *m, fmt.Sprintf("expectations[%d] not equal", expectationIdx))
+			expectationIdx++
+			return assert.AnError
+		},
+	}
+
+	r := NewPeriodicReader(
+		exp,
+		WithMaxExportBatchSize(2),
+		WithProducer(testExternalProducer{}),
+		WithProducer(testExternalProducer{
+			produceFunc: func(context.Context) ([]metricdata.ScopeMetrics, error) {
+				// Splitting modifies the batch, so build a fresh one for each produce call.
+				return []metricdata.ScopeMetrics{{
+					Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/internal"},
+					Metrics: []metricdata.Metrics{{
+						Name:        "metric1",
+						Description: "first of multiple metrics",
+						Unit:        "ms",
+						Data: metricdata.Gauge[int64]{
+							DataPoints: []metricdata.DataPoint[int64]{{
+								Attributes: attribute.NewSet(attribute.String("user", "david")),
+								StartTime:  ts1,
+								Time:       ts1.Add(time.Second),
+								Value:      1,
+							}},
+						},
+					}, {
+						Name:        "metric2",
+						Description: "second of multiple metrics",
+						Unit:        "ms",
+						Data: metricdata.Gauge[int64]{
+							DataPoints: []metricdata.DataPoint[int64]{
+								{
+									Attributes: attribute.NewSet(attribute.String("user", "tyler")),
+									StartTime:  ts2,
+									Time:       ts2.Add(time.Second),
+									Value:      10,
+								},
+								{
+									Attributes: attribute.NewSet(attribute.String("user", "robert")),
+									StartTime:  ts3,
+									Time:       ts3.Add(time.Second),
+									Value:      100,
+								},
+							},
+						},
+					}},
+				}}, nil
+			},
+		}))
+	r.register(testSDKProducer{})
+	trigger <- time.Now()
+	wantErr := errors.Join(errors.Join(errors.Join(assert.AnError), assert.AnError), assert.AnError)
+	assert.Equal(t, wantErr, <-eh.Err)
+	trigger <- time.Now()
+	assert.Equal(t, wantErr, <-eh.Err)
 
 	// Ensure Reader is allowed clean up attempt.
 	_ = r.Shutdown(t.Context())
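A note on the error shape asserted above: the triple-nested Join in wantErr mirrors how collectAndExport accumulates errors. errors.Join never flattens, and the export loop re-joins the accumulator on each call. A standalone sketch (exportErr is a hypothetical stand-in for the exporter's error):

package main

import (
	"errors"
	"fmt"
)

func main() {
	exportErr := errors.New("export failed") // hypothetical stand-in
	var err error
	// Two split batches plus the final remainder export, as in the test
	// (five data points with a max batch size of two).
	for i := 0; i < 3; i++ {
		err = errors.Join(err, exportErr)
	}
	// errors.Join ignores nil but always wraps, so err is now
	// Join(Join(Join(exportErr), exportErr), exportErr).
	fmt.Println(errors.Is(err, exportErr)) // true
}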
@@ -318,7 +405,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
 		exp, called := expFunc(t)
 		r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
 		r.register(testSDKProducer{})
-		assert.Equal(t, assert.AnError, r.ForceFlush(t.Context()), "export error not returned")
+		assert.ErrorIs(t, r.ForceFlush(t.Context()), assert.AnError, "export error not returned")
 		assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
 
 		// Ensure Reader is allowed clean up attempt.
@@ -374,7 +461,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
 		exp, called := expFunc(t)
 		r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
 		r.register(testSDKProducer{})
-		assert.Equal(t, assert.AnError, r.Shutdown(t.Context()), "export error not returned")
+		assert.ErrorIs(t, r.Shutdown(t.Context()), assert.AnError, "export error not returned")
 		assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
 	})
 
diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go
index eee7d6c4736..967dc05ced8 100644
--- a/sdk/metric/reader_test.go
+++ b/sdk/metric/reader_test.go
@@ -237,6 +237,62 @@ var testScopeMetricsB = metricdata.ScopeMetrics{
 	}},
 }
 
+var (
+	ts1 = time.Now()
+	ts2 = time.Now()
+	ts3 = time.Now()
+)
+
+var testScopeMetricsC1 = metricdata.ScopeMetrics{
+	Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/internal"},
+	Metrics: []metricdata.Metrics{{
+		Name:        "metric1",
+		Description: "first of multiple metrics",
+		Unit:        "ms",
+		Data: metricdata.Gauge[int64]{
+			DataPoints: []metricdata.DataPoint[int64]{{
+				Attributes: attribute.NewSet(attribute.String("user", "david")),
+				StartTime:  ts1,
+				Time:       ts1.Add(time.Second),
+				Value:      1,
+			}},
+		},
+	}, {
+		Name:        "metric2",
+		Description: "second of multiple metrics",
+		Unit:        "ms",
+		Data: metricdata.Gauge[int64]{
+			DataPoints: []metricdata.DataPoint[int64]{
+				{
+					Attributes: attribute.NewSet(attribute.String("user", "tyler")),
+					StartTime:  ts2,
+					Time:       ts2.Add(time.Second),
+					Value:      10,
+				},
+			},
+		},
+	}},
+}
+
+var testScopeMetricsC2 = metricdata.ScopeMetrics{
+	Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/internal"},
+	Metrics: []metricdata.Metrics{{
+		Name:        "metric2",
+		Description: "second of multiple metrics",
+		Unit:        "ms",
+		Data: metricdata.Gauge[int64]{
+			DataPoints: []metricdata.DataPoint[int64]{
+				{
+					Attributes: attribute.NewSet(attribute.String("user", "robert")),
+					StartTime:  ts3,
+					Time:       ts3.Add(time.Second),
+					Value:      100,
+				},
+			},
+		},
+	}},
+}
+
 var testResourceMetricsA = metricdata.ResourceMetrics{
 	Resource:     resource.NewSchemaless(attribute.String("test", "Reader")),
 	ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA},
@@ -247,6 +303,16 @@ var testResourceMetricsAB = metricdata.ResourceMetrics{
 	ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA, testScopeMetricsB},
 }
 
+var testResourceMetricsC1 = metricdata.ResourceMetrics{
+	Resource:     resource.NewSchemaless(attribute.String("test", "Reader")),
+	ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsC1},
+}
+
+var testResourceMetricsC2 = metricdata.ResourceMetrics{
+	Resource:     resource.NewSchemaless(attribute.String("test", "Reader")),
+	ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsC2},
+}
+
 type testSDKProducer struct {
 	produceFunc func(context.Context, *metricdata.ResourceMetrics) error
 }
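How these fixtures map onto batches: the first expected batch is testResourceMetricsAB in full, which implies testScopeMetricsA and testScopeMetricsB together hold exactly two data points. Each collection in TestPeriodicReaderBatching therefore yields five points, and WithMaxExportBatchSize(2) splits them as AB, then C1 (metric1 plus metric2's first point), then C2 (the remaining point). The number of Export calls is a ceiling division; a tiny standalone sketch (numExports is a hypothetical helper, not part of this change):

package main

import "fmt"

// numExports shows how many Export calls a collection of n data points
// (n >= 1) produces under max batch size m.
func numExports(n, m int) int {
	if m <= 0 {
		return 1 // no limit configured: one Export call
	}
	return (n + m - 1) / m // ceiling division
}

func main() {
	fmt.Println(numExports(5, 2)) // 3: batches of 2, 2 and a remainder of 1
}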
diff --git a/sdk/metric/splitmetrics.go b/sdk/metric/splitmetrics.go
new file mode 100644
index 00000000000..ba32249baaa
--- /dev/null
+++ b/sdk/metric/splitmetrics.go
@@ -0,0 +1,190 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package metric
+
+import (
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// splitMetrics moves up to size metric data points from src into dest,
+// leaving the remainder in src. Adapted from the collector's batch processor:
+// https://github.com/open-telemetry/opentelemetry-collector/blob/587b90b9ecc1db959ee9104d5bf993591f80ca43/processor/batchprocessor/splitmetrics.go
+func splitMetrics(size int, src, dest *metricdata.ResourceMetrics) {
+	totalCopiedDataPoints := 0
+	dest.Resource = src.Resource
+	i := 0
+	for ; i < len(src.ScopeMetrics); i++ {
+		// If we are done, skip everything else.
+		if totalCopiedDataPoints == size {
+			break
+		}
+		srcIlm := src.ScopeMetrics[i]
+		// If all of the scope's metrics fit, move them wholesale.
+		srcIlmDataPointCount := scopeMetricsDPC(srcIlm)
+		if srcIlmDataPointCount+totalCopiedDataPoints <= size {
+			totalCopiedDataPoints += srcIlmDataPointCount
+			dest.ScopeMetrics = append(dest.ScopeMetrics, srcIlm)
+			continue
+		}
+
+		destIlm := metricdata.ScopeMetrics{
+			Scope: srcIlm.Scope,
+		}
+		j := 0
+		for ; j < len(srcIlm.Metrics); j++ {
+			// If we are done, skip everything else.
+			if totalCopiedDataPoints == size {
+				break
+			}
+			srcMetric := srcIlm.Metrics[j]
+			// If all of the metric's points fit, move it wholesale.
+			srcMetricPointCount := metricDPC(srcMetric)
+			if srcMetricPointCount+totalCopiedDataPoints <= size {
+				totalCopiedDataPoints += srcMetricPointCount
+				destIlm.Metrics = append(destIlm.Metrics, srcMetric)
+				continue
+			}
+
+			// The metric has more data points than free slots, so split it.
+			newMetrics := metricdata.Metrics{}
+			copiedDataPoints := size - totalCopiedDataPoints
+			splitMetric(&srcIlm.Metrics[j], &newMetrics, copiedDataPoints)
+			destIlm.Metrics = append(destIlm.Metrics, newMetrics)
+			totalCopiedDataPoints += copiedDataPoints
+			break
+		}
+		// Delete all of the metrics we fully moved.
+		srcIlm.Metrics = srcIlm.Metrics[j:]
+		dest.ScopeMetrics = append(dest.ScopeMetrics, destIlm)
+		src.ScopeMetrics[i] = srcIlm
+		break
+	}
+	// Delete all of the scope metrics we fully moved.
+	src.ScopeMetrics = src.ScopeMetrics[i:]
+}
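+
+// Note: splitMetrics re-slices rather than copies. A partially split metric
+// shares its DataPoints backing array between src and dest, so both should
+// be treated as read-only once a batch has been handed to the exporter.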
+// resourceMetricsDPC calculates the total number of data points in the metricdata.ResourceMetrics.
+func resourceMetricsDPC(rs *metricdata.ResourceMetrics) int {
+	dataPointCount := 0
+	ilms := rs.ScopeMetrics
+	for k := 0; k < len(ilms); k++ {
+		dataPointCount += scopeMetricsDPC(ilms[k])
+	}
+	return dataPointCount
+}
+
+// scopeMetricsDPC calculates the total number of data points in the metricdata.ScopeMetrics.
+func scopeMetricsDPC(ilm metricdata.ScopeMetrics) int {
+	dataPointCount := 0
+	ms := ilm.Metrics
+	for k := 0; k < len(ms); k++ {
+		dataPointCount += metricDPC(ms[k])
+	}
+	return dataPointCount
+}
+
+// metricDPC calculates the total number of data points in the metricdata.Metrics.
+func metricDPC(ms metricdata.Metrics) int {
+	switch a := ms.Data.(type) {
+	case metricdata.Gauge[int64]:
+		return len(a.DataPoints)
+	case metricdata.Gauge[float64]:
+		return len(a.DataPoints)
+	case metricdata.Sum[int64]:
+		return len(a.DataPoints)
+	case metricdata.Sum[float64]:
+		return len(a.DataPoints)
+	case metricdata.Histogram[int64]:
+		return len(a.DataPoints)
+	case metricdata.Histogram[float64]:
+		return len(a.DataPoints)
+	case metricdata.ExponentialHistogram[int64]:
+		return len(a.DataPoints)
+	case metricdata.ExponentialHistogram[float64]:
+		return len(a.DataPoints)
+	case metricdata.Summary:
+		return len(a.DataPoints)
+	}
+	return 0
+}
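+
+// Unknown aggregation types count as zero data points in metricDPC, so
+// splitMetrics moves them wholesale; splitMetric's switch below likewise
+// leaves unrecognized Data untouched rather than splitting it.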
+// splitMetric moves the first size data points in ms to dest, keeping the remainder in ms.
+func splitMetric(ms, dest *metricdata.Metrics, size int) {
+	dest.Name = ms.Name
+	dest.Description = ms.Description
+	dest.Unit = ms.Unit
+
+	switch a := ms.Data.(type) {
+	case metricdata.Gauge[int64]:
+		dest.Data = metricdata.Gauge[int64]{
+			DataPoints: a.DataPoints[:size],
+		}
+		a.DataPoints = a.DataPoints[size:]
+		ms.Data = a
+	case metricdata.Gauge[float64]:
+		dest.Data = metricdata.Gauge[float64]{
+			DataPoints: a.DataPoints[:size],
+		}
+		a.DataPoints = a.DataPoints[size:]
+		ms.Data = a
+	case metricdata.Sum[int64]:
+		dest.Data = metricdata.Sum[int64]{
+			DataPoints:  a.DataPoints[:size],
+			Temporality: a.Temporality,
+			IsMonotonic: a.IsMonotonic,
+		}
+		a.DataPoints = a.DataPoints[size:]
+		ms.Data = a
+	case metricdata.Sum[float64]:
+		dest.Data = metricdata.Sum[float64]{
+			DataPoints:  a.DataPoints[:size],
+			Temporality: a.Temporality,
+			IsMonotonic: a.IsMonotonic,
+		}
+		a.DataPoints = a.DataPoints[size:]
+		ms.Data = a
+	case metricdata.Histogram[int64]:
+		dest.Data = metricdata.Histogram[int64]{
+			DataPoints:  a.DataPoints[:size],
+			Temporality: a.Temporality,
+		}
+		a.DataPoints = a.DataPoints[size:]
+		ms.Data = a
+	case metricdata.Histogram[float64]:
+		dest.Data = metricdata.Histogram[float64]{
+			DataPoints:  a.DataPoints[:size],
+			Temporality: a.Temporality,
+		}
+		a.DataPoints = a.DataPoints[size:]
+		ms.Data = a
+	case metricdata.ExponentialHistogram[int64]:
+		dest.Data = metricdata.ExponentialHistogram[int64]{
+			DataPoints:  a.DataPoints[:size],
+			Temporality: a.Temporality,
+		}
+		a.DataPoints = a.DataPoints[size:]
+		ms.Data = a
+	case metricdata.ExponentialHistogram[float64]:
+		dest.Data = metricdata.ExponentialHistogram[float64]{
+			DataPoints:  a.DataPoints[:size],
+			Temporality: a.Temporality,
+		}
+		a.DataPoints = a.DataPoints[size:]
+		ms.Data = a
+	case metricdata.Summary:
+		dest.Data = metricdata.Summary{
+			DataPoints: a.DataPoints[:size],
+		}
+		a.DataPoints = a.DataPoints[size:]
+		ms.Data = a
+	}
+}
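Finally, an end-to-end sketch of the drain loop. This is a hypothetical test, not part of the change, that exercises splitMetrics the same way collectAndExport does; the fixture is a single Sum metric with five data points:

package metric

import (
	"testing"

	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// TestSplitMetricsDrain illustrates that five data points with a max batch
// size of 2 export as batches of 2, 2, and a remainder of 1.
func TestSplitMetricsDrain(t *testing.T) {
	rm := &metricdata.ResourceMetrics{
		ScopeMetrics: []metricdata.ScopeMetrics{{
			Metrics: []metricdata.Metrics{{
				Name: "requests",
				Data: metricdata.Sum[int64]{
					DataPoints: make([]metricdata.DataPoint[int64], 5),
				},
			}},
		}},
	}
	const max = 2
	var sizes []int
	for resourceMetricsDPC(rm) > max { // same loop shape as collectAndExport
		batch := &metricdata.ResourceMetrics{}
		splitMetrics(max, rm, batch)
		sizes = append(sizes, resourceMetricsDPC(batch))
	}
	sizes = append(sizes, resourceMetricsDPC(rm)) // the remainder export
	t.Log(sizes) // [2 2 1]
}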