Skip to content
27 changes: 27 additions & 0 deletions .chloggen/tsp-early-decisions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: processor/tail_sampling

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add optional early decision functionality to the tail sampling processor

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [43876]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: When enabled, early decisions are made for each batch of spans as it arrives. Depending on the policy configuration, this can significantly reduce the number of traces kept in memory.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,6 @@ type Config struct {
// DropPendingTracesOnShutdown will drop all traces that are part of batches that have not yet reached the decision
// wait when the processor is shutdown.
DropPendingTracesOnShutdown bool `mapstructure:"drop_pending_traces_on_shutdown"`
// EarlyDecisions enables making early decisions to sample traces rather than waiting for the DecisionWait duration.
EarlyDecisions bool `mapstructure:"early_decisions"`
}
8 changes: 8 additions & 0 deletions processor/tailsamplingprocessor/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Count of spans that were sampled or not per sampling policy [Development]
| policy | Name of the policy | Any Str |
| sampled | Whether the sampling decision was sampled or not, false can mean either not sampled or dropped | Any Bool |
| decision | The sampling decision | Str: ``sampled``, ``not_sampled``, ``dropped`` |
| early | Whether the decision was made early or after the decision wait | Any Bool |

### otelcol_processor_tail_sampling_count_traces_sampled

Expand All @@ -37,6 +38,7 @@ Count of traces that were sampled or not per sampling policy [Development]
| policy | Name of the policy | Any Str |
| sampled | Whether the sampling decision was sampled or not, false can mean either not sampled or dropped | Any Bool |
| decision | The sampling decision | Str: ``sampled``, ``not_sampled``, ``dropped`` |
| early | Whether the decision was made early or after the decision wait | Any Bool |

### otelcol_processor_tail_sampling_early_releases_from_cache_decision

Expand Down Expand Up @@ -99,6 +101,12 @@ Count of sampling policy evaluation errors [Development]
| ---- | ----------- | ---------- | --------- | --------- |
| {errors} | Sum | Int | true | Development |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| early | Whether the decision was made early or after the decision wait | Any Bool |

### otelcol_processor_tail_sampling_sampling_policy_execution_count

Total number of executions of a specific sampling policy [Development]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
Expand All @@ -31,3 +32,7 @@ func (as *alwaysSample) Evaluate(context.Context, pcommon.TraceID, *samplingpoli
as.logger.Debug("Evaluating spans in always-sample filter")
return samplingpolicy.Sampled, nil
}

// EarlyEvaluate unconditionally returns Sampled with a nil error. None of the
// arguments (context, trace ID, batch, trace data) are inspected.
func (*alwaysSample) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
return samplingpolicy.Sampled, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
)
Expand All @@ -22,3 +23,15 @@ func TestEvaluate_AlwaysSample(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, samplingpolicy.Sampled, decision)
}

// TestEarlyEvaluate_AlwaysSample checks that the always-sample policy reports
// Sampled from EarlyEvaluate when given an empty batch and nil trace data.
func TestEarlyEvaluate_AlwaysSample(t *testing.T) {
	policy := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
	traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})

	got, err := policy.EarlyEvaluate(t.Context(), traceID, ptrace.NewResourceSpans(), nil)

	assert.NoError(t, err)
	assert.Equal(t, samplingpolicy.Sampled, got)
}
5 changes: 5 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/and.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
Expand Down Expand Up @@ -43,3 +44,7 @@ func (c *And) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *samp
}
return samplingpolicy.Sampled, nil
}

// EarlyEvaluate always returns Unspecified with a nil error: this policy does
// not make early decisions and none of the arguments are inspected.
func (*And) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
return samplingpolicy.Unspecified, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (baf *booleanAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID
},
), nil
}

return hasResourceOrSpanWithCondition(
batches,
func(resource pcommon.Resource) bool {
Expand All @@ -74,3 +75,27 @@ func (baf *booleanAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID
return false
}), nil
}

