From 3a833d196b0dcc5831d882441b10b4c2e66a1b72 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 7 Oct 2025 16:00:49 +0000 Subject: [PATCH 01/15] rename valueMap in sum to sumValueMap --- sdk/metric/internal/aggregate/sum.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 81690855114..66cb68085fd 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -17,12 +17,12 @@ type sumValue[N int64 | float64] struct { attrs attribute.Set } -type valueMap[N int64 | float64] struct { +type sumValueMap[N int64 | float64] struct { values limitedSyncMap newRes func(attribute.Set) FilteredExemplarReservoir[N] } -func (s *valueMap[N]) measure( +func (s *sumValueMap[N]) measure( ctx context.Context, value N, fltrAttr attribute.Set, @@ -52,7 +52,7 @@ func newDeltaSum[N int64 | float64]( return &deltaSum[N]{ monotonic: monotonic, start: now(), - hotColdValMap: [2]valueMap[N]{ + hotColdValMap: [2]sumValueMap[N]{ { values: limitedSyncMap{aggLimit: limit}, newRes: r, @@ -71,7 +71,7 @@ type deltaSum[N int64 | float64] struct { start time.Time hcwg hotColdWaitGroup - hotColdValMap [2]valueMap[N] + hotColdValMap [2]sumValueMap[N] } func (s *deltaSum[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { @@ -130,7 +130,7 @@ func newCumulativeSum[N int64 | float64]( return &cumulativeSum[N]{ monotonic: monotonic, start: now(), - valueMap: valueMap[N]{ + sumValueMap: sumValueMap[N]{ values: limitedSyncMap{aggLimit: limit}, newRes: r, }, @@ -142,7 +142,7 @@ type cumulativeSum[N int64 | float64] struct { monotonic bool start time.Time - valueMap[N] + sumValueMap[N] } func (s *cumulativeSum[N]) collect( From 6e4793679ccac12b11d0c348f04779844b2fbdc9 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 7 Oct 2025 16:56:28 +0000 Subject: [PATCH 02/15] use sync Map for explicit bucket histogram 
aggregation --- sdk/metric/internal/aggregate/aggregate.go | 7 +- sdk/metric/internal/aggregate/histogram.go | 235 ++++++++++-------- .../internal/aggregate/histogram_test.go | 34 ++- 3 files changed, 151 insertions(+), 125 deletions(-) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 2b60410801b..afaefb63f21 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -126,12 +126,13 @@ func (b Builder[N]) ExplicitBucketHistogram( boundaries []float64, noMinMax, noSum bool, ) (Measure[N], ComputeAggregation) { - h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: - return b.filter(h.measure), h.delta + h := newDeltaHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) + return b.filter(h.measure), h.collect default: - return b.filter(h.measure), h.cumulative + h := newCumulativeHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) + return b.filter(h.measure), h.collect } } diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index a094519cf6d..e4b47dba756 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -14,7 +14,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -type buckets[N int64 | float64] struct { +type histogramPoint[N int64 | float64] struct { + sync.Mutex attrs attribute.Set res FilteredExemplarReservoir[N] @@ -24,19 +25,14 @@ type buckets[N int64 | float64] struct { min, max N } -// newBuckets returns buckets with n bins. 
-func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] { - return &buckets[N]{attrs: attrs, counts: make([]uint64, n)} -} - -func (b *buckets[N]) sum(value N) { b.total += value } +func (b *histogramPoint[N]) sum(value N) { b.total += value } -func (b *buckets[N]) bin(idx int) { +func (b *histogramPoint[N]) bin(idx int) { b.counts[idx]++ b.count++ } -func (b *buckets[N]) minMax(value N) { +func (b *histogramPoint[N]) minMax(value N) { if value < b.min { b.min = value } else if value > b.max { @@ -44,114 +40,104 @@ func (b *buckets[N]) minMax(value N) { } } -// histValues summarizes a set of measurements as an histValues with +// histogramValueMap summarizes a set of measurements as an histogramValueMap with // explicitly defined buckets. -type histValues[N int64 | float64] struct { +type histogramValueMap[N int64 | float64] struct { noMinMax bool noSum bool bounds []float64 - newRes func(attribute.Set) FilteredExemplarReservoir[N] - limit limiter[buckets[N]] - values map[attribute.Distinct]*buckets[N] - valuesMu sync.Mutex + newRes func(attribute.Set) FilteredExemplarReservoir[N] + values limitedSyncMap } -func newHistValues[N int64 | float64]( +func newHistogramValueMap[N int64 | float64]( bounds []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], -) *histValues[N] { - // The responsibility of keeping all buckets correctly associated with the +) histogramValueMap[N] { + // The responsibility of keeping all histogramPoint correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have // complete control over the fix. 
b := slices.Clone(bounds) slices.Sort(b) - return &histValues[N]{ + return histogramValueMap[N]{ noMinMax: noMinMax, noSum: noSum, bounds: b, newRes: r, - limit: newLimiter[buckets[N]](limit), - values: make(map[attribute.Distinct]*buckets[N]), + values: limitedSyncMap{aggLimit: limit}, } } -// Aggregate records the measurement value, scoped by attr, and aggregates it -// into a histogram. -func (s *histValues[N]) measure( - ctx context.Context, - value N, - fltrAttr attribute.Set, - droppedAttr []attribute.KeyValue, -) { +func (s *histogramValueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { // This search will return an index in the range [0, len(s.bounds)], where // it will return len(s.bounds) if value is greater than the last element - // of s.bounds. This aligns with the buckets in that the length of buckets + // of s.bounds. This aligns with the histogramPoint in that the length of histogramPoint // is len(s.bounds)+1, with the last bucket representing: // (s.bounds[len(s.bounds)-1], +∞). idx := sort.SearchFloat64s(s.bounds, float64(value)) - - s.valuesMu.Lock() - defer s.valuesMu.Unlock() - - b, ok := s.values[fltrAttr.Equivalent()] - if !ok { - fltrAttr = s.limit.Attributes(fltrAttr, s.values) - // If we overflowed, make sure we add to the existing overflow series - // if it already exists. - b, ok = s.values[fltrAttr.Equivalent()] - if !ok { - // N+1 buckets. For example: - // - // bounds = [0, 5, 10] - // - // Then, - // - // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) - b = newBuckets[N](fltrAttr, len(s.bounds)+1) - b.res = s.newRes(fltrAttr) - - // Ensure min and max are recorded values (not zero), for new buckets. 
- b.min, b.max = value, value - s.values[fltrAttr.Equivalent()] = b + h := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + return &histogramPoint[N]{ + res: s.newRes(attr), + attrs: attr, + min: value, + max: value, + counts: make([]uint64, len(s.bounds)+1), } - } - b.bin(idx) + }).(*histogramPoint[N]) + h.Lock() + defer h.Unlock() + + h.bin(idx) if !s.noMinMax { - b.minMax(value) + h.minMax(value) } if !s.noSum { - b.sum(value) + h.sum(value) } - b.res.Offer(ctx, value, droppedAttr) + h.res.Offer(ctx, value, droppedAttr) +} + +// deltaHistogram TODO +type deltaHistogram[N int64 | float64] struct { + hcwg hotColdWaitGroup + hotColdValMap [2]histogramValueMap[N] + + start time.Time } -// newHistogram returns an Aggregator that summarizes a set of measurements as -// an histogram. -func newHistogram[N int64 | float64]( +// measure TODO +func (s *deltaHistogram[N]) measure( + ctx context.Context, + value N, + fltrAttr attribute.Set, + droppedAttr []attribute.KeyValue, +) { + hotIdx := s.hcwg.start() + defer s.hcwg.done(hotIdx) + s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr) +} + +// newDeltaHistogram TODO +func newDeltaHistogram[N int64 | float64]( boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], -) *histogram[N] { - return &histogram[N]{ - histValues: newHistValues[N](boundaries, noMinMax, noSum, limit, r), - start: now(), +) *deltaHistogram[N] { + return &deltaHistogram[N]{ + start: now(), + hotColdValMap: [2]histogramValueMap[N]{ + newHistogramValueMap(boundaries, noMinMax, noSum, limit, r), + newHistogramValueMap(boundaries, noMinMax, noSum, limit, r), + }, } } -// histogram summarizes a set of measurements as an histogram with explicitly -// defined buckets. 
-type histogram[N int64 | float64] struct { - *histValues[N] - - start time.Time -} - -func (s *histogram[N]) delta( +func (s *deltaHistogram[N]) collect( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { t := now() @@ -161,17 +147,22 @@ func (s *histogram[N]) delta( h, _ := (*dest).(metricdata.Histogram[N]) h.Temporality = metricdata.DeltaTemporality - s.valuesMu.Lock() - defer s.valuesMu.Unlock() + // delta always clears values on collection + readIdx := s.hcwg.swapHotAndWait() // Do not allow modification of our copy of bounds. - bounds := slices.Clone(s.bounds) + bounds := slices.Clone(s.hotColdValMap[readIdx].bounds) - n := len(s.values) + // The len will not change while we iterate over values, since we waited + // for all writes to finish to the cold values and len. + n := s.hotColdValMap[readIdx].values.Len() hDPts := reset(h.DataPoints, n, n) var i int - for _, val := range s.values { + s.hotColdValMap[readIdx].values.Range(func(key, value any) bool { + val := value.(*histogramPoint[N]) + val.Lock() + defer val.Unlock() hDPts[i].Attributes = val.attrs hDPts[i].StartTime = s.start hDPts[i].Time = t @@ -179,11 +170,11 @@ func (s *histogram[N]) delta( hDPts[i].Bounds = bounds hDPts[i].BucketCounts = val.counts - if !s.noSum { + if !s.hotColdValMap[readIdx].noSum { hDPts[i].Sum = val.total } - if !s.noMinMax { + if !s.hotColdValMap[readIdx].noMinMax { hDPts[i].Min = metricdata.NewExtrema(val.min) hDPts[i].Max = metricdata.NewExtrema(val.max) } @@ -191,9 +182,10 @@ func (s *histogram[N]) delta( collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ - } + return true + }) // Unused attribute sets do not report. - clear(s.values) + s.hotColdValMap[readIdx].values.Clear() // The delta collection cycle resets. 
s.start = t @@ -203,7 +195,28 @@ func (s *histogram[N]) delta( return n } -func (s *histogram[N]) cumulative( +// cumulativeHistogram summarizes a set of measurements as an histogram with explicitly +// defined histogramPoint. +type cumulativeHistogram[N int64 | float64] struct { + histogramValueMap[N] + + start time.Time +} + +// newDeltaHistogram TODO +func newCumulativeHistogram[N int64 | float64]( + boundaries []float64, + noMinMax, noSum bool, + limit int, + r func(attribute.Set) FilteredExemplarReservoir[N], +) *cumulativeHistogram[N] { + return &cumulativeHistogram[N]{ + start: now(), + histogramValueMap: newHistogramValueMap(boundaries, noMinMax, noSum, limit, r), + } +} + +func (s *cumulativeHistogram[N]) collect( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { t := now() @@ -213,50 +226,54 @@ func (s *histogram[N]) cumulative( h, _ := (*dest).(metricdata.Histogram[N]) h.Temporality = metricdata.CumulativeTemporality - s.valuesMu.Lock() - defer s.valuesMu.Unlock() - // Do not allow modification of our copy of bounds. bounds := slices.Clone(s.bounds) - n := len(s.values) - hDPts := reset(h.DataPoints, n, n) + // Values are being concurrently written while we iterate, so only use the + // current length for capacity. + hDPts := reset(h.DataPoints, 0, s.values.Len()) var i int - for _, val := range s.values { - hDPts[i].Attributes = val.attrs - hDPts[i].StartTime = s.start - hDPts[i].Time = t - hDPts[i].Count = val.count - hDPts[i].Bounds = bounds - - // The HistogramDataPoint field values returned need to be copies of - // the buckets value as we will keep updating them. - // - // TODO (#3047): Making copies for bounds and counts incurs a large - // memory allocation footprint. Alternatives should be explored. 
- hDPts[i].BucketCounts = slices.Clone(val.counts) + s.values.Range(func(key, value any) bool { + val := value.(*histogramPoint[N]) + val.Lock() + defer val.Unlock() + newPt := metricdata.HistogramDataPoint[N]{ + Attributes: val.attrs, + StartTime: s.start, + Time: t, + Count: val.count, + Bounds: bounds, + // The HistogramDataPoint field values returned need to be copies of + // the histogramPoint value as we will keep updating them. + // + // TODO (#3047): Making copies for bounds and counts incurs a large + // memory allocation footprint. Alternatives should be explored. + BucketCounts: slices.Clone(val.counts), + } if !s.noSum { - hDPts[i].Sum = val.total + newPt.Sum = val.total } if !s.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) + newPt.Min = metricdata.NewExtrema(val.min) + newPt.Max = metricdata.NewExtrema(val.max) } - collectExemplars(&hDPts[i].Exemplars, val.res.Collect) + collectExemplars(&newPt.Exemplars, val.res.Collect) + hDPts = append(hDPts, newPt) i++ // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. 
- } + return true + }) h.DataPoints = hDPts *dest = h - return n + return i } diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 6e0f3948de0..a427e4fc23c 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -22,6 +22,10 @@ var ( noMinMax = false ) +func newHistogramPoint[N int64 | float64](attrs attribute.Set, n int) *histogramPoint[N] { + return &histogramPoint[N]{attrs: attrs, counts: make([]uint64, n)} +} + func TestHistogram(t *testing.T) { c := new(clock) t.Cleanup(c.Register()) @@ -337,7 +341,7 @@ func TestBucketsBin(t *testing.T) { func testBucketsBin[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { - b := newBuckets[N](alice, 3) + b := newHistogramPoint[N](alice, 3) assertB := func(counts []uint64, count uint64, mi, ma N) { t.Helper() assert.Equal(t, counts, b.counts) @@ -363,7 +367,7 @@ func TestBucketsSum(t *testing.T) { func testBucketsSum[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { - b := newBuckets[N](alice, 3) + b := newHistogramPoint[N](alice, 3) var want N assert.Equal(t, want, b.total) @@ -383,7 +387,7 @@ func TestHistogramImmutableBounds(t *testing.T) { cpB := make([]float64, len(b)) copy(cpB, b) - h := newHistogram[int64](b, false, false, 0, dropExemplars[int64]) + h := newCumulativeHistogram[int64](b, false, false, 0, dropExemplars[int64]) require.Equal(t, cpB, h.bounds) b[0] = 10 @@ -392,29 +396,33 @@ func TestHistogramImmutableBounds(t *testing.T) { h.measure(t.Context(), 5, alice, nil) var data metricdata.Aggregation = metricdata.Histogram[int64]{} - h.cumulative(&data) + h.collect(&data) hdp := data.(metricdata.Histogram[int64]).DataPoints[0] hdp.Bounds[1] = 10 assert.Equal(t, cpB, h.bounds, "modifying the Aggregation bounds should not change the bounds") } func TestCumulativeHistogramImmutableCounts(t *testing.T) { - h := newHistogram[int64](bounds, noMinMax, false, 
0, dropExemplars[int64]) + h := newCumulativeHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) h.measure(t.Context(), 5, alice, nil) var data metricdata.Aggregation = metricdata.Histogram[int64]{} - h.cumulative(&data) + h.collect(&data) hdp := data.(metricdata.Histogram[int64]).DataPoints[0] - require.Equal(t, hdp.BucketCounts, h.values[alice.Equivalent()].counts) + hPt, ok := h.values.Load(alice.Equivalent()) + require.True(t, ok) + require.Equal(t, hdp.BucketCounts, hPt.(*histogramPoint[int64]).counts) cpCounts := make([]uint64, len(hdp.BucketCounts)) copy(cpCounts, hdp.BucketCounts) hdp.BucketCounts[0] = 10 + hPt, ok = h.values.Load(alice.Equivalent()) + require.True(t, ok) assert.Equal( t, cpCounts, - h.values[alice.Equivalent()].counts, + hPt.(*histogramPoint[int64]).counts, "modifying the Aggregator bucket counts should not change the Aggregator", ) } @@ -424,28 +432,28 @@ func TestDeltaHistogramReset(t *testing.T) { now = func() time.Time { return y2k } t.Cleanup(func() { now = orig }) - h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) + h := newDeltaHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) var data metricdata.Aggregation = metricdata.Histogram[int64]{} - require.Equal(t, 0, h.delta(&data)) + require.Equal(t, 0, h.collect(&data)) require.Empty(t, data.(metricdata.Histogram[int64]).DataPoints) h.measure(t.Context(), 1, alice, nil) expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality} expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1, now(), now())} - h.delta(&data) + h.collect(&data) metricdatatest.AssertAggregationsEqual(t, expect, data) // The attr set should be forgotten once Aggregations is called. 
expect.DataPoints = nil - assert.Equal(t, 0, h.delta(&data)) + assert.Equal(t, 0, h.collect(&data)) assert.Empty(t, data.(metricdata.Histogram[int64]).DataPoints) // Aggregating another set should not affect the original (alice). h.measure(t.Context(), 1, bob, nil) expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1, now(), now())} - h.delta(&data) + h.collect(&data) metricdatatest.AssertAggregationsEqual(t, expect, data) } From 104cb3bc3583b04b162bbd89e7b174e74d684e89 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 8 Oct 2025 18:24:48 +0000 Subject: [PATCH 03/15] use atomics for explicit bucket histogram internals --- CHANGELOG.md | 2 +- sdk/metric/internal/aggregate/atomic.go | 63 ++++ sdk/metric/internal/aggregate/histogram.go | 291 +++++++++++------- .../internal/aggregate/histogram_test.go | 52 +++- 4 files changed, 283 insertions(+), 125 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0393e1a4ad..50070f1e8ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,7 +56,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Include W3C TraceFlags (bits 0–7) in the OTLP `Span.Flags` field in `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracehttp` and `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracegrpc`. (#7438) - The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types. If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442) -- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427) +- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. 
(#7427, #7474) diff --git a/sdk/metric/internal/aggregate/atomic.go b/sdk/metric/internal/aggregate/atomic.go index 0fa6d3c6fa8..2d849a66654 100644 --- a/sdk/metric/internal/aggregate/atomic.go +++ b/sdk/metric/internal/aggregate/atomic.go @@ -51,6 +51,69 @@ func (n *atomicCounter[N]) add(value N) { } } +// reset resets the internal state, and is not safe to call concurrently. +func (n *atomicCounter[N]) reset() { + n.nFloatBits.Store(0) + n.nInt.Store(0) +} + +// atomicIntOrFloat is an atomic type that can be an int64 or float64. +type atomicIntOrFloat[N int64 | float64] struct { + // nFloatBits contains the float bits if N is float64. + nFloatBits atomic.Uint64 + // nInt contains the int64 if N is int64 + nInt atomic.Int64 +} + +func (n *atomicIntOrFloat[N]) load() (value N) { + switch any(value).(type) { + case int64: + value = N(n.nInt.Load()) + case float64: + value = N(math.Float64frombits(n.nFloatBits.Load())) + } + return value +} + +func (n *atomicIntOrFloat[N]) compareAndSwap(oldVal, newVal N) bool { + switch any(oldVal).(type) { + case float64: + return n.nFloatBits.CompareAndSwap(math.Float64bits(float64(oldVal)), math.Float64bits(float64(newVal))) + default: + return n.nInt.CompareAndSwap(int64(oldVal), int64(newVal)) + } +} + +type atomicMinMax[N int64 | float64] struct { + minimum atomicIntOrFloat[N] + maximum atomicIntOrFloat[N] + isSet atomic.Bool +} + +func (n *atomicMinMax[N]) observe(value N) { + isSet := n.isSet.Load() + for { + minLoaded := n.minimum.load() + if ((!isSet && minLoaded == 0) || value < minLoaded) && !n.minimum.compareAndSwap(minLoaded, value) { + // We got a new min value, but lost the race. Try again. + continue + } + maxLoaded := n.maximum.load() + if ((!isSet && minLoaded == 0) || value > maxLoaded) && !n.maximum.compareAndSwap(maxLoaded, value) { + // We got a new max value, but lost the race. Try again. 
+ continue + } + break + } + if !isSet { + n.isSet.Store(true) + } +} + +func (n *atomicMinMax[N]) load() (minimum, maximum N, ok bool) { + return n.minimum.load(), n.maximum.load(), n.isSet.Load() +} + // hotColdWaitGroup is a synchronization primitive which enables lockless // writes for concurrent writers and enables a reader to acquire exclusive // access to a snapshot of state including only completed operations. diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index e4b47dba756..af9603a5721 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -7,7 +7,7 @@ import ( "context" "slices" "sort" - "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" @@ -15,101 +15,85 @@ import ( ) type histogramPoint[N int64 | float64] struct { - sync.Mutex attrs attribute.Set res FilteredExemplarReservoir[N] + histogramPointCounters[N] +} - counts []uint64 - count uint64 - total N - min, max N +// histogramPointCounters contains only the atomic counter data, and is used by +// both histogramPoint and hotColdHistogramPoint. +type histogramPointCounters[N int64 | float64] struct { + counts []atomic.Uint64 + total atomicCounter[N] + minMax atomicMinMax[N] } -func (b *histogramPoint[N]) sum(value N) { b.total += value } +func (b *histogramPointCounters[N]) sum(value N) { b.total.add(value) } -func (b *histogramPoint[N]) bin(idx int) { - b.counts[idx]++ - b.count++ +func (b *histogramPointCounters[N]) bin(bounds []float64, value N) { + // This search will return an index in the range [0, len(s.bounds)], where + // it will return len(s.bounds) if value is greater than the last element + // of s.bounds. This aligns with the histogramPoint in that the length of histogramPoint + // is len(s.bounds)+1, with the last bucket representing: + // (s.bounds[len(s.bounds)-1], +∞). 
+ idx := sort.SearchFloat64s(bounds, float64(value)) + b.counts[idx].Add(1) } -func (b *histogramPoint[N]) minMax(value N) { - if value < b.min { - b.min = value - } else if value > b.max { - b.max = value +func (b *histogramPointCounters[N]) loadCounts() ([]uint64, uint64) { + // TODO (#3047): Making copies for bounds and counts incurs a large + // memory allocation footprint. Alternatives should be explored. + counts := make([]uint64, len(b.counts)) + count := uint64(0) + for i := range counts { + c := b.counts[i].Load() + counts[i] = c + count += c } + return counts, count } -// histogramValueMap summarizes a set of measurements as an histogramValueMap with -// explicitly defined buckets. -type histogramValueMap[N int64 | float64] struct { - noMinMax bool - noSum bool - bounds []float64 - - newRes func(attribute.Set) FilteredExemplarReservoir[N] - values limitedSyncMap -} - -func newHistogramValueMap[N int64 | float64]( - bounds []float64, +// mergeIntoAndReset merges this set of histogram counter data into another, +// and resets the state of this set of counters. This is used by +// hotColdHistogramPoint to ensure that the cumulative counters continue to +// accumulate after being read. +func (b *histogramPointCounters[N]) mergeIntoAndReset( // nolint:revive // Intentional internal control flag + into *histogramPointCounters[N], noMinMax, noSum bool, - limit int, - r func(attribute.Set) FilteredExemplarReservoir[N], -) histogramValueMap[N] { - // The responsibility of keeping all histogramPoint correctly associated with the - // passed boundaries is ultimately this type's responsibility. Make a copy - // here so we can always guarantee this. Or, in the case of failure, have - // complete control over the fix. 
- b := slices.Clone(bounds) - slices.Sort(b) - return histogramValueMap[N]{ - noMinMax: noMinMax, - noSum: noSum, - bounds: b, - newRes: r, - values: limitedSyncMap{aggLimit: limit}, +) { + for i := range b.counts { + into.counts[i].Add(b.counts[i].Load()) + b.counts[i].Store(0) } -} -func (s *histogramValueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { - // This search will return an index in the range [0, len(s.bounds)], where - // it will return len(s.bounds) if value is greater than the last element - // of s.bounds. This aligns with the histogramPoint in that the length of histogramPoint - // is len(s.bounds)+1, with the last bucket representing: - // (s.bounds[len(s.bounds)-1], +∞). - idx := sort.SearchFloat64s(s.bounds, float64(value)) - h := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { - return &histogramPoint[N]{ - res: s.newRes(attr), - attrs: attr, - min: value, - max: value, - counts: make([]uint64, len(s.bounds)+1), - } - }).(*histogramPoint[N]) - h.Lock() - defer h.Unlock() + if !noMinMax { + // Do not reset min or max because cumulative min and max only ever grow + // smaller or larger respectively. - h.bin(idx) - if !s.noMinMax { - h.minMax(value) + if minimum, maximum, ok := b.minMax.load(); ok { + into.minMax.observe(minimum) + into.minMax.observe(maximum) + } } - if !s.noSum { - h.sum(value) + if !noSum { + into.total.add(b.total.load()) + b.total.reset() } - h.res.Offer(ctx, value, droppedAttr) } -// deltaHistogram TODO +// deltaHistogram is a histogram whose internal storage is reset when it is +// collected. 
type deltaHistogram[N int64 | float64] struct { hcwg hotColdWaitGroup - hotColdValMap [2]histogramValueMap[N] + hotColdValMap [2]limitedSyncMap - start time.Time + start time.Time + noMinMax bool + noSum bool + bounds []float64 + newRes func(attribute.Set) FilteredExemplarReservoir[N] } -// measure TODO func (s *deltaHistogram[N]) measure( ctx context.Context, value N, @@ -118,21 +102,48 @@ func (s *deltaHistogram[N]) measure( ) { hotIdx := s.hcwg.start() defer s.hcwg.done(hotIdx) - s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr) + h := s.hotColdValMap[hotIdx].LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + hPt := &histogramPoint[N]{ + res: s.newRes(attr), + attrs: attr, + histogramPointCounters: histogramPointCounters[N]{counts: make([]atomic.Uint64, len(s.bounds)+1)}, + } + return hPt + }).(*histogramPoint[N]) + + h.bin(s.bounds, value) + if !s.noMinMax { + h.minMax.observe(value) + } + if !s.noSum { + h.sum(value) + } + h.res.Offer(ctx, value, droppedAttr) } -// newDeltaHistogram TODO +// newDeltaHistogram returns a histogram that is reset each time it is +// collected. func newDeltaHistogram[N int64 | float64]( boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], ) *deltaHistogram[N] { + // The responsibility of keeping all histogramPoint correctly associated with the + // passed boundaries is ultimately this type's responsibility. Make a copy + // here so we can always guarantee this. Or, in the case of failure, have + // complete control over the fix. 
+ b := slices.Clone(boundaries) + slices.Sort(b) return &deltaHistogram[N]{ - start: now(), - hotColdValMap: [2]histogramValueMap[N]{ - newHistogramValueMap(boundaries, noMinMax, noSum, limit, r), - newHistogramValueMap(boundaries, noMinMax, noSum, limit, r), + start: now(), + noMinMax: noMinMax, + noSum: noSum, + bounds: b, + newRes: r, + hotColdValMap: [2]limitedSyncMap{ + {aggLimit: limit}, + {aggLimit: limit}, }, } } @@ -151,32 +162,33 @@ func (s *deltaHistogram[N]) collect( readIdx := s.hcwg.swapHotAndWait() // Do not allow modification of our copy of bounds. - bounds := slices.Clone(s.hotColdValMap[readIdx].bounds) + bounds := slices.Clone(s.bounds) // The len will not change while we iterate over values, since we waited // for all writes to finish to the cold values and len. - n := s.hotColdValMap[readIdx].values.Len() + n := s.hotColdValMap[readIdx].Len() hDPts := reset(h.DataPoints, n, n) var i int - s.hotColdValMap[readIdx].values.Range(func(key, value any) bool { + s.hotColdValMap[readIdx].Range(func(_, value any) bool { val := value.(*histogramPoint[N]) - val.Lock() - defer val.Unlock() + bucketCounts, count := val.loadCounts() hDPts[i].Attributes = val.attrs hDPts[i].StartTime = s.start hDPts[i].Time = t - hDPts[i].Count = val.count + hDPts[i].Count = count hDPts[i].Bounds = bounds - hDPts[i].BucketCounts = val.counts + hDPts[i].BucketCounts = bucketCounts - if !s.hotColdValMap[readIdx].noSum { - hDPts[i].Sum = val.total + if !s.noSum { + hDPts[i].Sum = val.total.load() } - if !s.hotColdValMap[readIdx].noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) + if !s.noMinMax { + if minimum, maximum, ok := val.minMax.load(); ok { + hDPts[i].Min = metricdata.NewExtrema(minimum) + hDPts[i].Max = metricdata.NewExtrema(maximum) + } } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) @@ -185,7 +197,7 @@ func (s *deltaHistogram[N]) collect( return true }) // Unused attribute sets do not report. 
- s.hotColdValMap[readIdx].values.Clear() + s.hotColdValMap[readIdx].Clear() // The delta collection cycle resets. s.start = t @@ -198,24 +210,82 @@ func (s *deltaHistogram[N]) collect( // cumulativeHistogram summarizes a set of measurements as an histogram with explicitly // defined histogramPoint. type cumulativeHistogram[N int64 | float64] struct { - histogramValueMap[N] + values limitedSyncMap - start time.Time + start time.Time + noMinMax bool + noSum bool + bounds []float64 + newRes func(attribute.Set) FilteredExemplarReservoir[N] } -// newDeltaHistogram TODO +// newCumulativeHistogram returns a histogram that accumulates measurements +// into a histogram data structure. It is never reset. func newCumulativeHistogram[N int64 | float64]( boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], ) *cumulativeHistogram[N] { + // The responsibility of keeping all histogramPoint correctly associated with the + // passed boundaries is ultimately this type's responsibility. Make a copy + // here so we can always guarantee this. Or, in the case of failure, have + // complete control over the fix. 
+ b := slices.Clone(boundaries) + slices.Sort(b) return &cumulativeHistogram[N]{ - start: now(), - histogramValueMap: newHistogramValueMap(boundaries, noMinMax, noSum, limit, r), + start: now(), + noMinMax: noMinMax, + noSum: noSum, + bounds: b, + newRes: r, + values: limitedSyncMap{aggLimit: limit}, } } +type hotColdHistogramPoint[N int64 | float64] struct { + hcwg hotColdWaitGroup + hotColdPoint [2]histogramPointCounters[N] + + attrs attribute.Set + res FilteredExemplarReservoir[N] +} + +func (s *cumulativeHistogram[N]) measure( + ctx context.Context, + value N, + fltrAttr attribute.Set, + droppedAttr []attribute.KeyValue, +) { + h := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + hPt := &hotColdHistogramPoint[N]{ + res: s.newRes(attr), + attrs: attr, + hotColdPoint: [2]histogramPointCounters[N]{ + { + counts: make([]atomic.Uint64, len(s.bounds)+1), + }, + { + counts: make([]atomic.Uint64, len(s.bounds)+1), + }, + }, + } + return hPt + }).(*hotColdHistogramPoint[N]) + + hotIdx := h.hcwg.start() + defer h.hcwg.done(hotIdx) + + h.hotColdPoint[hotIdx].bin(s.bounds, value) + if !s.noMinMax { + h.hotColdPoint[hotIdx].minMax.observe(value) + } + if !s.noSum { + h.hotColdPoint[hotIdx].sum(value) + } + h.res.Offer(ctx, value, droppedAttr) +} + func (s *cumulativeHistogram[N]) collect( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { @@ -234,32 +304,35 @@ func (s *cumulativeHistogram[N]) collect( hDPts := reset(h.DataPoints, 0, s.values.Len()) var i int - s.values.Range(func(key, value any) bool { - val := value.(*histogramPoint[N]) - val.Lock() - defer val.Unlock() + s.values.Range(func(_, value any) bool { + val := value.(*hotColdHistogramPoint[N]) + // swap, observe, and clear the point + readIdx := val.hcwg.swapHotAndWait() + bucketCounts, count := val.hotColdPoint[readIdx].loadCounts() newPt := metricdata.HistogramDataPoint[N]{ Attributes: val.attrs, StartTime: s.start, Time: t, 
- Count: val.count, + Count: count, Bounds: bounds, // The HistogramDataPoint field values returned need to be copies of // the histogramPoint value as we will keep updating them. - // - // TODO (#3047): Making copies for bounds and counts incurs a large - // memory allocation footprint. Alternatives should be explored. - BucketCounts: slices.Clone(val.counts), + BucketCounts: bucketCounts, } if !s.noSum { - newPt.Sum = val.total + newPt.Sum = val.hotColdPoint[readIdx].total.load() } - if !s.noMinMax { - newPt.Min = metricdata.NewExtrema(val.min) - newPt.Max = metricdata.NewExtrema(val.max) + if minimum, maximum, ok := val.hotColdPoint[readIdx].minMax.load(); ok { + newPt.Min = metricdata.NewExtrema(minimum) + newPt.Max = metricdata.NewExtrema(maximum) + } } + // Once we've read the point, merge it back into the hot histogram + // point since it is cumulative. + hotIdx := (readIdx + 1) % 2 + val.hotColdPoint[readIdx].mergeIntoAndReset(&val.hotColdPoint[hotIdx], s.noMinMax, s.noSum) collectExemplars(&newPt.Exemplars, val.res.Collect) hDPts = append(hDPts, newPt) diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index a427e4fc23c..2bd43562c54 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -6,6 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg import ( "context" "sort" + "sync/atomic" "testing" "time" @@ -23,7 +24,11 @@ var ( ) func newHistogramPoint[N int64 | float64](attrs attribute.Set, n int) *histogramPoint[N] { - return &histogramPoint[N]{attrs: attrs, counts: make([]uint64, n)} + hPt := &histogramPoint[N]{ + attrs: attrs, + histogramPointCounters: histogramPointCounters[N]{counts: make([]atomic.Uint64, n)}, + } + return hPt } func TestHistogram(t *testing.T) { @@ -342,20 +347,29 @@ func TestBucketsBin(t *testing.T) { func testBucketsBin[N int64 | float64]() func(t *testing.T) { return func(t 
*testing.T) { b := newHistogramPoint[N](alice, 3) - assertB := func(counts []uint64, count uint64, mi, ma N) { + assertB := func(expectedBucketCounts []uint64, expectedCount uint64, mi, ma N) { t.Helper() - assert.Equal(t, counts, b.counts) - assert.Equal(t, count, b.count) - assert.Equal(t, mi, b.min) - assert.Equal(t, ma, b.max) + bucketCounts, count := b.loadCounts() + assert.Equal(t, expectedBucketCounts, bucketCounts) + assert.Equal(t, expectedCount, count) + minimum, maximum, ok := b.minMax.load() + if mi != 0 { + assert.True(t, ok) + assert.Equal(t, mi, minimum) + } + if ma != 0 { + assert.True(t, ok) + assert.Equal(t, ma, maximum) + } } + bounds := []float64{0, 2, 4} assertB([]uint64{0, 0, 0}, 0, 0, 0) - b.bin(1) - b.minMax(2) + b.bin(bounds, 1) + b.minMax.observe(2) assertB([]uint64{0, 1, 0}, 1, 0, 2) - b.bin(0) - b.minMax(-1) + b.bin(bounds, -1) + b.minMax.observe(-1) assertB([]uint64{1, 1, 0}, 2, -1, 2) } } @@ -370,15 +384,15 @@ func testBucketsSum[N int64 | float64]() func(t *testing.T) { b := newHistogramPoint[N](alice, 3) var want N - assert.Equal(t, want, b.total) + assert.Equal(t, want, b.total.load()) b.sum(2) want = 2 - assert.Equal(t, want, b.total) + assert.Equal(t, want, b.total.load()) b.sum(-1) want = 1 - assert.Equal(t, want, b.total) + assert.Equal(t, want, b.total.load()) } } @@ -412,17 +426,25 @@ func TestCumulativeHistogramImmutableCounts(t *testing.T) { hPt, ok := h.values.Load(alice.Equivalent()) require.True(t, ok) - require.Equal(t, hdp.BucketCounts, hPt.(*histogramPoint[int64]).counts) + hcHistPt := hPt.(*hotColdHistogramPoint[int64]) + readIdx := hcHistPt.hcwg.swapHotAndWait() + bucketCounts, _ := hcHistPt.hotColdPoint[readIdx].loadCounts() + require.Equal(t, hdp.BucketCounts, bucketCounts) + hotIdx := (readIdx + 1) % 2 + hcHistPt.hotColdPoint[readIdx].mergeIntoAndReset(&hcHistPt.hotColdPoint[hotIdx], noMinMax, false) cpCounts := make([]uint64, len(hdp.BucketCounts)) copy(cpCounts, hdp.BucketCounts) hdp.BucketCounts[0] = 10 hPt, ok 
= h.values.Load(alice.Equivalent()) require.True(t, ok) + hcHistPt = hPt.(*hotColdHistogramPoint[int64]) + readIdx = hcHistPt.hcwg.swapHotAndWait() + bucketCounts, _ = hcHistPt.hotColdPoint[readIdx].loadCounts() assert.Equal( t, cpCounts, - hPt.(*histogramPoint[int64]).counts, + bucketCounts, "modifying the Aggregator bucket counts should not change the Aggregator", ) } From 6b5eaded8ea4cb7ff1a8425ab0c2b08f55981081 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 10 Oct 2025 19:37:52 +0000 Subject: [PATCH 04/15] fix concurrency bug in atomicMinMax, and add tests --- sdk/metric/internal/aggregate/atomic.go | 98 ++++++++++++------- sdk/metric/internal/aggregate/atomic_test.go | 70 +++++++++++++ sdk/metric/internal/aggregate/histogram.go | 22 ++--- .../internal/aggregate/histogram_test.go | 13 ++- 4 files changed, 150 insertions(+), 53 deletions(-) diff --git a/sdk/metric/internal/aggregate/atomic.go b/sdk/metric/internal/aggregate/atomic.go index 2d849a66654..eb95c32d68e 100644 --- a/sdk/metric/internal/aggregate/atomic.go +++ b/sdk/metric/internal/aggregate/atomic.go @@ -57,61 +57,89 @@ func (n *atomicCounter[N]) reset() { n.nInt.Store(0) } -// atomicIntOrFloat is an atomic type that can be an int64 or float64. -type atomicIntOrFloat[N int64 | float64] struct { - // nFloatBits contains the float bits if N is float64. 
- nFloatBits atomic.Uint64 - // nInt contains the int64 if N is int64 - nInt atomic.Int64 +// atomicN is a generic atomic number value +type atomicN[N int64 | float64] struct { + val atomic.Uint64 } -func (n *atomicIntOrFloat[N]) load() (value N) { +func (a *atomicN[N]) Load() (value N) { + v := a.val.Load() switch any(value).(type) { case int64: - value = N(n.nInt.Load()) + value = N(v) case float64: - value = N(math.Float64frombits(n.nFloatBits.Load())) + value = N(math.Float64frombits(v)) + default: + panic("unsupported type") } return value } -func (n *atomicIntOrFloat[N]) compareAndSwap(oldVal, newVal N) bool { - switch any(oldVal).(type) { +func (a *atomicN[N]) Store(v N) { + var val uint64 + switch any(v).(type) { + case int64: + val = uint64(v) + case float64: + val = math.Float64bits(float64(v)) + default: + panic("unsupported type") + } + a.val.Store(val) +} + +func (a *atomicN[N]) CompareAndSwap(old, new N) bool { + var o, n uint64 + switch any(old).(type) { + case int64: + o, n = uint64(old), uint64(new) case float64: - return n.nFloatBits.CompareAndSwap(math.Float64bits(float64(oldVal)), math.Float64bits(float64(newVal))) + o, n = math.Float64bits(float64(old)), math.Float64bits(float64(new)) default: - return n.nInt.CompareAndSwap(int64(oldVal), int64(newVal)) + panic("unsupported type") } + return a.val.CompareAndSwap(o, n) } type atomicMinMax[N int64 | float64] struct { - minimum atomicIntOrFloat[N] - maximum atomicIntOrFloat[N] - isSet atomic.Bool + minimum, maximum atomicN[N] + set atomic.Bool + mu sync.Mutex } -func (n *atomicMinMax[N]) observe(value N) { - isSet := n.isSet.Load() - for { - minLoaded := n.minimum.load() - if ((!isSet && minLoaded == 0) || value < minLoaded) && !n.minimum.compareAndSwap(minLoaded, value) { - // We got a new min value, but lost the race. Try again. 
- continue - } - maxLoaded := n.maximum.load() - if ((!isSet && minLoaded == 0) || value > maxLoaded) && !n.maximum.compareAndSwap(maxLoaded, value) { - // We got a new max value, but lost the race. Try again. - continue - } - break - } - if !isSet { - n.isSet.Store(true) +// init returns true if the value was used to initialize min and max. +func (s *atomicMinMax[N]) init(val N) bool { + s.mu.Lock() + defer s.mu.Unlock() + if !s.set.Load() { + defer s.set.Store(true) + s.minimum.Store(val) + s.maximum.Store(val) + return true } + return false } -func (n *atomicMinMax[N]) load() (minimum, maximum N, ok bool) { - return n.minimum.load(), n.maximum.load(), n.isSet.Load() +func (s *atomicMinMax[N]) Update(val N) { + if !s.set.Load() && s.init(val) { + return + } + + old := s.minimum.Load() + for val < old { + if s.minimum.CompareAndSwap(old, val) { + return + } + old = s.minimum.Load() + } + + old = s.maximum.Load() + for old < val { + if s.maximum.CompareAndSwap(old, val) { + return + } + old = s.maximum.Load() + } } // hotColdWaitGroup is a synchronization primitive which enables lockless diff --git a/sdk/metric/internal/aggregate/atomic_test.go b/sdk/metric/internal/aggregate/atomic_test.go index 52f053248d7..3acc28c903c 100644 --- a/sdk/metric/internal/aggregate/atomic_test.go +++ b/sdk/metric/internal/aggregate/atomic_test.go @@ -76,3 +76,73 @@ func TestHotColdWaitGroupConcurrentSafe(t *testing.T) { } wg.Wait() } + +func TestAtomicN(t *testing.T) { + t.Run("Int64", testAtomicN[int64]) + t.Run("Float64", testAtomicN[float64]) + +} + +func testAtomicN[N int64 | float64](t *testing.T) { + var v atomicN[N] + assert.Equal(t, N(0), v.Load()) + assert.True(t, v.CompareAndSwap(0, 6)) + assert.Equal(t, N(6), v.Load()) + assert.False(t, v.CompareAndSwap(0, 6)) + v.Store(22) + assert.Equal(t, N(22), v.Load()) +} + +func TestAtomicNConcurrentSafe(t *testing.T) { + t.Run("Int64", testAtomicNConcurrentSafe[int64]) + t.Run("Float64", testAtomicNConcurrentSafe[float64]) +} + 
+func testAtomicNConcurrentSafe[N int64 | float64](t *testing.T) { + var wg sync.WaitGroup + var v atomicN[N] + + for range 2 { + wg.Add(1) + go func() { + defer wg.Done() + got := v.Load() + assert.Equal(t, int64(0), int64(got)%6) + }() + wg.Add(1) + go func() { + defer wg.Done() + v.Store(12) + }() + wg.Add(1) + go func() { + defer wg.Done() + v.CompareAndSwap(0, 6) + }() + } + wg.Wait() +} + +func TestAtomicMinMaxConcurrentSafe(t *testing.T) { + t.Run("Int64", testAtomicMinMaxConcurrentSafe[int64]) + t.Run("Float64", testAtomicMinMaxConcurrentSafe[float64]) +} + +func testAtomicMinMaxConcurrentSafe[N int64 | float64](t *testing.T) { + var wg sync.WaitGroup + var minMax atomicMinMax[N] + + assert.False(t, minMax.set.Load()) + for _, i := range []float64{2, 4, 6, 8, -3, 0, 8, 0} { + wg.Add(1) + go func() { + defer wg.Done() + minMax.Update(N(i)) + }() + } + wg.Wait() + + assert.True(t, minMax.set.Load()) + assert.Equal(t, N(-3), minMax.minimum.Load()) + assert.Equal(t, N(8), minMax.maximum.Load()) +} diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index af9603a5721..b436cebe70e 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -70,9 +70,9 @@ func (b *histogramPointCounters[N]) mergeIntoAndReset( // nolint:revive // Inten // Do not reset min or max because cumulative min and max only ever grow // smaller or larger respectively. 
- if minimum, maximum, ok := b.minMax.load(); ok { - into.minMax.observe(minimum) - into.minMax.observe(maximum) + if b.minMax.set.Load() { + into.minMax.Update(b.minMax.minimum.Load()) + into.minMax.Update(b.minMax.maximum.Load()) } } if !noSum { @@ -113,7 +113,7 @@ func (s *deltaHistogram[N]) measure( h.bin(s.bounds, value) if !s.noMinMax { - h.minMax.observe(value) + h.minMax.Update(value) } if !s.noSum { h.sum(value) @@ -185,9 +185,9 @@ func (s *deltaHistogram[N]) collect( } if !s.noMinMax { - if minimum, maximum, ok := val.minMax.load(); ok { - hDPts[i].Min = metricdata.NewExtrema(minimum) - hDPts[i].Max = metricdata.NewExtrema(maximum) + if val.minMax.set.Load() { + hDPts[i].Min = metricdata.NewExtrema(val.minMax.minimum.Load()) + hDPts[i].Max = metricdata.NewExtrema(val.minMax.maximum.Load()) } } @@ -278,7 +278,7 @@ func (s *cumulativeHistogram[N]) measure( h.hotColdPoint[hotIdx].bin(s.bounds, value) if !s.noMinMax { - h.hotColdPoint[hotIdx].minMax.observe(value) + h.hotColdPoint[hotIdx].minMax.Update(value) } if !s.noSum { h.hotColdPoint[hotIdx].sum(value) @@ -324,9 +324,9 @@ func (s *cumulativeHistogram[N]) collect( newPt.Sum = val.hotColdPoint[readIdx].total.load() } if !s.noMinMax { - if minimum, maximum, ok := val.hotColdPoint[readIdx].minMax.load(); ok { - newPt.Min = metricdata.NewExtrema(minimum) - newPt.Max = metricdata.NewExtrema(maximum) + if val.hotColdPoint[readIdx].minMax.set.Load() { + newPt.Min = metricdata.NewExtrema(val.hotColdPoint[readIdx].minMax.minimum.Load()) + newPt.Max = metricdata.NewExtrema(val.hotColdPoint[readIdx].minMax.maximum.Load()) } } // Once we've read the point, merge it back into the hot histogram diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 2bd43562c54..8f12e311794 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -352,24 +352,23 @@ func testBucketsBin[N int64 | float64]() func(t 
*testing.T) { bucketCounts, count := b.loadCounts() assert.Equal(t, expectedBucketCounts, bucketCounts) assert.Equal(t, expectedCount, count) - minimum, maximum, ok := b.minMax.load() if mi != 0 { - assert.True(t, ok) - assert.Equal(t, mi, minimum) + assert.True(t, b.minMax.set.Load()) + assert.Equal(t, mi, b.minMax.minimum.Load()) } if ma != 0 { - assert.True(t, ok) - assert.Equal(t, ma, maximum) + assert.True(t, b.minMax.set.Load()) + assert.Equal(t, ma, b.minMax.maximum.Load()) } } bounds := []float64{0, 2, 4} assertB([]uint64{0, 0, 0}, 0, 0, 0) b.bin(bounds, 1) - b.minMax.observe(2) + b.minMax.Update(2) assertB([]uint64{0, 1, 0}, 1, 0, 2) b.bin(bounds, -1) - b.minMax.observe(-1) + b.minMax.Update(-1) assertB([]uint64{1, 1, 0}, 2, -1, 2) } } From 1fba5e401d3010e26fd5ee2c75e752adc10ba5d4 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 10 Oct 2025 19:40:29 +0000 Subject: [PATCH 05/15] restore comment --- sdk/metric/internal/aggregate/histogram.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index b436cebe70e..06ae5c391e2 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -104,8 +104,15 @@ func (s *deltaHistogram[N]) measure( defer s.hcwg.done(hotIdx) h := s.hotColdValMap[hotIdx].LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { hPt := &histogramPoint[N]{ - res: s.newRes(attr), - attrs: attr, + res: s.newRes(attr), + attrs: attr, + // N+1 buckets. For example: + // + // bounds = [0, 5, 10] + // + // Then, + // + // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) histogramPointCounters: histogramPointCounters[N]{counts: make([]atomic.Uint64, len(s.bounds)+1)}, } return hPt @@ -261,6 +268,13 @@ func (s *cumulativeHistogram[N]) measure( hPt := &hotColdHistogramPoint[N]{ res: s.newRes(attr), attrs: attr, + // N+1 buckets. 
For example: + // + // bounds = [0, 5, 10] + // + // Then, + // + // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) hotColdPoint: [2]histogramPointCounters[N]{ { counts: make([]atomic.Uint64, len(s.bounds)+1), From dbc4702233a349c04b8be451a5fae16fa020e448 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 10 Oct 2025 20:09:51 +0000 Subject: [PATCH 06/15] add benchmarks for new atomic types --- sdk/metric/internal/aggregate/atomic_test.go | 105 +++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/sdk/metric/internal/aggregate/atomic_test.go b/sdk/metric/internal/aggregate/atomic_test.go index 3acc28c903c..c6e444f8cce 100644 --- a/sdk/metric/internal/aggregate/atomic_test.go +++ b/sdk/metric/internal/aggregate/atomic_test.go @@ -52,6 +52,33 @@ func TestAtomicSumAddIntConcurrentSafe(t *testing.T) { assert.Equal(t, int64(15), aSum.load()) } +func BenchmarkAtomicCounter(b *testing.B) { + b.Run("Int64", benchmarkAtomicCounter[int64]) + b.Run("Float64", benchmarkAtomicCounter[float64]) +} + +func benchmarkAtomicCounter[N int64 | float64](b *testing.B) { + b.Run("add", func(b *testing.B) { + var a atomicCounter[N] + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + a.add(2) + } + }) + }) + b.Run("load", func(b *testing.B) { + var a atomicCounter[N] + a.add(2) + var v N + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + v = a.load() + } + }) + assert.Equal(b, N(2), v) + }) +} + func TestHotColdWaitGroupConcurrentSafe(t *testing.T) { var wg sync.WaitGroup hcwg := &hotColdWaitGroup{} @@ -123,6 +150,48 @@ func testAtomicNConcurrentSafe[N int64 | float64](t *testing.T) { wg.Wait() } +func BenchmarkAtomicN(b *testing.B) { + b.Run("Int64", benchmarkAtomicN[int64]) + b.Run("Float64", benchmarkAtomicN[float64]) +} + +func benchmarkAtomicN[N int64 | float64](b *testing.B) { + b.Run("Load", func(b *testing.B) { + var a atomicN[N] + a.Store(2) + var v N + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + v = a.Load() + } + }) + 
assert.Equal(b, N(2), v) + }) + b.Run("Store", func(b *testing.B) { + var a atomicN[N] + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + a.Store(3) + } + }) + }) + b.Run("CompareAndSwap", func(b *testing.B) { + var a atomicN[N] + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + // Make sure we swap back and forth, in-case that matters. + if i%2 == 0 { + a.CompareAndSwap(0, 1) + } else { + a.CompareAndSwap(1, 0) + } + i++ + } + }) + }) +} + func TestAtomicMinMaxConcurrentSafe(t *testing.T) { t.Run("Int64", testAtomicMinMaxConcurrentSafe[int64]) t.Run("Float64", testAtomicMinMaxConcurrentSafe[float64]) @@ -146,3 +215,39 @@ func testAtomicMinMaxConcurrentSafe[N int64 | float64](t *testing.T) { assert.Equal(t, N(-3), minMax.minimum.Load()) assert.Equal(t, N(8), minMax.maximum.Load()) } + +func BenchmarkAtomicMinMax(b *testing.B) { + b.Run("Int64", benchmarkAtomicMinMax[int64]) + b.Run("Float64", benchmarkAtomicMinMax[float64]) +} + +func benchmarkAtomicMinMax[N int64 | float64](b *testing.B) { + b.Run("UpdateIncreasing", func(b *testing.B) { + var a atomicMinMax[N] + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + a.Update(N(i)) + i++ + } + }) + }) + b.Run("UpdateDecreasing", func(b *testing.B) { + var a atomicMinMax[N] + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + a.Update(N(i)) + i-- + } + }) + }) + b.Run("UpdateConstant", func(b *testing.B) { + var a atomicMinMax[N] + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + a.Update(N(5)) + } + }) + }) +} From c7e449f3906aff6f161987dd7ddae59002b5a1a8 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 10 Oct 2025 20:20:04 +0000 Subject: [PATCH 07/15] move hotColdHistogramPoint and re-use bucketCounts --- sdk/metric/internal/aggregate/histogram.go | 35 +++++++++++-------- .../internal/aggregate/histogram_test.go | 8 +++-- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/sdk/metric/internal/aggregate/histogram.go 
b/sdk/metric/internal/aggregate/histogram.go
index 06ae5c391e2..918a9d9e239 100644
--- a/sdk/metric/internal/aggregate/histogram.go
+++ b/sdk/metric/internal/aggregate/histogram.go
@@ -14,12 +14,23 @@ import (
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )
 
+// histogramPoint is a single histogram point, used in delta aggregations.
 type histogramPoint[N int64 | float64] struct {
 	attrs attribute.Set
 	res   FilteredExemplarReservoir[N]
 
 	histogramPointCounters[N]
 }
 
+// hotColdHistogramPoint holds a hot and a cold histogram point, used in
+// cumulative aggregations.
+type hotColdHistogramPoint[N int64 | float64] struct {
+	hcwg         hotColdWaitGroup
+	hotColdPoint [2]histogramPointCounters[N]
+
+	attrs attribute.Set
+	res   FilteredExemplarReservoir[N]
+}
+
 // histogramPointCounters contains only the atomic counter data, and is used by
 // both histogramPoint and hotColdHistogramPoint.
 type histogramPointCounters[N int64 | float64] struct {
@@ -40,17 +51,18 @@ func (b *histogramPointCounters[N]) bin(bounds []float64, value N) {
 	b.counts[idx].Add(1)
 }
 
-func (b *histogramPointCounters[N]) loadCounts() ([]uint64, uint64) {
+func (b *histogramPointCounters[N]) loadCountsInto(into *[]uint64) uint64 {
 	// TODO (#3047): Making copies for bounds and counts incurs a large
 	// memory allocation footprint. Alternatives should be explored.
- counts := make([]uint64, len(b.counts)) + counts := reset(*into, len(b.counts), len(b.counts)) count := uint64(0) - for i := range counts { + for i := range b.counts { c := b.counts[i].Load() counts[i] = c count += c } - return counts, count + *into = counts + return count } // mergeIntoAndReset merges this set of histogram counter data into another, @@ -179,13 +191,13 @@ func (s *deltaHistogram[N]) collect( var i int s.hotColdValMap[readIdx].Range(func(_, value any) bool { val := value.(*histogramPoint[N]) - bucketCounts, count := val.loadCounts() + + count := val.loadCountsInto(&hDPts[i].BucketCounts) hDPts[i].Attributes = val.attrs hDPts[i].StartTime = s.start hDPts[i].Time = t hDPts[i].Count = count hDPts[i].Bounds = bounds - hDPts[i].BucketCounts = bucketCounts if !s.noSum { hDPts[i].Sum = val.total.load() @@ -250,14 +262,6 @@ func newCumulativeHistogram[N int64 | float64]( } } -type hotColdHistogramPoint[N int64 | float64] struct { - hcwg hotColdWaitGroup - hotColdPoint [2]histogramPointCounters[N] - - attrs attribute.Set - res FilteredExemplarReservoir[N] -} - func (s *cumulativeHistogram[N]) measure( ctx context.Context, value N, @@ -322,7 +326,8 @@ func (s *cumulativeHistogram[N]) collect( val := value.(*hotColdHistogramPoint[N]) // swap, observe, and clear the point readIdx := val.hcwg.swapHotAndWait() - bucketCounts, count := val.hotColdPoint[readIdx].loadCounts() + var bucketCounts []uint64 + count := val.hotColdPoint[readIdx].loadCountsInto(&bucketCounts) newPt := metricdata.HistogramDataPoint[N]{ Attributes: val.attrs, StartTime: s.start, diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 8f12e311794..8766e3a2765 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -349,7 +349,8 @@ func testBucketsBin[N int64 | float64]() func(t *testing.T) { b := newHistogramPoint[N](alice, 3) assertB := func(expectedBucketCounts 
[]uint64, expectedCount uint64, mi, ma N) { t.Helper() - bucketCounts, count := b.loadCounts() + var bucketCounts []uint64 + count := b.loadCountsInto(&bucketCounts) assert.Equal(t, expectedBucketCounts, bucketCounts) assert.Equal(t, expectedCount, count) if mi != 0 { @@ -427,7 +428,8 @@ func TestCumulativeHistogramImmutableCounts(t *testing.T) { require.True(t, ok) hcHistPt := hPt.(*hotColdHistogramPoint[int64]) readIdx := hcHistPt.hcwg.swapHotAndWait() - bucketCounts, _ := hcHistPt.hotColdPoint[readIdx].loadCounts() + var bucketCounts []uint64 + hcHistPt.hotColdPoint[readIdx].loadCountsInto(&bucketCounts) require.Equal(t, hdp.BucketCounts, bucketCounts) hotIdx := (readIdx + 1) % 2 hcHistPt.hotColdPoint[readIdx].mergeIntoAndReset(&hcHistPt.hotColdPoint[hotIdx], noMinMax, false) @@ -439,7 +441,7 @@ func TestCumulativeHistogramImmutableCounts(t *testing.T) { require.True(t, ok) hcHistPt = hPt.(*hotColdHistogramPoint[int64]) readIdx = hcHistPt.hcwg.swapHotAndWait() - bucketCounts, _ = hcHistPt.hotColdPoint[readIdx].loadCounts() + hcHistPt.hotColdPoint[readIdx].loadCountsInto(&bucketCounts) assert.Equal( t, cpCounts, From ed6fb26f67c42c1c5fb9ad4a75d573d16a22749d Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Sat, 11 Oct 2025 01:10:01 +0000 Subject: [PATCH 08/15] lint --- sdk/metric/internal/aggregate/atomic.go | 10 +++++----- sdk/metric/internal/aggregate/atomic_test.go | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sdk/metric/internal/aggregate/atomic.go b/sdk/metric/internal/aggregate/atomic.go index eb95c32d68e..eb69e965079 100644 --- a/sdk/metric/internal/aggregate/atomic.go +++ b/sdk/metric/internal/aggregate/atomic.go @@ -57,7 +57,7 @@ func (n *atomicCounter[N]) reset() { n.nInt.Store(0) } -// atomicN is a generic atomic number value +// atomicN is a generic atomic number value. 
type atomicN[N int64 | float64] struct { val atomic.Uint64 } @@ -88,13 +88,13 @@ func (a *atomicN[N]) Store(v N) { a.val.Store(val) } -func (a *atomicN[N]) CompareAndSwap(old, new N) bool { +func (a *atomicN[N]) CompareAndSwap(oldN, newN N) bool { var o, n uint64 - switch any(old).(type) { + switch any(oldN).(type) { case int64: - o, n = uint64(old), uint64(new) + o, n = uint64(oldN), uint64(newN) case float64: - o, n = math.Float64bits(float64(old)), math.Float64bits(float64(new)) + o, n = math.Float64bits(float64(oldN)), math.Float64bits(float64(newN)) default: panic("unsupported type") } diff --git a/sdk/metric/internal/aggregate/atomic_test.go b/sdk/metric/internal/aggregate/atomic_test.go index c6e444f8cce..174ef764f21 100644 --- a/sdk/metric/internal/aggregate/atomic_test.go +++ b/sdk/metric/internal/aggregate/atomic_test.go @@ -107,7 +107,6 @@ func TestHotColdWaitGroupConcurrentSafe(t *testing.T) { func TestAtomicN(t *testing.T) { t.Run("Int64", testAtomicN[int64]) t.Run("Float64", testAtomicN[float64]) - } func testAtomicN[N int64 | float64](t *testing.T) { From de9bcf3034ad0db6f50050e6d42f5dad2e3aab24 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Sat, 11 Oct 2025 01:59:12 +0000 Subject: [PATCH 09/15] document locking strategy for fixed bucket histogram --- sdk/metric/internal/aggregate/histogram.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 918a9d9e239..508a5d43154 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -95,6 +95,14 @@ func (b *histogramPointCounters[N]) mergeIntoAndReset( // nolint:revive // Inten // deltaHistogram is a histogram whose internal storage is reset when it is // collected. +// +// deltaHistogram's measure is implemented without locking, even when called +// concurrently with collect. 
This is done by maintaining two separate maps:
+// one "hot" which is concurrently updated by measure(), and one "cold", which
+// is read and reset by collect(). The [hotColdWaitGroup] allows collect() to
+// swap the hot and cold maps, and wait for updates to the cold map to complete
+// prior to reading. deltaHistogram swaps and clears complete maps so that
+// unused attribute sets do not report in subsequent collect() calls.
 type deltaHistogram[N int64 | float64] struct {
 	hcwg          hotColdWaitGroup
 	hotColdValMap [2]limitedSyncMap
@@ -228,6 +236,15 @@ func (s *deltaHistogram[N]) collect(
 
 // cumulativeHistogram summarizes a set of measurements as an histogram with explicitly
 // defined histogramPoint.
+//
+// cumulativeHistogram's measure is implemented without locking, even when
+// called concurrently with collect. This is done by maintaining two separate
+// histogramPointCounters for each attribute set: one "hot" which is
+// concurrently updated by measure(), and one "cold", which is read and reset
+// by collect(). The [hotColdWaitGroup] allows collect() to swap the hot and
+// cold counters, and wait for updates to the cold counters to complete prior
+// to reading. Unlike deltaHistogram, this maintains a single map so that the
+// preserved attribute sets do not change when collect() is called.
type cumulativeHistogram[N int64 | float64] struct { values limitedSyncMap From fadddac4b6dcd5ece6625198e80159d212825309 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 15 Oct 2025 00:51:59 +0000 Subject: [PATCH 10/15] remove bin and sum --- sdk/metric/internal/aggregate/histogram.go | 32 +++++------ .../internal/aggregate/histogram_test.go | 57 ------------------- 2 files changed, 16 insertions(+), 73 deletions(-) diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 508a5d43154..867a4819eaf 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -39,18 +39,6 @@ type histogramPointCounters[N int64 | float64] struct { minMax atomicMinMax[N] } -func (b *histogramPointCounters[N]) sum(value N) { b.total.add(value) } - -func (b *histogramPointCounters[N]) bin(bounds []float64, value N) { - // This search will return an index in the range [0, len(s.bounds)], where - // it will return len(s.bounds) if value is greater than the last element - // of s.bounds. This aligns with the histogramPoint in that the length of histogramPoint - // is len(s.bounds)+1, with the last bucket representing: - // (s.bounds[len(s.bounds)-1], +∞). - idx := sort.SearchFloat64s(bounds, float64(value)) - b.counts[idx].Add(1) -} - func (b *histogramPointCounters[N]) loadCountsInto(into *[]uint64) uint64 { // TODO (#3047): Making copies for bounds and counts incurs a large // memory allocation footprint. Alternatives should be explored. @@ -138,12 +126,18 @@ func (s *deltaHistogram[N]) measure( return hPt }).(*histogramPoint[N]) - h.bin(s.bounds, value) + // This search will return an index in the range [0, len(s.bounds)], where + // it will return len(s.bounds) if value is greater than the last element + // of s.bounds. 
This aligns with the histogramPoint in that the length of histogramPoint + // is len(s.bounds)+1, with the last bucket representing: + // (s.bounds[len(s.bounds)-1], +∞). + idx := sort.SearchFloat64s(s.bounds, float64(value)) + h.counts[idx].Add(1) if !s.noMinMax { h.minMax.Update(value) } if !s.noSum { - h.sum(value) + h.total.add(value) } h.res.Offer(ctx, value, droppedAttr) } @@ -311,12 +305,18 @@ func (s *cumulativeHistogram[N]) measure( hotIdx := h.hcwg.start() defer h.hcwg.done(hotIdx) - h.hotColdPoint[hotIdx].bin(s.bounds, value) + // This search will return an index in the range [0, len(s.bounds)], where + // it will return len(s.bounds) if value is greater than the last element + // of s.bounds. This aligns with the histogramPoint in that the length of histogramPoint + // is len(s.bounds)+1, with the last bucket representing: + // (s.bounds[len(s.bounds)-1], +∞). + idx := sort.SearchFloat64s(s.bounds, float64(value)) + h.hotColdPoint[hotIdx].counts[idx].Add(1) if !s.noMinMax { h.hotColdPoint[hotIdx].minMax.Update(value) } if !s.noSum { - h.hotColdPoint[hotIdx].sum(value) + h.hotColdPoint[hotIdx].total.add(value) } h.res.Offer(ctx, value, droppedAttr) } diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 8766e3a2765..d774141d19b 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -339,63 +339,6 @@ func hPoint[N int64 | float64]( } } -func TestBucketsBin(t *testing.T) { - t.Run("Int64", testBucketsBin[int64]()) - t.Run("Float64", testBucketsBin[float64]()) -} - -func testBucketsBin[N int64 | float64]() func(t *testing.T) { - return func(t *testing.T) { - b := newHistogramPoint[N](alice, 3) - assertB := func(expectedBucketCounts []uint64, expectedCount uint64, mi, ma N) { - t.Helper() - var bucketCounts []uint64 - count := b.loadCountsInto(&bucketCounts) - assert.Equal(t, expectedBucketCounts, bucketCounts) - assert.Equal(t, 
expectedCount, count) - if mi != 0 { - assert.True(t, b.minMax.set.Load()) - assert.Equal(t, mi, b.minMax.minimum.Load()) - } - if ma != 0 { - assert.True(t, b.minMax.set.Load()) - assert.Equal(t, ma, b.minMax.maximum.Load()) - } - } - - bounds := []float64{0, 2, 4} - assertB([]uint64{0, 0, 0}, 0, 0, 0) - b.bin(bounds, 1) - b.minMax.Update(2) - assertB([]uint64{0, 1, 0}, 1, 0, 2) - b.bin(bounds, -1) - b.minMax.Update(-1) - assertB([]uint64{1, 1, 0}, 2, -1, 2) - } -} - -func TestBucketsSum(t *testing.T) { - t.Run("Int64", testBucketsSum[int64]()) - t.Run("Float64", testBucketsSum[float64]()) -} - -func testBucketsSum[N int64 | float64]() func(t *testing.T) { - return func(t *testing.T) { - b := newHistogramPoint[N](alice, 3) - - var want N - assert.Equal(t, want, b.total.load()) - - b.sum(2) - want = 2 - assert.Equal(t, want, b.total.load()) - - b.sum(-1) - want = 1 - assert.Equal(t, want, b.total.load()) - } -} - func TestHistogramImmutableBounds(t *testing.T) { b := []float64{0, 1, 2} cpB := make([]float64, len(b)) From 44172bb6f16b4a69da6a3ef2c16b216863a3dc14 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 15 Oct 2025 20:14:44 +0000 Subject: [PATCH 11/15] lint --- sdk/metric/internal/aggregate/histogram_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index d774141d19b..0bfa8e9970e 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -6,7 +6,6 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg import ( "context" "sort" - "sync/atomic" "testing" "time" @@ -23,14 +22,6 @@ var ( noMinMax = false ) -func newHistogramPoint[N int64 | float64](attrs attribute.Set, n int) *histogramPoint[N] { - hPt := &histogramPoint[N]{ - attrs: attrs, - histogramPointCounters: histogramPointCounters[N]{counts: make([]atomic.Uint64, n)}, - } - return hPt -} - func 
TestHistogram(t *testing.T) { c := new(clock) t.Cleanup(c.Register()) From 61230f93a437b65efcde75b05e66b4c21115ef22 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 10 Nov 2025 11:17:52 -0500 Subject: [PATCH 12/15] Apply suggestions from code review Co-authored-by: Bartlomiej Plotka --- sdk/metric/internal/aggregate/histogram.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 867a4819eaf..a7b82a39024 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -40,7 +40,7 @@ type histogramPointCounters[N int64 | float64] struct { } func (b *histogramPointCounters[N]) loadCountsInto(into *[]uint64) uint64 { - // TODO (#3047): Making copies for bounds and counts incurs a large + // TODO (#3047): Making copies for counts incurs a large // memory allocation footprint. Alternatives should be explored. counts := reset(*into, len(b.counts), len(b.counts)) count := uint64(0) @@ -120,7 +120,7 @@ func (s *deltaHistogram[N]) measure( // // Then, // - // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) + // counts = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) histogramPointCounters: histogramPointCounters[N]{counts: make([]atomic.Uint64, len(s.bounds)+1)}, } return hPt From 219e89e4c701e37a4857cf3e91a7a0acd9945a79 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 10 Nov 2025 16:24:09 +0000 Subject: [PATCH 13/15] find index before starting write operation --- sdk/metric/internal/aggregate/histogram.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index a7b82a39024..93f7c5449f1 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -302,15 +302,16 @@ func (s *cumulativeHistogram[N]) measure( return hPt }).(*hotColdHistogramPoint[N]) - 
hotIdx := h.hcwg.start() - defer h.hcwg.done(hotIdx) - // This search will return an index in the range [0, len(s.bounds)], where // it will return len(s.bounds) if value is greater than the last element // of s.bounds. This aligns with the histogramPoint in that the length of histogramPoint // is len(s.bounds)+1, with the last bucket representing: // (s.bounds[len(s.bounds)-1], +∞). idx := sort.SearchFloat64s(s.bounds, float64(value)) + + hotIdx := h.hcwg.start() + defer h.hcwg.done(hotIdx) + h.hotColdPoint[hotIdx].counts[idx].Add(1) if !s.noMinMax { h.hotColdPoint[hotIdx].minMax.Update(value) From bf7f3076650c7aabe3ace189444ca116eb65d018 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 10 Nov 2025 11:24:57 -0500 Subject: [PATCH 14/15] Update sdk/metric/internal/aggregate/histogram.go Co-authored-by: Bartlomiej Plotka --- sdk/metric/internal/aggregate/histogram.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 93f7c5449f1..421325fb728 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -289,7 +289,7 @@ func (s *cumulativeHistogram[N]) measure( // // Then, // - // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) + // counts = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) hotColdPoint: [2]histogramPointCounters[N]{ { counts: make([]atomic.Uint64, len(s.bounds)+1), From 7ce277bd5c1fe88b569c5f39717ec4fc054b353a Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 9 Dec 2025 11:07:22 -0500 Subject: [PATCH 15/15] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb08cbf4db7..d3aa15876bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,7 +55,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm To preserve the prior behavior (scrapes succeed while errors are logged),
configure your Prometheus HTTP handler with: `promhttp.HandlerOpts{ ErrorHandling: promhttp.ContinueOnError }`. (#7363) - Replace fnv hash with xxhash in `go.opentelemetry.io/otel/attribute` for better performance. (#7371) - The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421) -- Improve performance of concurrent counter measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427) +- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427) - Include W3C TraceFlags (bits 0–7) in the OTLP `Span.Flags` field in `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracehttp` and `go.opentelemetry.io/exporters/otlp/otlptrace/otlptracegrpc`. (#7438) - The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types. If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)