Skip to content
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `ByteSlice` and `ByteSliceValue` functions for new `BYTESLICE` attribute type in `go.opentelemetry.io/otel/attribute`. (#7948)
- Add `String` method for `Value` type in `go.opentelemetry.io/otel/attribute`. (#8142)
- Add `Error` field on `Record` type in `go.opentelemetry.io/otel/log/logtest`. (#8148)
- Add experimental support for splitting metric data across multiple batches in `go.opentelemetry.io/otel/sdk/metric`.
Set `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=<max_size>` to enable for all periodic readers.
See `go.opentelemetry.io/otel/sdk/metric/internal/x` for feature documentation. (#8071)

### Changed

Expand Down
24 changes: 24 additions & 0 deletions sdk/metric/internal/x/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,30 @@ See the [Compatibility and Stability](#compatibility-and-stability) section for

## Features

- [Metric Export Batch Size](#metric-export-batch-size)

### Metric Export Batch Size

The metric export can be split into batches before exporting by specifying a maximum number of data points per batch.

This experimental feature can be enabled by setting the `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE` environment variable.
The value MUST be a positive integer.
All other values or an empty value will result in the default behavior of not batching.

#### Examples

Enable metric export batching with a maximum batch size of 200 data points.

```console
export OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=200
```

Disable metric export batching.

```console
unset OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE
```

## Compatibility and Stability

Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md).
Expand Down
17 changes: 17 additions & 0 deletions sdk/metric/internal/x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package x // import "go.opentelemetry.io/otel/sdk/metric/internal/x"

import (
"os"
"strconv"
)

// Feature is an experimental feature control flag. It provides a uniform way
Expand Down Expand Up @@ -51,3 +52,19 @@ func (f Feature[T]) Enabled() bool {
_, ok := f.Lookup()
return ok
}

// MetricExportBatchSize is an experimental feature flag that controls the
// max export batch size for metric data.
//
// To enable this feature set the OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE environment
// variable to a positive integer value.
var MetricExportBatchSize = newFeature(
	"METRIC_EXPORT_BATCH_SIZE",
	// Parse the raw environment value; only a strictly positive integer
	// enables the feature, anything else leaves it disabled.
	func(raw string) (int, bool) {
		n, err := strconv.Atoi(raw)
		if err != nil || n <= 0 {
			return 0, false
		}
		return n, true
	},
)
56 changes: 56 additions & 0 deletions sdk/metric/internal/x/x_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,59 @@ func assertDisabled[T any](f Feature[T]) func(*testing.T) {
assert.Equal(t, zero, v, "Lookup value")
}
}

// TestMetricExportBatchSize verifies that the batch-size feature flag is only
// enabled for strictly positive integer values of the environment variable,
// and that Lookup returns the parsed size (or the zero value when disabled).
func TestMetricExportBatchSize(t *testing.T) {
	testCases := []struct {
		name    string
		value   string
		enabled bool
		want    int
	}{
		{name: "empty", value: "", enabled: false, want: 0},
		{name: "invalid", value: "invalid", enabled: false, want: 0},
		{name: "zero", value: "0", enabled: false, want: 0},
		{name: "negative", value: "-10", enabled: false, want: 0},
		{name: "valid small", value: "10", enabled: true, want: 10},
		{name: "valid large", value: "200", enabled: true, want: 200},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// t.Setenv restores the prior value when the subtest ends.
			t.Setenv(MetricExportBatchSize.Key(), tc.value)

			assert.Equal(t, tc.enabled, MetricExportBatchSize.Enabled())

			v, ok := MetricExportBatchSize.Lookup()
			assert.Equal(t, tc.enabled, ok)
			assert.Equal(t, tc.want, v)
		})
	}
}
46 changes: 41 additions & 5 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/internal/observ"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
semconv "go.opentelemetry.io/otel/semconv/v1.40.0"
)
Expand Down Expand Up @@ -126,6 +127,9 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
},
},
}
if val, ok := x.MetricExportBatchSize.Lookup(); ok {
r.batcher = batcher{size: val}
}
r.externalProducers.Store(conf.producers)

go func() {
Expand Down Expand Up @@ -164,6 +168,7 @@ type PeriodicReader struct {

interval time.Duration
timeout time.Duration
batcher batcher
exporter Exporter
flushCh chan chan error

Expand Down Expand Up @@ -235,14 +240,23 @@ func (r *PeriodicReader) cardinalityLimit(kind InstrumentKind) (int, bool) {
// collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter.
func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
originalCtx := ctx
ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect and export timeout"))
defer cancel()

// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
err := r.Collect(ctx, rm)
if err == nil {
err = r.export(ctx, rm)
if r.batcher.size > 0 {
batches := r.batcher.splitResourceMetrics(rm)
for _, batch := range batches {
// The export timeout is applied individually to each batch by using
// the original context.
err = errors.Join(err, r.exportWithTimeout(originalCtx, batch))
}
} else {
err = r.exporter.Export(ctx, rm)
}
}
r.rmPool.Put(rm)
return err
Expand Down Expand Up @@ -307,7 +321,10 @@ func (r *PeriodicReader) collect(ctx context.Context, p any, rm *metricdata.Reso
}

// export exports metric data m using r's exporter.
func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
func (r *PeriodicReader) exportWithTimeout(ctx context.Context, m *metricdata.ResourceMetrics) error {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader export timeout"))
defer cancel()
return r.exporter.Export(ctx, m)
}

Expand Down Expand Up @@ -349,7 +366,9 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error {
err := ErrReaderShutdown
r.shutdownOnce.Do(func() {
// Prioritize the ctx timeout if it is set.
if _, ok := ctx.Deadline(); !ok {
originalCtx := ctx
_, userProvidedContext := ctx.Deadline()
if !userProvidedContext {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader shutdown timeout"))
defer cancel()
Expand All @@ -369,7 +388,24 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error {
m := r.rmPool.Get().(*metricdata.ResourceMetrics)
err = r.collect(ctx, ph, m)
if err == nil {
err = r.export(ctx, m)
if r.batcher.size > 0 {
batches := r.batcher.splitResourceMetrics(m)
for _, batch := range batches {
if userProvidedContext {
// Do not apply the export timeout if the user passed a timeout to
// Shutdown().
err = errors.Join(err, r.exporter.Export(ctx, batch))
} else {
// The export timeout is applied individually to each batch by using
// the original context.
err = errors.Join(err, r.exportWithTimeout(originalCtx, batch))
}
}
} else {
// Do not apply the export timeout if the user passed a timeout to
// Shutdown().
err = r.exporter.Export(ctx, m)
Comment thread
dashpole marked this conversation as resolved.
}
}
r.rmPool.Put(m)
}
Expand Down
Loading
Loading