Skip to content

Commit

Permalink
perf: Improve approx_topk performance by reducing allocations. (#15450
Browse files Browse the repository at this point in the history
)

**What this PR does / why we need it**:

The metrics slice should keep a constant amount of memory by removing the smallest item when the maximum number of labels is reached.

```
› benchstat before.log after.log
goos: linux
goarch: amd64
pkg: github.com/grafana/loki/v3/pkg/logql
cpu: AMD Ryzen 7 3700X 8-Core Processor             
                                │ before.log  │              after.log              │
                                │   sec/op    │   sec/op     vs base                │
_HeapCountMinSketchVectorAdd-16   839.0m ± 3%   418.9m ± 2%  -50.07% (p=0.000 n=10)

                                │  before.log  │              after.log               │
                                │     B/op     │     B/op      vs base                │
_HeapCountMinSketchVectorAdd-16   72.58Mi ± 0%   12.10Mi ± 0%  -83.33% (p=0.000 n=10)

                                │  before.log  │              after.log              │
                                │  allocs/op   │  allocs/op   vs base                │
_HeapCountMinSketchVectorAdd-16   4073.9k ± 0%   116.9k ± 0%  -97.13% (p=0.000 n=10)

```

**Checklist**
- [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] Title matches the required conventional commits format, see [here](https://www.conventionalcommits.org/en/v1.0.0/)
  - **Note** that Promtail is considered to be feature complete, and future development for logs collection will be in [Grafana Alloy](https://github.com/grafana/alloy). As such, `feat` PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
- [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](0d4416a)
  • Loading branch information
jeschkies authored Dec 19, 2024
1 parent 80e8e60 commit 04994ca
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 36 deletions.
52 changes: 38 additions & 14 deletions pkg/logql/count_min_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -157,8 +158,13 @@ type HeapCountMinSketchVector struct {
CountMinSketchVector

// internal set of observed events
observed map[string]struct{}
observed map[uint64]struct{}
maxLabels int

// The buffer is used by `labels.Bytes` similar to `series.Hash` in `codec.MergeResponse`. It is allocated
// outside of the method in order to reuse it for the next `Add` call. This saves a lot of allocations.
// 1KB is used for `buffer` after some experimentation. Reusing the buffer is not thread safe.
buffer []byte
}

func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCountMinSketchVector {
Expand All @@ -172,31 +178,39 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou
CountMinSketchVector: CountMinSketchVector{
T: ts,
F: f,
Metrics: make([]labels.Labels, 0, metricsLength),
Metrics: make([]labels.Labels, 0, metricsLength+1),
},
observed: make(map[string]struct{}),
observed: make(map[uint64]struct{}),
maxLabels: maxLabels,
buffer: make([]byte, 0, 1024),
}
}

func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) {
// TODO: we save a lot of allocations by reusing the buffer inside metric.String
metricString := metric.String()
v.F.Add(metricString, value)
v.buffer = metric.Bytes(v.buffer)

v.F.Add(v.buffer, value)

// Add our metric if we haven't seen it
if _, ok := v.observed[metricString]; !ok {

// TODO(karsten): There is a chance that the ids match but not the labels due to hash collision. Ideally there's
// an else block that compares the series labels. However, that's not trivial. Besides, instance.Series has the
// same issue in its deduping logic.
id := xxhash.Sum64(v.buffer)
if _, ok := v.observed[id]; !ok {
heap.Push(v, metric)
v.observed[metricString] = struct{}{}
} else if v.Metrics[0].String() == metricString {
// The smalles element has been updated to fix the heap.
v.observed[id] = struct{}{}
} else if labels.Equal(v.Metrics[0], metric) {
// The smallest element has been updated to fix the heap.
heap.Fix(v, 0)
}

// The maximum number of labels has been reached, so drop the smallest element.
if len(v.Metrics) > v.maxLabels {
metric := heap.Pop(v).(labels.Labels)
delete(v.observed, metric.String())
v.buffer = metric.Bytes(v.buffer)
id := xxhash.Sum64(v.buffer)
delete(v.observed, id)
}
}

Expand All @@ -205,8 +219,11 @@ func (v HeapCountMinSketchVector) Len() int {
}

func (v HeapCountMinSketchVector) Less(i, j int) bool {
left := v.F.Count(v.Metrics[i].String())
right := v.F.Count(v.Metrics[j].String())
v.buffer = v.Metrics[i].Bytes(v.buffer)
left := v.F.Count(v.buffer)

v.buffer = v.Metrics[j].Bytes(v.buffer)
right := v.F.Count(v.buffer)
return left < right
}

Expand Down Expand Up @@ -295,6 +312,11 @@ func (e *countMinSketchVectorAggEvaluator) Error() error {
type CountMinSketchVectorStepEvaluator struct {
exhausted bool
vec *CountMinSketchVector

// The buffer is used by `labels.Bytes` similar to `series.Hash` in `codec.MergeResponse`. It is allocated
// outside of the method in order to reuse it for the next `Next` call. This saves a lot of allocations.
// 1KB is used for `buffer` after some experimentation. Reusing the buffer is not thread safe.
buffer []byte
}

var _ StepEvaluator = NewQuantileSketchVectorStepEvaluator(nil, 0)
Expand All @@ -303,6 +325,7 @@ func NewCountMinSketchVectorStepEvaluator(vec *CountMinSketchVector) *CountMinSk
return &CountMinSketchVectorStepEvaluator{
exhausted: false,
vec: vec,
buffer: make([]byte, 0, 1024),
}
}

Expand All @@ -315,7 +338,8 @@ func (e *CountMinSketchVectorStepEvaluator) Next() (bool, int64, StepResult) {

for i, labels := range e.vec.Metrics {

f := e.vec.F.Count(labels.String())
e.buffer = labels.Bytes(e.buffer)
f := e.vec.F.Count(e.buffer)

vec[i] = promql.Sample{
T: e.vec.T,
Expand Down
34 changes: 32 additions & 2 deletions pkg/logql/count_min_sketch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package logql

import (
"fmt"
"math/rand"
"testing"

"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -57,8 +59,9 @@ func TestCountMinSketchSerialization(t *testing.T) {
T: 42,
F: cms,
},
observed: make(map[string]struct{}, 0),
observed: make(map[uint64]struct{}, 0),
maxLabels: 10_000,
buffer: make([]byte, 0, 1024),
}
vec.Add(metric, 42.0)

Expand All @@ -68,7 +71,7 @@ func TestCountMinSketchSerialization(t *testing.T) {
Sketch: &logproto.CountMinSketch{
Depth: 2,
Width: 4,
Counters: []float64{0, 0, 0, 42, 0, 42, 0, 0},
Counters: []float64{0, 42, 0, 0, 0, 42, 0, 0},
Hyperloglog: hllBytes,
},
Metrics: []*logproto.Labels{
Expand All @@ -86,3 +89,30 @@ func TestCountMinSketchSerialization(t *testing.T) {
// The HeapCountMinSketchVector is serialized to a CountMinSketchVector.
require.Equal(t, round, vec.CountMinSketchVector)
}

// BenchmarkHeapCountMinSketchVectorAdd measures HeapCountMinSketchVector.Add
// and, while doing so, checks that the Metrics slice stays bounded.
//
// The vector is created with metricsLength == maxLabels (10_000). The workload
// is 100_000 events cycling through 20_000 distinct label sets, i.e. more
// unique series than maxLabels. After construction and after every Add the
// benchmark reports an error if len(v.Metrics) exceeds maxLabels or
// cap(v.Metrics) exceeds maxLabels+1.
func BenchmarkHeapCountMinSketchVectorAdd(b *testing.B) {
	maxLabels := 10_000
	v := NewHeapCountMinSketchVector(0, maxLabels, maxLabels)
	if len(v.Metrics) > maxLabels || cap(v.Metrics) > maxLabels+1 {
		b.Errorf("Length or capacity of metrics is too high: len=%d cap=%d", len(v.Metrics), cap(v.Metrics))
	}

	// Build the input up front so that label construction is not part of the
	// timed section.
	eventsCount := 100_000
	uniqueEventsCount := 20_000
	events := make([]labels.Labels, eventsCount)
	for i := range events {
		events[i] = labels.Labels{{Name: "event", Value: fmt.Sprintf("%d", i%uniqueEventsCount)}}
	}

	b.ResetTimer()
	b.ReportAllocs()

	for n := 0; n < b.N; n++ {
		for _, event := range events {
			v.Add(event, rand.Float64())
			// The slice must neither grow past the label limit nor be
			// reallocated to a larger backing array.
			if len(v.Metrics) > maxLabels || cap(v.Metrics) > maxLabels+1 {
				b.Errorf("Length or capacity of metrics is too high: len=%d cap=%d", len(v.Metrics), cap(v.Metrics))
			}
		}
	}
}
16 changes: 8 additions & 8 deletions pkg/logql/sketch/cms.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type CountMinSketch struct {
Depth, Width uint32
Counters [][]float64
HyperLogLog *hyperloglog.Sketch //hyperloglog.New16(),
HyperLogLog *hyperloglog.Sketch // hyperloglog.New16(),
}

// NewCountMinSketch creates a new CMS for a given width and depth.
Expand Down Expand Up @@ -46,8 +46,8 @@ func (s *CountMinSketch) getPos(h1, h2, row uint32) uint32 {
}

// Add 'count' occurrences of the given input.
func (s *CountMinSketch) Add(event string, count float64) {
s.HyperLogLog.Insert(unsafeGetBytes(event))
func (s *CountMinSketch) Add(event []byte, count float64) {
s.HyperLogLog.Insert(event)
// see the comments in the hashn function for how using only 2
// hash functions rather than a function per row still fulfills
// the pairwise independent hash functions requirement for CMS
Expand All @@ -58,7 +58,7 @@ func (s *CountMinSketch) Add(event string, count float64) {
}
}

func (s *CountMinSketch) Increment(event string) {
func (s *CountMinSketch) Increment(event []byte) {
s.Add(event, 1)
}

Expand All @@ -69,8 +69,8 @@ func (s *CountMinSketch) Increment(event string) {
// value that's less than Count(h) + count rather than all counters that h hashed to.
// Returns the new estimate for the event as well as the both hashes which can be used
// to identify the event for other things that need a hash.
func (s *CountMinSketch) ConservativeAdd(event string, count float64) (float64, uint32, uint32) {
s.HyperLogLog.Insert(unsafeGetBytes(event))
func (s *CountMinSketch) ConservativeAdd(event []byte, count float64) (float64, uint32, uint32) {
s.HyperLogLog.Insert(event)

min := float64(math.MaxUint64)

Expand All @@ -94,12 +94,12 @@ func (s *CountMinSketch) ConservativeAdd(event string, count float64) (float64,
return min, h1, h2
}

func (s *CountMinSketch) ConservativeIncrement(event string) (float64, uint32, uint32) {
func (s *CountMinSketch) ConservativeIncrement(event []byte) (float64, uint32, uint32) {
return s.ConservativeAdd(event, float64(1))
}

// Count returns the approximate min count for the given input.
func (s *CountMinSketch) Count(event string) float64 {
func (s *CountMinSketch) Count(event []byte) float64 {
min := float64(math.MaxUint64)
h1, h2 := hashn(event)

Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/sketch/cms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestCMS(_ *testing.T) {

for _, e := range events {
for i := 0; i < e.count; i++ {
cms.ConservativeIncrement(e.name)
cms.ConservativeIncrement(unsafeGetBytes(e.name))
}
}
}
4 changes: 2 additions & 2 deletions pkg/logql/sketch/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import "hash/fnv"
// SOFTWARE.
//
// <http://www.opensource.org/licenses/mit-license.php>
func hashn(s string) (h1, h2 uint32) {
func hashn(s []byte) (h1, h2 uint32) {
// This construction comes from
// http://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
// "Building a Better Bloom Filter", by Kirsch and Mitzenmacher. Their
Expand All @@ -34,7 +34,7 @@ func hashn(s string) (h1, h2 uint32) {
// Empirically, though, this seems to work "just fine".

fnv1a := fnv.New32a()
fnv1a.Write([]byte(s))
fnv1a.Write(s)
h1 = fnv1a.Sum32()

// inlined jenkins one-at-a-time hash
Expand Down
18 changes: 9 additions & 9 deletions pkg/logql/sketch/topk.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ func (t *Topk) heapMinReplace(event string, estimate float64, removed string) {
// updates the BF to ensure that the removed event won't be mistakenly thought
// to be in the heap, and updates the BF to ensure that we would get a truthy result for the added event
func (t *Topk) updateBF(removed, added string) {
r1, r2 := hashn(removed)
a1, a2 := hashn(added)
r1, r2 := hashn(unsafeGetBytes(removed))
a1, a2 := hashn(unsafeGetBytes(added))
var pos uint32
for i := range t.bf {
// removed event
Expand Down Expand Up @@ -230,7 +230,7 @@ func unsafeGetBytes(s string) []byte {
// for each node in the heap and rebalance the heap, and then if the event we're observing has an estimate that is still
// greater than the minimum heap element count, we should put this event into the heap and remove the other one.
func (t *Topk) Observe(event string) {
estimate, h1, h2 := t.sketch.ConservativeIncrement(event)
estimate, h1, h2 := t.sketch.ConservativeIncrement(unsafeGetBytes(event))
t.hll.Insert(unsafeGetBytes(event))

if t.InTopk(h1, h2) {
Expand All @@ -246,12 +246,12 @@ func (t *Topk) Observe(event string) {
var h1, h2 uint32
var pos uint32
for i := range *t.heap {
(*t.heap)[i].count = t.sketch.Count((*t.heap)[i].event)
(*t.heap)[i].count = t.sketch.Count(unsafeGetBytes((*t.heap)[i].event))
if i <= len(*t.heap)/2 {
heap.Fix(t.heap, i)
}
// ensure all the bf buckets are truthy for the event
h1, h2 = hashn((*t.heap)[i].event)
h1, h2 = hashn(unsafeGetBytes((*t.heap)[i].event))
for j := range t.bf {
pos = t.sketch.getPos(h1, h2, uint32(j))
t.bf[j][pos] = true
Expand Down Expand Up @@ -304,11 +304,11 @@ func (t *Topk) Merge(from *Topk) error {

var all TopKResult
for _, e := range *t.heap {
all = append(all, element{Event: e.event, Count: t.sketch.Count(e.event)})
all = append(all, element{Event: e.event, Count: t.sketch.Count(unsafeGetBytes(e.event))})
}

for _, e := range *from.heap {
all = append(all, element{Event: e.event, Count: t.sketch.Count(e.event)})
all = append(all, element{Event: e.event, Count: t.sketch.Count(unsafeGetBytes(e.event))})
}

all = removeDuplicates(all)
Expand All @@ -317,7 +317,7 @@ func (t *Topk) Merge(from *Topk) error {
var h1, h2 uint32
// TODO: merging should also potentially replace its bloomfilter? or 0 everything in the bloomfilter
for _, e := range all[:t.max] {
h1, h2 = hashn(e.Event)
h1, h2 = hashn(unsafeGetBytes(e.Event))
t.heapPush(temp, e.Event, float64(e.Count), h1, h2)
}
t.heap = temp
Expand Down Expand Up @@ -347,7 +347,7 @@ func (t *Topk) Topk() TopKResult {
for _, e := range *t.heap {
res = append(res, element{
Event: e.event,
Count: t.sketch.Count(e.event),
Count: t.sketch.Count(unsafeGetBytes(e.event)),
})
}
sort.Sort(res)
Expand Down

0 comments on commit 04994ca

Please sign in to comment.