Skip to content

Commit

Permalink
[connector/spanmetricsv2] Scale spans based on adjusted count (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar authored Sep 13, 2024
1 parent 0ac3227 commit cf1086b
Show file tree
Hide file tree
Showing 15 changed files with 108 additions and 41 deletions.
27 changes: 26 additions & 1 deletion connector/spanmetricsconnectorv2/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/elastic/opentelemetry-collector-components/connector/spanmetricsconnectorv2/internal/aggregator"
"github.com/elastic/opentelemetry-collector-components/connector/spanmetricsconnectorv2/internal/model"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -64,8 +65,9 @@ func (sm *spanMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
duration = time.Duration(endTime - startTime)
}
spanAttrs := span.Attributes()
adjustedCount := calculateAdjustedCount(span.TraceState().AsRaw())
for _, md := range sm.metricDefs {
multiError = errors.Join(multiError, aggregator.Add(md, spanAttrs, duration))
multiError = errors.Join(multiError, aggregator.Add(md, spanAttrs, duration, adjustedCount))
}
}
}
Expand All @@ -88,3 +90,26 @@ func (sm *spanMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
}
return sm.next.ConsumeMetrics(ctx, processedMetrics)
}

// calculateAdjustedCount derives, from a span's raw W3C tracestate, how
// many spans of the overall population the given span stands for.
//
// The result falls back to 1 whenever no usable sampling threshold can be
// read: the tracestate fails to parse, it carries no OpenTelemetry value,
// or the OpenTelemetry value has an empty T-value. Otherwise the adjusted
// count reported by the sampling package is returned, with any fractional
// part dropped by the conversion to uint64.
// https://github.com/open-telemetry/oteps/blob/main/text/trace/0235-sampling-threshold-in-trace-state.md
func calculateAdjustedCount(tracestate string) uint64 {
	const fallbackCount uint64 = 1

	parsed, parseErr := sampling.NewW3CTraceState(tracestate)
	if parseErr != nil {
		return fallbackCount
	}
	otelState := parsed.OTelValue()
	// A missing OTel value or an empty T-value means there is no sampling
	// threshold to scale by, so the span only accounts for itself.
	if otelState == nil || len(otelState.TValue()) == 0 {
		return fallbackCount
	}
	// TODO (lahsivjar): Handle fractional adjusted count. One way to do this
	// would be to scale the values in the histograms for some precision.
	scaled := otelState.AdjustedCount()
	return uint64(scaled)
}
18 changes: 18 additions & 0 deletions connector/spanmetricsconnectorv2/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,24 @@ func TestConnector(t *testing.T) {
}
}

func TestCalculateAdjustedCount(t *testing.T) {
for _, tc := range []struct {
tracestate string
expected uint64
}{
{"", 1},
{"invalid=p:8;th:8", 1},
{"ot=404:8", 1},
{"ot=th:0", 1}, // 100% sampling
{"ot=th:8", 2}, // 50% sampling
{"ot=th:c", 4}, // 25% sampling
} {
t.Run("tracestate/"+tc.tracestate, func(t *testing.T) {
assert.Equal(t, tc.expected, calculateAdjustedCount(tc.tracestate))
})
}
}

func BenchmarkConnector(b *testing.B) {
factory := NewFactory()
settings := connectortest.NewNopSettings()
Expand Down
1 change: 1 addition & 0 deletions connector/spanmetricsconnectorv2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.109.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.109.0
go.opentelemetry.io/collector/confmap v1.15.0
Expand Down
2 changes: 2 additions & 0 deletions connector/spanmetricsconnectorv2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.109.0/go.mod h1:KvJWxR0bDk9Qh0ktw4gOFsd/ZrJ7p5KTAQueEJsaK9Q=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0 h1:3kXFgdEEKw37ftdRC7SmXAiZuLahVavqOYRhlJVMLc8=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0/go.mod h1:HtaWI5WJKJkBhHz2R7Xb2n7R3fdBPhfKieYcQajNCTo=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.109.0 h1:YQB8+grNfmaLqiavbv4VKhBw1NF8O6pSmbLC+FjMrKM=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.109.0/go.mod h1:XOuilD83ZQWc0Te2B7+X0dRm9Z19M4h480UXTPO41Xc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg=
Expand Down
21 changes: 15 additions & 6 deletions connector/spanmetricsconnectorv2/internal/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,21 @@ func NewAggregator() *Aggregator {
}
}

