diff --git a/.chloggen/tsp-decision-wait-after-root-received.yaml b/.chloggen/tsp-decision-wait-after-root-received.yaml new file mode 100644 index 0000000000000..2ef80f35b8518 --- /dev/null +++ b/.chloggen/tsp-decision-wait-after-root-received.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: processor/tail_sampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Provide an option, `decision_wait_after_root_received`, to make quicker decisions after a root span is received. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [43876] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index e805c993d2fc0..33b6c9708f6e3 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -48,6 +48,7 @@ Multiple policies exist today and it is straight forward to add more. 
These incl The following configuration options can also be modified: - `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision +- `decision_wait_after_root_received` (default = 0s): Wait time after the root span of a trace is received before making a sampling decision. A value of 0s disables this option, so only `decision_wait` is used. - `num_traces` (default = 50000): Number of traces kept in memory. - `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures) - `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index fc7b0979a8c0a..f5d6855993a44 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -288,6 +288,9 @@ type Config struct { // DecisionWait is the desired wait time from the arrival of the first span of // trace until the decision about sampling it or not is evaluated. DecisionWait time.Duration `mapstructure:"decision_wait"` + // DecisionWaitAfterRootReceived is the desired wait time from the arrival of the root span of a + // trace until the decision about sampling it or not is evaluated. A value of zero disables it. + DecisionWaitAfterRootReceived time.Duration `mapstructure:"decision_wait_after_root_received"` // NumTraces is the number of traces kept on memory. Typically most of the data // of a trace is released after a sampling decision is taken. 
NumTraces uint64 `mapstructure:"num_traces"` diff --git a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher.go b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher.go index 10c57aa7dbc01..d9569f881e536 100644 --- a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher.go +++ b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher.go @@ -19,8 +19,8 @@ var ( ErrInvalidBatchChannelSize = errors.New("invalid batch channel size, it must be greater than zero") ) -// Batch is the type of batches held by the Batcher. -type Batch []pcommon.TraceID +// Batch is the type of batches held by the Batcher. It uses a set in order to merge batches efficiently. +type Batch map[pcommon.TraceID]struct{} // Batcher behaves like a pipeline of batches that has a fixed number of batches in the pipe // and a new batch being built outside of the pipe. Items can be concurrently added to the batch @@ -84,7 +84,7 @@ func New(numBatches, newBatchesInitialCapacity, batchChannelSize uint64) (Batche batcher := &batcher{ pendingIDs: make(chan pcommon.TraceID, batchChannelSize), batches: batches, - currentBatch: make(Batch, 0, newBatchesInitialCapacity), + currentBatch: make(Batch, newBatchesInitialCapacity), newBatchesInitialCapacity: newBatchesInitialCapacity, stopchan: make(chan bool), } @@ -94,7 +94,7 @@ func New(numBatches, newBatchesInitialCapacity, batchChannelSize uint64) (Batche go func() { for id := range batcher.pendingIDs { batcher.cbMutex.Lock() - batcher.currentBatch = append(batcher.currentBatch, id) + batcher.currentBatch[id] = struct{}{} batcher.cbMutex.Unlock() } batcher.stopchan <- true @@ -111,7 +111,7 @@ func (b *batcher) CloseCurrentAndTakeFirstBatch() (Batch, bool) { if readBatch, ok := <-b.batches; ok { b.stopLock.RLock() if !b.stopped { - nextBatch := make(Batch, 0, max(b.newBatchesInitialCapacity, uint64(len(readBatch)))) + nextBatch := make(Batch, max(b.newBatchesInitialCapacity, uint64(len(readBatch)))) b.cbMutex.Lock() b.batches <- 
b.currentBatch b.currentBatch = nextBatch diff --git a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go index f797f9f73c5ef..ab8e5a3d1363a 100644 --- a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go +++ b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go @@ -88,7 +88,7 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC ticker := time.NewTicker(100 * time.Millisecond) stopTicker := make(chan bool) - var got Batch + got := Batch{} go func() { var completedDequeues uint64 outer: @@ -101,7 +101,9 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC t.Error("Some of the first batches were not empty") return } - got = append(got, g...) + for id := range g { + got[id] = struct{}{} + } case <-stopTicker: break outer } @@ -132,7 +134,9 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC // Get all ids added to the batcher for { batch, ok := batcher.CloseCurrentAndTakeFirstBatch() - got = append(got, batch...) 
+ for id := range batch { + got[id] = struct{}{} + } if !ok { break } @@ -140,13 +144,9 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC require.Len(t, got, len(ids), "Batcher got incorrect count of traces from batches") - idSeen := make(map[[16]byte]bool, len(ids)) - for _, id := range got { - idSeen[id] = true - } - - for i := range ids { - require.True(t, idSeen[ids[i]], "want id %v but id was not seen", ids[i]) + for _, id := range ids { + _, ok := got[id] + require.True(t, ok, "want id %v but id was not seen", id) } } diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 289b960372716..bf2d247101b60 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "maps" "math" "runtime" "slices" @@ -61,17 +62,18 @@ type tailSamplingSpanProcessor struct { telemetry *metadata.TelemetryBuilder logger *zap.Logger - deleteTraceQueue *list.List - nextConsumer consumer.Traces - policies []*policy - idToTrace map[pcommon.TraceID]*traceData - tickerFrequency time.Duration - decisionBatcher idbatcher.Batcher - sampledIDCache cache.Cache - nonSampledIDCache cache.Cache - recordPolicy bool - sampleOnFirstMatch bool - blockOnOverflow bool + deleteTraceQueue *list.List + nextConsumer consumer.Traces + policies []*policy + idToTrace map[pcommon.TraceID]*traceData + tickerFrequency time.Duration + decisionBatcher idbatcher.Batcher + rootReceivedBatcher idbatcher.Batcher + sampledIDCache cache.Cache + nonSampledIDCache cache.Cache + recordPolicy bool + sampleOnFirstMatch bool + blockOnOverflow bool cfg Config host component.Host @@ -160,11 +162,20 @@ func (tsp *tailSamplingSpanProcessor) Start(_ context.Context, host component.Ho // this will start a goroutine in the background, so we run it only if everything went // well in creating the policies, and only when the processor starts. 
numDecisionBatches := math.Max(1, tsp.cfg.DecisionWait.Seconds()) - inBatcher, err := idbatcher.New(uint64(numDecisionBatches), tsp.cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU())) + idBatcher, err := idbatcher.New(uint64(numDecisionBatches), tsp.cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU())) if err != nil { return err } - tsp.decisionBatcher = inBatcher + tsp.decisionBatcher = idBatcher + } + + if tsp.rootReceivedBatcher == nil && tsp.cfg.DecisionWaitAfterRootReceived > 0 { + numDecisionBatches := math.Max(1, tsp.cfg.DecisionWaitAfterRootReceived.Seconds()) + idBatcher, err := idbatcher.New(uint64(numDecisionBatches), tsp.cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU())) + if err != nil { + return err + } + tsp.rootReceivedBatcher = idBatcher } tsp.doneChan = make(chan struct{}) @@ -183,9 +194,11 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraces(_ context.Context, td ptrace // be more efficient on its single goroutine. batch := []traceBatch{} for traceID, spans := range idToSpansAndScope { + newRSS, rootSpan := newResourceSpanFromSpanAndScopes(rss, spans) batch = append(batch, traceBatch{ id: traceID, - rss: newResourceSpanFromSpanAndScopes(rss, spans), + rootSpan: rootSpan, + rss: newRSS, spanCount: int64(len(spans)), }) } @@ -254,6 +267,7 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicies(host component.Host, // traceBatch contains all spans from a single batch for a single trace. type traceBatch struct { id pcommon.TraceID + rootSpan *ptrace.Span rss ptrace.ResourceSpans spanCount int64 } @@ -447,6 +461,9 @@ func (tsp *tailSamplingSpanProcessor) iter(tickChan <-chan time.Time, workChan < if !ok { // Stop the batcher so that we can read all batches without creating new ones. tsp.decisionBatcher.Stop() + if tsp.rootReceivedBatcher != nil { + tsp.rootReceivedBatcher.Stop() + } // Do the best decision we can for any traces we have already ingested unless a user wants to drop them. 
if !tsp.cfg.DropPendingTracesOnShutdown { @@ -467,7 +484,7 @@ func (tsp *tailSamplingSpanProcessor) iter(tickChan <-chan time.Time, workChan < tsp.waitForSpace(tickChan) } - tsp.processTrace(trace.id, trace.rss, trace.spanCount) + tsp.processTrace(trace.id, trace.rss, trace.spanCount, trace.rootSpan != nil) } case cmd := <-tsp.newPolicyChan: tsp.policies = cmd.policies @@ -547,14 +564,36 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { globalTracesSampledByDecision := make(map[samplingpolicy.Decision]int64) batch, hasMore := tsp.decisionBatcher.CloseCurrentAndTakeFirstBatch() + if tsp.rootReceivedBatcher != nil { + rootBatch, _ := tsp.rootReceivedBatcher.CloseCurrentAndTakeFirstBatch() + if batch == nil { + batch = rootBatch + } else { + maps.Copy(batch, rootBatch) + } + } batchLen := len(batch) - for _, id := range batch { + for id := range batch { trace, ok := tsp.idToTrace[id] if !ok { - metrics.idNotFoundOnMapCount++ + // Only increment the not found metric if the trace is not in the + // cache. If it is in the cache that means a decision was already + // made and the trace properly released. If using block on overflow + // we can avoid checking the cache as it is not possible to release + // a trace that is still in the batcher with that flow. + if !tsp.blockOnOverflow && !tsp.inCache(id) { + metrics.idNotFoundOnMapCount++ + } + continue + } + // A decision was already made, no need to do it again. This happens + // when no decision cache is used and a trace was processed both due to + // a root span trigger and after decision_wait. + if trace.finalDecision != samplingpolicy.Unspecified { continue } + trace.decisionTime = time.Now() decision, policyName := tsp.makeDecision(id, &trace.TraceData, metrics) @@ -604,6 +643,16 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { return hasMore } +// inCache returns if a trace id is in either cache, i.e. a decision has been made for it and it was released. 
+func (tsp *tailSamplingSpanProcessor) inCache(id pcommon.TraceID) bool { + _, ok := tsp.nonSampledIDCache.Get(id) + if ok { + return true + } + _, ok = tsp.sampledIDCache.Get(id) + return ok +} + func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *samplingpolicy.TraceData, metrics *policyTickMetrics) (samplingpolicy.Decision, string) { finalDecision := samplingpolicy.NotSampled samplingDecisions := map[samplingpolicy.Decision]*policy{ @@ -701,7 +750,7 @@ func groupSpansByTraceKey(resourceSpans ptrace.ResourceSpans) map[pcommon.TraceI return idToSpans } -func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64) { +func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrace.ResourceSpans, spanCount int64, containsRootSpan bool) { currTime := time.Now() var newTraceIDs int64 @@ -731,6 +780,9 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac } else { actualData.SpanCount += spanCount } + if containsRootSpan && tsp.rootReceivedBatcher != nil { + tsp.rootReceivedBatcher.AddToCurrentBatch(id) + } finalDecision := actualData.finalDecision @@ -843,24 +895,29 @@ func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans) { rss.MoveTo(rs) } -func newResourceSpanFromSpanAndScopes(rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) ptrace.ResourceSpans { +func newResourceSpanFromSpanAndScopes(rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) (ptrace.ResourceSpans, *ptrace.Span) { rs := ptrace.NewResourceSpans() rss.Resource().CopyTo(rs.Resource()) + var rootSpan *ptrace.Span scopePointerToNewScope := make(map[*pcommon.InstrumentationScope]*ptrace.ScopeSpans) for _, spanAndScope := range spanAndScopes { // If the scope of the spanAndScope is not in the map, add it to the map and the destination. 
+ var sp ptrace.Span if scope, ok := scopePointerToNewScope[spanAndScope.instrumentationScope]; !ok { is := rs.ScopeSpans().AppendEmpty() spanAndScope.instrumentationScope.CopyTo(is.Scope()) scopePointerToNewScope[spanAndScope.instrumentationScope] = &is - sp := is.Spans().AppendEmpty() - spanAndScope.span.CopyTo(sp) + sp = is.Spans().AppendEmpty() } else { - sp := scope.Spans().AppendEmpty() - spanAndScope.span.CopyTo(sp) + sp = scope.Spans().AppendEmpty() + } + + spanAndScope.span.CopyTo(sp) + if sp.ParentSpanID().IsEmpty() { + rootSpan = &sp } } - return rs + return rs, rootSpan } diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 69c44ca8ce977..61e03611cddec 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -896,6 +896,9 @@ func generateIDsAndBatches(numIDs int) ([]pcommon.TraceID, []ptrace.Traces) { span := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) span.SetTraceID(traceIDs[i]) + if j != 0 { + span.SetParentSpanID(uInt64ToSpanID(uint64(spanID))) + } spanID++ span.SetSpanID(uInt64ToSpanID(uint64(spanID))) tds = append(tds, td) @@ -946,6 +949,7 @@ func newSyncIDBatcher() idbatcher.Batcher { batches <- nil return &syncIDBatcher{ batchPipe: batches, + openBatch: idbatcher.Batch{}, } } @@ -954,7 +958,7 @@ func (s *syncIDBatcher) AddToCurrentBatch(id pcommon.TraceID) { if s.stopped { panic("cannot add to stopped batcher!") } - s.openBatch = append(s.openBatch, id) + s.openBatch[id] = struct{}{} s.Unlock() } @@ -964,12 +968,14 @@ func (s *syncIDBatcher) CloseCurrentAndTakeFirstBatch() (idbatcher.Batch, bool) firstBatch, ok := <-s.batchPipe // When batchPipe is closed it means we have stopped and just need to return the openBatch as the last entries. 
if !ok { - return s.openBatch, false + batch := s.openBatch + s.openBatch = nil + return batch, false } // Do not move the open batch to the channel if we are stopped. It will panic, we return it once the channel is closed instead. if !s.stopped { s.batchPipe <- s.openBatch - s.openBatch = nil + s.openBatch = idbatcher.Batch{} } return firstBatch, true } @@ -1134,6 +1140,54 @@ func TestDeleteQueueCleared(t *testing.T) { assert.Zero(t, sp.(*tailSamplingSpanProcessor).deleteTraceQueue.Len()) } +func TestRootReceivedBatcher(t *testing.T) { + traceIDs, batches := generateIDsAndBatches(128) + cfg := Config{ + DecisionWait: time.Minute, + NumTraces: uint64(2 * len(traceIDs)), + ExpectedNewTracesPerSec: 64, + DecisionCache: DecisionCacheConfig{ + SampledCacheSize: 128, + NonSampledCacheSize: 128, + }, + PolicyCfgs: []PolicyCfg{ + {sharedPolicyCfg: sharedPolicyCfg{ + Name: "test-policy", + Type: Probabilistic, + ProbabilisticCfg: ProbabilisticCfg{ + SamplingPercentage: 50, + }, + }}, + }, + DecisionWaitAfterRootReceived: time.Second, + DropPendingTracesOnShutdown: true, + } + telem := setupTestTelemetry() + telemetrySettings := telem.newSettings() + nextConsumer := new(consumertest.TracesSink) + sp, err := newTracesProcessor(t.Context(), telemetrySettings, nextConsumer, cfg) + require.NoError(t, err) + + err = sp.Start(t.Context(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + err = sp.Shutdown(t.Context()) + require.NoError(t, err) + }() + + for _, batch := range batches { + require.NoError(t, sp.ConsumeTraces(t.Context(), batch)) + } + // Wait long enough that we pass the decision wait after a root is received, + // but nowhere near the base decision wait. + time.Sleep(2 * time.Second) + + // Make sure about half of the traces are sampled before the base decision wait elapses. 
+ allSampledTraces := nextConsumer.AllTraces() + assert.Less(t, len(allSampledTraces), len(traceIDs)*6/10) + assert.Greater(t, len(allSampledTraces), len(traceIDs)*4/10) +} + func TestExtension(t *testing.T) { controller := newTestTSPController() msp := new(consumertest.TracesSink)