Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Histogram Extrema and Sum to be generic #3870

Merged
merged 11 commits into from
Mar 29, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Changed

- The `Extrema` in `go.opentelemetry.io/otel/sdk/metric/metricdata` is redefined with a generic argument of `[N int64 | float64]`. (#3870)
- Move No-Op implementation from `go.opentelemetry.io/otel/metric` into its own package `go.opentelemetry.io/otel/metric/noop`. (#3941)
- `metric.NewNoopMeterProvider` is replaced with `noop.NewMeterProvider`

Expand Down
8 changes: 5 additions & 3 deletions exporters/otlp/otlpmetric/internal/transform/metricdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func Histogram[N int64 | float64](h metricdata.Histogram[N]) (*mpb.Metric_Histog
func HistogramDataPoints[N int64 | float64](dPts []metricdata.HistogramDataPoint[N]) []*mpb.HistogramDataPoint {
out := make([]*mpb.HistogramDataPoint, 0, len(dPts))
for _, dPt := range dPts {
sum := dPt.Sum
sum := float64(dPt.Sum)
hdp := &mpb.HistogramDataPoint{
Attributes: AttrIter(dPt.Attributes.Iter()),
StartTimeUnixNano: uint64(dPt.StartTime.UnixNano()),
Expand All @@ -186,10 +186,12 @@ func HistogramDataPoints[N int64 | float64](dPts []metricdata.HistogramDataPoint
ExplicitBounds: dPt.Bounds,
}
if v, ok := dPt.Min.Value(); ok {
hdp.Min = &v
vF64 := float64(v)
hdp.Min = &vF64
}
if v, ok := dPt.Max.Value(); ok {
hdp.Max = &v
vF64 := float64(v)
hdp.Max = &vF64
}
out = append(out, hdp)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ var (
Count: 30,
Bounds: []float64{1, 5},
BucketCounts: []uint64{0, 30, 0},
Min: metricdata.NewExtrema(minA),
Max: metricdata.NewExtrema(maxA),
Sum: sumA,
Min: metricdata.NewExtrema(int64(minA)),
Max: metricdata.NewExtrema(int64(maxA)),
Sum: int64(sumA),
}, {
Attributes: bob,
StartTime: start,
Time: end,
Count: 3,
Bounds: []float64{1, 5},
BucketCounts: []uint64{0, 1, 2},
Min: metricdata.NewExtrema(minB),
Max: metricdata.NewExtrema(maxB),
Sum: sumB,
Min: metricdata.NewExtrema(int64(minB)),
Max: metricdata.NewExtrema(int64(maxB)),
Sum: int64(sumB),
}}
otelHDPFloat64 = []metricdata.HistogramDataPoint[float64]{{
Attributes: alice,
Expand Down
2 changes: 1 addition & 1 deletion exporters/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogra
cumulativeCount += dp.BucketCounts[i]
buckets[bound] = cumulativeCount
}
m, err := prometheus.NewConstHistogram(desc, dp.Count, dp.Sum, buckets, values...)
m, err := prometheus.NewConstHistogram(desc, dp.Count, float64(dp.Sum), buckets, values...)
if err != nil {
otel.Handle(err)
continue
Expand Down
15 changes: 10 additions & 5 deletions sdk/metric/internal/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ var (
bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))

monoIncr = setMap{alice: 1, bob: 10, carol: 2}
nonMonoIncr = setMap{alice: 1, bob: -1, carol: 2}

// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
staticNowFunc = func() time.Time { return staticTime }
Expand All @@ -52,8 +49,16 @@ var (
}
)

// monoIncr returns a fresh setMap of monotonic (all positive) increments
// keyed by the test attribute sets.
func monoIncr[N int64 | float64]() setMap[N] {
	m := make(setMap[N], 3)
	m[alice] = 1
	m[bob] = 10
	m[carol] = 2
	return m
}

// nonMonoIncr returns a fresh setMap of non-monotonic increments (bob's
// increment is negative) keyed by the test attribute sets.
func nonMonoIncr[N int64 | float64]() setMap[N] {
	m := make(setMap[N], 3)
	m[alice] = 1
	m[bob] = -1
	m[carol] = 2
	return m
}

// setMap maps attribute sets to the numeric increment recorded for each
// of them during an aggregator test run.
type setMap map[attribute.Set]int
type setMap[N int64 | float64] map[attribute.Set]N

// expectFunc is a function that returns an Aggregation of expected values for
// a cycle that contains m measurements (total across all goroutines). Each
Expand All @@ -79,7 +84,7 @@ type aggregatorTester[N int64 | float64] struct {
CycleN int
}

func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) {
func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
m := at.MeasurementN * at.GoroutineN
return func(t *testing.T) {
t.Run("Comparable", func(t *testing.T) {
Expand Down
29 changes: 12 additions & 17 deletions sdk/metric/internal/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

type buckets struct {
// buckets summarizes the histogram state for a single attribute set:
// per-bin counts, the total measurement count, the running sum, and the
// minimum and maximum values recorded so far.
type buckets[N int64 | float64] struct {
counts []uint64
count uint64
sum float64
min, max float64
sum N
min, max N
}

// newBuckets returns buckets with n bins.
func newBuckets(n int) *buckets {
return &buckets{counts: make([]uint64, n)}
// newBuckets returns a zeroed buckets value with n bins allocated.
func newBuckets[N int64 | float64](n int) *buckets[N] {
	b := new(buckets[N])
	b.counts = make([]uint64, n)
	return b
}

func (b *buckets) bin(idx int, value float64) {
func (b *buckets[N]) bin(idx int, value N) {
b.counts[idx]++
b.count++
b.sum += value
Expand All @@ -52,7 +52,7 @@ func (b *buckets) bin(idx int, value float64) {
type histValues[N int64 | float64] struct {
bounds []float64

values map[attribute.Set]*buckets
values map[attribute.Set]*buckets[N]
valuesMu sync.Mutex
}

Expand All @@ -66,24 +66,19 @@ func newHistValues[N int64 | float64](bounds []float64) *histValues[N] {
sort.Float64s(b)
return &histValues[N]{
bounds: b,
values: make(map[attribute.Set]*buckets),
values: make(map[attribute.Set]*buckets[N]),
}
}

// Aggregate records the measurement value, scoped by attr, and aggregates it
// into a histogram.
func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
// Accept all types to satisfy the Aggregator interface. However, since
// the Aggregation produced by this Aggregator is only float64, convert
// here to only use this type.
v := float64(value)

// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the buckets in that the length of buckets
// is len(s.bounds)+1, with the last bucket representing:
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, v)
idx := sort.SearchFloat64s(s.bounds, float64(value))

s.valuesMu.Lock()
defer s.valuesMu.Unlock()
Expand All @@ -97,12 +92,12 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
// Then,
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets(len(s.bounds) + 1)
b = newBuckets[N](len(s.bounds) + 1)
// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = v, v
b.min, b.max = value, value
s.values[attr] = b
}
b.bin(idx, v)
b.bin(idx, value)
}

// NewDeltaHistogram returns an Aggregator that summarizes a set of
Expand Down
49 changes: 28 additions & 21 deletions sdk/metric/internal/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,41 +48,41 @@ func testHistogram[N int64 | float64](t *testing.T) {
CycleN: defaultCycles,
}

incr := monoIncr
incr := monoIncr[N]()
eFunc := deltaHistExpecter[N](incr)
t.Run("Delta", tester.Run(NewDeltaHistogram[N](histConf), incr, eFunc))
eFunc = cumuHistExpecter[N](incr)
t.Run("Cumulative", tester.Run(NewCumulativeHistogram[N](histConf), incr, eFunc))
}

func deltaHistExpecter[N int64 | float64](incr setMap) expectFunc {
// deltaHistExpecter returns an expectFunc that builds the expected
// delta-temporality Histogram for m measurements of each increment in
// incr. DataPoints are rebuilt on every call, so each cycle's expectation
// covers only that cycle's m measurements.
func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality}
return func(m int) metricdata.Aggregation {
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
for a, v := range incr {
h.DataPoints = append(h.DataPoints, hPoint[N](a, float64(v), uint64(m)))
h.DataPoints = append(h.DataPoints, hPoint[N](a, v, uint64(m)))
}
return h
}
}

func cumuHistExpecter[N int64 | float64](incr setMap) expectFunc {
// cumuHistExpecter returns an expectFunc that builds the expected
// cumulative-temporality Histogram. The captured cycle counter increments
// on every call, so the expected counts grow by m each cycle
// (cycle*m total measurements per attribute set).
func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
var cycle int
h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality}
return func(m int) metricdata.Aggregation {
cycle++
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
for a, v := range incr {
h.DataPoints = append(h.DataPoints, hPoint[N](a, float64(v), uint64(cycle*m)))
h.DataPoints = append(h.DataPoints, hPoint[N](a, v, uint64(cycle*m)))
}
return h
}
}

// hPoint returns an HistogramDataPoint that started and ended now with multi
// number of measurements values v. It includes a min and max (set to v).
func hPoint[N int64 | float64](a attribute.Set, v float64, multi uint64) metricdata.HistogramDataPoint[N] {
idx := sort.SearchFloat64s(bounds, v)
func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] {
idx := sort.SearchFloat64s(bounds, float64(v))
counts := make([]uint64, len(bounds)+1)
counts[idx] += multi
return metricdata.HistogramDataPoint[N]{
Expand All @@ -94,25 +94,32 @@ func hPoint[N int64 | float64](a attribute.Set, v float64, multi uint64) metricd
BucketCounts: counts,
Min: metricdata.NewExtrema(v),
Max: metricdata.NewExtrema(v),
Sum: v * float64(multi),
Sum: v * N(multi),
}
}

// TestBucketsBin runs the buckets.bin behavior test for both generic
// instantiations (int64 and float64).
func TestBucketsBin(t *testing.T) {
b := newBuckets(3)
assertB := func(counts []uint64, count uint64, sum, min, max float64) {
assert.Equal(t, counts, b.counts)
assert.Equal(t, count, b.count)
assert.Equal(t, sum, b.sum)
assert.Equal(t, min, b.min)
assert.Equal(t, max, b.max)
}
t.Run("Int64", testBucketsBin[int64]())
t.Run("Float64", testBucketsBin[float64]())
}

// testBucketsBin returns a subtest verifying that buckets.bin records a
// value into the right bin and maintains count, sum, min, and max.
func testBucketsBin[N int64 | float64]() func(t *testing.T) {
	return func(t *testing.T) {
		b := newBuckets[N](3)
		// check asserts the complete observable state of b.
		check := func(wantCounts []uint64, wantCount uint64, wantSum, wantMin, wantMax N) {
			assert.Equal(t, wantCounts, b.counts)
			assert.Equal(t, wantCount, b.count)
			assert.Equal(t, wantSum, b.sum)
			assert.Equal(t, wantMin, b.min)
			assert.Equal(t, wantMax, b.max)
		}

		// Fresh buckets start zeroed.
		check([]uint64{0, 0, 0}, 0, 0, 0, 0)
		// Recording 2 in bin 1 updates count, sum, and max.
		b.bin(1, 2)
		check([]uint64{0, 1, 0}, 1, 2, 0, 2)
		// Recording -1 in bin 0 updates count, sum, and min.
		b.bin(0, -1)
		check([]uint64{1, 1, 0}, 2, 1, -1, 2)
	}
}

func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram) Aggregator[N], getBounds func(Aggregator[N]) []float64) func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func testLastValue[N int64 | float64]() func(*testing.T) {
CycleN: defaultCycles,
}

eFunc := func(increments setMap) expectFunc {
eFunc := func(increments setMap[N]) expectFunc {
data := make([]metricdata.DataPoint[N], 0, len(increments))
for a, v := range increments {
point := metricdata.DataPoint[N]{Attributes: a, Time: now(), Value: N(v)}
Expand All @@ -46,7 +46,7 @@ func testLastValue[N int64 | float64]() func(*testing.T) {
gauge := metricdata.Gauge[N]{DataPoints: data}
return func(int) metricdata.Aggregation { return gauge }
}
incr := monoIncr
incr := monoIncr[N]()
return tester.Run(NewLastValue[N](), incr, eFunc(incr))
}

Expand Down
30 changes: 15 additions & 15 deletions sdk/metric/internal/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,71 +39,71 @@ func testSum[N int64 | float64](t *testing.T) {
}

t.Run("Delta", func(t *testing.T) {
incr, mono := monoIncr, true
incr, mono := monoIncr[N](), true
eFunc := deltaExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
incr, mono = nonMonoIncr[N](), false
eFunc = deltaExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc))
})

t.Run("Cumulative", func(t *testing.T) {
incr, mono := monoIncr, true
incr, mono := monoIncr[N](), true
eFunc := cumuExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
incr, mono = nonMonoIncr[N](), false
eFunc = cumuExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))
})

t.Run("PreComputedDelta", func(t *testing.T) {
incr, mono := monoIncr, true
incr, mono := monoIncr[N](), true
eFunc := preDeltaExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
incr, mono = nonMonoIncr[N](), false
eFunc = preDeltaExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))
})

t.Run("PreComputedCumulative", func(t *testing.T) {
incr, mono := monoIncr, true
incr, mono := monoIncr[N](), true
eFunc := preCumuExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
incr, mono = nonMonoIncr[N](), false
eFunc = preCumuExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))
})
}

func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
// deltaExpecter returns an expectFunc that builds the expected
// delta-temporality Sum for m measurements of each increment in incr.
// DataPoints are rebuilt on every call, so each cycle's expectation
// covers only that cycle's measurements (v*m per attribute set).
func deltaExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
return func(m int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point(a, N(v*m)))
sum.DataPoints = append(sum.DataPoints, point(a, v*N(m)))
}
return sum
}
}

func cumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
var cycle int
// cumuExpecter returns an expectFunc that builds the expected
// cumulative-temporality Sum. The captured cycle counter increments on
// every call, so the expected value grows each cycle
// (v*cycle*m per attribute set).
func cumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
var cycle N
sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono}
return func(m int) metricdata.Aggregation {
cycle++
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point(a, N(v*cycle*m)))
sum.DataPoints = append(sum.DataPoints, point(a, v*cycle*N(m)))
}
return sum
}
}

func preDeltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
last := make(map[attribute.Set]N)
return func(int) metricdata.Aggregation {
Expand All @@ -117,7 +117,7 @@ func preDeltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
}
}

func preCumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono}
return func(int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
Expand Down
Loading