diff --git a/.chloggen/stratified.yaml b/.chloggen/stratified.yaml new file mode 100644 index 0000000000000..a68c761d22205 --- /dev/null +++ b/.chloggen/stratified.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/tailsamplingprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Added stratified sampling policy to the tailsampling processor" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [40917] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: The current implementation of the probabilistic sampling policy in the tail sampling processor in the OpenTelemetry Collector Contrib repository randomly samples a percentage of traces. This approach does not ensure that all the application service workflows associated with different transaction types get a representation in the sampled set of traces. A user can initiate any specific application functionality/operation, which subsequently triggers a corresponding subset of service components (or a workflow). For instance, in an e-commerce application designed using a microservices architecture, distinct operations, such as browsing, adding to a cart, and others will invoke different microservices. Each functionality will invoke service components in a defined order, with the invocation order representing a subgraph within the broader application workflow. Defining this subgraph of service components for servicing a request as the trajectory, for a sampled set of traces to truly represent an application and thus be of more value to the downstream tasks, all the trajectories must get at least one representation in the sampled set of traces for the given sampling interval. This new sampling policy, called the stratified sampling policy, samples a new trajectory whenever it is encountered for the first time within a sampling interval. If a trajectory has already been observed within that interval, the policy will revert to a probabilistic sampling approach, where trajectories are selected based on predefined probabilities. This ensures that newly encountered trajectories are prioritized for sampling while maintaining flexibility for previously seen trajectories. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 0964ff57e7cfe..addae3030dc70 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -27,6 +27,8 @@ const ( // StringAttribute sample traces that an attribute, of type string, matching // one of the listed values. StringAttribute PolicyType = "string_attribute" + // Stratified Probabilistic samples a given percentage of traces considering the trace trajectory as well. + StratifiedProbabilistic PolicyType = "stratified" // RateLimiting allows all traces until the specified limits are satisfied. RateLimiting PolicyType = "rate_limiting" // Composite allows defining a composite policy, combining the other policies in one @@ -60,6 +62,8 @@ type sharedPolicyCfg struct { NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"` // Configs for probabilistic sampling policy evaluator. ProbabilisticCfg ProbabilisticCfg `mapstructure:"probabilistic"` + // Configs for stratified probabilistic sampling policy evaluator. + StratifiedProbabilisticCfg StratifiedProbabilisticCfg `mapstructure:"stratified"` // Configs for status code filter sampling policy evaluator. StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"` // Configs for string attribute filter sampling policy evaluator. @@ -170,6 +174,18 @@ type ProbabilisticCfg struct { SamplingPercentage float64 `mapstructure:"sampling_percentage"` } +// StratifiedProbabilisticCfg holds the configurable settings to create a stratified probabilistic +// sampling policy evaluator. +type StratifiedProbabilisticCfg struct { + // HashSalt allows one to configure the hashing salts. This is important in scenarios where multiple layers of collectors + // have different sampling rates: if they use the same salt all passing one layer may pass the other even if they have + // different sampling rates, configuring different salts avoids that. + HashSalt string `mapstructure:"hash_salt"` + // SamplingPercentage is the percentage rate at which traces are going to be sampled. Defaults to zero, i.e.: no sample. + // Values greater or equal 100 are treated as "sample all traces". + SamplingPercentage float64 `mapstructure:"sampling_percentage"` +} + // StatusCodeCfg holds the configurable settings to create a status code filter sampling // policy evaluator. type StatusCodeCfg struct { diff --git a/processor/tailsamplingprocessor/internal/sampling/stratified_probabilistic.go b/processor/tailsamplingprocessor/internal/sampling/stratified_probabilistic.go new file mode 100644 index 0000000000000..9e91e4fa952e4 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/stratified_probabilistic.go @@ -0,0 +1,340 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "hash/fnv" + "math" + "math/big" + "sort" + "strconv" + "strings" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.uber.org/zap" +) + +const ( + stratifiedDefaultHashSalt = "default-hash-seed" +) + +type StratifiedProbabilisticSampler struct { + logger *zap.Logger + threshold uint64 + hashSalt string + mu sync.Mutex + traceTrajectoryCount map[string]int +} + +type Node struct { + Service string + Operation string +} + +type Edge struct { + From Node + To Node +} + +var _ PolicyEvaluator = (*StratifiedProbabilisticSampler)(nil) + +// NewStratifiedProbabilisticSampler creates a policy evaluator that samples a percentage of traces. +func NewStratifiedProbabilisticSampler(settings component.TelemetrySettings, hashSalt string, samplingPercentage float64) PolicyEvaluator { + if hashSalt == "" { + hashSalt = stratifiedDefaultHashSalt + } + + return &StratifiedProbabilisticSampler{ + logger: settings.Logger, + // calculate threshold once + threshold: stratifiedCalculateThreshold(samplingPercentage / 100), + hashSalt: hashSalt, + traceTrajectoryCount: make(map[string]int), + } +} + +// Evaluate looks at the trace data and returns a corresponding SamplingDecision. +func (s *StratifiedProbabilisticSampler) Evaluate(_ context.Context, traceID pcommon.TraceID, traceData *TraceData) (Decision, error) { + s.logger.Debug("Evaluating spans in stratified probabilistic filter") + hash := s.getTraceTrajectoryHash(traceData) + s.logger.Debug("Graph representation received", zap.String("Trace Identifier", traceID.String()), zap.String("Trace Hash", hash)) + + var count int + var exists bool + + s.mu.Lock() + defer s.mu.Unlock() + + count, exists = s.traceTrajectoryCount[hash] + + if !exists { + // Trajectory is new for the interval + s.traceTrajectoryCount[hash] = 1 + s.logger.Debug("New Trajectory", zap.String("Trace Hash", hash), zap.String("Check for Trace Identifier", traceID.String()), zap.Int("TraceCount", count)) + return Sampled, nil + } + // Trajectory exist. Update the count + s.traceTrajectoryCount[hash] = count + 1 + s.logger.Debug("Trajectory exists", zap.String("Trace Hash", hash), zap.String("Check for Trace Identifier", traceID.String()), zap.Int("TraceCount", count)) + + // Fallback to probabilistic sampling if the trajectory is seen before + + if stratifiedHashTraceID(s.hashSalt, traceID[:]) <= s.threshold { + return Sampled, nil + } + + return NotSampled, nil +} + +func (s *StratifiedProbabilisticSampler) ResetWindow() { + s.logger.Debug("Resetting Evaluation Window") + s.mu.Lock() + defer s.mu.Unlock() + s.logger.Debug("Trace trajectory count map", zap.Any("traceTrajectoryCount", s.traceTrajectoryCount)) + s.traceTrajectoryCount = make(map[string]int) +} + +func (s *StratifiedProbabilisticSampler) getTraceTrajectoryHash(traceData *TraceData) string { + nodes, edges := s.getTraceSpanDetails(traceData) + + // Build adjacency list and in-degree map + adj := make(map[Node][]Node) + inDegree := make(map[Node]int) + allNodes := make(map[Node]struct{}) + + for _, node := range nodes { + allNodes[node] = struct{}{} + if _, exists := inDegree[node]; !exists { + inDegree[node] = 0 + } + } + + for _, edge := range edges { + if !isEmptyNode(edge.From) { + adj[edge.From] = append(adj[edge.From], edge.To) + inDegree[edge.To]++ + allNodes[edge.From] = struct{}{} + } else { + // Edge from empty node indicates a root span + if _, exists := inDegree[edge.To]; !exists { + inDegree[edge.To] = 0 + } + } + allNodes[edge.To] = struct{}{} + } + + // Canonical topological sort + // var sorted []Node + zeroInDegree := []Node{} + + for node := range allNodes { + if inDegree[node] == 0 { + zeroInDegree = append(zeroInDegree, node) + } + } + sort.Slice(zeroInDegree, func(i, j int) bool { + return compareNodes(zeroInDegree[i], zeroInDegree[j]) < 0 + }) + + for len(zeroInDegree) > 0 { + node := zeroInDegree[0] + zeroInDegree = zeroInDegree[1:] + // sorted = append(sorted, node) + + children := adj[node] + sort.Slice(children, func(i, j int) bool { + return compareNodes(children[i], children[j]) < 0 + }) + + for _, child := range children { + inDegree[child]-- + if inDegree[child] == 0 { + zeroInDegree = append(zeroInDegree, child) + } + } + sort.Slice(zeroInDegree, func(i, j int) bool { + return compareNodes(zeroInDegree[i], zeroInDegree[j]) < 0 + }) + } + + // Serialize edges deterministically with quoting + edgeStrs := make([]string, len(edges)) + for i, e := range edges { + fromLabel := fmt.Sprintf("%s:%s", e.From.Service, e.From.Operation) + toLabel := fmt.Sprintf("%s:%s", e.To.Service, e.To.Operation) + edgeStrs[i] = strconv.Quote(fromLabel) + "->" + strconv.Quote(toLabel) + } + sort.Strings(edgeStrs) + edgesStr := fmt.Sprintf("[%s]", joinWithComma(edgeStrs)) + + // Construct graph representation and hash it + graphRepr := edgesStr + h := sha256.Sum256([]byte(graphRepr)) + hash := hex.EncodeToString(h[:]) + + // Log the canonical representation and hash + s.logger.Debug("Graph representation and hash", + zap.String("Graph representation", graphRepr), + zap.String("Trace Hash", hash), + ) + + return hash +} + +func isEmptyNode(n Node) bool { + return n.Service == "" && n.Operation == "" +} + +// compareNodes ensures deterministic sorting of Node structs +func compareNodes(a, b Node) int { + if a.Service != b.Service { + return compareString(a.Service, b.Service) + } + return compareString(a.Operation, b.Operation) +} + +func compareString(a, b string) int { + if a < b { + return -1 + } else if a > b { + return 1 + } + return 0 +} + +func joinWithComma(items []string) string { + return strings.Join(items, ",") +} + +func (s *StratifiedProbabilisticSampler) getTraceSpanDetails(traceData *TraceData) ([]Node, []Edge) { + s.logger.Debug("Extracting span details for the trace") + traceData.Lock() + defer traceData.Unlock() + + batches := traceData.ReceivedBatches + + nodeSet := make(map[Node]struct{}) + spanIDToNode := make(map[string]Node) + spanIDToParentID := make(map[string]string) + + // First pass: build spanID to Node mapping and collect span -> parentSpan relationships + for i := 0; i < batches.ResourceSpans().Len(); i++ { + rs := batches.ResourceSpans().At(i) + resource := rs.Resource() + + var serviceName string + if svcAttr, ok := resource.Attributes().Get("service.name"); ok { + serviceName = svcAttr.AsString() + } + + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + for k := 0; k < ss.Spans().Len(); k++ { + span := ss.Spans().At(k) + spanID := span.SpanID().String() + parentSpanID := span.ParentSpanID().String() + operationName := span.Name() + // traceID := span.TraceID().String() + + node := Node{Service: serviceName, Operation: operationName} + nodeSet[node] = struct{}{} + spanIDToNode[spanID] = node + spanIDToParentID[spanID] = parentSpanID + } + } + } + + // Second pass: build edges from parent to child + edges := []Edge{} + for childSpanID, parentSpanID := range spanIDToParentID { + childNode := spanIDToNode[childSpanID] + var parentNode Node + if parentSpanID == "" { + // Root span + parentNode = Node{Service: "", Operation: ""} + } else { + parentNode = spanIDToNode[parentSpanID] + } + newEdge := Edge{From: parentNode, To: childNode} + edges = insertSortedEdge(edges, newEdge) + } + + // Convert node set to slice + nodes := make([]Node, 0, len(nodeSet)) + for node := range nodeSet { + nodes = append(nodes, node) + } + + return nodes, edges +} + +// Helper function to insert an edge in sorted order +func insertSortedEdge(edges []Edge, newEdge Edge) []Edge { + // Find the insertion point by comparing the edges (using lexicographical order) + idx := sort.Search(len(edges), func(i int) bool { + return compareEdges(edges[i], newEdge) >= 0 + }) + + // Insert the new edge at the found position + edges = append(edges[:idx], append([]Edge{newEdge}, edges[idx:]...)...) + return edges +} + +// Helper function to compare two edges lexicographically +func compareEdges(e1, e2 Edge) int { + // Compare "From" (parent node) + if e1.From.Service != e2.From.Service { + if e1.From.Service < e2.From.Service { + return -1 + } + return 1 + } + if e1.From.Operation != e2.From.Operation { + if e1.From.Operation < e2.From.Operation { + return -1 + } + return 1 + } + + // Compare "To" (child node) + if e1.To.Service != e2.To.Service { + if e1.To.Service < e2.To.Service { + return -1 + } + return 1 + } + if e1.To.Operation != e2.To.Operation { + if e1.To.Operation < e2.To.Operation { + return -1 + } + return 1 + } + + return 0 // They are equal +} + +// calculateThreshold converts a ratio into a value between 0 and MaxUint64 +func stratifiedCalculateThreshold(ratio float64) uint64 { + // Use big.Float and big.Int to calculate threshold because directly convert + // math.MaxUint64 to float64 will cause digits/bits to be cut off if the converted value + // doesn't fit into bits that are used to store digits for float64 in Golang + boundary := new(big.Float).SetInt(new(big.Int).SetUint64(math.MaxUint64)) + res, _ := boundary.Mul(boundary, big.NewFloat(ratio)).Uint64() + return res +} + +// hashTraceID creates a hash using the FNV-1a algorithm. +func stratifiedHashTraceID(salt string, b []byte) uint64 { + hasher := fnv.New64a() + // the implementation fnv.Write() never returns an error, see hash/fnv/fnv.go + _, _ = hasher.Write([]byte(salt)) + _, _ = hasher.Write(b) + return hasher.Sum64() +} diff --git a/processor/tailsamplingprocessor/internal/sampling/stratified_probabilistic_test.go b/processor/tailsamplingprocessor/internal/sampling/stratified_probabilistic_test.go new file mode 100644 index 0000000000000..941d99e919d41 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/stratified_probabilistic_test.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "context" + "encoding/binary" + "math/rand/v2" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestStratifiedProbabilisticSampling(t *testing.T) { + tests := []struct { + name string + samplingPercentage float64 + hashSalt string + expectedSamplingPercentage float64 + }{ + { + "100%", + 100, + "", + 100, + }, + { + "0%", + 0, + "", + 0, + }, + { + "25%", + 25, + "", + 25, + }, + { + "33%", + 33, + "", + 33, + }, + { + "33% - custom salt", + 33, + "test-salt", + 33, + }, + { + "-%50", + -50, + "", + 0, + }, + { + "150%", + 150, + "", + 100, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + traceCount := 100_000 + + stratifiedProbabilisticSampler := NewStratifiedProbabilisticSampler(componenttest.NewNopTelemetrySettings(), tt.hashSalt, tt.samplingPercentage) + + sampled := 0 + for _, traceID := range genStratifiedRandomTraceIDs(traceCount) { + trace := newTraceStringAttrs(nil, "example", "value") + + decision, err := stratifiedProbabilisticSampler.Evaluate(context.Background(), traceID, trace) + assert.NoError(t, err) + + if decision == Sampled { + sampled++ + } + } + + effectiveSamplingPercentage := float32(sampled) / float32(traceCount) * 100 + assert.InDelta(t, tt.expectedSamplingPercentage, effectiveSamplingPercentage, 0.2, + "Effective sampling percentage is %f, expected %f", effectiveSamplingPercentage, tt.expectedSamplingPercentage, + ) + }) + } +} + +func genStratifiedRandomTraceIDs(num int) (ids []pcommon.TraceID) { + // NOTE: using a fixed seed is intentional here, + // as otherwise the delta in the tests above will + // be unpredictable. + r := rand.New(rand.NewPCG(123, 456)) + ids = make([]pcommon.TraceID, 0, num) + for i := 0; i < num; i++ { + traceID := [16]byte{} + binary.BigEndian.PutUint64(traceID[:8], r.Uint64()) + binary.BigEndian.PutUint64(traceID[8:], r.Uint64()) + ids = append(ids, pcommon.TraceID(traceID)) + } + return ids +} diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 506212ad11636..d7ee88e3d12d1 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -52,9 +52,11 @@ type tailSamplingSpanProcessor struct { telemetry *metadata.TelemetryBuilder logger *zap.Logger - nextConsumer consumer.Traces - maxNumTraces uint64 - policies []*policy + nextConsumer consumer.Traces + maxNumTraces uint64 + policies []*policy + // Policy Mutex + policiesMux sync.RWMutex idToTrace sync.Map policyTicker timeutils.TTicker tickerFrequency time.Duration @@ -154,12 +156,15 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume tsp.tickerFrequency = time.Second } + tsp.policiesMux.Lock() if tsp.policies == nil { err := tsp.loadSamplingPolicy(cfg.PolicyCfgs) if err != nil { + tsp.policiesMux.Unlock() return nil, err } } + tsp.policiesMux.Unlock() if tsp.decisionBatcher == nil { // this will start a goroutine in the background, so we run it only if everything went @@ -185,6 +190,8 @@ func withDecisionBatcher(batcher idbatcher.Batcher) Option { // withPolicies sets the sampling policies to be used by the processor. func withPolicies(policies []*policy) Option { return func(tsp *tailSamplingSpanProcessor) { + tsp.policiesMux.Lock() + defer tsp.policiesMux.Unlock() tsp.policies = policies } } @@ -251,6 +258,9 @@ func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedP case Probabilistic: pCfg := cfg.ProbabilisticCfg return sampling.NewProbabilisticSampler(settings, pCfg.HashSalt, pCfg.SamplingPercentage), nil + case StratifiedProbabilistic: + pCfg := cfg.StratifiedProbabilisticCfg + return sampling.NewStratifiedProbabilisticSampler(settings, pCfg.HashSalt, pCfg.SamplingPercentage), nil case StringAttribute: safCfg := cfg.StringAttributeCfg return sampling.NewStringAttributeFilter(settings, safCfg.Key, safCfg.Values, safCfg.EnabledRegexMatching, safCfg.CacheMaxSize, safCfg.InvertMatch), nil @@ -390,6 +400,15 @@ func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() { func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { tsp.logger.Debug("Sampling Policy Evaluation ticked") + // Re-initialize the time-window if stratified sampling is configured + tsp.policiesMux.RLock() + for _, p := range tsp.policies { + if stratified, ok := p.evaluator.(*sampling.StratifiedProbabilisticSampler); ok { + stratified.ResetWindow() + } + } + tsp.policiesMux.RUnlock() + tsp.loadPendingSamplingPolicy() ctx := context.Background() @@ -465,6 +484,10 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa startTime := time.Now() // Check all policies before making a final decision. + + tsp.policiesMux.RLock() + defer tsp.policiesMux.RUnlock() + for i, p := range tsp.policies { decision, err := p.evaluator.Evaluate(ctx, id, trace) latency := time.Since(startTime) diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 278dee610217c..6b61e849bc5db 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -296,12 +296,15 @@ func TestConcurrentArrivalAndEvaluation(t *testing.T) { }() tsp := sp.(*tailSamplingSpanProcessor) + + tsp.policiesMux.Lock() tpe := &TestPolicyEvaluator{ Started: evalStarted, CouldContinue: continueEvaluation, pe: tsp.policies[0].evaluator, } tsp.policies[0].evaluator = tpe + tsp.policiesMux.Unlock() for _, batch := range batches { wg.Add(1) @@ -498,11 +501,15 @@ func TestSetSamplingPolicy(t *testing.T) { tsp := p.(*tailSamplingSpanProcessor) + tsp.policiesMux.RLock() assert.Len(t, tsp.policies, 1) + tsp.policiesMux.RUnlock() tsp.policyTicker.OnTick() + tsp.policiesMux.RLock() assert.Len(t, tsp.policies, 1) + tsp.policiesMux.RUnlock() cfgs := []PolicyCfg{ { @@ -520,11 +527,15 @@ func TestSetSamplingPolicy(t *testing.T) { } tsp.SetSamplingPolicy(cfgs) + tsp.policiesMux.RLock() assert.Len(t, tsp.policies, 1) + tsp.policiesMux.RUnlock() tsp.policyTicker.OnTick() + tsp.policiesMux.RLock() assert.Len(t, tsp.policies, 2) + tsp.policiesMux.RUnlock() // Duplicate policy name. cfgs = []PolicyCfg{ @@ -549,12 +560,16 @@ func TestSetSamplingPolicy(t *testing.T) { } tsp.SetSamplingPolicy(cfgs) + tsp.policiesMux.RLock() assert.Len(t, tsp.policies, 2) + tsp.policiesMux.RUnlock() tsp.policyTicker.OnTick() // Should revert sampling policy. + tsp.policiesMux.RLock() assert.Len(t, tsp.policies, 2) + tsp.policiesMux.RUnlock() } func TestSubSecondDecisionTime(t *testing.T) { @@ -701,8 +716,10 @@ func TestDropPolicyIsFirstInPolicyList(t *testing.T) { require.NoError(t, err) tsp := p.(*tailSamplingSpanProcessor) + tsp.policiesMux.RLock() require.GreaterOrEqual(t, len(tsp.policies), 2) assert.Equal(t, "drop-policy", tsp.policies[0].name) + tsp.policiesMux.RUnlock() } func collectSpanIDs(trace ptrace.Traces) []pcommon.SpanID {