From ce9849fe851b4703cd80f0d86ebf26e07e65f12b Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 15 Dec 2025 18:55:17 +0000 Subject: [PATCH] allow reservoirs to defer recording the timestamp until storage --- sdk/metric/exemplar/fixed_size_reservoir.go | 1 + sdk/metric/exemplar/histogram_reservoir.go | 1 + sdk/metric/exemplar/storage.go | 3 +++ sdk/metric/internal/aggregate/filtered_reservoir.go | 11 ++++++++++- sdk/metric/internal/reservoir/deferred_timestamp.go | 10 ++++++++++ 5 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 sdk/metric/internal/reservoir/deferred_timestamp.go diff --git a/sdk/metric/exemplar/fixed_size_reservoir.go b/sdk/metric/exemplar/fixed_size_reservoir.go index 4dba8bdc64a..aa9f63dafdd 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir.go +++ b/sdk/metric/exemplar/fixed_size_reservoir.go @@ -37,6 +37,7 @@ var _ Reservoir = &FixedSizeReservoir{} // additional measurement with a decreasing probability. type FixedSizeReservoir struct { reservoir.ConcurrentSafe + reservoir.DeferTimestamp *storage mu sync.Mutex diff --git a/sdk/metric/exemplar/histogram_reservoir.go b/sdk/metric/exemplar/histogram_reservoir.go index dacac3ebaec..3fdd40622e0 100644 --- a/sdk/metric/exemplar/histogram_reservoir.go +++ b/sdk/metric/exemplar/histogram_reservoir.go @@ -41,6 +41,7 @@ var _ Reservoir = &HistogramReservoir{} // define by bounds. type HistogramReservoir struct { reservoir.ConcurrentSafe + reservoir.DeferTimestamp *storage // bounds are bucket bounds in ascending order. diff --git a/sdk/metric/exemplar/storage.go b/sdk/metric/exemplar/storage.go index 790496027da..c22dfcba739 100644 --- a/sdk/metric/exemplar/storage.go +++ b/sdk/metric/exemplar/storage.go @@ -26,6 +26,9 @@ func newStorage(n int) *storage { } func (r *storage) store(ctx context.Context, idx int, ts time.Time, v Value, droppedAttr []attribute.KeyValue) { + if ts.IsZero() { + ts = time.Now() + } r.measurements[idx].mux.Lock() defer r.measurements[idx].mux.Unlock() r.measurements[idx].FilteredAttributes = droppedAttr diff --git a/sdk/metric/internal/aggregate/filtered_reservoir.go b/sdk/metric/internal/aggregate/filtered_reservoir.go index e4f9409bc80..814584f1aa4 100644 --- a/sdk/metric/internal/aggregate/filtered_reservoir.go +++ b/sdk/metric/internal/aggregate/filtered_reservoir.go @@ -36,6 +36,10 @@ type filteredExemplarReservoir[N int64 | float64] struct { // reservoir.ConcurrentSafe in order to improve performance. reservoirMux sync.Mutex concurrentSafe bool + // reservoirs can indicate that they would like to record the exemplars' + // timestamp by embedding reservoir.DeferTimestamp. The + // FilteredExemplarReservoir will pass a zero timestamp in this case. + deferTimestamp bool } // NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values @@ -45,17 +49,22 @@ func NewFilteredExemplarReservoir[N int64 | float64]( r exemplar.Reservoir, ) FilteredExemplarReservoir[N] { _, concurrentSafe := r.(reservoir.ConcurrentSafe) + _, deferTimestamp := r.(reservoir.DeferTimestamp) return &filteredExemplarReservoir[N]{ filter: f, reservoir: r, concurrentSafe: concurrentSafe, + deferTimestamp: deferTimestamp, } } func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) { if f.filter(ctx) { // only record the current time if we are sampling this measurement. - ts := time.Now() + var ts time.Time + if !f.deferTimestamp { + ts = time.Now() + } if !f.concurrentSafe { f.reservoirMux.Lock() defer f.reservoirMux.Unlock() diff --git a/sdk/metric/internal/reservoir/deferred_timestamp.go b/sdk/metric/internal/reservoir/deferred_timestamp.go new file mode 100644 index 00000000000..56080f6e118 --- /dev/null +++ b/sdk/metric/internal/reservoir/deferred_timestamp.go @@ -0,0 +1,10 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reservoir // import "go.opentelemetry.io/otel/sdk/metric/internal/reservoir" + +// DeferTimestamp is an interface that can be embedded in an +// exemplar.Reservoir to indicate to the SDK that it would like to take control +// over measuring the current timestamp for performance reasons. The SDK will +// provide a zero timestamp to reservoirs that embed this interface. +type DeferTimestamp interface{ deferTimestamp() }