Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- `Exporter` in `go.opentelemetry.io/otel/exporter/prometheus` ignores metrics with the scope `go.opentelemetry.io/contrib/bridges/prometheus`.
This prevents scrape failures when the Prometheus exporter is misconfigured to get data from the Prometheus bridge. (#7688)
- Improve performance of concurrent histogram measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7474)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
7 changes: 4 additions & 3 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,13 @@ func (b Builder[N]) ExplicitBucketHistogram(
boundaries []float64,
noMinMax, noSum bool,
) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
h := newDeltaHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
return b.filter(h.measure), h.collect
default:
return b.filter(h.measure), h.cumulative
h := newCumulativeHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
return b.filter(h.measure), h.collect
}
}

Expand Down
91 changes: 91 additions & 0 deletions sdk/metric/internal/aggregate/atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,97 @@ func (n *atomicCounter[N]) add(value N) {
}
}

// reset resets the internal state, and is not safe to call concurrently.
func (n *atomicCounter[N]) reset() {
n.nFloatBits.Store(0)
n.nInt.Store(0)
}

// atomicN is a generic atomic number value.
type atomicN[N int64 | float64] struct {
Comment thread
dashpole marked this conversation as resolved.
val atomic.Uint64
}

func (a *atomicN[N]) Load() (value N) {
v := a.val.Load()
switch any(value).(type) {
case int64:
value = N(v)
case float64:
value = N(math.Float64frombits(v))
default:
panic("unsupported type")
}
return value
}

func (a *atomicN[N]) Store(v N) {
var val uint64
switch any(v).(type) {
case int64:
val = uint64(v)
case float64:
val = math.Float64bits(float64(v))
default:
panic("unsupported type")
}
a.val.Store(val)
}

func (a *atomicN[N]) CompareAndSwap(oldN, newN N) bool {
var o, n uint64
switch any(oldN).(type) {
case int64:
o, n = uint64(oldN), uint64(newN)
case float64:
o, n = math.Float64bits(float64(oldN)), math.Float64bits(float64(newN))
default:
panic("unsupported type")
}
return a.val.CompareAndSwap(o, n)
}

type atomicMinMax[N int64 | float64] struct {
Comment thread
dashpole marked this conversation as resolved.
minimum, maximum atomicN[N]
set atomic.Bool
mu sync.Mutex
}

// init returns true if the value was used to initialize min and max.
func (s *atomicMinMax[N]) init(val N) bool {
s.mu.Lock()
defer s.mu.Unlock()
if !s.set.Load() {
defer s.set.Store(true)
s.minimum.Store(val)
s.maximum.Store(val)
return true
}
return false
}

func (s *atomicMinMax[N]) Update(val N) {
if !s.set.Load() && s.init(val) {
return
}

old := s.minimum.Load()
for val < old {
if s.minimum.CompareAndSwap(old, val) {
return
}
old = s.minimum.Load()
}

old = s.maximum.Load()
for old < val {
if s.maximum.CompareAndSwap(old, val) {
return
}
old = s.maximum.Load()
}
}

// hotColdWaitGroup is a synchronization primitive which enables lockless
// writes for concurrent writers and enables a reader to acquire exclusive
// access to a snapshot of state including only completed operations.
Expand Down
174 changes: 174 additions & 0 deletions sdk/metric/internal/aggregate/atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,33 @@ func TestAtomicSumAddIntConcurrentSafe(t *testing.T) {
assert.Equal(t, int64(15), aSum.load())
}

func BenchmarkAtomicCounter(b *testing.B) {
Comment thread
dashpole marked this conversation as resolved.
b.Run("Int64", benchmarkAtomicCounter[int64])
b.Run("Float64", benchmarkAtomicCounter[float64])
}

func benchmarkAtomicCounter[N int64 | float64](b *testing.B) {
b.Run("add", func(b *testing.B) {
var a atomicCounter[N]
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.add(2)
}
})
})
b.Run("load", func(b *testing.B) {
var a atomicCounter[N]
a.add(2)
var v N
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
v = a.load()
}
})
assert.Equal(b, N(2), v)
})
}

