Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ require (
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.40.0
github.com/tilinna/clock v1.1.0
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/vincent-petithory/dataurl v1.0.0
github.com/webdevops/azure-metrics-exporter v0.0.0-20230717202958-8701afc2b013
github.com/webdevops/go-common v0.0.0-20250617214056-2620f947754f
Expand Down
54 changes: 13 additions & 41 deletions internal/component/loki/process/stages/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@ package stages

import (
"fmt"
"math"
"math/rand"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber/jaeger-client-go/utils"

"github.com/grafana/alloy/internal/sampling"
)

const (
ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f"
)
const maxRandomNumber = ^(uint64(1) << 63) // i.e. 0x7fffffffffffffff

var (
defaultSamplingpReason = "sampling_stage"
Expand All @@ -31,35 +28,27 @@ func (s *SamplingConfig) SetToDefault() {
}

func (s *SamplingConfig) Validate() error {
if s.SamplingRate < 0.0 || s.SamplingRate > 1.0 {
if err := sampling.ValidateRate(s.SamplingRate); err != nil {
return fmt.Errorf(ErrSamplingStageInvalidRate, s.SamplingRate)
}
return nil
}

// newSamplingStage creates a SamplingStage from config
// code from jaeger project.
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/tracer.go:126
// newSamplingStage creates a SamplingStage from config using the shared probabilistic sampler.
func newSamplingStage(logger log.Logger, cfg SamplingConfig, registerer prometheus.Registerer) Stage {
samplingRate := math.Max(0.0, math.Min(cfg.SamplingRate, 1.0))
samplingBoundary := uint64(float64(maxRandomNumber) * samplingRate)
seedGenerator := utils.NewRand(time.Now().UnixNano())
source := rand.NewSource(seedGenerator.Int63())
return &samplingStage{
logger: log.With(logger, "component", "stage", "type", "sampling"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
samplingBoundary: samplingBoundary,
source: source,
logger: log.With(logger, "component", "stage", "type", "sampling"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
sampler: sampling.NewSampler(cfg.SamplingRate),
}
}

type samplingStage struct {
logger log.Logger
cfg SamplingConfig
dropCount *prometheus.CounterVec
samplingBoundary uint64
source rand.Source
logger log.Logger
cfg SamplingConfig
dropCount *prometheus.CounterVec
sampler *sampling.Sampler
}

func (m *samplingStage) Run(in chan Entry) chan Entry {
Expand All @@ -68,7 +57,7 @@ func (m *samplingStage) Run(in chan Entry) chan Entry {
defer close(out)
counter := m.dropCount.WithLabelValues(m.cfg.DropReason)
for e := range in {
if m.isSampled() {
if m.sampler.ShouldSample() {
out <- e
continue
}
Expand All @@ -78,23 +67,6 @@ func (m *samplingStage) Run(in chan Entry) chan Entry {
return out
}

// code from jaeger project.
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/sampler.go:144
// func (s *ProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag)
func (m *samplingStage) isSampled() bool {
return m.samplingBoundary >= m.randomID()&maxRandomNumber
}
func (m *samplingStage) randomID() uint64 {
val := m.randomNumber()
for val == 0 {
val = m.randomNumber()
}
return val
}
func (m *samplingStage) randomNumber() uint64 {
return uint64(m.source.Int63())
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
Expand Down
47 changes: 13 additions & 34 deletions internal/component/loki/secretfilter/secretfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"context"
"crypto/sha1"
"fmt"
"math"
"math/rand"
"os"
"strings"
"sync"
Expand All @@ -15,6 +13,7 @@ import (
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/sampling"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax"
Expand Down Expand Up @@ -75,17 +74,14 @@ func (args *Arguments) SetToDefault() {

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
if args.Rate < 0.0 || args.Rate > 1.0 {
return fmt.Errorf("secretfilter rate must be between 0.0 and 1.0, received %f", args.Rate)
if err := sampling.ValidateRate(args.Rate); err != nil {
return fmt.Errorf("secretfilter: %w", err)
}
return nil
}

var _ syntax.Validator = (*Arguments)(nil)

// maxRandomNumber is the maximum value used for sampling boundary
const maxRandomNumber = ^(uint64(1) << 63) // 0x7fffffffffffffff

var (
_ component.Component = (*Component)(nil)
_ component.LiveDebugging = (*Component)(nil)
Expand All @@ -105,8 +101,7 @@ type Component struct {
redactPercent uint

// sampler decides per entry whether it is run through the secret filter; while it is nil every entry is processed
samplingBoundary uint64
samplingSource rand.Source
sampler *sampling.Sampler

metrics *metrics
debugDataPublisher livedebugging.DebugDataPublisher
Expand Down Expand Up @@ -324,29 +319,12 @@ func (c *Component) Run(ctx context.Context) error {
}
}

// shouldProcessEntry returns true if this entry should be processed through the secret filter (rate = probability of "keep" / process).
// shouldProcessEntry returns true if this entry should be processed through the secret filter (rate = probability of process).
func (c *Component) shouldProcessEntry() bool {
rate := c.args.Rate
if rate >= 1.0 {
if c.sampler == nil {
return true
}
if rate <= 0.0 {
return false
}
return c.samplingBoundary >= c.samplingRandomID()&maxRandomNumber
}

// samplingRandomID returns a random uint64 in [1, maxRandomNumber] for sampling.
// If samplingSource is nil (e.g. rate was 0 or 1), returns maxRandomNumber so the caller does not panic.
func (c *Component) samplingRandomID() uint64 {
if c.samplingSource == nil {
return maxRandomNumber
}
val := uint64(c.samplingSource.Int63())
for val == 0 {
val = uint64(c.samplingSource.Int63())
}
return val
return c.sampler.ShouldSample()
}

// processEntry scans the log entry for secrets and redacts them. Returns the
Expand Down Expand Up @@ -450,12 +428,13 @@ func (c *Component) Update(args component.Arguments) error {
} else {
c.redactPercent = defaultRedactPercent
}
if newArgs.Rate > 0 && newArgs.Rate < 1 {
c.samplingBoundary = uint64(float64(maxRandomNumber) * math.Max(0, math.Min(newArgs.Rate, 1)))
c.samplingSource = rand.NewSource(time.Now().UnixNano())
if c.sampler == nil {
if err := sampling.ValidateRate(newArgs.Rate); err != nil {
return fmt.Errorf("failed to create gitleaks sampler: %w", err)
}
c.sampler = sampling.NewSampler(newArgs.Rate)
} else {
c.samplingBoundary = 0
c.samplingSource = nil
c.sampler.Update(newArgs.Rate)
}
c.metrics = newMetrics(c.opts.Registerer, newArgs.OriginLabel)

Expand Down
9 changes: 9 additions & 0 deletions internal/component/loki/secretfilter/secretfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ func TestSecretFiltering(t *testing.T) {
RunTestCases(t, testhelper.TestConfigs["default"], DefaultTestCases())
}

// TestDefaultRate_Unmarshalled decodes a config that omits rate and checks that
// the resulting Arguments carry defaultRate.
// The syntax package calls SetToDefault() before decoding, so omitted optional fields keep their default.
func TestDefaultRate_Unmarshalled(t *testing.T) {
	const config = `forward_to = []`

	var args Arguments
	err := syntax.Unmarshal([]byte(config), &args)
	require.NoError(t, err)

	require.Equal(t, defaultRate, args.Rate, "rate should default to 1.0 when not set in config")
}

// TestGitleaksConfig_InvalidPath checks that a missing config path returns an error.
// Valid custom config file loading (and [extend] useDefault) is tested in the
// extend package so it runs in a separate process and avoids gitleaks global state.
Expand Down
74 changes: 74 additions & 0 deletions internal/sampling/sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Package sampling provides rate-based probabilistic sampling for components
// that need to include only a fraction of items in a "sampled" set (e.g.
// loki.secretfilter for the fraction of entries it processes, and the
// loki.process sampling stage for the fraction of entries it keeps).
package sampling

import (
"fmt"
"math/rand"
"time"
)

// maxRandomNumber is the maximum value used for the sampling boundary (0x7fffffffffffffff).
// It has every bit set except the top one. A rate is scaled against it to
// obtain a sampler's boundary, and each random draw is masked with it before
// being compared to that boundary.
const maxRandomNumber = ^(uint64(1) << 63)

// ValidateRate checks that rate is usable as a sampling probability.
//
// It returns nil when rate lies in [0.0, 1.0] (both ends included) and a
// descriptive error for anything else, including NaN.
func ValidateRate(rate float64) error {
	// The check is deliberately written as "is inside the range" rather than
	// "is below 0 or above 1": every ordered comparison with NaN is false, so
	// the out-of-range form would silently accept NaN as a valid rate.
	if rate >= 0.0 && rate <= 1.0 {
		return nil
	}
	return fmt.Errorf("rate must be between 0.0 and 1.0, received %f", rate)
}

// Sampler decides probabilistically whether an item should be included in the sample (ShouldSample returns true).
// Rate is the probability of inclusion; 0 = never, 1 = always, 0.5 = ~50%.
//
// Create one with NewSampler and re-tune it with Update; all three fields are
// rewritten together by Update.
type Sampler struct {
// rate is the inclusion probability most recently passed to Update.
rate float64
// boundary is rate scaled onto [0, maxRandomNumber]; a random draw at or
// below it counts as sampled. It is 0 unless 0 < rate < 1.
boundary uint64
// source supplies the random draws. It is nil unless 0 < rate < 1.
source rand.Source
}

// NewSampler builds a Sampler configured for the given rate.
//
// The rate is expected to lie in [0.0, 1.0]. It is not checked here, so run it
// through ValidateRate first; the sampler behavior for an out-of-range rate is
// undefined.
func NewSampler(rate float64) *Sampler {
	sampler := new(Sampler)
	sampler.Update(rate)
	return sampler
}

// Update reconfigures the sampler for a new rate (e.g. on component config change).
//
// The rate is expected to lie in [0.0, 1.0]; call ValidateRate first or the
// behavior is undefined. For a rate strictly between 0 and 1 the boundary is
// recomputed and a fresh, time-seeded random source is installed. For any other
// value the boundary is reset to 0 and the source is dropped.
func (s *Sampler) Update(rate float64) {
	s.rate = rate

	switch {
	case rate > 0 && rate < 1:
		s.boundary = uint64(float64(maxRandomNumber) * rate)
		s.source = rand.NewSource(time.Now().UnixNano())
	default:
		// Covers 0, 1 and everything that fails the comparison above.
		s.boundary = 0
		s.source = nil
	}
}

// ShouldSample reports whether the current item belongs to the sample. It
// returns true with probability equal to the rate the sampler was created or
// last updated with.
//
// Rate 0 (or below) → always false; rate 1 (or above) → always true; otherwise
// a random draw is compared to the boundary, the same probabilistic algorithm
// as Jaeger's ProbabilisticSampler.
func (s *Sampler) ShouldSample() bool {
	switch {
	case s.rate >= 1.0:
		return true
	case s.rate <= 0.0:
		return false
	}

	draw := s.randomID() & maxRandomNumber
	return draw <= s.boundary
}

// randomID produces the random draw used by ShouldSample: a non-zero uint64
// taken from the sampler's source, with zero draws discarded and retried.
// When the sampler has no source it returns maxRandomNumber instead.
func (s *Sampler) randomID() uint64 {
	if s.source == nil {
		return maxRandomNumber
	}
	for {
		if id := uint64(s.source.Int63()); id != 0 {
			return id
		}
	}
}
84 changes: 84 additions & 0 deletions internal/sampling/sampler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package sampling

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestValidateRate feeds ValidateRate boundary, in-range and out-of-range
// rates and checks that only the out-of-range ones produce an error.
func TestValidateRate(t *testing.T) {
	type testCase struct {
		name    string
		rate    float64
		wantErr bool
	}

	cases := []testCase{
		{name: "valid zero", rate: 0, wantErr: false},
		{name: "valid one", rate: 1, wantErr: false},
		{name: "valid half", rate: 0.5, wantErr: false},
		{name: "invalid negative", rate: -0.1, wantErr: true},
		{name: "invalid over one", rate: 1.1, wantErr: true},
		{name: "invalid large", rate: 2.0, wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := ValidateRate(tc.rate)
			if !tc.wantErr {
				require.NoError(t, err)
				return
			}
			require.Error(t, err)
			require.Contains(t, err.Error(), "rate must be between")
		})
	}
}

// TestSampler_RateZeroAlwaysFalse checks that a sampler built with rate 0
// rejects every one of 100 consecutive items.
func TestSampler_RateZeroAlwaysFalse(t *testing.T) {
	const attempts = 100

	sampler := NewSampler(0)
	for attempt := 0; attempt < attempts; attempt++ {
		sampled := sampler.ShouldSample()
		require.False(t, sampled, "rate 0 should never sample")
	}
}

// TestSampler_RateOneAlwaysTrue checks that a sampler built with rate 1
// accepts every one of 100 consecutive items.
func TestSampler_RateOneAlwaysTrue(t *testing.T) {
	const attempts = 100

	sampler := NewSampler(1)
	for attempt := 0; attempt < attempts; attempt++ {
		sampled := sampler.ShouldSample()
		require.True(t, sampled, "rate 1 should always sample")
	}
}

// TestSampler_RateHalfApproximatelyHalf draws 1000 decisions from a sampler
// built with rate 0.5 and checks that the number of accepted items falls in a
// wide band around one half.
func TestSampler_RateHalfApproximatelyHalf(t *testing.T) {
	s := NewSampler(0.5)
	const n = 1000
	var trues int
	for i := 0; i < n; i++ {
		if s.ShouldSample() {
			trues++
		}
	}
	// Allow 35-65% to avoid flakiness.
	// The messages use a single '%': a lone message argument is reported
	// verbatim rather than run through a formatter, so "%%" would show up
	// literally in the failure output.
	require.GreaterOrEqual(t, trues, int(0.35*float64(n)), "expected at least ~35% sampled")
	require.LessOrEqual(t, trues, int(0.65*float64(n)), "expected at most ~65% sampled")
}

// TestSampler_Update checks that Update takes effect on an existing sampler:
// switching to rate 0 rejects everything, switching to rate 1 accepts
// everything.
func TestSampler_Update(t *testing.T) {
	const attempts = 50

	sampler := NewSampler(0.5)

	// After Update(0), all false
	sampler.Update(0)
	for attempt := 0; attempt < attempts; attempt++ {
		sampled := sampler.ShouldSample()
		require.False(t, sampled)
	}

	// After Update(1), all true
	sampler.Update(1)
	for attempt := 0; attempt < attempts; attempt++ {
		sampled := sampler.ShouldSample()
		require.True(t, sampled)
	}
}

// TestSampler_OutOfRangeRateDeterministic pins what the sampler does with
// rates outside [0, 1]. Such rates are not clamped and callers are expected to
// run ValidateRate first, but the guards in ShouldSample still give a fixed
// answer with no randomness involved.
func TestSampler_OutOfRangeRateDeterministic(t *testing.T) {
	belowRange := NewSampler(-1)
	aboveRange := NewSampler(2)

	require.False(t, belowRange.ShouldSample(), "negative rate → never sample")
	require.True(t, aboveRange.ShouldSample(), "rate > 1 → always sample")
}
Loading