From 42fd8fe325c125a40a21796c3f14b599e5e5f586 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 16 Sep 2024 07:31:15 -0700 Subject: [PATCH] Move global random number generator to `randRes` field (#5819) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of using a global random number generator for all `randRes` values, have each value use its own. This removes the need for locking and managing concurrent safe access to the global. Also, because the `Reservoir` type is not concurrent safe and the metric pipeline guards this, the new field does not need a `sync.Mutex` to guard it. Supersedes https://github.com/open-telemetry/opentelemetry-go/pull/5815 Fix #5814 ### Performance Analysis This change has approximately equivalent performance to the existing code based on existing benchmarks. ```terminal goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/metric cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz │ old.txt │ new.txt │ │ sec/op │ sec/op vs base │ Exemplars/Int64Counter/8-8 14.00µ ± 3% 13.44µ ± 4% -3.98% (p=0.001 n=10) │ old.txt │ new.txt │ │ B/op │ B/op vs base │ Exemplars/Int64Counter/8-8 3.791Ki ± 0% 3.791Ki ± 0% ~ (p=1.000 n=10) ¹ ¹ all samples are equal │ old.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ Exemplars/Int64Counter/8-8 84.00 ± 0% 84.00 ± 0% ~ (p=1.000 n=10) ¹ ¹ all samples are equal ``` --- CHANGELOG.md | 4 ++ sdk/metric/exemplar_test.go | 63 ++++++++++++++++++ sdk/metric/internal/exemplar/rand.go | 81 +++++++++++------------ sdk/metric/internal/exemplar/rand_test.go | 22 ++---- 4 files changed, 112 insertions(+), 58 deletions(-) create mode 100644 sdk/metric/exemplar_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8261f72ca3c..fe29d0a6e7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `Logger.Enabled` in `go.opentelemetry.io/otel/log` now accepts a newly introduced `EnabledParameters` type 
instead of `Record`. (#5791) - `FilterProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log/internal/x` now accepts `EnabledParameters` instead of `Record`. (#5791) +### Fixed + +- The race condition for multiple `FixedSize` exemplar reservoirs identified in #5814 is resolved. (#5819) + diff --git a/sdk/metric/exemplar_test.go b/sdk/metric/exemplar_test.go new file mode 100644 index 00000000000..8a0529e9d93 --- /dev/null +++ b/sdk/metric/exemplar_test.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "context" + "runtime" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +func TestFixedSizeExemplarConcurrentSafe(t *testing.T) { + // Tests https://github.com/open-telemetry/opentelemetry-go/issues/5814 + + t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_on") + + r := NewManualReader() + m := NewMeterProvider(WithReader(r)).Meter("exemplar-concurrency") + // Use two instruments to get concurrent access to any shared globals. 
+ i0, err := m.Int64Counter("counter.0") + require.NoError(t, err) + i1, err := m.Int64Counter("counter.1") + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + + add := func() { + i0.Add(ctx, 1) + i1.Add(ctx, 2) + } + + goRoutines := max(10, runtime.NumCPU()) + + var wg sync.WaitGroup + for n := 0; n < goRoutines; n++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + require.NotPanics(t, add) + } + } + }() + } + + const collections = 100 + var rm metricdata.ResourceMetrics + for c := 0; c < collections; c++ { + require.NotPanics(t, func() { _ = r.Collect(ctx, &rm) }) + } + + cancel() + wg.Wait() +} diff --git a/sdk/metric/internal/exemplar/rand.go b/sdk/metric/internal/exemplar/rand.go index 8cbcff2404a..fd10b8ea19c 100644 --- a/sdk/metric/internal/exemplar/rand.go +++ b/sdk/metric/internal/exemplar/rand.go @@ -7,25 +7,50 @@ import ( "context" "math" "math/rand" - "sync" "time" "go.opentelemetry.io/otel/attribute" ) -var ( +// FixedSize returns a [Reservoir] that samples at most k exemplars. If there +// are k or less measurements made, the Reservoir will sample each one. If +// there are more than k, the Reservoir will then randomly sample all +// additional measurement with a decreasing probability. +func FixedSize(k int) Reservoir { + return newRandRes(newStorage(k)) +} + +type randRes struct { + *storage + + // count is the number of measurement seen. + count int64 + // next is the next count that will store a measurement at a random index + // once the reservoir has been filled. + next int64 + // w is the largest random number in a distribution that is used to compute + // the next next. + w float64 + // rng is used to make sampling decisions. // // Do not use crypto/rand. There is no reason for the decrease in performance // given this is not a security sensitive decision. 
- rng = rand.New(rand.NewSource(time.Now().UnixNano())) - // Ensure concurrent safe access to rng and its underlying source. - rngMu sync.Mutex -) + rng *rand.Rand +} -// random returns, as a float64, a uniform pseudo-random number in the open -// interval (0.0,1.0). -func random() float64 { +func newRandRes(s *storage) *randRes { + r := &randRes{ + storage: s, + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + } + r.reset() + return r +} + +// randomFloat64 returns, as a float64, a uniform pseudo-random number in the +// open interval (0.0,1.0). +func (r *randRes) randomFloat64() float64 { // TODO: This does not return a uniform number. rng.Float64 returns a // uniformly random int in [0,2^53) that is divided by 2^53. Meaning it // returns multiples of 2^-53, and not all floating point numbers between 0 @@ -43,39 +68,13 @@ func random() float64 { // // There are likely many other methods to explore here as well. - rngMu.Lock() - defer rngMu.Unlock() - - f := rng.Float64() + f := r.rng.Float64() for f == 0 { - f = rng.Float64() + f = r.rng.Float64() } return f } -// FixedSize returns a [Reservoir] that samples at most k exemplars. If there -// are k or less measurements made, the Reservoir will sample each one. If -// there are more than k, the Reservoir will then randomly sample all -// additional measurement with a decreasing probability. -func FixedSize(k int) Reservoir { - r := &randRes{storage: newStorage(k)} - r.reset() - return r -} - -type randRes struct { - *storage - - // count is the number of measurement seen. - count int64 - // next is the next count that will store a measurement at a random index - // once the reservoir has been filled. - next int64 - // w is the largest random number in a distribution that is used to compute - // the next next. - w float64 -} - func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) { // The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December // 1994). 
"Reservoir-Sampling Algorithms of Time Complexity @@ -123,7 +122,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute } else { if r.count == r.next { // Overwrite a random existing measurement with the one offered. - idx := int(rng.Int63n(int64(cap(r.store)))) + idx := int(r.rng.Int63n(int64(cap(r.store)))) r.store[idx] = newMeasurement(ctx, t, n, a) r.advance() } @@ -147,7 +146,7 @@ func (r *randRes) reset() { // This maps the uniform random number in (0,1) to a geometric distribution // over the same interval. The mean of the distribution is inversely // proportional to the storage capacity. - r.w = math.Exp(math.Log(random()) / float64(cap(r.store))) + r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store))) r.advance() } @@ -167,7 +166,7 @@ func (r *randRes) advance() { // therefore the next r.w will be based on the same distribution (i.e. // `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by // computing the next random number `u` and take r.w as `w * u^(1/k)`. - r.w *= math.Exp(math.Log(random()) / float64(cap(r.store))) + r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store))) // Use the new random number in the series to calculate the count of the // next measurement that will be stored. // @@ -178,7 +177,7 @@ func (r *randRes) advance() { // // Important to note, the new r.next will always be at least 1 more than // the last r.next. 
- r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1 + r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1 } func (r *randRes) Collect(dest *[]Exemplar) { diff --git a/sdk/metric/internal/exemplar/rand_test.go b/sdk/metric/internal/exemplar/rand_test.go index a4c42dcf72c..f9e1a847523 100644 --- a/sdk/metric/internal/exemplar/rand_test.go +++ b/sdk/metric/internal/exemplar/rand_test.go @@ -6,9 +6,10 @@ package exemplar import ( "context" "math" + "math/rand" "slices" - "sync" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -27,10 +28,12 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) { intensity := 0.1 sampleSize := 1000 + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + data := make([]float64, sampleSize*1000) for i := range data { // Generate exponentially distributed data. - data[i] = (-1.0 / intensity) * math.Log(random()) + data[i] = (-1.0 / intensity) * math.Log(rng.Float64()) } // Sort to test position bias. slices.Sort(data) @@ -50,18 +53,3 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) { // ensuring no bias in our random sampling algorithm. assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ. } - -func TestRandomConcurrentSafe(t *testing.T) { - const goRoutines = 10 - - var wg sync.WaitGroup - for n := 0; n < goRoutines; n++ { - wg.Add(1) - go func() { - defer wg.Done() - _ = random() - }() - } - - wg.Wait() -}