From cc9821ed5b880df0f10713bed6ac03b18ef0ebf1 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 17 Mar 2026 20:18:41 +0000 Subject: [PATCH 1/8] add OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE feature gate --- sdk/metric/internal/x/README.md | 24 ++++++++++++++ sdk/metric/internal/x/x.go | 17 ++++++++++ sdk/metric/internal/x/x_test.go | 56 +++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+) diff --git a/sdk/metric/internal/x/README.md b/sdk/metric/internal/x/README.md index 4205595b976..41a4c29969b 100644 --- a/sdk/metric/internal/x/README.md +++ b/sdk/metric/internal/x/README.md @@ -8,6 +8,30 @@ See the [Compatibility and Stability](#compatibility-and-stability) section for ## Features +- [Metric Export Batch Size](#metric-export-batch-size) + +### Metric Export Batch Size + +The metric export can be split into batches before exporting by specifying a maximum number of data points per batch. + +This experimental feature can be enabled by setting the `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE` environment variable. +The value MUST be a positive integer. +All other values or an empty value will result in the default behavior of not batching. + +#### Examples + +Enable metrics to be batched by maximum export batch size of 200. + +```console +export OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=200 +``` + +Disable metric export batching. + +```console +unset OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE +``` + ## Compatibility and Stability Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md). diff --git a/sdk/metric/internal/x/x.go b/sdk/metric/internal/x/x.go index 43ecb72e26c..8e3c62a13d2 100644 --- a/sdk/metric/internal/x/x.go +++ b/sdk/metric/internal/x/x.go @@ -9,6 +9,7 @@ package x // import "go.opentelemetry.io/otel/sdk/metric/internal/x" import ( "os" + "strconv" ) // Feature is an experimental feature control flag. It provides a uniform way @@ -51,3 +52,19 @@ func (f Feature[T]) Enabled() bool { _, ok := f.Lookup() return ok } + +// MetricExportBatchSize is an experimental feature flag that controls the +// max export batch size for metric data. +// +// To enable this feature set the OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE environment +// variable to a positive integer value. +var MetricExportBatchSize = newFeature( + "METRIC_EXPORT_BATCH_SIZE", + func(v string) (int, bool) { + val, err := strconv.Atoi(v) + if err == nil && val > 0 { + return val, true + } + return 0, false + }, +) diff --git a/sdk/metric/internal/x/x_test.go b/sdk/metric/internal/x/x_test.go index d7d5e41ce4a..0ccea035340 100644 --- a/sdk/metric/internal/x/x_test.go +++ b/sdk/metric/internal/x/x_test.go @@ -49,3 +49,59 @@ func assertDisabled[T any](f Feature[T]) func(*testing.T) { assert.Equal(t, zero, v, "Lookup value") } } + +func TestMetricExportBatchSize(t *testing.T) { + tests := []struct { + name string + value string + enabled bool + want int + }{ + { + name: "empty", + value: "", + enabled: false, + want: 0, + }, + { + name: "invalid", + value: "invalid", + enabled: false, + want: 0, + }, + { + name: "zero", + value: "0", + enabled: false, + want: 0, + }, + { + name: "negative", + value: "-10", + enabled: false, + want: 0, + }, + { + name: "valid small", + value: "10", + enabled: true, + want: 10, + }, + { + name: "valid large", + value: "200", + enabled: true, + want: 200, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv(MetricExportBatchSize.Key(), tt.value) + assert.Equal(t, tt.enabled, MetricExportBatchSize.Enabled()) + got, ok := MetricExportBatchSize.Lookup() + assert.Equal(t, tt.enabled, ok) + assert.Equal(t, tt.want, got) + }) + } +} From 56eeef92a76546a024d343b4a05e6b60f22ab20b Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 17 Mar 2026 20:22:35 +0000 Subject: [PATCH 2/8] implement batching logic in periodic reader --- sdk/metric/periodic_reader.go | 39 +++++++++++++++++++++++++++-------- sdk/metric/splitmetrics.go | 15 ++++++++++++++ 2 files changed, 45 insertions(+), 9 deletions(-) create mode 100644 sdk/metric/splitmetrics.go diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index d1efc9f374a..505292d2e8c 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/internal/observ" + "go.opentelemetry.io/otel/sdk/metric/internal/x" "go.opentelemetry.io/otel/sdk/metric/metricdata" semconv "go.opentelemetry.io/otel/semconv/v1.40.0" ) @@ -126,6 +127,9 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri }, }, } + if val, ok := x.MetricExportBatchSize.Lookup(); ok { + r.maxExportBatchSize = val + } r.externalProducers.Store(conf.producers) go func() { @@ -162,10 +166,11 @@ type PeriodicReader struct { isShutdown bool externalProducers atomic.Value - interval time.Duration - timeout time.Duration - exporter Exporter - flushCh chan chan error + interval time.Duration + timeout time.Duration + maxExportBatchSize int + exporter Exporter + flushCh chan chan error done chan struct{} cancel context.CancelFunc @@ -235,14 +240,18 @@ func (r *PeriodicReader) cardinalityLimit(kind InstrumentKind) (int, bool) { // collectAndExport gather all metric data related to the periodicReader r from // the SDK and exports it with r's exporter. func (r *PeriodicReader) collectAndExport(ctx context.Context) error { - ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect and export timeout")) - defer cancel() - // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect. rm := r.rmPool.Get().(*metricdata.ResourceMetrics) err := r.Collect(ctx, rm) if err == nil { - err = r.export(ctx, rm) + if r.maxExportBatchSize > 0 { + batches := splitResourceMetrics(r.maxExportBatchSize, rm) + for _, batch := range batches { + err = errors.Join(err, r.export(ctx, batch)) + } + } else { + err = r.export(ctx, rm) + } } r.rmPool.Put(rm) return err @@ -268,6 +277,9 @@ func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet // collect unwraps p as a produceHolder and returns its produce results. func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.ResourceMetrics) error { + ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect timeout")) + defer cancel() + var err error if r.inst != nil { cp := r.inst.CollectMetrics(ctx) @@ -308,6 +320,8 @@ func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.Reso // export exports metric data m using r's exporter. func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error { + ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader export timeout")) + defer cancel() return r.exporter.Export(ctx, m) } @@ -369,7 +383,14 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error { m := r.rmPool.Get().(*metricdata.ResourceMetrics) err = r.collect(ctx, ph, m) if err == nil { - err = r.export(ctx, m) + if r.maxExportBatchSize > 0 { + batches := splitResourceMetrics(r.maxExportBatchSize, m) + for _, batch := range batches { + err = errors.Join(err, r.export(ctx, batch)) + } + } else { + err = r.export(ctx, m) + } } r.rmPool.Put(m) } diff --git a/sdk/metric/splitmetrics.go b/sdk/metric/splitmetrics.go new file mode 100644 index 00000000000..aabf730e117 --- /dev/null +++ b/sdk/metric/splitmetrics.go @@ -0,0 +1,15 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metric + +import ( + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// splitResourceMetrics splits a metricdata.ResourceMetrics into multiple ResourceMetrics, sequentially, +// ensuring no ResourceMetrics has more than `size` data points. It does not mutate the `src` object. +func splitResourceMetrics(size int, src *metricdata.ResourceMetrics) []*metricdata.ResourceMetrics { + // TODO: implement + return []*metricdata.ResourceMetrics{src} +} From 4dda64a896bf1c785ad79735d64303a88394c4f8 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 17 Mar 2026 20:25:28 +0000 Subject: [PATCH 3/8] implement batch splitting --- sdk/metric/periodic_reader.go | 20 +- sdk/metric/periodic_reader_test.go | 79 ++++++++ sdk/metric/splitmetrics.go | 180 +++++++++++++++++- sdk/metric/splitmetrics_test.go | 286 +++++++++++++++++++++++++++++ 4 files changed, 552 insertions(+), 13 deletions(-) create mode 100644 sdk/metric/splitmetrics_test.go diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 505292d2e8c..deac441fab2 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -128,7 +128,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri }, } if val, ok := x.MetricExportBatchSize.Lookup(); ok { - r.maxExportBatchSize = val + r.batcher = batcher{size: val} } r.externalProducers.Store(conf.producers) @@ -166,11 +166,11 @@ type PeriodicReader struct { isShutdown bool externalProducers atomic.Value - interval time.Duration - timeout time.Duration - maxExportBatchSize int - exporter Exporter - flushCh chan chan error + interval time.Duration + timeout time.Duration + batcher batcher + exporter Exporter + flushCh chan chan error done chan struct{} cancel context.CancelFunc @@ -244,8 +244,8 @@ func (r *PeriodicReader) collectAndExport(ctx context.Context) error { rm := r.rmPool.Get().(*metricdata.ResourceMetrics) err := r.Collect(ctx, rm) if err == nil { - if r.maxExportBatchSize > 0 { - batches := splitResourceMetrics(r.maxExportBatchSize, rm) + if r.batcher.size > 0 { + batches := r.batcher.splitResourceMetrics(rm) for _, batch := range batches { err = errors.Join(err, r.export(ctx, batch)) } @@ -383,8 +383,8 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error { m := r.rmPool.Get().(*metricdata.ResourceMetrics) err = r.collect(ctx, ph, m) if err == nil { - if r.maxExportBatchSize > 0 { - batches := splitResourceMetrics(r.maxExportBatchSize, m) + if r.batcher.size > 0 { + batches := r.batcher.splitResourceMetrics(m) for _, batch := range batches { err = errors.Join(err, r.export(ctx, batch)) } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 694abf1fe2a..38f33661e95 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -297,6 +297,85 @@ func TestPeriodicReaderRun(t *testing.T) { _ = r.Shutdown(t.Context()) } +func TestPeriodicReaderBatching(t *testing.T) { + t.Setenv("OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE", "2") + + var exported []metricdata.ResourceMetrics + exp := &fnExporter{ + exportFunc: func(_ context.Context, m *metricdata.ResourceMetrics) error { + exported = append(exported, *m) + return nil + }, + } + + ts1, ts2, ts3 := time.Now(), time.Now(), time.Now() + testMetrics := []metricdata.ScopeMetrics{{ + Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/internal"}, + Metrics: []metricdata.Metrics{{ + Name: "metric1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{ + Attributes: attribute.NewSet(attribute.String("user", "david")), + StartTime: ts1, Time: ts1.Add(time.Second), Value: 1, + }}, + }, + }, { + Name: "metric2", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("user", "tyler")), + StartTime: ts2, + Time: ts2.Add(time.Second), + Value: 10, + }, + { + Attributes: attribute.NewSet(attribute.String("user", "robert")), + StartTime: ts3, + Time: ts3.Add(time.Second), + Value: 100, + }, + }, + }, + }}, + }} + + r := NewPeriodicReader( + exp, + WithProducer(testExternalProducer{ + produceFunc: func(context.Context) ([]metricdata.ScopeMetrics, error) { + return testMetrics, nil + }, + }), + ) + // testSDKProducer generates 2 Data Points (testResourceMetricsAB) + r.register(testSDKProducer{}) + + // Trigger export via ForceFlush + assert.NoError(t, r.ForceFlush(t.Context())) + + // We should have a total of 4 data points + // testSDKProducer: 1 + // testExternalProducer: 3 + // Max batch size is 2, so it should split into 2 batches (2 + 2) + assert.Len(t, exported, 2) + + dpCount := 0 + for _, batch := range exported { + batchPoints := 0 + for _, sm := range batch.ScopeMetrics { + for _, m := range sm.Metrics { + batchPoints += metricDPC(m) + } + } + assert.LessOrEqual(t, batchPoints, 2) + dpCount += batchPoints + } + assert.Equal(t, 4, dpCount) + + _ = r.Shutdown(t.Context()) +} + func TestPeriodicReaderFlushesPending(t *testing.T) { // Override the ticker so tests are not flaky and rely on timing. trigger := triggerTicker(t) diff --git a/sdk/metric/splitmetrics.go b/sdk/metric/splitmetrics.go index aabf730e117..7ead93c95b5 100644 --- a/sdk/metric/splitmetrics.go +++ b/sdk/metric/splitmetrics.go @@ -7,9 +7,183 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +type batcher struct { + size int +} + // splitResourceMetrics splits a metricdata.ResourceMetrics into multiple ResourceMetrics, sequentially, // ensuring no ResourceMetrics has more than `size` data points. It does not mutate the `src` object. -func splitResourceMetrics(size int, src *metricdata.ResourceMetrics) []*metricdata.ResourceMetrics { - // TODO: implement - return []*metricdata.ResourceMetrics{src} +func (b batcher) splitResourceMetrics(src *metricdata.ResourceMetrics) []*metricdata.ResourceMetrics { + if b.size <= 0 || len(src.ScopeMetrics) == 0 { + return []*metricdata.ResourceMetrics{src} + } + var batches []*metricdata.ResourceMetrics + var currentBatch *metricdata.ResourceMetrics + currentPoints := 0 + + for i := 0; i < len(src.ScopeMetrics); i++ { + sm := src.ScopeMetrics[i] + + take := b.size - currentPoints + smChunks := b.splitScopeMetrics(sm, take) + + for _, chunk := range smChunks { + if currentBatch == nil { + currentBatch = &metricdata.ResourceMetrics{Resource: src.Resource} + batches = append(batches, currentBatch) + } + currentBatch.ScopeMetrics = append(currentBatch.ScopeMetrics, chunk) + currentPoints += scopeMetricsDPC(chunk) + + if currentPoints == b.size { + currentBatch = nil + currentPoints = 0 + } + } + } + return batches +} + +// splitScopeMetrics splits a metricdata.ScopeMetrics into chunks. The first chunk will have at most firstSize data points. +func (b batcher) splitScopeMetrics(sm metricdata.ScopeMetrics, firstSize int) []metricdata.ScopeMetrics { + smPoints := scopeMetricsDPC(sm) + if smPoints <= firstSize { + return []metricdata.ScopeMetrics{sm} + } + + var chunks []metricdata.ScopeMetrics + var currentChunk *metricdata.ScopeMetrics + currentPoints := 0 + targetSize := firstSize + + for i := 0; i < len(sm.Metrics); i++ { + m := sm.Metrics[i] + + take := targetSize - currentPoints + mChunks := b.splitMetric(m, take) + + for _, mc := range mChunks { + if currentChunk == nil { + chunks = append(chunks, metricdata.ScopeMetrics{Scope: sm.Scope}) + currentChunk = &chunks[len(chunks)-1] + } + currentChunk.Metrics = append(currentChunk.Metrics, mc) + currentPoints += metricDPC(mc) + + if currentPoints == targetSize { + currentChunk = nil + currentPoints = 0 + targetSize = b.size + } + } + } + return chunks +} + +// splitMetric splits a metricdata.Metrics into chunks. The first chunk will have at most firstSize data points. +func (b batcher) splitMetric(m metricdata.Metrics, firstSize int) []metricdata.Metrics { + mPoints := metricDPC(m) + if mPoints <= firstSize { + return []metricdata.Metrics{m} + } + + var chunks []metricdata.Metrics + mRemaining := mPoints + mOffset := 0 + take := firstSize + + for mRemaining > 0 { + if take > mRemaining { + take = mRemaining + } + chunks = append(chunks, copyMetricData(m, mOffset, take)) + mRemaining -= take + mOffset += take + take = b.size + } + return chunks +} + +func copyMetricData(m metricdata.Metrics, offset, take int) metricdata.Metrics { + dest := metricdata.Metrics{ + Name: m.Name, + Description: m.Description, + Unit: m.Unit, + } + switch a := m.Data.(type) { + case metricdata.Gauge[int64]: + dest.Data = metricdata.Gauge[int64]{DataPoints: a.DataPoints[offset : offset+take]} + case metricdata.Gauge[float64]: + dest.Data = metricdata.Gauge[float64]{DataPoints: a.DataPoints[offset : offset+take]} + case metricdata.Sum[int64]: + dest.Data = metricdata.Sum[int64]{ + DataPoints: a.DataPoints[offset : offset+take], + Temporality: a.Temporality, + IsMonotonic: a.IsMonotonic, + } + case metricdata.Sum[float64]: + dest.Data = metricdata.Sum[float64]{ + DataPoints: a.DataPoints[offset : offset+take], + Temporality: a.Temporality, + IsMonotonic: a.IsMonotonic, + } + case metricdata.Histogram[int64]: + dest.Data = metricdata.Histogram[int64]{ + DataPoints: a.DataPoints[offset : offset+take], + Temporality: a.Temporality, + } + case metricdata.Histogram[float64]: + dest.Data = metricdata.Histogram[float64]{ + DataPoints: a.DataPoints[offset : offset+take], + Temporality: a.Temporality, + } + case metricdata.ExponentialHistogram[int64]: + dest.Data = metricdata.ExponentialHistogram[int64]{ + DataPoints: a.DataPoints[offset : offset+take], + Temporality: a.Temporality, + } + case metricdata.ExponentialHistogram[float64]: + dest.Data = metricdata.ExponentialHistogram[float64]{ + DataPoints: a.DataPoints[offset : offset+take], + Temporality: a.Temporality, + } + case metricdata.Summary: + dest.Data = metricdata.Summary{DataPoints: a.DataPoints[offset : offset+take]} + } + return dest +} + +// scopeMetricsDPC calculates the total number of data points in the metricdata.ScopeMetrics. +func scopeMetricsDPC(ilm metricdata.ScopeMetrics) int { + dataPointCount := 0 + ms := ilm.Metrics + for k := range ms { + dataPointCount += metricDPC(ms[k]) + } + return dataPointCount +} + +// metricDPC calculates the total number of data points in the metricdata.Metrics. +func metricDPC(ms metricdata.Metrics) int { + switch a := ms.Data.(type) { + case metricdata.Gauge[int64]: + return len(a.DataPoints) + case metricdata.Gauge[float64]: + return len(a.DataPoints) + case metricdata.Sum[int64]: + return len(a.DataPoints) + case metricdata.Sum[float64]: + return len(a.DataPoints) + case metricdata.Histogram[int64]: + return len(a.DataPoints) + case metricdata.Histogram[float64]: + return len(a.DataPoints) + case metricdata.ExponentialHistogram[int64]: + return len(a.DataPoints) + case metricdata.ExponentialHistogram[float64]: + return len(a.DataPoints) + case metricdata.Summary: + return len(a.DataPoints) + } + return 0 } diff --git a/sdk/metric/splitmetrics_test.go b/sdk/metric/splitmetrics_test.go new file mode 100644 index 00000000000..dec7498fdca --- /dev/null +++ b/sdk/metric/splitmetrics_test.go @@ -0,0 +1,286 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metric + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +func TestSplitResourceMetrics(t *testing.T) { + points := func(n int) []metricdata.DataPoint[int64] { + var dps []metricdata.DataPoint[int64] + for i := range n { + dps = append(dps, metricdata.DataPoint[int64]{Value: int64(i)}) + } + return dps + } + + tests := []struct { + name string + size int + input *metricdata.ResourceMetrics + expected [][][]int // Expected representation of the batching structure + }{ + { + name: "no splitting needed", + size: 10, + input: &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Data: metricdata.Gauge[int64]{DataPoints: points(3)}}, + {Data: metricdata.Gauge[int64]{DataPoints: points(2)}}, + }, + }, + }, + }, + expected: [][][]int{{{3, 2}}}, + }, + { + name: "split on metric boundary", + size: 3, + input: &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Data: metricdata.Gauge[int64]{DataPoints: points(3)}}, + {Data: metricdata.Gauge[int64]{DataPoints: points(2)}}, + }, + }, + }, + }, + expected: [][][]int{ + {{3}}, + {{2}}, + }, + }, + { + name: "split inside a single metric", + size: 2, + input: &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Data: metricdata.Gauge[int64]{DataPoints: points(5)}}, + }, + }, + }, + }, + expected: [][][]int{ + {{2}}, + {{2}}, + {{1}}, + }, + }, + { + name: "split across scopes", + size: 4, + input: &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Data: metricdata.Gauge[int64]{DataPoints: points(2)}}, + {Data: metricdata.Gauge[int64]{DataPoints: points(3)}}, + }, + }, + { + Metrics: []metricdata.Metrics{ + {Data: metricdata.Gauge[int64]{DataPoints: points(2)}}, + }, + }, + }, + }, + expected: [][][]int{ + {{2, 2}}, + { + {1}, + {2}, + }, // The 3-point metric's 3rd point overflowed into the second batch, filling 1, leaving 3 left for the new scope + }, + }, + { + name: "zero points input", + size: 5, + input: &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Data: metricdata.Gauge[int64]{DataPoints: points(0)}}, + }, + }, + }, + }, + expected: [][][]int{{{0}}}, + }, + { + name: "size zero", + size: 0, + input: &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Data: metricdata.Gauge[int64]{DataPoints: points(1)}}, + }, + }, + }, + }, + expected: [][][]int{{{1}}}, + }, + { + name: "empty scope metrics", + size: 10, + input: &metricdata.ResourceMetrics{}, + expected: [][][]int{nil}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := batcher{size: tt.size} + batches := b.splitResourceMetrics(tt.input) + + var actual [][][]int + for _, batch := range batches { + var scopes [][]int + for _, sm := range batch.ScopeMetrics { + var metrics []int + for _, m := range sm.Metrics { + metrics = append(metrics, metricDPC(m)) + } + scopes = append(scopes, metrics) + } + actual = append(actual, scopes) + } + assert.Equal(t, tt.expected, actual) + }) + } +} + +func TestCopyMetricData(t *testing.T) { + tests := []struct { + name string + data metricdata.Aggregation + expectedPoints int + }{ + { + name: "Gauge[int64]", + data: metricdata.Gauge[int64]{DataPoints: []metricdata.DataPoint[int64]{{}, {}}}, + expectedPoints: 2, + }, + { + name: "Gauge[float64]", + data: metricdata.Gauge[float64]{DataPoints: []metricdata.DataPoint[float64]{{}, {}}}, + expectedPoints: 2, + }, + { + name: "Sum[int64]", + data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{}, {}}, + Temporality: metricdata.DeltaTemporality, + IsMonotonic: true, + }, + expectedPoints: 2, + }, + { + name: "Sum[float64]", + data: metricdata.Sum[float64]{ + DataPoints: []metricdata.DataPoint[float64]{{}, {}}, + Temporality: metricdata.DeltaTemporality, + IsMonotonic: true, + }, + expectedPoints: 2, + }, + { + name: "Histogram[int64]", + data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{{}, {}}, + Temporality: metricdata.DeltaTemporality, + }, + expectedPoints: 2, + }, + { + name: "Histogram[float64]", + data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{{}, {}}, + Temporality: metricdata.CumulativeTemporality, + }, + expectedPoints: 2, + }, + { + name: "ExponentialHistogram[int64]", + data: metricdata.ExponentialHistogram[int64]{ + DataPoints: []metricdata.ExponentialHistogramDataPoint[int64]{{}, {}}, + Temporality: metricdata.DeltaTemporality, + }, + expectedPoints: 2, + }, + { + name: "ExponentialHistogram[float64]", + data: metricdata.ExponentialHistogram[float64]{ + DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{{}, {}}, + Temporality: metricdata.CumulativeTemporality, + }, + expectedPoints: 2, + }, + { + name: "Summary", + data: metricdata.Summary{DataPoints: []metricdata.SummaryDataPoint{{}, {}}}, + expectedPoints: 2, + }, + { + name: "Unknown Type", + data: nil, + expectedPoints: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := metricdata.Metrics{ + Name: "test", + Description: "desc", + Unit: "1", + Data: tt.data, + } + assert.Equal(t, tt.expectedPoints, metricDPC(m)) + + if tt.expectedPoints == 0 { + return + } + // Test copying 1 element out of 2 + copied := copyMetricData(m, 0, 1) + assert.Equal(t, 1, metricDPC(copied)) + assert.Equal(t, "test", copied.Name) + assert.Equal(t, "desc", copied.Description) + assert.Equal(t, "1", copied.Unit) + + switch expectedData := tt.data.(type) { + case metricdata.Sum[int64]: + copiedData := copied.Data.(metricdata.Sum[int64]) + assert.Equal(t, expectedData.Temporality, copiedData.Temporality) + assert.Equal(t, expectedData.IsMonotonic, copiedData.IsMonotonic) + case metricdata.Sum[float64]: + copiedData := copied.Data.(metricdata.Sum[float64]) + assert.Equal(t, expectedData.Temporality, copiedData.Temporality) + assert.Equal(t, expectedData.IsMonotonic, copiedData.IsMonotonic) + case metricdata.Histogram[int64]: + copiedData := copied.Data.(metricdata.Histogram[int64]) + assert.Equal(t, expectedData.Temporality, copiedData.Temporality) + case metricdata.Histogram[float64]: + copiedData := copied.Data.(metricdata.Histogram[float64]) + assert.Equal(t, expectedData.Temporality, copiedData.Temporality) + case metricdata.ExponentialHistogram[int64]: + copiedData := copied.Data.(metricdata.ExponentialHistogram[int64]) + assert.Equal(t, expectedData.Temporality, copiedData.Temporality) + case metricdata.ExponentialHistogram[float64]: + copiedData := copied.Data.(metricdata.ExponentialHistogram[float64]) + assert.Equal(t, expectedData.Temporality, copiedData.Temporality) + } + }) + } +} From a808ee9fe8c6ccde2e36bb692bb2c8b206be6a9c Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 18 Mar 2026 14:50:51 +0000 Subject: [PATCH 4/8] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78601740530..d2ca7a4c76c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `ByteSlice` and `ByteSliceValue` functions for new `BYTESLICE` attribute type in `go.opentelemetry.io/otel/attribute`. (#7948) - Add `String` method for `Value` type in `go.opentelemetry.io/otel/attribute`. (#8142) - Add `Error` field on `Record` type in `go.opentelemetry.io/otel/log/logtest`. (#8148) +- Add experimental support for splitting metric data across multiple batches in `go.opentelemetry.io/otel/sdk/metric`. + Set `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=` to enable for all periodic readers. (#8071) ### Changed From 216e54ba49d1fc96274acd4e10f0b51e8937f859 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 18 Mar 2026 17:32:58 +0000 Subject: [PATCH 5/8] lint --- sdk/metric/splitmetrics.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/metric/splitmetrics.go b/sdk/metric/splitmetrics.go index 7ead93c95b5..a7931412bb2 100644 --- a/sdk/metric/splitmetrics.go +++ b/sdk/metric/splitmetrics.go @@ -1,12 +1,13 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package metric +package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +// batcher splits metrics into batches. type batcher struct { size int } @@ -104,6 +105,7 @@ func (b batcher) splitMetric(m metricdata.Metrics, firstSize int) []metricdata.M return chunks } +// copyMetricData creates a copy of the metricdata.Metrics with the specified offset and number of datapoints to take. func copyMetricData(m metricdata.Metrics, offset, take int) metricdata.Metrics { dest := metricdata.Metrics{ Name: m.Name, @@ -154,9 +156,9 @@ func copyMetricData(m metricdata.Metrics, offset, take int) metricdata.Metrics { } // scopeMetricsDPC calculates the total number of data points in the metricdata.ScopeMetrics. -func scopeMetricsDPC(ilm metricdata.ScopeMetrics) int { +func scopeMetricsDPC(sm metricdata.ScopeMetrics) int { dataPointCount := 0 - ms := ilm.Metrics + ms := sm.Metrics for k := range ms { dataPointCount += metricDPC(ms[k]) } @@ -164,8 +166,8 @@ func scopeMetricsDPC(ilm metricdata.ScopeMetrics) int { } // metricDPC calculates the total number of data points in the metricdata.Metrics. -func metricDPC(ms metricdata.Metrics) int { - switch a := ms.Data.(type) { +func metricDPC(m metricdata.Metrics) int { + switch a := m.Data.(type) { case metricdata.Gauge[int64]: return len(a.DataPoints) case metricdata.Gauge[float64]: From 79d4bd05182dbda00cd652168d6f579a738b3c72 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 3 Apr 2026 15:50:30 +0000 Subject: [PATCH 6/8] gate timeout changes behind the feature gate --- sdk/metric/periodic_reader.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index deac441fab2..aa2147749cc 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -240,6 +240,11 @@ func (r *PeriodicReader) cardinalityLimit(kind InstrumentKind) (int, bool) { // collectAndExport gather all metric data related to the periodicReader r from // the SDK and exports it with r's exporter. func (r *PeriodicReader) collectAndExport(ctx context.Context) error { + if !x.MetricExportBatchSize.Enabled() { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect and export timeout")) + defer cancel() + } // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect. rm := r.rmPool.Get().(*metricdata.ResourceMetrics) err := r.Collect(ctx, rm) @@ -277,8 +282,11 @@ func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet // collect unwraps p as a produceHolder and returns its produce results. func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.ResourceMetrics) error { - ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect timeout")) - defer cancel() + if x.MetricExportBatchSize.Enabled() { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect timeout")) + defer cancel() + } var err error if r.inst != nil { @@ -320,8 +328,11 @@ func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.Reso // export exports metric data m using r's exporter. func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error { - ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader export timeout")) - defer cancel() + if x.MetricExportBatchSize.Enabled() { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader export timeout")) + defer cancel() + } return r.exporter.Export(ctx, m) } From 3b9427c933870d67ffcfc45f591711bed78c0f86 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 8 Apr 2026 14:31:41 +0000 Subject: [PATCH 7/8] fix direct collect calls --- sdk/metric/periodic_reader.go | 48 ++++----- sdk/metric/periodic_reader_test.go | 150 +++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 22 deletions(-) diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index aa2147749cc..5539e9e9a44 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -240,11 +240,9 @@ func (r *PeriodicReader) cardinalityLimit(kind InstrumentKind) (int, bool) { // collectAndExport gather all metric data related to the periodicReader r from // the SDK and exports it with r's exporter. func (r *PeriodicReader) collectAndExport(ctx context.Context) error { - if !x.MetricExportBatchSize.Enabled() { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect and export timeout")) - defer cancel() - } + originalCtx := ctx + ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect and export timeout")) + defer cancel() // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect. rm := r.rmPool.Get().(*metricdata.ResourceMetrics) err := r.Collect(ctx, rm) @@ -252,10 +250,12 @@ func (r *PeriodicReader) collectAndExport(ctx context.Context) error { if r.batcher.size > 0 { batches := r.batcher.splitResourceMetrics(rm) for _, batch := range batches { - err = errors.Join(err, r.export(ctx, batch)) + // The export timeout is applied individually to each batch by using + // the original context. + err = errors.Join(err, r.exportWithTimeout(originalCtx, batch)) } } else { - err = r.export(ctx, rm) + err = r.exporter.Export(ctx, rm) } } r.rmPool.Put(rm) @@ -282,12 +282,6 @@ func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet // collect unwraps p as a produceHolder and returns its produce results. func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.ResourceMetrics) error { - if x.MetricExportBatchSize.Enabled() { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect timeout")) - defer cancel() - } - var err error if r.inst != nil { cp := r.inst.CollectMetrics(ctx) @@ -327,12 +321,10 @@ func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.Reso } // export exports metric data m using r's exporter. -func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error { - if x.MetricExportBatchSize.Enabled() { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader export timeout")) - defer cancel() - } +func (r *PeriodicReader) exportWithTimeout(ctx context.Context, m *metricdata.ResourceMetrics) error { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader export timeout")) + defer cancel() return r.exporter.Export(ctx, m) } @@ -374,7 +366,9 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error { err := ErrReaderShutdown r.shutdownOnce.Do(func() { // Prioritize the ctx timeout if it is set. - if _, ok := ctx.Deadline(); !ok { + originalCtx := ctx + _, userProvidedContext := ctx.Deadline() + if !userProvidedContext { var cancel context.CancelFunc ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader shutdown timeout")) defer cancel() @@ -397,10 +391,20 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error { if r.batcher.size > 0 { batches := r.batcher.splitResourceMetrics(m) for _, batch := range batches { - err = errors.Join(err, r.export(ctx, batch)) + if userProvidedContext { + // Do not apply the export timeout if the user passed a timeout to + // Shutdown(). + err = errors.Join(err, r.exporter.Export(ctx, batch)) + } else { + // The export timeout is applied individually to each batch by using + // the original context. + err = errors.Join(err, r.exportWithTimeout(originalCtx, batch)) + } } } else { - err = r.export(ctx, m) + // Do not apply the export timeout if the user passed a timeout to + // Shutdown(). + err = r.exporter.Export(ctx, m) } } r.rmPool.Put(m) diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 38f33661e95..4bb1326788e 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -376,6 +376,60 @@ func TestPeriodicReaderBatching(t *testing.T) { _ = r.Shutdown(t.Context()) } +func TestPeriodicReaderBatching_WithoutCancel(t *testing.T) { + t.Setenv("OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE", "1") // Force small batches + + trigger := triggerTicker(t) + + timeout := 200 * time.Millisecond + + var exportCount int + done := make(chan struct{}) + exp := &fnExporter{ + exportFunc: func(ctx context.Context, _ *metricdata.ResourceMetrics) error { + exportCount++ + // Simulate export taking some time + select { + case <-time.After(100 * time.Millisecond): + if exportCount == 2 { + close(done) + } + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, + } + + r := NewPeriodicReader(exp, WithTimeout(timeout)) + + r.register(testSDKProducer{ + produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { + // Simulate Collect taking time (150ms) + // So when we enter the loop, only 50ms are left of the top-level 200ms timeout! + select { + case <-time.After(150 * time.Millisecond): + case <-ctx.Done(): + return ctx.Err() + } + + *rm = testResourceMetricsAB // Has 2 data points + return nil + }, + }) + + trigger <- time.Now() + + select { + case <-done: + // Success + case <-time.After(time.Second): + t.Fatal("timeout waiting for exports") + } + + assert.Equal(t, 2, exportCount) +} + func TestPeriodicReaderFlushesPending(t *testing.T) { // Override the ticker so tests are not flaky and rely on timing. trigger := triggerTicker(t) @@ -449,6 +503,56 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { _ = r.Shutdown(t.Context()) }) + t.Run("ForceFlush timeout on export with batching", func(t *testing.T) { + t.Setenv("OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE", "1") // Force small batches + + timeout := 200 * time.Millisecond + + var exportCount int + exp := &fnExporter{ + exportFunc: func(ctx context.Context, _ *metricdata.ResourceMetrics) error { + exportCount++ + select { + case <-time.After(100 * time.Millisecond): + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, + } + + ts1 := time.Now() + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{ + produceFunc: func(_ context.Context) ([]metricdata.ScopeMetrics, error) { + return []metricdata.ScopeMetrics{{ + Scope: instrumentation.Scope{Name: "test"}, + Metrics: []metricdata.Metrics{{ + Name: "metric1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Value: 1, Time: ts1}, + {Value: 2, Time: ts1}, + }, + }, + }}, + }}, nil + }, + })) + r.register(testSDKProducer{ + produceFunc: func(_ context.Context, _ *metricdata.ResourceMetrics) error { + return nil + }, + }) + + ctx, cancel := context.WithTimeout(t.Context(), 300*time.Millisecond) + defer cancel() + err := r.ForceFlush(ctx) + assert.NoError(t, err) + assert.Equal(t, 2, exportCount) + + _ = r.Shutdown(t.Context()) + }) + t.Run("Shutdown", func(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) @@ -495,6 +599,52 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { assert.ErrorIs(t, r.Shutdown(t.Context()), context.DeadlineExceeded) assert.False(t, *called, "exporter Export method called when it should have failed before export") }) + + t.Run("Shutdown timeout on export with batching", func(t *testing.T) { + t.Setenv("OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE", "1") // Force small batches + + timeout := 200 * time.Millisecond + + var exportCount int + exp := &fnExporter{ + exportFunc: func(ctx context.Context, _ *metricdata.ResourceMetrics) error { + exportCount++ + select { + case <-time.After(50 * time.Millisecond): + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, + } + + ts1 := time.Now() + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{ + produceFunc: func(_ context.Context) ([]metricdata.ScopeMetrics, error) { + return []metricdata.ScopeMetrics{{ + Scope: instrumentation.Scope{Name: "test"}, + Metrics: []metricdata.Metrics{{ + Name: "metric1", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Value: 1, Time: ts1}, + {Value: 2, Time: ts1}, + }, + }, + }}, + }}, nil + }, + })) + r.register(testSDKProducer{ + produceFunc: func(_ context.Context, _ *metricdata.ResourceMetrics) error { + return nil + }, + }) + + err := r.Shutdown(t.Context()) + assert.NoError(t, err) + assert.Equal(t, 2, exportCount) + }) } func TestPeriodicReaderMultipleForceFlush(t *testing.T) { From 0d5e2030d463fb4f7a3727d3945a4bf44a9293aa Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 8 Apr 2026 19:59:57 +0000 Subject: [PATCH 8/8] update changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d2ca7a4c76c..9047596061b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `String` method for `Value` type in `go.opentelemetry.io/otel/attribute`. (#8142) - Add `Error` field on `Record` type in `go.opentelemetry.io/otel/log/logtest`. (#8148) - Add experimental support for splitting metric data across multiple batches in `go.opentelemetry.io/otel/sdk/metric`. - Set `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=` to enable for all periodic readers. (#8071) + Set `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=` to enable for all periodic readers. + See `go.opentelemetry.io/otel/sdk/metric/internal/x` for feature documentation. (#8071) ### Changed