diff --git a/.chloggen/tailsampling-sampling-strategy-config.yaml b/.chloggen/tailsampling-sampling-strategy-config.yaml new file mode 100644 index 0000000000000..281253f1d8323 --- /dev/null +++ b/.chloggen/tailsampling-sampling-strategy-config.yaml @@ -0,0 +1,5 @@ +change_type: enhancement +component: processor/tail_sampling +note: Add `sampling_strategy` config with `trace-complete` and `span-ingest` modes for tail sampling decision timing and evaluation behavior. +issues: [46600] +change_logs: [user] diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index 43946c3788469..475a36f466010 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -48,8 +48,9 @@ Multiple policies exist today and it is straight forward to add more. These incl 3. To ensure remaining capacity is filled use always_sample as one of the policies The following configuration options can also be modified: -- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision -- `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before making a sampling decision. 0s means disabled (only use `decision_wait`). +- `sampling_strategy` (default = `trace-complete`): Controls decision timing and evaluation scope. `trace-complete` evaluates accumulated trace data on timer handling; `span-ingest` evaluates each incoming batch on ingest, finalizing terminal outcomes immediately and non-terminal traces on cleanup. See [Sampling Strategies](#sampling-strategies) for details. +- `decision_wait` (default = 30s): Time before timer handling for a trace. When `sampling_strategy` is `trace-complete`, this controls decision timing. When `sampling_strategy` is `span-ingest`, this controls pending cleanup finalization timing. 
+- `decision_wait_after_root_received` (default = 0s): Additional root-span-based acceleration for timer handling. When `sampling_strategy` is `trace-complete`, this can make decisions earlier. When `sampling_strategy` is `span-ingest`, this can finalize pending traces earlier on cleanup. `0s` disables it. - `num_traces` (default = 50000): Number of traces kept in memory. - `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures) - `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches @@ -68,6 +69,20 @@ The following configuration options can also be modified: already ingested. - `maximum_trace_size_bytes`: The maximum size a trace can reach in bytes, traces larger than this size will be immediately dropped from the tail sampling processor in order to protect the system. +## Sampling Strategies + +The `sampling_strategy` setting controls both decision timing and what data evaluators use: + +- `trace-complete` (default): evaluates on the timer path using accumulated trace data (after `decision_wait`, or earlier after root arrival when `decision_wait_after_root_received` is set). This is the most flexible mode for policies, at the cost of later decisions and higher in-memory/storage pressure. +- `span-ingest`: evaluates each incoming batch at ingest time without re-evaluating previously ingested batches. Terminal outcomes (`sampled`/`dropped`) finalize immediately; non-terminal outcomes stay pending and are finalized as `not sampled` during cleanup without policy re-evaluation. + +Quick comparison: + +- Policy compatibility: `trace-complete` supports stateful policies; `span-ingest` rejects them. +- Timer controls: in `trace-complete`, `decision_wait` and `decision_wait_after_root_received` affect decision timing; in `span-ingest`, they affect pending cleanup/finalization timing. 
+- Late spans: decision caches remain important in both modes for spans that arrive after in-memory trace data is gone. + +## Policy Decision Flow Each policy will result in a decision, and the processor will evaluate them to make a final decision: diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index d065ca79cb107..0a901d5ae002c 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -4,6 +4,7 @@ package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor" import ( + "fmt" "time" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" @@ -53,6 +54,17 @@ const ( TraceFlags PolicyType = "trace_flags" ) +const ( + // samplingStrategyTraceComplete keeps the current tail-sampling behavior: + // accumulate spans and decide on full trace data after decision timing. + samplingStrategyTraceComplete samplingStrategy = "trace-complete" + // samplingStrategySpanIngest evaluates each incoming span batch on ingest. + // Non-terminal outcomes remain pending until cleanup finalization. + samplingStrategySpanIngest samplingStrategy = "span-ingest" +) + +type samplingStrategy string + // sharedPolicyCfg holds the common configuration to all policies that are used in derivative policy configurations // such as the and & composite policies. type sharedPolicyCfg struct { @@ -301,11 +313,13 @@ type DecisionCacheConfig struct { // Config holds the configuration for tail-based sampling. type Config struct { - // DecisionWait is the desired wait time from the arrival of the first span of - // trace until the decision about sampling it or not is evaluated. + // DecisionWait is the time before timer handling for a trace. + // When sampling_strategy is "trace-complete", this controls decision timing. + // When sampling_strategy is "span-ingest", this controls pending cleanup finalization timing. 
DecisionWait time.Duration `mapstructure:"decision_wait"` - // DecisionWaitAfterRootReceived is the desired wait time from the arrival of the root span of - // trace until the decision about sampling it or not is evaluated. + // DecisionWaitAfterRootReceived adds root-span-based acceleration for timer handling. + // When sampling_strategy is "trace-complete", this can make decisions earlier. + // When sampling_strategy is "span-ingest", this can finalize pending traces earlier on cleanup. DecisionWaitAfterRootReceived time.Duration `mapstructure:"decision_wait_after_root_received"` // NumTraces is the number of traces kept on memory. Typically most of the data // of a trace is released after a sampling decision is taken. @@ -325,6 +339,11 @@ type Config struct { Options []Option `mapstructure:"-"` // Make decision as soon as a policy matches SampleOnFirstMatch bool `mapstructure:"sample_on_first_match"` + // SamplingStrategy controls how/when sampling decisions are made. + // "trace-complete" (default) evaluates accumulated trace data on timer handling. + // "span-ingest" evaluates each incoming batch on ingest; terminal outcomes + // finalize immediately, and non-terminal traces are finalized on cleanup. + SamplingStrategy samplingStrategy `mapstructure:"sampling_strategy"` // DropPendingTracesOnShutdown will drop all traces that are part of batches that have not yet reached the decision // wait when the processor is shutdown. DropPendingTracesOnShutdown bool `mapstructure:"drop_pending_traces_on_shutdown"` @@ -333,3 +352,17 @@ type Config struct { // A 0 value disables dropping large traces early. 
MaximumTraceSizeBytes uint64 `mapstructure:"maximum_trace_size_bytes"` } + +func (cfg *Config) Validate() error { + switch cfg.SamplingStrategy { + case samplingStrategyTraceComplete, samplingStrategySpanIngest: + return nil + default: + return fmt.Errorf( + "invalid sampling_strategy %q, expected one of %q or %q", + cfg.SamplingStrategy, + samplingStrategyTraceComplete, + samplingStrategySpanIngest, + ) + } +} diff --git a/processor/tailsamplingprocessor/config.schema.yaml b/processor/tailsamplingprocessor/config.schema.yaml index 5830c158e777c..07c292786f935 100644 --- a/processor/tailsamplingprocessor/config.schema.yaml +++ b/processor/tailsamplingprocessor/config.schema.yaml @@ -426,11 +426,11 @@ properties: description: DecisionCache holds configuration for the decision cache(s) $ref: decision_cache_config decision_wait: - description: DecisionWait is the desired wait time from the arrival of the first span of trace until the decision about sampling it or not is evaluated. + description: DecisionWait is the time before timer handling for a trace. When sampling_strategy is "trace-complete", this controls decision timing. When sampling_strategy is "span-ingest", this controls pending cleanup finalization timing. type: string format: duration decision_wait_after_root_received: - description: DecisionWaitAfterRootReceived is the desired wait time from the arrival of the root span of trace until the decision about sampling it or not is evaluated. + description: DecisionWaitAfterRootReceived adds root-span-based acceleration for timer handling. When sampling_strategy is "trace-complete", this can make decisions earlier. When sampling_strategy is "span-ingest", this can finalize pending traces earlier on cleanup. 
type: string format: duration drop_pending_traces_on_shutdown: @@ -456,3 +456,6 @@ properties: sample_on_first_match: description: Make decision as soon as a policy matches type: boolean + sampling_strategy: + description: SamplingStrategy controls how/when sampling decisions are made. "trace-complete" (default) evaluates accumulated trace data on timer handling. "span-ingest" evaluates each incoming batch on ingest; terminal outcomes finalize immediately, and non-terminal traces are finalized on cleanup. + type: string diff --git a/processor/tailsamplingprocessor/config_test.go b/processor/tailsamplingprocessor/config_test.go index 11e9f8ca09327..5352bbd7171ca 100644 --- a/processor/tailsamplingprocessor/config_test.go +++ b/processor/tailsamplingprocessor/config_test.go @@ -35,6 +35,7 @@ func TestLoadConfig(t *testing.T) { DecisionWait: 10 * time.Second, NumTraces: 100, ExpectedNewTracesPerSec: 10, + SamplingStrategy: samplingStrategyTraceComplete, DecisionCache: DecisionCacheConfig{SampledCacheSize: 1_000, NonSampledCacheSize: 10_000}, PolicyCfgs: []PolicyCfg{ { diff --git a/processor/tailsamplingprocessor/factory.go b/processor/tailsamplingprocessor/factory.go index fad2c168e71a7..73029daea5cb7 100644 --- a/processor/tailsamplingprocessor/factory.go +++ b/processor/tailsamplingprocessor/factory.go @@ -30,6 +30,7 @@ func createDefaultConfig() component.Config { DecisionWait: 30 * time.Second, NumTraces: 50000, SampleOnFirstMatch: false, + SamplingStrategy: samplingStrategyTraceComplete, } } diff --git a/processor/tailsamplingprocessor/factory_test.go b/processor/tailsamplingprocessor/factory_test.go index 3921e7b85f38b..4e2450fd0966e 100644 --- a/processor/tailsamplingprocessor/factory_test.go +++ b/processor/tailsamplingprocessor/factory_test.go @@ -45,3 +45,63 @@ func TestCreateProcessor(t *testing.T) { assert.NoError(t, tp.Start(t.Context(), componenttest.NewNopHost())) assert.NoError(t, tp.Shutdown(t.Context())) } + +func 
TestCreateProcessorRejectsInvalidSamplingStrategy(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SamplingStrategy = "invalid" + + params := processortest.NewNopSettings(metadata.Type) + tp, err := factory.CreateTraces(t.Context(), params, cfg, consumertest.NewNop()) + assert.Nil(t, tp) + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid sampling_strategy") +} + +func TestCreateProcessorAllowsSpanIngest(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SamplingStrategy = samplingStrategySpanIngest + cfg.PolicyCfgs = []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "policy", + Type: Probabilistic, + ProbabilisticCfg: ProbabilisticCfg{ + SamplingPercentage: 1, + }, + }, + }, + } + + params := processortest.NewNopSettings(metadata.Type) + tp, err := factory.CreateTraces(t.Context(), params, cfg, consumertest.NewNop()) + assert.NotNil(t, tp) + assert.NoError(t, err) +} + +func TestCreateProcessorRejectsStatefulPolicyForSpanIngest(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SamplingStrategy = samplingStrategySpanIngest + cfg.PolicyCfgs = []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "stateful-policy", + Type: RateLimiting, + RateLimitingCfg: RateLimitingCfg{ + SpansPerSecond: 10, + }, + }, + }, + } + + params := processortest.NewNopSettings(metadata.Type) + tp, err := factory.CreateTraces(t.Context(), params, cfg, consumertest.NewNop()) + require.NoError(t, err) + require.NotNil(t, tp) + err = tp.Start(t.Context(), componenttest.NewNopHost()) + require.Error(t, err) + assert.Contains(t, err.Error(), "requires all policies to be stateless") + assert.Contains(t, err.Error(), "stateful-policy") +} diff --git a/processor/tailsamplingprocessor/fuzz_test.go b/processor/tailsamplingprocessor/fuzz_test.go index 7c515d652e5ed..f9164ed637a3a 100644 --- 
a/processor/tailsamplingprocessor/fuzz_test.go +++ b/processor/tailsamplingprocessor/fuzz_test.go @@ -22,7 +22,9 @@ func FuzzConsumeTraces(f *testing.F) { } sink := new(consumertest.TracesSink) set := processortest.NewNopSettings(metadata.Type) - cfg := &Config{} + cfg := &Config{ + SamplingStrategy: samplingStrategyTraceComplete, + } tsp, err := newTracesProcessor(t.Context(), set, sink, *cfg) if err != nil { t.Fatal(err) diff --git a/processor/tailsamplingprocessor/internal/sampling/always_sample.go b/processor/tailsamplingprocessor/internal/sampling/always_sample.go index c3fa4ec793cba..ecda5edbbca45 100644 --- a/processor/tailsamplingprocessor/internal/sampling/always_sample.go +++ b/processor/tailsamplingprocessor/internal/sampling/always_sample.go @@ -31,3 +31,7 @@ func (as *alwaysSample) Evaluate(context.Context, pcommon.TraceID, *samplingpoli as.logger.Debug("Evaluating spans in always-sample filter") return samplingpolicy.Sampled, nil } + +func (*alwaysSample) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/and.go b/processor/tailsamplingprocessor/internal/sampling/and.go index 0c007b3f26305..4ab0d35affa5c 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and.go +++ b/processor/tailsamplingprocessor/internal/sampling/and.go @@ -44,3 +44,12 @@ func (c *And) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *samp } return samplingpolicy.Sampled, nil } + +func (c *And) IsStateful() bool { + for _, sub := range c.subpolicies { + if sub.IsStateful() { + return true + } + } + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/and_test.go b/processor/tailsamplingprocessor/internal/sampling/and_test.go index 735faef7c8533..1426f9267b9f2 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/and_test.go @@ -117,3 +117,11 @@ func TestAndEvaluatorStringInvertNotSampled(t *testing.T) { 
require.NoError(t, err, "Failed to evaluate and policy: %v", err) assert.Equal(t, samplingpolicy.NotSampled, decision) } + +func TestAndIsStatefulIfAnySubpolicyIsStateful(t *testing.T) { + stateless := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) + stateful := NewRateLimiting(componenttest.NewNopTelemetrySettings(), 10) + + and := NewAnd(zap.NewNop(), []samplingpolicy.Evaluator{stateless, stateful}) + assert.True(t, and.IsStateful()) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go index b5dfa8cb66eee..430c4d39e2b5b 100644 --- a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter.go @@ -74,3 +74,7 @@ func (baf *booleanAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID return false }), nil } + +func (*booleanAttributeFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go b/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go index 515428c1ef235..b0f262c2803f5 100644 --- a/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go +++ b/processor/tailsamplingprocessor/internal/sampling/bytes_limiting.go @@ -57,6 +57,10 @@ func (b *bytesLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sa return samplingpolicy.NotSampled, nil } +func (*bytesLimiting) IsStateful() bool { + return true +} + // calculateTraceSize calculates the accurate protobuf marshaled size of a trace in bytes // using the OpenTelemetry Collector's built-in ProtoMarshaler.TracesSize() method func calculateTraceSize(trace *samplingpolicy.TraceData) int64 { diff --git a/processor/tailsamplingprocessor/internal/sampling/composite.go b/processor/tailsamplingprocessor/internal/sampling/composite.go index 0d29c8c6b4b8c..454427a3c0785 100644 --- 
a/processor/tailsamplingprocessor/internal/sampling/composite.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite.go @@ -61,7 +61,6 @@ func NewComposite( recordSubPolicy bool, ) samplingpolicy.Evaluator { var subpolicies []*subpolicy - for i := range subPolicyParams { sub := &subpolicy{} sub.evaluator = subPolicyParams[i].Evaluator @@ -135,3 +134,12 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace return samplingpolicy.NotSampled, nil } + +func (c *Composite) IsStateful() bool { + for _, sub := range c.subpolicies { + if sub.evaluator.IsStateful() { + return true + } + } + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go index e62ca0dbcf632..31426056718b3 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -314,3 +314,13 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { assert.Equal(t, expected, decision) } } + +func TestCompositeIsStatefulIfAnySubpolicyIsStateful(t *testing.T) { + stateless := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) + stateful := NewRateLimiting(componenttest.NewNopTelemetrySettings(), 10) + c := NewComposite(zap.NewNop(), 100, []SubPolicyEvalParams{ + {Evaluator: stateless, MaxSpansPerSecond: 50, Name: "stateless"}, + {Evaluator: stateful, MaxSpansPerSecond: 50, Name: "stateful"}, + }, FakeTimeProvider{}, false) + assert.True(t, c.IsStateful()) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/drop.go b/processor/tailsamplingprocessor/internal/sampling/drop.go index c1ad8aef5b31e..70f86db8c4020 100644 --- a/processor/tailsamplingprocessor/internal/sampling/drop.go +++ b/processor/tailsamplingprocessor/internal/sampling/drop.go @@ -45,3 +45,12 @@ func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *sam } 
return samplingpolicy.Dropped, nil } + +func (c *Drop) IsStateful() bool { + for _, sub := range c.subpolicies { + if sub.IsStateful() { + return true + } + } + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/drop_test.go b/processor/tailsamplingprocessor/internal/sampling/drop_test.go index fdf90ba72739e..0cdc1e34f0e4e 100644 --- a/processor/tailsamplingprocessor/internal/sampling/drop_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/drop_test.go @@ -117,3 +117,11 @@ func TestDropEvaluatorStringInvertNotMatch(t *testing.T) { require.NoError(t, err, "Failed to evaluate and policy: %v", err) assert.Equal(t, samplingpolicy.NotSampled, decision) } + +func TestDropIsStatefulIfAnySubpolicyIsStateful(t *testing.T) { + stateless := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) + stateful := NewRateLimiting(componenttest.NewNopTelemetrySettings(), 10) + + drop := NewDrop(zap.NewNop(), []samplingpolicy.Evaluator{stateless, stateful}) + assert.True(t, drop.IsStateful()) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/latency.go b/processor/tailsamplingprocessor/internal/sampling/latency.go index e6e305dc79c7a..73a3af3298894 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency.go @@ -55,3 +55,7 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *samp return (l.thresholdMs < duration.Milliseconds() && duration.Milliseconds() <= l.upperThresholdMs) }), nil } + +func (*latency) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/not.go b/processor/tailsamplingprocessor/internal/sampling/not.go index 25475cf75604c..6c57ef79abc93 100644 --- a/processor/tailsamplingprocessor/internal/sampling/not.go +++ b/processor/tailsamplingprocessor/internal/sampling/not.go @@ -47,3 +47,7 @@ func (n *not) Evaluate(ctx context.Context, traceID pcommon.TraceID, 
trace *samp return decision, nil } } + +func (n *not) IsStateful() bool { + return n.subPolicyEvaluator.IsStateful() +} diff --git a/processor/tailsamplingprocessor/internal/sampling/not_test.go b/processor/tailsamplingprocessor/internal/sampling/not_test.go index 5a7be4c18813a..c12203b0cde80 100644 --- a/processor/tailsamplingprocessor/internal/sampling/not_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/not_test.go @@ -18,12 +18,17 @@ import ( type mockEvaluator struct { decision samplingpolicy.Decision err error + stateful bool } func (m *mockEvaluator) Evaluate(_ context.Context, _ pcommon.TraceID, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { return m.decision, m.err } +func (m *mockEvaluator) IsStateful() bool { + return m.stateful +} + func TestNotSamplingPolicy_Evaluate_Sampled(t *testing.T) { logger := zap.NewNop() mockSubPolicy := &mockEvaluator{decision: samplingpolicy.Sampled} @@ -86,3 +91,10 @@ func TestNotSamplingPolicy_Evaluate_Err_Not_Nil(t *testing.T) { assert.Equal(t, expectedError, err) assert.Equal(t, samplingpolicy.Error, decision) } + +func TestNotIsStatefulFollowsSubpolicy(t *testing.T) { + logger := zap.NewNop() + mockSubPolicy := &mockEvaluator{stateful: true} + notPolicy := NewNot(logger, mockSubPolicy) + assert.True(t, notPolicy.IsStateful()) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go index 134a995cb4d5f..a0f37ff9c4df2 100644 --- a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter.go @@ -101,3 +101,7 @@ func (naf *numericAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID }, ), nil } + +func (*numericAttributeFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/ottl.go b/processor/tailsamplingprocessor/internal/sampling/ottl.go 
index c45213573f891..1d5431e8fcfa5 100644 --- a/processor/tailsamplingprocessor/internal/sampling/ottl.go +++ b/processor/tailsamplingprocessor/internal/sampling/ottl.go @@ -117,3 +117,7 @@ func (ocf *ottlConditionFilter) Evaluate(ctx context.Context, traceID pcommon.Tr } return samplingpolicy.NotSampled, nil } + +func (*ottlConditionFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go index 5557b5b79201c..0ad31ad41e1d8 100644 --- a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go +++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go @@ -54,6 +54,10 @@ func (s *probabilisticSampler) Evaluate(_ context.Context, traceID pcommon.Trace return samplingpolicy.NotSampled, nil } +func (*probabilisticSampler) IsStateful() bool { + return false +} + // calculateThreshold converts a ratio into a value between 0 and MaxUint64 func calculateThreshold(ratio float64) uint64 { // Use big.Float and big.Int to calculate threshold because directly convert diff --git a/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go b/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go index dd108db475f9c..09c4ad52aa536 100644 --- a/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go +++ b/processor/tailsamplingprocessor/internal/sampling/rate_limiting.go @@ -48,3 +48,7 @@ func (r *rateLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sam return samplingpolicy.NotSampled, nil } + +func (*rateLimiting) IsStateful() bool { + return true +} diff --git a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go index cdaed349f1947..72af59a4666d7 100644 --- a/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go +++ 
b/processor/tailsamplingprocessor/internal/sampling/span_count_sampler.go @@ -44,3 +44,7 @@ func (c *spanCount) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *sa return samplingpolicy.NotSampled, nil } } + +func (*spanCount) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/status_code.go b/processor/tailsamplingprocessor/internal/sampling/status_code.go index 200d6b23ee206..cf00725925b59 100644 --- a/processor/tailsamplingprocessor/internal/sampling/status_code.go +++ b/processor/tailsamplingprocessor/internal/sampling/status_code.go @@ -62,3 +62,7 @@ func (r *statusCodeFilter) Evaluate(_ context.Context, _ pcommon.TraceID, trace return slices.Contains(r.statusCodes, span.Status().Code()) }), nil } + +func (*statusCodeFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go index 6e5d6dca09372..a63ac95334d64 100644 --- a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter.go @@ -151,6 +151,10 @@ func (saf *stringAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID, ), nil } +func (*stringAttributeFilter) IsStateful() bool { + return false +} + // addFilters compiles all the given filters and stores them as regexes. // All regexes are automatically anchored to enforce full string matches. 
func addFilters(exprs []string) ([]*regexp.Regexp, error) { diff --git a/processor/tailsamplingprocessor/internal/sampling/trace_flags.go b/processor/tailsamplingprocessor/internal/sampling/trace_flags.go index 5b0363b124933..e979341f22185 100644 --- a/processor/tailsamplingprocessor/internal/sampling/trace_flags.go +++ b/processor/tailsamplingprocessor/internal/sampling/trace_flags.go @@ -38,3 +38,7 @@ func (tf *traceFlags) Evaluate(_ context.Context, _ pcommon.TraceID, td *samplin return (byte(span.Flags()) & byte(trace.FlagsSampled)) != 0 }), nil } + +func (*traceFlags) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go index 1bcc73f658883..00262a3e9d220 100644 --- a/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go +++ b/processor/tailsamplingprocessor/internal/sampling/trace_state_filter.go @@ -59,3 +59,7 @@ func (tsf *traceStateFilter) Evaluate(_ context.Context, _ pcommon.TraceID, trac return false }), nil } + +func (*traceStateFilter) IsStateful() bool { + return false +} diff --git a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go index 74b4c56f18bcf..4bb83cd400af0 100644 --- a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go +++ b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go @@ -56,6 +56,8 @@ const ( type Evaluator interface { // Evaluate looks at the trace data and returns a corresponding SamplingDecision. Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) + // IsStateful reports whether decisions can depend on prior evaluations/state. 
+ IsStateful() bool } type Extension interface { diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index d45da34f804e6..44e5d5d93a09a 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -94,6 +94,10 @@ type tailSamplingSpanProcessor struct { } func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config) (processor.Traces, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + telemetrySettings := set.TelemetrySettings telemetry, err := metadata.NewTelemetryBuilder(telemetrySettings) if err != nil { @@ -241,6 +245,9 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, if err != nil { return nil, fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err) } + if tsp.cfg.SamplingStrategy == samplingStrategySpanIngest && eval.IsStateful() { + return nil, fmt.Errorf("sampling_strategy %q requires all policies to be stateless, but policy %q is stateful", tsp.cfg.SamplingStrategy, cfg.Name) + } uniquePolicyName := cfg.Name if componentID != "" { @@ -427,7 +434,7 @@ type policyDecisionMetrics struct { spansSampled int64 } -type policyTickMetrics struct { +type policyEvaluationMetrics struct { idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled, decisionDropped int64 tracesSampledByPolicyDecision []map[samplingpolicy.Decision]policyDecisionMetrics cumulativeExecutionTime []perPolicyExecutionTime @@ -441,31 +448,55 @@ type perPolicyExecutionTime struct { executionCount int64 } -func newPolicyTickMetrics(numPolicies int) *policyTickMetrics { +func newPolicyEvaluationMetrics(numPolicies int) *policyEvaluationMetrics { tracesSampledByPolicyDecision := make([]map[samplingpolicy.Decision]policyDecisionMetrics, numPolicies) for i := range tracesSampledByPolicyDecision { tracesSampledByPolicyDecision[i] = 
make(map[samplingpolicy.Decision]policyDecisionMetrics) } - return &policyTickMetrics{ + return &policyEvaluationMetrics{ tracesSampledByPolicyDecision: tracesSampledByPolicyDecision, cumulativeExecutionTime: make([]perPolicyExecutionTime, numPolicies), } } -func (m *policyTickMetrics) addDecision(policyIndex int, decision samplingpolicy.Decision, spansSampled int64) { +func (m *policyEvaluationMetrics) addDecision(policyIndex int, decision samplingpolicy.Decision, spansSampled int64) { stats := m.tracesSampledByPolicyDecision[policyIndex][decision] stats.tracesSampled++ stats.spansSampled += spansSampled m.tracesSampledByPolicyDecision[policyIndex][decision] = stats } -func (m *policyTickMetrics) addDecisionTime(policyIndex int, decisionTime time.Duration) { +func (m *policyEvaluationMetrics) addDecisionTime(policyIndex int, decisionTime time.Duration) { perPolicyExecutionTime := m.cumulativeExecutionTime[policyIndex] perPolicyExecutionTime.executionTime += decisionTime perPolicyExecutionTime.executionCount++ m.cumulativeExecutionTime[policyIndex] = perPolicyExecutionTime } +func (tsp *tailSamplingSpanProcessor) recordPerPolicyEvaluationMetrics(metrics *policyEvaluationMetrics) { + for i, p := range tsp.policies { + for decision, stats := range metrics.tracesSampledByPolicyDecision[i] { + tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(tsp.ctx, int64(stats.tracesSampled), p.attribute, decisionToAttributes[decision]) + if telemetry.IsMetricStatCountSpansSampledEnabled() { + tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(tsp.ctx, stats.spansSampled, p.attribute, decisionToAttributes[decision]) + } + } + tsp.telemetry.ProcessorTailSamplingSamplingPolicyExecutionTimeSum.Add(tsp.ctx, metrics.cumulativeExecutionTime[i].executionTime.Microseconds(), p.attribute) + tsp.telemetry.ProcessorTailSamplingSamplingPolicyExecutionCount.Add(tsp.ctx, metrics.cumulativeExecutionTime[i].executionCount, p.attribute) + } +} + +func (tsp *tailSamplingSpanProcessor) 
recordImmediateDecisionMetrics(decision samplingpolicy.Decision, metrics *policyEvaluationMetrics, evaluationLatency time.Duration) { + tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, evaluationLatency.Milliseconds()) + tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount) + + if attrs, ok := decisionToAttributes[decision]; ok { + tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, attrs) + } + + tsp.recordPerPolicyEvaluationMetrics(metrics) +} + func (tsp *tailSamplingSpanProcessor) loop() { ticker := time.NewTicker(tsp.tickerFrequency) defer ticker.Stop() @@ -582,7 +613,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { tsp.logger.Debug("Sampling Policy Evaluation ticked") ctx := context.Background() - metrics := newPolicyTickMetrics(len(tsp.policies)) + metrics := newPolicyEvaluationMetrics(len(tsp.policies)) startTime := time.Now() globalTracesSampledByDecision := make(map[samplingpolicy.Decision]int64) @@ -602,6 +633,18 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { continue } + // In span-ingest mode, tick is a cleanup path only. Finalize any + // still-pending trace as implicit not sampled without policy evaluation. 
+ if tsp.cfg.SamplingStrategy == samplingStrategySpanIngest { + trace.decisionTime = time.Now() + trace.FinalDecision = samplingpolicy.NotSampled + globalTracesSampledByDecision[samplingpolicy.NotSampled]++ + metrics.decisionNotSampled++ + tsp.releaseNotSampledTrace(id, trace) + trace.ReceivedBatches = ptrace.NewTraces() + continue + } + trace.decisionTime = time.Now() decision, policyName := tsp.makeDecision(id, &trace.TraceData, metrics) @@ -628,17 +671,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { for decision, count := range globalTracesSampledByDecision { tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, count, decisionToAttributes[decision]) } - - for i, p := range tsp.policies { - for decision, stats := range metrics.tracesSampledByPolicyDecision[i] { - tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(tsp.ctx, int64(stats.tracesSampled), p.attribute, decisionToAttributes[decision]) - if telemetry.IsMetricStatCountSpansSampledEnabled() { - tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(tsp.ctx, stats.spansSampled, p.attribute, decisionToAttributes[decision]) - } - } - tsp.telemetry.ProcessorTailSamplingSamplingPolicyExecutionTimeSum.Add(tsp.ctx, metrics.cumulativeExecutionTime[i].executionTime.Microseconds(), p.attribute) - tsp.telemetry.ProcessorTailSamplingSamplingPolicyExecutionCount.Add(tsp.ctx, metrics.cumulativeExecutionTime[i].executionCount, p.attribute) - } + tsp.recordPerPolicyEvaluationMetrics(metrics) tsp.logger.Debug("Sampling policy evaluation completed", zap.Int("batch.len", batchLen), @@ -651,7 +684,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { return hasMore } -func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *samplingpolicy.TraceData, metrics *policyTickMetrics) (samplingpolicy.Decision, string) { +func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *samplingpolicy.TraceData, metrics 
*policyEvaluationMetrics) (samplingpolicy.Decision, string) { finalDecision := samplingpolicy.NotSampled samplingDecisions := map[samplingpolicy.Decision]*policy{ samplingpolicy.Error: nil, @@ -733,6 +766,38 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa return finalDecision, getPolicyName(sampledPolicy) } +// makeDecisionOnSpanIngest is used by span-ingest mode. It only returns +// terminal decisions at ingest time. All other outcomes remain pending. +func (tsp *tailSamplingSpanProcessor) makeDecisionOnSpanIngest(id pcommon.TraceID, trace *samplingpolicy.TraceData, metrics *policyEvaluationMetrics) (samplingpolicy.Decision, string) { + ctx := context.Background() + for i, p := range tsp.policies { + startTime := time.Now() + decision, err := p.evaluator.Evaluate(ctx, id, trace) + metrics.addDecisionTime(i, time.Since(startTime)) + + if err != nil { + metrics.evaluateErrorCount++ + tsp.logger.Debug("Sampling policy error", zap.Error(err)) + continue + } + + metrics.addDecision(i, decision, trace.SpanCount) + + if decision == samplingpolicy.Dropped { + metrics.decisionDropped++ + return samplingpolicy.Dropped, p.name + } + if decision == samplingpolicy.Sampled { + metrics.decisionSampled++ + if tsp.recordPolicy { + sampling.SetAttrOnScopeSpans(trace.ReceivedBatches, "tailsampling.policy", p.name) + } + return samplingpolicy.Sampled, p.name + } + } + return samplingpolicy.Pending, "" +} + func groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceID][]spanAndScope { idToSpans := make(map[pcommon.TraceID][]spanAndScope) ilss := resourceSpans.ScopeSpans() @@ -804,6 +869,36 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac } if finalDecision == samplingpolicy.Unspecified { + if tsp.cfg.SamplingStrategy == samplingStrategySpanIngest { + metrics := newPolicyEvaluationMetrics(len(tsp.policies)) + evaluationStart := time.Now() + actualData.decisionTime = evaluationStart + // Build an 
isolated one-batch trace view for ingest-time evaluation + // using moves to avoid deep-copying span data. + spanIngestTraceData := samplingpolicy.TraceData{ + SpanCount: actualData.SpanCount, + ReceivedBatches: ptrace.NewTraces(), + } + appendToTraces(spanIngestTraceData.ReceivedBatches, rss) + decision, policyName := tsp.makeDecisionOnSpanIngest(id, &spanIngestTraceData, metrics) + tsp.recordImmediateDecisionMetrics(decision, metrics, time.Since(evaluationStart)) + + // Store current batch after evaluation to avoid re-evaluating prior spans + // while still releasing full accumulated trace data on terminal outcomes. + appendToTraces(actualData.ReceivedBatches, spanIngestTraceData.ReceivedBatches.ResourceSpans().At(0)) + + if decision == samplingpolicy.Sampled || decision == samplingpolicy.Dropped { + actualData.FinalDecision = decision + actualData.PolicyName = policyName + if decision == samplingpolicy.Sampled { + tsp.releaseSampledTrace(tsp.ctx, id, actualData) + } else { + tsp.releaseNotSampledTrace(id, actualData) + } + } + return + } + // If the final decision hasn't been made, add the new spans to the // existing trace. 
appendToTraces(actualData.ReceivedBatches, rss) diff --git a/processor/tailsamplingprocessor/processor_benchmarks_test.go b/processor/tailsamplingprocessor/processor_benchmarks_test.go index 19ff1063de5b7..89d88fc9a4efe 100644 --- a/processor/tailsamplingprocessor/processor_benchmarks_test.go +++ b/processor/tailsamplingprocessor/processor_benchmarks_test.go @@ -23,6 +23,7 @@ import ( func BenchmarkSampling(b *testing.B) { traceIDs, batches := generateIDsAndBatches(128) cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -34,7 +35,7 @@ func BenchmarkSampling(b *testing.B) { defer func() { require.NoError(b, tsp.Shutdown(b.Context())) }() - metrics := newPolicyTickMetrics(len(cfg.PolicyCfgs)) + metrics := newPolicyEvaluationMetrics(len(cfg.PolicyCfgs)) sampleBatches := make([]*samplingpolicy.TraceData, 0, len(batches)) for _, batch := range batches { @@ -53,8 +54,9 @@ func BenchmarkSampling(b *testing.B) { func BenchmarkProcessorThroughput(b *testing.B) { cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: 1024, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: 1024, // Create a handful of reasonable policies to not only test batching. 
PolicyCfgs: []PolicyCfg{ {sharedPolicyCfg: sharedPolicyCfg{Name: "always-sample", Type: AlwaysSample}}, diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index 25e9a979a4153..db3abc1996ecc 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -4,7 +4,10 @@ package tailsamplingprocessor import ( + "context" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -22,6 +25,43 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" ) +type batchSizeTrackingEvaluator struct { + mu sync.Mutex + decisions []samplingpolicy.Decision + evaluationBatches []int +} + +func (e *batchSizeTrackingEvaluator) Evaluate(_ context.Context, _ pcommon.TraceID, trace *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { + e.mu.Lock() + defer e.mu.Unlock() + + e.evaluationBatches = append(e.evaluationBatches, trace.ReceivedBatches.SpanCount()) + if len(e.decisions) == 0 { + return samplingpolicy.NotSampled, nil + } + decision := e.decisions[0] + e.decisions = e.decisions[1:] + return decision, nil +} + +func (*batchSizeTrackingEvaluator) IsStateful() bool { + return false +} + +func (e *batchSizeTrackingEvaluator) EvaluationCount() int { + e.mu.Lock() + defer e.mu.Unlock() + return len(e.evaluationBatches) +} + +func (e *batchSizeTrackingEvaluator) EvaluationBatches() []int { + e.mu.Lock() + defer e.mu.Unlock() + batches := make([]int, len(e.evaluationBatches)) + copy(batches, e.evaluationBatches) + return batches +} + func TestSamplingPolicyTypicalPath(t *testing.T) { nextConsumer := new(consumertest.TracesSink) @@ -33,8 +73,9 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: 
defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -48,20 +89,20 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { require.NoError(t, p.Shutdown(t.Context())) }() - mpe1.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) // The final decision SHOULD be Sampled. require.Equal(t, 1, nextConsumer.SpanCount()) @@ -78,8 +119,9 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -94,20 +136,20 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { }() //nolint:staticcheck // SA1019: Use of inverted decisions until they are fully removed. 
- mpe1.NextDecision = samplingpolicy.InvertSampled + mpe1.SetDecision(samplingpolicy.InvertSampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) // The final decision SHOULD be Sampled. require.Equal(t, 1, nextConsumer.SpanCount()) @@ -126,8 +168,9 @@ func TestSamplingMultiplePolicies(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -142,23 +185,23 @@ func TestSamplingMultiplePolicies(t *testing.T) { }() // InvertNotSampled takes precedence - mpe1.NextDecision = samplingpolicy.Sampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.Sampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) - require.Equal(t, 0, mpe2.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) + require.Equal(t, 0, mpe2.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, mpe2.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) // The final decision SHOULD be Sampled. 
require.Equal(t, 1, nextConsumer.SpanCount()) @@ -179,9 +222,10 @@ func TestSamplingMultiplePolicies_WithRecordPolicy(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, - Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, } p, err := newTracesProcessor(t.Context(), ct, nextConsumer, cfg) @@ -193,8 +237,8 @@ func TestSamplingMultiplePolicies_WithRecordPolicy(t *testing.T) { }() // First policy takes precedence - mpe1.NextDecision = samplingpolicy.Sampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.Sampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) @@ -226,8 +270,9 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -242,20 +287,20 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { }() // InvertNotSampled takes precedence - mpe1.NextDecision = samplingpolicy.NotSampled + mpe1.SetDecision(samplingpolicy.NotSampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, 
mpe1.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) // The final decision SHOULD be NotSampled. require.Equal(t, 0, nextConsumer.SpanCount()) @@ -274,9 +319,10 @@ func TestSamplingPolicyDecisionNotSampled_WithRecordPolicy(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, - Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, } p, err := newTracesProcessor(t.Context(), ct, nextConsumer, cfg) @@ -288,7 +334,7 @@ func TestSamplingPolicyDecisionNotSampled_WithRecordPolicy(t *testing.T) { }() // InvertNotSampled takes precedence - mpe1.NextDecision = samplingpolicy.NotSampled + mpe1.SetDecision(samplingpolicy.NotSampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) @@ -315,8 +361,9 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -332,23 +379,23 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { // InvertNotSampled takes precedence //nolint:staticcheck // SA1019: Use of inverted decisions until they are fully removed. 
- mpe1.NextDecision = samplingpolicy.InvertNotSampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.InvertNotSampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) - require.Equal(t, 0, mpe2.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) + require.Equal(t, 0, mpe2.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, mpe2.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) // The final decision SHOULD be NotSampled. require.Equal(t, 0, nextConsumer.SpanCount()) @@ -369,9 +416,10 @@ func TestSamplingPolicyDecisionInvertNotSampled_WithRecordPolicy(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, - Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + Options: []Option{withTestController(controller), withPolicies(policies), withRecordPolicy()}, } p, err := newTracesProcessor(t.Context(), ct, nextConsumer, cfg) @@ -384,8 +432,8 @@ func TestSamplingPolicyDecisionInvertNotSampled_WithRecordPolicy(t *testing.T) { // InvertNotSampled takes precedence //nolint:staticcheck // SA1019: Use of inverted decisions until they are fully removed. 
- mpe1.NextDecision = samplingpolicy.InvertNotSampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.InvertNotSampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) @@ -412,8 +460,9 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -432,8 +481,8 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { // The combined decision from the policies is NotSampled //nolint:staticcheck // SA1019: Use of inverted decisions until they are fully removed. - mpe1.NextDecision = samplingpolicy.InvertSampled - mpe2.NextDecision = samplingpolicy.NotSampled + mpe1.SetDecision(samplingpolicy.InvertSampled) + mpe2.SetDecision(samplingpolicy.NotSampled) // A function that return a ptrace.Traces containing a single span for the single trace we are using. spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -449,15 +498,15 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) - require.Equal(t, 0, mpe2.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) + require.Equal(t, 0, mpe2.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, mpe2.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) // The final decision SHOULD be NotSampled. 
require.Equal(t, 0, nextConsumer.SpanCount()) @@ -465,8 +514,8 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. require.NoError(t, p.ConsumeTraces(t.Context(), spanIndexToTraces(2))) - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, mpe2.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount(), "original final decision not honored") } @@ -484,8 +533,9 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: defaultTestDecisionWait * 10, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -505,7 +555,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { traceID := uInt64ToTraceID(1) // The first span will be sampled, this will later be set to not sampled, but the sampling decision will be cached - mpe.NextDecision = samplingpolicy.Sampled + mpe.SetDecision(samplingpolicy.Sampled) // A function that return a ptrace.Traces containing a single span for the single trace we are using. spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -521,13 +571,13 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe.EvaluationCount) + require.Equal(t, 0, mpe.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Policy should have been evaluated once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) // The final decision SHOULD be Sampled. 
require.Equal(t, 1, nextConsumer.SpanCount()) @@ -541,7 +591,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { }(p) // Set next decision to not sampled, ensuring the next decision is determined by the decision cache, not the policy - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetDecision(samplingpolicy.NotSampled) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. @@ -549,7 +599,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { controller.waitForTick() // Policy should still have been evaluated only once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) require.Equal(t, 2, nextConsumer.SpanCount(), "original final decision not honored") allTraces := nextConsumer.AllTraces() require.Len(t, allTraces, 2) @@ -581,8 +631,9 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: defaultTestDecisionWait * 10, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -602,7 +653,7 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { traceID := uInt64ToTraceID(1) // The first span will be sampled, this will later be set to not sampled, but the sampling decision will be cached - mpe.NextDecision = samplingpolicy.Dropped + mpe.SetDecision(samplingpolicy.Dropped) // A function that return a ptrace.Traces containing a single span for the single trace we are using. 
spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -618,13 +669,13 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe.EvaluationCount) + require.Equal(t, 0, mpe.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Policy should have been evaluated once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) // The final decision SHOULD be Dropped. require.Equal(t, 0, nextConsumer.SpanCount()) @@ -638,7 +689,7 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { }(p) // Set next decision to not sampled, ensuring the next decision is determined by the decision cache, not the policy - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetDecision(samplingpolicy.NotSampled) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. 
@@ -646,7 +697,7 @@ func TestLateArrivingSpanUsesDecisionCacheWhenDropped(t *testing.T) { controller.waitForTick() // Policy should still have been evaluated only once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount(), "original final decision not honored") allTraces := nextConsumer.AllTraces() require.Empty(t, allTraces) @@ -672,8 +723,9 @@ func TestLateArrivingSpanWithoutCacheMetadata(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: defaultTestDecisionWait * 10, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -686,7 +738,7 @@ func TestLateArrivingSpanWithoutCacheMetadata(t *testing.T) { traceID := uInt64ToTraceID(1) // The first span will be sampled, this will later be set to not sampled, but the sampling decision will be cached - mpe.NextDecision = samplingpolicy.Sampled + mpe.SetDecision(samplingpolicy.Sampled) // A function that return a ptrace.Traces containing a single span for the single trace we are using. spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -709,7 +761,7 @@ func TestLateArrivingSpanWithoutCacheMetadata(t *testing.T) { }(p) // Set next decision to not sampled, ensuring the next decision is determined by the decision cache, not the policy - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetDecision(samplingpolicy.NotSampled) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. 
@@ -717,7 +769,7 @@ func TestLateArrivingSpanWithoutCacheMetadata(t *testing.T) { controller.waitForTick() // Policy should never have been evaluated - require.Equal(t, 0, mpe.EvaluationCount) + require.Equal(t, 0, mpe.EvaluationCount()) require.Equal(t, 1, nextConsumer.SpanCount(), "original final decision not honored") allTraces := nextConsumer.AllTraces() require.Len(t, allTraces, 1) @@ -744,8 +796,9 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: defaultTestDecisionWait * 10, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait * 10, + NumTraces: defaultNumTraces, Options: []Option{ withTestController(controller), withPolicies(policies), @@ -764,7 +817,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { traceID := uInt64ToTraceID(1) // The first span will be NOT sampled, this will later be set to sampled, but the sampling decision will be cached - mpe.NextDecision = samplingpolicy.NotSampled + mpe.SetDecision(samplingpolicy.NotSampled) // A function that return a ptrace.Traces containing a single span for the single trace we are using. spanIndexToTraces := func(spanIndex uint64) ptrace.Traces { @@ -780,13 +833,13 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe.EvaluationCount) + require.Equal(t, 0, mpe.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Policy should have been evaluated once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) // The final decision SHOULD be NOT Sampled. 
require.Equal(t, 0, nextConsumer.SpanCount()) @@ -800,7 +853,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { }(p) // Set next decision to sampled, ensuring the next decision is determined by the decision cache, not the policy - mpe.NextDecision = samplingpolicy.Sampled + mpe.SetDecision(samplingpolicy.Sampled) // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. @@ -808,7 +861,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) { controller.waitForTick() // Policy should still have been evaluated only once - require.Equal(t, 1, mpe.EvaluationCount) + require.Equal(t, 1, mpe.EvaluationCount()) require.Equal(t, 0, nextConsumer.SpanCount(), "original final decision not honored") } @@ -827,6 +880,7 @@ func TestSampleOnFirstMatch(t *testing.T) { } cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: defaultNumTraces, SampleOnFirstMatch: true, @@ -844,35 +898,311 @@ func TestSampleOnFirstMatch(t *testing.T) { }(p) // Second policy matches, last policy should not be evaluated - mpe1.NextDecision = samplingpolicy.NotSampled - mpe2.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.NotSampled) + mpe2.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) - require.Equal(t, 0, mpe2.EvaluationCount) - require.Equal(t, 0, mpe3.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) + require.Equal(t, 0, mpe2.EvaluationCount()) + require.Equal(t, 0, mpe3.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Only the first policy should have been evaluated - require.Equal(t, 1, mpe1.EvaluationCount) - require.Equal(t, 1, 
mpe2.EvaluationCount) - require.Equal(t, 0, mpe3.EvaluationCount) + require.Equal(t, 1, mpe1.EvaluationCount()) + require.Equal(t, 1, mpe2.EvaluationCount()) + require.Equal(t, 0, mpe3.EvaluationCount()) // The final decision SHOULD be Sampled. require.Equal(t, 1, nextConsumer.SpanCount()) } +func TestSpanIngestEvaluatesOnIngestAndFinalizesOnTerminalDecision(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(50) + rootSpanID := uInt64ToSpanID(1) + + // First ingest path evaluation is non-terminal in span-ingest mode. + mpe.SetDecision(samplingpolicy.NotSampled) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Second ingest path evaluation returns terminal Sampled and releases both spans. 
+ mpe.SetDecision(samplingpolicy.Sampled) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + require.Eventually(t, func() bool { + return mpe.EvaluationCount() == 2 && nextConsumer.SpanCount() == 2 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestFinalizesOnDroppedDecision(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + DecisionCache: DecisionCacheConfig{ + NonSampledCacheSize: 64, + }, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(51) + rootSpanID := uInt64ToSpanID(1) + + // First ingest path evaluation is non-terminal in span-ingest mode. + mpe.SetDecision(samplingpolicy.NotSampled) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Second ingest path evaluation returns terminal Dropped and finalizes trace. 
+ mpe.SetDecision(samplingpolicy.Dropped) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + require.Eventually(t, func() bool { + return mpe.EvaluationCount() == 2 && nextConsumer.SpanCount() == 0 + }, time.Second, 10*time.Millisecond) + + // Late spans should use cached non-sampled decision and skip re-evaluation. + mpe.SetDecision(samplingpolicy.Sampled) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(3), rootSpanID))) + require.Eventually(t, func() bool { + return mpe.EvaluationCount() == 2 && nextConsumer.SpanCount() == 0 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestTickCleansUpPendingWithoutPolicyEvaluation(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + DecisionCache: DecisionCacheConfig{ + NonSampledCacheSize: 64, + }, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + // Non-terminal in span-ingest mode: pending in memory. 
+ mpe.SetDecision(samplingpolicy.NotSampled) + require.NoError(t, p.ConsumeTraces(t.Context(), simpleTraces())) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Tick path cleans pending traces without evaluating policies in span-ingest mode. + controller.waitForTick() + controller.waitForTick() + require.Equal(t, 1, mpe.EvaluationCount()) + require.Equal(t, 0, nextConsumer.SpanCount()) + require.Eventually(t, func() bool { + return len(p.(*tailSamplingSpanProcessor).idToTrace) == 0 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestEvaluatesOnlyIncomingBatch(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + eval := &batchSizeTrackingEvaluator{ + decisions: []samplingpolicy.Decision{samplingpolicy.NotSampled, samplingpolicy.Sampled}, + } + + policies := []*policy{ + {name: "tracking-policy", evaluator: eval, attribute: metric.WithAttributes(attribute.String("policy", "tracking-policy"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(55) + rootSpanID := uInt64ToSpanID(1) + + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + + require.Eventually(t, func() bool { + return eval.EvaluationCount() == 2 + }, time.Second, 
10*time.Millisecond) + require.Equal(t, []int{1, 1}, eval.EvaluationBatches()) + require.Eventually(t, func() bool { + return nextConsumer.SpanCount() == 2 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestRootFirstThenChild(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(60) + rootSpanID := uInt64ToSpanID(1) + + // Root first, non-terminal. + mpe.SetDecision(samplingpolicy.NotSampled) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Child later, terminal sampled -> both spans released. 
+ mpe.SetDecision(samplingpolicy.Sampled) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.Eventually(t, func() bool { + return mpe.EvaluationCount() == 2 && nextConsumer.SpanCount() == 2 + }, time.Second, 10*time.Millisecond) +} + +func TestSpanIngestChildFirstThenRootPendingCleanup(t *testing.T) { + nextConsumer := new(consumertest.TracesSink) + controller := newTestTSPController() + + mpe := &mockPolicyEvaluator{} + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategySpanIngest, + DecisionCache: DecisionCacheConfig{ + NonSampledCacheSize: 64, + }, + Options: []Option{ + withTestController(controller), + withPolicies(policies), + }, + } + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg) + require.NoError(t, err) + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func(p processor.Traces) { + require.NoError(t, p.Shutdown(t.Context())) + }(p) + + traceID := uInt64ToTraceID(61) + rootSpanID := uInt64ToSpanID(1) + + // Child then root, both non-terminal. + mpe.SetDecision(samplingpolicy.NotSampled) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, uInt64ToSpanID(2), rootSpanID))) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 1 }, time.Second, 10*time.Millisecond) + require.NoError(t, p.ConsumeTraces(t.Context(), singleSpanTrace(traceID, rootSpanID, pcommon.SpanID{}))) + require.Eventually(t, func() bool { return mpe.EvaluationCount() == 2 }, time.Second, 10*time.Millisecond) + require.Equal(t, 0, nextConsumer.SpanCount()) + + // Cleanup tick finalizes pending as not sampled without more evaluations. 
+ controller.waitForTick() + controller.waitForTick() + require.Equal(t, 2, mpe.EvaluationCount()) + require.Equal(t, 0, nextConsumer.SpanCount()) + require.Eventually(t, func() bool { + return len(p.(*tailSamplingSpanProcessor).idToTrace) == 0 + }, time.Second, 10*time.Millisecond) +} + func TestRateLimiter(t *testing.T) { nextConsumer := new(consumertest.TracesSink) controller := newTestTSPController() cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: defaultNumTraces, SampleOnFirstMatch: true, @@ -920,3 +1250,14 @@ func TestRateLimiter(t *testing.T) { require.LessOrEqual(t, len(sampledTraceIDs), 2) require.GreaterOrEqual(t, len(sampledTraceIDs), 1) } + +func singleSpanTrace(traceID pcommon.TraceID, spanID, parentID pcommon.SpanID) ptrace.Traces { + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetTraceID(traceID) + span.SetSpanID(spanID) + if !parentID.IsEmpty() { + span.SetParentSpanID(parentID) + } + return traces +} diff --git a/processor/tailsamplingprocessor/processor_telemetry_test.go b/processor/tailsamplingprocessor/processor_telemetry_test.go index b0b46b6f57f79..fdc1dcf463f0e 100644 --- a/processor/tailsamplingprocessor/processor_telemetry_test.go +++ b/processor/tailsamplingprocessor/processor_telemetry_test.go @@ -6,6 +6,7 @@ package tailsamplingprocessor import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -32,8 +33,9 @@ func TestMetricsAfterOneEvaluation(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -243,14 +245,100 @@ func TestMetricsAfterOneEvaluation(t *testing.T) { assert.Len(t, cs.AllTraces(), 1) } +func 
TestMetricsSpanIngestModeRecordIngestDecisionTelemetry(t *testing.T) { + for _, tc := range []struct { + name string + strategy samplingStrategy + }{ + {name: "span-ingest", strategy: samplingStrategySpanIngest}, + } { + t.Run(tc.name, func(t *testing.T) { + s := setupTestTelemetry() + cfg := Config{ + SamplingStrategy: tc.strategy, + DecisionWait: 1, + NumTraces: 100, + PolicyCfgs: []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "always", + Type: AlwaysSample, + }, + }, + }, + } + cs := &consumertest.TracesSink{} + ct := s.newSettings() + proc, err := newTracesProcessor(t.Context(), ct, cs, cfg) + require.NoError(t, err) + require.NoError(t, proc.Start(t.Context(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, proc.Shutdown(t.Context())) + }() + + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetTraceID(uInt64ToTraceID(101)) + span.SetSpanID(uInt64ToSpanID(1)) + require.NoError(t, proc.ConsumeTraces(t.Context(), traces)) + require.Eventually(t, func() bool { + return cs.SpanCount() == 1 + }, time.Second, 10*time.Millisecond) + + var md metricdata.ResourceMetrics + require.NoError(t, s.reader.Collect(t.Context(), &md)) + + metricdatatest.AssertEqual(t, metricdata.Metrics{ + Name: "otelcol_processor_tail_sampling_global_count_traces_sampled", + Description: "Global count of traces that were sampled or not by at least one policy [Development]", + Unit: "{traces}", + Data: metricdata.Sum[int64]{ + IsMonotonic: true, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("sampled", "true"), + attribute.String("decision", "sampled"), + ), + Value: 1, + }, + }, + }, + }, s.getMetric("otelcol_processor_tail_sampling_global_count_traces_sampled", md), metricdatatest.IgnoreTimestamp()) + + metricdatatest.AssertEqual(t, metricdata.Metrics{ + Name: 
"otelcol_processor_tail_sampling_count_traces_sampled", + Description: "Count of traces that were sampled or not per sampling policy [Development]", + Unit: "{traces}", + Data: metricdata.Sum[int64]{ + IsMonotonic: true, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("policy", "always"), + attribute.String("sampled", "true"), + attribute.String("decision", "sampled"), + ), + Value: 1, + }, + }, + }, + }, s.getMetric("otelcol_processor_tail_sampling_count_traces_sampled", md), metricdatatest.IgnoreTimestamp()) + }) + } +} + func TestMetricsWithComponentID(t *testing.T) { // prepare s := setupTestTelemetry() controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -603,9 +691,10 @@ func TestMetricsCountSampled(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, - PolicyCfgs: tt.policyCfgs, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, + PolicyCfgs: tt.policyCfgs, Options: []Option{ withTestController(controller), }, @@ -650,8 +739,9 @@ func TestProcessorTailSamplingSamplingTraceRemovalAge(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 2, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 2, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -709,8 +799,9 @@ func TestProcessorTailSamplingSamplingLateSpanAge(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -793,8 +884,9 @@ func 
TestProcessorTailSamplingSamplingTraceDroppedTooEarly(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 2, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 2, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -858,8 +950,9 @@ func TestProcessorTailSamplingSamplingPolicyEvaluationError(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -931,8 +1024,9 @@ func TestProcessorTailSamplingEarlyReleasesFromCacheDecision(t *testing.T) { require.NoError(t, err) cfg := Config{ - DecisionWait: 1, - NumTraces: 100, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 1, + NumTraces: 100, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index c05ed4463ff2d..51e0559b11128 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -65,6 +65,10 @@ func (t *TestPolicyEvaluator) Evaluate(ctx context.Context, traceID pcommon.Trac return t.pe.Evaluate(ctx, traceID, trace) } +func (t *TestPolicyEvaluator) IsStateful() bool { + return t.pe.IsStateful() +} + // testTSPController is a set of mechanisms to make the TSP do predictable // things in tests. 
type testTSPController struct { @@ -199,8 +203,9 @@ func TestTraceIntegrity(t *testing.T) { controller := newTestTSPController() cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, Options: []Option{ withPolicies(policies), withTestController(controller), @@ -214,20 +219,20 @@ func TestTraceIntegrity(t *testing.T) { require.NoError(t, p.Shutdown(t.Context())) }() - mpe1.NextDecision = samplingpolicy.Sampled + mpe1.SetDecision(samplingpolicy.Sampled) // Generate and deliver first span require.NoError(t, p.ConsumeTraces(t.Context(), traces)) // The first tick won't do anything controller.waitForTick() - require.Equal(t, 0, mpe1.EvaluationCount) + require.Equal(t, 0, mpe1.EvaluationCount()) // This will cause policy evaluations on the first span controller.waitForTick() // Both policies should have been evaluated once - assert.Equal(t, 4, mpe1.EvaluationCount) + assert.Equal(t, 4, mpe1.EvaluationCount()) consumed := nextConsumer.AllTraces() require.Len(t, consumed, 4) @@ -252,6 +257,7 @@ func TestSequentialTraceArrival(t *testing.T) { traceIDs, batches := generateIDsAndBatches(128) controller := newTestTSPController() cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -300,6 +306,7 @@ func TestConcurrentTraceArrival(t *testing.T) { controller := newTestTSPController() var wg sync.WaitGroup cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -368,6 +375,7 @@ func TestConcurrentArrivalAndEvaluation(t *testing.T) { var wg sync.WaitGroup cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * 
len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -409,6 +417,7 @@ func TestSequentialTraceMapSize(t *testing.T) { controller := newTestTSPController() traceIDs, batches := generateIDsAndBatches(210) cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: defaultNumTraces, ExpectedNewTracesPerSec: 64, @@ -484,7 +493,8 @@ func TestConsumptionDuringPolicyEvaluation(t *testing.T) { // prepare msp := new(consumertest.TracesSink) cfg := Config{ - DecisionWait: 10 * time.Millisecond, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 10 * time.Millisecond, // idToTrace map size is 2x the number of batches, to eliminate "expected" // dropped too early errors. NumTraces: uint64(numBatches * 2), @@ -559,8 +569,9 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { msp := new(consumertest.TracesSink) cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -632,8 +643,9 @@ func TestSetSamplingPolicy(t *testing.T) { telem := setupTestTelemetry() cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -710,9 +722,10 @@ func TestSubSecondDecisionTime(t *testing.T) { // prepare msp := new(consumertest.TracesSink) tsp, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), msp, Config{ - DecisionWait: 500 * time.Millisecond, - NumTraces: defaultNumTraces, - PolicyCfgs: testPolicy, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: 500 * time.Millisecond, + NumTraces: defaultNumTraces, + PolicyCfgs: testPolicy, Options: 
[]Option{ withTickerFrequency(10 * time.Millisecond), }, @@ -767,8 +780,9 @@ func TestDuplicatePolicyName(t *testing.T) { } p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), msp, Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ {sharedPolicyCfg: alwaysSample}, {sharedPolicyCfg: alwaysSample}, @@ -790,8 +804,9 @@ func TestDropPolicyIsFirstInPolicyList(t *testing.T) { msp := new(consumertest.TracesSink) cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -883,8 +898,9 @@ func TestDecisionHooks(t *testing.T) { } cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{ @@ -1020,16 +1036,43 @@ func uInt64ToSpanID(id uint64) pcommon.SpanID { } type mockPolicyEvaluator struct { - NextDecision samplingpolicy.Decision - NextError error - EvaluationCount int + mu sync.Mutex + + nextDecision samplingpolicy.Decision + nextError error + evaluationCount int } var _ samplingpolicy.Evaluator = (*mockPolicyEvaluator)(nil) func (m *mockPolicyEvaluator) Evaluate(context.Context, pcommon.TraceID, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) { - m.EvaluationCount++ - return m.NextDecision, m.NextError + m.mu.Lock() + defer m.mu.Unlock() + + m.evaluationCount++ + return m.nextDecision, m.nextError +} + +func (*mockPolicyEvaluator) IsStateful() bool { + return false +} + +func (m *mockPolicyEvaluator) SetDecision(decision 
samplingpolicy.Decision) { + m.mu.Lock() + defer m.mu.Unlock() + m.nextDecision = decision +} + +func (m *mockPolicyEvaluator) SetError(nextError error) { + m.mu.Lock() + defer m.mu.Unlock() + m.nextError = nextError +} + +func (m *mockPolicyEvaluator) EvaluationCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.evaluationCount } type syncIDBatcher struct { @@ -1227,6 +1270,7 @@ func TestDropLargeTraces(t *testing.T) { sp.Attributes().PutStr("foo", "short") cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(4), ExpectedNewTracesPerSec: 64, @@ -1322,6 +1366,7 @@ func TestDeleteQueueCleared(t *testing.T) { traceIDs, batches := generateIDsAndBatches(128) cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -1365,6 +1410,7 @@ func TestDeleteQueueCleared(t *testing.T) { func TestRootReceivedBatcher(t *testing.T) { traceIDs, batches := generateIDsAndBatches(128) cfg := Config{ + SamplingStrategy: samplingStrategyTraceComplete, DecisionWait: time.Minute, NumTraces: uint64(2 * len(traceIDs)), ExpectedNewTracesPerSec: 64, @@ -1415,8 +1461,9 @@ func TestExtension(t *testing.T) { msp := new(consumertest.TracesSink) cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, + SamplingStrategy: samplingStrategyTraceComplete, + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{ { sharedPolicyCfg: sharedPolicyCfg{