From 3b9cbf0c19996ab3183b6596197ceef66f2e4126 Mon Sep 17 00:00:00 2001 From: Miguel Gordo Garcia Date: Thu, 12 Mar 2026 16:18:07 -0500 Subject: [PATCH 1/2] Refactor sampling functionality into an internal library --- .../component/loki/process/stages/sampling.go | 54 +++--------- .../loki/secretfilter/secretfilter.go | 47 +++-------- .../loki/secretfilter/secretfilter_test.go | 9 ++ internal/sampling/sampler.go | 74 ++++++++++++++++ internal/sampling/sampler_test.go | 84 +++++++++++++++++++ 5 files changed, 193 insertions(+), 75 deletions(-) create mode 100644 internal/sampling/sampler.go create mode 100644 internal/sampling/sampler_test.go diff --git a/internal/component/loki/process/stages/sampling.go b/internal/component/loki/process/stages/sampling.go index 38fd02f1fe3..ebd8ca52b82 100644 --- a/internal/component/loki/process/stages/sampling.go +++ b/internal/component/loki/process/stages/sampling.go @@ -2,19 +2,16 @@ package stages import ( "fmt" - "math" - "math/rand" - "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" - "github.com/uber/jaeger-client-go/utils" + + "github.com/grafana/alloy/internal/sampling" ) const ( ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f" ) -const maxRandomNumber = ^(uint64(1) << 63) // i.e. 0x7fffffffffffffff var ( defaultSamplingpReason = "sampling_stage" @@ -31,35 +28,27 @@ func (s *SamplingConfig) SetToDefault() { } func (s *SamplingConfig) Validate() error { - if s.SamplingRate < 0.0 || s.SamplingRate > 1.0 { + if err := sampling.ValidateRate(s.SamplingRate); err != nil { return fmt.Errorf(ErrSamplingStageInvalidRate, s.SamplingRate) } return nil } -// newSamplingStage creates a SamplingStage from config -// code from jaeger project. -// github.com/uber/jaeger-client-go@v2.30.0+incompatible/tracer.go:126 +// newSamplingStage creates a SamplingStage from config using the shared probabilistic sampler. func newSamplingStage(logger log.Logger, cfg SamplingConfig, registerer prometheus.Registerer) Stage { - samplingRate := math.Max(0.0, math.Min(cfg.SamplingRate, 1.0)) - samplingBoundary := uint64(float64(maxRandomNumber) * samplingRate) - seedGenerator := utils.NewRand(time.Now().UnixNano()) - source := rand.NewSource(seedGenerator.Int63()) return &samplingStage{ - logger: log.With(logger, "component", "stage", "type", "sampling"), - cfg: cfg, - dropCount: getDropCountMetric(registerer), - samplingBoundary: samplingBoundary, - source: source, + logger: log.With(logger, "component", "stage", "type", "sampling"), + cfg: cfg, + dropCount: getDropCountMetric(registerer), + sampler: sampling.NewSampler(cfg.SamplingRate), } } type samplingStage struct { - logger log.Logger - cfg SamplingConfig - dropCount *prometheus.CounterVec - samplingBoundary uint64 - source rand.Source + logger log.Logger + cfg SamplingConfig + dropCount *prometheus.CounterVec + sampler *sampling.Sampler } func (m *samplingStage) Run(in chan Entry) chan Entry { @@ -68,7 +57,7 @@ func (m *samplingStage) Run(in chan Entry) chan Entry { defer close(out) counter := m.dropCount.WithLabelValues(m.cfg.DropReason) for e := range in { - if m.isSampled() { + if m.sampler.ShouldSample() { out <- e continue } @@ -78,23 +67,6 @@ func (m *samplingStage) Run(in chan Entry) chan Entry { return out } -// code from jaeger project. -// github.com/uber/jaeger-client-go@v2.30.0+incompatible/sampler.go:144 -// func (s *ProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag) -func (m *samplingStage) isSampled() bool { - return m.samplingBoundary >= m.randomID()&maxRandomNumber -} -func (m *samplingStage) randomID() uint64 { - val := m.randomNumber() - for val == 0 { - val = m.randomNumber() - } - return val -} -func (m *samplingStage) randomNumber() uint64 { - return uint64(m.source.Int63()) -} - // Cleanup implements Stage. func (*samplingStage) Cleanup() { // no-op diff --git a/internal/component/loki/secretfilter/secretfilter.go b/internal/component/loki/secretfilter/secretfilter.go index a5dd7c55f1c..a0427de23ff 100644 --- a/internal/component/loki/secretfilter/secretfilter.go +++ b/internal/component/loki/secretfilter/secretfilter.go @@ -5,8 +5,6 @@ import ( "context" "crypto/sha1" "fmt" - "math" - "math/rand" "os" "strings" "sync" @@ -15,6 +13,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/sampling" "github.com/grafana/alloy/internal/service/livedebugging" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" @@ -75,17 +74,14 @@ func (args *Arguments) SetToDefault() { // Validate implements syntax.Validator. func (args *Arguments) Validate() error { - if args.Rate < 0.0 || args.Rate > 1.0 { - return fmt.Errorf("secretfilter rate must be between 0.0 and 1.0, received %f", args.Rate) + if err := sampling.ValidateRate(args.Rate); err != nil { + return fmt.Errorf("secretfilter: %w", err) } return nil } var _ syntax.Validator = (*Arguments)(nil) -// maxRandomNumber is the maximum value used for sampling boundary -const maxRandomNumber = ^(uint64(1) << 63) // 0x7fffffffffffffff - var ( _ component.Component = (*Component)(nil) _ component.LiveDebugging = (*Component)(nil) @@ -105,8 +101,7 @@ type Component struct { redactPercent uint // sampling state (used when 0 < Rate < 1) - samplingBoundary uint64 - samplingSource rand.Source + sampler *sampling.Sampler metrics *metrics debugDataPublisher livedebugging.DebugDataPublisher @@ -324,29 +319,12 @@ func (c *Component) Run(ctx context.Context) error { } } -// shouldProcessEntry returns true if this entry should be processed through the secret filter (rate = probability of "keep" / process). +// shouldProcessEntry returns true if this entry should be processed through the secret filter (rate = probability of process). func (c *Component) shouldProcessEntry() bool { - rate := c.args.Rate - if rate >= 1.0 { + if c.sampler == nil { return true } - if rate <= 0.0 { - return false - } - return c.samplingBoundary >= c.samplingRandomID()&maxRandomNumber -} - -// samplingRandomID returns a random uint64 in [1, maxRandomNumber] for sampling. -// If samplingSource is nil (e.g. rate was 0 or 1), returns maxRandomNumber so the caller does not panic. -func (c *Component) samplingRandomID() uint64 { - if c.samplingSource == nil { - return maxRandomNumber - } - val := uint64(c.samplingSource.Int63()) - for val == 0 { - val = uint64(c.samplingSource.Int63()) - } - return val + return c.sampler.ShouldSample() } // processEntry scans the log entry for secrets and redacts them. Returns the @@ -450,12 +428,13 @@ func (c *Component) Update(args component.Arguments) error { } else { c.redactPercent = defaultRedactPercent } - if newArgs.Rate > 0 && newArgs.Rate < 1 { - c.samplingBoundary = uint64(float64(maxRandomNumber) * math.Max(0, math.Min(newArgs.Rate, 1))) - c.samplingSource = rand.NewSource(time.Now().UnixNano()) + if c.sampler == nil { + if err := sampling.ValidateRate(newArgs.Rate); err != nil { + return fmt.Errorf("failed to create gitleaks sampler: %w", err) + } + c.sampler = sampling.NewSampler(newArgs.Rate) } else { - c.samplingBoundary = 0 - c.samplingSource = nil + c.sampler.Update(newArgs.Rate) } c.metrics = newMetrics(c.opts.Registerer, newArgs.OriginLabel) diff --git a/internal/component/loki/secretfilter/secretfilter_test.go b/internal/component/loki/secretfilter/secretfilter_test.go index 2ed88c2ae31..6f53612ca62 100644 --- a/internal/component/loki/secretfilter/secretfilter_test.go +++ b/internal/component/loki/secretfilter/secretfilter_test.go @@ -28,6 +28,15 @@ func TestSecretFiltering(t *testing.T) { RunTestCases(t, testhelper.TestConfigs["default"], DefaultTestCases()) } +// TestDefaultRate_Unmarshalled verifies that when rate is not set in config, the default (1.0) is used. +// The syntax package calls SetToDefault() before decoding, so omitted optional fields keep their default. +func TestDefaultRate_Unmarshalled(t *testing.T) { + var args Arguments + config := `forward_to = []` + require.NoError(t, syntax.Unmarshal([]byte(config), &args)) + require.Equal(t, defaultRate, args.Rate, "rate should default to 1.0 when not set in config") +} + // TestGitleaksConfig_InvalidPath checks that a missing config path returns an error. // Valid custom config file loading (and [extend] useDefault) is tested in the // extend package so it runs in a separate process and avoids gitleaks global state. diff --git a/internal/sampling/sampler.go b/internal/sampling/sampler.go new file mode 100644 index 00000000000..6d5f975cf2e --- /dev/null +++ b/internal/sampling/sampler.go @@ -0,0 +1,74 @@ +// Package sampling provides rate based sampling for use by +// components that need to include a fraction of items in a "sampled" set (e.g. +// loki.secretfilter for processing rate, loki.process.stages.sampling for drop rate). +package sampling + +import ( + "fmt" + "math/rand" + "time" +) + +// maxRandomNumber is the maximum value used for the sampling boundary (0x7fffffffffffffff). +const maxRandomNumber = ^(uint64(1) << 63) + +// ValidateRate returns an error if rate is not in [0.0, 1.0]. +func ValidateRate(rate float64) error { + if rate < 0.0 || rate > 1.0 { + return fmt.Errorf("rate must be between 0.0 and 1.0, received %f", rate) + } + return nil +} + +// Sampler decides probabilistically whether an item should be included in the sample (ShouldSample returns true). +// Rate is the probability of inclusion; 0 = never, 1 = always, 0.5 = ~50%. +type Sampler struct { + rate float64 + boundary uint64 + source rand.Source +} + +// NewSampler returns a Sampler for the given rate. Rate must be in [0.0, 1.0]; +// call ValidateRate first or the sampler behavior for out-of-range rate is undefined. +func NewSampler(rate float64) *Sampler { + s := &Sampler{} + s.Update(rate) + return s +} + +// Update updates the sampler for a new rate (e.g. on component config change). +// Rate must be in [0.0, 1.0]; call ValidateRate first or behavior is undefined. +func (s *Sampler) Update(rate float64) { + s.rate = rate + if rate > 0 && rate < 1 { + s.boundary = uint64(float64(maxRandomNumber) * rate) + s.source = rand.NewSource(time.Now().UnixNano()) + } else { + s.boundary = 0 + s.source = nil + } +} + +// ShouldSample returns true with probability equal to the rate used to create or update the sampler. +// Rate 0 → always false; rate 1 → always true; otherwise uses the same probabilistic algorithm as Jaeger's ProbabilisticSampler. +func (s *Sampler) ShouldSample() bool { + if s.rate >= 1.0 { + return true + } + if s.rate <= 0.0 { + return false + } + return s.boundary >= s.randomID()&maxRandomNumber +} + +// randomID returns a random uint64 in [1, maxRandomNumber] for sampling. +func (s *Sampler) randomID() uint64 { + if s.source == nil { + return maxRandomNumber + } + val := uint64(s.source.Int63()) + for val == 0 { + val = uint64(s.source.Int63()) + } + return val +} diff --git a/internal/sampling/sampler_test.go b/internal/sampling/sampler_test.go new file mode 100644 index 00000000000..38b200becdc --- /dev/null +++ b/internal/sampling/sampler_test.go @@ -0,0 +1,84 @@ +package sampling + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValidateRate(t *testing.T) { + tests := []struct { + name string + rate float64 + wantErr bool + }{ + {"valid zero", 0, false}, + {"valid one", 1, false}, + {"valid half", 0.5, false}, + {"invalid negative", -0.1, true}, + {"invalid over one", 1.1, true}, + {"invalid large", 2.0, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ValidateRate(tt.rate) + if tt.wantErr { + require.Error(t, err) + require.Contains(t, err.Error(), "rate must be between") + } else { + require.NoError(t, err) + } + }) + } +} + +func TestSampler_RateZeroAlwaysFalse(t *testing.T) { + s := NewSampler(0) + for i := 0; i < 100; i++ { + require.False(t, s.ShouldSample(), "rate 0 should never sample") + } +} + +func TestSampler_RateOneAlwaysTrue(t *testing.T) { + s := NewSampler(1) + for i := 0; i < 100; i++ { + require.True(t, s.ShouldSample(), "rate 1 should always sample") + } +} + +func TestSampler_RateHalfApproximatelyHalf(t *testing.T) { + s := NewSampler(0.5) + const n = 1000 + var trues int + for i := 0; i < n; i++ { + if s.ShouldSample() { + trues++ + } + } + // Allow 35-65% to avoid flakiness + require.GreaterOrEqual(t, trues, int(0.35*float64(n)), "expected at least ~35%% sampled") + require.LessOrEqual(t, trues, int(0.65*float64(n)), "expected at most ~65%% sampled") +} + +func TestSampler_Update(t *testing.T) { + s := NewSampler(0.5) + // After Update(0), all false + s.Update(0) + for i := 0; i < 50; i++ { + require.False(t, s.ShouldSample()) + } + // After Update(1), all true + s.Update(1) + for i := 0; i < 50; i++ { + require.True(t, s.ShouldSample()) + } +} + +func TestSampler_OutOfRangeRateDeterministic(t *testing.T) { + // Out-of-range rate is not clamped; callers must ValidateRate first. + // Our ShouldSample guards still yield deterministic behavior (no randomness). + sNeg := NewSampler(-1) + require.False(t, sNeg.ShouldSample(), "negative rate → never sample") + sOver := NewSampler(2) + require.True(t, sOver.ShouldSample(), "rate > 1 → always sample") +} From fe646f769a2ad56b8380138b8eba4bd29b140150 Mon Sep 17 00:00:00 2001 From: Miguel Gordo Garcia Date: Thu, 12 Mar 2026 16:38:10 -0500 Subject: [PATCH 2/2] Update go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 601feec736f..43f9fe84818 100644 --- a/go.mod +++ b/go.mod @@ -225,7 +225,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.40.0 github.com/tilinna/clock v1.1.0 - github.com/uber/jaeger-client-go v2.30.0+incompatible + github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/vincent-petithory/dataurl v1.0.0 github.com/webdevops/azure-metrics-exporter v0.0.0-20230717202958-8701afc2b013 github.com/webdevops/go-common v0.0.0-20250617214056-2620f947754f