Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 72 additions & 22 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,42 +139,92 @@ func test[N int64 | float64](meas Measure[N], comp ComputeAggregation, steps []t
}
}

func testAggergationConcurrentSafe[N int64 | float64](
// getConcurrentVals returns the fixed sequence of measurement values used by
// the concurrency stress tests, typed for the instantiated N.
// Keep the sequence length in sync with concurrentNumRecords and its sum in
// sync with expectedConcurrentSum.
func getConcurrentVals[N int64 | float64]() []N {
	var zero N
	if _, isFloat := any(zero).(float64); isFloat {
		f := []float64{2.5, 6.1, 4.4, 10.0, 22.0, -3.5, -6.5, 3.0, -6.0}
		return any(f).([]N)
	}
	i := []int64{2, 6, 4, 10, 22, -3, -6, 3, -6}
	return any(i).([]N)
}

const (
	// concurrentValsSum is the sum of one full pass over the value sequence
	// returned by getConcurrentVals (holds for both the int64 and float64
	// sequences).
	concurrentValsSum = 32
	// concurrentNumGoroutines is the number of concurrent recording
	// goroutines, each recording to its own attribute set.
	concurrentNumGoroutines = 10
	concurrentNumRecords = 90 // Multiple of 9 (length of values sequences)
	// expectedConcurrentCount is the total number of measurements recorded
	// across all goroutines.
	expectedConcurrentCount = uint64(concurrentNumGoroutines * concurrentNumRecords)
)

func expectedConcurrentSum[N int64 | float64]() N {
return N(int64(concurrentNumGoroutines) * int64(concurrentNumRecords/9) * concurrentValsSum)
}

// testAggregationConcurrentSafe provides a unified stress test for all generic aggregators
// by generating high contention, cardinality limit overflow, and validating exact results.
func testAggregationConcurrentSafe[N int64 | float64](
meas Measure[N],
comp ComputeAggregation,
validate func(t *testing.T, agg metricdata.Aggregation),
validate func(t *testing.T, aggs []metricdata.Aggregation),
) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

got := new(metricdata.Aggregation)
ctx := t.Context()
var wg sync.WaitGroup
for _, args := range []arg[N]{
{ctx, 2, alice},
{ctx, 6, alice},
{ctx, 4, alice},
{ctx, 10, alice},
{ctx, 22, alice},
{ctx, -3, bob},
{ctx, -6, bob},
{ctx, 3, bob},
{ctx, 6, bob},
} {
wg.Go(func() {
meas(args.ctx, args.value, args.attr)
})

// Use 10 different attribute sets to force overflow on the AggregationLimit
// which is typically set to 3.
attrs := make([]attribute.Set, concurrentNumGoroutines)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.String(keyUser, strconv.Itoa(i)))
}

vals := getConcurrentVals[N]()

wg.Add(concurrentNumGoroutines)
for i := range concurrentNumGoroutines {
go func(id int) {
defer wg.Done()
// Each goroutine records to a distinct attribute set
attr := attrs[id]

for j := range concurrentNumRecords {
meas(ctx, vals[j%len(vals)], attr)
}
}(i)
}

var results []metricdata.Aggregation

// Run computation concurrently with measurements to stress hot/cold swaps
wg.Go(func() {
for range 2 {
for range concurrentNumRecords {
got := new(metricdata.Aggregation)
comp(got)
// We do not check expected output for each step because
// computeAggregation is run concurrently with steps. Instead,
// we validate that the output is a valid possible output.
validate(t, *got)
results = append(results, *got)
}
})

wg.Wait()

// Final flush to get final values
got := new(metricdata.Aggregation)
comp(got)
results = append(results, *got)

validate(t, results)
}
}

// assertSumEqual asserts that actual equals expected. For float64 it uses an
// approximate comparison (InDelta) to tolerate floating-point accumulation
// error across concurrent additions; for int64 it requires exact equality.
func assertSumEqual[N int64 | float64](t *testing.T, expected, actual N) {
	// Mark this function as a test helper so assertion failures are reported
	// at the caller's line rather than inside this wrapper.
	t.Helper()
	if _, ok := any(*new(N)).(float64); ok {
		assert.InDelta(t, float64(expected), float64(actual), 0.0001)
	} else {
		assert.Equal(t, expected, actual)
	}
}

Expand Down
93 changes: 59 additions & 34 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
Expand Down Expand Up @@ -1052,7 +1053,7 @@ func testDeltaExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
Filter: attrFltr,
AggregationLimit: 3,
}.ExponentialBucketHistogram(4, 20, false, false)
return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
return testAggregationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
}

func testCumulativeExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
Expand All @@ -1061,45 +1062,69 @@ func testCumulativeExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T
Filter: attrFltr,
AggregationLimit: 3,
}.ExponentialBucketHistogram(4, 20, false, false)
return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
return testAggregationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
}

func validateExponentialHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
s, ok := got.(metricdata.ExponentialHistogram[N])
if !ok {
t.Fatalf("wrong aggregation type: %+v", got)
}
for _, dp := range s.DataPoints {
assert.False(t,
dp.Time.Before(dp.StartTime),
"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
)
switch dp.Attributes {
case fltrAlice:
// alice observations are always a multiple of 2
assert.Equal(t, int64(0), int64(dp.Sum)%2)
case fltrBob:
// bob observations are always a multiple of 3
assert.Equal(t, int64(0), int64(dp.Sum)%3)
default:
t.Fatalf("wrong attributes %+v", dp.Attributes)
}
avg := float64(dp.Sum) / float64(dp.Count)
if minVal, ok := dp.Min.Value(); ok {
assert.GreaterOrEqual(t, avg, float64(minVal))
}
if maxVal, ok := dp.Max.Value(); ok {
assert.LessOrEqual(t, avg, float64(maxVal))
func validateExponentialHistogram[N int64 | float64](t *testing.T, aggs []metricdata.Aggregation) {
sums := make(map[attribute.Set]N)
counts := make(map[attribute.Set]uint64)
var isDelta bool
for i, agg := range aggs {
s, ok := agg.(metricdata.ExponentialHistogram[N])
require.True(t, ok)
if s.Temporality == metricdata.DeltaTemporality {
isDelta = true
}
var totalCount uint64
for _, bc := range dp.PositiveBucket.Counts {
totalCount += bc
require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded in a single cycle")
for _, dp := range s.DataPoints {
assert.False(t,
dp.Time.Before(dp.StartTime),
"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
)

if s.Temporality == metricdata.DeltaTemporality {
sums[dp.Attributes] += dp.Sum
counts[dp.Attributes] += dp.Count
} else if i == len(aggs)-1 {
sums[dp.Attributes] = dp.Sum
counts[dp.Attributes] = dp.Count
}

var totalCount uint64
for _, bc := range dp.PositiveBucket.Counts {
totalCount += bc
}
for _, bc := range dp.NegativeBucket.Counts {
totalCount += bc
}
assert.Equal(t, totalCount, dp.Count)
}
for _, bc := range dp.NegativeBucket.Counts {
totalCount += bc
}

var totalSum N
var totalCount uint64
for attr, sum := range sums {
totalSum += sum
count := counts[attr]
totalCount += count

expectedSingleSum := expectedConcurrentSum[N]() / N(concurrentNumGoroutines)
expectedSingleCount := expectedConcurrentCount / uint64(concurrentNumGoroutines)

if !isDelta {
if attr == overflowSet {
// The overflow set contains all the goroutines that didn't make the limit of 3
assert.Equal(t, uint64(0), count%expectedSingleCount)
assert.Equal(t, count/expectedSingleCount*uint64(expectedSingleSum), uint64(sum))
} else {
// Individual attributes should have exactly one goroutine's worth of data
assert.Equal(t, expectedSingleSum, sum)
assert.Equal(t, expectedSingleCount, count)
}
}
assert.Equal(t, totalCount, dp.Count)
}
assertSumEqual[N](t, expectedConcurrentSum[N](), totalSum)
assert.Equal(t, expectedConcurrentCount, totalCount)
}

func FuzzGetBin(f *testing.F) {
Expand Down
114 changes: 76 additions & 38 deletions sdk/metric/internal/aggregate/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,57 +230,95 @@ func TestHistogramConcurrentSafe(t *testing.T) {
t.Run("Float64/Cumulative", testCumulativeHistConcurrentSafe[float64]())
}

func validateHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
s, ok := got.(metricdata.Histogram[N])
if !ok {
t.Fatalf("wrong aggregation type: %+v", got)
}
for _, dp := range s.DataPoints {
assert.False(t,
dp.Time.Before(dp.StartTime),
"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
)
switch dp.Attributes {
case fltrAlice:
// alice observations are always a multiple of 2
assert.Equal(t, int64(0), int64(dp.Sum)%2)
case fltrBob:
// bob observations are always a multiple of 3
assert.Equal(t, int64(0), int64(dp.Sum)%3)
default:
t.Fatalf("wrong attributes %+v", dp.Attributes)
}
avg := float64(dp.Sum) / float64(dp.Count)
if minVal, ok := dp.Min.Value(); ok {
assert.GreaterOrEqual(t, avg, float64(minVal))
func validateHistogram[N int64 | float64](t *testing.T, aggs []metricdata.Aggregation) {
sums := make(map[attribute.Set]N)
counts := make(map[attribute.Set]uint64)
bucketCounts := make(map[attribute.Set][]uint64)

for i, agg := range aggs {
s, ok := agg.(metricdata.Histogram[N])
require.True(t, ok)
require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded in a single cycle")
for _, dp := range s.DataPoints {
if s.Temporality == metricdata.DeltaTemporality {
sums[dp.Attributes] += dp.Sum
counts[dp.Attributes] += dp.Count
if bucketCounts[dp.Attributes] == nil {
bucketCounts[dp.Attributes] = make([]uint64, len(dp.BucketCounts))
}
for idx, c := range dp.BucketCounts {
bucketCounts[dp.Attributes][idx] += c
}
} else if i == len(aggs)-1 {
sums[dp.Attributes] = dp.Sum
counts[dp.Attributes] = dp.Count
bucketCounts[dp.Attributes] = make([]uint64, len(dp.BucketCounts))
copy(bucketCounts[dp.Attributes], dp.BucketCounts)
}
}
if maxVal, ok := dp.Max.Value(); ok {
assert.LessOrEqual(t, avg, float64(maxVal))
}
var totalCount uint64
for _, bc := range dp.BucketCounts {
totalCount += bc
}

var totalSum N
var totalCount uint64
totalBuckets := make([]uint64, 4)

for _, val := range sums {
totalSum += val
}
for _, val := range counts {
totalCount += val
}
for _, bc := range bucketCounts {
for idx, c := range bc {
if idx < len(totalBuckets) {
totalBuckets[idx] += c
}
}
assert.Equal(t, totalCount, dp.Count)
}

assertSumEqual[N](t, expectedConcurrentSum[N](), totalSum)
assert.Equal(t, expectedConcurrentCount, totalCount)

var expectedBuckets []uint64
switch any(*new(N)).(type) {
case float64:
// Float sequence: 2.5, 6.1, 4.4, 10.0, 22.0, -3.5, -6.5, 3.0, -6.0
// Bounds {0, 2, 4}:
// (-inf, 0]: -3.5, -6.5, -6.0 (3x)
// (0, 2]: none (0x)
// (2, 4]: 2.5, 3.0 (2x)
// (4, +inf): 6.1, 4.4, 10.0, 22.0 (4x)
// 10 full loops per goroutine * 10 goroutines = 100x
expectedBuckets = []uint64{300, 0, 200, 400}
default:
// Int sequence: 2, 6, 4, 10, 22, -3, -6, 3, -6
// Bounds {0, 2, 4}:
// (-inf, 0]: -3, -6, -6 (3x)
// (0, 2]: 2 (1x)
// (2, 4]: 4, 3 (2x)
// (4, +inf): 6, 10, 22 (3x)
// 10 full loops per goroutine * 10 goroutines = 100x
expectedBuckets = []uint64{300, 100, 200, 300}
}
assert.Equal(t, expectedBuckets, totalBuckets)
}

func testDeltaHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
func testCumulativeHistConcurrentSafe[N int64 | float64]() func(*testing.T) {
in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality,
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
AggregationLimit: 3,
}.ExplicitBucketHistogram(bounds, noMinMax, false)
return testAggergationConcurrentSafe[N](in, out, validateHistogram[N])
}.ExplicitBucketHistogram([]float64{0, 2, 4}, false, false)
return testAggregationConcurrentSafe[N](in, out, validateHistogram[N])
}

func testCumulativeHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
func testDeltaHistConcurrentSafe[N int64 | float64]() func(*testing.T) {
in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality,
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
AggregationLimit: 3,
}.ExplicitBucketHistogram(bounds, noMinMax, false)
return testAggergationConcurrentSafe[N](in, out, validateHistogram[N])
}.ExplicitBucketHistogram([]float64{0, 2, 4}, false, false)
return testAggregationConcurrentSafe[N](in, out, validateHistogram[N])
}

// hPointSummed returns an HistogramDataPoint that started and ended now with
Expand Down
Loading
Loading