Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 97 additions & 64 deletions statsd/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,31 @@ type (
bufferedMetricMap map[string]*bufferedMetric
)

// countShard guards one partition of the aggregator's count metrics.
// The metric maps are sharded so concurrent samplers contend on
// shardsCount independent locks instead of a single aggregator-wide one.
type countShard struct {
	sync.RWMutex
	counts countsMap
}

// gaugeShard guards one partition of the aggregator's gauge metrics.
type gaugeShard struct {
	sync.RWMutex
	gauges gaugesMap
}

// setShard guards one partition of the aggregator's set metrics.
type setShard struct {
	sync.RWMutex
	sets setsMap
}

type aggregator struct {
nbContextGauge uint64
nbContextCount uint64
nbContextSet uint64

countsM sync.RWMutex
gaugesM sync.RWMutex
setsM sync.RWMutex
shardsCount int
countShards []*countShard
gaugeShards []*gaugeShard
setShards []*setShard

gauges gaugesMap
counts countsMap
sets setsMap
histograms bufferedMetricContexts
distributions bufferedMetricContexts
timings bufferedMetricContexts
Expand All @@ -43,18 +56,25 @@ type aggregator struct {
wg sync.WaitGroup
}

func newAggregator(c *ClientEx, maxSamplesPerContext int64) *aggregator {
return &aggregator{
func newAggregator(c *ClientEx, maxSamplesPerContext int64, shardsCount int) *aggregator {
agg := &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
shardsCount: shardsCount,
countShards: make([]*countShard, shardsCount),
gaugeShards: make([]*gaugeShard, shardsCount),
setShards: make([]*setShard, shardsCount),
histograms: newBufferedContexts(newHistogramMetric, maxSamplesPerContext),
distributions: newBufferedContexts(newDistributionMetric, maxSamplesPerContext),
timings: newBufferedContexts(newTimingMetric, maxSamplesPerContext),
closed: make(chan struct{}),
stopChannelMode: make(chan struct{}),
}
for i := 0; i < shardsCount; i++ {
agg.countShards[i] = &countShard{counts: countsMap{}}
agg.gaugeShards[i] = &gaugeShard{gauges: gaugesMap{}}
agg.setShards[i] = &setShard{sets: setsMap{}}
}
return agg
}

func (a *aggregator) start(flushInterval time.Duration) {
Expand Down Expand Up @@ -135,40 +155,43 @@ func (a *aggregator) flushMetrics() []metric {
// We reset the values to avoid sending 'zero' values for metrics not
// sampled during this flush interval

a.setsM.Lock()
sets := a.sets
a.sets = setsMap{}
a.setsM.Unlock()

for _, s := range sets {
metrics = append(metrics, s.flushUnsafe()...)
for _, shard := range a.setShards {
shard.Lock()
sets := shard.sets
shard.sets = setsMap{}
shard.Unlock()
for _, s := range sets {
metrics = append(metrics, s.flushUnsafe()...)
}
atomic.AddUint64(&a.nbContextSet, uint64(len(sets)))
}

a.gaugesM.Lock()
gauges := a.gauges
a.gauges = gaugesMap{}
a.gaugesM.Unlock()

for _, g := range gauges {
metrics = append(metrics, g.flushUnsafe())
for _, shard := range a.gaugeShards {
shard.Lock()
gauges := shard.gauges
shard.gauges = gaugesMap{}
shard.Unlock()
for _, g := range gauges {
metrics = append(metrics, g.flushUnsafe())
}
atomic.AddUint64(&a.nbContextGauge, uint64(len(gauges)))
}

a.countsM.Lock()
counts := a.counts
a.counts = countsMap{}
a.countsM.Unlock()

for _, c := range counts {
metrics = append(metrics, c.flushUnsafe())
for _, shard := range a.countShards {
shard.Lock()
counts := shard.counts
shard.counts = countsMap{}
shard.Unlock()
for _, c := range counts {
metrics = append(metrics, c.flushUnsafe())
}
atomic.AddUint64(&a.nbContextCount, uint64(len(counts)))
}

metrics = a.histograms.flush(metrics)
metrics = a.distributions.flush(metrics)
metrics = a.timings.flush(metrics)

atomic.AddUint64(&a.nbContextCount, uint64(len(counts)))
atomic.AddUint64(&a.nbContextGauge, uint64(len(gauges)))
atomic.AddUint64(&a.nbContextSet, uint64(len(sets)))
return metrics
}

Expand Down Expand Up @@ -223,76 +246,86 @@ func getContextAndTags(name string, tags []string, cardinality Cardinality) (str
return s, s[len(name)+len(nameSeparatorSymbol)+cardStringLen:]
}

// getShardIndex maps a metric context string to a shard index in
// [0, shardsCount). With one shard (or fewer) every context resolves
// to index 0 without paying for the hash.
func getShardIndex(shardsCount int, context string) int {
	idx := 0
	if shardsCount > 1 {
		idx = int(hashString32(context) % uint32(shardsCount))
	}
	return idx
}

func (a *aggregator) count(name string, value int64, tags []string, cardinality Cardinality) error {
context := getContext(name, tags, cardinality)
a.countsM.RLock()
if count, found := a.counts[context]; found {
shard := a.countShards[getShardIndex(a.shardsCount, context)]
shard.RLock()
if count, found := shard.counts[context]; found {
count.sample(value)
a.countsM.RUnlock()
shard.RUnlock()
return nil
}
a.countsM.RUnlock()
shard.RUnlock()

metric := newCountMetric(name, value, tags, cardinality)

a.countsM.Lock()
shard.Lock()
// Check if another goroutines hasn't created the value between the RUnlock and 'Lock'
if count, found := a.counts[context]; found {
if count, found := shard.counts[context]; found {
count.sample(value)
a.countsM.Unlock()
shard.Unlock()
return nil
}

a.counts[context] = metric
a.countsM.Unlock()
shard.counts[context] = metric
shard.Unlock()
return nil
}

func (a *aggregator) gauge(name string, value float64, tags []string, cardinality Cardinality) error {
context := getContext(name, tags, cardinality)
a.gaugesM.RLock()
if gauge, found := a.gauges[context]; found {
shard := a.gaugeShards[getShardIndex(a.shardsCount, context)]
shard.RLock()
if gauge, found := shard.gauges[context]; found {
gauge.sample(value)
a.gaugesM.RUnlock()
shard.RUnlock()
return nil
}
a.gaugesM.RUnlock()
shard.RUnlock()

gauge := newGaugeMetric(name, value, tags, cardinality)

a.gaugesM.Lock()
// Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
if gauge, found := a.gauges[context]; found {
shard.Lock()
// Check if another goroutines hasn't created the value between the 'RUnlock' and 'Lock'
if gauge, found := shard.gauges[context]; found {
gauge.sample(value)
a.gaugesM.Unlock()
shard.Unlock()
return nil
}
a.gauges[context] = gauge
a.gaugesM.Unlock()
shard.gauges[context] = gauge
shard.Unlock()
return nil
}

func (a *aggregator) set(name string, value string, tags []string, cardinality Cardinality) error {
context := getContext(name, tags, cardinality)
a.setsM.RLock()
if set, found := a.sets[context]; found {
shard := a.setShards[getShardIndex(a.shardsCount, context)]
shard.RLock()
if set, found := shard.sets[context]; found {
set.sample(value)
a.setsM.RUnlock()
shard.RUnlock()
return nil
}
a.setsM.RUnlock()
shard.RUnlock()

metric := newSetMetric(name, value, tags, cardinality)

a.setsM.Lock()
// Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
if set, found := a.sets[context]; found {
shard.Lock()
// Check if another goroutines hasn't created the value between the 'RUnlock' and 'Lock'
if set, found := shard.sets[context]; found {
set.sample(value)
a.setsM.Unlock()
shard.Unlock()
return nil
}
a.sets[context] = metric
a.setsM.Unlock()
shard.sets[context] = metric
shard.Unlock()
return nil
}

Expand Down
44 changes: 44 additions & 0 deletions statsd/aggregator_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package statsd

import (
"fmt"
"testing"
)

// Prevent compiler from optimizing away function calls
var benchErr error

func BenchmarkAggregatorSharding(b *testing.B) {
shardCounts := []int{1, 2, 3, 4, 5, 6, 8, 16, 32, 64, 128, 256}

// Pre-generate metric names to avoid measuring fmt.Sprintf performance
const numMetrics = 100000
metricNames := make([]string, numMetrics)
Comment thread
StephenWakely marked this conversation as resolved.
for i := 0; i < numMetrics; i++ {
metricNames[i] = fmt.Sprintf("metric.%d", i)
}

for _, shards := range shardCounts {
b.Run(fmt.Sprintf("Shards_%d", shards), func(b *testing.B) {
a := newAggregator(nil, 0, shards)
tags := []string{"tag:1", "tag:2"}

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var err error
i := 0
for pb.Next() {
name := metricNames[i%numMetrics]
Comment thread
StephenWakely marked this conversation as resolved.
i++
err = a.count(name, 1, tags, CardinalityLow)
err = a.gauge(name, 10.0, tags, CardinalityLow)
err = a.set(name, "val", tags, CardinalityLow)
}
benchErr = err
})
})
if benchErr != nil {
b.Fatal(benchErr)
}
}
}
Loading
Loading