Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `DefaultWithContext` and `EnvironmentWithContext` in `go.opentelemetry.io/otel/sdk/resource` to support plumbing `context.Context` through default and environment detectors. (#8051)
- Support attributes with empty value (`attribute.EMPTY`) in OTLP exporters (`go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`, `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`, `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`). (#8038)
- Support attributes with empty value (`attribute.EMPTY`) in `go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest`. (#8038)
- Add support for per-series start time tracking for cumulative metrics in `go.opentelemetry.io/otel/sdk/metric`.
Set `OTEL_GO_X_PER_SERIES_START_TIMESTAMPS=true` to enable. (#8060)

### Changed

Expand Down
15 changes: 15 additions & 0 deletions sdk/internal/x/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,18 @@ var Observability = newFeature(
return "", false
},
)

// PerSeriesStartTimestamps is an experimental feature flag that determines if the SDK
// uses the new Start Timestamps specification.
//
// To enable this feature set the OTEL_GO_X_PER_SERIES_START_TIMESTAMPS environment variable
// to the case-insensitive string value of "true".
var PerSeriesStartTimestamps = newFeature(
[]string{"PER_SERIES_START_TIMESTAMPS"},
func(v string) (bool, bool) {
if strings.EqualFold(v, "true") {
return true, true
}
return false, false
},
)
11 changes: 11 additions & 0 deletions sdk/internal/x/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,14 @@ func TestResource(t *testing.T) {
t.Run("false", run(setenv(key, "false"), assertDisabled(Resource)))
t.Run("empty", run(assertDisabled(Resource)))
}

func TestPerSeriesStartTimestamps(t *testing.T) {
const key = "OTEL_GO_X_PER_SERIES_START_TIMESTAMPS"
require.Contains(t, PerSeriesStartTimestamps.Keys(), key)

t.Run("100", run(setenv(key, "100"), assertDisabled(PerSeriesStartTimestamps)))
t.Run("true", run(setenv(key, "true"), assertEnabled(PerSeriesStartTimestamps, true)))
t.Run("True", run(setenv(key, "True"), assertEnabled(PerSeriesStartTimestamps, true)))
t.Run("false", run(setenv(key, "false"), assertDisabled(PerSeriesStartTimestamps)))
t.Run("empty", run(assertDisabled(PerSeriesStartTimestamps)))
}
20 changes: 15 additions & 5 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/internal/x"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -40,6 +41,7 @@ type expoHistogramDataPoint[N int64 | float64] struct {
posBuckets expoBuckets
negBuckets expoBuckets
zeroCount atomic.Uint64
startTime time.Time
}

func newExpoHistogramDataPoint[N int64 | float64](
Expand All @@ -49,10 +51,11 @@ func newExpoHistogramDataPoint[N int64 | float64](
noMinMax, noSum bool,
) *expoHistogramDataPoint[N] { // nolint:revive // we need this control flag
dp := &expoHistogramDataPoint[N]{
attrs: attrs,
maxSize: maxSize,
noMinMax: noMinMax,
noSum: noSum,
attrs: attrs,
maxSize: maxSize,
noMinMax: noMinMax,
noSum: noSum,
startTime: now(),
}
dp.scale.Store(maxScale)
return dp
Expand Down Expand Up @@ -440,10 +443,17 @@ func (e *expoHistogram[N]) cumulative(
n := len(e.values)
hDPts := reset(h.DataPoints, n, n)

perSeriesStartTimeEnabled := x.PerSeriesStartTimestamps.Enabled()

var i int
for _, val := range e.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = e.start

startTime := e.start
if perSeriesStartTimeEnabled {
startTime = val.startTime
}
hDPts[i].StartTime = startTime
hDPts[i].Time = t
hDPts[i].Count = val.count()
hDPts[i].Scale = val.scale.Load()
Expand Down
69 changes: 52 additions & 17 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/internal/x"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -638,6 +639,8 @@ func TestSubNormal(t *testing.T) {
ehdp.record(math.SmallestNonzeroFloat64)
ehdp.record(math.SmallestNonzeroFloat64)

want.startTime = ehdp.startTime

assert.Equal(t, want, ehdp)
}

Expand All @@ -651,10 +654,33 @@ func TestExponentialHistogramAggregation(t *testing.T) {
t.Run("Float64/Delta", testDeltaExpoHist[float64]())
c.Reset()

t.Run("Int64/Cumulative", testCumulativeExpoHist[int64]())
t.Run("Int64/Cumulative", func(t *testing.T) {
t.Setenv("OTEL_GO_X_PER_SERIES_START_TIMESTAMPS", "false")
assert.False(t, x.PerSeriesStartTimestamps.Enabled())
testCumulativeExpoHist[int64]()(t)
})
c.Reset()

t.Run("Int64/Cumulative/PerSeriesStartTimeEnabled", func(t *testing.T) {
t.Setenv("OTEL_GO_X_PER_SERIES_START_TIMESTAMPS", "true")
assert.True(t, x.PerSeriesStartTimestamps.Enabled())
testCumulativeExpoHist[int64]()(t)
})
c.Reset()

t.Run("Float64/Cumulative", func(t *testing.T) {
t.Setenv("OTEL_GO_X_PER_SERIES_START_TIMESTAMPS", "false")
assert.False(t, x.PerSeriesStartTimestamps.Enabled())
testCumulativeExpoHist[float64]()(t)
})
c.Reset()

t.Run("Float64/Cumulative", testCumulativeExpoHist[float64]())
t.Run("Float64/Cumulative/PerSeriesStartTimeEnabled", func(t *testing.T) {
t.Setenv("OTEL_GO_X_PER_SERIES_START_TIMESTAMPS", "true")
assert.True(t, x.PerSeriesStartTimestamps.Enabled())
testCumulativeExpoHist[float64]()(t)
})
c.Reset()
}

func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
Expand Down Expand Up @@ -693,7 +719,7 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(1),
Time: y2kPlus(2),
Time: y2kPlus(3),
Count: 7,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand Down Expand Up @@ -747,8 +773,8 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(3),
Time: y2kPlus(4),
StartTime: y2kPlus(4),
Time: y2kPlus(7),
Count: 7,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand All @@ -765,8 +791,8 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
},
{
Attributes: overflowSet,
StartTime: y2kPlus(3),
Time: y2kPlus(4),
StartTime: y2kPlus(4),
Time: y2kPlus(7),
Count: 6,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),
Expand All @@ -790,6 +816,15 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
Filter: attrFltr,
AggregationLimit: 2,
}.ExponentialBucketHistogram(4, 20, false, false)

aliceStartTime := y2kPlus(0)
overflowStartTime := y2kPlus(0)

if x.PerSeriesStartTimestamps.Enabled() {
aliceStartTime = y2kPlus(2)
overflowStartTime = y2kPlus(6)
}

ctx := context.Background()
return test[N](in, out, []teststep[N]{
{
Expand Down Expand Up @@ -819,8 +854,8 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(2),
StartTime: aliceStartTime,
Time: y2kPlus(3),
Count: 7,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand Down Expand Up @@ -852,8 +887,8 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(3),
StartTime: aliceStartTime,
Time: y2kPlus(4),
Count: 10,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand Down Expand Up @@ -881,8 +916,8 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(4),
StartTime: aliceStartTime,
Time: y2kPlus(5),
Count: 10,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand Down Expand Up @@ -918,8 +953,8 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(5),
StartTime: aliceStartTime,
Time: y2kPlus(7),
Count: 10,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand All @@ -936,8 +971,8 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
},
{
Attributes: overflowSet,
StartTime: y2kPlus(0),
Time: y2kPlus(5),
StartTime: overflowStartTime,
Time: y2kPlus(7),
Count: 6,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),
Expand Down
16 changes: 13 additions & 3 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/internal/x"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -27,8 +28,9 @@ type hotColdHistogramPoint[N int64 | float64] struct {
hcwg hotColdWaitGroup
hotColdPoint [2]histogramPointCounters[N]

attrs attribute.Set
res FilteredExemplarReservoir[N]
attrs attribute.Set
res FilteredExemplarReservoir[N]
startTime time.Time
}

// histogramPointCounters contains only the atomic counter data, and is used by
Expand Down Expand Up @@ -298,6 +300,7 @@ func (s *cumulativeHistogram[N]) measure(
counts: make([]atomic.Uint64, len(s.bounds)+1),
},
},
startTime: now(),
}
return hPt
}).(*hotColdHistogramPoint[N])
Expand Down Expand Up @@ -339,16 +342,23 @@ func (s *cumulativeHistogram[N]) collect(
// current length for capacity.
hDPts := reset(h.DataPoints, 0, s.values.Len())

perSeriesStartTimeEnabled := x.PerSeriesStartTimestamps.Enabled()

var i int
s.values.Range(func(_, value any) bool {
val := value.(*hotColdHistogramPoint[N])

startTime := s.start
if perSeriesStartTimeEnabled {
startTime = val.startTime
}
// swap, observe, and clear the point
readIdx := val.hcwg.swapHotAndWait()
var bucketCounts []uint64
count := val.hotColdPoint[readIdx].loadCountsInto(&bucketCounts)
newPt := metricdata.HistogramDataPoint[N]{
Attributes: val.attrs,
StartTime: s.start,
StartTime: startTime,
Time: t,
Count: count,
Bounds: bounds,
Expand Down
Loading
Loading