Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This prevents scrape failures when the Prometheus exporter is misconfigured to get data from the Prometheus bridge. (#7688)
- Improve performance of concurrent histogram measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7474)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/stdout/stdoutmetric`. (#7492)
- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 4x. (#7443)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
60 changes: 36 additions & 24 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,117 +81,127 @@ func benchSyncViews(sc trace.SpanContext, views ...View) func(*testing.B) {
expRdr := NewManualReader(WithAggregationSelector(exponentialAggregationSelector))
expProvider := NewMeterProvider(WithReader(expRdr), WithView(views...))
expMeter := expProvider.Meter("benchSyncViews")
// Precompute histogram values so they are distributed equally to buckets.
histogramBuckets := DefaultAggregationSelector(InstrumentKindHistogram).(AggregationExplicitBucketHistogram).Boundaries
histogramObservations := make([]float64, len(histogramBuckets))
for i, bucket := range histogramBuckets {
histogramObservations[i] = bucket + 1
}
return func(b *testing.B) {
ctx := trace.ContextWithSpanContext(b.Context(), sc)
iCtr, err := meter.Int64Counter("int64-counter")
assert.NoError(b, err)
b.Run("Int64Counter", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(int) {
o := []metric.AddOption{metric.WithAttributeSet(s)}
return func() { iCtr.Add(ctx, 1, o...) }
return func(int) { iCtr.Add(ctx, 1, o...) }
}
}()))

fCtr, err := meter.Float64Counter("float64-counter")
assert.NoError(b, err)
b.Run("Float64Counter", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(int) {
o := []metric.AddOption{metric.WithAttributeSet(s)}
return func() { fCtr.Add(ctx, 1, o...) }
return func(int) { fCtr.Add(ctx, 1, o...) }
}
}()))

iUDCtr, err := meter.Int64UpDownCounter("int64-up-down-counter")
assert.NoError(b, err)
b.Run("Int64UpDownCounter", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(int) {
o := []metric.AddOption{metric.WithAttributeSet(s)}
return func() { iUDCtr.Add(ctx, 1, o...) }
return func(int) { iUDCtr.Add(ctx, 1, o...) }
}
}()))

fUDCtr, err := meter.Float64UpDownCounter("float64-up-down-counter")
assert.NoError(b, err)
b.Run("Float64UpDownCounter", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(int) {
o := []metric.AddOption{metric.WithAttributeSet(s)}
return func() { fUDCtr.Add(ctx, 1, o...) }
return func(int) { fUDCtr.Add(ctx, 1, o...) }
}
}()))

iGauge, err := meter.Int64Gauge("int64-gauge")
assert.NoError(b, err)
b.Run("Int64Gauge", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(int) {
o := []metric.RecordOption{metric.WithAttributeSet(s)}
return func() { iGauge.Record(ctx, 1, o...) }
return func(int) { iGauge.Record(ctx, 1, o...) }
}
}()))

fGauge, err := meter.Float64Gauge("float64-gauge")
assert.NoError(b, err)
b.Run("Float64Gauge", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(int) {
o := []metric.RecordOption{metric.WithAttributeSet(s)}
return func() { fGauge.Record(ctx, 1, o...) }
return func(int) { fGauge.Record(ctx, 1, o...) }
}
}()))

iHist, err := meter.Int64Histogram("int64-histogram")
assert.NoError(b, err)
b.Run("Int64Histogram", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(int) {
o := []metric.RecordOption{metric.WithAttributeSet(s)}
return func() { iHist.Record(ctx, 1, o...) }
return func(i int) { iHist.Record(ctx, int64(histogramObservations[i%len(histogramObservations)]), o...) }
}
}()))

fHist, err := meter.Float64Histogram("float64-histogram")
assert.NoError(b, err)
b.Run("Float64Histogram", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(i int) {
o := []metric.RecordOption{metric.WithAttributeSet(s)}
return func() { fHist.Record(ctx, 1, o...) }
return func(i int) { fHist.Record(ctx, histogramObservations[i%len(histogramObservations)], o...) }
}
}()))

expIHist, err := expMeter.Int64Histogram("exponential-int64-histogram")
assert.NoError(b, err)
b.Run("ExponentialInt64Histogram", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(int) {
o := []metric.RecordOption{metric.WithAttributeSet(s)}
return func() { expIHist.Record(ctx, 1, o...) }
return func(int) { expIHist.Record(ctx, 1, o...) }
}
}()))

expFHist, err := expMeter.Float64Histogram("exponential-float64-histogram")
assert.NoError(b, err)
b.Run("ExponentialFloat64Histogram", benchMeasAttrs(func() measF {
return func(s attribute.Set) func() {
return func(s attribute.Set) func(int) {
o := []metric.RecordOption{metric.WithAttributeSet(s)}
return func() { expFHist.Record(ctx, 1, o...) }
return func(int) { expFHist.Record(ctx, 1, o...) }
}
}()))
}
}

type measF func(s attribute.Set) func()
type measF func(s attribute.Set) func(i int)

func benchMeasAttrs(meas measF) func(*testing.B) {
return func(b *testing.B) {
b.Run("Attributes/0", func(b *testing.B) {
f := meas(*attribute.EmptySet())
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
f()
f(i)
i++
}
})
})
b.Run("Attributes/1", func(b *testing.B) {
f := meas(attribute.NewSet(attribute.Bool("K", true)))
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
f()
f(i)
i++
}
})
})
Expand All @@ -204,8 +214,10 @@ func benchMeasAttrs(meas measF) func(*testing.B) {
}
f := meas(attribute.NewSet(attrs...))
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
f()
f(i)
i++
}
})
})
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
r.mu.Lock()
defer r.mu.Unlock()
if int(r.count) < cap(r.measurements) {
Comment thread
dashpole marked this conversation as resolved.
r.store(int(r.count), newMeasurement(ctx, t, n, a))
r.store(ctx, int(r.count), t, n, a)
} else if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
idx := int(rand.Int64N(int64(cap(r.measurements))))
r.store(idx, newMeasurement(ctx, t, n, a))
r.store(ctx, idx, t, n, a)
r.advance()
}
r.count++
Expand Down
13 changes: 11 additions & 2 deletions sdk/metric/exemplar/fixed_size_reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,21 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
}

var sum float64
for _, m := range r.measurements {
sum += m.Value.Float64()
for i := range r.measurements {
sum += r.measurements[i].Value.Float64()
}
mean := sum / float64(sampleSize)

// Check the intensity/rate of the sampled distribution is preserved
// ensuring no bias in our random sampling algorithm.
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
}

func TestFixedSizeReservoirConcurrentSafe(t *testing.T) {
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))
}
9 changes: 1 addition & 8 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"slices"
"sort"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -43,7 +42,6 @@ var _ Reservoir = &HistogramReservoir{}
type HistogramReservoir struct {
reservoir.ConcurrentSafe
*storage
mu sync.Mutex

// bounds are bucket bounds in ascending order.
bounds []float64
Expand Down Expand Up @@ -72,18 +70,13 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
}

idx := sort.SearchFloat64s(r.bounds, n)
m := newMeasurement(ctx, t, v, a)

r.mu.Lock()
defer r.mu.Unlock()
r.store(idx, m)
r.store(ctx, idx, t, v, a)
}

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
r.storage.Collect(dest)
}
10 changes: 10 additions & 0 deletions sdk/metric/exemplar/histogram_reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,13 @@ func TestHist(t *testing.T) {
return HistogramReservoirProvider(bounds), len(bounds)
}))
}

func TestHistogramReservoirConcurrentSafe(t *testing.T) {
bounds := []float64{0, 100}
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))
}
Comment thread
MrAlias marked this conversation as resolved.
84 changes: 84 additions & 0 deletions sdk/metric/exemplar/reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package exemplar

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -144,3 +146,85 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
})
}
}

func reservoirConcurrentSafeTest[N int64 | float64](f factory) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
rp, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

var wg sync.WaitGroup

const goroutines = 2

// Call Offer concurrently with another Offer, and with Collect.
for i := range goroutines {
wg.Add(1)
go func(iteration int) {
ctx, ts, val, attrs := generateOfferInputs[N](iteration + 1)
r.Offer(ctx, ts, val, attrs)
wg.Done()
}(i)
}

// Also test concurrent Collect calls
wg.Add(1)
go func() {
var dest []Exemplar
r.Collect(&dest)
wg.Done()
}()

wg.Wait()

// Final collect to validate state
var dest []Exemplar
r.Collect(&dest)
assert.NotEmpty(t, dest)
for _, e := range dest {
validateExemplar[N](t, e)
}
}
}

func generateOfferInputs[N int64 | float64](
i int,
) (context.Context, time.Time, Value, []attribute.KeyValue) {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID([16]byte{byte(i)}),
SpanID: trace.SpanID([8]byte{byte(i)}),
TraceFlags: trace.FlagsSampled,
})
ctx := trace.ContextWithSpanContext(context.Background(), sc)
ts := time.Unix(int64(i), int64(i))
val := NewValue(N(i))
attrs := []attribute.KeyValue{attribute.Int("i", i)}
return ctx, ts, val, attrs
}

func validateExemplar[N int64 | float64](t *testing.T, e Exemplar) {
Comment thread
dashpole marked this conversation as resolved.
t.Helper()
i := 0
switch e.Value.Type() {
case Int64ValueType:
i = int(e.Value.Int64())
case Float64ValueType:
i = int(e.Value.Float64())
Comment thread
dashpole marked this conversation as resolved.
default:
t.Fatalf("unexpected value type: %v", e.Value.Type())
}
if i == 0 {
t.Fatal("empty exemplar")
}
Comment thread
dashpole marked this conversation as resolved.
ctx, ts, _, attrs := generateOfferInputs[N](i)
sc := trace.SpanContextFromContext(ctx)
tID := sc.TraceID()
sID := sc.SpanID()
assert.Equal(t, tID[:], e.TraceID)
assert.Equal(t, sID[:], e.SpanID)
assert.Equal(t, ts, e.Time)
assert.Equal(t, attrs, e.FilteredAttributes)
}
Loading