Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .chloggen/tailsampling-sampling-strategy-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
change_type: enhancement
component: processor/tail_sampling
note: Add `sampling_strategy` config with `trace-complete` and `span-ingest` modes for tail sampling decision timing and evaluation behavior.
issues: [46600]
change_logs: [user]
19 changes: 17 additions & 2 deletions processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ Multiple policies exist today and it is straight forward to add more. These incl
3. To ensure remaining capacity is filled use always_sample as one of the policies

The following configuration options can also be modified:
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision
- `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before making a sampling decision. 0s means disabled (only use `decision_wait`).
- `sampling_strategy` (default = `trace-complete`): Controls decision timing and evaluation scope. `trace-complete` evaluates accumulated trace data on timer handling; `span-ingest` evaluates each incoming batch on ingest, finalizing terminal outcomes immediately and non-terminal traces on cleanup. See [Sampling Strategies](#sampling-strategies) for details.
- `decision_wait` (default = 30s): Time before timer handling for a trace. When `sampling_strategy` is `trace-complete`, this controls decision timing. When `sampling_strategy` is `span-ingest`, this controls pending cleanup finalization timing.
- `decision_wait_after_root_received` (default = 0s): Additional root-span-based acceleration for timer handling. When `sampling_strategy` is `trace-complete`, this can make decisions earlier. When `sampling_strategy` is `span-ingest`, this can finalize pending traces earlier on cleanup. `0s` disables it.
- `num_traces` (default = 50000): Number of traces kept in memory.
- `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures)
- `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches
Expand All @@ -68,6 +69,20 @@ The following configuration options can also be modified:
already ingested.
- `maximum_trace_size_bytes`: The maximum size a trace can reach in bytes, traces larger than this size will be immediately dropped from the tail sampling processor in order to protect the system.

## Sampling Strategies

The `sampling_strategy` setting controls both decision timing and what data evaluators use:

- `trace-complete` (default): evaluates on the timer path using accumulated trace data (after `decision_wait`, or earlier after root arrival when `decision_wait_after_root_received` is set). This is the most flexible mode for policies, but with later decisions and higher in-memory/storage pressure.
- `span-ingest`: evaluates each incoming batch at ingest time without re-evaluating previously ingested batches. Terminal outcomes (`sampled`/`dropped`) finalize immediately; non-terminal outcomes stay pending and are finalized as `not sampled` during cleanup without policy re-evaluation.

Quick comparison:

- Policy compatibility: `trace-complete` supports stateful policies; `span-ingest` rejects them.
- Timer controls: in `trace-complete`, `decision_wait` and `decision_wait_after_root_received` affect decision timing; in `span-ingest`, they affect pending cleanup/finalization timing.
- Late spans: decision caches remain important in both modes for spans that arrive after in-memory trace data is gone.

## Policy Decision Flow

Each policy will result in a decision, and the processor will evaluate them to make a final decision:

Expand Down
41 changes: 37 additions & 4 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"

import (
"fmt"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
Expand Down Expand Up @@ -53,6 +54,17 @@ const (
TraceFlags PolicyType = "trace_flags"
)

const (
// samplingStrategyTraceComplete keeps the current tail-sampling behavior:
// accumulate spans and decide on full trace data after decision timing.
samplingStrategyTraceComplete samplingStrategy = "trace-complete"
// samplingStrategySpanIngest evaluates each incoming span batch on ingest.
// Non-terminal outcomes remain pending until cleanup finalization.
samplingStrategySpanIngest samplingStrategy = "span-ingest"
)

type samplingStrategy string

// sharedPolicyCfg holds the common configuration to all policies that are used in derivative policy configurations
// such as the and & composite policies.
type sharedPolicyCfg struct {
Expand Down Expand Up @@ -301,11 +313,13 @@ type DecisionCacheConfig struct {

// Config holds the configuration for tail-based sampling.
type Config struct {
// DecisionWait is the desired wait time from the arrival of the first span of
// trace until the decision about sampling it or not is evaluated.
// DecisionWait is the time before timer handling for a trace.
// When sampling_strategy is "trace-complete", this controls decision timing.
// When sampling_strategy is "span-ingest", this controls pending cleanup finalization timing.
DecisionWait time.Duration `mapstructure:"decision_wait"`
// DecisionWaitAfterRootReceived is the desired wait time from the arrival of the root span of
// trace until the decision about sampling it or not is evaluated.
// DecisionWaitAfterRootReceived adds root-span-based acceleration for timer handling.
// When sampling_strategy is "trace-complete", this can make decisions earlier.
// When sampling_strategy is "span-ingest", this can finalize pending traces earlier on cleanup.
DecisionWaitAfterRootReceived time.Duration `mapstructure:"decision_wait_after_root_received"`
// NumTraces is the number of traces kept on memory. Typically most of the data
// of a trace is released after a sampling decision is taken.
Expand All @@ -325,6 +339,11 @@ type Config struct {
Options []Option `mapstructure:"-"`
// Make decision as soon as a policy matches
SampleOnFirstMatch bool `mapstructure:"sample_on_first_match"`
// SamplingStrategy controls how/when sampling decisions are made.
// "trace-complete" (default) evaluates accumulated trace data on timer handling.
// "span-ingest" evaluates each incoming batch on ingest; terminal outcomes
// finalize immediately, and non-terminal traces are finalized on cleanup.
SamplingStrategy samplingStrategy `mapstructure:"sampling_strategy"`
// DropPendingTracesOnShutdown will drop all traces that are part of batches that have not yet reached the decision
// wait when the processor is shutdown.
DropPendingTracesOnShutdown bool `mapstructure:"drop_pending_traces_on_shutdown"`
Expand All @@ -333,3 +352,17 @@ type Config struct {
// A 0 value disables dropping large traces early.
MaximumTraceSizeBytes uint64 `mapstructure:"maximum_trace_size_bytes"`
}

func (cfg *Config) Validate() error {
switch cfg.SamplingStrategy {
case samplingStrategyTraceComplete, samplingStrategySpanIngest:
return nil
default:
return fmt.Errorf(
"invalid sampling_strategy %q, expected one of %q or %q",
cfg.SamplingStrategy,
samplingStrategyTraceComplete,
samplingStrategySpanIngest,
)
}
}
7 changes: 5 additions & 2 deletions processor/tailsamplingprocessor/config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,11 @@ properties:
description: DecisionCache holds configuration for the decision cache(s)
$ref: decision_cache_config
decision_wait:
description: DecisionWait is the desired wait time from the arrival of the first span of trace until the decision about sampling it or not is evaluated.
description: DecisionWait is the time before timer handling for a trace. When sampling_strategy is "trace-complete", this controls decision timing. When sampling_strategy is "span-ingest", this controls pending cleanup finalization timing.
type: string
format: duration
decision_wait_after_root_received:
description: DecisionWaitAfterRootReceived is the desired wait time from the arrival of the root span of trace until the decision about sampling it or not is evaluated.
description: DecisionWaitAfterRootReceived adds root-span-based acceleration for timer handling. When sampling_strategy is "trace-complete", this can make decisions earlier. When sampling_strategy is "span-ingest", this can finalize pending traces earlier on cleanup.
type: string
format: duration
drop_pending_traces_on_shutdown:
Expand All @@ -456,3 +456,6 @@ properties:
sample_on_first_match:
description: Make decision as soon as a policy matches
type: boolean
sampling_strategy:
description: SamplingStrategy controls how/when sampling decisions are made. "trace-complete" (default) evaluates accumulated trace data on timer handling. "span-ingest" evaluates each incoming batch on ingest; terminal outcomes finalize immediately, and non-terminal traces are finalized on cleanup.
type: string
1 change: 1 addition & 0 deletions processor/tailsamplingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestLoadConfig(t *testing.T) {
DecisionWait: 10 * time.Second,
NumTraces: 100,
ExpectedNewTracesPerSec: 10,
SamplingStrategy: samplingStrategyTraceComplete,
DecisionCache: DecisionCacheConfig{SampledCacheSize: 1_000, NonSampledCacheSize: 10_000},
PolicyCfgs: []PolicyCfg{
{
Expand Down
1 change: 1 addition & 0 deletions processor/tailsamplingprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func createDefaultConfig() component.Config {
DecisionWait: 30 * time.Second,
NumTraces: 50000,
SampleOnFirstMatch: false,
SamplingStrategy: samplingStrategyTraceComplete,
}
}

Expand Down
60 changes: 60 additions & 0 deletions processor/tailsamplingprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,63 @@ func TestCreateProcessor(t *testing.T) {
assert.NoError(t, tp.Start(t.Context(), componenttest.NewNopHost()))
assert.NoError(t, tp.Shutdown(t.Context()))
}

func TestCreateProcessorRejectsInvalidSamplingStrategy(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.SamplingStrategy = "invalid"

params := processortest.NewNopSettings(metadata.Type)
tp, err := factory.CreateTraces(t.Context(), params, cfg, consumertest.NewNop())
assert.Nil(t, tp)
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid sampling_strategy")
}

func TestCreateProcessorAllowsSpanIngest(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.SamplingStrategy = samplingStrategySpanIngest
cfg.PolicyCfgs = []PolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "policy",
Type: Probabilistic,
ProbabilisticCfg: ProbabilisticCfg{
SamplingPercentage: 1,
},
},
},
}

params := processortest.NewNopSettings(metadata.Type)
tp, err := factory.CreateTraces(t.Context(), params, cfg, consumertest.NewNop())
assert.NotNil(t, tp)
assert.NoError(t, err)
}

func TestCreateProcessorRejectsStatefulPolicyForSpanIngest(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.SamplingStrategy = samplingStrategySpanIngest
cfg.PolicyCfgs = []PolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "stateful-policy",
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{
SpansPerSecond: 10,
},
},
},
}

params := processortest.NewNopSettings(metadata.Type)
tp, err := factory.CreateTraces(t.Context(), params, cfg, consumertest.NewNop())
require.NoError(t, err)
require.NotNil(t, tp)
err = tp.Start(t.Context(), componenttest.NewNopHost())
require.Error(t, err)
assert.Contains(t, err.Error(), "requires all policies to be stateless")
assert.Contains(t, err.Error(), "stateful-policy")
}
4 changes: 3 additions & 1 deletion processor/tailsamplingprocessor/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ func FuzzConsumeTraces(f *testing.F) {
}
sink := new(consumertest.TracesSink)
set := processortest.NewNopSettings(metadata.Type)
cfg := &Config{}
cfg := &Config{
SamplingStrategy: samplingStrategyTraceComplete,
}
tsp, err := newTracesProcessor(t.Context(), set, sink, *cfg)
if err != nil {
t.Fatal(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,7 @@ func (as *alwaysSample) Evaluate(context.Context, pcommon.TraceID, *samplingpoli
as.logger.Debug("Evaluating spans in always-sample filter")
return samplingpolicy.Sampled, nil
}

func (*alwaysSample) IsStateful() bool {
return false
}
9 changes: 9 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/and.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,12 @@ func (c *And) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *samp
}
return samplingpolicy.Sampled, nil
}

func (c *And) IsStateful() bool {
for _, sub := range c.subpolicies {
if sub.IsStateful() {
return true
}
}
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,11 @@ func TestAndEvaluatorStringInvertNotSampled(t *testing.T) {
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
assert.Equal(t, samplingpolicy.NotSampled, decision)
}

func TestAndIsStatefulIfAnySubpolicyIsStateful(t *testing.T) {
stateless := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
stateful := NewRateLimiting(componenttest.NewNopTelemetrySettings(), 10)

and := NewAnd(zap.NewNop(), []samplingpolicy.Evaluator{stateless, stateful})
assert.True(t, and.IsStateful())
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,7 @@ func (baf *booleanAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID
return false
}), nil
}

func (*booleanAttributeFilter) IsStateful() bool {
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (b *bytesLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sa
return samplingpolicy.NotSampled, nil
}

func (*bytesLimiting) IsStateful() bool {
return true
}

// calculateTraceSize calculates the accurate protobuf marshaled size of a trace in bytes
// using the OpenTelemetry Collector's built-in ProtoMarshaler.TracesSize() method
func calculateTraceSize(trace *samplingpolicy.TraceData) int64 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func NewComposite(
recordSubPolicy bool,
) samplingpolicy.Evaluator {
var subpolicies []*subpolicy

for i := range subPolicyParams {
sub := &subpolicy{}
sub.evaluator = subPolicyParams[i].Evaluator
Expand Down Expand Up @@ -135,3 +134,12 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace

return samplingpolicy.NotSampled, nil
}

func (c *Composite) IsStateful() bool {
for _, sub := range c.subpolicies {
if sub.evaluator.IsStateful() {
return true
}
}
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,13 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) {
assert.Equal(t, expected, decision)
}
}

func TestCompositeIsStatefulIfAnySubpolicyIsStateful(t *testing.T) {
stateless := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
stateful := NewRateLimiting(componenttest.NewNopTelemetrySettings(), 10)
c := NewComposite(zap.NewNop(), 100, []SubPolicyEvalParams{
{Evaluator: stateless, MaxSpansPerSecond: 50, Name: "stateless"},
{Evaluator: stateful, MaxSpansPerSecond: 50, Name: "stateful"},
}, FakeTimeProvider{}, false)
assert.True(t, c.IsStateful())
}
9 changes: 9 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,12 @@ func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *sam
}
return samplingpolicy.Dropped, nil
}

func (c *Drop) IsStateful() bool {
for _, sub := range c.subpolicies {
if sub.IsStateful() {
return true
}
}
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,11 @@ func TestDropEvaluatorStringInvertNotMatch(t *testing.T) {
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
assert.Equal(t, samplingpolicy.NotSampled, decision)
}

func TestDropIsStatefulIfAnySubpolicyIsStateful(t *testing.T) {
stateless := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
stateful := NewRateLimiting(componenttest.NewNopTelemetrySettings(), 10)

drop := NewDrop(zap.NewNop(), []samplingpolicy.Evaluator{stateless, stateful})
assert.True(t, drop.IsStateful())
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,7 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *samp
return (l.thresholdMs < duration.Milliseconds() && duration.Milliseconds() <= l.upperThresholdMs)
}), nil
}

func (*latency) IsStateful() bool {
return false
}
4 changes: 4 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/not.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ func (n *not) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *samp
return decision, nil
}
}

func (n *not) IsStateful() bool {
return n.subPolicyEvaluator.IsStateful()
}
12 changes: 12 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/not_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import (
type mockEvaluator struct {
decision samplingpolicy.Decision
err error
stateful bool
}

func (m *mockEvaluator) Evaluate(_ context.Context, _ pcommon.TraceID, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
return m.decision, m.err
}

func (m *mockEvaluator) IsStateful() bool {
return m.stateful
}

func TestNotSamplingPolicy_Evaluate_Sampled(t *testing.T) {
logger := zap.NewNop()
mockSubPolicy := &mockEvaluator{decision: samplingpolicy.Sampled}
Expand Down Expand Up @@ -86,3 +91,10 @@ func TestNotSamplingPolicy_Evaluate_Err_Not_Nil(t *testing.T) {
assert.Equal(t, expectedError, err)
assert.Equal(t, samplingpolicy.Error, decision)
}

func TestNotIsStatefulFollowsSubpolicy(t *testing.T) {
logger := zap.NewNop()
mockSubPolicy := &mockEvaluator{stateful: true}
notPolicy := NewNot(logger, mockSubPolicy)
assert.True(t, notPolicy.IsStateful())
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,7 @@ func (naf *numericAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID
},
), nil
}

func (*numericAttributeFilter) IsStateful() bool {
return false
}
4 changes: 4 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/ottl.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,7 @@ func (ocf *ottlConditionFilter) Evaluate(ctx context.Context, traceID pcommon.Tr
}
return samplingpolicy.NotSampled, nil
}

func (*ottlConditionFilter) IsStateful() bool {
return false
}
Loading
Loading