// Add adds a span duration into the configured metrics.
// Add adds a span duration into the configured metrics. It also takes
// `adjustedCount` parameter to denote the total number of spans in the
// population that are represented by an individually sampled span.
// The adjusted count is calculated as per:
// https://opentelemetry.io/docs/specs/otel/trace/tracestate-probability-sampling/#adjusted-count
func (a *Aggregator) Add(
md model.MetricDef,
srcAttrs pcommon.Map,
spanDuration time.Duration,
adjustedCount uint64,
) error {
if adjustedCount == 0 {
// Nothing to do as the span represents `0` spans
return nil
}
filteredAttrs := pcommon.NewMap()
for _, definedAttr := range md.Attributes {
if srcAttr, ok := srcAttrs.Get(definedAttr.Key); ok {
Expand Down Expand Up @@ -88,7 +97,7 @@ func (a *Aggregator) Add(
a.datapoints[md.Key][attrKey] = newAggregatorDP(md, filteredAttrs)
}
value := float64(spanDuration.Nanoseconds()) / metricUnitToDivider[md.Unit]
a.datapoints[md.Key][attrKey].Add(value)
a.datapoints[md.Key][attrKey].Add(value, adjustedCount)
return nil
}

Expand Down Expand Up @@ -179,15 +188,15 @@ func newAggregatorDP(
return &dp
}

func (dp *aggregatorDP) Add(value float64) {
func (dp *aggregatorDP) Add(value float64, count uint64) {
if dp.expHistogramDP != nil {
dp.expHistogramDP.Add(value)
dp.expHistogramDP.Add(value, count)
}
if dp.explicitHistogramDP != nil {
dp.explicitHistogramDP.Add(value)
dp.explicitHistogramDP.Add(value, count)
}
if dp.summaryDP != nil {
dp.summaryDP.Add(value)
dp.summaryDP.Add(value, count)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestExplicitBounds(t *testing.T) {
for _, span := range tc.input {
duration := time.Duration(span.EndTimestamp() - span.StartTimestamp())
for _, md := range tc.metricDefs {
require.NoError(t, agg.Add(md, span.Attributes(), duration))
require.NoError(t, agg.Add(md, span.Attributes(), duration, 1))
}
}
for _, md := range tc.metricDefs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func newExponentialHistogramDP(attrs pcommon.Map, maxSize int32) *exponentialHis
}
}

func (dp *exponentialHistogramDP) Add(value float64) {
dp.data.Update(value)
func (dp *exponentialHistogramDP) Add(value float64, count uint64) {
dp.data.UpdateByIncr(value, count)
}

func (dp *exponentialHistogramDP) Copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func newExplicitHistogramDP(attrs pcommon.Map, bounds []float64) *explicitHistog
}
}

func (dp *explicitHistogramDP) Add(value float64) {
dp.sum += value
dp.count++
dp.counts[sort.SearchFloat64s(dp.bounds, value)]++
func (dp *explicitHistogramDP) Add(value float64, count uint64) {
dp.sum += value * float64(count)
dp.count += count
dp.counts[sort.SearchFloat64s(dp.bounds, value)] += count
}

func (dp *explicitHistogramDP) Copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func newSummaryDP(attrs pcommon.Map) *summaryDP {
}
}

func (dp *summaryDP) Add(value float64) {
dp.sum += value
dp.count++
func (dp *summaryDP) Add(value float64, count uint64) {
dp.sum += value * float64(count)
dp.count += count
}

func (dp *summaryDP) Copy(
Expand Down
12 changes: 12 additions & 0 deletions connector/spanmetricsconnectorv2/testdata/traces.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,15 @@ resourceSpans:
name: msg-span-2
parentSpanId: "bcff497b5a47310f"
startTimeUnixNano: "1581452772000000321"
- attributes:
- key: db.name
value:
stringValue: main
- key: db.system
value:
stringValue: mysql
endTimeUnixNano: "1581452772500000804"
name: th-value-8 # sampled at 50% (th:8), so it represents 2 spans in the population
parentSpanId: ""
startTimeUnixNano: "1581452772000000381"
traceState: "ot=th:8"
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ resourceMetrics:
- key: db.system
value:
stringValue: mysql
count: "2"
count: "4"
max: 1000.000468
min: 500.000468
min: 500.000423
negative: {}
positive:
bucketCounts:
- "1"
- "3"
- "0"
- "0"
- "0"
Expand Down Expand Up @@ -336,7 +336,7 @@ resourceMetrics:
- "1"
offset: 1147
scale: 7
sum: 1500.000936
sum: 2500.001782
timeUnixNano: "1000000"
name: db.trace.span.duration
- description: Span duration for DB spans
Expand All @@ -357,15 +357,15 @@ resourceMetrics:
- "0"
- "0"
- "0"
- "1"
- "3"
- "0"
- "1"
- "0"
- "0"
- "0"
- "0"
- "0"
count: "2"
count: "4"
explicitBounds:
- 2
- 4
Expand All @@ -383,7 +383,7 @@ resourceMetrics:
- 5000
- 10000
- 15000
sum: 1500.000936
sum: 2500.001782
timeUnixNano: "1000000"
name: db.trace.span.duration
- description: Span duration for messaging spans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ resourceMetrics:
exponentialHistogram:
aggregationTemporality: 1
dataPoints:
- count: "6"
- count: "8"
max: 17.000000468
min: 0.002000468
negative: {}
positive:
bucketCounts:
- "3"
- "5"
- "3"
offset: -1
scale: -4
sum: 30.402002808
sum: 31.402003653999998
timeUnixNano: "1000000"
name: trace.span.duration
- description: Span duration with custom histogram buckets
Expand All @@ -33,16 +33,16 @@ resourceMetrics:
- bucketCounts:
- "0"
- "1"
- "2"
- "4"
- "1"
- "2"
count: "6"
count: "8"
explicitBounds:
- 0.001
- 0.1
- 1
- 10
sum: 30.402002808
sum: 31.402003653999998
timeUnixNano: "1000000"
name: trace.span.duration
scope:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ resourceMetrics:
exponentialHistogram:
aggregationTemporality: 1
dataPoints:
- count: "6"
- count: "8"
max: 17000.000468
min: 2.000468
negative: {}
Expand Down Expand Up @@ -82,7 +82,7 @@ resourceMetrics:
- "0"
- "0"
- "0"
- "1"
- "3"
- "0"
- "0"
- "0"
Expand Down Expand Up @@ -126,7 +126,7 @@ resourceMetrics:
- "1"
offset: 8
scale: 3
sum: 30402.002807999997
sum: 31402.003653999996
timeUnixNano: "1000000"
name: trace.span.duration
scope:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ resourceMetrics:
- key: 404.attribute
value:
stringValue: undefined
count: "6"
count: "8"
max: 17000.000468
min: 2.000468
negative: {}
Expand Down Expand Up @@ -86,7 +86,7 @@ resourceMetrics:
- "0"
- "0"
- "0"
- "1"
- "3"
- "0"
- "0"
- "0"
Expand Down Expand Up @@ -130,7 +130,7 @@ resourceMetrics:
- "1"
offset: 8
scale: 3
sum: 30402.002807999997
sum: 31402.003653999996
timeUnixNano: "1000000"
name: 404.span.duration
- description: Span duration with missing attribute but default value
Expand All @@ -151,15 +151,15 @@ resourceMetrics:
- "0"
- "0"
- "0"
- "1"
- "3"
- "1"
- "1"
- "0"
- "0"
- "0"
- "1"
- "1"
count: "6"
count: "8"
explicitBounds:
- 2
- 4
Expand All @@ -177,7 +177,7 @@ resourceMetrics:
- 5000
- 10000
- 15000
sum: 30402.002807999997
sum: 31402.003653999996
timeUnixNano: "1000000"
name: 404.span.duration
scope:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ resourceMetrics:
- key: db.system
value:
stringValue: mysql
count: "2"
sum: 1500.000936
count: "4"
sum: 2500.001782
timeUnixNano: "1000000"
- description: Summary for messaging spans
name: msg.trace.span.summary
Expand Down

0 comments on commit cf1086b

Please sign in to comment.