Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move global random number generator to randRes field #5819

Merged
merged 5 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `Logger.Enabled` in `go.opentelemetry.io/otel/log` now accepts a newly introduced `EnabledParameters` type instead of `Record`. (#5791)
- `FilterProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log/internal/x` now accepts `EnabledParameters` instead of `Record`. (#5791)

### Fixed

- The race condition for multiple `FixedSize` exemplar reservoirs identified in #5814 is resolved. (#5819)

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
63 changes: 63 additions & 0 deletions sdk/metric/exemplar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"runtime"
"sync"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// TestFixedSizeExemplarConcurrentSafe records measurements on two instruments
// from many goroutines while the reader collects, and fails if any of those
// operations panic.
//
// Tests https://github.com/open-telemetry/opentelemetry-go/issues/5814
func TestFixedSizeExemplarConcurrentSafe(t *testing.T) {
	t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_on")

	r := NewManualReader()
	m := NewMeterProvider(WithReader(r)).Meter("exemplar-concurrency")
	// Use two instruments to get concurrent access to any shared globals.
	i0, err := m.Int64Counter("counter.0")
	require.NoError(t, err)
	i1, err := m.Int64Counter("counter.1")
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())

	add := func() {
		i0.Add(ctx, 1)
		i1.Add(ctx, 2)
	}

	goRoutines := max(10, runtime.NumCPU())

	var wg sync.WaitGroup
	// Stop and wait for the workers on every exit path, including a FailNow
	// raised by the collection loop below. The cancel must happen before the
	// wait, otherwise the workers never observe ctx.Done and Wait blocks.
	defer func() {
		cancel()
		wg.Wait()
	}()

	for n := 0; n < goRoutines; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// t.FailNow (used by the require package) must only be called
			// from the goroutine running the test, so a panic in a worker is
			// reported with t.Errorf, which is safe to call from any
			// goroutine while the test is still running.
			defer func() {
				if p := recover(); p != nil {
					t.Errorf("recording a measurement panicked: %v", p)
				}
			}()
			for {
				select {
				case <-ctx.Done():
					return
				default:
					add()
				}
			}
		}()
	}

	const collections = 100
	var rm metricdata.ResourceMetrics
	for c := 0; c < collections; c++ {
		// Only panics are of interest here; the error returned by Collect is
		// deliberately ignored.
		require.NotPanics(t, func() { _ = r.Collect(ctx, &rm) })
	}
}
81 changes: 40 additions & 41 deletions sdk/metric/internal/exemplar/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,50 @@ import (
"context"
"math"
"math/rand"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
)

var (
// FixedSize returns a [Reservoir] that samples at most k exemplars. While k
// or fewer measurements have been made, the Reservoir samples every one of
// them. Once more than k have been made, each additional measurement is
// sampled at random with a decreasing probability.
func FixedSize(k int) Reservoir {
	store := newStorage(k)
	return newRandRes(store)
}

type randRes struct {
*storage

// count is the number of measurements seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next value of next.
w float64

// rng is used to make sampling decisions.
//
// Do not use crypto/rand. There is no reason for the decrease in performance
// given this is not a security sensitive decision.
rng = rand.New(rand.NewSource(time.Now().UnixNano()))
// Ensure concurrent safe access to rng and its underlying source.
rngMu sync.Mutex
)
rng *rand.Rand
}

// random returns, as a float64, a uniform pseudo-random number in the open
// interval (0.0,1.0).
func random() float64 {
// newRandRes returns a randRes that stores its measurements in s. The
// returned value owns a random number generator seeded from the current time
// and has been reset before it is handed back.
func newRandRes(s *storage) *randRes {
	seed := time.Now().UnixNano()
	res := &randRes{
		storage: s,
		rng:     rand.New(rand.NewSource(seed)),
	}
	res.reset()
	return res
}

// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (r *randRes) randomFloat64() float64 {
// TODO: This does not return a uniform number. rng.Float64 returns a
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
// returns multiples of 2^-53, and not all floating point numbers between 0
Expand All @@ -43,39 +68,13 @@ func random() float64 {
//
// There are likely many other methods to explore here as well.

rngMu.Lock()
defer rngMu.Unlock()

f := rng.Float64()
f := r.rng.Float64()
for f == 0 {
f = rng.Float64()
f = r.rng.Float64()
}
return f
}

// FixedSize returns a [Reservoir] that samples at most k exemplars. While k
// or fewer measurements have been made, the Reservoir samples every one of
// them. Once more than k have been made, each additional measurement is
// sampled at random with a decreasing probability.
func FixedSize(k int) Reservoir {
	res := new(randRes)
	res.storage = newStorage(k)
	// Prime the sampling state before the reservoir is handed out.
	res.reset()
	return res
}

// randRes is the reservoir type constructed by FixedSize. It embeds a storage
// and keeps the bookkeeping used to decide which offered measurements are
// stored.
type randRes struct {
// storage is embedded; its store is what Offer writes measurements into.
*storage

// count is the number of measurements seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next value of next.
w float64
}

func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
Expand Down Expand Up @@ -123,7 +122,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute
} else {
if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
idx := int(rng.Int63n(int64(cap(r.store))))
idx := int(r.rng.Int63n(int64(cap(r.store))))
r.store[idx] = newMeasurement(ctx, t, n, a)
r.advance()
}
Expand All @@ -147,7 +146,7 @@ func (r *randRes) reset() {
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
r.w = math.Exp(math.Log(random()) / float64(cap(r.store)))
r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))

r.advance()
}
Expand All @@ -167,7 +166,7 @@ func (r *randRes) advance() {
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
r.w *= math.Exp(math.Log(random()) / float64(cap(r.store)))
r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
Expand All @@ -178,7 +177,7 @@ func (r *randRes) advance() {
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
}

func (r *randRes) Collect(dest *[]Exemplar) {
Expand Down
22 changes: 5 additions & 17 deletions sdk/metric/internal/exemplar/rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package exemplar
import (
"context"
"math"
"math/rand"
"slices"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -27,10 +28,12 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
intensity := 0.1
sampleSize := 1000

rng := rand.New(rand.NewSource(time.Now().UnixNano()))

data := make([]float64, sampleSize*1000)
for i := range data {
// Generate exponentially distributed data.
data[i] = (-1.0 / intensity) * math.Log(random())
data[i] = (-1.0 / intensity) * math.Log(rng.Float64())
}
// Sort to test position bias.
slices.Sort(data)
Expand All @@ -50,18 +53,3 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
// ensuring no bias in our random sampling algorithm.
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
}

// TestRandomConcurrentSafe calls random from several goroutines at once and
// waits for all of them to finish, so that unsynchronized access to shared
// state has a chance to surface (for example under the race detector).
func TestRandomConcurrentSafe(t *testing.T) {
	const goRoutines = 10

	var wg sync.WaitGroup
	// Register every worker up front rather than one at a time in the loop.
	wg.Add(goRoutines)
	for n := 0; n < goRoutines; n++ {
		go func() {
			defer wg.Done()
			_ = random()
		}()
	}
	wg.Wait()
}
Loading