Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/spanmetricsv2] Scale spans based on adjusted count #95

Merged
merged 9 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion connector/spanmetricsconnectorv2/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ package spanmetricsconnectorv2 // import "github.com/elastic/opentelemetry-colle
import (
"context"
"errors"
"math"
"strconv"
"time"

"github.com/elastic/opentelemetry-collector-components/connector/spanmetricsconnectorv2/internal/aggregator"
"github.com/elastic/opentelemetry-collector-components/connector/spanmetricsconnectorv2/internal/model"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -64,8 +67,9 @@ func (sm *spanMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
duration = time.Duration(endTime - startTime)
}
spanAttrs := span.Attributes()
adjustedCount := calculateAdjustedCount(span.TraceState().AsRaw())
for _, md := range sm.metricDefs {
multiError = errors.Join(multiError, aggregator.Add(md, spanAttrs, duration))
multiError = errors.Join(multiError, aggregator.Add(md, spanAttrs, duration, adjustedCount))
}
}
}
Expand All @@ -88,3 +92,58 @@ func (sm *spanMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
}
return sm.next.ConsumeMetrics(ctx, processedMetrics)
}

// calculateAdjustedCount calculates the adjusted count which represents
// the number of spans in the population that are represented by the
// individually sampled span. If the span is not-sampled OR if a non-
// probability sampler is used then adjusted count defaults to 1.
// https://opentelemetry.io/docs/specs/otel/trace/tracestate-probability-sampling/#adjusted-count
func calculateAdjustedCount(tracestate string) uint64 {
w3cTraceState, err := sampling.NewW3CTraceState(tracestate)
if err != nil {
return 1
}
otTraceState := w3cTraceState.OTelValue()
if otTraceState == nil {
return 1
}
// For probability sampler, calculate the adjusted count based on t-value (`th`).
if len(otTraceState.TValue()) != 0 {
// TODO (lahsivjar): Should we handle fractional adjusted count?
lahsivjar marked this conversation as resolved.
Show resolved Hide resolved
// One way to do this would be to scale the values in the histograms
// for some precision.
return uint64(otTraceState.AdjustedCount())
}
// For consistent probablity sampler, calculate the adjusted count based on
// p-value, negative base-2 logarithm of sampling probability
var p uint64
for _, kv := range otTraceState.ExtraValues() {
if kv.Key == "p" {
// If p-value is present then calculate the adjusted count as per
// the consistent probability sampling specs.
if kv.Value != "" {
// p-value is represented as unsigned decimal integers
// requiring at most 6 bits of information. We parse to
// 7 bits as 63 holds a special meaning and thus needs
// to be distinguished w.r.t. other invalid >63 values.
p, _ = strconv.ParseUint(kv.Value, 10, 7)
}
break
}
}
switch {
case p == 0:
return 1
case p == 63:
// p-value == 63 represents zero adjusted count
return 0
case p > 63:
// Invalid value, default to 1
return 1
default:
// TODO (lahsivjar): Should we handle fractional adjusted count?
// One way to do this would be to scale the values in the histograms
// for some precision.
return uint64(math.Pow(2, float64(p)))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this...

My understanding is that the old (experimental) p-value/r-value spec is superseded by OTEP 235, which introduces t-value.

If we need to support both, perhaps it should be a separate processor that transforms the old tracestate? We might want to do something similar to handle the Elastic APM tracestate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that the old (experimental) p-value/r-value spec is superseded by OTEP 235, which introduces t-value.

+1. I didn't know that probabilistic sampling processor already supported the new specs and wanted to continue supporting that processor.

If we need to support both, perhaps it should be a separate processor that transforms the old tracestate? We might want to do something similar to handle the Elastic APM tracestate.

This makes more sense. I have updated the PR to remove the p-value handling in favor of OTEP 235.

}
21 changes: 21 additions & 0 deletions connector/spanmetricsconnectorv2/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,27 @@ func TestConnector(t *testing.T) {
}
}

