Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 97 additions & 64 deletions statsd/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,31 @@ type (
bufferedMetricMap map[string]*bufferedMetric
)

// countShard is one partition of the aggregator's count metrics. Each shard
// carries its own lock, so contexts that hash to different shards are not
// serialized behind a single mutex.
type countShard struct {
	sync.RWMutex // guards counts

	// counts holds the count metrics owned by this shard, keyed by context.
	counts countsMap
}

// gaugeShard is one partition of the aggregator's gauge metrics. Each shard
// carries its own lock, so contexts that hash to different shards are not
// serialized behind a single mutex.
type gaugeShard struct {
	sync.RWMutex // guards gauges

	// gauges holds the gauge metrics owned by this shard, keyed by context.
	gauges gaugesMap
}

// setShard is one partition of the aggregator's set metrics. Each shard
// carries its own lock, so contexts that hash to different shards are not
// serialized behind a single mutex.
type setShard struct {
	sync.RWMutex // guards sets

	// sets holds the set metrics owned by this shard, keyed by context.
	sets setsMap
}

type aggregator struct {
nbContextGauge uint64
nbContextCount uint64
nbContextSet uint64

countsM sync.RWMutex
gaugesM sync.RWMutex
setsM sync.RWMutex
shardsCount int
countShards []*countShard
gaugeShards []*gaugeShard
setShards []*setShard

gauges gaugesMap
counts countsMap
sets setsMap
histograms bufferedMetricContexts
distributions bufferedMetricContexts
timings bufferedMetricContexts
Expand All @@ -43,18 +56,25 @@ type aggregator struct {
wg sync.WaitGroup
}

// newAggregator returns an aggregator for client c. It allocates shardsCount
// shards for each of counts, gauges and sets, every shard starting with an
// empty map, and creates the histogram, distribution and timing buffered
// contexts with maxSamplesPerContext.
//
// NOTE(review): shardsCount is not validated here. A negative value makes the
// make calls panic, and 0 leaves the shard slices empty while getShardIndex
// still returns index 0 — confirm callers always pass a value >= 1.
// NOTE(review): this span is a rendered diff; the two-parameter signature and
// the counts/gauges/sets fields appear to be the removed pre-sharding lines —
// TODO confirm against the merged file.
func newAggregator(c *ClientEx, maxSamplesPerContext int64) *aggregator {
return &aggregator{
func newAggregator(c *ClientEx, maxSamplesPerContext int64, shardsCount int) *aggregator {
agg := &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
shardsCount: shardsCount,
countShards: make([]*countShard, shardsCount),
gaugeShards: make([]*gaugeShard, shardsCount),
setShards: make([]*setShard, shardsCount),
histograms: newBufferedContexts(newHistogramMetric, maxSamplesPerContext),
distributions: newBufferedContexts(newDistributionMetric, maxSamplesPerContext),
timings: newBufferedContexts(newTimingMetric, maxSamplesPerContext),
closed: make(chan struct{}),
stopChannelMode: make(chan struct{}),
}
// Every shard slot gets its own struct with a ready-to-use empty map.
for i := 0; i < shardsCount; i++ {
agg.countShards[i] = &countShard{counts: countsMap{}}
agg.gaugeShards[i] = &gaugeShard{gauges: gaugesMap{}}
agg.setShards[i] = &setShard{sets: setsMap{}}
}
return agg
}

func (a *aggregator) start(flushInterval time.Duration) {
Expand Down Expand Up @@ -135,40 +155,43 @@ func (a *aggregator) flushMetrics() []metric {
// We reset the values to avoid sending 'zero' values for metrics not
// sampled during this flush interval

a.setsM.Lock()
sets := a.sets
a.sets = setsMap{}
a.setsM.Unlock()

for _, s := range sets {
metrics = append(metrics, s.flushUnsafe()...)
for _, shard := range a.setShards {
shard.Lock()
sets := shard.sets
shard.sets = setsMap{}
shard.Unlock()
for _, s := range sets {
metrics = append(metrics, s.flushUnsafe()...)
}
atomic.AddUint64(&a.nbContextSet, uint64(len(sets)))
}

a.gaugesM.Lock()
gauges := a.gauges
a.gauges = gaugesMap{}
a.gaugesM.Unlock()

for _, g := range gauges {
metrics = append(metrics, g.flushUnsafe())
for _, shard := range a.gaugeShards {
shard.Lock()
gauges := shard.gauges
shard.gauges = gaugesMap{}
shard.Unlock()
for _, g := range gauges {
metrics = append(metrics, g.flushUnsafe())
}
atomic.AddUint64(&a.nbContextGauge, uint64(len(gauges)))
}

a.countsM.Lock()
counts := a.counts
a.counts = countsMap{}
a.countsM.Unlock()

for _, c := range counts {
metrics = append(metrics, c.flushUnsafe())
for _, shard := range a.countShards {
shard.Lock()
counts := shard.counts
shard.counts = countsMap{}
shard.Unlock()
for _, c := range counts {
metrics = append(metrics, c.flushUnsafe())
}
atomic.AddUint64(&a.nbContextCount, uint64(len(counts)))
}

metrics = a.histograms.flush(metrics)
metrics = a.distributions.flush(metrics)
metrics = a.timings.flush(metrics)

atomic.AddUint64(&a.nbContextCount, uint64(len(counts)))
atomic.AddUint64(&a.nbContextGauge, uint64(len(gauges)))
atomic.AddUint64(&a.nbContextSet, uint64(len(sets)))
return metrics
}

Expand Down Expand Up @@ -223,76 +246,86 @@ func getContextAndTags(name string, tags []string, cardinality Cardinality) (str
return s, s[len(name)+len(nameSeparatorSymbol)+cardStringLen:]
}

// getShardIndex maps a metric context to the index of the shard that owns it.
//
// When more than one shard exists, the index is the 32-bit hash of context
// reduced modulo shardsCount. With one shard or fewer (including zero and
// negative counts) the hash is skipped and index 0 is returned.
func getShardIndex(shardsCount int, context string) int {
	if shardsCount > 1 {
		buckets := uint32(shardsCount)
		return int(hashString32(context) % buckets)
	}
	return 0
}

// count samples value into the count metric for the context built from name,
// tags and cardinality. The context selects a shard through getShardIndex; an
// existing metric is sampled under that shard's read lock, otherwise a new
// metric is inserted under the write lock after re-checking the map. It
// always returns nil.
//
// NOTE(review): this span is a rendered diff; the lines using a.countsM and
// a.counts appear to be the removed pre-sharding version of the adjacent
// shard.* lines — TODO confirm against the merged file.
func (a *aggregator) count(name string, value int64, tags []string, cardinality Cardinality) error {
context := getContext(name, tags, cardinality)
a.countsM.RLock()
if count, found := a.counts[context]; found {
shard := a.countShards[getShardIndex(a.shardsCount, context)]
shard.RLock()
if count, found := shard.counts[context]; found {
count.sample(value)
a.countsM.RUnlock()
shard.RUnlock()
return nil
}
a.countsM.RUnlock()
shard.RUnlock()

metric := newCountMetric(name, value, tags, cardinality)

a.countsM.Lock()
shard.Lock()
// Check that another goroutine hasn't created the value between the 'RUnlock' and 'Lock'
if count, found := a.counts[context]; found {
if count, found := shard.counts[context]; found {
count.sample(value)
a.countsM.Unlock()
shard.Unlock()
return nil
}

a.counts[context] = metric
a.countsM.Unlock()
shard.counts[context] = metric
shard.Unlock()
return nil
}

// gauge samples value into the gauge metric for the context built from name,
// tags and cardinality. The context selects a shard through getShardIndex; an
// existing metric is sampled under that shard's read lock, otherwise a new
// metric is inserted under the write lock after re-checking the map. It
// always returns nil.
//
// NOTE(review): this span is a rendered diff; the lines using a.gaugesM and
// a.gauges appear to be the removed pre-sharding version of the adjacent
// shard.* lines — TODO confirm against the merged file.
func (a *aggregator) gauge(name string, value float64, tags []string, cardinality Cardinality) error {
context := getContext(name, tags, cardinality)
a.gaugesM.RLock()
if gauge, found := a.gauges[context]; found {
shard := a.gaugeShards[getShardIndex(a.shardsCount, context)]
shard.RLock()
if gauge, found := shard.gauges[context]; found {
gauge.sample(value)
a.gaugesM.RUnlock()
shard.RUnlock()
return nil
}
a.gaugesM.RUnlock()
shard.RUnlock()

gauge := newGaugeMetric(name, value, tags, cardinality)

a.gaugesM.Lock()
// Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
if gauge, found := a.gauges[context]; found {
shard.Lock()
// Check that another goroutine hasn't created the value between the 'RUnlock' and 'Lock'
if gauge, found := shard.gauges[context]; found {
gauge.sample(value)
a.gaugesM.Unlock()
shard.Unlock()
return nil
}
a.gauges[context] = gauge
a.gaugesM.Unlock()
shard.gauges[context] = gauge
shard.Unlock()
return nil
}

// set samples value into the set metric for the context built from name,
// tags and cardinality. The context selects a shard through getShardIndex; an
// existing metric is sampled under that shard's read lock, otherwise a new
// metric is inserted under the write lock after re-checking the map. It
// always returns nil.
//
// NOTE(review): this span is a rendered diff; the lines using a.setsM and
// a.sets appear to be the removed pre-sharding version of the adjacent
// shard.* lines — TODO confirm against the merged file.
func (a *aggregator) set(name string, value string, tags []string, cardinality Cardinality) error {
context := getContext(name, tags, cardinality)
a.setsM.RLock()
if set, found := a.sets[context]; found {
shard := a.setShards[getShardIndex(a.shardsCount, context)]
shard.RLock()
if set, found := shard.sets[context]; found {
set.sample(value)
a.setsM.RUnlock()
shard.RUnlock()
return nil
}
a.setsM.RUnlock()
shard.RUnlock()

metric := newSetMetric(name, value, tags, cardinality)

a.setsM.Lock()
// Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
if set, found := a.sets[context]; found {
shard.Lock()
// Check that another goroutine hasn't created the value between the 'RUnlock' and 'Lock'
if set, found := shard.sets[context]; found {
set.sample(value)
a.setsM.Unlock()
shard.Unlock()
return nil
}
a.sets[context] = metric
a.setsM.Unlock()
shard.sets[context] = metric
shard.Unlock()
return nil
}

Expand Down
29 changes: 29 additions & 0 deletions statsd/aggregator_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package statsd

import (
"fmt"
"testing"
)

// BenchmarkAggregatorSharding runs one sub-benchmark per shard count. Each
// sub-benchmark builds an aggregator with a nil client and calls count, gauge
// and set from parallel goroutines, with metric names cycling through 100000
// values per goroutine.
//
// NOTE(review): the metric name is built with fmt.Sprintf inside the timed
// loop, so string formatting and allocation are part of what is measured.
func BenchmarkAggregatorSharding(b *testing.B) {
shardCounts := []int{1, 2, 3, 4, 5, 6, 8, 16, 32, 64, 128, 256}

for _, shards := range shardCounts {
b.Run(fmt.Sprintf("Shards_%d", shards), func(b *testing.B) {
a := newAggregator(nil, 0, shards)
tags := []string{"tag:1", "tag:2"}

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
i++
name := fmt.Sprintf("metric.%d", i%100000)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This benchmark is mostly going to be benchmarking the string allocation here. We should instead preallocate a big list of these strings before b.ResetTimer().

a.count(name, 1, tags, CardinalityLow)
a.gauge(name, 10.0, tags, CardinalityLow)
a.set(name, "val", tags, CardinalityLow)
}
})
})
}
}
Loading
Loading