From e3a71d023cc872a3ca8cad98440b9332eeb74d05 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 28 Oct 2025 10:03:03 -0600 Subject: [PATCH 01/11] Implement early decisions Allow the processor to run early evaluations at batch ingest time and make sampling decisions based on the result. This will drop or forward traces before the entire batch is ready, reducing the memory required to keep all trace data in memory until the end of the decision wait. To begin with, only implement "basic" samplers: those that do not support invert or nest other policies. This will still provide some gains while keeping this change manageable. Drop specifically will take more work to implement, as we cannot make an early Sampled decision until all drop policies have been evaluated, which will require some state to be maintained. --- processor/tailsamplingprocessor/config.go | 2 + .../tailsamplingprocessor/documentation.md | 2 + .../internal/sampling/always_sample.go | 10 +- .../internal/sampling/always_sample_test.go | 13 ++ .../internal/sampling/boolean_tag_filter.go | 30 ++- .../sampling/boolean_tag_filter_test.go | 40 ++++ .../internal/sampling/latency.go | 17 +- .../internal/sampling/latency_test.go | 63 +++++- .../internal/sampling/numeric_tag_filter.go | 44 ++++- .../sampling/numeric_tag_filter_test.go | 72 +++++++ .../internal/sampling/ottl.go | 95 +++++---- .../internal/sampling/ottl_test.go | 89 ++++++++- .../internal/sampling/probabilistic.go | 10 +- .../internal/sampling/probabilistic_test.go | 46 +++-- .../internal/sampling/span_count_sampler.go | 18 +- .../sampling/span_count_sampler_test.go | 64 +++++++ .../internal/sampling/status_code.go | 16 +- .../internal/sampling/status_code_test.go | 59 +++++- .../internal/sampling/string_tag_filter.go | 35 +++- .../sampling/string_tag_filter_test.go | 71 +++++++ .../internal/sampling/trace_state_filter.go | 13 ++ .../sampling/trace_state_filter_test.go | 71 ++++++- .../internal/sampling/util.go | 35 +++- 
processor/tailsamplingprocessor/metadata.yaml | 8 +- .../pkg/samplingpolicy/samplingpolicy.go | 10 + processor/tailsamplingprocessor/processor.go | 180 ++++++++++++++---- .../processor_telemetry_test.go | 8 + .../tailsamplingprocessor/processor_test.go | 36 ++++ 28 files changed, 1044 insertions(+), 113 deletions(-) diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index fc7b0979a8c0a..5066b3cdcb2b2 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -309,4 +309,6 @@ type Config struct { // DropPendingTracesOnShutdown will drop all traces that are part of batches that have not yet reached the decision // wait when the processor is shutdown. DropPendingTracesOnShutdown bool `mapstructure:"drop_pending_traces_on_shutdown"` + // EarlyDecisions enables making early decisions to sample traces rather than waiting for the DecisionWait duration. + EarlyDecisions bool `mapstructure:"early_decisions"` } diff --git a/processor/tailsamplingprocessor/documentation.md b/processor/tailsamplingprocessor/documentation.md index b3673653ef0e8..c55140a2e7101 100644 --- a/processor/tailsamplingprocessor/documentation.md +++ b/processor/tailsamplingprocessor/documentation.md @@ -21,6 +21,7 @@ Count of spans that were sampled or not per sampling policy [Development] | policy | Name of the policy | Any Str | | sampled | Whether the sampling decision was sampled or not, false can mean either not sampled or dropped | Any Bool | | decision | The sampling decision | Str: ``sampled``, ``not_sampled``, ``dropped`` | +| early | Whether the decision was made early or after the decision wait | Any Bool | ### otelcol_processor_tail_sampling_count_traces_sampled @@ -37,6 +38,7 @@ Count of traces that were sampled or not per sampling policy [Development] | policy | Name of the policy | Any Str | | sampled | Whether the sampling decision was sampled or not, false can mean either not sampled or 
dropped | Any Bool | | decision | The sampling decision | Str: ``sampled``, ``not_sampled``, ``dropped`` | +| early | Whether the decision was made early or after the decision wait | Any Bool | ### otelcol_processor_tail_sampling_early_releases_from_cache_decision diff --git a/processor/tailsamplingprocessor/internal/sampling/always_sample.go b/processor/tailsamplingprocessor/internal/sampling/always_sample.go index c3fa4ec793cba..8a8434a5237c1 100644 --- a/processor/tailsamplingprocessor/internal/sampling/always_sample.go +++ b/processor/tailsamplingprocessor/internal/sampling/always_sample.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" @@ -17,7 +18,10 @@ type alwaysSample struct { logger *zap.Logger } -var _ samplingpolicy.Evaluator = (*alwaysSample)(nil) +var ( + _ samplingpolicy.Evaluator = (*alwaysSample)(nil) + _ samplingpolicy.EarlyEvaluator = (*alwaysSample)(nil) +) // NewAlwaysSample creates a policy evaluator the samples all traces. 
func NewAlwaysSample(settings component.TelemetrySettings) samplingpolicy.Evaluator { @@ -31,3 +35,7 @@ func (as *alwaysSample) Evaluate(context.Context, pcommon.TraceID, *samplingpoli as.logger.Debug("Evaluating spans in always-sample filter") return samplingpolicy.Sampled, nil } + +func (*alwaysSample) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return samplingpolicy.Sampled, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/always_sample_test.go b/processor/tailsamplingprocessor/internal/sampling/always_sample_test.go index 7f9a84b01edee..cf2787c599310 100644 --- a/processor/tailsamplingprocessor/internal/sampling/always_sample_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/always_sample_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" ) @@ -22,3 +23,15 @@ func TestEvaluate_AlwaysSample(t *testing.T) { assert.NoError(t, err) assert.Equal(t, samplingpolicy.Sampled, decision) } + +func TestEarlyEvaluate_AlwaysSample(t *testing.T) { + filter := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) + decision, err := filter.(samplingpolicy.EarlyEvaluator).EarlyEvaluate( + t.Context(), + pcommon.TraceID([16]byte{ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, + }), ptrace.NewResourceSpans(), nil, + ) + assert.NoError(t, err) + assert.Equal(t, samplingpolicy.Sampled, decision) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go index b5dfa8cb66eee..c987cdc895494 100644 --- a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go 
+++ b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go @@ -21,7 +21,10 @@ type booleanAttributeFilter struct { invertMatch bool } -var _ samplingpolicy.Evaluator = (*booleanAttributeFilter)(nil) +var ( + _ samplingpolicy.Evaluator = (*booleanAttributeFilter)(nil) + _ samplingpolicy.EarlyEvaluator = (*booleanAttributeFilter)(nil) +) // NewBooleanAttributeFilter creates a policy evaluator that samples all traces with // the given attribute that match the supplied boolean value. @@ -57,6 +60,7 @@ func (baf *booleanAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID }, ), nil } + return hasResourceOrSpanWithCondition( batches, func(resource pcommon.Resource) bool { @@ -74,3 +78,27 @@ func (baf *booleanAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID return false }), nil } + +func (baf *booleanAttributeFilter) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, batch ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + // Do not support the deprecated invert match code for early evaluations. 
+ if baf.invertMatch { + return samplingpolicy.Unspecified, nil + } + + return batchHasResourceOrSpanWithCondition( + batch, + func(resource pcommon.Resource) bool { + if v, ok := resource.Attributes().Get(baf.key); ok { + value := v.Bool() + return value == baf.value + } + return false + }, + func(span ptrace.Span) bool { + if v, ok := span.Attributes().Get(baf.key); ok { + value := v.Bool() + return value == baf.value + } + return false + }), nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go index 2d5c6d21e03e1..31526326f0a6d 100644 --- a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go @@ -55,6 +55,46 @@ func TestBooleanTagFilter(t *testing.T) { } } +func TestBooleanTagFilter_EarlyEvaluate(t *testing.T) { + empty := map[string]any{} + filter := NewBooleanAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", true, false).(samplingpolicy.EarlyEvaluator) + + resAttr := map[string]any{} + resAttr["example"] = 8 + + cases := []struct { + Desc string + Trace *samplingpolicy.TraceData + Decision samplingpolicy.Decision + }{ + { + Desc: "non-matching span attribute", + Trace: newTraceBoolAttrs(empty, "non_matching", true), + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "span attribute with unwanted boolean value", + Trace: newTraceBoolAttrs(empty, "example", false), + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "span attribute with wanted boolean value", + Trace: newTraceBoolAttrs(empty, "example", true), + Decision: samplingpolicy.Sampled, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + u, _ := uuid.NewRandom() + rs := c.Trace.ReceivedBatches.ResourceSpans().At(0) + decision, err := filter.EarlyEvaluate(t.Context(), pcommon.TraceID(u), rs, c.Trace) + assert.NoError(t, err) 
+ assert.Equal(t, decision, c.Decision) + }) + } +} + func TestBooleanTagFilterInverted(t *testing.T) { empty := map[string]any{} filter := NewBooleanAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", true, true) diff --git a/processor/tailsamplingprocessor/internal/sampling/latency.go b/processor/tailsamplingprocessor/internal/sampling/latency.go index e6e305dc79c7a..c2b31e3739191 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency.go @@ -20,7 +20,10 @@ type latency struct { upperThresholdMs int64 } -var _ samplingpolicy.Evaluator = (*latency)(nil) +var ( + _ samplingpolicy.Evaluator = (*latency)(nil) + _ samplingpolicy.EarlyEvaluator = (*latency)(nil) +) // NewLatency creates a policy evaluator sampling traces with a duration greater than a configured threshold func NewLatency(settings component.TelemetrySettings, thresholdMs, upperThresholdMs int64) samplingpolicy.Evaluator { @@ -37,10 +40,18 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *samp batches := traceData.ReceivedBatches + return hasSpanWithCondition(batches, l.condition()), nil +} + +func (l *latency) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, batch ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return batchHasSpanWithCondition(batch, l.condition()), nil +} + +func (l *latency) condition() func(span ptrace.Span) bool { var minTime pcommon.Timestamp var maxTime pcommon.Timestamp - return hasSpanWithCondition(batches, func(span ptrace.Span) bool { + return func(span ptrace.Span) bool { if minTime == 0 || span.StartTimestamp() < minTime { minTime = span.StartTimestamp() } @@ -53,5 +64,5 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *samp return duration.Milliseconds() >= l.thresholdMs } return (l.thresholdMs < duration.Milliseconds() && duration.Milliseconds() <= l.upperThresholdMs) - }), nil 
+ } } diff --git a/processor/tailsamplingprocessor/internal/sampling/latency_test.go b/processor/tailsamplingprocessor/internal/sampling/latency_test.go index 23e71585802d6..a2aef8e46b014 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency_test.go @@ -15,7 +15,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" ) -func TestEvaluate_Latency(t *testing.T) { +func TestLatency_Evaluate(t *testing.T) { filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 0) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) @@ -72,7 +72,7 @@ func TestEvaluate_Latency(t *testing.T) { } } -func TestEvaluate_Bounded_Latency(t *testing.T) { +func TestLatency_Evaluate_Bounded(t *testing.T) { filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 10000) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) @@ -159,6 +159,65 @@ func TestEvaluate_Bounded_Latency(t *testing.T) { } } +func TestLatency_EarlyEvaluate(t *testing.T) { + filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 0).(samplingpolicy.EarlyEvaluator) + + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + now := time.Now() + + cases := []struct { + Desc string + Spans []spanWithTimeAndDuration + Decision samplingpolicy.Decision + }{ + { + "trace duration shorter than threshold", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 4500 * time.Millisecond, + }, + }, + samplingpolicy.Unspecified, + }, + { + "trace duration is equal to threshold", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 5000 * time.Millisecond, + }, + }, + samplingpolicy.Sampled, + }, + { + "total trace duration is longer than threshold but every single span is shorter", + []spanWithTimeAndDuration{ + { + 
StartTime: now, + Duration: 3000 * time.Millisecond, + }, + { + StartTime: now.Add(2500 * time.Millisecond), + Duration: 3000 * time.Millisecond, + }, + }, + samplingpolicy.Sampled, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + trace := newTraceWithSpans(c.Spans) + rs := trace.ReceivedBatches.ResourceSpans().At(0) + decision, err := filter.EarlyEvaluate(t.Context(), traceID, rs, trace) + + assert.NoError(t, err) + assert.Equal(t, decision, c.Decision) + }) + } +} + type spanWithTimeAndDuration struct { StartTime time.Time Duration time.Duration diff --git a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go index 134a995cb4d5f..8f0786dcc48ac 100644 --- a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go @@ -23,7 +23,10 @@ type numericAttributeFilter struct { invertMatch bool } -var _ samplingpolicy.Evaluator = (*numericAttributeFilter)(nil) +var ( + _ samplingpolicy.Evaluator = (*numericAttributeFilter)(nil) + _ samplingpolicy.EarlyEvaluator = (*numericAttributeFilter)(nil) +) // NewNumericAttributeFilter creates a policy evaluator that samples all traces with // the given attribute in the given numeric range. If minValue is nil, it will use math.MinInt64. @@ -101,3 +104,42 @@ func (naf *numericAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID }, ), nil } + +func (naf *numericAttributeFilter) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, batch ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + // Do not support the deprecated invert match code for early evaluations. 
+ if naf.invertMatch { + return samplingpolicy.Unspecified, nil + } + + // Get the effective min/max values + minVal := int64(math.MinInt64) + if naf.minValue != nil { + minVal = *naf.minValue + } + maxVal := int64(math.MaxInt64) + if naf.maxValue != nil { + maxVal = *naf.maxValue + } + + return batchHasResourceOrSpanWithCondition( + batch, + func(resource pcommon.Resource) bool { + if v, ok := resource.Attributes().Get(naf.key); ok { + value := v.Int() + if value >= minVal && value <= maxVal { + return true + } + } + return false + }, + func(span ptrace.Span) bool { + if v, ok := span.Attributes().Get(naf.key); ok { + value := v.Int() + if value >= minVal && value <= maxVal { + return true + } + } + return false + }, + ), nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go index fe60a50618db0..bc4a0459ca4e5 100644 --- a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go @@ -89,6 +89,78 @@ func TestNumericTagFilter(t *testing.T) { } } +func TestNumericTagFilter_EarlyEvaluate(t *testing.T) { + empty := map[string]any{} + minVal := int64(math.MinInt32) + maxVal := int64(math.MaxInt32) + filter := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", &minVal, &maxVal, false).(samplingpolicy.EarlyEvaluator) + + resAttr := map[string]any{} + resAttr["example"] = 8 + + cases := []struct { + Desc string + Trace *samplingpolicy.TraceData + Decision samplingpolicy.Decision + }{ + { + Desc: "nonmatching span attribute", + Trace: newTraceIntAttrs(empty, "non_matching", math.MinInt32), + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "span attribute at the lower limit", + Trace: newTraceIntAttrs(empty, "example", math.MinInt32), + Decision: samplingpolicy.Sampled, + }, + { + Desc: "resource attribute at the lower 
limit", + Trace: newTraceIntAttrs(map[string]any{"example": math.MinInt32}, "non_matching", math.MinInt32), + Decision: samplingpolicy.Sampled, + }, + { + Desc: "span attribute at the upper limit", + Trace: newTraceIntAttrs(empty, "example", math.MaxInt32), + Decision: samplingpolicy.Sampled, + }, + { + Desc: "resource attribute at the upper limit", + Trace: newTraceIntAttrs(map[string]any{"example": math.MaxInt32}, "non_matching", math.MaxInt), + Decision: samplingpolicy.Sampled, + }, + { + Desc: "span attribute below min limit", + Trace: newTraceIntAttrs(empty, "example", math.MinInt32-1), + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "resource attribute below min limit", + Trace: newTraceIntAttrs(map[string]any{"example": math.MinInt32 - 1}, "non_matching", math.MinInt32), + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "span attribute above max limit", + Trace: newTraceIntAttrs(empty, "example", math.MaxInt32+1), + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "resource attribute above max limit", + Trace: newTraceIntAttrs(map[string]any{"example": math.MaxInt32 + 1}, "non_matching", math.MaxInt32), + Decision: samplingpolicy.Unspecified, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + u, _ := uuid.NewRandom() + rs := c.Trace.ReceivedBatches.ResourceSpans().At(0) + decision, err := filter.EarlyEvaluate(t.Context(), pcommon.TraceID(u), rs, c.Trace) + assert.NoError(t, err) + assert.Equal(t, decision, c.Decision) + }) + } +} + func TestNumericTagFilterInverted(t *testing.T) { empty := map[string]any{} minVal := int64(math.MinInt32) diff --git a/processor/tailsamplingprocessor/internal/sampling/ottl.go b/processor/tailsamplingprocessor/internal/sampling/ottl.go index 859382a7ed003..4dd162f8fc3aa 100644 --- a/processor/tailsamplingprocessor/internal/sampling/ottl.go +++ b/processor/tailsamplingprocessor/internal/sampling/ottl.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/component" 
"go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl" @@ -68,28 +69,59 @@ func (ocf *ottlConditionFilter) Evaluate(ctx context.Context, traceID pcommon.Tr for i := 0; i < batches.ResourceSpans().Len(); i++ { rs := batches.ResourceSpans().At(i) - resource := rs.Resource() - for j := 0; j < rs.ScopeSpans().Len(); j++ { - ss := rs.ScopeSpans().At(j) - scope := ss.Scope() - for k := 0; k < ss.Spans().Len(); k++ { - span := ss.Spans().At(k) - - var ( - ok bool - err error - ) - - // Now we reach span level and begin evaluation with parsed expr. - // The evaluation will break when: - // 1. error happened. - // 2. "Sampled" decision made. - // Otherwise, it will keep evaluating and finally exit with "NotSampled" decision. - - // Span evaluation - if ocf.sampleSpanExpr != nil { - tCtx := ottlspan.NewTransformContextPtr(span, scope, resource, ss, rs) - ok, err = ocf.sampleSpanExpr.Eval(ctx, tCtx) + decision, err := ocf.evaluateResourceSpans(ctx, rs) + if err != nil { + return samplingpolicy.Error, err + } + if decision != samplingpolicy.Unspecified { + return decision, nil + } + } + return samplingpolicy.NotSampled, nil +} + +func (ocf *ottlConditionFilter) EarlyEvaluate(ctx context.Context, _ pcommon.TraceID, batch ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return ocf.evaluateResourceSpans(ctx, batch) +} + +func (ocf *ottlConditionFilter) evaluateResourceSpans(ctx context.Context, rs ptrace.ResourceSpans) (samplingpolicy.Decision, error) { + resource := rs.Resource() + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + scope := ss.Scope() + for k := 0; k < ss.Spans().Len(); k++ { + span := ss.Spans().At(k) + + var ( + ok bool + err error + ) + + // Now we reach span level and begin evaluation with parsed expr. + // The evaluation will break when: + // 1. 
error happened. + // 2. "Sampled" decision made. + // Otherwise, it will keep evaluating and finally exit with "Unspecified" decision. + + // Span evaluation + if ocf.sampleSpanExpr != nil { + tCtx := ottlspan.NewTransformContextPtr(span, scope, resource, ss, rs) + ok, err = ocf.sampleSpanExpr.Eval(ctx, tCtx) + tCtx.Close() + if err != nil { + return samplingpolicy.Error, err + } + if ok { + return samplingpolicy.Sampled, nil + } + } + + // Span event evaluation + if ocf.sampleSpanEventExpr != nil { + spanEvents := span.Events() + for l := 0; l < spanEvents.Len(); l++ { + tCtx := ottlspanevent.NewTransformContextPtr(spanEvents.At(l), span, scope, resource, ss, rs) + ok, err = ocf.sampleSpanEventExpr.Eval(ctx, tCtx) tCtx.Close() if err != nil { return samplingpolicy.Error, err @@ -98,24 +130,9 @@ func (ocf *ottlConditionFilter) Evaluate(ctx context.Context, traceID pcommon.Tr return samplingpolicy.Sampled, nil } } - - // Span event evaluation - if ocf.sampleSpanEventExpr != nil { - spanEvents := span.Events() - for l := 0; l < spanEvents.Len(); l++ { - tCtx := ottlspanevent.NewTransformContextPtr(spanEvents.At(l), span, scope, resource, ss, rs) - ok, err = ocf.sampleSpanEventExpr.Eval(ctx, tCtx) - tCtx.Close() - if err != nil { - return samplingpolicy.Error, err - } - if ok { - return samplingpolicy.Sampled, nil - } - } - } } } } - return samplingpolicy.NotSampled, nil + // We need to look at more resource spans to make a decision. 
+ return samplingpolicy.Unspecified, nil } diff --git a/processor/tailsamplingprocessor/internal/sampling/ottl_test.go b/processor/tailsamplingprocessor/internal/sampling/ottl_test.go index f4b07fec206d6..5ab9af60c1c5d 100644 --- a/processor/tailsamplingprocessor/internal/sampling/ottl_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/ottl_test.go @@ -15,7 +15,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" ) -func TestEvaluate_OTTL(t *testing.T) { +func TestOTTL_Evaluate(t *testing.T) { traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) cases := []struct { @@ -99,6 +99,93 @@ func TestEvaluate_OTTL(t *testing.T) { } } +func TestOTTL_EarlyEvaluate(t *testing.T) { + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + + cases := []struct { + Desc string + SpanConditions []string + SpanEventConditions []string + Spans []spanWithAttributes + WantErr bool + Decision samplingpolicy.Decision + }{ + { + // policy + "OTTL conditions not set", + []string{}, + []string{}, + []spanWithAttributes{{SpanAttributes: map[string]string{"attr_k_1": "attr_v_1"}}}, + true, + samplingpolicy.Unspecified, + }, + { + "OTTL conditions match specific span attributes 1", + []string{"attributes[\"attr_k_1\"] == \"attr_v_1\""}, + []string{}, + []spanWithAttributes{{SpanAttributes: map[string]string{"attr_k_1": "attr_v_1"}}}, + false, + samplingpolicy.Sampled, + }, + { + "OTTL conditions match specific span attributes 2", + []string{"attributes[\"attr_k_1\"] != \"attr_v_1\""}, + []string{}, + []spanWithAttributes{{SpanAttributes: map[string]string{"attr_k_1": "attr_v_1"}}}, + false, + samplingpolicy.Unspecified, + }, + { + "OTTL conditions inverse match(!=) span attributes 2", + []string{"attributes[\"attr_k_1\"] != \"attr_v_1\""}, + []string{}, + []spanWithAttributes{{SpanAttributes: map[string]string{"attr_k_1": 
"attr_v_2"}}}, + false, + samplingpolicy.Sampled, + }, + { + "OTTL conditions match specific span event attributes", + []string{}, + []string{"attributes[\"event_attr_k_1\"] == \"event_attr_v_1\""}, + []spanWithAttributes{{SpanEventAttributes: map[string]string{"event_attr_k_1": "event_attr_v_1"}}}, + false, + samplingpolicy.Sampled, + }, + { + "OTTL conditions match specific span event name", + []string{}, + []string{"name != \"incorrect event name\""}, + []spanWithAttributes{{SpanEventAttributes: nil}}, + false, + samplingpolicy.Sampled, + }, + { + "OTTL conditions not matched", + []string{"attributes[\"attr_k_1\"] == \"attr_v_1\""}, + []string{"attributes[\"event_attr_k_1\"] == \"event_attr_v_1\""}, + []spanWithAttributes{}, + false, + samplingpolicy.Unspecified, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + filter, err := NewOTTLConditionFilter(componenttest.NewNopTelemetrySettings(), c.SpanConditions, c.SpanEventConditions, ottl.IgnoreError) + assert.Equal(t, err != nil, c.WantErr) + + if err == nil { + trace := newTraceWithSpansAttributes(c.Spans) + rs := trace.ReceivedBatches.ResourceSpans().At(0) + + decision, err := filter.(samplingpolicy.EarlyEvaluator).EarlyEvaluate(t.Context(), traceID, rs, trace) + assert.Equal(t, err != nil, c.WantErr) + assert.Equal(t, decision, c.Decision) + } + }) + } +} + type spanWithAttributes struct { SpanAttributes map[string]string SpanEventAttributes map[string]string diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go index 5557b5b79201c..42be4b31c68a3 100644 --- a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go +++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" @@ -26,7 +27,10 @@ type probabilisticSampler struct { hashSalt string } -var _ samplingpolicy.Evaluator = (*probabilisticSampler)(nil) +var ( + _ samplingpolicy.Evaluator = (*probabilisticSampler)(nil) + _ samplingpolicy.EarlyEvaluator = (*probabilisticSampler)(nil) +) // NewProbabilisticSampler creates a policy evaluator that samples a percentage of // traces. @@ -54,6 +58,10 @@ func (s *probabilisticSampler) Evaluate(_ context.Context, traceID pcommon.Trace return samplingpolicy.NotSampled, nil } +func (s *probabilisticSampler) EarlyEvaluate(ctx context.Context, traceID pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return s.Evaluate(ctx, traceID, nil) +} + // calculateThreshold converts a ratio into a value between 0 and MaxUint64 func calculateThreshold(ratio float64) uint64 { // Use big.Float and big.Int to calculate threshold because directly convert diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go index cc5b29f8ab66b..2766c6b062102 100644 --- a/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go @@ -5,6 +5,7 @@ package sampling import ( "encoding/binary" + "fmt" "math/rand/v2" "testing" @@ -65,29 +66,40 @@ func TestProbabilisticSampling(t *testing.T) { 100, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - traceCount := 100_000 + for _, earlyEvaluation := range []bool{false, true} { + for _, tt := range tests { + t.Run(fmt.Sprintf("EarlyEvaluate=%t/%s", earlyEvaluation, tt.name), func(t *testing.T) { + traceCount := 100_000 - probabilisticSampler := NewProbabilisticSampler(componenttest.NewNopTelemetrySettings(), tt.hashSalt, tt.samplingPercentage) + 
probabilisticSampler := NewProbabilisticSampler(componenttest.NewNopTelemetrySettings(), tt.hashSalt, tt.samplingPercentage) - sampled := 0 - for _, traceID := range genRandomTraceIDs(traceCount) { - trace := newTraceStringAttrs(nil, "example", "value") + sampled := 0 + for _, traceID := range genRandomTraceIDs(traceCount) { + trace := newTraceStringAttrs(nil, "example", "value") + rs := trace.ReceivedBatches.ResourceSpans().At(0) - decision, err := probabilisticSampler.Evaluate(t.Context(), traceID, trace) - assert.NoError(t, err) + var ( + decision samplingpolicy.Decision + err error + ) + if earlyEvaluation { + decision, err = probabilisticSampler.(samplingpolicy.EarlyEvaluator).EarlyEvaluate(t.Context(), traceID, rs, trace) + } else { + decision, err = probabilisticSampler.Evaluate(t.Context(), traceID, trace) + } + assert.NoError(t, err) - if decision == samplingpolicy.Sampled { - sampled++ + if decision == samplingpolicy.Sampled { + sampled++ + } } - } - effectiveSamplingPercentage := float32(sampled) / float32(traceCount) * 100 - assert.InDelta(t, tt.expectedSamplingPercentage, effectiveSamplingPercentage, 0.2, - "Effective sampling percentage is %f, expected %f", effectiveSamplingPercentage, tt.expectedSamplingPercentage, - ) - }) + effectiveSamplingPercentage := float32(sampled) / float32(traceCount) * 100 + assert.InDelta(t, tt.expectedSamplingPercentage, effectiveSamplingPercentage, 0.2, + "Effective sampling percentage is %f, expected %f", effectiveSamplingPercentage, tt.expectedSamplingPercentage, + ) + }) + } } } diff --git a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go index cdaed349f1947..d3e0bae95b105 100644 --- a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go +++ b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" 
"go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" @@ -19,7 +20,10 @@ type spanCount struct { maxSpans int64 } -var _ samplingpolicy.Evaluator = (*spanCount)(nil) +var ( + _ samplingpolicy.Evaluator = (*spanCount)(nil) + _ samplingpolicy.EarlyEvaluator = (*spanCount)(nil) +) // NewSpanCount creates a policy evaluator sampling traces with more than one span per trace func NewSpanCount(settings component.TelemetrySettings, minSpans, maxSpans int32) samplingpolicy.Evaluator { @@ -44,3 +48,15 @@ func (c *spanCount) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *sa return samplingpolicy.NotSampled, nil } } + +func (c *spanCount) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, traceData *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + spanCount := traceData.SpanCount + if c.maxSpans == 0 && spanCount >= c.minSpans { + return samplingpolicy.Sampled, nil + } + if c.maxSpans > 0 && spanCount > c.maxSpans { + return samplingpolicy.NotSampled, nil + } + // The cases where we are less than min spans or between min and max spans are not known yet. 
+ return samplingpolicy.Unspecified, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go index 405196ab25bad..c632ef7d1ff00 100644 --- a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go @@ -234,6 +234,70 @@ func TestEvaluate_RangeOfSpans(t *testing.T) { } } +func TestSpanCount_EarlyEvaluate(t *testing.T) { + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + + cases := []struct { + Desc string + minSpans, maxSpans int32 + NumberSpans []int32 + Decision samplingpolicy.Decision + }{ + { + "Spans less than the maxSpans", + 0, 20, + []int32{ + 5, 3, + }, + samplingpolicy.Unspecified, + }, + { + "Spans greater than minSpans, no maxSpans", + 5, 0, + []int32{ + 5, 3, + }, + samplingpolicy.Sampled, + }, + { + "Spans less than minSpans", + 5, 0, + []int32{ + 3, + }, + samplingpolicy.Unspecified, + }, + { + "Spans greater than the maxSpans, in one single batch", + 0, 20, + []int32{ + 21, + }, + samplingpolicy.NotSampled, + }, + { + "Spans greater than the maxSpans, in multiple batches", + 0, 20, + []int32{ + 5, 10, 6, + }, + samplingpolicy.NotSampled, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + filter := NewSpanCount(componenttest.NewNopTelemetrySettings(), c.minSpans, c.maxSpans).(samplingpolicy.EarlyEvaluator) + trace := newTraceWithMultipleSpans(c.NumberSpans) + rss := trace.ReceivedBatches.ResourceSpans() + decision, err := filter.EarlyEvaluate(t.Context(), traceID, rss.At(rss.Len()-1), trace) + + assert.NoError(t, err) + assert.Equal(t, decision, c.Decision) + }) + } +} + func newTraceWithMultipleSpans(numberSpans []int32) *samplingpolicy.TraceData { totalNumberSpans := int32(0) diff --git a/processor/tailsamplingprocessor/internal/sampling/status_code.go 
b/processor/tailsamplingprocessor/internal/sampling/status_code.go index 200d6b23ee206..082c0415d4cf5 100644 --- a/processor/tailsamplingprocessor/internal/sampling/status_code.go +++ b/processor/tailsamplingprocessor/internal/sampling/status_code.go @@ -22,7 +22,10 @@ type statusCodeFilter struct { statusCodes []ptrace.StatusCode } -var _ samplingpolicy.Evaluator = (*statusCodeFilter)(nil) +var ( + _ samplingpolicy.Evaluator = (*statusCodeFilter)(nil) + _ samplingpolicy.EarlyEvaluator = (*statusCodeFilter)(nil) +) // NewStatusCodeFilter creates a policy evaluator that samples all traces with // a given status code. @@ -62,3 +65,14 @@ func (r *statusCodeFilter) Evaluate(_ context.Context, _ pcommon.TraceID, trace return slices.Contains(r.statusCodes, span.Status().Code()) }), nil } + +func (r *statusCodeFilter) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, newData ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + found := hasInstrumentationLibrarySpanWithCondition(newData.ScopeSpans(), func(span ptrace.Span) bool { + return slices.Contains(r.statusCodes, span.Status().Code()) + }, false) + + if found { + return samplingpolicy.Sampled, nil + } + return samplingpolicy.Unspecified, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/status_code_test.go b/processor/tailsamplingprocessor/internal/sampling/status_code_test.go index b54c40841a3e3..519e40e9110bc 100644 --- a/processor/tailsamplingprocessor/internal/sampling/status_code_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/status_code_test.go @@ -22,7 +22,7 @@ func TestNewStatusCodeFilter_errorHandling(t *testing.T) { assert.EqualError(t, err, "unknown status code \"ERR\", supported: OK, ERROR, UNSET") } -func TestStatusCodeSampling(t *testing.T) { +func TestStatusCodeFilter_Evaluate(t *testing.T) { traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) cases := []struct { @@ -83,3 +83,60 @@ func 
TestStatusCodeSampling(t *testing.T) { }) } } + +func TestStatusCodeFilter_EarlyEvaluate(t *testing.T) { + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + + cases := []struct { + Desc string + StatusCodesToFilterOn []string + StatusCodesPresent []ptrace.StatusCode + Decision samplingpolicy.Decision + }{ + { + Desc: "filter on ERROR - none match", + StatusCodesToFilterOn: []string{"ERROR"}, + StatusCodesPresent: []ptrace.StatusCode{ptrace.StatusCodeOk, ptrace.StatusCodeUnset, ptrace.StatusCodeOk}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "filter on OK and ERROR - none match", + StatusCodesToFilterOn: []string{"OK", "ERROR"}, + StatusCodesPresent: []ptrace.StatusCode{ptrace.StatusCodeUnset, ptrace.StatusCodeUnset}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "filter on UNSET - matches", + StatusCodesToFilterOn: []string{"UNSET"}, + StatusCodesPresent: []ptrace.StatusCode{ptrace.StatusCodeUnset}, + Decision: samplingpolicy.Sampled, + }, + { + Desc: "filter on OK and UNSET - matches", + StatusCodesToFilterOn: []string{"OK", "UNSET"}, + StatusCodesPresent: []ptrace.StatusCode{ptrace.StatusCodeError, ptrace.StatusCodeOk}, + Decision: samplingpolicy.Sampled, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + rs := ptrace.NewResourceSpans() + ils := rs.ScopeSpans().AppendEmpty() + + for _, statusCode := range c.StatusCodesPresent { + span := ils.Spans().AppendEmpty() + span.Status().SetCode(statusCode) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + } + + statusCodeFilter, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), c.StatusCodesToFilterOn) + assert.NoError(t, err) + + decision, err := statusCodeFilter.(samplingpolicy.EarlyEvaluator).EarlyEvaluate(t.Context(), traceID, rs, nil) + assert.NoError(t, err) + assert.Equal(t, c.Decision, decision) + }) + } +} diff --git 
a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go index 6e5d6dca09372..fb109bffaf300 100644 --- a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go @@ -33,7 +33,10 @@ type regexStrSetting struct { filterList []*regexp.Regexp } -var _ samplingpolicy.Evaluator = (*stringAttributeFilter)(nil) +var ( + _ samplingpolicy.Evaluator = (*stringAttributeFilter)(nil) + _ samplingpolicy.EarlyEvaluator = (*stringAttributeFilter)(nil) +) // NewStringAttributeFilter creates a policy evaluator that samples all traces with // the given attribute in the given numeric range. @@ -151,6 +154,36 @@ func (saf *stringAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID, ), nil } +func (saf *stringAttributeFilter) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, batch ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + // Do not support the deprecated invert match code for early evaluations. + if saf.invertMatch { + return samplingpolicy.Unspecified, nil + } + + return batchHasResourceOrSpanWithCondition( + batch, + func(resource pcommon.Resource) bool { + if v, ok := resource.Attributes().Get(saf.key); ok { + if ok := saf.matcher(v.Str()); ok { + return true + } + } + return false + }, + func(span ptrace.Span) bool { + if v, ok := span.Attributes().Get(saf.key); ok { + truncatableStr := v.Str() + if truncatableStr != "" { + if ok := saf.matcher(v.Str()); ok { + return true + } + } + } + return false + }, + ), nil +} + // addFilters compiles all the given filters and stores them as regexes. // All regexes are automatically anchored to enforce full string matches. 
func addFilters(exprs []string) ([]*regexp.Regexp, error) { diff --git a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go index 8e4959e0d7cc7..1fc01cd2a0db9 100644 --- a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go @@ -250,6 +250,77 @@ func TestStringTagFilter_InvalidRegex(t *testing.T) { }) } +func TestStringTagFilter_EarlyEvaluate(t *testing.T) { + cases := []struct { + Desc string + Trace *samplingpolicy.TraceData + filterCfg *TestStringAttributeCfg + Decision samplingpolicy.Decision + DisableInvertDecision bool + }{ + { + Desc: "nonmatching node attribute key", + Trace: newTraceStringAttrs(map[string]any{"non_matching": "value"}, "", ""), + filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "nonmatching node attribute value", + Trace: newTraceStringAttrs(map[string]any{"example": "non_matching"}, "", ""), + filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "matching node attribute", + Trace: newTraceStringAttrs(map[string]any{"example": "value"}, "", ""), + filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize}, + Decision: samplingpolicy.Sampled, + }, + { + Desc: "nonmatching span attribute key", + Trace: newTraceStringAttrs(nil, "nonmatching", "value"), + filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "nonmatching span attribute 
value", + Trace: newTraceStringAttrs(nil, "example", "nonmatching"), + filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "matching span attribute", + Trace: newTraceStringAttrs(nil, "example", "value"), + filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize}, + Decision: samplingpolicy.Sampled, + }, + { + Desc: "matching span attribute with regex", + Trace: newTraceStringAttrs(nil, "example", "grpc.health.v1.HealthCheck"), + filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"v[0-9]+.HealthCheck$"}, EnabledRegexMatching: true, CacheMaxSize: defaultCacheSize}, + Decision: samplingpolicy.Sampled, + }, + { + Desc: "nonmatching span attribute with regex", + Trace: newTraceStringAttrs(nil, "example", "grpc.health.v1.HealthCheck"), + filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"v[a-z]+.HealthCheck$"}, EnabledRegexMatching: true, CacheMaxSize: defaultCacheSize}, + Decision: samplingpolicy.Unspecified, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + filter, err := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), c.filterCfg.Key, c.filterCfg.Values, c.filterCfg.EnabledRegexMatching, c.filterCfg.CacheMaxSize, c.filterCfg.InvertMatch) + require.NoError(t, err) + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + rs := c.Trace.ReceivedBatches.ResourceSpans().At(0) + decision, err := filter.(samplingpolicy.EarlyEvaluator).EarlyEvaluate(t.Context(), traceID, rs, c.Trace) + assert.NoError(t, err) + assert.Equal(t, decision, c.Decision) + }) + } +} + func BenchmarkStringTagFilterEvaluatePlainText(b *testing.B) { trace := newTraceStringAttrs(map[string]any{"example": "value"}, "", "") filter, err := 
NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", []string{"value"}, false, 0, false) diff --git a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go index 1bcc73f658883..5caf2ba8d1bbd 100644 --- a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go @@ -59,3 +59,16 @@ func (tsf *traceStateFilter) Evaluate(_ context.Context, _ pcommon.TraceID, trac return false }), nil } + +func (tsf *traceStateFilter) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, spans ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return batchHasSpanWithCondition(spans, func(span ptrace.Span) bool { + traceState, err := tracesdk.ParseTraceState(span.TraceState().AsRaw()) + if err != nil { + return false + } + if ok := tsf.matcher(traceState.Get(tsf.key)); ok { + return true + } + return false + }), nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go index 2c6f355dbc5e0..e8a561f197618 100644 --- a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go @@ -20,7 +20,7 @@ type TestTraceStateCfg struct { Values []string } -func TestTraceStateFilter(t *testing.T) { +func TestTraceStateFilter_Evaluate(t *testing.T) { cases := []struct { Desc string Trace *samplingpolicy.TraceData @@ -87,6 +87,75 @@ func TestTraceStateFilter(t *testing.T) { } } +func TestTraceStateFilter_EarlyEvaluate(t *testing.T) { + cases := []struct { + Desc string + Trace *samplingpolicy.TraceData + filterCfg *TestTraceStateCfg + Decision samplingpolicy.Decision + }{ + { + Desc: "nonmatching trace_state key", + Trace: 
newTraceState("non_matching=value"), + filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "nonmatching trace_state value", + Trace: newTraceState("example=non_matching"), + filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "matching trace_state", + Trace: newTraceState("example=value"), + filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}}, + Decision: samplingpolicy.Sampled, + }, + { + Desc: "nonmatching trace_state on empty filter list", + Trace: newTraceState("example=value"), + filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{}}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "nonmatching trace_state on multiple key-values", + Trace: newTraceState("example=non_matching,non_matching=value"), + filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "matching trace_state on multiple key-values", + Trace: newTraceState("example=value,non_matching=value"), + filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}}, + Decision: samplingpolicy.Sampled, + }, + { + Desc: "nonmatching trace_state on multiple filter list", + Trace: newTraceState("example=non_matching"), + filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value1", "value2"}}, + Decision: samplingpolicy.Unspecified, + }, + { + Desc: "matching trace_state on multiple filter list", + Trace: newTraceState("example=value1"), + filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value1", "value2"}}, + Decision: samplingpolicy.Sampled, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + filter := NewTraceStateFilter(componenttest.NewNopTelemetrySettings(), c.filterCfg.Key, c.filterCfg.Values).(samplingpolicy.EarlyEvaluator) + traceID := pcommon.TraceID([16]byte{1, 2, 3, 
4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + rs := c.Trace.ReceivedBatches.ResourceSpans().At(0) + decision, err := filter.EarlyEvaluate(t.Context(), traceID, rs, c.Trace) + assert.NoError(t, err) + assert.Equal(t, decision, c.Decision) + }) + } +} + func newTraceState(traceState string) *samplingpolicy.TraceData { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() diff --git a/processor/tailsamplingprocessor/internal/sampling/util.go b/processor/tailsamplingprocessor/internal/sampling/util.go index 48b9709962618..1faa14ac2ab0f 100644 --- a/processor/tailsamplingprocessor/internal/sampling/util.go +++ b/processor/tailsamplingprocessor/internal/sampling/util.go @@ -20,18 +20,30 @@ func hasResourceOrSpanWithCondition( for i := 0; i < td.ResourceSpans().Len(); i++ { rs := td.ResourceSpans().At(i) - resource := rs.Resource() - if shouldSampleResource(resource) { - return samplingpolicy.Sampled - } - - if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan, false) { - return samplingpolicy.Sampled + decision := batchHasResourceOrSpanWithCondition(rs, shouldSampleResource, shouldSampleSpan) + if decision != samplingpolicy.Unspecified { + return decision } } return samplingpolicy.NotSampled } +func batchHasResourceOrSpanWithCondition( + rs ptrace.ResourceSpans, + shouldSampleResource func(resource pcommon.Resource) bool, + shouldSampleSpan func(span ptrace.Span) bool, +) samplingpolicy.Decision { + resource := rs.Resource() + if shouldSampleResource(resource) { + return samplingpolicy.Sampled + } + + if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan, false) { + return samplingpolicy.Sampled + } + return samplingpolicy.Unspecified +} + // invertHasResourceOrSpanWithCondition iterates through all the resources and instrumentation library spans until any // callback returns false. 
func invertHasResourceOrSpanWithCondition( @@ -78,6 +90,15 @@ func hasSpanWithCondition(td ptrace.Traces, shouldSample func(span ptrace.Span) return samplingpolicy.NotSampled } +// batchHasSpanWithCondition iterates through all the instrumentation library spans until any callback returns true. It +// is for use in EarlyEvaluators. +func batchHasSpanWithCondition(rs ptrace.ResourceSpans, shouldSample func(span ptrace.Span) bool) samplingpolicy.Decision { + if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSample, false) { + return samplingpolicy.Sampled + } + return samplingpolicy.Unspecified +} + func hasInstrumentationLibrarySpanWithCondition(ilss ptrace.ScopeSpansSlice, check func(span ptrace.Span) bool, invert bool) bool { for i := 0; i < ilss.Len(); i++ { ils := ilss.At(i) diff --git a/processor/tailsamplingprocessor/metadata.yaml b/processor/tailsamplingprocessor/metadata.yaml index f06fb293d1030..b89d431af4fa3 100644 --- a/processor/tailsamplingprocessor/metadata.yaml +++ b/processor/tailsamplingprocessor/metadata.yaml @@ -19,6 +19,10 @@ attributes: enum: [sampled, not_sampled, dropped] type: string + early: + description: Whether the decision was made early or after the decision wait + type: bool + policy: description: Name of the policy type: string @@ -38,7 +42,7 @@ telemetry: sum: value_type: int monotonic: true - attributes: [policy, sampled, decision] + attributes: [policy, sampled, decision, early] processor_tail_sampling_count_traces_sampled: description: Count of traces that were sampled or not per sampling policy @@ -49,7 +53,7 @@ telemetry: sum: value_type: int monotonic: true - attributes: [policy, sampled, decision] + attributes: [policy, sampled, decision, early] processor_tail_sampling_early_releases_from_cache_decision: description: Number of spans that were able to be immediately released due to a decision cache hit. 
diff --git a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go index e1bf7d56de700..77e9fa9aa5987 100644 --- a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go +++ b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go @@ -52,6 +52,16 @@ type Evaluator interface { Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) } +// EarlyEvaluator uses partial traces (newData) in order to make a sampling +// decision. The partial data could be as little as a single span, up to all +// the spans for a given resource. +type EarlyEvaluator interface { + // EarlyEvaluate does the actual evaluation of the batch of spans. Any + // implementations must only return Sampled, NotSampled, Dropped, or + // Unspecified decisions. Any other values will be treated as Unspecified. + EarlyEvaluate(ctx context.Context, traceID pcommon.TraceID, newData ptrace.ResourceSpans, allData *TraceData) (Decision, error) +} + type Extension interface { NewEvaluator(policyName string, cfg map[string]any) (Evaluator, error) } diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index b526cc2b3d160..1f6c4dc6e5ac0 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -37,6 +37,8 @@ type policy struct { name string // evaluator that decides if a trace is sampled or not by this policy instance. evaluator samplingpolicy.Evaluator + // earlyEvaluator is not nil iff this policy can be used for early decisions. + earlyEvaluator samplingpolicy.EarlyEvaluator // attribute to use in the telemetry to denote the policy. 
attribute metric.MeasurementOption } @@ -59,17 +61,18 @@ type tailSamplingSpanProcessor struct { telemetry *metadata.TelemetryBuilder logger *zap.Logger - deleteTraceQueue *list.List - nextConsumer consumer.Traces - policies []*policy - idToTrace map[pcommon.TraceID]*traceData - tickerFrequency time.Duration - decisionBatcher idbatcher.Batcher - sampledIDCache cache.Cache[bool] - nonSampledIDCache cache.Cache[bool] - recordPolicy bool - sampleOnFirstMatch bool - blockOnOverflow bool + deleteTraceQueue *list.List + nextConsumer consumer.Traces + policies []*policy + earlyEvaluationPossible bool + idToTrace map[pcommon.TraceID]*traceData + tickerFrequency time.Duration + decisionBatcher idbatcher.Batcher + sampledIDCache cache.Cache[bool] + nonSampledIDCache cache.Cache[bool] + recordPolicy bool + sampleOnFirstMatch bool + blockOnOverflow bool cfg Config host component.Host @@ -143,7 +146,7 @@ func (*tailSamplingSpanProcessor) Capabilities() consumer.Capabilities { // Start is invoked during service startup. func (tsp *tailSamplingSpanProcessor) Start(_ context.Context, host component.Host) error { tsp.host = host - policies, err := tsp.loadSamplingPolicies(host, tsp.cfg.PolicyCfgs) + policies, earlyEvaluationsPossible, err := tsp.loadSamplingPolicies(host, tsp.cfg.PolicyCfgs) if err != nil { return err } @@ -151,6 +154,7 @@ func (tsp *tailSamplingSpanProcessor) Start(_ context.Context, host component.Ho // If the policies are not set, set them. This is only for testing purposes, // so that withPolicies can inject custom policies. 
if tsp.policies == nil { + tsp.earlyEvaluationPossible = earlyEvaluationsPossible tsp.policies = policies } @@ -195,15 +199,18 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace } func (tsp *tailSamplingSpanProcessor) SetSamplingPolicy(cfgs []PolicyCfg) { - policies, err := tsp.loadSamplingPolicies(tsp.host, cfgs) + policies, earlyEvaluationPossible, err := tsp.loadSamplingPolicies(tsp.host, cfgs) if err != nil { tsp.logger.Error("Failed to load sampling policies", zap.Error(err)) return } - tsp.newPolicyChan <- newPolicyCmd{policies: policies} + tsp.newPolicyChan <- newPolicyCmd{ + policies: policies, + earlyEvaluationPossible: earlyEvaluationPossible, + } } -func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, cfgs []PolicyCfg) ([]*policy, error) { +func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, cfgs []PolicyCfg) ([]*policy, bool, error) { telemetrySettings := tsp.set.TelemetrySettings componentID := tsp.set.ID.Name() @@ -212,20 +219,21 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, dropPolicies := make([]*policy, 0, cLen) policyNames := make(map[string]struct{}, cLen) + earlyEvaluationPossible := false for i := range cfgs { cfg := cfgs[i] if cfg.Name == "" { - return nil, errors.New("policy name cannot be empty") + return nil, false, errors.New("policy name cannot be empty") } if _, exists := policyNames[cfg.Name]; exists { - return nil, fmt.Errorf("duplicate policy name %q", cfg.Name) + return nil, false, fmt.Errorf("duplicate policy name %q", cfg.Name) } policyNames[cfg.Name] = struct{}{} eval, err := getPolicyEvaluator(telemetrySettings, &cfg, extensions(host)) if err != nil { - return nil, fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err) + return nil, false, fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err) } uniquePolicyName := cfg.Name @@ -238,6 +246,10 @@ func (tsp 
*tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, evaluator: eval, attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)), } + if earlyEvaluator, ok := eval.(samplingpolicy.EarlyEvaluator); ok { + earlyEvaluationPossible = true + p.earlyEvaluator = earlyEvaluator + } if cfg.Type == Drop { dropPolicies = append(dropPolicies, p) @@ -245,8 +257,14 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, policies = append(policies, p) } } + + // We do not support early evaluation when drop policies are present yet. + if len(dropPolicies) > 0 { + earlyEvaluationPossible = false + } + // Dropped decision takes precedence over all others, therefore we evaluate them first. - return slices.Concat(dropPolicies, policies), nil + return slices.Concat(dropPolicies, policies), earlyEvaluationPossible, nil } // traceBatch contains all spans from a single batch for a single trace. @@ -257,7 +275,8 @@ type traceBatch struct { } type newPolicyCmd struct { - policies []*policy + policies []*policy + earlyEvaluationPossible bool } // spanAndScope a structure for holding information about span and its instrumentation scope. 
@@ -272,6 +291,8 @@ var ( attrDecisionSampled = metric.WithAttributes(attribute.String("sampled", "true"), attribute.String("decision", "sampled")) attrDecisionNotSampled = metric.WithAttributes(attribute.String("sampled", "false"), attribute.String("decision", "not_sampled")) attrDecisionDropped = metric.WithAttributes(attribute.String("sampled", "false"), attribute.String("decision", "dropped")) + attrEarlyDecision = metric.WithAttributes(attribute.Bool("early", true)) + attrNormalDecision = metric.WithAttributes(attribute.Bool("early", false)) decisionToAttributes = map[samplingpolicy.Decision]metric.MeasurementOption{ samplingpolicy.Sampled: attrDecisionSampled, samplingpolicy.NotSampled: attrDecisionNotSampled, @@ -469,6 +490,7 @@ func (tsp *tailSamplingSpanProcessor) iter(tickChan <-chan time.Time, workChan < } case cmd := <-tsp.newPolicyChan: tsp.policies = cmd.policies + tsp.earlyEvaluationPossible = cmd.earlyEvaluationPossible tsp.logger.Debug("New policies loaded", zap.Int("policies.len", len(tsp.policies))) case <-tickChan: tsp.samplingPolicyOnTick() @@ -544,9 +566,19 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { for _, id := range batch { trace, ok := tsp.idToTrace[id] if !ok { + // This will be caused by an early decision in most cases, but if + // this metric is higher than early decisions there could be a + // problem. metrics.idNotFoundOnMapCount++ continue } + // This will only happen if an early decision occurred and caching is + // disabled. The spans will have already been forwarded when the early + // decision happened. 
+ if trace.finalDecision != samplingpolicy.Unspecified { + continue + } + trace.decisionTime = time.Now() decision := tsp.makeDecision(id, &trace.TraceData, metrics) @@ -575,9 +607,9 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { for i, p := range tsp.policies { for decision, stats := range metrics.tracesSampledByPolicyDecision[i] { - tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(tsp.ctx, int64(stats.tracesSampled), p.attribute, decisionToAttributes[decision]) + tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(tsp.ctx, int64(stats.tracesSampled), p.attribute, decisionToAttributes[decision], attrNormalDecision) if telemetry.IsMetricStatCountSpansSampledEnabled() { - tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(tsp.ctx, stats.spansSampled, p.attribute, decisionToAttributes[decision]) + tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(tsp.ctx, stats.spansSampled, p.attribute, decisionToAttributes[decision], attrNormalDecision) } } tsp.telemetry.ProcessorTailSamplingSamplingPolicyExecutionTimeSum.Add(tsp.ctx, metrics.cumulativeExecutionTime[i].executionTime.Microseconds(), p.attribute) @@ -700,8 +732,8 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac tsp.telemetry.ProcessorTailSamplingNewTraceIDReceived.Add(tsp.ctx, newTraceIDs) }() - actualData, ok := tsp.idToTrace[id] - if !ok { + actualData, traceAlreadyExisted := tsp.idToTrace[id] + if !traceAlreadyExisted { actualData = &traceData{ arrivalTime: currTime, TraceData: samplingpolicy.TraceData{ @@ -713,7 +745,6 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac tsp.idToTrace[id] = actualData newTraceIDs++ - tsp.decisionBatcher.AddToCurrentBatch(id) if !tsp.blockOnOverflow { tsp.deleteTraceQueue.PushBack(id) @@ -723,14 +754,6 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac } finalDecision := actualData.finalDecision - - if finalDecision == 
samplingpolicy.Unspecified { - // If the final decision hasn't been made, add the new spans to the - // existing trace. - appendToTraces(actualData.ReceivedBatches, rss) - return - } - switch finalDecision { case samplingpolicy.Sampled: traceTd := ptrace.NewTraces() @@ -738,6 +761,32 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac tsp.forwardSpans(tsp.ctx, traceTd) case samplingpolicy.NotSampled: tsp.releaseNotSampledTrace(id) + case samplingpolicy.Unspecified: + // If the final decision hasn't been made, add the new spans to the + // existing trace. + appendToTraces(actualData.ReceivedBatches, rss) + + // See if any early decision policies will change the final decision. + if tsp.cfg.EarlyDecisions && tsp.earlyEvaluationPossible { + // Since rss has been moved set it to the value in actualData. + rss = actualData.ReceivedBatches.ResourceSpans().At(actualData.ReceivedBatches.ResourceSpans().Len() - 1) + finalDecision = tsp.processEarlyDecisions(id, rss, &actualData.TraceData) + actualData.finalDecision = finalDecision + // If the early decision is to sample or drop the trace release the trace appropriately. + switch finalDecision { + case samplingpolicy.Sampled: + tsp.releaseSampledTrace(tsp.ctx, id, actualData.ReceivedBatches) + return + case samplingpolicy.Dropped: + tsp.releaseNotSampledTrace(id) + } + } + // If no early sample was done new traces still need to be added to the + // current batch. + if !traceAlreadyExisted { + tsp.decisionBatcher.AddToCurrentBatch(id) + } + return default: tsp.logger.Warn("Unexpected sampling decision", zap.Int("decision", int(finalDecision))) } @@ -747,6 +796,71 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac } } +// processEarlyDecisions tries to sample based on just the spans in a batch. 
+func (tsp *tailSamplingSpanProcessor) processEarlyDecisions(id pcommon.TraceID, currentSpans ptrace.ResourceSpans, trace *samplingpolicy.TraceData) samplingpolicy.Decision { + var sampledPolicy *policy + finalDecision := samplingpolicy.Unspecified + notSampled := 0 + // Check all policies before making a final decision. +policyLoop: + for _, p := range tsp.policies { + earlyEval, ok := p.evaluator.(samplingpolicy.EarlyEvaluator) + if !ok { + continue + } + + decision, err := earlyEval.EarlyEvaluate(tsp.ctx, id, currentSpans, trace) + if err != nil { + tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, 1) + tsp.logger.Debug("Sampling policy error", zap.Error(err)) + continue + } + + switch decision { + case samplingpolicy.Dropped: + finalDecision = samplingpolicy.Dropped + // Store which policy was sampled first. + if sampledPolicy == nil { + sampledPolicy = p + } + break policyLoop + case samplingpolicy.NotSampled: + notSampled++ + case samplingpolicy.Sampled: + finalDecision = samplingpolicy.Sampled + // Store which policy was sampled first. 
+ if sampledPolicy == nil { + sampledPolicy = p + } + if tsp.sampleOnFirstMatch { + break policyLoop + } + } + } + + if len(tsp.policies) > 0 && notSampled >= len(tsp.policies) { + finalDecision = samplingpolicy.NotSampled + sampledPolicy = tsp.policies[0] + } + + if finalDecision != samplingpolicy.Unspecified { + attrDecision := decisionToAttributes[finalDecision] + tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, attrDecision) + tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(tsp.ctx, 1, sampledPolicy.attribute, attrDecision, attrEarlyDecision) + if telemetry.IsMetricStatCountSpansSampledEnabled() { + tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(tsp.ctx, trace.SpanCount, sampledPolicy.attribute, attrDecision, attrEarlyDecision) + } + } + + if finalDecision == samplingpolicy.Sampled { + if tsp.recordPolicy && sampledPolicy != nil { + sampling.SetAttrOnScopeSpans(trace, "tailsampling.policy", sampledPolicy.name) + } + } + + return finalDecision +} + func extensions(host component.Host) map[string]samplingpolicy.Extension { if host == nil { return nil diff --git a/processor/tailsamplingprocessor/processor_telemetry_test.go b/processor/tailsamplingprocessor/processor_telemetry_test.go index 9ead754eea489..9cfae51354fa2 100644 --- a/processor/tailsamplingprocessor/processor_telemetry_test.go +++ b/processor/tailsamplingprocessor/processor_telemetry_test.go @@ -89,6 +89,7 @@ func TestMetricsAfterOneEvaluation(t *testing.T) { attribute.String("policy", "always"), attribute.String("sampled", "true"), attribute.String("decision", "sampled"), + attribute.Bool("early", false), ), Value: 1, }, @@ -307,6 +308,7 @@ func TestMetricsWithComponentID(t *testing.T) { attribute.String("policy", "unique_id.always"), attribute.String("sampled", "true"), attribute.String("decision", "sampled"), + attribute.Bool("early", false), ), Value: 1, }, @@ -418,6 +420,7 @@ func TestMetricsCountSampled(t *testing.T) { attribute.String("policy", 
"always"), attribute.String("sampled", "true"), attribute.String("decision", "sampled"), + attribute.Bool("early", false), ), Value: 1, }, @@ -437,6 +440,7 @@ func TestMetricsCountSampled(t *testing.T) { attribute.String("policy", "always"), attribute.String("sampled", "true"), attribute.String("decision", "sampled"), + attribute.Bool("early", false), ), Value: 1, }, @@ -490,6 +494,7 @@ func TestMetricsCountSampled(t *testing.T) { attribute.String("policy", "never"), attribute.String("sampled", "false"), attribute.String("decision", "not_sampled"), + attribute.Bool("early", false), ), Value: 1, }, @@ -509,6 +514,7 @@ func TestMetricsCountSampled(t *testing.T) { attribute.String("policy", "never"), attribute.String("sampled", "false"), attribute.String("decision", "not_sampled"), + attribute.Bool("early", false), ), Value: 1, }, @@ -569,6 +575,7 @@ func TestMetricsCountSampled(t *testing.T) { attribute.String("policy", "drop"), attribute.String("sampled", "false"), attribute.String("decision", "dropped"), + attribute.Bool("early", false), ), Value: 1, }, @@ -588,6 +595,7 @@ func TestMetricsCountSampled(t *testing.T) { attribute.String("policy", "drop"), attribute.String("sampled", "false"), attribute.String("decision", "dropped"), + attribute.Bool("early", false), ), Value: 1, }, diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index aae412e15471f..2e71946a61dc4 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -1086,6 +1086,42 @@ func TestNumericAttributeCases(t *testing.T) { } } +func TestEarlyDecision(t *testing.T) { + traceIDs, batches := generateIDsAndBatches(128) + controller := newTestTSPController() + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: uint64(2 * len(traceIDs)), + ExpectedNewTracesPerSec: 64, + PolicyCfgs: testPolicy, + Options: []Option{ + withTestController(controller), + }, + 
EarlyDecisions: true, + } + telem := setupTestTelemetry() + telemetrySettings := telem.newSettings() + nextConsumer := new(consumertest.TracesSink) + sp, err := newTracesProcessor(t.Context(), telemetrySettings, nextConsumer, cfg) + require.NoError(t, err) + + err = sp.Start(t.Context(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + err = sp.Shutdown(t.Context()) + require.NoError(t, err) + }() + + for _, batch := range batches { + require.NoError(t, sp.ConsumeTraces(t.Context(), batch)) + } + time.Sleep(5 * time.Millisecond) + + // Make sure all traces are sampled before a tick is called. + allSampledTraces := nextConsumer.AllTraces() + assert.Len(t, allSampledTraces, len(batches)) +} + func TestExtension(t *testing.T) { controller := newTestTSPController() msp := new(consumertest.TracesSink) From 448ea7e009e16d9f185bb61fe34b2c0676197371 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 18 Nov 2025 18:25:40 -0700 Subject: [PATCH 02/11] Add parent span to sampling TraceData --- .../pkg/samplingpolicy/samplingpolicy.go | 2 + processor/tailsamplingprocessor/processor.go | 66 +++++++++++-------- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go index 77e9fa9aa5987..ef7d0ed5c637f 100644 --- a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go +++ b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go @@ -16,6 +16,8 @@ type TraceData struct { SpanCount int64 // ReceivedBatches stores all the batches received for the trace. ReceivedBatches ptrace.Traces + // ParentSpan + ParentSpan *ptrace.Span } // Decision gives the status of sampling decision. 
diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 1f6c4dc6e5ac0..fee9bb57841d1 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -185,10 +185,12 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace // be more efficient on its single goroutine. batch := []traceBatch{} for traceID, spans := range idToSpansAndScope { + rss, parentSpan := newResourceSpanFromSpanAndScopes(rss, spans) batch = append(batch, traceBatch{ - id: traceID, - rss: newResourceSpanFromSpanAndScopes(rss, spans), - spanCount: int64(len(spans)), + id: traceID, + rss: rss, + parentSpan: parentSpan, + spanCount: int64(len(spans)), }) } if len(batch) > 0 { @@ -269,9 +271,10 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, // traceBatch contains all spans from a single batch for a single trace. type traceBatch struct { - id pcommon.TraceID - rss ptrace.ResourceSpans - spanCount int64 + id pcommon.TraceID + rss ptrace.ResourceSpans + parentSpan *ptrace.Span + spanCount int64 } type newPolicyCmd struct { @@ -486,7 +489,7 @@ func (tsp *tailSamplingSpanProcessor) iter(tickChan <-chan time.Time, workChan < tsp.waitForSpace(tickChan) } - tsp.processTrace(trace.id, trace.rss, trace.spanCount) + tsp.processTrace(trace) } case cmd := <-tsp.newPolicyChan: tsp.policies = cmd.policies @@ -723,7 +726,7 @@ func groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceI return idToSpans } -func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64) { +func (tsp *tailSamplingSpanProcessor) processTrace(tb traceBatch) { currTime := time.Now() var newTraceIDs int64 @@ -732,59 +735,63 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac tsp.telemetry.ProcessorTailSamplingNewTraceIDReceived.Add(tsp.ctx, newTraceIDs) }() - actualData, 
traceAlreadyExisted := tsp.idToTrace[id] + actualData, traceAlreadyExisted := tsp.idToTrace[tb.id] if !traceAlreadyExisted { actualData = &traceData{ arrivalTime: currTime, TraceData: samplingpolicy.TraceData{ - SpanCount: spanCount, + SpanCount: tb.spanCount, ReceivedBatches: ptrace.NewTraces(), }, } - tsp.idToTrace[id] = actualData + tsp.idToTrace[tb.id] = actualData newTraceIDs++ if !tsp.blockOnOverflow { - tsp.deleteTraceQueue.PushBack(id) + tsp.deleteTraceQueue.PushBack(tb.id) } } else { - actualData.SpanCount += spanCount + actualData.SpanCount += tb.spanCount + } + + if tb.parentSpan != nil && actualData.TraceData.ParentSpan == nil { + actualData.TraceData.ParentSpan = tb.parentSpan } finalDecision := actualData.finalDecision switch finalDecision { case samplingpolicy.Sampled: traceTd := ptrace.NewTraces() - appendToTraces(traceTd, rss) + appendToTraces(traceTd, tb.rss) tsp.forwardSpans(tsp.ctx, traceTd) case samplingpolicy.NotSampled: - tsp.releaseNotSampledTrace(id) + tsp.releaseNotSampledTrace(tb.id) case samplingpolicy.Unspecified: // If the final decision hasn't been made, add the new spans to the // existing trace. - appendToTraces(actualData.ReceivedBatches, rss) + appendToTraces(actualData.ReceivedBatches, tb.rss) // See if any early decision policies will change the final decision. if tsp.cfg.EarlyDecisions && tsp.earlyEvaluationPossible { // Since rss has been moved set it to the value in actualData. - rss = actualData.ReceivedBatches.ResourceSpans().At(actualData.ReceivedBatches.ResourceSpans().Len() - 1) - finalDecision = tsp.processEarlyDecisions(id, rss, &actualData.TraceData) + rss := actualData.ReceivedBatches.ResourceSpans().At(actualData.ReceivedBatches.ResourceSpans().Len() - 1) + finalDecision = tsp.processEarlyDecisions(tb.id, rss, &actualData.TraceData) actualData.finalDecision = finalDecision // If the early decision is to sample or drop the trace release the trace appropriately. 
switch finalDecision { case samplingpolicy.Sampled: - tsp.releaseSampledTrace(tsp.ctx, id, actualData.ReceivedBatches) + tsp.releaseSampledTrace(tsp.ctx, tb.id, actualData.ReceivedBatches) return case samplingpolicy.Dropped: - tsp.releaseNotSampledTrace(id) + tsp.releaseNotSampledTrace(tb.id) } } // If no early sample was done new traces still need to be added to the // current batch. if !traceAlreadyExisted { - tsp.decisionBatcher.AddToCurrentBatch(id) + tsp.decisionBatcher.AddToCurrentBatch(tb.id) } return default: @@ -934,24 +941,29 @@ func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans) { rss.MoveTo(rs) } -func newResourceSpanFromSpanAndScopes(rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) ptrace.ResourceSpans { +func newResourceSpanFromSpanAndScopes(rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) (ptrace.ResourceSpans, *ptrace.Span) { rs := ptrace.NewResourceSpans() rss.Resource().CopyTo(rs.Resource()) + var parentSpan *ptrace.Span scopePointerToNewScope := make(map[*pcommon.InstrumentationScope]*ptrace.ScopeSpans) for _, spanAndScope := range spanAndScopes { // If the scope of the spanAndScope is not in the map, add it to the map and the destination. 
+ var sp ptrace.Span if scope, ok := scopePointerToNewScope[spanAndScope.instrumentationScope]; !ok { is := rs.ScopeSpans().AppendEmpty() spanAndScope.instrumentationScope.CopyTo(is.Scope()) scopePointerToNewScope[spanAndScope.instrumentationScope] = &is - sp := is.Spans().AppendEmpty() - spanAndScope.span.CopyTo(sp) + sp = is.Spans().AppendEmpty() } else { - sp := scope.Spans().AppendEmpty() - spanAndScope.span.CopyTo(sp) + sp = scope.Spans().AppendEmpty() + } + + spanAndScope.span.CopyTo(sp) + if sp.ParentSpanID().IsEmpty() { + parentSpan = &sp } } - return rs + return rs, parentSpan } From 2635bd96d52a2541324cd6ced6d7799c7519a013 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 21 Nov 2025 08:50:42 -0700 Subject: [PATCH 03/11] Clarify that allData should not be used on each request --- .../pkg/samplingpolicy/samplingpolicy.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go index ef7d0ed5c637f..3d0bdc6442da8 100644 --- a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go +++ b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go @@ -61,6 +61,11 @@ type EarlyEvaluator interface { // EarlyEvaluate does the actual evaluation of the batch of spans. Any // implementations must only return Sampled, NotSampled, Dropped, or // Unspecified decisions. Any other values will be treated as Unspecified. + // + // The `ReceivedBatches` on allData should not be iterated through each + // time this function is called. It is included for implementations that + // wait for a trigger, such as the parent span being received, before + // looking across all the received spans. 
EarlyEvaluate(ctx context.Context, traceID pcommon.TraceID, newData ptrace.ResourceSpans, allData *TraceData) (Decision, error) } From d6342f725ff01c2b3c9a96314ef08d38746c0c4b Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 25 Nov 2025 09:43:59 -0700 Subject: [PATCH 04/11] Fix latency upper threshold behavior --- .../internal/sampling/latency.go | 5 +++ .../internal/sampling/latency_test.go | 39 ++++++++++++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/processor/tailsamplingprocessor/internal/sampling/latency.go b/processor/tailsamplingprocessor/internal/sampling/latency.go index c2b31e3739191..82a0f780f3d11 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency.go @@ -44,6 +44,11 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *samp } func (l *latency) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, batch ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + // If an upper threshold is set we don't know if there will be a future + // span that will cause a not sampled decision. 
+ if l.upperThresholdMs > 0 { + return samplingpolicy.Unspecified, nil + } return batchHasSpanWithCondition(batch, l.condition()), nil } diff --git a/processor/tailsamplingprocessor/internal/sampling/latency_test.go b/processor/tailsamplingprocessor/internal/sampling/latency_test.go index a2aef8e46b014..007e61e766463 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency_test.go @@ -160,18 +160,18 @@ func TestLatency_Evaluate_Bounded(t *testing.T) { } func TestLatency_EarlyEvaluate(t *testing.T) { - filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 0).(samplingpolicy.EarlyEvaluator) - traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) now := time.Now() cases := []struct { - Desc string - Spans []spanWithTimeAndDuration - Decision samplingpolicy.Decision + Desc string + Threshold, UpperThreshold time.Duration + Spans []spanWithTimeAndDuration + Decision samplingpolicy.Decision }{ { "trace duration shorter than threshold", + 5 * time.Second, 0, []spanWithTimeAndDuration{ { StartTime: now, @@ -182,6 +182,7 @@ func TestLatency_EarlyEvaluate(t *testing.T) { }, { "trace duration is equal to threshold", + 5 * time.Second, 0, []spanWithTimeAndDuration{ { StartTime: now, @@ -192,6 +193,7 @@ func TestLatency_EarlyEvaluate(t *testing.T) { }, { "total trace duration is longer than threshold but every single span is shorter", + 5 * time.Second, 0, []spanWithTimeAndDuration{ { StartTime: now, @@ -204,16 +206,41 @@ func TestLatency_EarlyEvaluate(t *testing.T) { }, samplingpolicy.Sampled, }, + { + "trace duration is longer than upper bound", + 1 * time.Second, 2 * time.Second, + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 3 * time.Second, + }, + }, + // TODO: This can be samplingpolicy.NotSampled in the future. 
+ samplingpolicy.Unspecified, + }, + { + "trace duration is longer than lower bound but less than upper bound", + 2 * time.Second, 5 * time.Second, + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 4000 * time.Millisecond, + }, + }, + samplingpolicy.Unspecified, + }, } for _, c := range cases { t.Run(c.Desc, func(t *testing.T) { + filter := NewLatency(componenttest.NewNopTelemetrySettings(), c.Threshold.Milliseconds(), c.UpperThreshold.Milliseconds()).(samplingpolicy.EarlyEvaluator) + trace := newTraceWithSpans(c.Spans) rs := trace.ReceivedBatches.ResourceSpans().At(0) decision, err := filter.EarlyEvaluate(t.Context(), traceID, rs, trace) assert.NoError(t, err) - assert.Equal(t, decision, c.Decision) + assert.Equal(t, c.Decision, decision) }) } } From e4b424d56bb808b9f4c054223c6f0fa36367fd61 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 25 Nov 2025 09:44:18 -0700 Subject: [PATCH 05/11] Fix review feedback --- .../internal/sampling/always_sample.go | 2 +- .../pkg/samplingpolicy/samplingpolicy.go | 4 +-- processor/tailsamplingprocessor/processor.go | 33 +++++++++---------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/processor/tailsamplingprocessor/internal/sampling/always_sample.go b/processor/tailsamplingprocessor/internal/sampling/always_sample.go index 8a8434a5237c1..c54de705d959d 100644 --- a/processor/tailsamplingprocessor/internal/sampling/always_sample.go +++ b/processor/tailsamplingprocessor/internal/sampling/always_sample.go @@ -20,7 +20,7 @@ type alwaysSample struct { var ( _ samplingpolicy.Evaluator = (*alwaysSample)(nil) - _ samplingpolicy.EarlyEvaluator = (*statusCodeFilter)(nil) + _ samplingpolicy.EarlyEvaluator = (*alwaysSample)(nil) ) // NewAlwaysSample creates a policy evaluator the samples all traces. 
diff --git a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go index 3d0bdc6442da8..1f083dca1b115 100644 --- a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go +++ b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go @@ -16,8 +16,8 @@ type TraceData struct { SpanCount int64 // ReceivedBatches stores all the batches received for the trace. ReceivedBatches ptrace.Traces - // ParentSpan - ParentSpan *ptrace.Span + // RootSpan contains the root span if it has been ingested yet. + RootSpan *ptrace.Span } // Decision gives the status of sampling decision. diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index fee9bb57841d1..8a51971477b6f 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -187,10 +187,10 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace for traceID, spans := range idToSpansAndScope { rss, parentSpan := newResourceSpanFromSpanAndScopes(rss, spans) batch = append(batch, traceBatch{ - id: traceID, - rss: rss, - parentSpan: parentSpan, - spanCount: int64(len(spans)), + id: traceID, + rss: rss, + rootSpan: parentSpan, + spanCount: int64(len(spans)), }) } if len(batch) > 0 { @@ -271,10 +271,10 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, // traceBatch contains all spans from a single batch for a single trace. 
type traceBatch struct { - id pcommon.TraceID - rss ptrace.ResourceSpans - parentSpan *ptrace.Span - spanCount int64 + id pcommon.TraceID + rss ptrace.ResourceSpans + rootSpan *ptrace.Span + spanCount int64 } type newPolicyCmd struct { @@ -756,8 +756,8 @@ func (tsp *tailSamplingSpanProcessor) processTrace(tb traceBatch) { actualData.SpanCount += tb.spanCount } - if tb.parentSpan != nil && actualData.TraceData.ParentSpan == nil { - actualData.TraceData.ParentSpan = tb.parentSpan + if tb.rootSpan != nil && actualData.TraceData.RootSpan == nil { + actualData.TraceData.RootSpan = tb.rootSpan } finalDecision := actualData.finalDecision @@ -811,15 +811,14 @@ func (tsp *tailSamplingSpanProcessor) processEarlyDecisions(id pcommon.TraceID, // Check all policies before making a final decision. policyLoop: for _, p := range tsp.policies { - earlyEval, ok := p.evaluator.(samplingpolicy.EarlyEvaluator) - if !ok { + if p.earlyEvaluator == nil { continue } - decision, err := earlyEval.EarlyEvaluate(tsp.ctx, id, currentSpans, trace) + decision, err := p.earlyEvaluator.EarlyEvaluate(tsp.ctx, id, currentSpans, trace) if err != nil { tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, 1) - tsp.logger.Debug("Sampling policy error", zap.Error(err)) + tsp.logger.Debug("Sampling policy error during early evaluation", zap.Error(err)) continue } @@ -944,7 +943,7 @@ func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans) { func newResourceSpanFromSpanAndScopes(rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) (ptrace.ResourceSpans, *ptrace.Span) { rs := ptrace.NewResourceSpans() rss.Resource().CopyTo(rs.Resource()) - var parentSpan *ptrace.Span + var rootSpan *ptrace.Span scopePointerToNewScope := make(map[*pcommon.InstrumentationScope]*ptrace.ScopeSpans) for _, spanAndScope := range spanAndScopes { @@ -962,8 +961,8 @@ func newResourceSpanFromSpanAndScopes(rss ptrace.ResourceSpans, spanAndScopes [] spanAndScope.span.CopyTo(sp) if 
sp.ParentSpanID().IsEmpty() { - parentSpan = &sp + rootSpan = &sp } } - return rs, parentSpan + return rs, rootSpan } From c7ce6de1530bd5416f4bbc303abcbb2afcc79e74 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 25 Nov 2025 10:32:36 -0700 Subject: [PATCH 06/11] Add EarlyEvaluate to the Evaluator interface Combine the two interfaces previously created to simplify the code and avoid as many casts as possible. A sampler that does not support early decisions can just return samplingpolicy.Unspecified. --- .../internal/sampling/always_sample.go | 5 +---- .../internal/sampling/always_sample_test.go | 2 +- .../internal/sampling/and.go | 5 +++++ .../internal/sampling/boolean_tag_filter.go | 5 +---- .../sampling/boolean_tag_filter_test.go | 2 +- .../internal/sampling/bytes_limiting.go | 4 ++++ .../internal/sampling/composite.go | 5 +++++ .../internal/sampling/drop.go | 5 +++++ .../internal/sampling/latency.go | 5 +---- .../internal/sampling/latency_test.go | 2 +- .../internal/sampling/numeric_tag_filter.go | 5 +---- .../sampling/numeric_tag_filter_test.go | 2 +- .../internal/sampling/ottl_test.go | 2 +- .../internal/sampling/probabilistic.go | 5 +---- .../internal/sampling/probabilistic_test.go | 2 +- .../internal/sampling/rate_limiting.go | 5 +++++ .../internal/sampling/span_count_sampler.go | 5 +---- .../sampling/span_count_sampler_test.go | 2 +- .../internal/sampling/status_code.go | 5 +---- .../internal/sampling/status_code_test.go | 2 +- .../internal/sampling/string_tag_filter.go | 5 +---- .../internal/sampling/string_tag_filter_test.go | 2 +- .../sampling/trace_state_filter_test.go | 2 +- .../pkg/samplingpolicy/samplingpolicy.go | 12 ++++-------- processor/tailsamplingprocessor/processor.go | 17 ++--------------- .../tailsamplingprocessor/processor_test.go | 12 +++++++++--- 26 files changed, 57 insertions(+), 68 deletions(-) diff --git a/processor/tailsamplingprocessor/internal/sampling/always_sample.go
b/processor/tailsamplingprocessor/internal/sampling/always_sample.go index c54de705d959d..b5369d5a526d9 100644 --- a/processor/tailsamplingprocessor/internal/sampling/always_sample.go +++ b/processor/tailsamplingprocessor/internal/sampling/always_sample.go @@ -18,10 +18,7 @@ type alwaysSample struct { logger *zap.Logger } -var ( - _ samplingpolicy.Evaluator = (*alwaysSample)(nil) - _ samplingpolicy.EarlyEvaluator = (*alwaysSample)(nil) -) +var _ samplingpolicy.Evaluator = (*alwaysSample)(nil) // NewAlwaysSample creates a policy evaluator the samples all traces. func NewAlwaysSample(settings component.TelemetrySettings) samplingpolicy.Evaluator { diff --git a/processor/tailsamplingprocessor/internal/sampling/always_sample_test.go b/processor/tailsamplingprocessor/internal/sampling/always_sample_test.go index cf2787c599310..af1fcdd826d56 100644 --- a/processor/tailsamplingprocessor/internal/sampling/always_sample_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/always_sample_test.go @@ -26,7 +26,7 @@ func TestEvaluate_AlwaysSample(t *testing.T) { func TestEarlyEvaluate_AlwaysSample(t *testing.T) { filter := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) - decision, err := filter.(samplingpolicy.EarlyEvaluator).EarlyEvaluate( + decision, err := filter.EarlyEvaluate( t.Context(), pcommon.TraceID([16]byte{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, diff --git a/processor/tailsamplingprocessor/internal/sampling/and.go b/processor/tailsamplingprocessor/internal/sampling/and.go index b89ffc8959452..d5cd44c506589 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and.go +++ b/processor/tailsamplingprocessor/internal/sampling/and.go @@ -7,6 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" @@ -43,3 +44,7 @@ func (c *And) 
Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *samp } return samplingpolicy.Sampled, nil } + +func (c *And) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return samplingpolicy.Unspecified, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go index c987cdc895494..75d27907c2b0b 100644 --- a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go @@ -21,10 +21,7 @@ type booleanAttributeFilter struct { invertMatch bool } -var ( - _ samplingpolicy.Evaluator = (*booleanAttributeFilter)(nil) - _ samplingpolicy.EarlyEvaluator = (*booleanAttributeFilter)(nil) -) +var _ samplingpolicy.Evaluator = (*booleanAttributeFilter)(nil) // NewBooleanAttributeFilter creates a policy evaluator that samples all traces with // the given attribute that match the supplied boolean value. 
diff --git a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go index 31526326f0a6d..bc5ece9d18897 100644 --- a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go @@ -57,7 +57,7 @@ func TestBooleanTagFilter(t *testing.T) { func TestBooleanTagFilter_EarlyEvaluate(t *testing.T) { empty := map[string]any{} - filter := NewBooleanAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", true, false).(samplingpolicy.EarlyEvaluator) + filter := NewBooleanAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", true, false) resAttr := map[string]any{} resAttr["example"] = 8 diff --git a/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go b/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go index 515428c1ef235..7050e3ec3ce80 100644 --- a/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go +++ b/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go @@ -57,6 +57,10 @@ func (b *bytesLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sa return samplingpolicy.NotSampled, nil } +func (b *bytesLimiting) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return samplingpolicy.Unspecified, nil +} + // calculateTraceSize calculates the accurate protobuf marshaled size of a trace in bytes // using the OpenTelemetry Collector's built-in ProtoMarshaler.TracesSize() method func calculateTraceSize(trace *samplingpolicy.TraceData) int64 { diff --git a/processor/tailsamplingprocessor/internal/sampling/composite.go b/processor/tailsamplingprocessor/internal/sampling/composite.go index bdbc94186f96f..f429427ef05eb 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite.go +++ 
b/processor/tailsamplingprocessor/internal/sampling/composite.go @@ -7,6 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" @@ -134,3 +135,7 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace return samplingpolicy.NotSampled, nil } + +func (c *Composite) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return samplingpolicy.Unspecified, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/drop.go b/processor/tailsamplingprocessor/internal/sampling/drop.go index 52d8758a2caee..39eb9531347a7 100644 --- a/processor/tailsamplingprocessor/internal/sampling/drop.go +++ b/processor/tailsamplingprocessor/internal/sampling/drop.go @@ -7,6 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" @@ -44,3 +45,7 @@ func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *sam } return samplingpolicy.Dropped, nil } + +func (c *Drop) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return samplingpolicy.Unspecified, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/latency.go b/processor/tailsamplingprocessor/internal/sampling/latency.go index 82a0f780f3d11..58002ff0cb8ef 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency.go @@ -20,10 +20,7 @@ type latency struct { upperThresholdMs int64 } -var ( - _ samplingpolicy.Evaluator = 
(*latency)(nil) - _ samplingpolicy.EarlyEvaluator = (*latency)(nil) -) +var _ samplingpolicy.Evaluator = (*latency)(nil) // NewLatency creates a policy evaluator sampling traces with a duration greater than a configured threshold func NewLatency(settings component.TelemetrySettings, thresholdMs, upperThresholdMs int64) samplingpolicy.Evaluator { diff --git a/processor/tailsamplingprocessor/internal/sampling/latency_test.go b/processor/tailsamplingprocessor/internal/sampling/latency_test.go index 007e61e766463..9b99113c6993f 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency_test.go @@ -233,7 +233,7 @@ func TestLatency_EarlyEvaluate(t *testing.T) { for _, c := range cases { t.Run(c.Desc, func(t *testing.T) { - filter := NewLatency(componenttest.NewNopTelemetrySettings(), c.Threshold.Milliseconds(), c.UpperThreshold.Milliseconds()).(samplingpolicy.EarlyEvaluator) + filter := NewLatency(componenttest.NewNopTelemetrySettings(), c.Threshold.Milliseconds(), c.UpperThreshold.Milliseconds()) trace := newTraceWithSpans(c.Spans) rs := trace.ReceivedBatches.ResourceSpans().At(0) diff --git a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go index 8f0786dcc48ac..7e20660208e6a 100644 --- a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go @@ -23,10 +23,7 @@ type numericAttributeFilter struct { invertMatch bool } -var ( - _ samplingpolicy.Evaluator = (*numericAttributeFilter)(nil) - _ samplingpolicy.EarlyEvaluator = (*numericAttributeFilter)(nil) -) +var _ samplingpolicy.Evaluator = (*numericAttributeFilter)(nil) // NewNumericAttributeFilter creates a policy evaluator that samples all traces with // the given attribute in the given numeric range. If minValue is nil, it will use math.MinInt64. 
diff --git a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go index bc4a0459ca4e5..199cbdf0b210e 100644 --- a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go @@ -93,7 +93,7 @@ func TestNumericTagFilter_EarlyEvaluate(t *testing.T) { empty := map[string]any{} minVal := int64(math.MinInt32) maxVal := int64(math.MaxInt32) - filter := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", &minVal, &maxVal, false).(samplingpolicy.EarlyEvaluator) + filter := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", &minVal, &maxVal, false) resAttr := map[string]any{} resAttr["example"] = 8 diff --git a/processor/tailsamplingprocessor/internal/sampling/ottl_test.go b/processor/tailsamplingprocessor/internal/sampling/ottl_test.go index 5ab9af60c1c5d..51376b72a490b 100644 --- a/processor/tailsamplingprocessor/internal/sampling/ottl_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/ottl_test.go @@ -178,7 +178,7 @@ func TestOTTL_EarlyEvaluate(t *testing.T) { trace := newTraceWithSpansAttributes(c.Spans) rs := trace.ReceivedBatches.ResourceSpans().At(0) - decision, err := filter.(samplingpolicy.EarlyEvaluator).EarlyEvaluate(t.Context(), traceID, rs, trace) + decision, err := filter.EarlyEvaluate(t.Context(), traceID, rs, trace) assert.Equal(t, err != nil, c.WantErr) assert.Equal(t, decision, c.Decision) } diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go index 42be4b31c68a3..814af4d511e02 100644 --- a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go +++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go @@ -27,10 +27,7 @@ type probabilisticSampler struct { hashSalt string } 
-var ( - _ samplingpolicy.Evaluator = (*probabilisticSampler)(nil) - _ samplingpolicy.EarlyEvaluator = (*probabilisticSampler)(nil) -) +var _ samplingpolicy.Evaluator = (*probabilisticSampler)(nil) // NewProbabilisticSampler creates a policy evaluator that samples a percentage of // traces. diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go index 2766c6b062102..a0b03f169f24d 100644 --- a/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go @@ -83,7 +83,7 @@ func TestProbabilisticSampling(t *testing.T) { err error ) if earlyEvaluation { - decision, err = probabilisticSampler.(samplingpolicy.EarlyEvaluator).EarlyEvaluate(t.Context(), traceID, rs, trace) + decision, err = probabilisticSampler.EarlyEvaluate(t.Context(), traceID, rs, trace) } else { decision, err = probabilisticSampler.Evaluate(t.Context(), traceID, trace) } diff --git a/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go b/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go index dd108db475f9c..501b5c937dbd9 100644 --- a/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go +++ b/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" @@ -48,3 +49,7 @@ func (r *rateLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sam return samplingpolicy.NotSampled, nil } + +func (r *rateLimiting) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + return 
samplingpolicy.Unspecified, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go index d3e0bae95b105..1b0b66349a4bb 100644 --- a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go +++ b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go @@ -20,10 +20,7 @@ type spanCount struct { maxSpans int64 } -var ( - _ samplingpolicy.Evaluator = (*spanCount)(nil) - _ samplingpolicy.EarlyEvaluator = (*spanCount)(nil) -) +var _ samplingpolicy.Evaluator = (*spanCount)(nil) // NewSpanCount creates a policy evaluator sampling traces with more than one span per trace func NewSpanCount(settings component.TelemetrySettings, minSpans, maxSpans int32) samplingpolicy.Evaluator { diff --git a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go index c632ef7d1ff00..59ab76a52ab5d 100644 --- a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler_test.go @@ -287,7 +287,7 @@ func TestSpanCount_EarlyEvaluate(t *testing.T) { for _, c := range cases { t.Run(c.Desc, func(t *testing.T) { - filter := NewSpanCount(componenttest.NewNopTelemetrySettings(), c.minSpans, c.maxSpans).(samplingpolicy.EarlyEvaluator) + filter := NewSpanCount(componenttest.NewNopTelemetrySettings(), c.minSpans, c.maxSpans) trace := newTraceWithMultipleSpans(c.NumberSpans) rss := trace.ReceivedBatches.ResourceSpans() decision, err := filter.EarlyEvaluate(t.Context(), traceID, rss.At(rss.Len()-1), trace) diff --git a/processor/tailsamplingprocessor/internal/sampling/status_code.go b/processor/tailsamplingprocessor/internal/sampling/status_code.go index 082c0415d4cf5..2b167a0e8d9b1 100644 --- a/processor/tailsamplingprocessor/internal/sampling/status_code.go +++ 
b/processor/tailsamplingprocessor/internal/sampling/status_code.go @@ -22,10 +22,7 @@ type statusCodeFilter struct { statusCodes []ptrace.StatusCode } -var ( - _ samplingpolicy.Evaluator = (*statusCodeFilter)(nil) - _ samplingpolicy.EarlyEvaluator = (*statusCodeFilter)(nil) -) +var _ samplingpolicy.Evaluator = (*statusCodeFilter)(nil) // NewStatusCodeFilter creates a policy evaluator that samples all traces with // a given status code. diff --git a/processor/tailsamplingprocessor/internal/sampling/status_code_test.go b/processor/tailsamplingprocessor/internal/sampling/status_code_test.go index 519e40e9110bc..1d394b41f73ad 100644 --- a/processor/tailsamplingprocessor/internal/sampling/status_code_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/status_code_test.go @@ -134,7 +134,7 @@ func TestStatusCodeFilter_EarlyEvaluate(t *testing.T) { statusCodeFilter, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), c.StatusCodesToFilterOn) assert.NoError(t, err) - decision, err := statusCodeFilter.(samplingpolicy.EarlyEvaluator).EarlyEvaluate(t.Context(), traceID, rs, nil) + decision, err := statusCodeFilter.EarlyEvaluate(t.Context(), traceID, rs, nil) assert.NoError(t, err) assert.Equal(t, c.Decision, decision) }) diff --git a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go index fb109bffaf300..2cae76cab0f4e 100644 --- a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go @@ -33,10 +33,7 @@ type regexStrSetting struct { filterList []*regexp.Regexp } -var ( - _ samplingpolicy.Evaluator = (*stringAttributeFilter)(nil) - _ samplingpolicy.EarlyEvaluator = (*stringAttributeFilter)(nil) -) +var _ samplingpolicy.Evaluator = (*stringAttributeFilter)(nil) // NewStringAttributeFilter creates a policy evaluator that samples all traces with // the given attribute in 
the given numeric range. diff --git a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go index 1fc01cd2a0db9..a319aaaaf5c91 100644 --- a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go @@ -314,7 +314,7 @@ func TestStringTagFilter_EarlyEvaluate(t *testing.T) { require.NoError(t, err) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) rs := c.Trace.ReceivedBatches.ResourceSpans().At(0) - decision, err := filter.(samplingpolicy.EarlyEvaluator).EarlyEvaluate(t.Context(), traceID, rs, c.Trace) + decision, err := filter.EarlyEvaluate(t.Context(), traceID, rs, c.Trace) assert.NoError(t, err) assert.Equal(t, decision, c.Decision) }) diff --git a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go index e8a561f197618..24f829dd934c4 100644 --- a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter_test.go @@ -146,7 +146,7 @@ func TestTraceStateFilter_EarlyEvaluate(t *testing.T) { for _, c := range cases { t.Run(c.Desc, func(t *testing.T) { - filter := NewTraceStateFilter(componenttest.NewNopTelemetrySettings(), c.filterCfg.Key, c.filterCfg.Values).(samplingpolicy.EarlyEvaluator) + filter := NewTraceStateFilter(componenttest.NewNopTelemetrySettings(), c.filterCfg.Key, c.filterCfg.Values) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) rs := c.Trace.ReceivedBatches.ResourceSpans().At(0) decision, err := filter.EarlyEvaluate(t.Context(), traceID, rs, c.Trace) diff --git a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go 
b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go index 1f083dca1b115..96fac7ef8f781 100644 --- a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go +++ b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go @@ -52,15 +52,11 @@ const ( type Evaluator interface { // Evaluate looks at the trace data and returns a corresponding SamplingDecision. Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) -} -// EarlyEvaluator uses partial traces (newData) in order to make a sampling -// decision. The partial data could be as little as a single span, up to all -// the spans for a given resource. -type EarlyEvaluator interface { - // EarlyEvaluate does the actual evaluation of the batch of spans. Any - // implementations must only return Sampled, NotSampled, Dropped, or - // Unspecified decisions. Any other values will be treated as Unspecified. + // EarlyEvaluate uses partial traces (newData) in order to make a sampling + // decison. Any implementations must only return Sampled, NotSampled, + // Dropped, or Unspecified decisions. Any other values will be treated as + // Unspecified. // // The `ReceivedBatches` on allData should not be iterated through each // time this function is called. It is included for implementations that diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 8a51971477b6f..d3bea207e03eb 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -37,8 +37,6 @@ type policy struct { name string // evaluator that decides if a trace is sampled or not by this policy instance. evaluator samplingpolicy.Evaluator - // earlyEvaluator is not nil iff this policy can be used for early decisions. - earlyEvaluator samplingpolicy.EarlyEvaluator // attribute to use in the telemetry to denote the policy. 
attribute metric.MeasurementOption } @@ -221,7 +219,6 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, dropPolicies := make([]*policy, 0, cLen) policyNames := make(map[string]struct{}, cLen) - earlyEvaluationPossible := false for i := range cfgs { cfg := cfgs[i] if cfg.Name == "" { @@ -248,10 +245,6 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, evaluator: eval, attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)), } - if earlyEvaluator, ok := eval.(samplingpolicy.EarlyEvaluator); ok { - earlyEvaluationPossible = true - p.earlyEvaluator = earlyEvaluator - } if cfg.Type == Drop { dropPolicies = append(dropPolicies, p) @@ -261,9 +254,7 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, } // We do not support early evaluation when drop policies are present yet. - if len(dropPolicies) > 0 { - earlyEvaluationPossible = false - } + earlyEvaluationPossible := len(dropPolicies) == 0 // Dropped decision takes precedence over all others, therefore we evaluate them first. return slices.Concat(dropPolicies, policies), earlyEvaluationPossible, nil @@ -811,11 +802,7 @@ func (tsp *tailSamplingSpanProcessor) processEarlyDecisions(id pcommon.TraceID, // Check all policies before making a final decision. 
policyLoop: for _, p := range tsp.policies { - if p.earlyEvaluator == nil { - continue - } - - decision, err := p.earlyEvaluator.EarlyEvaluate(tsp.ctx, id, currentSpans, trace) + decision, err := p.evaluator.EarlyEvaluate(tsp.ctx, id, currentSpans, trace) if err != nil { tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, 1) tsp.logger.Debug("Sampling policy error during early evaluation", zap.Error(err)) diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 2e71946a61dc4..a66011aff70c8 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -920,9 +920,10 @@ func uInt64ToSpanID(id uint64) pcommon.SpanID { } type mockPolicyEvaluator struct { - NextDecision samplingpolicy.Decision - NextError error - EvaluationCount int + NextDecision samplingpolicy.Decision + NextError error + EvaluationCount int + EarlyEvaluationCount int } var _ samplingpolicy.Evaluator = (*mockPolicyEvaluator)(nil) @@ -932,6 +933,11 @@ func (m *mockPolicyEvaluator) Evaluate(context.Context, pcommon.TraceID, *sampli return m.NextDecision, m.NextError } +func (m *mockPolicyEvaluator) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + m.EarlyEvaluationCount++ + return m.NextDecision, m.NextError +} + type syncIDBatcher struct { sync.Mutex openBatch idbatcher.Batch From f686d2326b743f3074789e56596f8a742f5ab8dc Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Wed, 26 Nov 2025 10:23:59 -0700 Subject: [PATCH 07/11] Fix and test NotSampled early decision case --- processor/tailsamplingprocessor/processor.go | 3 ++- .../tailsamplingprocessor/processor_test.go | 21 ++++++++++++++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 
d3bea207e03eb..001d1916d2d90 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -775,8 +775,9 @@ func (tsp *tailSamplingSpanProcessor) processTrace(tb traceBatch) { case samplingpolicy.Sampled: tsp.releaseSampledTrace(tsp.ctx, tb.id, actualData.ReceivedBatches) return - case samplingpolicy.Dropped: + case samplingpolicy.NotSampled, samplingpolicy.Dropped: tsp.releaseNotSampledTrace(tb.id) + return } } // If no early sample was done new traces still need to be added to the diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index a66011aff70c8..bfa50d619424e 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -1099,7 +1099,19 @@ func TestEarlyDecision(t *testing.T) { DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, - PolicyCfgs: testPolicy, + DecisionCache: DecisionCacheConfig{ + SampledCacheSize: 128, + NonSampledCacheSize: 128, + }, + PolicyCfgs: []PolicyCfg{ + {sharedPolicyCfg: sharedPolicyCfg{ + Name: "test-policy", + Type: Probabilistic, + ProbabilisticCfg: ProbabilisticCfg{ + SamplingPercentage: 50, + }, + }}, + }, Options: []Option{ withTestController(controller), }, @@ -1123,9 +1135,12 @@ func TestEarlyDecision(t *testing.T) { } time.Sleep(5 * time.Millisecond) - // Make sure all traces are sampled before a tick is called. + // Make sure about half of traces are sampled before a tick is called. allSampledTraces := nextConsumer.AllTraces() - assert.Len(t, allSampledTraces, len(batches)) + assert.Less(t, len(allSampledTraces), len(batches)*6/10) + assert.Greater(t, len(allSampledTraces), len(batches)*4/10) + // All traces should be flushed from the map. 
+ assert.Empty(t, sp.(*tailSamplingSpanProcessor).idToTrace) } func TestExtension(t *testing.T) { From 0dd0a1c7ffceb2876ad0f215daeb6f31b319682c Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Wed, 26 Nov 2025 10:28:38 -0700 Subject: [PATCH 08/11] Fix lint errors --- processor/tailsamplingprocessor/internal/sampling/and.go | 2 +- .../internal/sampling/bytes_limiting.go | 2 +- .../tailsamplingprocessor/internal/sampling/composite.go | 2 +- processor/tailsamplingprocessor/internal/sampling/drop.go | 2 +- .../internal/sampling/rate_limiting.go | 2 +- .../pkg/samplingpolicy/samplingpolicy.go | 2 +- processor/tailsamplingprocessor/processor.go | 6 +++--- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/processor/tailsamplingprocessor/internal/sampling/and.go b/processor/tailsamplingprocessor/internal/sampling/and.go index d5cd44c506589..d20593197c75e 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and.go +++ b/processor/tailsamplingprocessor/internal/sampling/and.go @@ -45,6 +45,6 @@ func (c *And) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *samp return samplingpolicy.Sampled, nil } -func (c *And) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { +func (*And) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { return samplingpolicy.Unspecified, nil } diff --git a/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go b/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go index 7050e3ec3ce80..663f62062e829 100644 --- a/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go +++ b/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go @@ -57,7 +57,7 @@ func (b *bytesLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sa return samplingpolicy.NotSampled, nil } -func (b *bytesLimiting) 
EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { +func (*bytesLimiting) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { return samplingpolicy.Unspecified, nil } diff --git a/processor/tailsamplingprocessor/internal/sampling/composite.go b/processor/tailsamplingprocessor/internal/sampling/composite.go index f429427ef05eb..26fa2b8dd4cc7 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite.go @@ -136,6 +136,6 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace return samplingpolicy.NotSampled, nil } -func (c *Composite) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { +func (*Composite) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { return samplingpolicy.Unspecified, nil } diff --git a/processor/tailsamplingprocessor/internal/sampling/drop.go b/processor/tailsamplingprocessor/internal/sampling/drop.go index 39eb9531347a7..b1256552d6f89 100644 --- a/processor/tailsamplingprocessor/internal/sampling/drop.go +++ b/processor/tailsamplingprocessor/internal/sampling/drop.go @@ -46,6 +46,6 @@ func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *sam return samplingpolicy.Dropped, nil } -func (c *Drop) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { +func (*Drop) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { return samplingpolicy.Unspecified, nil } diff --git 
a/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go b/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go index 501b5c937dbd9..4493babb03273 100644 --- a/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go +++ b/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go @@ -50,6 +50,6 @@ func (r *rateLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sam return samplingpolicy.NotSampled, nil } -func (r *rateLimiting) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, _ ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { +func (*rateLimiting) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { return samplingpolicy.Unspecified, nil } diff --git a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go index 96fac7ef8f781..de53940e97b10 100644 --- a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go +++ b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go @@ -54,7 +54,7 @@ type Evaluator interface { Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) // EarlyEvaluate uses partial traces (newData) in order to make a sampling - // decison. Any implementations must only return Sampled, NotSampled, + // decision. Any implementations must only return Sampled, NotSampled, // Dropped, or Unspecified decisions. Any other values will be treated as // Unspecified. 
// diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 001d1916d2d90..fddd9faf48285 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -183,11 +183,11 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace // be more efficient on its single goroutine. batch := []traceBatch{} for traceID, spans := range idToSpansAndScope { - rss, parentSpan := newResourceSpanFromSpanAndScopes(rss, spans) + newRSS, rootSpan := newResourceSpanFromSpanAndScopes(rss, spans) batch = append(batch, traceBatch{ id: traceID, - rss: rss, - rootSpan: parentSpan, + rss: newRSS, + rootSpan: rootSpan, spanCount: int64(len(spans)), }) } From 53528404c54e9e2b7c3d3bcd0fb50974a11c3cbe Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 9 Dec 2025 10:24:27 -0700 Subject: [PATCH 09/11] Fix lint failures post rebase --- processor/tailsamplingprocessor/processor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index fddd9faf48285..7c552fa73afff 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -747,8 +747,8 @@ func (tsp *tailSamplingSpanProcessor) processTrace(tb traceBatch) { actualData.SpanCount += tb.spanCount } - if tb.rootSpan != nil && actualData.TraceData.RootSpan == nil { - actualData.TraceData.RootSpan = tb.rootSpan + if tb.rootSpan != nil && actualData.RootSpan == nil { + actualData.RootSpan = tb.rootSpan } finalDecision := actualData.finalDecision From a6efff54d2ed44b3fd2cc783621a5ae8f6d0e5a2 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 9 Dec 2025 12:37:44 -0700 Subject: [PATCH 10/11] Add more early vs normal attributes for debugging --- processor/tailsamplingprocessor/documentation.md | 6 ++++++ processor/tailsamplingprocessor/metadata.yaml | 1 + 
processor/tailsamplingprocessor/processor.go | 7 +++++-- .../tailsamplingprocessor/processor_telemetry_test.go | 6 ++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/processor/tailsamplingprocessor/documentation.md b/processor/tailsamplingprocessor/documentation.md index c55140a2e7101..5503cd6f6d255 100644 --- a/processor/tailsamplingprocessor/documentation.md +++ b/processor/tailsamplingprocessor/documentation.md @@ -101,6 +101,12 @@ Count of sampling policy evaluation errors [Development] | ---- | ----------- | ---------- | --------- | --------- | | {errors} | Sum | Int | true | Development | +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| early | Whether the decision was made early or after the decision wait | Any Bool | + ### otelcol_processor_tail_sampling_sampling_policy_execution_count Total number of executions of a specific sampling policy [Development] diff --git a/processor/tailsamplingprocessor/metadata.yaml b/processor/tailsamplingprocessor/metadata.yaml index b89d431af4fa3..f5e3133e61163 100644 --- a/processor/tailsamplingprocessor/metadata.yaml +++ b/processor/tailsamplingprocessor/metadata.yaml @@ -115,6 +115,7 @@ telemetry: sum: value_type: int monotonic: true + attributes: [early] processor_tail_sampling_sampling_policy_execution_count: description: Total number of executions of a specific sampling policy diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 7c552fa73afff..b34552063b07b 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -255,6 +255,9 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, // We do not support early evaluation when drop policies are present yet. 
earlyEvaluationPossible := len(dropPolicies) == 0 + if tsp.cfg.EarlyDecisions && !earlyEvaluationPossible { + tsp.logger.Debug("Early evaluations are not possible when drop policies are present") + } // Dropped decision takes precedence over all others, therefore we evaluate them first. return slices.Concat(dropPolicies, policies), earlyEvaluationPossible, nil @@ -593,7 +596,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, time.Since(startTime).Milliseconds()) tsp.telemetry.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(len(tsp.idToTrace))) tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount) - tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount) + tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount, attrNormalDecision) for decision, count := range globalTracesSampledByDecision { tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, count, decisionToAttributes[decision]) @@ -805,7 +808,7 @@ policyLoop: for _, p := range tsp.policies { decision, err := p.evaluator.EarlyEvaluate(tsp.ctx, id, currentSpans, trace) if err != nil { - tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, 1) + tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, 1, attrEarlyDecision) tsp.logger.Debug("Sampling policy error during early evaluation", zap.Error(err)) continue } diff --git a/processor/tailsamplingprocessor/processor_telemetry_test.go b/processor/tailsamplingprocessor/processor_telemetry_test.go index 9cfae51354fa2..f2a1753cf7d25 100644 --- a/processor/tailsamplingprocessor/processor_telemetry_test.go +++ b/processor/tailsamplingprocessor/processor_telemetry_test.go @@ -197,6 +197,9 @@ func TestMetricsAfterOneEvaluation(t *testing.T) 
{ Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.DataPoint[int64]{ { + Attributes: attribute.NewSet( + attribute.Bool("early", false), + ), Value: 0, }, }, @@ -919,6 +922,9 @@ func TestProcessorTailSamplingSamplingPolicyEvaluationError(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.DataPoint[int64]{ { + Attributes: attribute.NewSet( + attribute.Bool("early", false), + ), Value: 2, }, }, From aa881963809925eab7c3ed70000772954f75f81a Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 9 Dec 2025 16:02:55 -0700 Subject: [PATCH 11/11] Add changelog entry --- .chloggen/tsp-early-decisions.yaml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/tsp-early-decisions.yaml diff --git a/.chloggen/tsp-early-decisions.yaml b/.chloggen/tsp-early-decisions.yaml new file mode 100644 index 0000000000000..3ac950e6c5bda --- /dev/null +++ b/.chloggen/tsp-early-decisions.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: processor/tail_sampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add optional early decision functionality to the tail sampling processor + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [43876] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: When enabled, early decisions will be made for each batch of spans that arrive. Depending on the policy configuration, this can significantly reduce the number of traces kept in memory. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user]