Skip to content
Merged
27 changes: 27 additions & 0 deletions .chloggen/tsp-decision-wait-after-root-received.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: processor/tail_sampling

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Provide an option, `decision_wait_after_root_received`, to make quicker decisions after a root span is received.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [43876]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Multiple policies exist today and it is straight forward to add more. These incl

The following configuration options can also be modified:
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision
- `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before making a sampling decision. 0s means disabled (only use `decision_wait`).
- `num_traces` (default = 50000): Number of traces kept in memory.
- `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures)
- `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches
Expand Down
3 changes: 3 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ type Config struct {
// DecisionWait is the desired wait time from the arrival of the first span of
// a trace until the decision about sampling it or not is evaluated.
DecisionWait time.Duration `mapstructure:"decision_wait"`
// DecisionWaitAfterRootReceived is the desired wait time from the arrival of the root span of
// a trace until the decision about sampling it or not is evaluated.
DecisionWaitAfterRootReceived time.Duration `mapstructure:"decision_wait_after_root_received"`
// NumTraces is the number of traces kept in memory. Typically most of the data
// of a trace is released after a sampling decision is taken.
NumTraces uint64 `mapstructure:"num_traces"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var (
ErrInvalidBatchChannelSize = errors.New("invalid batch channel size, it must be greater than zero")
)

// Batch is the type of batches held by the Batcher.
type Batch []pcommon.TraceID
// Batch is the type of batches held by the Batcher. It uses a set in order to merge batches efficiently.
type Batch map[pcommon.TraceID]struct{}

