Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var _ Reservoir = &FixedSizeReservoir{}
// additional measurement with a decreasing probability.
type FixedSizeReservoir struct {
reservoir.ConcurrentSafe
reservoir.DeferTimestamp
*storage
mu sync.Mutex

Expand Down
1 change: 1 addition & 0 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var _ Reservoir = &HistogramReservoir{}
// define by bounds.
type HistogramReservoir struct {
reservoir.ConcurrentSafe
reservoir.DeferTimestamp
*storage

// bounds are bucket bounds in ascending order.
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/exemplar/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func newStorage(n int) *storage {
}

func (r *storage) store(ctx context.Context, idx int, ts time.Time, v Value, droppedAttr []attribute.KeyValue) {
if ts.IsZero() {
ts = time.Now()
}
r.measurements[idx].mux.Lock()
defer r.measurements[idx].mux.Unlock()
r.measurements[idx].FilteredAttributes = droppedAttr
Expand Down
11 changes: 10 additions & 1 deletion sdk/metric/internal/aggregate/filtered_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type filteredExemplarReservoir[N int64 | float64] struct {
// reservoir.ConcurrentSafe in order to improve performance.
reservoirMux sync.Mutex
concurrentSafe bool
// reservoirs can indicate that they would like to record the exemplars'
// timestamp by embedding reservoir.DeferTimestamp. The
// FilteredExemplarReservoir will pass a zero timestamp in this case.
deferTimestamp bool
}

// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
Expand All @@ -45,17 +49,22 @@ func NewFilteredExemplarReservoir[N int64 | float64](
r exemplar.Reservoir,
) FilteredExemplarReservoir[N] {
_, concurrentSafe := r.(reservoir.ConcurrentSafe)
_, deferTimestamp := r.(reservoir.DeferTimestamp)
return &filteredExemplarReservoir[N]{
filter: f,
reservoir: r,
concurrentSafe: concurrentSafe,
deferTimestamp: deferTimestamp,
}
}

func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurement.
ts := time.Now()
var ts time.Time
if !f.deferTimestamp {
ts = time.Now()
}
if !f.concurrentSafe {
f.reservoirMux.Lock()
defer f.reservoirMux.Unlock()
Expand Down
10 changes: 10 additions & 0 deletions sdk/metric/internal/reservoir/deferred_timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package reservoir // import "go.opentelemetry.io/otel/sdk/metric/internal/reservoir"

// DeferTimestamp is an interface that can be embedded in an
// exemplar.Reservoir to indicate to the SDK that it would like to take control
// over measuring the current timestamp for performance reasons. The SDK will
// provide a zero timestamp to reservoirs that embed this interface.
type DeferTimestamp interface{ deferTimestamp() }
Loading