-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathcomposite.go
143 lines (115 loc) · 4.48 KB
/
composite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
import (
"context"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.uber.org/zap"
)
// subpolicy pairs one sub-policy evaluator with its per-second span budget
// and the running count of spans sampled against that budget.
type subpolicy struct {
// the subpolicy evaluator
evaluator PolicyEvaluator
// spans per second allocated to this subpolicy; Composite.Evaluate refuses
// to sample a trace that would push sampledSPS above this value
allocatedSPS int64
// spans that this subpolicy sampled in the current one-second period;
// reset to 0 by Composite.Evaluate whenever the second changes
sampledSPS int64
}
// Composite is a policy evaluator that delegates to an ordered list of
// subpolicies and rate-limits, second by second, the spans sampled through
// each of them. It also holds the internal bookkeeping for that rate limiting.
type Composite struct {
// the subpolicy evaluators, consulted in slice order by Evaluate
subpolicies []*subpolicy
// maximum total spans per second that must be sampled
maxTotalSPS int64
// the second (a unix timestamp) most recently returned by
// timeProvider.getCurSecond; the subpolicy counters refer to this second
currentSecond int64
// The time provider (can be different from clock for testing purposes)
timeProvider TimeProvider
// logger handed to NewComposite; not referenced by Evaluate or OnDroppedSpans
logger *zap.Logger
}
// Compile-time assertion that *Composite satisfies the PolicyEvaluator interface.
var _ PolicyEvaluator = (*Composite)(nil)
// SubPolicyEvalParams defines the evaluator and max rate for a sub-policy.
// NewComposite turns each value into one internal subpolicy entry.
type SubPolicyEvalParams struct {
// Evaluator is the policy evaluator that decides for this sub-policy.
Evaluator PolicyEvaluator
// MaxSpansPerSecond is the spans-per-second budget allocated to this
// sub-policy.
MaxSpansPerSecond int64
}
// NewComposite builds a composite policy evaluator from a list of sub-policies.
//
// Each entry of subPolicyParams becomes one internal subpolicy, in the same
// order, carrying its evaluator and its spans-per-second allocation.
// maxTotalSpansPerSecond is stored as the overall per-second limit,
// timeProvider is the source of the current second, and logger is kept on the
// returned evaluator.
func NewComposite(
	logger *zap.Logger,
	maxTotalSpansPerSecond int64,
	subPolicyParams []SubPolicyEvalParams,
	timeProvider TimeProvider,
) PolicyEvaluator {
	var subs []*subpolicy
	for _, params := range subPolicyParams {
		// Nothing has been sampled yet, so every counter starts at zero.
		subs = append(subs, &subpolicy{
			evaluator:    params.Evaluator,
			allocatedSPS: params.MaxSpansPerSecond,
			sampledSPS:   0,
		})
	}

	return &Composite{
		subpolicies:  subs,
		maxTotalSPS:  maxTotalSpansPerSecond,
		timeProvider: timeProvider,
		logger:       logger,
	}
}
// Evaluate runs the subpolicies in order against the trace and returns the
// composite sampling decision.
//
// Spans are budgeted in one-second windows: every subpolicy keeps a counter
// of the spans it has sampled during the current second, and all counters are
// cleared as soon as the time provider reports a different second.
//
// The first subpolicy whose evaluator answers Sampled or InvertSampled settles
// the outcome. The trace is Sampled when adding its span count keeps that
// subpolicy's counter within both its own allocation and maxTotalSPS;
// otherwise it is NotSampled and later subpolicies are not consulted. An error
// from any subpolicy evaluator is returned together with Unspecified. If no
// subpolicy wants the trace, the result is NotSampled.
//
// NOTE(review): the maxTotalSPS comparison uses only the deciding subpolicy's
// counter, not the sum over all subpolicies — confirm that the allocations
// are guaranteed elsewhere to add up to at most maxTotalSPS.
func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) {
	// Entering a new second starts a fresh budget for every subpolicy.
	if now := c.timeProvider.getCurSecond(); now != c.currentSecond {
		c.currentSecond = now
		for _, sp := range c.subpolicies {
			sp.sampledSPS = 0
		}
	}

	for _, sp := range c.subpolicies {
		verdict, err := sp.evaluator.Evaluate(ctx, traceID, trace)
		if err != nil {
			return Unspecified, err
		}

		// Subpolicies that do not want the trace have no say; try the next one.
		if verdict != Sampled && verdict != InvertSampled {
			continue
		}

		// What the counter for this second would become if the trace is accepted.
		projected := sp.sampledSPS + trace.SpanCount.Load()

		if projected > sp.allocatedSPS || projected > c.maxTotalSPS {
			// Over budget: reject the trace but leave the counter as it is, so
			// that a smaller trace arriving later in the same second can still
			// fit into the remaining allocation.
			return NotSampled, nil
		}

		// Within budget: charge the spans to this subpolicy and sample.
		sp.sampledSPS = projected
		return Sampled, nil
	}

	return NotSampled, nil
}
// OnDroppedSpans is invoked when a trace has to be dropped because of memory
// pressure before its decision_wait time has elapsed. It ignores both
// arguments and always answers Sampled with a nil error.
func (c *Composite) OnDroppedSpans(pcommon.TraceID, *TraceData) (Decision, error) {
	// Options that were considered for this situation:
	//   1. Randomly sample traces based on maxTotalSPS.
	//   2. Run the complete composite logic via Composite.Evaluate(), which
	//      would mean deciding on partial trace data.
	//   3. Sample everything.
	//
	// Option 2 may well be the best one from the end user's point of view,
	// but that is not certain, and it adds work at a moment when we are
	// already under memory (and possibly CPU) pressure.
	//
	// Option 3 is the safe choice and is what is implemented for now;
	// investigating the alternatives is left as a future task.
	return Sampled, nil
}