Skip to content

Commit

Permalink
Move global RNG to randRes field
Browse files Browse the repository at this point in the history
Instead of using a global random number generator for all `randRes`,
have each value use its own. This removes the need for locking and
managing concurrent safe access to the global. Also, the field, given
the `Reservoir` type is not concurrent safe and the metric pipeline
guards this, does not need a `sync.Mutex` to guard it.
  • Loading branch information
MrAlias committed Sep 13, 2024
1 parent e9ac0d2 commit 8794be7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 58 deletions.
81 changes: 40 additions & 41 deletions sdk/metric/internal/exemplar/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,50 @@ import (
"context"
"math"
"math/rand"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
)

var (
// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
// are k or less measurements made, the Reservoir will sample each one. If
// there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
func FixedSize(k int) Reservoir {
return newRandRes(newStorage(k))
}

type randRes struct {
*storage

// count is the number of measurement seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64

// rng is used to make sampling decisions.
//
// Do not use crypto/rand. There is no reason for the decrease in performance
// given this is not a security sensitive decision.
rng = rand.New(rand.NewSource(time.Now().UnixNano()))
// Ensure concurrent safe access to rng and its underlying source.
rngMu sync.Mutex
)
rng *rand.Rand
}

// random returns, as a float64, a uniform pseudo-random number in the open
// interval (0.0,1.0).
func random() float64 {
func newRandRes(s *storage) *randRes {
r := &randRes{
storage: s,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
}
r.reset()
return r
}

// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (r *randRes) randomFloat64() float64 {
// TODO: This does not return a uniform number. rng.Float64 returns a
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
// returns multiples of 2^-53, and not all floating point numbers between 0
Expand All @@ -43,39 +68,13 @@ func random() float64 {
//
// There are likely many other methods to explore here as well.

rngMu.Lock()
defer rngMu.Unlock()

f := rng.Float64()
f := r.rng.Float64()
for f == 0 {
f = rng.Float64()
f = r.rng.Float64()
}
return f
}

// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
// are k or less measurements made, the Reservoir will sample each one. If
// there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
func FixedSize(k int) Reservoir {
r := &randRes{storage: newStorage(k)}
r.reset()
return r
}

type randRes struct {
*storage

// count is the number of measurement seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64
}

func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
Expand Down Expand Up @@ -123,7 +122,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute
} else {
if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
idx := int(rng.Int63n(int64(cap(r.store))))
idx := int(r.rng.Int63n(int64(cap(r.store))))
r.store[idx] = newMeasurement(ctx, t, n, a)
r.advance()
}
Expand All @@ -147,7 +146,7 @@ func (r *randRes) reset() {
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
r.w = math.Exp(math.Log(random()) / float64(cap(r.store)))
r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))

r.advance()
}
Expand All @@ -167,7 +166,7 @@ func (r *randRes) advance() {
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
r.w *= math.Exp(math.Log(random()) / float64(cap(r.store)))
r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
Expand All @@ -178,7 +177,7 @@ func (r *randRes) advance() {
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
}

func (r *randRes) Collect(dest *[]Exemplar) {
Expand Down
22 changes: 5 additions & 17 deletions sdk/metric/internal/exemplar/rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package exemplar
import (
"context"
"math"
"math/rand"
"slices"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -27,10 +28,12 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
intensity := 0.1
sampleSize := 1000

rng := rand.New(rand.NewSource(time.Now().UnixNano()))

data := make([]float64, sampleSize*1000)
for i := range data {
// Generate exponentially distributed data.
data[i] = (-1.0 / intensity) * math.Log(random())
data[i] = (-1.0 / intensity) * math.Log(rng.Float64())
}
// Sort to test position bias.
slices.Sort(data)
Expand All @@ -50,18 +53,3 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
// ensuring no bias in our random sampling algorithm.
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
}

func TestRandomConcurrentSafe(t *testing.T) {
const goRoutines = 10

var wg sync.WaitGroup
for n := 0; n < goRoutines; n++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = random()
}()
}

wg.Wait()
}

0 comments on commit 8794be7

Please sign in to comment.