func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
hcwg := &hotColdWaitGroup{}
Expand All @@ -76,3 +103,150 @@ func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
}
wg.Wait()
}

func TestAtomicN(t *testing.T) {
t.Run("Int64", testAtomicN[int64])
t.Run("Float64", testAtomicN[float64])
}

func testAtomicN[N int64 | float64](t *testing.T) {
var v atomicN[N]
assert.Equal(t, N(0), v.Load())
assert.True(t, v.CompareAndSwap(0, 6))
assert.Equal(t, N(6), v.Load())
assert.False(t, v.CompareAndSwap(0, 6))
v.Store(22)
assert.Equal(t, N(22), v.Load())
}

func TestAtomicNConcurrentSafe(t *testing.T) {
t.Run("Int64", testAtomicNConcurrentSafe[int64])
t.Run("Float64", testAtomicNConcurrentSafe[float64])
}

func testAtomicNConcurrentSafe[N int64 | float64](t *testing.T) {
var wg sync.WaitGroup
var v atomicN[N]

for range 2 {
wg.Add(1)
go func() {
defer wg.Done()
got := v.Load()
assert.Equal(t, int64(0), int64(got)%6)
}()
wg.Add(1)
go func() {
defer wg.Done()
v.Store(12)
}()
wg.Add(1)
go func() {
defer wg.Done()
v.CompareAndSwap(0, 6)
}()
}
wg.Wait()
}

func BenchmarkAtomicN(b *testing.B) {
b.Run("Int64", benchmarkAtomicN[int64])
b.Run("Float64", benchmarkAtomicN[float64])
}

func benchmarkAtomicN[N int64 | float64](b *testing.B) {
b.Run("Load", func(b *testing.B) {
var a atomicN[N]
a.Store(2)
var v N
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
v = a.Load()
}
})
assert.Equal(b, N(2), v)
})
b.Run("Store", func(b *testing.B) {
var a atomicN[N]
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.Store(3)
}
})
})
b.Run("CompareAndSwap", func(b *testing.B) {
var a atomicN[N]
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
// Make sure we swap back and forth, in-case that matters.
if i%2 == 0 {
a.CompareAndSwap(0, 1)
} else {
a.CompareAndSwap(1, 0)
}
i++
}
})
})
}

func TestAtomicMinMaxConcurrentSafe(t *testing.T) {
t.Run("Int64", testAtomicMinMaxConcurrentSafe[int64])
t.Run("Float64", testAtomicMinMaxConcurrentSafe[float64])
}

func testAtomicMinMaxConcurrentSafe[N int64 | float64](t *testing.T) {
var wg sync.WaitGroup
var minMax atomicMinMax[N]

assert.False(t, minMax.set.Load())
for _, i := range []float64{2, 4, 6, 8, -3, 0, 8, 0} {
wg.Add(1)
go func() {
defer wg.Done()
minMax.Update(N(i))
}()
}
wg.Wait()

assert.True(t, minMax.set.Load())
assert.Equal(t, N(-3), minMax.minimum.Load())
assert.Equal(t, N(8), minMax.maximum.Load())
}

func BenchmarkAtomicMinMax(b *testing.B) {
b.Run("Int64", benchmarkAtomicMinMax[int64])
b.Run("Float64", benchmarkAtomicMinMax[float64])
}

func benchmarkAtomicMinMax[N int64 | float64](b *testing.B) {
b.Run("UpdateIncreasing", func(b *testing.B) {
var a atomicMinMax[N]
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
a.Update(N(i))
i++
}
})
})
b.Run("UpdateDecreasing", func(b *testing.B) {
var a atomicMinMax[N]
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
a.Update(N(i))
i--
}
})
})
b.Run("UpdateConstant", func(b *testing.B) {
var a atomicMinMax[N]
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.Update(N(5))
}
})
})
}
Loading
Loading