diff --git a/.chloggen/trace-too-large-decision-metadata.yaml b/.chloggen/trace-too-large-decision-metadata.yaml new file mode 100644 index 0000000000000..c6c1b5221b97b --- /dev/null +++ b/.chloggen/trace-too-large-decision-metadata.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: processor/tail_sampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add information about when a trace is too large to `cache.DecisionMetadata`. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [46080] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/processor/tailsamplingprocessor/cache/types.go b/processor/tailsamplingprocessor/cache/types.go index 06ac35150adee..326fa801353f6 100644 --- a/processor/tailsamplingprocessor/cache/types.go +++ b/processor/tailsamplingprocessor/cache/types.go @@ -19,4 +19,19 @@ type Cache interface { type DecisionMetadata struct { PolicyName string + + // Additional information for when a trace is too large. + TraceTooLarge bool + TraceSize uint64 + MaxTraceSize uint64 +} + +type MetadataOption func(*DecisionMetadata) + +func WithTraceTooLarge(traceSize, maxTraceSize uint64) MetadataOption { + return func(m *DecisionMetadata) { + m.TraceTooLarge = true + m.TraceSize = traceSize + m.MaxTraceSize = maxTraceSize + } } diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 4676792cb7e8e..b7df6384d1f02 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -781,7 +781,7 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac actualData.finalDecision = samplingpolicy.NotSampled // Since we are not in a normal decision flow when dropping large traces, also be sure to remove it from the batcher. tsp.decisionBatcher.RemoveFromBatch(id, actualData.batchID) - tsp.releaseNotSampledTrace(id, "") + tsp.releaseNotSampledTrace(id, "", cache.WithTraceTooLarge(actualData.bytes, tsp.maxTraceSizeBytes)) return } @@ -874,8 +874,12 @@ func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, i // releaseNotSampledTrace adds the trace ID to the cache of not sampled trace // IDs. If the trace ID is cached, it deletes the spans from the internal map. -func (tsp *tailSamplingSpanProcessor) releaseNotSampledTrace(id pcommon.TraceID, policyName string) { - tsp.nonSampledIDCache.Put(id, cache.DecisionMetadata{PolicyName: policyName}) +func (tsp *tailSamplingSpanProcessor) releaseNotSampledTrace(id pcommon.TraceID, policyName string, metadataOpts ...cache.MetadataOption) { + metadata := cache.DecisionMetadata{PolicyName: policyName} + for _, opt := range metadataOpts { + opt(&metadata) + } + tsp.nonSampledIDCache.Put(id, metadata) _, ok := tsp.nonSampledIDCache.Get(id) if ok { tsp.dropTrace(id, time.Now()) diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 0e883dda7b7c7..9e49e19c61626 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -30,6 +30,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/idbatcher" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy" @@ -1120,16 +1121,19 @@ func TestDropLargeTraces(t *testing.T) { ss := largeTrace.ScopeSpans().AppendEmpty() largeValue := strings.Repeat("bar", 1024) sp := ss.Spans().AppendEmpty() - sp.SetTraceID(pcommon.TraceID([16]byte{1, 2, 3, 4})) + largeTraceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) + sp.SetTraceID(largeTraceID) sp.Attributes().PutStr("foo", largeValue) // Small trace with just one attribute. smallTrace := traces.ResourceSpans().AppendEmpty() ss = smallTrace.ScopeSpans().AppendEmpty() sp = ss.Spans().AppendEmpty() - sp.SetTraceID(pcommon.TraceID([16]byte{1, 2, 3, 5})) + smallTraceID := pcommon.TraceID([16]byte{1, 2, 3, 5}) + sp.SetTraceID(smallTraceID) sp.Attributes().PutStr("foo", "short") + decisionCache := metadataCache{} cfg := Config{ DecisionWait: defaultTestDecisionWait, NumTraces: uint64(4), @@ -1138,6 +1142,8 @@ func TestDropLargeTraces(t *testing.T) { PolicyCfgs: testPolicy, Options: []Option{ withTestController(controller), + WithSampledDecisionCache(decisionCache), + WithNonSampledDecisionCache(decisionCache), }, } telem := setupTestTelemetry() @@ -1165,7 +1171,7 @@ func TestDropLargeTraces(t *testing.T) { controller.waitForTick() largeOnly := ptrace.NewTraces() - // Create a another large trace as ConsumeTraces is not guaranteed to preserve the trace. + // Create another large trace as ConsumeTraces is not guaranteed to preserve the trace. largeTrace = largeOnly.ResourceSpans().AppendEmpty() ss = largeTrace.ScopeSpans().AppendEmpty() sp = ss.Spans().AppendEmpty() @@ -1217,6 +1223,24 @@ func TestDropLargeTraces(t *testing.T) { } tooLarge := telem.getMetric(expectedTooLarge.Name, md) metricdatatest.AssertEqual(t, expectedTooLarge, tooLarge, metricdatatest.IgnoreTimestamp()) + + assert.Equal(t, cache.DecisionMetadata{PolicyName: "test-policy"}, decisionCache[smallTraceID]) + largeCacheEntry := decisionCache[largeTraceID] + assert.Empty(t, largeCacheEntry.PolicyName) + assert.True(t, largeCacheEntry.TraceTooLarge) + assert.Greater(t, largeCacheEntry.TraceSize, uint64(1024)) + assert.EqualValues(t, 1024, largeCacheEntry.MaxTraceSize) +} + +type metadataCache map[pcommon.TraceID]cache.DecisionMetadata + +func (c metadataCache) Get(id pcommon.TraceID) (cache.DecisionMetadata, bool) { + result, ok := c[id] + return result, ok +} + +func (c metadataCache) Put(id pcommon.TraceID, metadata cache.DecisionMetadata) { + c[id] = metadata } // TestDeleteQueueCleared verifies that all in memory traces are removed from