diff --git a/CHANGELOG.md b/CHANGELOG.md index 51be8440d3d..fbfa5e4f217 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 4x. (#7443) - Improve performance of concurrent synchronous gauge measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7478) - Improve performance of concurrent exponential histogram measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7702) +- Improve the concurrent performance of `FixedSizeReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar`. (#7447) ### Fixed diff --git a/sdk/metric/exemplar/benchmark_test.go b/sdk/metric/exemplar/benchmark_test.go index f00a570f5cf..cb907ec8d28 100644 --- a/sdk/metric/exemplar/benchmark_test.go +++ b/sdk/metric/exemplar/benchmark_test.go @@ -22,9 +22,7 @@ func BenchmarkFixedSizeReservoirOffer(b *testing.B) { // reservoirs records exemplars very infrequently after a large // number of collect calls. if i%100 == 99 { - reservoir.mu.Lock() reservoir.reset() - reservoir.mu.Unlock() } i++ } diff --git a/sdk/metric/exemplar/fixed_size_reservoir.go b/sdk/metric/exemplar/fixed_size_reservoir.go index 4dba8bdc64a..15d71389963 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir.go +++ b/sdk/metric/exemplar/fixed_size_reservoir.go @@ -8,6 +8,7 @@ import ( "math" "math/rand/v2" "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" @@ -26,7 +27,19 @@ func FixedSizeReservoirProvider(k int) ReservoirProvider { // sample each one. If there are more than k, the Reservoir will then randomly // sample all additional measurement with a decreasing probability. func NewFixedSizeReservoir(k int) *FixedSizeReservoir { - return newFixedSizeReservoir(newStorage(k)) + if k < 0 { + k = 0 + } + // Use math.MaxInt32 instead of math.MaxUint32 to prevent overflowing int + // on 32-bit systems. + if k > math.MaxInt32 { + k = math.MaxInt32 + } + return &FixedSizeReservoir{ + storage: newStorage(k), + // Above we ensure k is positive, and less than MaxInt32. + nextTracker: newNextTracker(uint32(k)), // nolint: gosec + } } var _ Reservoir = &FixedSizeReservoir{} @@ -38,40 +51,7 @@ var _ Reservoir = &FixedSizeReservoir{} type FixedSizeReservoir struct { reservoir.ConcurrentSafe *storage - mu sync.Mutex - - // count is the number of measurement seen. - count int64 - // next is the next count that will store a measurement at a random index - // once the reservoir has been filled. - next int64 - // w is the largest random number in a distribution that is used to compute - // the next next. - w float64 -} - -func newFixedSizeReservoir(s *storage) *FixedSizeReservoir { - r := &FixedSizeReservoir{ - storage: s, - } - r.reset() - return r -} - -// randomFloat64 returns, as a float64, a uniform pseudo-random number in the -// open interval (0.0,1.0). -func (*FixedSizeReservoir) randomFloat64() float64 { - // TODO: Use an algorithm that avoids rejection sampling. For example: - // - // const precision = 1 << 53 // 2^53 - // // Generate an integer in [1, 2^53 - 1] - // v := rand.Uint64() % (precision - 1) + 1 - // return float64(v) / float64(precision) - f := rand.Float64() - for f == 0 { - f = rand.Float64() - } - return f + *nextTracker } // Offer accepts the parameters associated with a measurement. The @@ -127,25 +107,65 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a // https://github.com/MrAlias/reservoir-sampling for a performance // comparison of reservoir sampling algorithms. - r.mu.Lock() - defer r.mu.Unlock() - if int(r.count) < cap(r.measurements) { - r.store(ctx, int(r.count), t, n, a) - } else if r.count == r.next { + count, next := r.incrementCount() + if count < r.k { + r.store(ctx, int(count), t, n, a) + } else if count == next { // Overwrite a random existing measurement with the one offered. - idx := int(rand.Int64N(int64(cap(r.measurements)))) + idx := rand.IntN(int(r.k)) r.store(ctx, idx, t, n, a) + r.wMu.Lock() + defer r.wMu.Unlock() + newCount, newNext := r.loadCountAndNext() + if newNext < next || newCount < count { + // This Observe() raced with Collect(), and r.reset() has been + // called since r.incrementCount(). Skip the call to advance in + // this case because our exemplar may have been collected in the + // previous interval. + return + } r.advance() } - r.count++ +} + +// Collect returns all the held exemplars. +// +// The Reservoir state is preserved after this call. +func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) { + r.storage.Collect(dest) + // Call reset here even though it will reset r.count and restart the random + // number series. This will persist any old exemplars as long as no new + // measurements are offered, but it will also prioritize those new + // measurements that are made over the older collection cycle ones. + r.reset() +} + +func newNextTracker(k uint32) *nextTracker { + nt := &nextTracker{k: k} + nt.reset() + return nt +} + +type nextTracker struct { + // countAndNext holds the current counts in the lower 32 bits and the next + // value in the upper 32 bits. + countAndNext atomic.Uint64 + // w is the largest random number in a distribution that is used to compute + // the next next. + w float64 + // wMu ensures w is kept consistent with next during advance and reset. + wMu sync.Mutex + // k is the number of measurements that can be stored in the reservoir. + k uint32 } // reset resets r to the initial state. -func (r *FixedSizeReservoir) reset() { +func (r *nextTracker) reset() { + r.wMu.Lock() + defer r.wMu.Unlock() // This resets the number of exemplars known. - r.count = 0 // Random index inserts should only happen after the storage is full. - r.next = int64(cap(r.measurements)) + r.setCountAndNext(0, r.k) // Initial random number in the series used to generate r.next. // @@ -156,14 +176,40 @@ func (r *FixedSizeReservoir) reset() { // This maps the uniform random number in (0,1) to a geometric distribution // over the same interval. The mean of the distribution is inversely // proportional to the storage capacity. - r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements))) + r.w = math.Exp(math.Log(randomFloat64()) / float64(r.k)) r.advance() } +// incrementCount increments the count. It returns the count before the +// increment and the current next value. +func (r *nextTracker) incrementCount() (uint32, uint32) { + n := r.countAndNext.Add(1) + // Both count and next are stored in the upper and lower 32 bits, and thus + // can't overflow. + return uint32(n&((1<<32)-1) - 1), uint32(n >> 32) // nolint: gosec +} + +// incrementNext increments the next value. +func (r *nextTracker) incrementNext(inc uint32) { + r.countAndNext.Add(uint64(inc) << 32) +} + +// setCountAndNext sets the count and next values. +func (r *nextTracker) setCountAndNext(count, next uint32) { + r.countAndNext.Store(uint64(next)<<32 + uint64(count)) +} + +func (r *nextTracker) loadCountAndNext() (uint32, uint32) { + n := r.countAndNext.Load() + // Both count and next are stored in the upper and lower 32 bits, and thus + // can't overflow. + return uint32(n&((1<<32)-1) - 1), uint32(n >> 32) // nolint: gosec +} + // advance updates the count at which the offered measurement will overwrite an // existing exemplar. -func (r *FixedSizeReservoir) advance() { +func (r *nextTracker) advance() { // Calculate the next value in the random number series. // // The current value of r.w is based on the max of a distribution of random @@ -176,7 +222,7 @@ func (r *FixedSizeReservoir) advance() { // therefore the next r.w will be based on the same distribution (i.e. // `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by // computing the next random number `u` and take r.w as `w * u^(1/k)`. - r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements))) + r.w *= math.Exp(math.Log(randomFloat64()) / float64(r.k)) // Use the new random number in the series to calculate the count of the // next measurement that will be stored. // @@ -187,19 +233,21 @@ func (r *FixedSizeReservoir) advance() { // // Important to note, the new r.next will always be at least 1 more than // the last r.next. - r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1 + r.incrementNext(uint32(math.Log(randomFloat64())/math.Log(1-r.w)) + 1) } -// Collect returns all the held exemplars. -// -// The Reservoir state is preserved after this call. -func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) { - r.mu.Lock() - defer r.mu.Unlock() - r.storage.Collect(dest) - // Call reset here even though it will reset r.count and restart the random - // number series. This will persist any old exemplars as long as no new - // measurements are offered, but it will also prioritize those new - // measurements that are made over the older collection cycle ones. - r.reset() +// randomFloat64 returns, as a float64, a uniform pseudo-random number in the +// open interval (0.0,1.0). +func randomFloat64() float64 { + // TODO: Use an algorithm that avoids rejection sampling. For example: + // + // const precision = 1 << 53 // 2^53 + // // Generate an integer in [1, 2^53 - 1] + // v := rand.Uint64() % (precision - 1) + 1 + // return float64(v) / float64(precision) + f := rand.Float64() + for f == 0 { + f = rand.Float64() + } + return f } diff --git a/sdk/metric/exemplar/fixed_size_reservoir_test.go b/sdk/metric/exemplar/fixed_size_reservoir_test.go index 76b9a5f1be5..1d5fb5c6a77 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir_test.go +++ b/sdk/metric/exemplar/fixed_size_reservoir_test.go @@ -10,15 +10,19 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/attribute" ) func TestNewFixedSizeReservoir(t *testing.T) { t.Run("Int64", ReservoirTest[int64](func(n int) (ReservoirProvider, int) { - return FixedSizeReservoirProvider(n), n + provider := FixedSizeReservoirProvider(n) + return provider, int(provider(attribute.NewSet()).(*FixedSizeReservoir).k) })) t.Run("Float64", ReservoirTest[float64](func(n int) (ReservoirProvider, int) { - return FixedSizeReservoirProvider(n), n + provider := FixedSizeReservoirProvider(n) + return provider, int(provider(attribute.NewSet()).(*FixedSizeReservoir).k) })) } @@ -63,3 +67,19 @@ func TestFixedSizeReservoirConcurrentSafe(t *testing.T) { return FixedSizeReservoirProvider(n), n })) } + +func TestNextTrackerAtomics(t *testing.T) { + capacity := uint32(10) + nt := newNextTracker(capacity) + nt.setCountAndNext(0, 11) + count, next := nt.incrementCount() + assert.Equal(t, uint32(0), count) + assert.Equal(t, uint32(11), next) + count, secondNext := nt.incrementCount() + assert.Equal(t, uint32(1), count) + assert.Equal(t, next, secondNext) + nt.setCountAndNext(50, 100) + count, next = nt.incrementCount() + assert.Equal(t, uint32(50), count) + assert.Equal(t, uint32(100), next) +} diff --git a/sdk/metric/exemplar/reservoir_test.go b/sdk/metric/exemplar/reservoir_test.go index bff5bc997cd..6d1f63c589d 100644 --- a/sdk/metric/exemplar/reservoir_test.go +++ b/sdk/metric/exemplar/reservoir_test.go @@ -144,6 +144,23 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { r.Collect(&dest) assert.Empty(t, dest, "no exemplars should be collected") }) + + t.Run("Negative reservoir capacity drops all", func(t *testing.T) { + t.Helper() + + rp, n := f(-1) + if n > 0 { + t.Skip("skipping, reservoir capacity greater than 0:", n) + } + assert.Zero(t, n) + r := rp(*attribute.EmptySet()) + + r.Offer(t.Context(), staticTime, NewValue(N(10)), nil) + + dest := []Exemplar{{}} // Should be reset to empty. + r.Collect(&dest) + assert.Empty(t, dest, "no exemplars should be collected") + }) } }