diff --git a/.chloggen/tsp-decision-hooks.yaml b/.chloggen/tsp-decision-hooks.yaml new file mode 100644 index 0000000000000..f838180997c04 --- /dev/null +++ b/.chloggen/tsp-decision-hooks.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: processor/tail_sampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add hooks to call when a sampling decision is made for a trace. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [46161] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go index d957d53cf38c5..74b4c56f18bcf 100644 --- a/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go +++ b/processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go @@ -14,6 +14,8 @@ import ( type TraceData struct { // SpanCount track the number of spans on the trace. 
SpanCount int64 + // SizeBytes is how many bytes we have accumulated for the trace. + SizeBytes uint64 // ReceivedBatches stores all the batches received for the trace. ReceivedBatches ptrace.Traces } diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 4676792cb7e8e..ba0164239b5da 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -41,21 +41,22 @@ type policy struct { attribute metric.MeasurementOption } -// traceData is a wrapper around the publically used samplingpolicy.TraceData +// TraceData is a wrapper around the publicly used samplingpolicy.TraceData // that tracks information related to the decision making process but not // needed by any sampler implementations. -type traceData struct { +type TraceData struct { samplingpolicy.TraceData + FinalDecision samplingpolicy.Decision + PolicyName string arrivalTime time.Time decisionTime time.Time - bytes uint64 - finalDecision samplingpolicy.Decision - policyName string deleteElement *list.Element batchID uint64 } +type DecisionHook func(ctx context.Context, id pcommon.TraceID, td *TraceData) + type tailSamplingSpanProcessor struct { ctx context.Context @@ -66,7 +67,7 @@ type tailSamplingSpanProcessor struct { deleteTraceQueue *list.List nextConsumer consumer.Traces policies []*policy - idToTrace map[pcommon.TraceID]*traceData + idToTrace map[pcommon.TraceID]*TraceData tickerFrequency time.Duration decisionBatcher idbatcher.Batcher sampledIDCache cache.Cache @@ -79,6 +80,9 @@ type tailSamplingSpanProcessor struct { cfg Config host component.Host + sampledHooks []DecisionHook + nonSampledHooks []DecisionHook + newPolicyChan chan newPolicyCmd newTraceSizeChan chan uint64 workChan chan []traceBatch @@ -120,7 +124,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume sampledIDCache: sampledDecisions, nonSampledIDCache: nonSampledDecisions, logger: set.Logger, - 
idToTrace: make(map[pcommon.TraceID]*traceData), + idToTrace: make(map[pcommon.TraceID]*TraceData), deleteTraceQueue: list.New(), sampleOnFirstMatch: cfg.SampleOnFirstMatch, blockOnOverflow: cfg.BlockOnOverflow, @@ -317,6 +321,20 @@ func WithNonSampledDecisionCache(c cache.Cache) Option { } } +// WithSampledHooks sets hooks to be called when a trace is sampled. +func WithSampledHooks(hooks ...DecisionHook) Option { + return func(tsp *tailSamplingSpanProcessor) { + tsp.sampledHooks = hooks + } +} + +// WithNonSampledHooks sets hooks to be called when a trace is not sampled. +func WithNonSampledHooks(hooks ...DecisionHook) Option { + return func(tsp *tailSamplingSpanProcessor) { + tsp.nonSampledHooks = hooks + } +} + func withRecordPolicy() Option { return func(tsp *tailSamplingSpanProcessor) { tsp.recordPolicy = true @@ -580,7 +598,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { // A decision was already made, no need to do it again. This happens // when no decision cache is used and a trace was processed both due to // a root span trigger and after decision_wait. 
- if trace.finalDecision != samplingpolicy.Unspecified { + if trace.FinalDecision != samplingpolicy.Unspecified { continue } @@ -589,17 +607,17 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool { decision, policyName := tsp.makeDecision(id, &trace.TraceData, metrics) globalTracesSampledByDecision[decision]++ - // Sampled or not, remove the batches - allSpans := trace.ReceivedBatches - trace.finalDecision = decision - trace.policyName = policyName - trace.ReceivedBatches = ptrace.NewTraces() + trace.FinalDecision = decision + trace.PolicyName = policyName if decision == samplingpolicy.Sampled { - tsp.releaseSampledTrace(ctx, id, allSpans, policyName) + tsp.releaseSampledTrace(ctx, id, trace) } else { - tsp.releaseNotSampledTrace(id, policyName) + tsp.releaseNotSampledTrace(id, trace) } + + // Sampled or not, remove the batches + trace.ReceivedBatches = ptrace.NewTraces() } tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, time.Since(startTime).Milliseconds()) @@ -746,7 +764,7 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac actualData, ok := tsp.idToTrace[id] if !ok { - actualData = &traceData{ + actualData = &TraceData{ arrivalTime: currTime, TraceData: samplingpolicy.TraceData{ SpanCount: spanCount, @@ -769,19 +787,19 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac tsp.decisionBatcher.MoveToEarlierBatch(id, actualData.batchID, uint64(tsp.cfg.DecisionWaitAfterRootReceived.Seconds())) } - finalDecision := actualData.finalDecision + finalDecision := actualData.FinalDecision marshaler := &ptrace.ProtoMarshaler{} - actualData.bytes += uint64(marshaler.ResourceSpansSize(rss)) + actualData.SizeBytes += uint64(marshaler.ResourceSpansSize(rss)) if finalDecision == samplingpolicy.Unspecified && tsp.maxTraceSizeBytes > 0 && - actualData.bytes > tsp.maxTraceSizeBytes { + actualData.SizeBytes > tsp.maxTraceSizeBytes { 
tsp.telemetry.ProcessorTailSamplingTracesDroppedTooLarge.Add(tsp.ctx, 1) - actualData.finalDecision = samplingpolicy.NotSampled + actualData.FinalDecision = samplingpolicy.NotSampled // Since we are not in a normal decision flow when dropping large traces, also be sure to remove it from the batcher. tsp.decisionBatcher.RemoveFromBatch(id, actualData.batchID) - tsp.releaseNotSampledTrace(id, "") + tsp.releaseNotSampledTrace(id, actualData) return } @@ -798,7 +816,8 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac appendToTraces(traceTd, rss) tsp.forwardSpans(tsp.ctx, traceTd) case samplingpolicy.NotSampled: - tsp.releaseNotSampledTrace(id, actualData.policyName) + // TODO: Confirm this is correct. If the trace was already decided as not sampled, should late spans simply be ignored instead of releasing it again? + tsp.releaseNotSampledTrace(id, actualData) default: tsp.logger.Warn("Unexpected sampling decision", zap.Int("decision", int(finalDecision))) } @@ -863,9 +882,12 @@ func (tsp *tailSamplingSpanProcessor) forwardSpans(ctx context.Context, td ptrac // releaseSampledTrace sends the trace data to the next consumer. It // additionally adds the trace ID to the cache of sampled trace IDs. If the // trace ID is cached, it deletes the spans from the internal map. 
-func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, id pcommon.TraceID, td ptrace.Traces, policyName string) { - tsp.sampledIDCache.Put(id, cache.DecisionMetadata{PolicyName: policyName}) - tsp.forwardSpans(ctx, td) +func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, id pcommon.TraceID, td *TraceData) { + for _, hook := range tsp.sampledHooks { + hook(ctx, id, td) + } + tsp.sampledIDCache.Put(id, cache.DecisionMetadata{PolicyName: td.PolicyName}) + tsp.forwardSpans(ctx, td.ReceivedBatches) _, ok := tsp.sampledIDCache.Get(id) if ok { tsp.dropTrace(id, time.Now()) @@ -874,8 +896,11 @@ func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, i // releaseNotSampledTrace adds the trace ID to the cache of not sampled trace // IDs. If the trace ID is cached, it deletes the spans from the internal map. -func (tsp *tailSamplingSpanProcessor) releaseNotSampledTrace(id pcommon.TraceID, policyName string) { - tsp.nonSampledIDCache.Put(id, cache.DecisionMetadata{PolicyName: policyName}) +func (tsp *tailSamplingSpanProcessor) releaseNotSampledTrace(id pcommon.TraceID, td *TraceData) { + for _, hook := range tsp.nonSampledHooks { + hook(context.Background(), id, td) + } + tsp.nonSampledIDCache.Put(id, cache.DecisionMetadata{PolicyName: td.PolicyName}) _, ok := tsp.nonSampledIDCache.Get(id) if ok { tsp.dropTrace(id, time.Now()) diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 0e883dda7b7c7..c05ed4463ff2d 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -855,6 +855,102 @@ func TestDropPolicyIsFirstInPolicyList(t *testing.T) { assert.Contains(t, sampledTraceIDs, uInt64ToTraceID(2)) } +func TestDecisionHooks(t *testing.T) { + controller := newTestTSPController() + + // Track hook invocations + var sampledHookCalls []struct { + id pcommon.TraceID + td *TraceData + } 
+ var nonSampledHookCalls []struct { + id pcommon.TraceID + td *TraceData + } + + sampledHook := func(_ context.Context, id pcommon.TraceID, td *TraceData) { + sampledHookCalls = append(sampledHookCalls, struct { + id pcommon.TraceID + td *TraceData + }{id: id, td: td}) + } + + nonSampledHook := func(_ context.Context, id pcommon.TraceID, td *TraceData) { + nonSampledHookCalls = append(nonSampledHookCalls, struct { + id pcommon.TraceID + td *TraceData + }{id: id, td: td}) + } + + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + PolicyCfgs: []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "sample-high-latency", + Type: Latency, + LatencyCfg: LatencyCfg{ + ThresholdMs: 100, + }, + }, + }, + }, + Options: []Option{ + withTestController(controller), + WithSampledHooks(sampledHook), + WithNonSampledHooks(nonSampledHook), + }, + } + + msp := new(consumertest.TracesSink) + p, err := newTracesProcessor(t.Context(), processortest.NewNopSettings(metadata.Type), msp, cfg) + require.NoError(t, err) + + require.NoError(t, p.Start(t.Context(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(t.Context())) + }() + + // Create a trace that will be sampled (high latency) + sampledTraceID := uInt64ToTraceID(1) + sampledTrace := simpleTracesWithID(sampledTraceID) + sampledTrace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-200 * time.Millisecond))) + sampledTrace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + + // Create a trace that will not be sampled (low latency) + nonSampledTraceID := uInt64ToTraceID(2) + nonSampledTrace := simpleTracesWithID(nonSampledTraceID) + nonSampledTrace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-50 * time.Millisecond))) + 
nonSampledTrace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + + require.NoError(t, p.ConsumeTraces(t.Context(), sampledTrace)) + require.NoError(t, p.ConsumeTraces(t.Context(), nonSampledTrace)) + + controller.waitForTick() // First tick does nothing + controller.waitForTick() // Second tick makes decisions + + // Verify hooks were called + require.Len(t, sampledHookCalls, 1, "sampled hook should be called once") + require.Len(t, nonSampledHookCalls, 1, "non-sampled hook should be called once") + + // Verify sampled hook data + sampledCall := sampledHookCalls[0] + assert.Equal(t, sampledTraceID, sampledCall.id) + assert.NotNil(t, sampledCall.td) + assert.Equal(t, samplingpolicy.Sampled, sampledCall.td.FinalDecision) + assert.Equal(t, "sample-high-latency", sampledCall.td.PolicyName) + assert.Equal(t, int64(1), sampledCall.td.SpanCount) + + // Verify non-sampled hook data + nonSampledCall := nonSampledHookCalls[0] + assert.Equal(t, nonSampledTraceID, nonSampledCall.id) + assert.NotNil(t, nonSampledCall.td) + assert.Equal(t, samplingpolicy.NotSampled, nonSampledCall.td.FinalDecision) + assert.Empty(t, nonSampledCall.td.PolicyName, "PolicyName should be empty for not sampled traces") + assert.Equal(t, int64(1), nonSampledCall.td.SpanCount) +} + func collectSpanIDs(trace ptrace.Traces) []pcommon.SpanID { var spanIDs []pcommon.SpanID