// EarlyEvaluate attempts a decision from a single batch rather than the whole
// trace. It hands batchHasResourceOrSpanWithCondition a resource predicate and
// a span predicate that both check whether the configured key is present and
// its boolean value equals the configured value. Filters configured with
// invert match never decide early and always yield Unspecified.
func (baf *booleanAttributeFilter) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, batch ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
	// Do not support the deprecated invert match code for early evaluations.
	if baf.invertMatch {
		return samplingpolicy.Unspecified, nil
	}

	// Resource and span attributes are matched with exactly the same rule.
	matches := func(attrs pcommon.Map) bool {
		attr, found := attrs.Get(baf.key)
		return found && attr.Bool() == baf.value
	}

	decision := batchHasResourceOrSpanWithCondition(
		batch,
		func(resource pcommon.Resource) bool { return matches(resource.Attributes()) },
		func(span ptrace.Span) bool { return matches(span.Attributes()) },
	)
	return decision, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,46 @@ func TestBooleanTagFilter(t *testing.T) {
}
}

// TestBooleanTagFilter_EarlyEvaluate exercises EarlyEvaluate on a non-inverted
// boolean attribute filter for key "example" with wanted value true. Each case
// builds a trace with a single boolean span attribute and evaluates only its
// first ResourceSpans batch.
func TestBooleanTagFilter_EarlyEvaluate(t *testing.T) {
	empty := map[string]any{}
	filter := NewBooleanAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", true, false)

	cases := []struct {
		Desc     string
		Trace    *samplingpolicy.TraceData
		Decision samplingpolicy.Decision
	}{
		{
			Desc:     "non-matching span attribute",
			Trace:    newTraceBoolAttrs(empty, "non_matching", true),
			Decision: samplingpolicy.Unspecified,
		},
		{
			Desc:     "span attribute with unwanted boolean value",
			Trace:    newTraceBoolAttrs(empty, "example", false),
			Decision: samplingpolicy.Unspecified,
		},
		{
			Desc:     "span attribute with wanted boolean value",
			Trace:    newTraceBoolAttrs(empty, "example", true),
			Decision: samplingpolicy.Sampled,
		},
	}

	for _, c := range cases {
		t.Run(c.Desc, func(t *testing.T) {
			u, err := uuid.NewRandom()
			assert.NoError(t, err)

			rs := c.Trace.ReceivedBatches.ResourceSpans().At(0)
			decision, err := filter.EarlyEvaluate(t.Context(), pcommon.TraceID(u), rs, c.Trace)
			assert.NoError(t, err)
			// Expected value first, so failure output labels the values correctly.
			assert.Equal(t, c.Decision, decision)
		})
	}
}

func TestBooleanTagFilterInverted(t *testing.T) {
empty := map[string]any{}
filter := NewBooleanAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", true, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (b *bytesLimiting) Evaluate(_ context.Context, _ pcommon.TraceID, trace *sa
return samplingpolicy.NotSampled, nil
}

// EarlyEvaluate always returns Unspecified with a nil error: this policy does
// not make early decisions and none of the arguments are inspected.
func (*bytesLimiting) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
return samplingpolicy.Unspecified, nil
}

// calculateTraceSize calculates the accurate protobuf marshaled size of a trace in bytes
// using the OpenTelemetry Collector's built-in ProtoMarshaler.TracesSize() method
func calculateTraceSize(trace *samplingpolicy.TraceData) int64 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
Expand Down Expand Up @@ -134,3 +135,7 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace

return samplingpolicy.NotSampled, nil
}

// EarlyEvaluate always returns Unspecified with a nil error: this policy does
// not make early decisions and none of the arguments are inspected.
func (*Composite) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
return samplingpolicy.Unspecified, nil
}
5 changes: 5 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
Expand Down Expand Up @@ -44,3 +45,7 @@ func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *sam
}
return samplingpolicy.Dropped, nil
}

// EarlyEvaluate always returns Unspecified with a nil error: this policy does
// not make early decisions and none of the arguments are inspected.
func (*Drop) EarlyEvaluate(context.Context, pcommon.TraceID, ptrace.ResourceSpans, *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
return samplingpolicy.Unspecified, nil
}
17 changes: 15 additions & 2 deletions processor/tailsamplingprocessor/internal/sampling/latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,23 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *samp

batches := traceData.ReceivedBatches

return hasSpanWithCondition(batches, l.condition()), nil
}

func (l *latency) EarlyEvaluate(_ context.Context, _ pcommon.TraceID, batch ptrace.ResourceSpans, _ *samplingpolicy.TraceData) (samplingpolicy.Decision, error) {
Comment thread
csmarchbanks marked this conversation as resolved.
// If an upper threshold is set we don't know if there will be a future
// span that will cause a not sampled decision.
if l.upperThresholdMs > 0 {
return samplingpolicy.Unspecified, nil
}
return batchHasSpanWithCondition(batch, l.condition()), nil
}

