From 722db4a7b48eeb547644ff70e92db83a79bcc37f Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 11:50:01 +0000 Subject: [PATCH 01/14] [processor/tailsampling] simplify strategies to trace-complete and span-ingest Consolidate sampling strategy behavior around two explicit modes by removing root-only ingest and updating processor, validation, tests, and docs to match the final decision model. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- processor/tailsamplingprocessor/README.md | 29 ++ processor/tailsamplingprocessor/config.go | 30 ++ .../tailsamplingprocessor/config.schema.yaml | 6 + .../tailsamplingprocessor/config_test.go | 1 + processor/tailsamplingprocessor/factory.go | 1 + .../tailsamplingprocessor/factory_test.go | 60 +++ processor/tailsamplingprocessor/fuzz_test.go | 4 +- .../internal/sampling/always_sample.go | 4 + .../internal/sampling/and.go | 9 + .../internal/sampling/and_test.go | 8 + .../internal/sampling/boolean_tag_filter.go | 4 + .../internal/sampling/bytes_limiting.go | 4 + .../internal/sampling/composite.go | 10 +- .../internal/sampling/composite_test.go | 10 + .../internal/sampling/drop.go | 9 + .../internal/sampling/drop_test.go | 8 + .../internal/sampling/latency.go | 4 + .../internal/sampling/not.go | 4 + .../internal/sampling/not_test.go | 12 + .../internal/sampling/numeric_tag_filter.go | 4 + .../internal/sampling/ottl.go | 4 + .../internal/sampling/probabilistic.go | 4 + .../internal/sampling/rate_limiting.go | 4 + .../internal/sampling/span_count_sampler.go | 4 + .../internal/sampling/status_code.go | 4 + .../internal/sampling/string_tag_filter.go | 4 + .../internal/sampling/trace_flags.go | 4 + .../internal/sampling/trace_state_filter.go | 4 + .../pkg/samplingpolicy/samplingpolicy.go | 2 + processor/tailsamplingprocessor/processor.go | 142 ++++++- .../processor_benchmarks_test.go | 8 +- .../processor_decisions_test.go | 380 ++++++++++++++++-- .../processor_telemetry_test.go | 128 +++++- .../tailsamplingprocessor/processor_test.go | 60 ++- 34 files changed, 883 insertions(+), 90 deletions(-) diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index 43946c3788469..ad23100d65f4c 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -50,6 +50,7 @@ Multiple policies exist today and it is straight forward to add more. These incl The following configuration options can also be modified: - `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision - `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before making a sampling decision. 0s means disabled (only use `decision_wait`). +- `sampling_strategy` (default = `trace-complete`): Controls when a decision is made and what data is evaluated. Valid values are `trace-complete` and `span-ingest`. See [Sampling Strategies](#sampling-strategies) for detailed behavior, benefits, tradeoffs, and caveats for each mode. - `num_traces` (default = 50000): Number of traces kept in memory. - `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures) - `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches @@ -68,6 +69,34 @@ The following configuration options can also be modified: already ingested. - `maximum_trace_size_bytes`: The maximum size a trace can reach in bytes, traces larger than this size will be immediately dropped from the tail sampling processor in order to protect the system. +## Sampling Strategies + +The `sampling_strategy` setting controls both decision timing and the data passed to policy evaluators. + +### trace-complete + +- Decision timing/data model: default behavior. The processor accumulates trace data and evaluates policies on the timer path after `decision_wait` (or earlier when `decision_wait_after_root_received` is set and a root span arrives). +- Benefits: highest policy flexibility because evaluators observe accumulated trace data at decision time. +- Downsides/tradeoffs: higher memory/storage and deferred decisions, because spans are retained until decision timing is reached. +- Caveats: evaluation depends on delayed decision timing rather than ingest-time finalization. + +### span-ingest + +- Decision timing/data model: evaluates policies at ingest time for each incoming batch of a trace. It does not re-evaluate previously ingested spans. Terminal results (`sampled` or `dropped`) finalize immediately; otherwise the trace remains pending. +- Benefits: earlier terminal outcomes and no repeated evaluation of old spans. +- Downsides/tradeoffs: reduced policy compatibility compared to `trace-complete`. +- Caveats: + - pending traces are cleanup-finalized as `not sampled` on the timer cleanup path without policy re-evaluation. + - stateful policies are rejected for this mode. + - policies with semantics that assume complete traces can produce different outcomes than `trace-complete`. + +### Strategy Comparison Notes + +- Policy compatibility: `trace-complete` supports stateful policies; `span-ingest` rejects them. +- Timer controls: `decision_wait` and `decision_wait_after_root_received` influence decision timing only in `trace-complete`. +- Late-span behavior: decision caches remain important in all modes to carry decisions for spans arriving after in-memory trace data is gone. +- Terminal vs pending: `span-ingest` finalizes on terminal outcomes immediately and keeps non-terminal outcomes pending until cleanup finalization. + Each policy will result in a decision, and the processor will evaluate them to make a final decision: diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index d065ca79cb107..7f673ea9490d4 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -4,6 +4,7 @@ package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor" import ( + "fmt" "time" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" @@ -53,6 +54,17 @@ const ( TraceFlags PolicyType = "trace_flags" ) +const ( + // samplingStrategyTraceComplete keeps the current tail-sampling behavior: + // accumulate spans and decide on full trace data after decision timing. + samplingStrategyTraceComplete samplingStrategy = "trace-complete" + // samplingStrategySpanIngest evaluates each incoming span batch on ingest. + // Non-terminal outcomes remain pending until cleanup finalization. + samplingStrategySpanIngest samplingStrategy = "span-ingest" +) + +type samplingStrategy string + // sharedPolicyCfg holds the common configuration to all policies that are used in derivative policy configurations // such as the and & composite policies. type sharedPolicyCfg struct { @@ -325,6 +337,10 @@ type Config struct { Options []Option `mapstructure:"-"` // Make decision as soon as a policy matches SampleOnFirstMatch bool `mapstructure:"sample_on_first_match"` + // SamplingStrategy controls how/when sampling decisions are made. + // "trace-complete" (default) keeps classic tail sampling behavior. + // "span-ingest" evaluates on ingest using accumulated trace data. + SamplingStrategy samplingStrategy `mapstructure:"sampling_strategy"` // DropPendingTracesOnShutdown will drop all traces that are part of batches that have not yet reached the decision // wait when the processor is shutdown. DropPendingTracesOnShutdown bool `mapstructure:"drop_pending_traces_on_shutdown"` @@ -333,3 +349,17 @@ type Config struct { // A 0 value disables dropping large traces early. MaximumTraceSizeBytes uint64 `mapstructure:"maximum_trace_size_bytes"` } + +func (cfg *Config) Validate() error { + switch cfg.SamplingStrategy { + case samplingStrategyTraceComplete, samplingStrategySpanIngest: + return nil + default: + return fmt.Errorf( + "invalid sampling_strategy %q, expected one of %q or %q", + cfg.SamplingStrategy, + samplingStrategyTraceComplete, + samplingStrategySpanIngest, + ) + } +} diff --git a/processor/tailsamplingprocessor/config.schema.yaml b/processor/tailsamplingprocessor/config.schema.yaml index 5830c158e777c..2dd076a37f5d8 100644 --- a/processor/tailsamplingprocessor/config.schema.yaml +++ b/processor/tailsamplingprocessor/config.schema.yaml @@ -456,3 +456,9 @@ properties: sample_on_first_match: description: Make decision as soon as a policy matches type: boolean + sampling_strategy: + description: SamplingStrategy controls how/when sampling decisions are made. trace-complete (default) accumulates spans and makes a decision after decision_wait using full-trace context. span-ingest evaluates as spans arrive (in any order), finalizing terminal outcomes immediately and finalizing unresolved traces as not sampled during cleanup without policy re-evaluation; stateful policies are not supported in this mode. + type: string + enum: + - trace-complete + - span-ingest diff --git a/processor/tailsamplingprocessor/config_test.go b/processor/tailsamplingprocessor/config_test.go index 11e9f8ca09327..5352bbd7171ca 100644 --- a/processor/tailsamplingprocessor/config_test.go +++ b/processor/tailsamplingprocessor/config_test.go @@ -35,6 +35,7 @@ func TestLoadConfig(t *testing.T) { DecisionWait: 10 * time.Second, NumTraces: 100, ExpectedNewTracesPerSec: 10, + SamplingStrategy: samplingStrategyTraceComplete, DecisionCache: DecisionCacheConfig{SampledCacheSize: 1_000, NonSampledCacheSize: 10_000}, PolicyCfgs: []PolicyCfg{ { diff --git a/processor/tailsamplingprocessor/factory.go b/processor/tailsamplingprocessor/factory.go index fad2c168e71a7..73029daea5cb7 100644 --- a/processor/tailsamplingprocessor/factory.go +++ b/processor/tailsamplingprocessor/factory.go @@ -30,6 +30,7 @@ func createDefaultConfig() component.Config { DecisionWait: 30 * time.Second, NumTraces: 50000, SampleOnFirstMatch: false, + SamplingStrategy: samplingStrategyTraceComplete, } } diff --git a/processor/tailsamplingprocessor/factory_test.go b/processor/tailsamplingprocessor/factory_test.go index 3921e7b85f38b..4e2450fd0966e 100644 --- a/processor/tailsamplingprocessor/factory_test.go +++ b/processor/tailsamplingprocessor/factory_test.go @@ -45,3 +45,63 @@ func TestCreateProcessor(t *testing.T) { assert.NoError(t, tp.Start(t.Context(), componenttest.NewNopHost())) assert.NoError(t, tp.Shutdown(t.Context())) } + +func TestCreateProcessorRejectsInvalidSamplingStrategy(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SamplingStrategy = "invalid" + + params := processortest.NewNopSettings(metadata.Type) + tp, err := factory.CreateTraces(t.Context(), params, cfg, consumertest.NewNop()) + assert.Nil(t, tp) + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid sampling_strategy") +} + +func TestCreateProcessorAllowsSpanIngest(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SamplingStrategy = samplingStrategySpanIngest + cfg.PolicyCfgs = []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "policy", + Type: Probabilistic, + ProbabilisticCfg: ProbabilisticCfg{ + SamplingPercentage: 1, + }, + }, + }, + } + + params := processortest.NewNopSettings(metadata.Type) + tp, err := factory.CreateTraces(t.Context(), params, cfg, consumertest.NewNop()) + assert.NotNil(t, tp) + assert.NoError(t, err) +} + +func TestCreateProcessorRejectsStatefulPolicyForSpanIngest(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SamplingStrategy = samplingStrategySpanIngest + cfg.PolicyCfgs = []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "stateful-policy", + Type: RateLimiting, + RateLimitingCfg: RateLimitingCfg{ + SpansPerSecond: 10, + }, + }, + }, + } + + params := processortest.NewNopSettings(metadata.Type) + tp, err := factory.CreateTraces(t.Context(), params, cfg, consumertest.NewNop()) + require.NoError(t, err) + require.NotNil(t, tp) + err = tp.Start(t.Context(), componenttest.NewNopHost()) + require.Error(t, err) + assert.Contains(t, err.Error(), "requires all policies to be stateless") + assert.Contains(t, err.Error(), "stateful-policy") +} diff --git a/processor/tailsamplingprocessor/fuzz_test.go b/processor/tailsamplingprocessor/fuzz_test.go index 7c515d652e5ed..f9164ed637a3a 100644 --- a/processor/tailsamplingprocessor/fuzz_test.go +++ b/processor/tailsamplingprocessor/fuzz_test.go @@ -22,7 +22,9 @@ func FuzzConsumeTraces(f *testing.F) { } sink := new(consumertest.TracesSink) set := processortest.NewNopSettings(metadata.Type) - cfg := &Config{} + cfg := &Config{ + SamplingStrategy: samplingStrategyTraceComplete, + } tsp, err := newTracesProcessor(t.Context(), set, sink, *cfg) if err != nil { t.Fatal(err) diff --git a/processor/tailsamplingprocessor/internal/sampling/always_sample.go b/processor/tailsamplingprocessor/internal/sampling/always_sample.go index c3fa4ec793cba..ecda5edbbca45 100644 --- a/processor/tailsamplingprocessor/internal/sampling/always_sample.go +++ b/processor/tailsamplingprocessor/internal/sampling/always_sample.go @@ -31,3 +31,7 @@ func (as *alwaysSample) Evaluate(context.Context, pcommon.TraceID, *samplingpoli as.logger.Debug("Evaluating spans in always-sample filter") return samplingpolicy.Sampled, nil } + +func (*alwaysSample) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/and.go b/processor/tailsamplingprocessor/internal/sampling/and.go index 0c007b3f26305..4ab0d35affa5c 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and.go +++ b/processor/tailsamplingprocessor/internal/sampling/and.go @@ -44,3 +44,12 @@ func (c *And) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *samp } return samplingpolicy.Sampled, nil } + +func (c *And) IsStateful() bool { + for _, sub := range c.subpolicies { + if sub.IsStateful() { + return true + } + } + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/and_test.go b/processor/tailsamplingprocessor/internal/sampling/and_test.go index 735faef7c8533..1426f9267b9f2 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/and_test.go @@ -117,3 +117,11 @@ func TestAndEvaluatorStringInvertNotSampled(t *testing.T) { require.NoError(t, err, "Failed to evaluate and policy: %v", err) assert.Equal(t, samplingpolicy.NotSampled, decision) } + +func TestAndIsStatefulIfAnySubpolicyIsStateful(t *testing.T) { + stateless := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) + stateful := NewRateLimiting(componenttest.NewNopTelemetrySettings(), 10) + + and := NewAnd(zap.NewNop(), []samplingpolicy.Evaluator{stateless, stateful}) + assert.True(t, and.IsStateful()) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go index b5dfa8cb66eee..430c4d39e2b5b 100644 --- a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go @@ -74,3 +74,7 @@ func (baf *booleanAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID return false }), nil } + +func (*booleanAttributeFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go b/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go index 515428c1ef235..b0f262c2803f5 100644 --- a/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go +++ b/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go @@ -57,6 +57,10 @@ func (b *bytesLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sa return samplingpolicy.NotSampled, nil } +func (*bytesLimiting) IsStateful() bool { + return true +} + // calculateTraceSize calculates the accurate protobuf marshaled size of a trace in bytes // using the OpenTelemetry Collector's built-in ProtoMarshaler.TracesSize() method func calculateTraceSize(trace *samplingpolicy.TraceData) int64 { diff --git a/processor/tailsamplingprocessor/internal/sampling/composite.go b/processor/tailsamplingprocessor/internal/sampling/composite.go index 0d29c8c6b4b8c..454427a3c0785 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite.go @@ -61,7 +61,6 @@ func NewComposite( recordSubPolicy bool, ) samplingpolicy.Evaluator { var subpolicies []*subpolicy - for i := range subPolicyParams { sub := &subpolicy{} sub.evaluator = subPolicyParams[i].Evaluator @@ -135,3 +134,12 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace return samplingpolicy.NotSampled, nil } + +func (c *Composite) IsStateful() bool { + for _, sub := range c.subpolicies { + if sub.evaluator.IsStateful() { + return true + } + } + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go index e62ca0dbcf632..31426056718b3 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -314,3 +314,13 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { assert.Equal(t, expected, decision) } } + +func TestCompositeIsStatefulIfAnySubpolicyIsStateful(t *testing.T) { + stateless := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) + stateful := NewRateLimiting(componenttest.NewNopTelemetrySettings(), 10) + c := NewComposite(zap.NewNop(), 100, []SubPolicyEvalParams{ + {Evaluator: stateless, MaxSpansPerSecond: 50, Name: "stateless"}, + {Evaluator: stateful, MaxSpansPerSecond: 50, Name: "stateful"}, + }, FakeTimeProvider{}, false) + assert.True(t, c.IsStateful()) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/drop.go b/processor/tailsamplingprocessor/internal/sampling/drop.go index c1ad8aef5b31e..70f86db8c4020 100644 --- a/processor/tailsamplingprocessor/internal/sampling/drop.go +++ b/processor/tailsamplingprocessor/internal/sampling/drop.go @@ -45,3 +45,12 @@ func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *sam } return samplingpolicy.Dropped, nil } + +func (c *Drop) IsStateful() bool { + for _, sub := range c.subpolicies { + if sub.IsStateful() { + return true + } + } + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/drop_test.go b/processor/tailsamplingprocessor/internal/sampling/drop_test.go index fdf90ba72739e..0cdc1e34f0e4e 100644 --- a/processor/tailsamplingprocessor/internal/sampling/drop_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/drop_test.go @@ -117,3 +117,11 @@ func TestDropEvaluatorStringInvertNotMatch(t *testing.T) { require.NoError(t, err, "Failed to evaluate and policy: %v", err) assert.Equal(t, samplingpolicy.NotSampled, decision) } + +func TestDropIsStatefulIfAnySubpolicyIsStateful(t *testing.T) { + stateless := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) + stateful := NewRateLimiting(componenttest.NewNopTelemetrySettings(), 10) + + drop := NewDrop(zap.NewNop(), []samplingpolicy.Evaluator{stateless, stateful}) + assert.True(t, drop.IsStateful()) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/latency.go b/processor/tailsamplingprocessor/internal/sampling/latency.go index e6e305dc79c7a..73a3af3298894 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency.go @@ -55,3 +55,7 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *samp return (l.thresholdMs < duration.Milliseconds() && duration.Milliseconds() <= l.upperThresholdMs) }), nil } + +func (*latency) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/not.go b/processor/tailsamplingprocessor/internal/sampling/not.go index 25475cf75604c..6c57ef79abc93 100644 --- a/processor/tailsamplingprocessor/internal/sampling/not.go +++ b/processor/tailsamplingprocessor/internal/sampling/not.go @@ -47,3 +47,7 @@ func (n *not) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *samp return decision, nil } } + +func (n *not) IsStateful() bool { + return n.subPolicyEvaluator.IsStateful() +} diff --git a/processor/tailsamplingprocessor/internal/sampling/not_test.go b/processor/tailsamplingprocessor/internal/sampling/not_test.go index 5a7be4c18813a..c12203b0cde80 100644 --- a/processor/tailsamplingprocessor/internal/sampling/not_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/not_test.go @@ -18,12 +18,17 @@ import ( type mockEvaluator struct { decision samplingpolicy.Decision err error + stateful bool } func (m *mockEvaluator) Evaluate(_ context.Context, _ pcommon.TraceID, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { return m.decision, m.err } +func (m *mockEvaluator) IsStateful() bool { + return m.stateful +} + func TestNotSamplingPolicy_Evaluate_Sampled(t *testing.T) { logger := zap.NewNop() mockSubPolicy := &mockEvaluator{decision: samplingpolicy.Sampled} @@ -86,3 +91,10 @@ func TestNotSamplingPolicy_Evaluate_Err_Not_Nil(t *testing.T) { assert.Equal(t, expectedError, err) assert.Equal(t, samplingpolicy.Error, decision) } + +func TestNotIsStatefulFollowsSubpolicy(t *testing.T) { + logger := zap.NewNop() + mockSubPolicy := &mockEvaluator{stateful: true} + notPolicy := NewNot(logger, mockSubPolicy) + assert.True(t, notPolicy.IsStateful()) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go index 134a995cb4d5f..a0f37ff9c4df2 100644 --- a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go @@ -101,3 +101,7 @@ func (naf *numericAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID }, ), nil } + +func (*numericAttributeFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/ottl.go b/processor/tailsamplingprocessor/internal/sampling/ottl.go index c45213573f891..1d5431e8fcfa5 100644 --- a/processor/tailsamplingprocessor/internal/sampling/ottl.go +++ b/processor/tailsamplingprocessor/internal/sampling/ottl.go @@ -117,3 +117,7 @@ func (ocf *ottlConditionFilter) Evaluate(ctx context.Context, traceID pcommon.Tr } return samplingpolicy.NotSampled, nil } + +func (*ottlConditionFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go index 5557b5b79201c..0ad31ad41e1d8 100644 --- a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go +++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go @@ -54,6 +54,10 @@ func (s *probabilisticSampler) Evaluate(_ context.Context, traceID pcommon.Trace return samplingpolicy.NotSampled, nil } +func (*probabilisticSampler) IsStateful() bool { + return false +} + // calculateThreshold converts a ratio into a value between 0 and MaxUint64 func calculateThreshold(ratio float64) uint64 { // Use big.Float and big.Int to calculate threshold because directly convert diff --git a/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go b/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go index dd108db475f9c..09c4ad52aa536 100644 --- a/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go +++ b/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go @@ -48,3 +48,7 @@ func (r *rateLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sam return samplingpolicy.NotSampled, nil } + +func (*rateLimiting) IsStateful() bool { + return true +} diff --git a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go index cdaed349f1947..72af59a4666d7 100644 --- a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go +++ b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go @@ -44,3 +44,7 @@ func (c *spanCount) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *sa return samplingpolicy.NotSampled, nil } } + +func (*spanCount) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/status_code.go b/processor/tailsamplingprocessor/internal/sampling/status_code.go index 200d6b23ee206..cf00725925b59 100644 --- a/processor/tailsamplingprocessor/internal/sampling/status_code.go +++ b/processor/tailsamplingprocessor/internal/sampling/status_code.go @@ -62,3 +62,7 @@ func (r *statusCodeFilter) Evaluate(_ context.Context, _ pcommon.TraceID, trace return slices.Contains(r.statusCodes, span.Status().Code()) }), nil } + +func (*statusCodeFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go index 6e5d6dca09372..a63ac95334d64 100644 --- a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go @@ -151,6 +151,10 @@ func (saf *stringAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID, ), nil } +func (*stringAttributeFilter) IsStateful() bool { + return false +} + // addFilters compiles all the given filters and stores them as regexes. // All regexes are automatically anchored to enforce full string matches. func addFilters(exprs []string) ([]*regexp.Regexp, error) { diff --git a/processor/tailsamplingprocessor/internal/sampling/trace_flags.go b/processor/tailsamplingprocessor/internal/sampling/trace_flags.go index 5b0363b124933..e979341f22185 100644 --- a/processor/tailsamplingprocessor/internal/sampling/trace_flags.go +++ b/processor/tailsamplingprocessor/internal/sampling/trace_flags.go @@ -38,3 +38,7 @@ func (tf *traceFlags) Evaluate(_ context.Context, _ pcommon.TraceID, td *samplin return (byte(span.Flags()) & byte(trace.FlagsSampled)) != 0 }), nil } + +func (*traceFlags) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go index 1bcc73f658883..00262a3e9d220 100644 --- a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go @@ -59,3 +59,7 @@ func (tsf *traceStateFilter) Evaluate(_ context.Context, _ pcommon.TraceID, trac return false }), nil } + +func (*traceStateFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go index 74b4c56f18bcf..4bb83cd400af0 100644 --- a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go +++ b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go @@ -56,6 +56,8 @@ const ( type Evaluator interface { // Evaluate looks at the trace data and returns a corresponding SamplingDecision. Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) + // IsStateful reports whether decisions can depend on prior evaluations/state. + IsStateful() bool } type Extension interface { diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index d45da34f804e6..c945bd17c74a6 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -94,6 +94,10 @@ type tailSamplingSpanProcessor struct { } func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config) (processor.Traces, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + telemetrySettings := set.TelemetrySettings telemetry, err := metadata.NewTelemetryBuilder(telemetrySettings) if err != nil { @@ -241,6 +245,9 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, if err != nil { return nil, fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err) } + if tsp.cfg.SamplingStrategy == samplingStrategySpanIngest && eval.IsStateful() { + return nil, fmt.Errorf("sampling_strategy %q requires all policies to be stateless, but policy %q is stateful", tsp.cfg.SamplingStrategy, cfg.Name) + } uniquePolicyName := cfg.Name if componentID != "" { @@ -427,7 +434,7 @@ type policyDecisionMetrics struct { spansSampled int64 } -type policyTickMetrics struct { +type policyEvaluationMetrics struct { idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled, decisionDropped int64 tracesSampledByPolicyDecision []map[samplingpolicy.Decision]policyDecisionMetrics cumulativeExecutionTime []perPolicyExecutionTime @@ -441,31 +448,55 @@ type perPolicyExecutionTime struct { executionCount int64 } -func newPolicyTickMetrics(numPolicies int) *policyTickMetrics { +func newPolicyEvaluationMetrics(numPolicies int) *policyEvaluationMetrics { tracesSampledByPolicyDecision := make([]map[samplingpolicy.Decision]policyDecisionMetrics, numPolicies) for i := range tracesSampledByPolicyDecision { tracesSampledByPolicyDecision[i] = make(map[samplingpolicy.Decision]policyDecisionMetrics) } - return &policyTickMetrics{ + return &policyEvaluationMetrics{ tracesSampledByPolicyDecision: tracesSampledByPolicyDecision, cumulativeExecutionTime: make([]perPolicyExecutionTime, numPolicies), } } -func (m *policyTickMetrics) addDecision(policyIndex int, decision samplingpolicy.Decision, spansSampled int64) { +func (m *policyEvaluationMetrics) addDecision(policyIndex int, decision samplingpolicy.Decision, spansSampled int64) { stats := m.tracesSampledByPolicyDecision[policyIndex][decision] stats.tracesSampled++ stats.spansSampled += spansSampled m.tracesSampledByPolicyDecision[policyIndex][decision] = stats } -func (m *policyTickMetrics) addDecisionTime(policyIndex int, decisionTime time.Duration) { +func (m *policyEvaluationMetrics) addDecisionTime(policyIndex int, decisionTime time.Duration) { perPolicyExecutionTime := m.cumulativeExecutionTime[policyIndex] perPolicyExecutionTime.executionTime += decisionTime perPolicyExecutionTime.executionCount++ m.cumulativeExecutionTime[policyIndex] = perPolicyExecutionTime } +func (tsp *tailSamplingSpanProcessor) recordPerPolicyEvaluationMetrics(metrics *policyEvaluationMetrics) { + for i, p := range tsp.policies { + for decision, stats := range metrics.tracesSampledByPolicyDecision[i] { + tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(tsp.ctx, int64(stats.tracesSampled), p.attribute, decisionToAttributes[decision]) + if telemetry.IsMetricStatCountSpansSampledEnabled() { + tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(tsp.ctx, stats.spansSampled, p.attribute, decisionToAttributes[decision]) + } + } + tsp.telemetry.ProcessorTailSamplingSamplingPolicyExecutionTimeSum.Add(tsp.ctx, metrics.cumulativeExecutionTime[i].executionTime.Microseconds(), p.attribute) + tsp.telemetry.ProcessorTailSamplingSamplingPolicyExecutionCount.Add(tsp.ctx, metrics.cumulativeExecutionTime[i].executionCount, p.attribute) + } +} + +func (tsp *tailSamplingSpanProcessor) recordImmediateDecisionMetrics(decision samplingpolicy.Decision, metrics *policyEvaluationMetrics, evaluationLatency time.Duration) { + tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, evaluationLatency.Milliseconds()) + tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount) + + if attrs, ok := decisionToAttributes[decision]; ok { + tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, attrs) + } + + tsp.recordPerPolicyEvaluationMetrics(metrics) +} + func (tsp *tailSamplingSpanProcessor) loop() { ticker := time.NewTicker(tsp.tickerFrequency) defer ticker.Stop() @@ -504,7 +535,7 @@ func (tsp *tailSamplingSpanProcessor) iter(tickChan <-chan time.Time, workChan < tsp.waitForSpace(tickChan) } - tsp.processTrace(trace.id, trace.rss, trace.spanCount, trace.rootSpan != nil) + tsp.processTrace(trace.id, trace.rss, trace.spanCount, trace.rootSpan) } case cmd := <-tsp.newPolicyChan: tsp.policies = cmd.policies @@ -582,7 +613,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { tsp.logger.Debug("Sampling Policy Evaluation ticked") ctx := context.Background() - metrics := newPolicyTickMetrics(len(tsp.policies)) + metrics := newPolicyEvaluationMetrics(len(tsp.policies)) startTime := time.Now() globalTracesSampledByDecision := make(map[samplingpolicy.Decision]int64) @@ -602,6 +633,18 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { continue } + // In span-ingest mode, tick is a cleanup path only. Finalize any + // still-pending trace as implicit not sampled without policy evaluation. + if tsp.cfg.SamplingStrategy == samplingStrategySpanIngest { + trace.decisionTime = time.Now() + trace.FinalDecision = samplingpolicy.NotSampled + globalTracesSampledByDecision[samplingpolicy.NotSampled]++ + metrics.decisionNotSampled++ + tsp.releaseNotSampledTrace(id, trace) + trace.ReceivedBatches = ptrace.NewTraces() + continue + } + trace.decisionTime = time.Now() decision, policyName := tsp.makeDecision(id, &trace.TraceData, metrics) @@ -628,17 +671,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { for decision, count := range globalTracesSampledByDecision { tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, count, decisionToAttributes[decision]) } - - for i, p := range tsp.policies { - for decision, stats := range metrics.tracesSampledByPolicyDecision[i] { - tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(tsp.ctx, int64(stats.tracesSampled), p.attribute, decisionToAttributes[decision]) - if telemetry.IsMetricStatCountSpansSampledEnabled() { - tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(tsp.ctx, stats.spansSampled, p.attribute, decisionToAttributes[decision]) - } - } - tsp.telemetry.ProcessorTailSamplingSamplingPolicyExecutionTimeSum.Add(tsp.ctx, metrics.cumulativeExecutionTime[i].executionTime.Microseconds(), p.attribute) - tsp.telemetry.ProcessorTailSamplingSamplingPolicyExecutionCount.Add(tsp.ctx, metrics.cumulativeExecutionTime[i].executionCount, p.attribute) - } + tsp.recordPerPolicyEvaluationMetrics(metrics) tsp.logger.Debug("Sampling policy evaluation completed", zap.Int("batch.len", batchLen), @@ -651,7 +684,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { return hasMore } -func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *samplingpolicy.TraceData, metrics *policyTickMetrics) (samplingpolicy.Decision, string) { +func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *samplingpolicy.TraceData, metrics *policyEvaluationMetrics) (samplingpolicy.Decision, string) { finalDecision := samplingpolicy.NotSampled samplingDecisions := map[samplingpolicy.Decision]*policy{ samplingpolicy.Error: nil, @@ -733,6 +766,38 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa return finalDecision, getPolicyName(sampledPolicy) } +// makeDecisionOnSpanIngest is used by span-ingest mode. It only returns +// terminal decisions at ingest time. All other outcomes remain pending. +func (tsp *tailSamplingSpanProcessor) makeDecisionOnSpanIngest(id pcommon.TraceID, trace *samplingpolicy.TraceData, metrics *policyEvaluationMetrics) (samplingpolicy.Decision, string) { + ctx := context.Background() + for i, p := range tsp.policies { + startTime := time.Now() + decision, err := p.evaluator.Evaluate(ctx, id, trace) + metrics.addDecisionTime(i, time.Since(startTime)) + + if err != nil { + metrics.evaluateErrorCount++ + tsp.logger.Debug("Sampling policy error", zap.Error(err)) + continue + } + + metrics.addDecision(i, decision, trace.SpanCount) + + if decision == samplingpolicy.Dropped { + metrics.decisionDropped++ + return samplingpolicy.Dropped, p.name + } + if decision == samplingpolicy.Sampled { + metrics.decisionSampled++ + if tsp.recordPolicy { + sampling.SetAttrOnScopeSpans(trace.ReceivedBatches, "tailsampling.policy", p.name) + } + return samplingpolicy.Sampled, p.name + } + } + return samplingpolicy.Pending, "" +} + func groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceID][]spanAndScope { idToSpans := make(map[pcommon.TraceID][]spanAndScope) ilss := resourceSpans.ScopeSpans() @@ -753,8 +818,9 @@ func groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceI return idToSpans } -func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64, containsRootSpan bool) { +func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64, rootSpan *ptrace.Span) { currTime := time.Now() + containsRootSpan := rootSpan != nil var newTraceIDs int64 defer func() { @@ -783,7 +849,7 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac } else { actualData.SpanCount += spanCount } - if containsRootSpan && tsp.cfg.DecisionWaitAfterRootReceived > 0 { + if containsRootSpan && tsp.cfg.SamplingStrategy == samplingStrategyTraceComplete && tsp.cfg.DecisionWaitAfterRootReceived > 0 { actualData.batchID = tsp.decisionBatcher.MoveToEarlierBatch(id, actualData.batchID, uint64(tsp.cfg.DecisionWaitAfterRootReceived.Seconds())) } @@ -804,6 +870,30 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac } if finalDecision == samplingpolicy.Unspecified { + if tsp.cfg.SamplingStrategy == samplingStrategySpanIngest { + metrics := newPolicyEvaluationMetrics(len(tsp.policies)) + evaluationStart := time.Now() + actualData.decisionTime = evaluationStart + spanIngestTraceData := traceDataWithCurrentBatch(rss, actualData.SpanCount) + decision, policyName := tsp.makeDecisionOnSpanIngest(id, &spanIngestTraceData, metrics) + tsp.recordImmediateDecisionMetrics(decision, metrics, time.Since(evaluationStart)) + + // Store current batch after evaluation to avoid re-evaluating prior spans + // while still releasing full accumulated trace data on terminal outcomes. + appendToTraces(actualData.ReceivedBatches, rss) + + if decision == samplingpolicy.Sampled || decision == samplingpolicy.Dropped { + actualData.FinalDecision = decision + actualData.PolicyName = policyName + if decision == samplingpolicy.Sampled { + tsp.releaseSampledTrace(tsp.ctx, id, actualData) + } else { + tsp.releaseNotSampledTrace(id, actualData) + } + } + return + } + // If the final decision hasn't been made, add the new spans to the // existing trace. appendToTraces(actualData.ReceivedBatches, rss) @@ -827,6 +917,16 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac } } +func traceDataWithCurrentBatch(rss ptrace.ResourceSpans, totalSpanCount int64) samplingpolicy.TraceData { + traceData := samplingpolicy.TraceData{ + SpanCount: totalSpanCount, + ReceivedBatches: ptrace.NewTraces(), + } + rs := traceData.ReceivedBatches.ResourceSpans().AppendEmpty() + rss.CopyTo(rs) + return traceData +} + func extensions(host component.Host) map[string]samplingpolicy.Extension { if host == nil { return nil diff --git a/processor/tailsamplingprocessor/processor_benchmarks_test.go b/processor/tailsamplingprocessor/processor_benchmarks_test.go index 19ff1063de5b7..89d88fc9a4efe 100644 --- a/processor/tailsamplingprocessor/processor_benchmarks_test.go +++ b/processor/tailsamplingprocessor/processor_benchmarks_test.go @@ -23,6 +23,7 @@ import ( func BenchmarkSampling(b *testing.B) { traceIDs, batches := generateIDsAndBatches(128) cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -34,7 +35,7 @@ func BenchmarkSampling(b *testing.B) { defer func() { require.NoError(b, tsp.Shutdown(b.Context())) }() - metrics := newPolicyTickMetrics(len(cfg.PolicyCfgs)) + metrics := newPolicyEvaluationMetrics(len(cfg.PolicyCfgs)) sampleBatches := make([]*samplingpolicy.TraceData, 0, len(batches)) for _, batch := range batches { @@ -53,8 +54,9 @@ func BenchmarkSampling(b *testing.B) { func BenchmarkProcessorThroughput(b *testing.B) { cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: 1024, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: 1024, // Create a handful of reasonable policies to not only test batching. PolicyCfgs: []PolicyCfg{ {sharedPolicyCfg: sharedPolicyCfg{Name: "always-sample", Type: AlwaysSample}}, diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index 25e9a979a4153..f3266c93edcae 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -4,7 +4,9 @@ package tailsamplingprocessor import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -22,6 +24,25 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" ) +type batchSizeTrackingEvaluator struct { + decisions []samplingpolicy.Decision + evaluationBatches []int +} + +func (e *batchSizeTrackingEvaluator) Evaluate(_ context.Context, _ pcommon.TraceID, trace *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + e.evaluationBatches = append(e.evaluationBatches, trace.ReceivedBatches.SpanCount()) + if len(e.decisions) == 0 { + return samplingpolicy.NotSampled, nil + } + decision := e.decisions[0] + e.decisions = e.decisions[1:] + return decision, nil +} + +func (*batchSizeTrackingEvaluator) IsStateful() bool { + return false +} + func TestSamplingPolicyTypicalPath(t *testing.T) { nextConsumer := new(consumertest.TracesSink) @@ -33,8 +54,9 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -78,8 +100,9 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -126,8 +149,9 @@ func TestSamplingMultiplePolicies(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -179,9 +203,10 @@ func TestSamplingMultiplePolicies_WithRecordPolicy(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, - Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, } p, err := newTracesProcessor(t.Context(), ct, nextConsumer, cfg) @@ -226,8 +251,9 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -274,9 +300,10 @@ func TestSamplingPolicyDecisionNotSampled_WithRecordPolicy(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, - Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, } p, err := newTracesProcessor(t.Context(), ct, nextConsumer, cfg) @@ -315,8 +342,9 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -369,9 +397,10 @@ func TestSamplingPolicyDecisionInvertNotSampled_WithRecordPolicy(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, - Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, } p, err := newTracesProcessor(t.Context(), ct, nextConsumer, cfg) @@ -412,8 +441,9 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -484,8 +514,9 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: defaultTestDecisionWait * 10, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -581,8 +612,9 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: defaultTestDecisionWait * 10, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -672,8 +704,9 @@ func TestLateArrivingSpanWithoutCacheMetadata(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: defaultTestDecisionWait * 10, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -744,8 +777,9 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: defaultTestDecisionWait * 10, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -827,6 +861,7 @@ func TestSampleOnFirstMatch(t *testing.T) { } cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: defaultNumTraces, SampleOnFirstMatch: true, @@ -868,11 +903,287 @@ func TestSampleOnFirstMatch(t *testing.T) { require.Equal(t, 1, nextConsumer.SpanCount()) } +func TestSpanIngestEvaluatesOnIngestAndFinalizesOnTerminalDecision(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(50) + rootSpanID := uInt64ToSpanID(1) + + // First ingest path evaluation is non-terminal in span-ingest mode. + mpe.NextDecision = samplingpolicy.NotSampled + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Second ingest path evaluation returns terminal Sampled and releases both spans. + mpe.NextDecision = samplingpolicy.Sampled + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + require.Eventually(t, func() bool { + return mpe.EvaluationCount == 2 && nextConsumer.SpanCount() == 2 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestFinalizesOnDroppedDecision(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + DecisionCache: DecisionCacheConfig{ + NonSampledCacheSize: 64, + }, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(51) + rootSpanID := uInt64ToSpanID(1) + + // First ingest path evaluation is non-terminal in span-ingest mode. + mpe.NextDecision = samplingpolicy.NotSampled + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Second ingest path evaluation returns terminal Dropped and finalizes trace. + mpe.NextDecision = samplingpolicy.Dropped + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + require.Eventually(t, func() bool { + return mpe.EvaluationCount == 2 && nextConsumer.SpanCount() == 0 + }, time.Second, 10*time.Millisecond) + + // Late spans should use cached non-sampled decision and skip re-evaluation. + mpe.NextDecision = samplingpolicy.Sampled + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(3), rootSpanID))) + require.Eventually(t, func() bool { + return mpe.EvaluationCount == 2 && nextConsumer.SpanCount() == 0 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestTickCleansUpPendingWithoutPolicyEvaluation(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + DecisionCache: DecisionCacheConfig{ + NonSampledCacheSize: 64, + }, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + // Non-terminal in span-ingest mode: pending in memory. + mpe.NextDecision = samplingpolicy.NotSampled + require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) + require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Tick path cleans pending traces without evaluating policies in span-ingest mode. + controller.waitForTick() + controller.waitForTick() + require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 0, nextConsumer.SpanCount()) + require.Eventually(t, func() bool { + return len(p.(*tailSamplingSpanProcessor).idToTrace) == 0 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestEvaluatesOnlyIncomingBatch(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + eval := &batchSizeTrackingEvaluator{ + decisions: []samplingpolicy.Decision{samplingpolicy.NotSampled, samplingpolicy.Sampled}, + } + + policies := []*policy{ + {name: "tracking-policy", evaluator: eval, attribute: metric.WithAttributes(attribute.String("policy", "tracking-policy"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(55) + rootSpanID := uInt64ToSpanID(1) + + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + + require.Eventually(t, func() bool { + return len(eval.evaluationBatches) == 2 + }, time.Second, 10*time.Millisecond) + require.Equal(t, []int{1, 1}, eval.evaluationBatches) + require.Eventually(t, func() bool { + return nextConsumer.SpanCount() == 2 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestRootFirstThenChild(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(60) + rootSpanID := uInt64ToSpanID(1) + + // Root first, non-terminal. + mpe.NextDecision = samplingpolicy.NotSampled + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Child later, terminal sampled -> both spans released. + mpe.NextDecision = samplingpolicy.Sampled + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.Eventually(t, func() bool { + return mpe.EvaluationCount == 2 && nextConsumer.SpanCount() == 2 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestChildFirstThenRootPendingCleanup(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + DecisionCache: DecisionCacheConfig{ + NonSampledCacheSize: 64, + }, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(61) + rootSpanID := uInt64ToSpanID(1) + + // Child then root, both non-terminal. + mpe.NextDecision = samplingpolicy.NotSampled + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + require.Eventually(t, func() bool { return mpe.EvaluationCount == 2 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Cleanup tick finalizes pending as not sampled without more evaluations. + controller.waitForTick() + controller.waitForTick() + require.Equal(t, 2, mpe.EvaluationCount) + require.Equal(t, 0, nextConsumer.SpanCount()) + require.Eventually(t, func() bool { + return len(p.(*tailSamplingSpanProcessor).idToTrace) == 0 + }, time.Second, 10*time.Millisecond) +} + func TestRateLimiter(t *testing.T) { nextConsumer := new(consumertest.TracesSink) controller := newTestTSPController() cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: defaultNumTraces, SampleOnFirstMatch: true, @@ -920,3 +1231,14 @@ func TestRateLimiter(t *testing.T) { require.LessOrEqual(t, len(sampledTraceIDs), 2) require.GreaterOrEqual(t, len(sampledTraceIDs), 1) } + +func singleSpanTrace(traceID pcommon.TraceID, spanID, parentID pcommon.SpanID) ptrace.Traces { + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetTraceID(traceID) + span.SetSpanID(spanID) + if !parentID.IsEmpty() { + span.SetParentSpanID(parentID) + } + return traces +} diff --git a/processor/tailsamplingprocessor/processor_telemetry_test.go b/processor/tailsamplingprocessor/processor_telemetry_test.go index b0b46b6f57f79..fdc1dcf463f0e 100644 --- a/processor/tailsamplingprocessor/processor_telemetry_test.go +++ b/processor/tailsamplingprocessor/processor_telemetry_test.go @@ -6,6 +6,7 @@ package tailsamplingprocessor import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -32,8 +33,9 @@ func TestMetricsAfterOneEvaluation(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -243,14 +245,100 @@ func TestMetricsAfterOneEvaluation(t *testing.T) { assert.Len(t, cs.AllTraces(), 1) } +func TestMetricsSpanIngestModeRecordIngestDecisionTelemetry(t *testing.T) { + for _, tc := range []struct { + name string + strategy samplingStrategy + }{ + {name: "span-ingest", strategy: samplingStrategySpanIngest}, + } { + t.Run(tc.name, func(t *testing.T) { + s := setupTestTelemetry() + cfg := Config{ + SamplingStrategy: tc.strategy, + DecisionWait: 1, + NumTraces: 100, + PolicyCfgs: []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "always", + Type: AlwaysSample, + }, + }, + }, + } + cs := &consumertest.TracesSink{} + ct := s.newSettings() + proc, err := newTracesProcessor(t.Context(), ct, cs, cfg) + require.NoError(t, err) + require.NoError(t, proc.Start(t.Context(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, proc.Shutdown(t.Context())) + }() + + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetTraceID(uInt64ToTraceID(101)) + span.SetSpanID(uInt64ToSpanID(1)) + require.NoError(t, proc.ConsumeTraces(t.Context(), traces)) + require.Eventually(t, func() bool { + return cs.SpanCount() == 1 + }, time.Second, 10*time.Millisecond) + + var md metricdata.ResourceMetrics + require.NoError(t, s.reader.Collect(t.Context(), &md)) + + metricdatatest.AssertEqual(t, metricdata.Metrics{ + Name: "otelcol_processor_tail_sampling_global_count_traces_sampled", + Description: "Global count of traces that were sampled or not by at least one policy [Development]", + Unit: "{traces}", + Data: metricdata.Sum[int64]{ + IsMonotonic: true, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("sampled", "true"), + attribute.String("decision", "sampled"), + ), + Value: 1, + }, + }, + }, + }, s.getMetric("otelcol_processor_tail_sampling_global_count_traces_sampled", md), metricdatatest.IgnoreTimestamp()) + + metricdatatest.AssertEqual(t, metricdata.Metrics{ + Name: "otelcol_processor_tail_sampling_count_traces_sampled", + Description: "Count of traces that were sampled or not per sampling policy [Development]", + Unit: "{traces}", + Data: metricdata.Sum[int64]{ + IsMonotonic: true, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("policy", "always"), + attribute.String("sampled", "true"), + attribute.String("decision", "sampled"), + ), + Value: 1, + }, + }, + }, + }, s.getMetric("otelcol_processor_tail_sampling_count_traces_sampled", md), metricdatatest.IgnoreTimestamp()) + }) + } +} + func TestMetricsWithComponentID(t *testing.T) { // prepare s := setupTestTelemetry() controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -603,9 +691,10 @@ func TestMetricsCountSampled(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, - PolicyCfgs: tt.policyCfgs, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, + PolicyCfgs: tt.policyCfgs, Options: []Option{ withTestController(controller), }, @@ -650,8 +739,9 @@ func TestProcessorTailSamplingSamplingTraceRemovalAge(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 2, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 2, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -709,8 +799,9 @@ func TestProcessorTailSamplingSamplingLateSpanAge(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -793,8 +884,9 @@ func TestProcessorTailSamplingSamplingTraceDroppedTooEarly(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 2, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 2, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -858,8 +950,9 @@ func TestProcessorTailSamplingSamplingPolicyEvaluationError(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -931,8 +1024,9 @@ func TestProcessorTailSamplingEarlyReleasesFromCacheDecision(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index c05ed4463ff2d..c813792fd47b4 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -65,6 +65,10 @@ func (t *TestPolicyEvaluator) Evaluate(ctx context.Context, traceID pcommon.Trac return t.pe.Evaluate(ctx, traceID, trace) } +func (t *TestPolicyEvaluator) IsStateful() bool { + return t.pe.IsStateful() +} + // testTSPController is a set of mechanisms to make the TSP do predictable // things in tests. type testTSPController struct { @@ -199,8 +203,9 @@ func TestTraceIntegrity(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withPolicies(policies), withTestController(controller), @@ -252,6 +257,7 @@ func TestSequentialTraceArrival(t *testing.T) { traceIDs, batches := generateIDsAndBatches(128) controller := newTestTSPController() cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -300,6 +306,7 @@ func TestConcurrentTraceArrival(t *testing.T) { controller := newTestTSPController() var wg sync.WaitGroup cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -368,6 +375,7 @@ func TestConcurrentArrivalAndEvaluation(t *testing.T) { var wg sync.WaitGroup cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -409,6 +417,7 @@ func TestSequentialTraceMapSize(t *testing.T) { controller := newTestTSPController() traceIDs, batches := generateIDsAndBatches(210) cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: defaultNumTraces, ExpectedNewTracesPerSec: 64, @@ -484,7 +493,8 @@ func TestConsumptionDuringPolicyEvaluation(t *testing.T) { // prepare msp := new(consumertest.TracesSink) cfg := Config{ - DecisionWait: 10 * time.Millisecond, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 10 * time.Millisecond, // idToTrace map size is 2x the number of batches, to eliminate "expected" // dropped too early errors. NumTraces: uint64(numBatches * 2), @@ -559,8 +569,9 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { msp := new(consumertest.TracesSink) cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -632,8 +643,9 @@ func TestSetSamplingPolicy(t *testing.T) { telem := setupTestTelemetry() cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -710,9 +722,10 @@ func TestSubSecondDecisionTime(t *testing.T) { // prepare msp := new(consumertest.TracesSink) tsp, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), msp, Config{ - DecisionWait: 500 * time.Millisecond, - NumTraces: defaultNumTraces, - PolicyCfgs: testPolicy, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 500 * time.Millisecond, + NumTraces: defaultNumTraces, + PolicyCfgs: testPolicy, Options: []Option{ withTickerFrequency(10 * time.Millisecond), }, @@ -767,8 +780,9 @@ func TestDuplicatePolicyName(t *testing.T) { } p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), msp, Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ {sharedPolicyCfg: alwaysSample}, {sharedPolicyCfg: alwaysSample}, @@ -790,8 +804,9 @@ func TestDropPolicyIsFirstInPolicyList(t *testing.T) { msp := new(consumertest.TracesSink) cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -883,8 +898,9 @@ func TestDecisionHooks(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -1032,6 +1048,10 @@ func (m *mockPolicyEvaluator) Evaluate(context.Context, pcommon.TraceID, *sampli return m.NextDecision, m.NextError } +func (*mockPolicyEvaluator) IsStateful() bool { + return false +} + type syncIDBatcher struct { sync.Mutex openBatch idbatcher.Batch @@ -1227,6 +1247,7 @@ func TestDropLargeTraces(t *testing.T) { sp.Attributes().PutStr("foo", "short") cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(4), ExpectedNewTracesPerSec: 64, @@ -1322,6 +1343,7 @@ func TestDeleteQueueCleared(t *testing.T) { traceIDs, batches := generateIDsAndBatches(128) cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -1365,6 +1387,7 @@ func TestDeleteQueueCleared(t *testing.T) { func TestRootReceivedBatcher(t *testing.T) { traceIDs, batches := generateIDsAndBatches(128) cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: time.Minute, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -1415,8 +1438,9 @@ func TestExtension(t *testing.T) { msp := new(consumertest.TracesSink) cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ From 27db8e328d1f1e7b81719963946b6f81b10ae049 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 15:02:24 +0000 Subject: [PATCH 02/14] [processor/tailsampling] avoid root span pointer churn in processTrace Keep processTrace root-span presence as a boolean and pass trace.rootSpan != nil at the call site so this PR does not introduce unnecessary signature churn unrelated to strategy behavior. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- processor/tailsamplingprocessor/processor.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index c945bd17c74a6..1de4086eccbe5 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -535,7 +535,7 @@ func (tsp *tailSamplingSpanProcessor) iter(tickChan <-chan time.Time, workChan < tsp.waitForSpace(tickChan) } - tsp.processTrace(trace.id, trace.rss, trace.spanCount, trace.rootSpan) + tsp.processTrace(trace.id, trace.rss, trace.spanCount, trace.rootSpan != nil) } case cmd := <-tsp.newPolicyChan: tsp.policies = cmd.policies @@ -818,9 +818,8 @@ func groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceI return idToSpans } -func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64, rootSpan *ptrace.Span) { +func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64, containsRootSpan bool) { currTime := time.Now() - containsRootSpan := rootSpan != nil var newTraceIDs int64 defer func() { From cea3f16ca87c8536a4ca619b3f0eb2da5c4cd725 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 15:13:21 +0000 Subject: [PATCH 03/14] [processor/tailsampling] inline span-ingest current-batch trace data construction Remove traceDataWithCurrentBatch and build the ingest-time one-batch TraceData inline to avoid an unnecessary helper while preserving batch isolation semantics. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- processor/tailsamplingprocessor/processor.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 1de4086eccbe5..1468dbb912f92 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -873,7 +873,12 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac metrics := newPolicyEvaluationMetrics(len(tsp.policies)) evaluationStart := time.Now() actualData.decisionTime = evaluationStart - spanIngestTraceData := traceDataWithCurrentBatch(rss, actualData.SpanCount) + // Build an isolated one-batch trace view for ingest-time evaluation. + spanIngestTraceData := samplingpolicy.TraceData{ + SpanCount: actualData.SpanCount, + ReceivedBatches: ptrace.NewTraces(), + } + rss.CopyTo(spanIngestTraceData.ReceivedBatches.ResourceSpans().AppendEmpty()) decision, policyName := tsp.makeDecisionOnSpanIngest(id, &spanIngestTraceData, metrics) tsp.recordImmediateDecisionMetrics(decision, metrics, time.Since(evaluationStart)) @@ -916,16 +921,6 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac } } -func traceDataWithCurrentBatch(rss ptrace.ResourceSpans, totalSpanCount int64) samplingpolicy.TraceData { - traceData := samplingpolicy.TraceData{ - SpanCount: totalSpanCount, - ReceivedBatches: ptrace.NewTraces(), - } - rs := traceData.ReceivedBatches.ResourceSpans().AppendEmpty() - rss.CopyTo(rs) - return traceData -} - func extensions(host component.Host) map[string]samplingpolicy.Extension { if host == nil { return nil From 4ef1cfd5584a9412c79ab222b737e3b4b3188a18 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 15:20:22 +0000 Subject: [PATCH 04/14] [processor/tailsampling] avoid deep copy in span-ingest policy input path Construct the ingest-time one-batch TraceData by moving the current ResourceSpans into an isolated evaluation view and then moving it into buffered trace data, eliminating the previous CopyTo deep-copy in this hot path. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- processor/tailsamplingprocessor/processor.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 1468dbb912f92..188ad5c156a7a 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -873,18 +873,19 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac metrics := newPolicyEvaluationMetrics(len(tsp.policies)) evaluationStart := time.Now() actualData.decisionTime = evaluationStart - // Build an isolated one-batch trace view for ingest-time evaluation. + // Build an isolated one-batch trace view for ingest-time evaluation + // using moves to avoid deep-copying span data. spanIngestTraceData := samplingpolicy.TraceData{ SpanCount: actualData.SpanCount, ReceivedBatches: ptrace.NewTraces(), } - rss.CopyTo(spanIngestTraceData.ReceivedBatches.ResourceSpans().AppendEmpty()) + appendToTraces(spanIngestTraceData.ReceivedBatches, rss) decision, policyName := tsp.makeDecisionOnSpanIngest(id, &spanIngestTraceData, metrics) tsp.recordImmediateDecisionMetrics(decision, metrics, time.Since(evaluationStart)) // Store current batch after evaluation to avoid re-evaluating prior spans // while still releasing full accumulated trace data on terminal outcomes. - appendToTraces(actualData.ReceivedBatches, rss) + appendToTraces(actualData.ReceivedBatches, spanIngestTraceData.ReceivedBatches.ResourceSpans().At(0)) if decision == samplingpolicy.Sampled || decision == samplingpolicy.Dropped { actualData.FinalDecision = decision From 46cc875864518d4b9b76168ed36f1e957be1480c Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 15:25:40 +0000 Subject: [PATCH 05/14] [processor/tailsampling] apply root-wait batching to span-ingest cleanup timing Apply decision_wait_after_root_received batch acceleration whenever a root span arrives so pending traces in span-ingest mode can be cleanup-finalized earlier, and document the strategy-specific timer semantics. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- processor/tailsamplingprocessor/README.md | 4 ++-- processor/tailsamplingprocessor/processor.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index ad23100d65f4c..b91d9cd59878f 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -49,7 +49,7 @@ Multiple policies exist today and it is straight forward to add more. These incl The following configuration options can also be modified: - `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision -- `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before making a sampling decision. 0s means disabled (only use `decision_wait`). +- `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before earlier timer handling. In `trace-complete`, this can make the sampling decision happen earlier. In `span-ingest`, this can make pending-trace cleanup finalization happen earlier. 0s means disabled (only use `decision_wait`). - `sampling_strategy` (default = `trace-complete`): Controls when a decision is made and what data is evaluated. Valid values are `trace-complete` and `span-ingest`. See [Sampling Strategies](#sampling-strategies) for detailed behavior, benefits, tradeoffs, and caveats for each mode. - `num_traces` (default = 50000): Number of traces kept in memory. - `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures) @@ -93,7 +93,7 @@ The `sampling_strategy` setting controls both decision timing and the data passe ### Strategy Comparison Notes - Policy compatibility: `trace-complete` supports stateful policies; `span-ingest` rejects them. -- Timer controls: `decision_wait` and `decision_wait_after_root_received` influence decision timing only in `trace-complete`. +- Timer controls: in `trace-complete`, `decision_wait` and `decision_wait_after_root_received` influence decision timing; in `span-ingest`, they influence pending cleanup/finalization timing. - Late-span behavior: decision caches remain important in all modes to carry decisions for spans arriving after in-memory trace data is gone. - Terminal vs pending: `span-ingest` finalizes on terminal outcomes immediately and keeps non-terminal outcomes pending until cleanup finalization. diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 188ad5c156a7a..44e5d5d93a09a 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -848,7 +848,7 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac } else { actualData.SpanCount += spanCount } - if containsRootSpan && tsp.cfg.SamplingStrategy == samplingStrategyTraceComplete && tsp.cfg.DecisionWaitAfterRootReceived > 0 { + if containsRootSpan && tsp.cfg.DecisionWaitAfterRootReceived > 0 { actualData.batchID = tsp.decisionBatcher.MoveToEarlierBatch(id, actualData.batchID, uint64(tsp.cfg.DecisionWaitAfterRootReceived.Seconds())) } From 048c7a088a874a1c282d390f36c24eb2290d3047 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 15:35:07 +0000 Subject: [PATCH 06/14] [processor/tailsampling] align strategy docs/comments with current behavior Update config comments and schema descriptions to match current trace-complete and span-ingest semantics, including root-wait timer effects on span-ingest cleanup finalization. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- processor/tailsamplingprocessor/config.go | 9 ++++++--- processor/tailsamplingprocessor/config.schema.yaml | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 7f673ea9490d4..146cfd227dc69 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -316,8 +316,10 @@ type Config struct { // DecisionWait is the desired wait time from the arrival of the first span of // trace until the decision about sampling it or not is evaluated. DecisionWait time.Duration `mapstructure:"decision_wait"` - // DecisionWaitAfterRootReceived is the desired wait time from the arrival of the root span of - // trace until the decision about sampling it or not is evaluated. + // DecisionWaitAfterRootReceived is the desired wait time from root-span arrival + // until earlier timer handling for that trace. + // In trace-complete, this can make sampling decisions happen earlier. + // In span-ingest, this can make pending cleanup finalization happen earlier. DecisionWaitAfterRootReceived time.Duration `mapstructure:"decision_wait_after_root_received"` // NumTraces is the number of traces kept on memory. Typically most of the data // of a trace is released after a sampling decision is taken. @@ -339,7 +341,8 @@ type Config struct { SampleOnFirstMatch bool `mapstructure:"sample_on_first_match"` // SamplingStrategy controls how/when sampling decisions are made. // "trace-complete" (default) keeps classic tail sampling behavior. - // "span-ingest" evaluates on ingest using accumulated trace data. + // "span-ingest" evaluates each incoming batch on ingest and only terminal + // outcomes finalize immediately; non-terminal outcomes are cleanup-finalized. SamplingStrategy samplingStrategy `mapstructure:"sampling_strategy"` // DropPendingTracesOnShutdown will drop all traces that are part of batches that have not yet reached the decision // wait when the processor is shutdown. diff --git a/processor/tailsamplingprocessor/config.schema.yaml b/processor/tailsamplingprocessor/config.schema.yaml index 2dd076a37f5d8..ffc269d520ad3 100644 --- a/processor/tailsamplingprocessor/config.schema.yaml +++ b/processor/tailsamplingprocessor/config.schema.yaml @@ -430,7 +430,7 @@ properties: type: string format: duration decision_wait_after_root_received: - description: DecisionWaitAfterRootReceived is the desired wait time from the arrival of the root span of trace until the decision about sampling it or not is evaluated. + description: DecisionWaitAfterRootReceived is the desired wait time from root-span arrival until earlier timer handling for that trace. In trace-complete this can make the sampling decision happen earlier; in span-ingest this can make pending cleanup finalization happen earlier. type: string format: duration drop_pending_traces_on_shutdown: From b31e47b5de2db101943638fc9ac942e993010f04 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 18:08:54 +0000 Subject: [PATCH 07/14] [processor/tailsampling] streamline strategy section and restore flow heading Condense sampling strategy documentation into a shorter high-signal section and add a dedicated policy decision flow header so README structure remains easy to scan. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- processor/tailsamplingprocessor/README.md | 28 ++++++----------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index b91d9cd59878f..a77831fdc7412 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -71,32 +71,18 @@ The following configuration options can also be modified: ## Sampling Strategies -The `sampling_strategy` setting controls both decision timing and the data passed to policy evaluators. +The `sampling_strategy` setting controls both decision timing and what data evaluators use: -### trace-complete +- `trace-complete` (default): evaluates on the timer path using accumulated trace data (after `decision_wait`, or earlier after root arrival when `decision_wait_after_root_received` is set). This is the most flexible mode for policies, but with later decisions and higher in-memory/storage pressure. +- `span-ingest`: evaluates each incoming batch at ingest time without re-evaluating previously ingested batches. Terminal outcomes (`sampled`/`dropped`) finalize immediately; non-terminal outcomes stay pending and are finalized as `not sampled` during cleanup without policy re-evaluation. -- Decision timing/data model: default behavior. The processor accumulates trace data and evaluates policies on the timer path after `decision_wait` (or earlier when `decision_wait_after_root_received` is set and a root span arrives). -- Benefits: highest policy flexibility because evaluators observe accumulated trace data at decision time. -- Downsides/tradeoffs: higher memory/storage and deferred decisions, because spans are retained until decision timing is reached. -- Caveats: evaluation depends on delayed decision timing rather than ingest-time finalization. - -### span-ingest - -- Decision timing/data model: evaluates policies at ingest time for each incoming batch of a trace. It does not re-evaluate previously ingested spans. Terminal results (`sampled` or `dropped`) finalize immediately; otherwise the trace remains pending. -- Benefits: earlier terminal outcomes and no repeated evaluation of old spans. -- Downsides/tradeoffs: reduced policy compatibility compared to `trace-complete`. -- Caveats: - - pending traces are cleanup-finalized as `not sampled` on the timer cleanup path without policy re-evaluation. - - stateful policies are rejected for this mode. - - policies with semantics that assume complete traces can produce different outcomes than `trace-complete`. - -### Strategy Comparison Notes +Quick comparison: - Policy compatibility: `trace-complete` supports stateful policies; `span-ingest` rejects them. -- Timer controls: in `trace-complete`, `decision_wait` and `decision_wait_after_root_received` influence decision timing; in `span-ingest`, they influence pending cleanup/finalization timing. -- Late-span behavior: decision caches remain important in all modes to carry decisions for spans arriving after in-memory trace data is gone. -- Terminal vs pending: `span-ingest` finalizes on terminal outcomes immediately and keeps non-terminal outcomes pending until cleanup finalization. +- Timer controls: in `trace-complete`, `decision_wait` and `decision_wait_after_root_received` affect decision timing; in `span-ingest`, they affect pending cleanup/finalization timing. +- Late spans: decision caches remain important in both modes for spans that arrive after in-memory trace data is gone. +## Policy Decision Flow Each policy will result in a decision, and the processor will evaluate them to make a final decision: From 7bcd2a2076a732e3e75218f2414f11baed2250de Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 19:29:08 +0000 Subject: [PATCH 08/14] [processor/tailsampling] clarify decision_wait semantics by strategy Update config comments, schema, and README wording so decision_wait behavior is described correctly for both trace-complete decision timing and span-ingest pending cleanup finalization timing. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- processor/tailsamplingprocessor/README.md | 2 +- processor/tailsamplingprocessor/config.go | 6 ++++-- processor/tailsamplingprocessor/config.schema.yaml | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index a77831fdc7412..86b67a0215fd1 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -48,7 +48,7 @@ Multiple policies exist today and it is straight forward to add more. These incl 3. To ensure remaining capacity is filled use always_sample as one of the policies The following configuration options can also be modified: -- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision +- `decision_wait` (default = 30s): Wait time since first-span arrival before timer handling. In `trace-complete`, this primarily controls decision timing. In `span-ingest`, this primarily controls pending-trace cleanup finalization timing. - `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before earlier timer handling. In `trace-complete`, this can make the sampling decision happen earlier. In `span-ingest`, this can make pending-trace cleanup finalization happen earlier. 0s means disabled (only use `decision_wait`). - `sampling_strategy` (default = `trace-complete`): Controls when a decision is made and what data is evaluated. Valid values are `trace-complete` and `span-ingest`. See [Sampling Strategies](#sampling-strategies) for detailed behavior, benefits, tradeoffs, and caveats for each mode. - `num_traces` (default = 50000): Number of traces kept in memory. diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 146cfd227dc69..9de5150efeb9c 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -313,8 +313,10 @@ type DecisionCacheConfig struct { // Config holds the configuration for tail-based sampling. type Config struct { - // DecisionWait is the desired wait time from the arrival of the first span of - // trace until the decision about sampling it or not is evaluated. + // DecisionWait is the desired wait time from first-span arrival until timer + // handling for that trace. + // In trace-complete, this is the primary decision timing. + // In span-ingest, this controls pending cleanup finalization timing. DecisionWait time.Duration `mapstructure:"decision_wait"` // DecisionWaitAfterRootReceived is the desired wait time from root-span arrival // until earlier timer handling for that trace. diff --git a/processor/tailsamplingprocessor/config.schema.yaml b/processor/tailsamplingprocessor/config.schema.yaml index ffc269d520ad3..c756a594d5a8c 100644 --- a/processor/tailsamplingprocessor/config.schema.yaml +++ b/processor/tailsamplingprocessor/config.schema.yaml @@ -426,7 +426,7 @@ properties: description: DecisionCache holds configuration for the decision cache(s) $ref: decision_cache_config decision_wait: - description: DecisionWait is the desired wait time from the arrival of the first span of trace until the decision about sampling it or not is evaluated. + description: DecisionWait is the desired wait time from first-span arrival until timer handling for that trace. In trace-complete this is the primary decision timing; in span-ingest this controls pending cleanup finalization timing. type: string format: duration decision_wait_after_root_received: From 9c1d95c2748c22a298a8beee42d045361b201e9f Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 19:33:14 +0000 Subject: [PATCH 09/14] [processor/tailsampling] add changelog entry for sampling_strategy config Add a chloggen entry describing the new sampling_strategy configuration with trace-complete and span-ingest modes, linked to issue #46600. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- .chloggen/tailsampling-sampling-strategy-config.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .chloggen/tailsampling-sampling-strategy-config.yaml diff --git a/.chloggen/tailsampling-sampling-strategy-config.yaml b/.chloggen/tailsampling-sampling-strategy-config.yaml new file mode 100644 index 0000000000000..281253f1d8323 --- /dev/null +++ b/.chloggen/tailsampling-sampling-strategy-config.yaml @@ -0,0 +1,5 @@ +change_type: enhancement +component: processor/tail_sampling +note: Add `sampling_strategy` config with `trace-complete` and `span-ingest` modes for tail sampling decision timing and evaluation behavior. +issues: [46600] +change_logs: [user] From c81d057b1a372a6cfd82884c92b1b35eb4142c16 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 20:01:39 +0000 Subject: [PATCH 10/14] [processor/tailsampling] fix race in span-ingest decision tests using mock evaluator Make mockPolicyEvaluator thread-safe with mutex-guarded decision and count accessors and update span-ingest tests to use those synchronized methods, eliminating race-detector failures while keeping the mock-based test style. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- .../processor_decisions_test.go | 65 ++++++++++++------- .../tailsamplingprocessor/processor_test.go | 23 +++++++ 2 files changed, 65 insertions(+), 23 deletions(-) diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index f3266c93edcae..85abfcdfd8568 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -5,6 +5,7 @@ package tailsamplingprocessor import ( "context" + "sync" "testing" "time" @@ -25,11 +26,15 @@ import ( ) type batchSizeTrackingEvaluator struct { + mu sync.Mutex decisions []samplingpolicy.Decision evaluationBatches []int } func (e *batchSizeTrackingEvaluator) Evaluate(_ context.Context, _ pcommon.TraceID, trace *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + e.mu.Lock() + defer e.mu.Unlock() + e.evaluationBatches = append(e.evaluationBatches, trace.ReceivedBatches.SpanCount()) if len(e.decisions) == 0 { return samplingpolicy.NotSampled, nil @@ -43,6 +48,20 @@ func (*batchSizeTrackingEvaluator) IsStateful() bool { return false } +func (e *batchSizeTrackingEvaluator) EvaluationCount() int { + e.mu.Lock() + defer e.mu.Unlock() + return len(e.evaluationBatches) +} + +func (e *batchSizeTrackingEvaluator) EvaluationBatches() []int { + e.mu.Lock() + defer e.mu.Unlock() + batches := make([]int, len(e.evaluationBatches)) + copy(batches, e.evaluationBatches) + return batches +} + func TestSamplingPolicyTypicalPath(t *testing.T) { nextConsumer := new(consumertest.TracesSink) @@ -933,16 +952,16 @@ func TestSpanIngestEvaluatesOnIngestAndFinalizesOnTerminalDecision(t *testing.T) rootSpanID := uInt64ToSpanID(1) // First ingest path evaluation is non-terminal in span-ingest mode. - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetNextDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) - require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Second ingest path evaluation returns terminal Sampled and releases both spans. - mpe.NextDecision = samplingpolicy.Sampled + mpe.SetNextDecision(samplingpolicy.Sampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) require.Eventually(t, func() bool { - return mpe.EvaluationCount == 2 && nextConsumer.SpanCount() == 2 + return mpe.GetEvaluationCount() == 2 && nextConsumer.SpanCount() == 2 }, time.Second, 10*time.Millisecond) } @@ -979,23 +998,23 @@ func TestSpanIngestFinalizesOnDroppedDecision(t *testing.T) { rootSpanID := uInt64ToSpanID(1) // First ingest path evaluation is non-terminal in span-ingest mode. - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetNextDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) - require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Second ingest path evaluation returns terminal Dropped and finalizes trace. - mpe.NextDecision = samplingpolicy.Dropped + mpe.SetNextDecision(samplingpolicy.Dropped) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) require.Eventually(t, func() bool { - return mpe.EvaluationCount == 2 && nextConsumer.SpanCount() == 0 + return mpe.GetEvaluationCount() == 2 && nextConsumer.SpanCount() == 0 }, time.Second, 10*time.Millisecond) // Late spans should use cached non-sampled decision and skip re-evaluation. - mpe.NextDecision = samplingpolicy.Sampled + mpe.SetNextDecision(samplingpolicy.Sampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(3), rootSpanID))) require.Eventually(t, func() bool { - return mpe.EvaluationCount == 2 && nextConsumer.SpanCount() == 0 + return mpe.GetEvaluationCount() == 2 && nextConsumer.SpanCount() == 0 }, time.Second, 10*time.Millisecond) } @@ -1029,15 +1048,15 @@ func TestSpanIngestTickCleansUpPendingWithoutPolicyEvaluation(t *testing.T) { }(p) // Non-terminal in span-ingest mode: pending in memory. - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetNextDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) - require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Tick path cleans pending traces without evaluating policies in span-ingest mode. controller.waitForTick() controller.waitForTick() - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.GetEvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount()) require.Eventually(t, func() bool { return len(p.(*tailSamplingSpanProcessor).idToTrace) == 0 @@ -1079,9 +1098,9 @@ func TestSpanIngestEvaluatesOnlyIncomingBatch(t *testing.T) { require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) require.Eventually(t, func() bool { - return len(eval.evaluationBatches) == 2 + return eval.EvaluationCount() == 2 }, time.Second, 10*time.Millisecond) - require.Equal(t, []int{1, 1}, eval.evaluationBatches) + require.Equal(t, []int{1, 1}, eval.EvaluationBatches()) require.Eventually(t, func() bool { return nextConsumer.SpanCount() == 2 }, time.Second, 10*time.Millisecond) @@ -1116,16 +1135,16 @@ func TestSpanIngestRootFirstThenChild(t *testing.T) { rootSpanID := uInt64ToSpanID(1) // Root first, non-terminal. - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetNextDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) - require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Child later, terminal sampled -> both spans released. - mpe.NextDecision = samplingpolicy.Sampled + mpe.SetNextDecision(samplingpolicy.Sampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) require.Eventually(t, func() bool { - return mpe.EvaluationCount == 2 && nextConsumer.SpanCount() == 2 + return mpe.GetEvaluationCount() == 2 && nextConsumer.SpanCount() == 2 }, time.Second, 10*time.Millisecond) } @@ -1161,17 +1180,17 @@ func TestSpanIngestChildFirstThenRootPendingCleanup(t *testing.T) { rootSpanID := uInt64ToSpanID(1) // Child then root, both non-terminal. - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetNextDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) - require.Eventually(t, func() bool { return mpe.EvaluationCount == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) - require.Eventually(t, func() bool { return mpe.EvaluationCount == 2 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 2 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Cleanup tick finalizes pending as not sampled without more evaluations. controller.waitForTick() controller.waitForTick() - require.Equal(t, 2, mpe.EvaluationCount) + require.Equal(t, 2, mpe.GetEvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount()) require.Eventually(t, func() bool { return len(p.(*tailSamplingSpanProcessor).idToTrace) == 0 diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index c813792fd47b4..d66bbe8a79613 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -1036,6 +1036,8 @@ func uInt64ToSpanID(id uint64) pcommon.SpanID { } type mockPolicyEvaluator struct { + mu sync.Mutex + NextDecision samplingpolicy.Decision NextError error EvaluationCount int @@ -1044,6 +1046,9 @@ type mockPolicyEvaluator struct { var _ samplingpolicy.Evaluator = (*mockPolicyEvaluator)(nil) func (m *mockPolicyEvaluator) Evaluate(context.Context, pcommon.TraceID, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.EvaluationCount++ return m.NextDecision, m.NextError } @@ -1052,6 +1057,24 @@ func (*mockPolicyEvaluator) IsStateful() bool { return false } +func (m *mockPolicyEvaluator) SetNextDecision(decision samplingpolicy.Decision) { + m.mu.Lock() + defer m.mu.Unlock() + m.NextDecision = decision +} + +func (m *mockPolicyEvaluator) SetNextError(nextError error) { + m.mu.Lock() + defer m.mu.Unlock() + m.NextError = nextError +} + +func (m *mockPolicyEvaluator) GetEvaluationCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.EvaluationCount +} + type syncIDBatcher struct { sync.Mutex openBatch idbatcher.Batch From 34e27c86a15f25739c73e2319ee4088a808a2732 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 20:13:42 +0000 Subject: [PATCH 11/14] [processor/tailsampling] unify mock evaluator naming and keep race-safe access Make mockPolicyEvaluator field access private and expose synchronized helpers with names aligned to existing evaluator style (SetDecision, SetError, EvaluationCount), then update tests to use those methods consistently. Assisted-by: ChatGPT 5.3 Codex Made-with: Cursor --- .../processor_decisions_test.go | 162 +++++++++--------- .../tailsamplingprocessor/processor_test.go | 28 +-- 2 files changed, 95 insertions(+), 95 deletions(-) diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index 85abfcdfd8568..db3abc1996ecc 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -89,20 +89,20 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { require.NoError(t, p.Shutdown(t.Context())) }() - mpe1.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) // The final decision SHOULD be Sampled. require.Equal(t, 1, nextConsumer.SpanCount()) @@ -136,20 +136,20 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { }() //nolint:staticcheck // SA1019: Use of inverted decisions until they are fully removed. - mpe1.NextDecision = samplingpolicy.InvertSampled + mpe1.SetDecision(samplingpolicy.InvertSampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) // The final decision SHOULD be Sampled. require.Equal(t, 1, nextConsumer.SpanCount()) @@ -185,23 +185,23 @@ func TestSamplingMultiplePolicies(t *testing.T) { }() // InvertNotSampled takes precedence - mpe1.NextDecision = samplingpolicy.Sampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.Sampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) - require.Equal(t, 0, mpe2.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) + require.Equal(t, 0, mpe2.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, mpe2.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) // The final decision SHOULD be Sampled. require.Equal(t, 1, nextConsumer.SpanCount()) @@ -237,8 +237,8 @@ func TestSamplingMultiplePolicies_WithRecordPolicy(t *testing.T) { }() // First policy takes precedence - mpe1.NextDecision = samplingpolicy.Sampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.Sampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) @@ -287,20 +287,20 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { }() // InvertNotSampled takes precedence - mpe1.NextDecision = samplingpolicy.NotSampled + mpe1.SetDecision(samplingpolicy.NotSampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) // The final decision SHOULD be NotSampled. require.Equal(t, 0, nextConsumer.SpanCount()) @@ -334,7 +334,7 @@ func TestSamplingPolicyDecisionNotSampled_WithRecordPolicy(t *testing.T) { }() // InvertNotSampled takes precedence - mpe1.NextDecision = samplingpolicy.NotSampled + mpe1.SetDecision(samplingpolicy.NotSampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) @@ -379,23 +379,23 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { // InvertNotSampled takes precedence //nolint:staticcheck // SA1019: Use of inverted decisions until they are fully removed. - mpe1.NextDecision = samplingpolicy.InvertNotSampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.InvertNotSampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) - require.Equal(t, 0, mpe2.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) + require.Equal(t, 0, mpe2.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, mpe2.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) // The final decision SHOULD be NotSampled. require.Equal(t, 0, nextConsumer.SpanCount()) @@ -432,8 +432,8 @@ func TestSamplingPolicyDecisionInvertNotSampled_WithRecordPolicy(t *testing.T) { // InvertNotSampled takes precedence //nolint:staticcheck // SA1019: Use of inverted decisions until they are fully removed. - mpe1.NextDecision = samplingpolicy.InvertNotSampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.InvertNotSampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) @@ -481,8 +481,8 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { // The combined decision from the policies is NotSampled //nolint:staticcheck // SA1019: Use of inverted decisions until they are fully removed. - mpe1.NextDecision = samplingpolicy.InvertSampled - mpe2.NextDecision = samplingpolicy.NotSampled + mpe1.SetDecision(samplingpolicy.InvertSampled) + mpe2.SetDecision(samplingpolicy.NotSampled) // A function that return a ptrace.Traces containing a single span for the single trace we are using. spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -498,15 +498,15 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) - require.Equal(t, 0, mpe2.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) + require.Equal(t, 0, mpe2.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, mpe2.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) // The final decision SHOULD be NotSampled. require.Equal(t, 0, nextConsumer.SpanCount()) @@ -514,8 +514,8 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. require.NoError(t, p.ConsumeTraces(t.Context(), spanIndexToTraces(2))) - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, mpe2.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount(), "original final decision not honored") } @@ -555,7 +555,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { traceID := uInt64ToTraceID(1) // The first span will be sampled, this will later be set to not sampled, but the sampling decision will be cached - mpe.NextDecision = samplingpolicy.Sampled + mpe.SetDecision(samplingpolicy.Sampled) // A function that return a ptrace.Traces containing a single span for the single trace we are using. spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -571,13 +571,13 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe.EvaluationCount) + require.Equal(t, 0, mpe.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Policy should have been evaluated once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) // The final decision SHOULD be Sampled. require.Equal(t, 1, nextConsumer.SpanCount()) @@ -591,7 +591,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { }(p) // Set next decision to not sampled, ensuring the next decision is determined by the decision cache, not the policy - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetDecision(samplingpolicy.NotSampled) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. @@ -599,7 +599,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { controller.waitForTick() // Policy should still have been evaluated only once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) require.Equal(t, 2, nextConsumer.SpanCount(), "original final decision not honored") allTraces := nextConsumer.AllTraces() require.Len(t, allTraces, 2) @@ -653,7 +653,7 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { traceID := uInt64ToTraceID(1) // The first span will be sampled, this will later be set to not sampled, but the sampling decision will be cached - mpe.NextDecision = samplingpolicy.Dropped + mpe.SetDecision(samplingpolicy.Dropped) // A function that return a ptrace.Traces containing a single span for the single trace we are using. spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -669,13 +669,13 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe.EvaluationCount) + require.Equal(t, 0, mpe.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Policy should have been evaluated once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) // The final decision SHOULD be Dropped. require.Equal(t, 0, nextConsumer.SpanCount()) @@ -689,7 +689,7 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { }(p) // Set next decision to not sampled, ensuring the next decision is determined by the decision cache, not the policy - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetDecision(samplingpolicy.NotSampled) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. @@ -697,7 +697,7 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { controller.waitForTick() // Policy should still have been evaluated only once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount(), "original final decision not honored") allTraces := nextConsumer.AllTraces() require.Empty(t, allTraces) @@ -738,7 +738,7 @@ func TestLateArrivingSpanWithoutCacheMetadata(t *testing.T) { traceID := uInt64ToTraceID(1) // The first span will be sampled, this will later be set to not sampled, but the sampling decision will be cached - mpe.NextDecision = samplingpolicy.Sampled + mpe.SetDecision(samplingpolicy.Sampled) // A function that return a ptrace.Traces containing a single span for the single trace we are using. spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -761,7 +761,7 @@ func TestLateArrivingSpanWithoutCacheMetadata(t *testing.T) { }(p) // Set next decision to not sampled, ensuring the next decision is determined by the decision cache, not the policy - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetDecision(samplingpolicy.NotSampled) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. @@ -769,7 +769,7 @@ func TestLateArrivingSpanWithoutCacheMetadata(t *testing.T) { controller.waitForTick() // Policy should never have been evaluated - require.Equal(t, 0, mpe.EvaluationCount) + require.Equal(t, 0, mpe.EvaluationCount()) require.Equal(t, 1, nextConsumer.SpanCount(), "original final decision not honored") allTraces := nextConsumer.AllTraces() require.Len(t, allTraces, 1) @@ -817,7 +817,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { traceID := uInt64ToTraceID(1) // The first span will be NOT sampled, this will later be set to sampled, but the sampling decision will be cached - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetDecision(samplingpolicy.NotSampled) // A function that return a ptrace.Traces containing a single span for the single trace we are using. spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -833,13 +833,13 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe.EvaluationCount) + require.Equal(t, 0, mpe.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Policy should have been evaluated once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) // The final decision SHOULD be NOT Sampled. require.Equal(t, 0, nextConsumer.SpanCount()) @@ -853,7 +853,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { }(p) // Set next decision to sampled, ensuring the next decision is determined by the decision cache, not the policy - mpe.NextDecision = samplingpolicy.Sampled + mpe.SetDecision(samplingpolicy.Sampled) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. @@ -861,7 +861,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { controller.waitForTick() // Policy should still have been evaluated only once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount(), "original final decision not honored") } @@ -898,25 +898,25 @@ func TestSampleOnFirstMatch(t *testing.T) { }(p) // Second policy matches, last policy should not be evaluated - mpe1.NextDecision = samplingpolicy.NotSampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.NotSampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) - require.Equal(t, 0, mpe2.EvaluationCount) - require.Equal(t, 0, mpe3.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) + require.Equal(t, 0, mpe2.EvaluationCount()) + require.Equal(t, 0, mpe3.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Only the first policy should have been evaluated - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, mpe2.EvaluationCount) - require.Equal(t, 0, mpe3.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) + require.Equal(t, 0, mpe3.EvaluationCount()) // The final decision SHOULD be Sampled. require.Equal(t, 1, nextConsumer.SpanCount()) @@ -952,16 +952,16 @@ func TestSpanIngestEvaluatesOnIngestAndFinalizesOnTerminalDecision(t *testing.T) rootSpanID := uInt64ToSpanID(1) // First ingest path evaluation is non-terminal in span-ingest mode. - mpe.SetNextDecision(samplingpolicy.NotSampled) + mpe.SetDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) - require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Second ingest path evaluation returns terminal Sampled and releases both spans. - mpe.SetNextDecision(samplingpolicy.Sampled) + mpe.SetDecision(samplingpolicy.Sampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) require.Eventually(t, func() bool { - return mpe.GetEvaluationCount() == 2 && nextConsumer.SpanCount() == 2 + return mpe.EvaluationCount() == 2 && nextConsumer.SpanCount() == 2 }, time.Second, 10*time.Millisecond) } @@ -998,23 +998,23 @@ func TestSpanIngestFinalizesOnDroppedDecision(t *testing.T) { rootSpanID := uInt64ToSpanID(1) // First ingest path evaluation is non-terminal in span-ingest mode. - mpe.SetNextDecision(samplingpolicy.NotSampled) + mpe.SetDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) - require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Second ingest path evaluation returns terminal Dropped and finalizes trace. - mpe.SetNextDecision(samplingpolicy.Dropped) + mpe.SetDecision(samplingpolicy.Dropped) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) require.Eventually(t, func() bool { - return mpe.GetEvaluationCount() == 2 && nextConsumer.SpanCount() == 0 + return mpe.EvaluationCount() == 2 && nextConsumer.SpanCount() == 0 }, time.Second, 10*time.Millisecond) // Late spans should use cached non-sampled decision and skip re-evaluation. - mpe.SetNextDecision(samplingpolicy.Sampled) + mpe.SetDecision(samplingpolicy.Sampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(3), rootSpanID))) require.Eventually(t, func() bool { - return mpe.GetEvaluationCount() == 2 && nextConsumer.SpanCount() == 0 + return mpe.EvaluationCount() == 2 && nextConsumer.SpanCount() == 0 }, time.Second, 10*time.Millisecond) } @@ -1048,15 +1048,15 @@ func TestSpanIngestTickCleansUpPendingWithoutPolicyEvaluation(t *testing.T) { }(p) // Non-terminal in span-ingest mode: pending in memory. - mpe.SetNextDecision(samplingpolicy.NotSampled) + mpe.SetDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) - require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Tick path cleans pending traces without evaluating policies in span-ingest mode. controller.waitForTick() controller.waitForTick() - require.Equal(t, 1, mpe.GetEvaluationCount()) + require.Equal(t, 1, mpe.EvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount()) require.Eventually(t, func() bool { return len(p.(*tailSamplingSpanProcessor).idToTrace) == 0 @@ -1135,16 +1135,16 @@ func TestSpanIngestRootFirstThenChild(t *testing.T) { rootSpanID := uInt64ToSpanID(1) // Root first, non-terminal. - mpe.SetNextDecision(samplingpolicy.NotSampled) + mpe.SetDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) - require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Child later, terminal sampled -> both spans released. - mpe.SetNextDecision(samplingpolicy.Sampled) + mpe.SetDecision(samplingpolicy.Sampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) require.Eventually(t, func() bool { - return mpe.GetEvaluationCount() == 2 && nextConsumer.SpanCount() == 2 + return mpe.EvaluationCount() == 2 && nextConsumer.SpanCount() == 2 }, time.Second, 10*time.Millisecond) } @@ -1180,17 +1180,17 @@ func TestSpanIngestChildFirstThenRootPendingCleanup(t *testing.T) { rootSpanID := uInt64ToSpanID(1) // Child then root, both non-terminal. - mpe.SetNextDecision(samplingpolicy.NotSampled) + mpe.SetDecision(samplingpolicy.NotSampled) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) - require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) - require.Eventually(t, func() bool { return mpe.GetEvaluationCount() == 2 }, time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 2 }, time.Second, 10*time.Millisecond) require.Equal(t, 0, nextConsumer.SpanCount()) // Cleanup tick finalizes pending as not sampled without more evaluations. controller.waitForTick() controller.waitForTick() - require.Equal(t, 2, mpe.GetEvaluationCount()) + require.Equal(t, 2, mpe.EvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount()) require.Eventually(t, func() bool { return len(p.(*tailSamplingSpanProcessor).idToTrace) == 0 diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index d66bbe8a79613..51e0559b11128 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -219,20 +219,20 @@ func TestTraceIntegrity(t *testing.T) { require.NoError(t, p.Shutdown(t.Context())) }() - mpe1.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), traces)) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - assert.Equal(t, 4, mpe1.EvaluationCount) + assert.Equal(t, 4, mpe1.EvaluationCount()) consumed := nextConsumer.AllTraces() require.Len(t, consumed, 4) @@ -1038,9 +1038,9 @@ func uInt64ToSpanID(id uint64) pcommon.SpanID { type mockPolicyEvaluator struct { mu sync.Mutex - NextDecision samplingpolicy.Decision - NextError error - EvaluationCount int + nextDecision samplingpolicy.Decision + nextError error + evaluationCount int } var _ samplingpolicy.Evaluator = (*mockPolicyEvaluator)(nil) @@ -1049,30 +1049,30 @@ func (m *mockPolicyEvaluator) Evaluate(context.Context, pcommon.TraceID, *sampli m.mu.Lock() defer m.mu.Unlock() - m.EvaluationCount++ - return m.NextDecision, m.NextError + m.evaluationCount++ + return m.nextDecision, m.nextError } func (*mockPolicyEvaluator) IsStateful() bool { return false } -func (m *mockPolicyEvaluator) SetNextDecision(decision samplingpolicy.Decision) { +func (m *mockPolicyEvaluator) SetDecision(decision samplingpolicy.Decision) { m.mu.Lock() defer m.mu.Unlock() - m.NextDecision = decision + m.nextDecision = decision } -func (m *mockPolicyEvaluator) SetNextError(nextError error) { +func (m *mockPolicyEvaluator) SetError(nextError error) { m.mu.Lock() defer m.mu.Unlock() - m.NextError = nextError + m.nextError = nextError } -func (m *mockPolicyEvaluator) GetEvaluationCount() int { +func (m *mockPolicyEvaluator) EvaluationCount() int { m.mu.Lock() defer m.mu.Unlock() - return m.EvaluationCount + return m.evaluationCount } type syncIDBatcher struct { From c498e8e0b126f4d42c493274cd98fe8f766de8f3 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 20:49:12 +0000 Subject: [PATCH 12/14] make generate-schemas --- processor/tailsamplingprocessor/config.schema.yaml | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/processor/tailsamplingprocessor/config.schema.yaml b/processor/tailsamplingprocessor/config.schema.yaml index c756a594d5a8c..68ccb7eeba58f 100644 --- a/processor/tailsamplingprocessor/config.schema.yaml +++ b/processor/tailsamplingprocessor/config.schema.yaml @@ -426,11 +426,11 @@ properties: description: DecisionCache holds configuration for the decision cache(s) $ref: decision_cache_config decision_wait: - description: DecisionWait is the desired wait time from first-span arrival until timer handling for that trace. In trace-complete this is the primary decision timing; in span-ingest this controls pending cleanup finalization timing. + description: DecisionWait is the desired wait time from first-span arrival until timer handling for that trace. In trace-complete, this is the primary decision timing. In span-ingest, this controls pending cleanup finalization timing. type: string format: duration decision_wait_after_root_received: - description: DecisionWaitAfterRootReceived is the desired wait time from root-span arrival until earlier timer handling for that trace. In trace-complete this can make the sampling decision happen earlier; in span-ingest this can make pending cleanup finalization happen earlier. + description: DecisionWaitAfterRootReceived is the desired wait time from root-span arrival until earlier timer handling for that trace. In trace-complete, this can make sampling decisions happen earlier. In span-ingest, this can make pending cleanup finalization happen earlier. type: string format: duration drop_pending_traces_on_shutdown: @@ -457,8 +457,5 @@ properties: description: Make decision as soon as a policy matches type: boolean sampling_strategy: - description: SamplingStrategy controls how/when sampling decisions are made. trace-complete (default) accumulates spans and makes a decision after decision_wait using full-trace context. span-ingest evaluates as spans arrive (in any order), finalizing terminal outcomes immediately and finalizing unresolved traces as not sampled during cleanup without policy re-evaluation; stateful policies are not supported in this mode. + description: SamplingStrategy controls how/when sampling decisions are made. "trace-complete" (default) keeps classic tail sampling behavior. "span-ingest" evaluates each incoming batch on ingest and only terminal outcomes finalize immediately; non-terminal outcomes are cleanup-finalized. type: string - enum: - - trace-complete - - span-ingest From 5d2a2dd7655d5d8cfaf578adfdbfa020ea1f3a59 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 20:52:09 +0000 Subject: [PATCH 13/14] Update config description --- processor/tailsamplingprocessor/README.md | 2 +- processor/tailsamplingprocessor/config.go | 6 +++--- processor/tailsamplingprocessor/config.schema.yaml | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index 86b67a0215fd1..33b83e0b9268a 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -50,7 +50,7 @@ Multiple policies exist today and it is straight forward to add more. These incl The following configuration options can also be modified: - `decision_wait` (default = 30s): Wait time since first-span arrival before timer handling. In `trace-complete`, this primarily controls decision timing. In `span-ingest`, this primarily controls pending-trace cleanup finalization timing. - `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before earlier timer handling. In `trace-complete`, this can make the sampling decision happen earlier. In `span-ingest`, this can make pending-trace cleanup finalization happen earlier. 0s means disabled (only use `decision_wait`). -- `sampling_strategy` (default = `trace-complete`): Controls when a decision is made and what data is evaluated. Valid values are `trace-complete` and `span-ingest`. See [Sampling Strategies](#sampling-strategies) for detailed behavior, benefits, tradeoffs, and caveats for each mode. +- `sampling_strategy` (default = `trace-complete`): Controls decision timing and evaluation scope. `trace-complete` evaluates accumulated trace data on timer handling; `span-ingest` evaluates each incoming batch on ingest, finalizing terminal outcomes immediately and non-terminal traces on cleanup. See [Sampling Strategies](#sampling-strategies) for details. - `num_traces` (default = 50000): Number of traces kept in memory. - `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures) - `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 9de5150efeb9c..db8389764899e 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -342,9 +342,9 @@ type Config struct { // Make decision as soon as a policy matches SampleOnFirstMatch bool `mapstructure:"sample_on_first_match"` // SamplingStrategy controls how/when sampling decisions are made. - // "trace-complete" (default) keeps classic tail sampling behavior. - // "span-ingest" evaluates each incoming batch on ingest and only terminal - // outcomes finalize immediately; non-terminal outcomes are cleanup-finalized. + // "trace-complete" (default) evaluates accumulated trace data on timer handling. + // "span-ingest" evaluates each incoming batch on ingest; terminal outcomes + // finalize immediately, and non-terminal traces are finalized on cleanup. SamplingStrategy samplingStrategy `mapstructure:"sampling_strategy"` // DropPendingTracesOnShutdown will drop all traces that are part of batches that have not yet reached the decision // wait when the processor is shutdown. diff --git a/processor/tailsamplingprocessor/config.schema.yaml b/processor/tailsamplingprocessor/config.schema.yaml index 68ccb7eeba58f..07e8fd42833d1 100644 --- a/processor/tailsamplingprocessor/config.schema.yaml +++ b/processor/tailsamplingprocessor/config.schema.yaml @@ -457,5 +457,5 @@ properties: description: Make decision as soon as a policy matches type: boolean sampling_strategy: - description: SamplingStrategy controls how/when sampling decisions are made. "trace-complete" (default) keeps classic tail sampling behavior. "span-ingest" evaluates each incoming batch on ingest and only terminal outcomes finalize immediately; non-terminal outcomes are cleanup-finalized. + description: SamplingStrategy controls how/when sampling decisions are made. "trace-complete" (default) evaluates accumulated trace data on timer handling. "span-ingest" evaluates each incoming batch on ingest; terminal outcomes finalize immediately, and non-terminal traces are finalized on cleanup. type: string From 60805c0b1e2f101a370ba95526567990ca39f5b0 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 9 Mar 2026 21:05:06 +0000 Subject: [PATCH 14/14] Update config --- processor/tailsamplingprocessor/README.md | 4 ++-- processor/tailsamplingprocessor/config.go | 14 ++++++-------- processor/tailsamplingprocessor/config.schema.yaml | 4 ++-- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index 33b83e0b9268a..475a36f466010 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -48,9 +48,9 @@ Multiple policies exist today and it is straight forward to add more. These incl 3. To ensure remaining capacity is filled use always_sample as one of the policies The following configuration options can also be modified: -- `decision_wait` (default = 30s): Wait time since first-span arrival before timer handling. In `trace-complete`, this primarily controls decision timing. In `span-ingest`, this primarily controls pending-trace cleanup finalization timing. -- `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before earlier timer handling. In `trace-complete`, this can make the sampling decision happen earlier. In `span-ingest`, this can make pending-trace cleanup finalization happen earlier. 0s means disabled (only use `decision_wait`). - `sampling_strategy` (default = `trace-complete`): Controls decision timing and evaluation scope. `trace-complete` evaluates accumulated trace data on timer handling; `span-ingest` evaluates each incoming batch on ingest, finalizing terminal outcomes immediately and non-terminal traces on cleanup. See [Sampling Strategies](#sampling-strategies) for details. +- `decision_wait` (default = 30s): Time before timer handling for a trace. When `sampling_strategy` is `trace-complete`, this controls decision timing. When `sampling_strategy` is `span-ingest`, this controls pending cleanup finalization timing. +- `decision_wait_after_root_received` (default = 0s): Additional root-span-based acceleration for timer handling. When `sampling_strategy` is `trace-complete`, this can make decisions earlier. When `sampling_strategy` is `span-ingest`, this can finalize pending traces earlier on cleanup. `0s` disables it. - `num_traces` (default = 50000): Number of traces kept in memory. - `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures) - `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index db8389764899e..0a901d5ae002c 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -313,15 +313,13 @@ type DecisionCacheConfig struct { // Config holds the configuration for tail-based sampling. type Config struct { - // DecisionWait is the desired wait time from first-span arrival until timer - // handling for that trace. - // In trace-complete, this is the primary decision timing. - // In span-ingest, this controls pending cleanup finalization timing. + // DecisionWait is the time before timer handling for a trace. + // When sampling_strategy is "trace-complete", this controls decision timing. + // When sampling_strategy is "span-ingest", this controls pending cleanup finalization timing. DecisionWait time.Duration `mapstructure:"decision_wait"` - // DecisionWaitAfterRootReceived is the desired wait time from root-span arrival - // until earlier timer handling for that trace. - // In trace-complete, this can make sampling decisions happen earlier. - // In span-ingest, this can make pending cleanup finalization happen earlier. + // DecisionWaitAfterRootReceived adds root-span-based acceleration for timer handling. + // When sampling_strategy is "trace-complete", this can make decisions earlier. + // When sampling_strategy is "span-ingest", this can finalize pending traces earlier on cleanup. DecisionWaitAfterRootReceived time.Duration `mapstructure:"decision_wait_after_root_received"` // NumTraces is the number of traces kept on memory. Typically most of the data // of a trace is released after a sampling decision is taken. diff --git a/processor/tailsamplingprocessor/config.schema.yaml b/processor/tailsamplingprocessor/config.schema.yaml index 07e8fd42833d1..07c292786f935 100644 --- a/processor/tailsamplingprocessor/config.schema.yaml +++ b/processor/tailsamplingprocessor/config.schema.yaml @@ -426,11 +426,11 @@ properties: description: DecisionCache holds configuration for the decision cache(s) $ref: decision_cache_config decision_wait: - description: DecisionWait is the desired wait time from first-span arrival until timer handling for that trace. In trace-complete, this is the primary decision timing. In span-ingest, this controls pending cleanup finalization timing. + description: DecisionWait is the time before timer handling for a trace. When sampling_strategy is "trace-complete", this controls decision timing. When sampling_strategy is "span-ingest", this controls pending cleanup finalization timing. type: string format: duration decision_wait_after_root_received: - description: DecisionWaitAfterRootReceived is the desired wait time from root-span arrival until earlier timer handling for that trace. In trace-complete, this can make sampling decisions happen earlier. In span-ingest, this can make pending cleanup finalization happen earlier. + description: DecisionWaitAfterRootReceived adds root-span-based acceleration for timer handling. When sampling_strategy is "trace-complete", this can make decisions earlier. When sampling_strategy is "span-ingest", this can finalize pending traces earlier on cleanup. type: string format: duration drop_pending_traces_on_shutdown: