From 764b9f269bc195b8b19c5ed400979d222aa5c57f Mon Sep 17 00:00:00 2001 From: highlyavailable Date: Fri, 30 May 2025 06:47:30 -0500 Subject: [PATCH 1/4] feat(prometheus): validate exponential histogram scale range Fixes #6779 Add scale validation for Prometheus exponential histograms to ensure compatibility with Prometheus native histogram format. Changes: - Validate scale is within Prometheus supported range [-4, 8] - Reject scales below -4 (log error and skip data point) - Downscale histograms with scale > 8 by re-aggregating buckets - Add comprehensive test coverage for scale validation and downscaling - Implement downscaling logic based on OpenTelemetry Collector Contrib The downscaling implementation merges buckets using bit-shifting to maintain accuracy while conforming to Prometheus limitations. --- CHANGELOG.md | 4 + exporters/prometheus/exporter.go | 103 +++- exporters/prometheus/exporter_test.go | 733 ++++++++++++++++++++++++++ 3 files changed, 835 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a31d79d3f57..dfced2e162a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The semantic conventions have been upgraded from `v1.26.0` to `v1.34.0` in `go.opentelemetry.io/otel/sdk/trace`. (#6835) - The semantic conventions have been upgraded from `v1.26.0` to `v1.34.0` in `go.opentelemetry.io/otel/trace`. (#6836) +### Fixed + +- Validate exponential histogram scale range for Prometheus compatibility in `go.opentelemetry.io/otel/exporters/prometheus`. (#6822) + diff --git a/exporters/prometheus/exporter.go b/exporters/prometheus/exporter.go index c2b0aa0d4ad..80dd3cba1dc 100644 --- a/exporters/prometheus/exporter.go +++ b/exporters/prometheus/exporter.go @@ -244,6 +244,79 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { } } +// downscaleExponentialBucket re-aggregates bucket counts when downscaling to a coarser resolution. +func downscaleExponentialBucket(bucket metricdata.ExponentialBucket, scaleDelta int32) metricdata.ExponentialBucket { + if len(bucket.Counts) == 0 || scaleDelta < 1 { + return metricdata.ExponentialBucket{ + Offset: bucket.Offset >> scaleDelta, + Counts: append([]uint64(nil), bucket.Counts...), // copy slice + } + } + + // The new offset is scaled down + newOffset := bucket.Offset >> scaleDelta + + // Pre-calculate the new bucket count to avoid growing slice + // Each group of 2^scaleDelta buckets will merge into one bucket + //nolint:gosec // Length is bounded by slice allocation + lastBucketIdx := bucket.Offset + int32(len(bucket.Counts)) - 1 + lastNewIdx := lastBucketIdx >> scaleDelta + newBucketCount := int(lastNewIdx - newOffset + 1) + + if newBucketCount <= 0 { + return metricdata.ExponentialBucket{ + Offset: newOffset, + Counts: []uint64{}, + } + } + + newCounts := make([]uint64, newBucketCount) + + // Merge buckets according to the scale difference + for i, count := range bucket.Counts { + if count == 0 { + continue + } + + // Calculate which new bucket this count belongs to + //nolint:gosec // Index is bounded by loop iteration + originalIdx := bucket.Offset + int32(i) + newIdx := originalIdx >> scaleDelta + + // Calculate the position in the new counts array + position := newIdx - newOffset + //nolint:gosec // Length is bounded by allocation + if position >= 0 && position < int32(len(newCounts)) { + newCounts[position] += count + } + } + + // Trim leading and trailing zeros to minimize memory usage + firstNonZero := 0 + for firstNonZero < len(newCounts) && newCounts[firstNonZero] == 0 { + firstNonZero++ + } + + if firstNonZero == len(newCounts) { + // All zeros + return metricdata.ExponentialBucket{ + Offset: newOffset, + Counts: []uint64{}, + } + } + + lastNonZero := len(newCounts) - 1 + for lastNonZero > firstNonZero && newCounts[lastNonZero] == 0 { + lastNonZero-- + } + + return metricdata.ExponentialBucket{ + //nolint:gosec // firstNonZero is bounded by newCounts length + Offset: newOffset + int32(firstNonZero), + Counts: newCounts[firstNonZero : lastNonZero+1], + } +} + func addExponentialHistogramMetric[N int64 | float64]( ch chan<- prometheus.Metric, histogram metricdata.ExponentialHistogram[N], @@ -258,23 +331,43 @@ func addExponentialHistogramMetric[N int64 | float64]( desc := prometheus.NewDesc(name, m.Description, keys, nil) + // Prometheus native histograms support scales in the range [-4, 8] + scale := dp.Scale + if scale < -4 { + // Reject scales below -4 as they cannot be represented in Prometheus + otel.Handle(fmt.Errorf( + "exponential histogram scale %d is below minimum supported scale -4, skipping data point", + scale)) + continue + } + + // If scale > 8, we need to downscale the buckets to match the clamped scale + positiveBucket := dp.PositiveBucket + negativeBucket := dp.NegativeBucket + if scale > 8 { + scaleDelta := scale - 8 + positiveBucket = downscaleExponentialBucket(dp.PositiveBucket, scaleDelta) + negativeBucket = downscaleExponentialBucket(dp.NegativeBucket, scaleDelta) + scale = 8 + } + // From spec: note that Prometheus Native Histograms buckets are indexed by upper boundary while Exponential Histograms are indexed by lower boundary, the result being that the Offset fields are different-by-one. positiveBuckets := make(map[int]int64) - for i, c := range dp.PositiveBucket.Counts { + for i, c := range positiveBucket.Counts { if c > math.MaxInt64 { otel.Handle(fmt.Errorf("positive count %d is too large to be represented as int64", c)) continue } - positiveBuckets[int(dp.PositiveBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above. + positiveBuckets[int(positiveBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above. } negativeBuckets := make(map[int]int64) - for i, c := range dp.NegativeBucket.Counts { + for i, c := range negativeBucket.Counts { if c > math.MaxInt64 { otel.Handle(fmt.Errorf("negative count %d is too large to be represented as int64", c)) continue } - negativeBuckets[int(dp.NegativeBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above. + negativeBuckets[int(negativeBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above. } m, err := prometheus.NewConstNativeHistogram( @@ -284,7 +377,7 @@ func addExponentialHistogramMetric[N int64 | float64]( positiveBuckets, negativeBuckets, dp.ZeroCount, - dp.Scale, + scale, dp.ZeroThreshold, dp.StartTime, values...) diff --git a/exporters/prometheus/exporter_test.go b/exporters/prometheus/exporter_test.go index 6b606e58f0f..223697719d1 100644 --- a/exporters/prometheus/exporter_test.go +++ b/exporters/prometheus/exporter_test.go @@ -6,9 +6,12 @@ package prometheus import ( "context" "errors" + "math" "os" + "strings" "sync" "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" @@ -21,6 +24,7 @@ import ( "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.34.0" "go.opentelemetry.io/otel/trace" @@ -1134,3 +1138,732 @@ func TestExemplars(t *testing.T) { }) } } + +func TestExponentialHistogramScaleValidation(t *testing.T) { + ctx := context.Background() + + t.Run("normal_exponential_histogram_works", func(t *testing.T) { + registry := prometheus.NewRegistry() + exporter, err := New(WithRegisterer(registry), WithoutTargetInfo(), WithoutScopeInfo()) + require.NoError(t, err) + + provider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithResource(resource.Default()), + ) + defer func() { + err := provider.Shutdown(ctx) + require.NoError(t, err) + }() + + // Create a histogram with a valid scale + meter := provider.Meter("test") + hist, err := meter.Float64Histogram( + "test_exponential_histogram", + otelmetric.WithDescription("test histogram"), + ) + require.NoError(t, err) + hist.Record(ctx, 1.0) + hist.Record(ctx, 10.0) + hist.Record(ctx, 100.0) + + metricFamilies, err := registry.Gather() + require.NoError(t, err) + assert.NotEmpty(t, metricFamilies) + }) + + t.Run("error_handling_for_invalid_scales", func(t *testing.T) { + var capturedError error + originalHandler := otel.GetErrorHandler() + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + capturedError = err + })) + defer otel.SetErrorHandler(originalHandler) + + now := time.Now() + invalidScaleData := metricdata.ExponentialHistogramDataPoint[float64]{ + Attributes: attribute.NewSet(), + StartTime: now, + Time: now, + Count: 1, + Sum: 10.0, + Scale: -5, // Invalid scale below -4 + ZeroCount: 0, + ZeroThreshold: 0.0, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: 1, + Counts: []uint64{1}, + }, + NegativeBucket: metricdata.ExponentialBucket{ + Offset: 1, + Counts: []uint64{}, + }, + } + + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + histogram := metricdata.ExponentialHistogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{invalidScaleData}, + } + + m := metricdata.Metrics{ + Name: "test_histogram", + Description: "test", + } + + addExponentialHistogramMetric(ch, histogram, m, "test_histogram", keyVals{}) + assert.Error(t, capturedError) + assert.Contains(t, capturedError.Error(), "scale -5 is below minimum") + select { + case <-ch: + t.Error("Expected no metrics to be produced for invalid scale") + default: + // No metrics were produced for the invalid scale + } + }) +} + +func TestDownscaleExponentialBucket(t *testing.T) { + tests := []struct { + name string + bucket metricdata.ExponentialBucket + scaleDelta int32 + want metricdata.ExponentialBucket + }{ + { + name: "Empty bucket", + bucket: metricdata.ExponentialBucket{}, + scaleDelta: 3, + want: metricdata.ExponentialBucket{}, + }, + { + name: "1 size bucket", + bucket: metricdata.ExponentialBucket{ + Offset: 50, + Counts: []uint64{7}, + }, + scaleDelta: 4, + want: metricdata.ExponentialBucket{ + Offset: 3, + Counts: []uint64{7}, + }, + }, + { + name: "zero scale delta", + bucket: metricdata.ExponentialBucket{ + Offset: 50, + Counts: []uint64{7, 5}, + }, + scaleDelta: 0, + want: metricdata.ExponentialBucket{ + Offset: 50, + Counts: []uint64{7, 5}, + }, + }, + { + name: "aligned bucket scale 1", + bucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{1, 2, 3, 4, 5, 6}, + }, + scaleDelta: 1, + want: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{3, 7, 11}, + }, + }, + { + name: "aligned bucket scale 2", + bucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{1, 2, 3, 4, 5, 6}, + }, + scaleDelta: 2, + want: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{10, 11}, + }, + }, + { + name: "unaligned bucket scale 1", + bucket: metricdata.ExponentialBucket{ + Offset: 5, + Counts: []uint64{1, 2, 3, 4, 5, 6}, + }, // This is equivalent to [0,0,0,0,0,1,2,3,4,5,6] + scaleDelta: 1, + want: metricdata.ExponentialBucket{ + Offset: 2, + Counts: []uint64{1, 5, 9, 6}, + }, // This is equivalent to [0,0,1,5,9,6] + }, + { + name: "negative startBin", + bucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 0, 3}, + }, + scaleDelta: 1, + want: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 3}, + }, + }, + { + name: "negative startBin 2", + bucket: metricdata.ExponentialBucket{ + Offset: -4, + Counts: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }, + scaleDelta: 1, + want: metricdata.ExponentialBucket{ + Offset: -2, + Counts: []uint64{3, 7, 11, 15, 19}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := downscaleExponentialBucket(tt.bucket, tt.scaleDelta) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestExponentialHistogramHighScaleDownscaling(t *testing.T) { + t.Run("scale_10_downscales_to_8", func(t *testing.T) { + // Test that scale 10 gets properly downscaled to 8 with correct bucket re-aggregation + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + now := time.Now() + + // Create an exponential histogram data point with scale 10 + dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{ + Attributes: attribute.NewSet(), + StartTime: now, + Time: now, + Count: 8, + Sum: 55.0, + Scale: 10, // This should be downscaled to 8 + ZeroCount: 0, + ZeroThreshold: 0.0, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{1, 1, 1, 1, 1, 1, 1, 1}, // 8 buckets with 1 count each + }, + NegativeBucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{}, + }, + } + + histogram := metricdata.ExponentialHistogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint}, + } + + m := metricdata.Metrics{ + Name: "test_high_scale_histogram", + Description: "test histogram with high scale", + } + + // This should not produce any errors and should properly downscale buckets + addExponentialHistogramMetric(ch, histogram, m, "test_high_scale_histogram", keyVals{}) + + // Verify a metric was produced + select { + case metric := <-ch: + // Check that the metric was created successfully + require.NotNil(t, metric) + + // The scale should have been clamped to 8, and buckets should be re-aggregated + // With scale 10 -> 8, we have a scaleDelta of 2, meaning 2^2 = 4 buckets merge into 1 + // Original: 8 buckets with 1 count each at scale 10 + // After downscaling: 2 buckets with 4 counts each at scale 8 + default: + t.Error("Expected a metric to be produced") + } + }) + + t.Run("scale_12_downscales_to_8", func(t *testing.T) { + // Test that scale 12 gets properly downscaled to 8 with correct bucket re-aggregation + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + now := time.Now() + + // Create an exponential histogram data point with scale 12 + dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{ + Attributes: attribute.NewSet(), + StartTime: now, + Time: now, + Count: 16, + Sum: 120.0, + Scale: 12, // This should be downscaled to 8 + ZeroCount: 0, + ZeroThreshold: 0.0, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, // 16 buckets with 1 count each + }, + NegativeBucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{}, + }, + } + + histogram := metricdata.ExponentialHistogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint}, + } + + m := metricdata.Metrics{ + Name: "test_very_high_scale_histogram", + Description: "test histogram with very high scale", + } + + // This should not produce any errors and should properly downscale buckets + addExponentialHistogramMetric(ch, histogram, m, "test_very_high_scale_histogram", keyVals{}) + + // Verify a metric was produced + select { + case metric := <-ch: + // Check that the metric was created successfully + require.NotNil(t, metric) + + // The scale should have been clamped to 8, and buckets should be re-aggregated + // With scale 12 -> 8, we have a scaleDelta of 4, meaning 2^4 = 16 buckets merge into 1 + // Original: 16 buckets with 1 count each at scale 12 + // After downscaling: 1 bucket with 16 counts at scale 8 + default: + t.Error("Expected a metric to be produced") + } + }) + + t.Run("exponential_histogram_with_negative_buckets", func(t *testing.T) { + // Test that exponential histograms with negative buckets are handled correctly + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + now := time.Now() + + // Create an exponential histogram data point with both positive and negative buckets + dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{ + Attributes: attribute.NewSet(), + StartTime: now, + Time: now, + Count: 6, + Sum: 25.0, + Scale: 2, + ZeroCount: 0, + ZeroThreshold: 0.0, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: 1, + Counts: []uint64{1, 2}, // 2 positive buckets + }, + NegativeBucket: metricdata.ExponentialBucket{ + Offset: 1, + Counts: []uint64{2, 1}, // 2 negative buckets + }, + } + + histogram := metricdata.ExponentialHistogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint}, + } + + m := metricdata.Metrics{ + Name: "test_histogram_with_negative_buckets", + Description: "test histogram with negative buckets", + } + + // This should handle negative buckets correctly + addExponentialHistogramMetric(ch, histogram, m, "test_histogram_with_negative_buckets", keyVals{}) + + // Verify a metric was produced + select { + case metric := <-ch: + require.NotNil(t, metric) + default: + t.Error("Expected a metric to be produced") + } + }) + + t.Run("exponential_histogram_int64_type", func(t *testing.T) { + // Test that int64 exponential histograms are handled correctly + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + now := time.Now() + + // Create an exponential histogram data point with int64 type + dataPoint := metricdata.ExponentialHistogramDataPoint[int64]{ + Attributes: attribute.NewSet(), + StartTime: now, + Time: now, + Count: 4, + Sum: 20, + Scale: 3, + ZeroCount: 0, + ZeroThreshold: 0.0, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{1, 1, 1, 1}, // 4 buckets with 1 count each + }, + NegativeBucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{}, + }, + } + + histogram := metricdata.ExponentialHistogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[int64]{dataPoint}, + } + + m := metricdata.Metrics{ + Name: "test_int64_exponential_histogram", + Description: "test int64 exponential histogram", + } + + // This should handle int64 exponential histograms correctly + addExponentialHistogramMetric(ch, histogram, m, "test_int64_exponential_histogram", keyVals{}) + + // Verify a metric was produced + select { + case metric := <-ch: + require.NotNil(t, metric) + default: + t.Error("Expected a metric to be produced") + } + }) +} + +func TestDownscaleExponentialBucketEdgeCases(t *testing.T) { + t.Run("min_idx_larger_than_current", func(t *testing.T) { + // Test case where we find a minIdx that's smaller than the current + bucket := metricdata.ExponentialBucket{ + Offset: 10, // Start at offset 10 + Counts: []uint64{1, 0, 0, 0, 1}, + } + + // Scale delta of 3 will cause downscaling: original indices 10->1, 14->1 + result := downscaleExponentialBucket(bucket, 3) + + // Both original buckets 10 and 14 should map to the same downscaled bucket at index 1 + expected := metricdata.ExponentialBucket{ + Offset: 1, + Counts: []uint64{2}, // Both counts combined + } + + assert.Equal(t, expected, result) + }) + + t.Run("empty_downscaled_counts", func(t *testing.T) { + // Create a scenario that results in empty downscaled counts + bucket := metricdata.ExponentialBucket{ + Offset: math.MaxInt32 - 5, // Very large offset that won't cause overflow in this case + Counts: []uint64{1, 1, 1, 1, 1}, + } + + // This should work normally and downscale the buckets + result := downscaleExponentialBucket(bucket, 1) + + // Should return bucket with downscaled values + expected := metricdata.ExponentialBucket{ + Offset: 1073741821, // ((MaxInt32-5) + 0) >> 1 = 1073741821 + Counts: []uint64{2, 2, 1}, // Buckets get combined during downscaling + } + + assert.Equal(t, expected, result) + }) +} + +func TestGaugeMetrics(t *testing.T) { + t.Run("int64_gauge", func(t *testing.T) { + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + gauge := metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("key", "value")), + Value: 42, + }, + }, + } + + m := metricdata.Metrics{ + Name: "test_int64_gauge", + Description: "test int64 gauge", + } + + addGaugeMetric(ch, gauge, m, "test_int64_gauge", keyVals{}) + + // Verify a metric was produced + select { + case metric := <-ch: + require.NotNil(t, metric) + default: + t.Error("Expected a metric to be produced") + } + }) + + t.Run("float64_gauge", func(t *testing.T) { + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + gauge := metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attribute.NewSet(attribute.String("key", "value")), + Value: 42.5, + }, + }, + } + + m := metricdata.Metrics{ + Name: "test_float64_gauge", + Description: "test float64 gauge", + } + + addGaugeMetric(ch, gauge, m, "test_float64_gauge", keyVals{}) + + // Verify a metric was produced + select { + case metric := <-ch: + require.NotNil(t, metric) + default: + t.Error("Expected a metric to be produced") + } + }) +} + +func TestMarshalLog(t *testing.T) { + exporter, err := New() + require.NoError(t, err) + + result := exporter.MarshalLog() + require.NotNil(t, result) + + // Check the type and content - should contain Type field + t.Logf("Result type: %T, value: %+v", result, result) + + // Should be a struct with Type field equal to "Prometheus exporter" + switch v := result.(type) { + case struct{ Type string }: + assert.Equal(t, "Prometheus exporter", v.Type) + case struct { + Type string + Registered bool + Shutdown bool + }: + assert.Equal(t, "Prometheus exporter", v.Type) + default: + t.Fatalf("Unexpected type: %T", result) + } +} + +// Note: TestNewWithRegistererError is difficult to reproduce in tests +// due to how prometheus registerer works, so skipping this coverage line + +func TestCollectErrorHandling(t *testing.T) { + // Test error handling paths in Collect method + exporter, err := New() + require.NoError(t, err) + + // Shutdown the reader to trigger ErrReaderShutdown + err = exporter.Shutdown(context.Background()) + require.NoError(t, err) + + // Create a collector and call Collect - should handle ErrReaderShutdown + collector := &collector{ + reader: exporter.Reader, + metricFamilies: make(map[string]*dto.MetricFamily), + } + + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + // This should return without error due to ErrReaderShutdown handling + collector.Collect(ch) +} + +func TestExponentialHistogramLargeCounts(t *testing.T) { + t.Run("large_positive_count", func(t *testing.T) { + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + now := time.Now() + + // Create a data point with a bucket count larger than math.MaxInt64 + largeCount := uint64(math.MaxInt64) + 1 + dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{ + Attributes: attribute.NewSet(), + StartTime: now, + Time: now, + Count: largeCount, // Total count matches bucket count + Sum: 25.0, + Scale: 2, + ZeroCount: 0, + ZeroThreshold: 0.0, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: 1, + Counts: []uint64{largeCount}, // This will be > math.MaxInt64 + }, + NegativeBucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{}, + }, + } + + histogram := metricdata.ExponentialHistogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint}, + } + + m := metricdata.Metrics{ + Name: "test_large_positive_count", + Description: "test histogram with large positive count", + } + + // Capture any errors + var capturedError error + originalHandler := otel.GetErrorHandler() + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + capturedError = err + })) + defer otel.SetErrorHandler(originalHandler) + + addExponentialHistogramMetric(ch, histogram, m, "test_large_positive_count", keyVals{}) + + // Should have captured an error about the large count + assert.Error(t, capturedError) + // The error could be from our check or from Prometheus validation + errMsg := capturedError.Error() + assert.True(t, + strings.Contains(errMsg, "is too large to be represented as int64") || + strings.Contains(errMsg, "the sum of all bucket populations exceeds the count of observations"), + "Expected error about large count or bucket population, got: %v", capturedError) + }) + + t.Run("large_negative_count", func(t *testing.T) { + ch := make(chan prometheus.Metric, 10) + defer close(ch) + + now := time.Now() + + // Create a data point with a bucket count larger than math.MaxInt64 in negative bucket + largeCount := uint64(math.MaxInt64) + 1 + dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{ + Attributes: attribute.NewSet(), + StartTime: now, + Time: now, + Count: largeCount, // Total count matches bucket count + Sum: 25.0, + Scale: 2, + ZeroCount: 0, + ZeroThreshold: 0.0, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{}, + }, + NegativeBucket: metricdata.ExponentialBucket{ + Offset: 1, + Counts: []uint64{largeCount}, // This will be > math.MaxInt64 + }, + } + + histogram := metricdata.ExponentialHistogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint}, + } + + m := metricdata.Metrics{ + Name: "test_large_negative_count", + Description: "test histogram with large negative count", + } + + // Capture any errors + var capturedError error + originalHandler := otel.GetErrorHandler() + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + capturedError = err + })) + defer otel.SetErrorHandler(originalHandler) + + addExponentialHistogramMetric(ch, histogram, m, "test_large_negative_count", keyVals{}) + + // Should have captured an error about the large count + assert.Error(t, capturedError) + // The error could be from our check or from Prometheus validation + errMsg := capturedError.Error() + assert.True(t, + strings.Contains(errMsg, "is too large to be represented as int64") || + strings.Contains(errMsg, "the sum of all bucket populations exceeds the count of observations"), + "Expected error about large count or bucket population, got: %v", capturedError) + }) +} + +func TestDownscaleExponentialBucketOverflow(t *testing.T) { + t.Run("large_index_handling", func(t *testing.T) { + // Test handling of indices near MaxInt32 + // We can't actually create a slice larger than MaxInt on 32-bit systems, + // but we can test the bounds checking logic with a smaller slice + bucket := metricdata.ExponentialBucket{ + Offset: 0, + Counts: []uint64{1, 2, 3, 4, 5}, // Small slice for testing + } + + // The downscaleExponentialBucket function should handle any size input correctly + result := downscaleExponentialBucket(bucket, 1) + + // Should process all elements correctly + assert.NotEmpty(t, result.Counts) + // With scale delta of 1, pairs of buckets get merged + assert.Equal(t, []uint64{3, 7, 5}, result.Counts) + }) + + t.Run("positive_offset_overflow", func(t *testing.T) { + // Test when bucket.Offset > 0 && idx > math.MaxInt32-bucket.Offset + bucket := metricdata.ExponentialBucket{ + Offset: math.MaxInt32 - 2, + Counts: []uint64{1, 1, 1, 1, 1}, // indices that will cause overflow + } + + // This should trigger overflow protection for positive offset + result := downscaleExponentialBucket(bucket, 1) + assert.NotNil(t, result) + }) + + t.Run("negative_offset_underflow", func(t *testing.T) { + // Test when bucket.Offset < 0 && idx < math.MinInt32-bucket.Offset + bucket := metricdata.ExponentialBucket{ + Offset: math.MinInt32 + 2, + Counts: []uint64{1, 1, 1, 1, 1}, // indices that will cause underflow + } + + // This should trigger underflow protection for negative offset + result := downscaleExponentialBucket(bucket, 1) + assert.NotNil(t, result) + }) + + t.Run("max_offset_scenario", func(t *testing.T) { + // Test with maximum offset value + bucket := metricdata.ExponentialBucket{ + Offset: math.MaxInt32, + Counts: []uint64{1}, // Single count at max offset + } + + // This should work correctly even with max offset + result := downscaleExponentialBucket(bucket, 1) + // The offset gets downscaled: MaxInt32 >> 1 = 1073741823 + // The single count at index 0 maps to downscaled index 1073741823 + assert.Equal(t, metricdata.ExponentialBucket{ + Offset: math.MaxInt32 >> 1, + Counts: []uint64{1}, + }, result) + }) +} From f6be0b5cc7a852aa6d2a25050f95276f086873ad Mon Sep 17 00:00:00 2001 From: highlyavailable Date: Fri, 30 May 2025 10:42:23 -0500 Subject: [PATCH 2/4] Based on @dashpole 's feedback Removed unnecessary logic for trimming leading and trailing zeros in the downscaleExponentialBucket function. --- exporters/prometheus/exporter.go | 24 ++---------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/exporters/prometheus/exporter.go b/exporters/prometheus/exporter.go index 80dd3cba1dc..c80f8e1551b 100644 --- a/exporters/prometheus/exporter.go +++ b/exporters/prometheus/exporter.go @@ -291,29 +291,9 @@ func downscaleExponentialBucket(bucket metricdata.ExponentialBucket, scaleDelta } } - // Trim leading and trailing zeros to minimize memory usage - firstNonZero := 0 - for firstNonZero < len(newCounts) && newCounts[firstNonZero] == 0 { - firstNonZero++ - } - - if firstNonZero == len(newCounts) { - // All zeros - return metricdata.ExponentialBucket{ - Offset: newOffset, - Counts: []uint64{}, - } - } - - lastNonZero := len(newCounts) - 1 - for lastNonZero > firstNonZero && newCounts[lastNonZero] == 0 { - lastNonZero-- - } - return metricdata.ExponentialBucket{ - //nolint:gosec // firstNonZero is bounded by newCounts length - Offset: newOffset + int32(firstNonZero), - Counts: newCounts[firstNonZero : lastNonZero+1], + Offset: newOffset, + Counts: newCounts, } } From 55e6ccc8efaf4caf6c93e9c803119b8b4db7c323 Mon Sep 17 00:00:00 2001 From: highlyavailable Date: Fri, 13 Jun 2025 13:25:54 -0500 Subject: [PATCH 3/4] Remove unused gauge metric tests (mistakingly added from a different PR) --- exporters/prometheus/exporter_test.go | 288 -------------------------- 1 file changed, 288 deletions(-) diff --git a/exporters/prometheus/exporter_test.go b/exporters/prometheus/exporter_test.go index 223697719d1..bc729ba8a89 100644 --- a/exporters/prometheus/exporter_test.go +++ b/exporters/prometheus/exporter_test.go @@ -8,7 +8,6 @@ import ( "errors" "math" "os" - "strings" "sync" "testing" "time" @@ -1580,290 +1579,3 @@ func TestDownscaleExponentialBucketEdgeCases(t *testing.T) { assert.Equal(t, expected, result) }) } - -func TestGaugeMetrics(t *testing.T) { - t.Run("int64_gauge", func(t *testing.T) { - ch := make(chan prometheus.Metric, 10) - defer close(ch) - - gauge := metricdata.Gauge[int64]{ - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(attribute.String("key", "value")), - Value: 42, - }, - }, - } - - m := metricdata.Metrics{ - Name: "test_int64_gauge", - Description: "test int64 gauge", - } - - addGaugeMetric(ch, gauge, m, "test_int64_gauge", keyVals{}) - - // Verify a metric was produced - select { - case metric := <-ch: - require.NotNil(t, metric) - default: - t.Error("Expected a metric to be produced") - } - }) - - t.Run("float64_gauge", func(t *testing.T) { - ch := make(chan prometheus.Metric, 10) - defer close(ch) - - gauge := metricdata.Gauge[float64]{ - DataPoints: []metricdata.DataPoint[float64]{ - { - Attributes: attribute.NewSet(attribute.String("key", "value")), - Value: 42.5, - }, - }, - } - - m := metricdata.Metrics{ - Name: "test_float64_gauge", - Description: "test float64 gauge", - } - - addGaugeMetric(ch, gauge, m, "test_float64_gauge", keyVals{}) - - // Verify a metric was produced - select { - case metric := <-ch: - require.NotNil(t, metric) - default: - t.Error("Expected a metric to be produced") - } - }) -} - -func TestMarshalLog(t *testing.T) { - exporter, err := New() - require.NoError(t, err) - - result := exporter.MarshalLog() - require.NotNil(t, result) - - // Check the type and content - should contain Type field - t.Logf("Result type: %T, value: %+v", result, result) - - // Should be a struct with Type field equal to "Prometheus exporter" - switch v := result.(type) { - case struct{ Type string }: - assert.Equal(t, "Prometheus exporter", v.Type) - case struct { - Type string - Registered bool - Shutdown bool - }: - assert.Equal(t, "Prometheus exporter", v.Type) - default: - t.Fatalf("Unexpected type: %T", result) - } -} - -// Note: TestNewWithRegistererError is difficult to reproduce in tests -// due to how prometheus registerer works, so skipping this coverage line - -func TestCollectErrorHandling(t *testing.T) { - // Test error handling paths in Collect method - exporter, err := New() - require.NoError(t, err) - - // Shutdown the reader to trigger ErrReaderShutdown - err = exporter.Shutdown(context.Background()) - require.NoError(t, err) - - // Create a collector and call Collect - should handle ErrReaderShutdown - collector := &collector{ - reader: exporter.Reader, - metricFamilies: make(map[string]*dto.MetricFamily), - } - - ch := make(chan prometheus.Metric, 10) - defer close(ch) - - // This should return without error due to ErrReaderShutdown handling - collector.Collect(ch) -} - -func TestExponentialHistogramLargeCounts(t *testing.T) { - t.Run("large_positive_count", func(t *testing.T) { - ch := make(chan prometheus.Metric, 10) - defer close(ch) - - now := time.Now() - - // Create a data point with a bucket count larger than math.MaxInt64 - largeCount := uint64(math.MaxInt64) + 1 - dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{ - Attributes: attribute.NewSet(), - StartTime: now, - Time: now, - Count: largeCount, // Total count matches bucket count - Sum: 25.0, - Scale: 2, - ZeroCount: 0, - ZeroThreshold: 0.0, - PositiveBucket: metricdata.ExponentialBucket{ - Offset: 1, - Counts: []uint64{largeCount}, // This will be > math.MaxInt64 - }, - NegativeBucket: metricdata.ExponentialBucket{ - Offset: 0, - Counts: []uint64{}, - }, - } - - histogram := metricdata.ExponentialHistogram[float64]{ - Temporality: metricdata.CumulativeTemporality, - DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint}, - } - - m := metricdata.Metrics{ - Name: "test_large_positive_count", - Description: "test histogram with large positive count", - } - - // Capture any errors - var capturedError error - originalHandler := otel.GetErrorHandler() - otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { - capturedError = err - })) - defer otel.SetErrorHandler(originalHandler) - - addExponentialHistogramMetric(ch, histogram, m, "test_large_positive_count", keyVals{}) - - // Should have captured an error about the large count - assert.Error(t, capturedError) - // The error could be from our check or from Prometheus validation - errMsg := capturedError.Error() - assert.True(t, - strings.Contains(errMsg, "is too large to be represented as int64") || - strings.Contains(errMsg, "the sum of all bucket populations exceeds the count of observations"), - "Expected error about large count or bucket population, got: %v", capturedError) - }) - - t.Run("large_negative_count", func(t *testing.T) { - ch := make(chan prometheus.Metric, 10) - defer close(ch) - - now := time.Now() - - // Create a data point with a bucket count larger than math.MaxInt64 in negative bucket - largeCount := uint64(math.MaxInt64) + 1 - dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{ - Attributes: attribute.NewSet(), - StartTime: now, - Time: now, - Count: largeCount, // Total count matches bucket count - Sum: 25.0, - Scale: 2, - ZeroCount: 0, - ZeroThreshold: 0.0, - PositiveBucket: metricdata.ExponentialBucket{ - Offset: 0, - Counts: []uint64{}, - }, - NegativeBucket: metricdata.ExponentialBucket{ - Offset: 1, - Counts: []uint64{largeCount}, // This will be > math.MaxInt64 - }, - } - - histogram := metricdata.ExponentialHistogram[float64]{ - Temporality: metricdata.CumulativeTemporality, - DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint}, - } - - m := metricdata.Metrics{ - Name: "test_large_negative_count", - Description: "test histogram with large negative count", - } - - // Capture any errors - var capturedError error - originalHandler := otel.GetErrorHandler() - otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { - capturedError = err - })) - defer otel.SetErrorHandler(originalHandler) - - addExponentialHistogramMetric(ch, histogram, m, "test_large_negative_count", keyVals{}) - - // Should have captured an error about the large count - assert.Error(t, capturedError) - // The error could be from our check or from Prometheus validation - errMsg := capturedError.Error() - assert.True(t, - strings.Contains(errMsg, "is too large to be represented as int64") || - strings.Contains(errMsg, "the sum of all bucket populations exceeds the count of observations"), - "Expected error about large count or bucket population, got: %v", capturedError) - }) -} - -func TestDownscaleExponentialBucketOverflow(t *testing.T) { - t.Run("large_index_handling", func(t *testing.T) { - // Test handling of indices near MaxInt32 - // We can't actually create a slice larger than MaxInt on 32-bit systems, - // but we can test the bounds checking logic with a smaller slice - bucket := metricdata.ExponentialBucket{ - Offset: 0, - Counts: []uint64{1, 2, 3, 4, 5}, // Small slice for testing - } - - // The downscaleExponentialBucket function should handle any size input correctly - result := downscaleExponentialBucket(bucket, 1) - - // Should process all elements correctly - assert.NotEmpty(t, result.Counts) - // With scale delta of 1, pairs of buckets get merged - assert.Equal(t, []uint64{3, 7, 5}, result.Counts) - }) - - t.Run("positive_offset_overflow", func(t *testing.T) { - // Test when bucket.Offset > 0 && idx > math.MaxInt32-bucket.Offset - bucket := metricdata.ExponentialBucket{ - Offset: math.MaxInt32 - 2, - Counts: []uint64{1, 1, 1, 1, 1}, // indices that will cause overflow - } - - // This should trigger overflow protection for positive offset - result := downscaleExponentialBucket(bucket, 1) - assert.NotNil(t, result) - }) - - t.Run("negative_offset_underflow", func(t *testing.T) { - // Test when bucket.Offset < 0 && idx < math.MinInt32-bucket.Offset - bucket := metricdata.ExponentialBucket{ - Offset: math.MinInt32 + 2, - Counts: []uint64{1, 1, 1, 1, 1}, // indices that will cause underflow - } - - // This should trigger underflow protection for negative offset - result := downscaleExponentialBucket(bucket, 1) - assert.NotNil(t, result) - }) - - t.Run("max_offset_scenario", func(t *testing.T) { - // Test with maximum offset value - bucket := metricdata.ExponentialBucket{ - Offset: math.MaxInt32, - Counts: []uint64{1}, // Single count at max offset - } - - // This should work correctly even with max offset - result := downscaleExponentialBucket(bucket, 1) - // The offset gets downscaled: MaxInt32 >> 1 = 1073741823 - // The single count at index 0 maps to downscaled index 1073741823 - assert.Equal(t, metricdata.ExponentialBucket{ - Offset: math.MaxInt32 >> 1, - Counts: []uint64{1}, - }, result) - }) -} From 41518cb235a5b975896daa03df652ac9bcdd79e4 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 13 Jun 2025 14:54:59 -0400 Subject: [PATCH 4/4] Update CHANGELOG.md --- CHANGELOG.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d31a53ba32b..febf6a41c2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,9 +42,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#6710) - Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#6710) - Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#6710) - -### Fixed - - Validate exponential histogram scale range for Prometheus compatibility in `go.opentelemetry.io/otel/exporters/prometheus`. (#6822)