func (l *latency) condition() func(span ptrace.Span) bool {
var minTime pcommon.Timestamp
var maxTime pcommon.Timestamp

return hasSpanWithCondition(batches, func(span ptrace.Span) bool {
return func(span ptrace.Span) bool {
if minTime == 0 || span.StartTimestamp() < minTime {
minTime = span.StartTimestamp()
}
Expand All @@ -53,5 +66,5 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *samp
return duration.Milliseconds() >= l.thresholdMs
}
return (l.thresholdMs < duration.Milliseconds() && duration.Milliseconds() <= l.upperThresholdMs)
}), nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
)

func TestEvaluate_Latency(t *testing.T) {
func TestLatency_Evaluate(t *testing.T) {
filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 0)

traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestEvaluate_Latency(t *testing.T) {
}
}

func TestEvaluate_Bounded_Latency(t *testing.T) {
func TestLatency_Evaluate_Bounded(t *testing.T) {
filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 10000)

traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
Expand Down Expand Up @@ -159,6 +159,92 @@ func TestEvaluate_Bounded_Latency(t *testing.T) {
}
}

// TestLatency_EarlyEvaluate is a table-driven test for the latency policy's
// EarlyEvaluate. Each case builds a trace from the listed spans, creates a
// filter with the case's lower/upper thresholds, and evaluates only the first
// ResourceSpans batch of that trace.
func TestLatency_EarlyEvaluate(t *testing.T) {
	now := time.Now()
	traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})

	tests := []struct {
		Desc                      string
		Threshold, UpperThreshold time.Duration
		Spans                     []spanWithTimeAndDuration
		Decision                  samplingpolicy.Decision
	}{
		{
			Desc:           "trace duration shorter than threshold",
			Threshold:      5 * time.Second,
			UpperThreshold: 0,
			Spans: []spanWithTimeAndDuration{
				{
					StartTime: now,
					Duration:  4500 * time.Millisecond,
				},
			},
			Decision: samplingpolicy.Unspecified,
		},
		{
			Desc:           "trace duration is equal to threshold",
			Threshold:      5 * time.Second,
			UpperThreshold: 0,
			Spans: []spanWithTimeAndDuration{
				{
					StartTime: now,
					Duration:  5000 * time.Millisecond,
				},
			},
			Decision: samplingpolicy.Sampled,
		},
		{
			Desc:           "total trace duration is longer than threshold but every single span is shorter",
			Threshold:      5 * time.Second,
			UpperThreshold: 0,
			Spans: []spanWithTimeAndDuration{
				{
					StartTime: now,
					Duration:  3000 * time.Millisecond,
				},
				{
					StartTime: now.Add(2500 * time.Millisecond),
					Duration:  3000 * time.Millisecond,
				},
			},
			Decision: samplingpolicy.Sampled,
		},
		{
			Desc:           "trace duration is longer than upper bound",
			Threshold:      1 * time.Second,
			UpperThreshold: 2 * time.Second,
			Spans: []spanWithTimeAndDuration{
				{
					StartTime: now,
					Duration:  3 * time.Second,
				},
			},
			// TODO: This can be samplingpolicy.NotSampled in the future.
			Decision: samplingpolicy.Unspecified,
		},
		{
			Desc:           "trace duration is longer than lower bound but less than upper bound",
			Threshold:      2 * time.Second,
			UpperThreshold: 5 * time.Second,
			Spans: []spanWithTimeAndDuration{
				{
					StartTime: now,
					Duration:  4000 * time.Millisecond,
				},
			},
			Decision: samplingpolicy.Unspecified,
		},
	}

	for _, tc := range tests {
		t.Run(tc.Desc, func(t *testing.T) {
			lowerMs := tc.Threshold.Milliseconds()
			upperMs := tc.UpperThreshold.Milliseconds()
			policy := NewLatency(componenttest.NewNopTelemetrySettings(), lowerMs, upperMs)

			traceData := newTraceWithSpans(tc.Spans)
			firstBatch := traceData.ReceivedBatches.ResourceSpans().At(0)
			got, err := policy.EarlyEvaluate(t.Context(), traceID, firstBatch, traceData)

			assert.NoError(t, err)
			assert.Equal(t, tc.Decision, got)
		})
	}
}

type spanWithTimeAndDuration struct {
StartTime time.Time
Duration time.Duration
Expand Down
Loading