func TestCalculateAdjustedCount(t *testing.T) {
for _, tc := range []struct {
tracestate string
expected uint64
}{
{"", 1},
{"invalid=p:8;th:8", 1},
{"ot=404:8", 1},
{"ot=th:0", 1}, // 100% sampling
{"ot=th:8", 2}, // 50% sampling
{"ot=th:c", 4}, // 25% sampling
{"ot=p:8;r:4", 256},
{"ot=p:63;r:4", 0}, // p==63 represents zero adjusted count
{"ot=p:64;r:4", 1}, // p-value invalid, defaulting to 1
} {
t.Run("tracestate/"+tc.tracestate, func(t *testing.T) {
assert.Equal(t, tc.expected, calculateAdjustedCount(tc.tracestate))
})
}
}

func BenchmarkConnector(b *testing.B) {
factory := NewFactory()
settings := connectortest.NewNopSettings()
Expand Down
1 change: 1 addition & 0 deletions connector/spanmetricsconnectorv2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.107.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.107.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.107.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.107.0
go.opentelemetry.io/collector/confmap v0.107.0
Expand Down
2 changes: 2 additions & 0 deletions connector/spanmetricsconnectorv2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0/go.mod h1:oG/PliNiIOUHVARyDrFdvxFvG8DUPEjMGlmxjEqeoKM=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.107.0 h1:zTeRh4V3rMlXgNvfbDBnET6nvhOeZpYIbKTjVbSl9Ws=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.107.0/go.mod h1:/RtBag3LuHIkqN4bo8Erd3jCzA3gea70l9WyJ9TncXM=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.107.0 h1:iQU5Ewvba4taz7/wVZZKwfEie9A5NfPe/X2eH1kkP2w=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.107.0/go.mod h1:2KQGSx593h/VHpO5wGZ+/8vHHSFmc46P6Zrg7c8ejbw=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,21 @@ func NewAggregator() *Aggregator {
}
}