// Batcher behaves like a pipeline of batches that has a fixed number of batches in the pipe
// and a new batch being built outside of the pipe. Items can be concurrently added to the batch
Expand Down Expand Up @@ -84,7 +84,7 @@ func New(numBatches, newBatchesInitialCapacity, batchChannelSize uint64) (Batche
batcher := &batcher{
pendingIDs: make(chan pcommon.TraceID, batchChannelSize),
batches: batches,
currentBatch: make(Batch, 0, newBatchesInitialCapacity),
currentBatch: make(Batch, newBatchesInitialCapacity),
newBatchesInitialCapacity: newBatchesInitialCapacity,
stopchan: make(chan bool),
}
Expand All @@ -94,7 +94,7 @@ func New(numBatches, newBatchesInitialCapacity, batchChannelSize uint64) (Batche
go func() {
for id := range batcher.pendingIDs {
batcher.cbMutex.Lock()
batcher.currentBatch = append(batcher.currentBatch, id)
batcher.currentBatch[id] = struct{}{}
batcher.cbMutex.Unlock()
}
batcher.stopchan <- true
Expand All @@ -111,7 +111,7 @@ func (b *batcher) CloseCurrentAndTakeFirstBatch() (Batch, bool) {
if readBatch, ok := <-b.batches; ok {
b.stopLock.RLock()
if !b.stopped {
nextBatch := make(Batch, 0, max(b.newBatchesInitialCapacity, uint64(len(readBatch))))
nextBatch := make(Batch, max(b.newBatchesInitialCapacity, uint64(len(readBatch))))
b.cbMutex.Lock()
b.batches <- b.currentBatch
b.currentBatch = nextBatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC

ticker := time.NewTicker(100 * time.Millisecond)
stopTicker := make(chan bool)
var got Batch
got := Batch{}
go func() {
var completedDequeues uint64
outer:
Expand All @@ -101,7 +101,9 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC
t.Error("Some of the first batches were not empty")
return
}
got = append(got, g...)
for id := range g {
got[id] = struct{}{}
}
case <-stopTicker:
break outer
}
Expand Down Expand Up @@ -132,21 +134,19 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC
// Get all ids added to the batcher
for {
batch, ok := batcher.CloseCurrentAndTakeFirstBatch()
got = append(got, batch...)
for id := range batch {
got[id] = struct{}{}
}
if !ok {
break
}
}

require.Len(t, got, len(ids), "Batcher got incorrect count of traces from batches")

idSeen := make(map[[16]byte]bool, len(ids))
for _, id := range got {
idSeen[id] = true
}

for i := range ids {
require.True(t, idSeen[ids[i]], "want id %v but id was not seen", ids[i])
for _, id := range ids {
_, ok := got[id]
require.True(t, ok, "want id %v but id was not seen", id)
}
}

Expand Down
105 changes: 81 additions & 24 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"maps"
"math"
"runtime"
"slices"
Expand Down Expand Up @@ -61,17 +62,18 @@ type tailSamplingSpanProcessor struct {
telemetry *metadata.TelemetryBuilder
logger *zap.Logger

deleteTraceQueue *list.List
nextConsumer consumer.Traces
policies []*policy
idToTrace map[pcommon.TraceID]*traceData
tickerFrequency time.Duration
decisionBatcher idbatcher.Batcher
sampledIDCache cache.Cache
nonSampledIDCache cache.Cache
recordPolicy bool
sampleOnFirstMatch bool
blockOnOverflow bool
deleteTraceQueue *list.List
nextConsumer consumer.Traces
policies []*policy
idToTrace map[pcommon.TraceID]*traceData
tickerFrequency time.Duration
decisionBatcher idbatcher.Batcher
rootReceivedBatcher idbatcher.Batcher
sampledIDCache cache.Cache
nonSampledIDCache cache.Cache
recordPolicy bool
sampleOnFirstMatch bool
blockOnOverflow bool

cfg Config
host component.Host
Expand Down Expand Up @@ -160,11 +162,20 @@ func (tsp *tailSamplingSpanProcessor) Start(_ context.Context, host component.Ho
// this will start a goroutine in the background, so we run it only if everything went
// well in creating the policies, and only when the processor starts.
numDecisionBatches := math.Max(1, tsp.cfg.DecisionWait.Seconds())
inBatcher, err := idbatcher.New(uint64(numDecisionBatches), tsp.cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
idBatcher, err := idbatcher.New(uint64(numDecisionBatches), tsp.cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
if err != nil {
return err
}
tsp.decisionBatcher = inBatcher
tsp.decisionBatcher = idBatcher
}

if tsp.rootReceivedBatcher == nil && tsp.cfg.DecisionWaitAfterRootReceived > 0 {
numDecisionBatches := math.Max(1, tsp.cfg.DecisionWaitAfterRootReceived.Seconds())
idBatcher, err := idbatcher.New(uint64(numDecisionBatches), tsp.cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
if err != nil {
return err
}
tsp.rootReceivedBatcher = idBatcher
}

tsp.doneChan = make(chan struct{})
Expand All @@ -183,9 +194,11 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace
// be more efficient on its single goroutine.
batch := []traceBatch{}
for traceID, spans := range idToSpansAndScope {
newRSS, rootSpan := newResourceSpanFromSpanAndScopes(rss, spans)
batch = append(batch, traceBatch{
id: traceID,
rss: newResourceSpanFromSpanAndScopes(rss, spans),
rootSpan: rootSpan,
rss: newRSS,
spanCount: int64(len(spans)),
})
}
Expand Down Expand Up @@ -254,6 +267,7 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host,
// traceBatch contains all spans from a single batch for a single trace.
type traceBatch struct {
// id is the trace ID shared by every span in this batch.
id pcommon.TraceID
// rootSpan points at a span in this batch whose parent span ID is empty,
// or is nil when the batch holds no such span. The processing loop only
// checks it for nil to learn whether a root span has arrived.
rootSpan *ptrace.Span
// rss holds the resource spans rebuilt for this trace from the incoming batch.
rss ptrace.ResourceSpans
// spanCount is the number of spans of this trace found in the incoming batch.
spanCount int64
}
Expand Down Expand Up @@ -447,6 +461,9 @@ func (tsp *tailSamplingSpanProcessor) iter(tickChan <-chan time.Time, workChan <
if !ok {
// Stop the batcher so that we can read all batches without creating new ones.
tsp.decisionBatcher.Stop()
if tsp.rootReceivedBatcher != nil {
tsp.rootReceivedBatcher.Stop()
}

// Do the best decision we can for any traces we have already ingested unless a user wants to drop them.
if !tsp.cfg.DropPendingTracesOnShutdown {
Expand All @@ -467,7 +484,7 @@ func (tsp *tailSamplingSpanProcessor) iter(tickChan <-chan time.Time, workChan <
tsp.waitForSpace(tickChan)
}

tsp.processTrace(trace.id, trace.rss, trace.spanCount)
tsp.processTrace(trace.id, trace.rss, trace.spanCount, trace.rootSpan != nil)
}
case cmd := <-tsp.newPolicyChan:
tsp.policies = cmd.policies
Expand Down Expand Up @@ -547,14 +564,36 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool {
globalTracesSampledByDecision := make(map[samplingpolicy.Decision]int64)

batch, hasMore := tsp.decisionBatcher.CloseCurrentAndTakeFirstBatch()
if tsp.rootReceivedBatcher != nil {
rootBatch, _ := tsp.rootReceivedBatcher.CloseCurrentAndTakeFirstBatch()
if batch == nil {
batch = rootBatch
} else {
maps.Copy(batch, rootBatch)
}
}
batchLen := len(batch)

for _, id := range batch {
for id := range batch {
trace, ok := tsp.idToTrace[id]
if !ok {
metrics.idNotFoundOnMapCount++
// Only increment the not found metric if the trace is not in the
// cache. If it is in the cache that means a decision was already
// made and the trace properly released. If using block on overflow
// we can avoid checking the cache as it is not possible to release
// a trace that is still in the batcher with that flow.
if !tsp.blockOnOverflow && !tsp.inCache(id) {
metrics.idNotFoundOnMapCount++
}
continue
}
// A decision was already made, no need to do it again. This happens
// when no decision cache is used and a trace was processed both due to
// a root span trigger and after decision_wait.
if trace.finalDecision != samplingpolicy.Unspecified {
continue
}

trace.decisionTime = time.Now()

decision, policyName := tsp.makeDecision(id, &trace.TraceData, metrics)
Expand Down Expand Up @@ -604,6 +643,16 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool {
return hasMore
}

// inCache reports whether the trace ID is present in either the non-sampled or
// the sampled decision cache, i.e. a decision has been made for it and it was released.
func (tsp *tailSamplingSpanProcessor) inCache(id pcommon.TraceID) bool {
	// The non-sampled cache is consulted first; the sampled cache is only
	// queried when the ID is absent from it.
	if _, found := tsp.nonSampledIDCache.Get(id); found {
		return true
	}
	_, found := tsp.sampledIDCache.Get(id)
	return found
}

func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *samplingpolicy.TraceData, metrics *policyTickMetrics) (samplingpolicy.Decision, string) {
finalDecision := samplingpolicy.NotSampled
samplingDecisions := map[samplingpolicy.Decision]*policy{
Expand Down Expand Up @@ -701,7 +750,7 @@ func groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceI
return idToSpans
}

func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64) {
func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64, containsRootSpan bool) {
currTime := time.Now()

var newTraceIDs int64
Expand Down Expand Up @@ -731,6 +780,9 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac
} else {
actualData.SpanCount += spanCount
}
if containsRootSpan && tsp.rootReceivedBatcher != nil {
tsp.rootReceivedBatcher.AddToCurrentBatch(id)
}

finalDecision := actualData.finalDecision

Expand Down Expand Up @@ -843,24 +895,29 @@ func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans) {
rss.MoveTo(rs)
}

func newResourceSpanFromSpanAndScopes(rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) ptrace.ResourceSpans {
func newResourceSpanFromSpanAndScopes(rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) (ptrace.ResourceSpans, *ptrace.Span) {
rs := ptrace.NewResourceSpans()
rss.Resource().CopyTo(rs.Resource())
var rootSpan *ptrace.Span

scopePointerToNewScope := make(map[*pcommon.InstrumentationScope]*ptrace.ScopeSpans)
for _, spanAndScope := range spanAndScopes {
// If the scope of the spanAndScope is not in the map, add it to the map and the destination.
var sp ptrace.Span
if scope, ok := scopePointerToNewScope[spanAndScope.instrumentationScope]; !ok {
is := rs.ScopeSpans().AppendEmpty()
spanAndScope.instrumentationScope.CopyTo(is.Scope())
scopePointerToNewScope[spanAndScope.instrumentationScope] = &is

sp := is.Spans().AppendEmpty()
spanAndScope.span.CopyTo(sp)
sp = is.Spans().AppendEmpty()
} else {
sp := scope.Spans().AppendEmpty()
spanAndScope.span.CopyTo(sp)
sp = scope.Spans().AppendEmpty()
}

spanAndScope.span.CopyTo(sp)
if sp.ParentSpanID().IsEmpty() {
rootSpan = &sp
}
Comment on lines +918 to 920
Copy link

Copilot AI Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking the address of a loop variable sp is problematic. In Go, loop variables are reused across iterations, so &sp will always point to the last iteration's value. This means rootSpan may reference the wrong span or become invalid. Instead, create a copy of the span or use a different approach to store the root span reference.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

@csmarchbanks csmarchbanks Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a valid comment as:

  1. sp is not a loop variable, but instead created in each iteration of the loop.
  2. I believe this behavior was fixed in Go 1.22 anyway.

Leaving up for someone to double check though.

}
return rs
return rs, rootSpan
}
Loading