Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 4x. (#7443)
- Improve performance of concurrent synchronous gauge measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7478)
- Improve performance of concurrent exponential histogram measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7702)
- Improve the concurrent performance of `FixedSizeReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar`. (#7447)

### Fixed

Expand Down
2 changes: 0 additions & 2 deletions sdk/metric/exemplar/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ func BenchmarkFixedSizeReservoirOffer(b *testing.B) {
// reservoirs records exemplars very infrequently after a large
// number of collect calls.
if i%100 == 99 {
reservoir.mu.Lock()
reservoir.reset()
reservoir.mu.Unlock()
}
i++
}
Expand Down
170 changes: 109 additions & 61 deletions sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"math/rand/v2"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel/attribute"
Expand All @@ -26,7 +27,19 @@ func FixedSizeReservoirProvider(k int) ReservoirProvider {
// sample each one. If there are more than k, the Reservoir will then randomly
// sample all additional measurement with a decreasing probability.
func NewFixedSizeReservoir(k int) *FixedSizeReservoir {
return newFixedSizeReservoir(newStorage(k))
if k < 0 {
k = 0
}
Comment thread
dashpole marked this conversation as resolved.
// Use math.MaxInt32 instead of math.MaxUint32 to prevent overflowing int
// on 32-bit systems.
Comment thread
pellared marked this conversation as resolved.
if k > math.MaxInt32 {
k = math.MaxInt32
}
return &FixedSizeReservoir{
storage: newStorage(k),
// Above we ensure k is positive, and less than MaxInt32.
nextTracker: newNextTracker(uint32(k)), // nolint: gosec
}
}

var _ Reservoir = &FixedSizeReservoir{}
Expand All @@ -38,40 +51,7 @@ var _ Reservoir = &FixedSizeReservoir{}
type FixedSizeReservoir struct {
reservoir.ConcurrentSafe
*storage
mu sync.Mutex

// count is the number of measurement seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64
}

func newFixedSizeReservoir(s *storage) *FixedSizeReservoir {
r := &FixedSizeReservoir{
storage: s,
}
r.reset()
return r
}

// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (*FixedSizeReservoir) randomFloat64() float64 {
// TODO: Use an algorithm that avoids rejection sampling. For example:
//
// const precision = 1 << 53 // 2^53
// // Generate an integer in [1, 2^53 - 1]
// v := rand.Uint64() % (precision - 1) + 1
// return float64(v) / float64(precision)
f := rand.Float64()
for f == 0 {
f = rand.Float64()
}
return f
*nextTracker
}

// Offer accepts the parameters associated with a measurement. The
Expand Down Expand Up @@ -127,25 +107,65 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
// https://github.com/MrAlias/reservoir-sampling for a performance
// comparison of reservoir sampling algorithms.

r.mu.Lock()
defer r.mu.Unlock()
if int(r.count) < cap(r.measurements) {
r.store(ctx, int(r.count), t, n, a)
} else if r.count == r.next {
count, next := r.incrementCount()
if count < r.k {
r.store(ctx, int(count), t, n, a)
} else if count == next {
// Overwrite a random existing measurement with the one offered.
idx := int(rand.Int64N(int64(cap(r.measurements))))
idx := rand.IntN(int(r.k))
r.store(ctx, idx, t, n, a)
r.wMu.Lock()
defer r.wMu.Unlock()
newCount, newNext := r.loadCountAndNext()
if newNext < next || newCount < count {
// This Observe() raced with Collect(), and r.reset() has been
// called since r.incrementCount(). Skip the call to advance in
// this case because our exemplar may have been collected in the
// previous interval.
return
}
r.advance()
}
r.count++
}

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
// measurements are offered, but it will also prioritize those new
// measurements that are made over the older collection cycle ones.
r.reset()
}

func newNextTracker(k uint32) *nextTracker {
nt := &nextTracker{k: k}
nt.reset()
return nt
}

type nextTracker struct {
// countAndNext holds the current counts in the lower 32 bits and the next
// value in the upper 32 bits.
countAndNext atomic.Uint64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64
// wMu ensures w is kept consistent with next during advance and reset.
wMu sync.Mutex
// k is the number of measurements that can be stored in the reservoir.
k uint32
}