// Add adds a span duration into the configured metrics.
// Add adds a span duration into the configured metrics. It also takes
// `adjustedCount` parameter to denote the total number of spans in the
// population that are represented by an individually sampled span.
// The adjusted count is is calculated as per:
// https://opentelemetry.io/docs/specs/otel/trace/tracestate-probability-sampling/#adjusted-count
func (a *Aggregator) Add(
md model.MetricDef,
srcAttrs pcommon.Map,
spanDuration time.Duration,
adjustedCount uint64,
) error {
if adjustedCount == 0 {
// Nothing to do as the span represents `0` spans
return nil
}
filteredAttrs := pcommon.NewMap()
for _, definedAttr := range md.Attributes {
if srcAttr, ok := srcAttrs.Get(definedAttr.Key); ok {
Expand Down Expand Up @@ -88,7 +97,7 @@ func (a *Aggregator) Add(
a.datapoints[md.Key][attrKey] = newAggregatorDP(md, filteredAttrs)
}
value := float64(spanDuration.Nanoseconds()) / metricUnitToDivider[md.Unit]
a.datapoints[md.Key][attrKey].Add(value)
a.datapoints[md.Key][attrKey].Add(value, adjustedCount)
return nil
}

Expand Down Expand Up @@ -179,15 +188,15 @@ func newAggregatorDP(
return &dp
}

func (dp *aggregatorDP) Add(value float64) {
func (dp *aggregatorDP) Add(value float64, count uint64) {
if dp.expHistogramDP != nil {
dp.expHistogramDP.Add(value)
dp.expHistogramDP.Add(value, count)
}
if dp.explicitHistogramDP != nil {
dp.explicitHistogramDP.Add(value)
dp.explicitHistogramDP.Add(value, count)
}
if dp.summaryDP != nil {
dp.summaryDP.Add(value)
dp.summaryDP.Add(value, count)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestExplicitBounds(t *testing.T) {
for _, span := range tc.input {
duration := time.Duration(span.EndTimestamp() - span.StartTimestamp())
for _, md := range tc.metricDefs {
require.NoError(t, agg.Add(md, span.Attributes(), duration))
require.NoError(t, agg.Add(md, span.Attributes(), duration, 1))
}
}
for _, md := range tc.metricDefs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func newExponentialHistogramDP(attrs pcommon.Map, maxSize int32) *exponentialHis
}
}

func (dp *exponentialHistogramDP) Add(value float64) {
dp.data.Update(value)
func (dp *exponentialHistogramDP) Add(value float64, count uint64) {
dp.data.UpdateByIncr(value, count)
}

func (dp *exponentialHistogramDP) Copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func newExplicitHistogramDP(attrs pcommon.Map, bounds []float64) *explicitHistog
}
}

func (dp *explicitHistogramDP) Add(value float64) {
dp.sum += value
dp.count++
dp.counts[sort.SearchFloat64s(dp.bounds, value)]++
func (dp *explicitHistogramDP) Add(value float64, count uint64) {
dp.sum += value * float64(count)
dp.count += count
dp.counts[sort.SearchFloat64s(dp.bounds, value)] += count
}

func (dp *explicitHistogramDP) Copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func newSummaryDP(attrs pcommon.Map) *summaryDP {
}
}

func (dp *summaryDP) Add(value float64) {
dp.sum += value
dp.count++
func (dp *summaryDP) Add(value float64, count uint64) {
dp.sum += value * float64(count)
dp.count += count
}

func (dp *summaryDP) Copy(
Expand Down
48 changes: 48 additions & 0 deletions connector/spanmetricsconnectorv2/testdata/traces.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,51 @@ resourceSpans:
name: msg-span-2
parentSpanId: "bcff497b5a47310f"
startTimeUnixNano: "1581452772000000321"
- attributes:
- key: db.name
value:
stringValue: main
- key: db.system
value:
stringValue: mysql
endTimeUnixNano: "1581452772500000795"
name: invalid-p-value # will be counted as 1 span
parentSpanId: ""
startTimeUnixNano: "1581452772000000400"
traceState: "ot=r:3;p:64"
- attributes:
- key: db.name
value:
stringValue: main
- key: db.system
value:
stringValue: mysql
endTimeUnixNano: "1581452772500000799"
name: p-value-63 # will be ignored, adusted_count=0
parentSpanId: ""
startTimeUnixNano: "1581452772000000402"
traceState: "ot=r:3;p:63"
- attributes:
- key: db.name
value:
stringValue: main
- key: db.system
value:
stringValue: mysql
endTimeUnixNano: "1581452772500000800"
name: p-value-4 # represents 16 sampled spans
parentSpanId: ""
startTimeUnixNano: "1581452772000000401"
traceState: "ot=r:3;p:4"
- attributes:
- key: db.name
value:
stringValue: main
- key: db.system
value:
stringValue: mysql
endTimeUnixNano: "1581452772500000804"
name: th-value-8 # represents 2 sampled spans
parentSpanId: ""
startTimeUnixNano: "1581452772000000381"
traceState: "ot=th:8"
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ resourceMetrics:
- key: db.system
value:
stringValue: mysql
count: "2"
count: "21"
max: 1000.000468
min: 500.000468
min: 500.000395
negative: {}
positive:
bucketCounts:
- "1"
- "20"
- "0"
- "0"
- "0"
Expand Down Expand Up @@ -336,7 +336,7 @@ resourceMetrics:
- "1"
offset: 1147
scale: 7
sum: 1500.000936
sum: 11000.008561
timeUnixNano: "1000000"
name: db.trace.span.duration
- description: Span duration for DB spans
Expand All @@ -357,15 +357,15 @@ resourceMetrics:
- "0"
- "0"
- "0"
- "1"
- "20"
- "0"
- "1"
- "0"
- "0"
- "0"
- "0"
- "0"
count: "2"
count: "21"
explicitBounds:
- 2
- 4
Expand All @@ -383,7 +383,7 @@ resourceMetrics:
- 5000
- 10000
- 15000
sum: 1500.000936
sum: 11000.008561
timeUnixNano: "1000000"
name: db.trace.span.duration
- description: Span duration for messaging spans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ resourceMetrics:
exponentialHistogram:
aggregationTemporality: 1
dataPoints:
- count: "6"
- count: "25"
max: 17.000000468
min: 0.002000468
negative: {}
positive:
bucketCounts:
- "3"
- "22"
- "3"
offset: -1
scale: -4
sum: 30.402002808
sum: 39.902010433
timeUnixNano: "1000000"
name: trace.span.duration
- description: Span duration with custom histogram buckets
Expand All @@ -33,16 +33,16 @@ resourceMetrics:
- bucketCounts:
- "0"
- "1"
- "2"
- "21"
- "1"
- "2"
count: "6"
count: "25"
explicitBounds:
- 0.001
- 0.1
- 1
- 10
sum: 30.402002808
sum: 39.902010433
timeUnixNano: "1000000"
name: trace.span.duration
scope:
Expand Down
Loading