diff --git a/CHANGELOG.md b/CHANGELOG.md index 29d5010a21a..e835c1d9085 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421) - The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types. If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442) +- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 10x. (#7443) +- Improve the concurrent performance of `FixedSizeReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 3x. (#7447) diff --git a/sdk/metric/exemplar/benchmark_test.go b/sdk/metric/exemplar/benchmark_test.go index f00a570f5cf..f1d90c7243d 100644 --- a/sdk/metric/exemplar/benchmark_test.go +++ b/sdk/metric/exemplar/benchmark_test.go @@ -18,9 +18,9 @@ func BenchmarkFixedSizeReservoirOffer(b *testing.B) { i := 0 for pb.Next() { reservoir.Offer(ctx, ts, val, nil) - // Periodically trigger a reset, because the algorithm for fixed-size - // reservoirs records exemplars very infrequently after a large - // number of collect calls. + // Periodically trigger a reset, because the algorithm records + // exemplars very infrequently after a large number of collect + // calls. if i%100 == 99 { reservoir.mu.Lock() reservoir.reset() @@ -44,6 +44,14 @@ func BenchmarkHistogramReservoirOffer(b *testing.B) { i := 0 for pb.Next() { res.Offer(ctx, ts, values[i%len(values)], nil) + // Periodically trigger a reset, because the algorithm records + // exemplars very infrequently after a large number of collect + // calls. + if i%100 == 99 { + for i := range res.trackers { + res.trackers[i].reset() + } + } i++ } }) diff --git a/sdk/metric/exemplar/fixed_size_reservoir.go b/sdk/metric/exemplar/fixed_size_reservoir.go index 6afb3bed3af..40dd7926c66 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir.go +++ b/sdk/metric/exemplar/fixed_size_reservoir.go @@ -7,6 +7,8 @@ import ( "context" "math" "math/rand/v2" + "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" @@ -25,7 +27,13 @@ func FixedSizeReservoirProvider(k int) ReservoirProvider { // sample each one. If there are more than k, the Reservoir will then randomly // sample all additional measurement with a decreasing probability. func NewFixedSizeReservoir(k int) *FixedSizeReservoir { - return newFixedSizeReservoir(newStorage(k)) + if k < 0 { + k = 0 + } + return &FixedSizeReservoir{ + storage: newStorage(k), + nextTracker: newNextTracker(k), + } } var _ Reservoir = &FixedSizeReservoir{} @@ -37,39 +45,7 @@ var _ Reservoir = &FixedSizeReservoir{} type FixedSizeReservoir struct { reservoir.ConcurrentSafe *storage - - // count is the number of measurement seen. - count int64 - // next is the next count that will store a measurement at a random index - // once the reservoir has been filled. - next int64 - // w is the largest random number in a distribution that is used to compute - // the next next. - w float64 -} - -func newFixedSizeReservoir(s *storage) *FixedSizeReservoir { - r := &FixedSizeReservoir{ - storage: s, - } - r.reset() - return r -} - -// randomFloat64 returns, as a float64, a uniform pseudo-random number in the -// open interval (0.0,1.0). -func (*FixedSizeReservoir) randomFloat64() float64 { - // TODO: Use an algorithm that avoids rejection sampling. For example: - // - // const precision = 1 << 53 // 2^53 - // // Generate an integer in [1, 2^53 - 1] - // v := rand.Uint64() % (precision - 1) + 1 - // return float64(v) / float64(precision) - f := rand.Float64() - for f == 0 { - f = rand.Float64() - } - return f + *nextTracker } // Offer accepts the parameters associated with a measurement. The @@ -125,25 +101,58 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a // https://github.com/MrAlias/reservoir-sampling for a performance // comparison of reservoir sampling algorithms. - r.mu.Lock() - defer r.mu.Unlock() - if int(r.count) < cap(r.measurements) { - r.store(int(r.count), newMeasurement(ctx, t, n, a)) - } else if r.count == r.next { + count, next := r.incrementCount() + intCount := int(count) // nolint:gosec // count is at most 32 bits in length + if intCount < r.k { + r.store(intCount, newMeasurement(ctx, t, n, a)) + } else if count == next { // Overwrite a random existing measurement with the one offered. - idx := int(rand.Int64N(int64(cap(r.measurements)))) + idx := rand.IntN(r.k) r.store(idx, newMeasurement(ctx, t, n, a)) + r.wMu.Lock() + defer r.wMu.Unlock() r.advance() } - r.count++ +} + +// Collect returns all the held exemplars. +// +// The Reservoir state is preserved after this call. +func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) { + r.storage.Collect(dest) + // Call reset here even though it will reset r.count and restart the random + // number series. This will persist any old exemplars as long as no new + // measurements are offered, but it will also prioritize those new + // measurements that are made over the older collection cycle ones. + r.reset() +} + +func newNextTracker(k int) *nextTracker { + nt := &nextTracker{k: k} + nt.reset() + return nt +} + +type nextTracker struct { + // count is the number of measurement seen, and is in the lower 32 bits. + // once the reservoir has been filled, and is in the upper 32 bits. + countAndNext atomic.Uint64 + // w is the largest random number in a distribution that is used to compute + // the next next. + w float64 + // wMu ensures w is kept consistent with next during advance and reset. + wMu sync.Mutex + // k is the number of measurements that can be stored in the reservoir. + k int } // reset resets r to the initial state. -func (r *FixedSizeReservoir) reset() { +func (r *nextTracker) reset() { + r.wMu.Lock() + defer r.wMu.Unlock() // This resets the number of exemplars known. - r.count = 0 // Random index inserts should only happen after the storage is full. - r.next = int64(cap(r.measurements)) + r.setCountAndNext(0, uint64(r.k)) // nolint:gosec // we ensure k is 1 or greater. // Initial random number in the series used to generate r.next. // @@ -154,14 +163,30 @@ func (r *FixedSizeReservoir) reset() { // This maps the uniform random number in (0,1) to a geometric distribution // over the same interval. The mean of the distribution is inversely // proportional to the storage capacity. - r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements))) + r.w = math.Exp(math.Log(randomFloat64()) / float64(r.k)) r.advance() } +// returns the count before the increment and next value. +func (r *nextTracker) incrementCount() (uint64, uint64) { + n := r.countAndNext.Add(1) + return n&((1<<32)-1) - 1, n >> 32 +} + +// returns the count before the increment and next value. +func (r *nextTracker) incrementNext(inc uint64) { + r.countAndNext.Add(inc << 32) +} + +// returns the count before the increment and next value. +func (r *nextTracker) setCountAndNext(count, next uint64) { + r.countAndNext.Store(next<<32 + count) +} + // advance updates the count at which the offered measurement will overwrite an // existing exemplar. -func (r *FixedSizeReservoir) advance() { +func (r *nextTracker) advance() { // Calculate the next value in the random number series. // // The current value of r.w is based on the max of a distribution of random @@ -174,7 +199,7 @@ func (r *FixedSizeReservoir) advance() { // therefore the next r.w will be based on the same distribution (i.e. // `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by // computing the next random number `u` and take r.w as `w * u^(1/k)`. - r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements))) + r.w *= math.Exp(math.Log(randomFloat64()) / float64(r.k)) // Use the new random number in the series to calculate the count of the // next measurement that will be stored. // @@ -185,17 +210,21 @@ func (r *FixedSizeReservoir) advance() { // // Important to note, the new r.next will always be at least 1 more than // the last r.next. - r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1 + r.incrementNext(uint64(math.Log(randomFloat64())/math.Log(1-r.w)) + 1) } -// Collect returns all the held exemplars. -// -// The Reservoir state is preserved after this call. -func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) { - r.storage.Collect(dest) - // Call reset here even though it will reset r.count and restart the random - // number series. This will persist any old exemplars as long as no new - // measurements are offered, but it will also prioritize those new - // measurements that are made over the older collection cycle ones. - r.reset() +// randomFloat64 returns, as a float64, a uniform pseudo-random number in the +// open interval (0.0,1.0). +func randomFloat64() float64 { + // TODO: Use an algorithm that avoids rejection sampling. For example: + // + // const precision = 1 << 53 // 2^53 + // // Generate an integer in [1, 2^53 - 1] + // v := rand.Uint64() % (precision - 1) + 1 + // return float64(v) / float64(precision) + f := rand.Float64() + for f == 0 { + f = rand.Float64() + } + return f } diff --git a/sdk/metric/exemplar/fixed_size_reservoir_test.go b/sdk/metric/exemplar/fixed_size_reservoir_test.go index 914a2238d69..90992e54ce6 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir_test.go +++ b/sdk/metric/exemplar/fixed_size_reservoir_test.go @@ -45,8 +45,15 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) { } var sum float64 - for _, m := range r.measurements { - sum += m.Value.Float64() + for _, val := range r.measurements { + loaded := val.Load() + if loaded == nil { + continue + } + m := loaded.(*measurement) + if m != nil { + sum += m.Value.Float64() + } } mean := sum / float64(sampleSize) @@ -54,3 +61,19 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) { // ensuring no bias in our random sampling algorithm. assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ. } + +func TestNextTrackerAtomics(t *testing.T) { + capacity := 10 + nt := newNextTracker(capacity) + nt.setCountAndNext(0, 11) + count, next := nt.incrementCount() + assert.Equal(t, uint64(0), count) + assert.Equal(t, uint64(11), next) + count, secondNext := nt.incrementCount() + assert.Equal(t, uint64(1), count) + assert.Equal(t, next, secondNext) + nt.setCountAndNext(50, 100) + count, next = nt.incrementCount() + assert.Equal(t, uint64(50), count) + assert.Equal(t, uint64(100), next) +} diff --git a/sdk/metric/exemplar/histogram_reservoir.go b/sdk/metric/exemplar/histogram_reservoir.go index 12cf8d36a63..7da3ca908d8 100644 --- a/sdk/metric/exemplar/histogram_reservoir.go +++ b/sdk/metric/exemplar/histogram_reservoir.go @@ -29,8 +29,9 @@ func HistogramReservoirProvider(bounds []float64) ReservoirProvider { // The passed bounds must be sorted before calling this function. func NewHistogramReservoir(bounds []float64) *HistogramReservoir { return &HistogramReservoir{ - bounds: bounds, - storage: newStorage(len(bounds) + 1), + bounds: bounds, + storage: newStorage(len(bounds) + 1), + trackers: make([]nextTracker, len(bounds)+1), } } @@ -43,6 +44,8 @@ type HistogramReservoir struct { reservoir.ConcurrentSafe *storage + trackers []nextTracker + // bounds are bucket bounds in ascending order. bounds []float64 } @@ -68,11 +71,27 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a default: panic("unknown value type") } - idx := sort.SearchFloat64s(r.bounds, n) - m := newMeasurement(ctx, t, v, a) - r.mu.Lock() - defer r.mu.Unlock() - r.store(idx, m) + count, next := r.trackers[idx].incrementCount() + if count == 0 || count == next { + r.store(idx, newMeasurement(ctx, t, v, a)) + r.trackers[idx].wMu.Lock() + defer r.trackers[idx].wMu.Unlock() + r.trackers[idx].advance() + } +} + +// Collect returns all the held exemplars. +// +// The Reservoir state is preserved after this call. +func (r *HistogramReservoir) Collect(dest *[]Exemplar) { + r.storage.Collect(dest) + // Call reset here even though it will reset r.count and restart the random + // number series. This will persist any old exemplars as long as no new + // measurements are offered, but it will also prioritize those new + // measurements that are made over the older collection cycle ones. + for i := range r.trackers { + r.trackers[i].reset() + } } diff --git a/sdk/metric/exemplar/storage.go b/sdk/metric/exemplar/storage.go index 760c3c87119..ad9d2b87d3e 100644 --- a/sdk/metric/exemplar/storage.go +++ b/sdk/metric/exemplar/storage.go @@ -6,6 +6,7 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" import ( "context" "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" @@ -19,15 +20,18 @@ type storage struct { // // This does not use []metricdata.Exemplar because it potentially would // require an allocation for trace and span IDs in the hot path of Offer. - measurements []measurement + measurements []atomic.Value } func newStorage(n int) *storage { - return &storage{measurements: make([]measurement, n)} + return &storage{measurements: make([]atomic.Value, n)} } -func (r *storage) store(idx int, m measurement) { - r.measurements[idx] = m +func (r *storage) store(idx int, m *measurement) { + old := r.measurements[idx].Swap(m) + if old != nil { + mPool.Put(old) + } } // Collect returns all the held exemplars. @@ -38,7 +42,12 @@ func (r *storage) Collect(dest *[]Exemplar) { defer r.mu.Unlock() *dest = reset(*dest, len(r.measurements), len(r.measurements)) var n int - for _, m := range r.measurements { + for _, val := range r.measurements { + loaded := val.Load() + if loaded == nil { + continue + } + m := loaded.(*measurement) if !m.valid { continue } @@ -58,20 +67,26 @@ type measurement struct { // Value is the value of the measurement. Value Value // SpanContext is the SpanContext active when a measurement was made. - SpanContext trace.SpanContext + Ctx context.Context valid bool } +var mPool = sync.Pool{ + New: func() any { + return &measurement{} + }, +} + // newMeasurement returns a new non-empty Measurement. -func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement { - return measurement{ - FilteredAttributes: droppedAttr, - Time: ts, - Value: v, - SpanContext: trace.SpanContextFromContext(ctx), - valid: true, - } +func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) *measurement { + m := mPool.Get().(*measurement) + m.FilteredAttributes = droppedAttr + m.Time = ts + m.Value = v + m.Ctx = ctx + m.valid = true + return m } // exemplar returns m as an [Exemplar]. @@ -80,15 +95,16 @@ func (m measurement) exemplar(dest *Exemplar) { dest.Time = m.Time dest.Value = m.Value - if m.SpanContext.HasTraceID() { - traceID := m.SpanContext.TraceID() + sc := trace.SpanContextFromContext(m.Ctx) + if sc.HasTraceID() { + traceID := sc.TraceID() dest.TraceID = traceID[:] } else { dest.TraceID = dest.TraceID[:0] } - if m.SpanContext.HasSpanID() { - spanID := m.SpanContext.SpanID() + if sc.HasSpanID() { + spanID := sc.SpanID() dest.SpanID = spanID[:] } else { dest.SpanID = dest.SpanID[:0]