diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index 291a156ebfd..055f38a6266 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -139,42 +139,92 @@ func test[N int64 | float64](meas Measure[N], comp ComputeAggregation, steps []t } } -func testAggergationConcurrentSafe[N int64 | float64]( +func getConcurrentVals[N int64 | float64]() []N { + // Keep length of v in sync with concurrentNumRecords + // and expectedConcurrentSum. + switch any(*new(N)).(type) { + case float64: + v := []float64{2.5, 6.1, 4.4, 10.0, 22.0, -3.5, -6.5, 3.0, -6.0} + return any(v).([]N) + default: + v := []int64{2, 6, 4, 10, 22, -3, -6, 3, -6} + return any(v).([]N) + } +} + +const ( + concurrentValsSum = 32 + concurrentNumGoroutines = 10 + concurrentNumRecords = 90 // Multiple of 9 (length of values sequences) + expectedConcurrentCount = uint64(concurrentNumGoroutines * concurrentNumRecords) +) + +func expectedConcurrentSum[N int64 | float64]() N { + return N(int64(concurrentNumGoroutines) * int64(concurrentNumRecords/9) * concurrentValsSum) +} + +// testAggregationConcurrentSafe provides a unified stress test for all generic aggregators +// by generating high contention, cardinality limit overflow, and validating exact results. +func testAggregationConcurrentSafe[N int64 | float64]( meas Measure[N], comp ComputeAggregation, - validate func(t *testing.T, agg metricdata.Aggregation), + validate func(t *testing.T, aggs []metricdata.Aggregation), ) func(*testing.T) { return func(t *testing.T) { t.Helper() - got := new(metricdata.Aggregation) ctx := t.Context() var wg sync.WaitGroup - for _, args := range []arg[N]{ - {ctx, 2, alice}, - {ctx, 6, alice}, - {ctx, 4, alice}, - {ctx, 10, alice}, - {ctx, 22, alice}, - {ctx, -3, bob}, - {ctx, -6, bob}, - {ctx, 3, bob}, - {ctx, 6, bob}, - } { - wg.Go(func() { - meas(args.ctx, args.value, args.attr) - }) + + // Use 10 different attribute sets to force overflow on the AggregationLimit + // which is typically set to 3. + attrs := make([]attribute.Set, concurrentNumGoroutines) + for i := range attrs { + attrs[i] = attribute.NewSet(attribute.String(keyUser, strconv.Itoa(i))) } + + vals := getConcurrentVals[N]() + + wg.Add(concurrentNumGoroutines) + for i := range concurrentNumGoroutines { + go func(id int) { + defer wg.Done() + // Each goroutine records to a distinct attribute set + attr := attrs[id] + + for j := range concurrentNumRecords { + meas(ctx, vals[j%len(vals)], attr) + } + }(i) + } + + var results []metricdata.Aggregation + + // Run computation concurrently with measurements to stress hot/cold swaps wg.Go(func() { - for range 2 { + for range concurrentNumRecords { + got := new(metricdata.Aggregation) comp(got) - // We do not check expected output for each step because - // computeAggregation is run concurrently with steps. Instead, - // we validate that the output is a valid possible output. - validate(t, *got) + results = append(results, *got) } }) + wg.Wait() + + // Final flush to get final values + got := new(metricdata.Aggregation) + comp(got) + results = append(results, *got) + + validate(t, results) + } +} + +func assertSumEqual[N int64 | float64](t *testing.T, expected, actual N) { + if _, ok := any(*new(N)).(float64); ok { + assert.InDelta(t, float64(expected), float64(actual), 0.0001) + } else { + assert.Equal(t, expected, actual) } } diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index a4519eafbb4..0acc4ff3e06 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -1052,7 +1053,7 @@ func testDeltaExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) { Filter: attrFltr, AggregationLimit: 3, }.ExponentialBucketHistogram(4, 20, false, false) - return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N]) + return testAggregationConcurrentSafe[N](in, out, validateExponentialHistogram[N]) } func testCumulativeExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) { @@ -1061,45 +1062,69 @@ func testCumulativeExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T Filter: attrFltr, AggregationLimit: 3, }.ExponentialBucketHistogram(4, 20, false, false) - return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N]) + return testAggregationConcurrentSafe[N](in, out, validateExponentialHistogram[N]) } -func validateExponentialHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) { - s, ok := got.(metricdata.ExponentialHistogram[N]) - if !ok { - t.Fatalf("wrong aggregation type: %+v", got) - } - for _, dp := range s.DataPoints { - assert.False(t, - dp.Time.Before(dp.StartTime), - "Timestamp %v must not be before start time %v", dp.Time, dp.StartTime, - ) - switch dp.Attributes { - case fltrAlice: - // alice observations are always a multiple of 2 - assert.Equal(t, int64(0), int64(dp.Sum)%2) - case fltrBob: - // bob observations are always a multiple of 3 - assert.Equal(t, int64(0), int64(dp.Sum)%3) - default: - t.Fatalf("wrong attributes %+v", dp.Attributes) - } - avg := float64(dp.Sum) / float64(dp.Count) - if minVal, ok := dp.Min.Value(); ok { - assert.GreaterOrEqual(t, avg, float64(minVal)) - } - if maxVal, ok := dp.Max.Value(); ok { - assert.LessOrEqual(t, avg, float64(maxVal)) +func validateExponentialHistogram[N int64 | float64](t *testing.T, aggs []metricdata.Aggregation) { + sums := make(map[attribute.Set]N) + counts := make(map[attribute.Set]uint64) + var isDelta bool + for i, agg := range aggs { + s, ok := agg.(metricdata.ExponentialHistogram[N]) + require.True(t, ok) + if s.Temporality == metricdata.DeltaTemporality { + isDelta = true } - var totalCount uint64 - for _, bc := range dp.PositiveBucket.Counts { - totalCount += bc + require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded in a single cycle") + for _, dp := range s.DataPoints { + assert.False(t, + dp.Time.Before(dp.StartTime), + "Timestamp %v must not be before start time %v", dp.Time, dp.StartTime, + ) + + if s.Temporality == metricdata.DeltaTemporality { + sums[dp.Attributes] += dp.Sum + counts[dp.Attributes] += dp.Count + } else if i == len(aggs)-1 { + sums[dp.Attributes] = dp.Sum + counts[dp.Attributes] = dp.Count + } + + var totalCount uint64 + for _, bc := range dp.PositiveBucket.Counts { + totalCount += bc + } + for _, bc := range dp.NegativeBucket.Counts { + totalCount += bc + } + assert.Equal(t, totalCount, dp.Count) } - for _, bc := range dp.NegativeBucket.Counts { - totalCount += bc + } + + var totalSum N + var totalCount uint64 + for attr, sum := range sums { + totalSum += sum + count := counts[attr] + totalCount += count + + expectedSingleSum := expectedConcurrentSum[N]() / N(concurrentNumGoroutines) + expectedSingleCount := expectedConcurrentCount / uint64(concurrentNumGoroutines) + + if !isDelta { + if attr == overflowSet { + // The overflow set contains all the goroutines that didn't make the limit of 3 + assert.Equal(t, uint64(0), count%expectedSingleCount) + assert.Equal(t, count/expectedSingleCount*uint64(expectedSingleSum), uint64(sum)) + } else { + // Individual attributes should have exactly one goroutine's worth of data + assert.Equal(t, expectedSingleSum, sum) + assert.Equal(t, expectedSingleCount, count) + } } - assert.Equal(t, totalCount, dp.Count) } + assertSumEqual[N](t, expectedConcurrentSum[N](), totalSum) + assert.Equal(t, expectedConcurrentCount, totalCount) } func FuzzGetBin(f *testing.F) { diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 0bfa8e9970e..2f1cfdf984a 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -230,57 +230,95 @@ func TestHistogramConcurrentSafe(t *testing.T) { t.Run("Float64/Cumulative", testCumulativeHistConcurrentSafe[float64]()) } -func validateHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) { - s, ok := got.(metricdata.Histogram[N]) - if !ok { - t.Fatalf("wrong aggregation type: %+v", got) - } - for _, dp := range s.DataPoints { - assert.False(t, - dp.Time.Before(dp.StartTime), - "Timestamp %v must not be before start time %v", dp.Time, dp.StartTime, - ) - switch dp.Attributes { - case fltrAlice: - // alice observations are always a multiple of 2 - assert.Equal(t, int64(0), int64(dp.Sum)%2) - case fltrBob: - // bob observations are always a multiple of 3 - assert.Equal(t, int64(0), int64(dp.Sum)%3) - default: - t.Fatalf("wrong attributes %+v", dp.Attributes) - } - avg := float64(dp.Sum) / float64(dp.Count) - if minVal, ok := dp.Min.Value(); ok { - assert.GreaterOrEqual(t, avg, float64(minVal)) +func validateHistogram[N int64 | float64](t *testing.T, aggs []metricdata.Aggregation) { + sums := make(map[attribute.Set]N) + counts := make(map[attribute.Set]uint64) + bucketCounts := make(map[attribute.Set][]uint64) + + for i, agg := range aggs { + s, ok := agg.(metricdata.Histogram[N]) + require.True(t, ok) + require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded in a single cycle") + for _, dp := range s.DataPoints { + if s.Temporality == metricdata.DeltaTemporality { + sums[dp.Attributes] += dp.Sum + counts[dp.Attributes] += dp.Count + if bucketCounts[dp.Attributes] == nil { + bucketCounts[dp.Attributes] = make([]uint64, len(dp.BucketCounts)) + } + for idx, c := range dp.BucketCounts { + bucketCounts[dp.Attributes][idx] += c + } + } else if i == len(aggs)-1 { + sums[dp.Attributes] = dp.Sum + counts[dp.Attributes] = dp.Count + bucketCounts[dp.Attributes] = make([]uint64, len(dp.BucketCounts)) + copy(bucketCounts[dp.Attributes], dp.BucketCounts) + } } - if maxVal, ok := dp.Max.Value(); ok { - assert.LessOrEqual(t, avg, float64(maxVal)) - } - var totalCount uint64 - for _, bc := range dp.BucketCounts { - totalCount += bc + } + + var totalSum N + var totalCount uint64 + totalBuckets := make([]uint64, 4) + + for _, val := range sums { + totalSum += val + } + for _, val := range counts { + totalCount += val + } + for _, bc := range bucketCounts { + for idx, c := range bc { + if idx < len(totalBuckets) { + totalBuckets[idx] += c + } } - assert.Equal(t, totalCount, dp.Count) } + + assertSumEqual[N](t, expectedConcurrentSum[N](), totalSum) + assert.Equal(t, expectedConcurrentCount, totalCount) + + var expectedBuckets []uint64 + switch any(*new(N)).(type) { + case float64: + // Float sequence: 2.5, 6.1, 4.4, 10.0, 22.0, -3.5, -6.5, 3.0, -6.0 + // Bounds {0, 2, 4}: + // (-inf, 0]: -3.5, -6.5, -6.0 (3x) + // (0, 2]: none (0x) + // (2, 4]: 2.5, 3.0 (2x) + // (4, +inf): 6.1, 4.4, 10.0, 22.0 (4x) + // 10 full loops per goroutine * 10 goroutines = 100x + expectedBuckets = []uint64{300, 0, 200, 400} + default: + // Int sequence: 2, 6, 4, 10, 22, -3, -6, 3, -6 + // Bounds {0, 2, 4}: + // (-inf, 0]: -3, -6, -6 (3x) + // (0, 2]: 2 (1x) + // (2, 4]: 4, 3 (2x) + // (4, +inf): 6, 10, 22 (3x) + // 10 full loops per goroutine * 10 goroutines = 100x + expectedBuckets = []uint64{300, 100, 200, 300} + } + assert.Equal(t, expectedBuckets, totalBuckets) } -func testDeltaHistConcurrentSafe[N int64 | float64]() func(t *testing.T) { +func testCumulativeHistConcurrentSafe[N int64 | float64]() func(*testing.T) { in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, + Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, - }.ExplicitBucketHistogram(bounds, noMinMax, false) - return testAggergationConcurrentSafe[N](in, out, validateHistogram[N]) + }.ExplicitBucketHistogram([]float64{0, 2, 4}, false, false) + return testAggregationConcurrentSafe[N](in, out, validateHistogram[N]) } -func testCumulativeHistConcurrentSafe[N int64 | float64]() func(t *testing.T) { +func testDeltaHistConcurrentSafe[N int64 | float64]() func(*testing.T) { in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, - }.ExplicitBucketHistogram(bounds, noMinMax, false) - return testAggergationConcurrentSafe[N](in, out, validateHistogram[N]) + }.ExplicitBucketHistogram([]float64{0, 2, 4}, false, false) + return testAggregationConcurrentSafe[N](in, out, validateHistogram[N]) } // hPointSummed returns an HistogramDataPoint that started and ended now with diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index f7c2199f5b3..9bc2851b5d1 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -481,25 +482,24 @@ func TestLastValueConcurrentSafe(t *testing.T) { t.Run("Float64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValueConcurrentSafe[float64]()) } -func validateGauge[N int64 | float64](t *testing.T, got metricdata.Aggregation) { - s, ok := got.(metricdata.Gauge[N]) - if !ok { - t.Fatalf("wrong aggregation type: %+v", got) +func validateGauge[N int64 | float64](t *testing.T, aggs []metricdata.Aggregation) { + // A gauge takes the *last* recorded value. + // During high concurrency, reading the Gauge can snap any value in the + // iteration cycle of the corresponding Goroutines. + valid := make(map[N]bool) + for _, v := range getConcurrentVals[N]() { + valid[v] = true } - for _, dp := range s.DataPoints { - assert.False(t, - dp.Time.Before(dp.StartTime), - "Timestamp %v must not be before start time %v", dp.Time, dp.StartTime, - ) - switch dp.Attributes { - case fltrAlice: - // alice observations are always a multiple of 2 - assert.Equal(t, int64(0), int64(dp.Value)%2) - case fltrBob: - // bob observations are always a multiple of 3 - assert.Equal(t, int64(0), int64(dp.Value)%3) - default: - t.Fatalf("wrong attributes %+v", dp.Attributes) + // TODO(dashpole): Fix a concurrency bug where a gauge can be collected with + // the value zero even when no zero-value measurements have been recorded. + valid[0] = true + + for _, agg := range aggs { + s, ok := agg.(metricdata.Gauge[N]) + require.True(t, ok) + require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded") + for _, dp := range s.DataPoints { + assert.True(t, valid[dp.Value], "Unexpected gauge value: %v", dp.Value) } } } @@ -510,7 +510,7 @@ func testCumulativeLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) Filter: attrFltr, AggregationLimit: 3, }.LastValue() - return testAggergationConcurrentSafe[N](in, out, validateGauge[N]) + return testAggregationConcurrentSafe[N](in, out, validateGauge[N]) } func testDeltaLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) { @@ -519,7 +519,7 @@ func testDeltaLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) { Filter: attrFltr, AggregationLimit: 3, }.LastValue() - return testAggergationConcurrentSafe[N](in, out, validateGauge[N]) + return testAggregationConcurrentSafe[N](in, out, validateGauge[N]) } func testDeltaPrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) { @@ -528,7 +528,7 @@ func testDeltaPrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*test Filter: attrFltr, AggregationLimit: 3, }.PrecomputedLastValue() - return testAggergationConcurrentSafe[N](in, out, validateGauge[N]) + return testAggregationConcurrentSafe[N](in, out, validateGauge[N]) } func testCumulativePrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) { @@ -537,7 +537,7 @@ func testCumulativePrecomputedLastValueConcurrentSafe[N int64 | float64]() func( Filter: attrFltr, AggregationLimit: 3, }.PrecomputedLastValue() - return testAggergationConcurrentSafe[N](in, out, validateGauge[N]) + return testAggregationConcurrentSafe[N](in, out, validateGauge[N]) } func BenchmarkLastValue(b *testing.B) { diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index 1b6c896f804..b2d0cc98f10 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -7,8 +7,9 @@ import ( "context" "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -551,67 +552,74 @@ func TestSumConcurrentSafe(t *testing.T) { t.Run("Float64/CumulativePrecomputedSum", testCumulativePrecomputedSumConcurrentSafe[float64]()) } -func validateSum[N int64 | float64](t *testing.T, got metricdata.Aggregation) { - s, ok := got.(metricdata.Sum[N]) - if !ok { - t.Fatalf("wrong aggregation type: %+v", got) - } - for _, dp := range s.DataPoints { - assert.False(t, - dp.Time.Before(dp.StartTime), - "Timestamp %v must not be before start time %v", dp.Time, dp.StartTime, - ) - switch dp.Attributes { - case fltrAlice: - // alice observations are always a multiple of 2 - assert.Equal(t, int64(0), int64(dp.Value)%2) - case fltrBob: - // bob observations are always a multiple of 3 - assert.Equal(t, int64(0), int64(dp.Value)%3) - default: - t.Fatalf("wrong attributes %+v", dp.Attributes) +//nolint:revive // isPrecomputed is used for configuring validation +func validateSum[N int64 | float64](isPrecomputed bool) func(t *testing.T, aggs []metricdata.Aggregation) { + return func(t *testing.T, aggs []metricdata.Aggregation) { + sums := make(map[attribute.Set]N) + for i, agg := range aggs { + s, ok := agg.(metricdata.Sum[N]) + require.True(t, ok) + require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded in a single cycle") + for _, dp := range s.DataPoints { + if s.Temporality == metricdata.DeltaTemporality { + sums[dp.Attributes] += dp.Value + } else if i == len(aggs)-1 { + sums[dp.Attributes] = dp.Value + } + } + } + + if isPrecomputed { + // Precomputed Sums clear the state when collected concurrently. Due to hot/cold overlap + // during flush, the sum drops intermediate updates, so the final calculation won't cleanly + // add up to the total number of operations performed by the workers. Therefore, skip exact + // invariant check, verifying only that limits and map updates occurred safely. + return } + + var total N + for _, val := range sums { + total += val + } + + assertSumEqual[N](t, expectedConcurrentSum[N](), total) } } func testDeltaSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { - mono := false in, out := Builder[N]{ Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, - }.Sum(mono) - return testAggergationConcurrentSafe[N](in, out, validateSum[N]) + }.Sum(false) + return testAggregationConcurrentSafe[N](in, out, validateSum[N](false)) } func testCumulativeSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { - mono := false in, out := Builder[N]{ Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, - }.Sum(mono) - return testAggergationConcurrentSafe[N](in, out, validateSum[N]) + }.Sum(false) + return testAggregationConcurrentSafe[N](in, out, validateSum[N](false)) } func testDeltaPrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { - mono := false in, out := Builder[N]{ Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, - }.PrecomputedSum(mono) - return testAggergationConcurrentSafe[N](in, out, validateSum[N]) + }.PrecomputedSum(false) + return testAggregationConcurrentSafe[N](in, out, validateSum[N](true)) } func testCumulativePrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { - mono := false in, out := Builder[N]{ Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, - }.PrecomputedSum(mono) - return testAggergationConcurrentSafe[N](in, out, validateSum[N]) + }.PrecomputedSum(false) + return testAggregationConcurrentSafe[N](in, out, validateSum[N](true)) } func BenchmarkSum(b *testing.B) {