// reset resets r to the initial state.
func (r *FixedSizeReservoir) reset() {
func (r *nextTracker) reset() {
r.wMu.Lock()
defer r.wMu.Unlock()
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
r.next = int64(cap(r.measurements))
r.setCountAndNext(0, r.k)

// Initial random number in the series used to generate r.next.
//
Expand All @@ -156,14 +176,40 @@ func (r *FixedSizeReservoir) reset() {
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements)))
r.w = math.Exp(math.Log(randomFloat64()) / float64(r.k))

r.advance()
}

// incrementCount increments the count. It returns the count before the
// increment and the current next value.
func (r *nextTracker) incrementCount() (uint32, uint32) {
n := r.countAndNext.Add(1)
// Both count and next are stored in the upper and lower 32 bits, and thus
// can't overflow.
return uint32(n&((1<<32)-1) - 1), uint32(n >> 32) // nolint: gosec
}

// incrementNext increments the next value.
func (r *nextTracker) incrementNext(inc uint32) {
r.countAndNext.Add(uint64(inc) << 32)
}

// setCountAndNext sets the count and next values.
func (r *nextTracker) setCountAndNext(count, next uint32) {
r.countAndNext.Store(uint64(next)<<32 + uint64(count))
}

func (r *nextTracker) loadCountAndNext() (uint32, uint32) {
n := r.countAndNext.Load()
// Both count and next are stored in the upper and lower 32 bits, and thus
// can't overflow.
return uint32(n&((1<<32)-1) - 1), uint32(n >> 32) // nolint: gosec
}

// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
func (r *FixedSizeReservoir) advance() {
func (r *nextTracker) advance() {
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
Expand All @@ -176,7 +222,7 @@ func (r *FixedSizeReservoir) advance() {
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements)))
r.w *= math.Exp(math.Log(randomFloat64()) / float64(r.k))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
Expand All @@ -187,19 +233,21 @@ func (r *FixedSizeReservoir) advance() {
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
r.incrementNext(uint32(math.Log(randomFloat64())/math.Log(1-r.w)) + 1)
}

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
// measurements are offered, but it will also prioritize those new
// measurements that are made over the older collection cycle ones.
r.reset()
// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func randomFloat64() float64 {
// TODO: Use an algorithm that avoids rejection sampling. For example:
//
// const precision = 1 << 53 // 2^53
// // Generate an integer in [1, 2^53 - 1]
// v := rand.Uint64() % (precision - 1) + 1
// return float64(v) / float64(precision)
f := rand.Float64()
for f == 0 {
f = rand.Float64()
}
return f
}
24 changes: 22 additions & 2 deletions sdk/metric/exemplar/fixed_size_reservoir_test.go
Comment thread
dashpole marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
)

func TestNewFixedSizeReservoir(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
provider := FixedSizeReservoirProvider(n)
return provider, int(provider(attribute.NewSet()).(*FixedSizeReservoir).k)
}))

t.Run("Float64", ReservoirTest[float64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
provider := FixedSizeReservoirProvider(n)
return provider, int(provider(attribute.NewSet()).(*FixedSizeReservoir).k)
}))
}

Expand Down Expand Up @@ -63,3 +67,19 @@ func TestFixedSizeReservoirConcurrentSafe(t *testing.T) {
return FixedSizeReservoirProvider(n), n
}))
}

func TestNextTrackerAtomics(t *testing.T) {
capacity := uint32(10)
nt := newNextTracker(capacity)
nt.setCountAndNext(0, 11)
count, next := nt.incrementCount()
assert.Equal(t, uint32(0), count)
assert.Equal(t, uint32(11), next)
count, secondNext := nt.incrementCount()
assert.Equal(t, uint32(1), count)
assert.Equal(t, next, secondNext)
nt.setCountAndNext(50, 100)
count, next = nt.incrementCount()
assert.Equal(t, uint32(50), count)
assert.Equal(t, uint32(100), next)
}
17 changes: 17 additions & 0 deletions sdk/metric/exemplar/reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
r.Collect(&dest)
assert.Empty(t, dest, "no exemplars should be collected")
})

t.Run("Negative reservoir capacity drops all", func(t *testing.T) {
t.Helper()

rp, n := f(-1)
if n > 0 {
t.Skip("skipping, reservoir capacity greater than 0:", n)
}
assert.Zero(t, n)
r := rp(*attribute.EmptySet())

r.Offer(t.Context(), staticTime, NewValue(N(10)), nil)

dest := []Exemplar{{}} // Should be reset to empty.
r.Collect(&dest)
assert.Empty(t, dest, "no exemplars should be collected")
})
}
}

Expand Down