From 1d98eeb9d79bcfdd6e763f28e3077cb5959d12f4 Mon Sep 17 00:00:00 2001 From: Joshua Colvin Date: Tue, 29 Nov 2022 17:30:57 -0700 Subject: [PATCH 1/3] Reduce code duplication for resetting expdecaysample Use single function for creating bounded histogram sample --- eth/protocols/eth/handler.go | 4 +--- eth/protocols/snap/handler.go | 4 +--- metrics/sample.go | 6 ++++++ p2p/tracker/tracker.go | 4 +--- rpc/metrics.go | 4 +--- 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 42d0412a127c..8149db149568 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -218,9 +218,7 @@ func handleMessage(backend Backend, peer *Peer) error { h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) defer func(start time.Time) { sampler := func() metrics.Sample { - return metrics.ResettingSample( - metrics.NewExpDecaySample(1028, 0.015), - ) + return metrics.NewBoundedHistogramSample() } metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds()) }(time.Now()) diff --git a/eth/protocols/snap/handler.go b/eth/protocols/snap/handler.go index bd7ce9e71543..968fcfbfa556 100644 --- a/eth/protocols/snap/handler.go +++ b/eth/protocols/snap/handler.go @@ -145,9 +145,7 @@ func HandleMessage(backend Backend, peer *Peer) error { h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) defer func(start time.Time) { sampler := func() metrics.Sample { - return metrics.ResettingSample( - metrics.NewExpDecaySample(1028, 0.015), - ) + return metrics.NewBoundedHistogramSample() } metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds()) }(start) diff --git a/metrics/sample.go b/metrics/sample.go index 5398dd42d5de..3ec7fff13600 100644 --- a/metrics/sample.go +++ b/metrics/sample.go @@ -32,6 +32,12 @@ type Sample interface { Update(int64) } +func 
NewBoundedHistogramSample() Sample { + return ResettingSample( + NewExpDecaySample(1028, 0.015), + ) +} + // ExpDecaySample is an exponentially-decaying sample using a forward-decaying // priority reservoir. See Cormode et al's "Forward Decay: A Practical Time // Decay Model for Streaming Systems". diff --git a/p2p/tracker/tracker.go b/p2p/tracker/tracker.go index 6a733b9ba51e..0e473420f1d3 100644 --- a/p2p/tracker/tracker.go +++ b/p2p/tracker/tracker.go @@ -197,9 +197,7 @@ func (t *Tracker) Fulfil(peer string, version uint, code uint64, id uint64) { h := fmt.Sprintf("%s/%s/%d/%#02x", waitHistName, t.protocol, req.version, req.reqCode) sampler := func() metrics.Sample { - return metrics.ResettingSample( - metrics.NewExpDecaySample(1028, 0.015), - ) + return metrics.NewBoundedHistogramSample() } metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(req.time).Microseconds()) } diff --git a/rpc/metrics.go b/rpc/metrics.go index ef7449ce05e2..4573fee110c0 100644 --- a/rpc/metrics.go +++ b/rpc/metrics.go @@ -42,9 +42,7 @@ func updateServeTimeHistogram(method string, success bool, elapsed time.Duration } h := fmt.Sprintf("%s/%s/%s", serveTimeHistName, method, note) sampler := func() metrics.Sample { - return metrics.ResettingSample( - metrics.NewExpDecaySample(1028, 0.015), - ) + return metrics.NewBoundedHistogramSample() } metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(elapsed.Nanoseconds()) } From 852baee5d0e404572a1e62c498088f91878c92bc Mon Sep 17 00:00:00 2001 From: Joshua Colvin Date: Sun, 11 Dec 2022 19:29:16 -0700 Subject: [PATCH 2/3] Add SlidingTimeWindowArray The current go-ethereum histogram metrics object is set to reset whenever the metrics endpoint is scraped. The reset is done to prevent the problem of histograms not getting updated for commands that are rarely called. However, this breaks many Prometheus assumptions and makes computing correct metrics difficult and unreliable. 
The go-ethereum metrics appears to be a fork of rcrowley/go-metrics which in turn is a port of the Java library https://github.com/dropwizard/metrics. Looking at https://metrics.dropwizard.io/4.2.0/manual/core.html#exponentially-decaying-reservoirs there is an implementation of `SlidingTimeWindowArrayReservoir`: > SlidingTimeWindowArrayReservoir is comparable with > ExponentiallyDecayingReservoir in terms GC overhead and performance. As for > required memory, SlidingTimeWindowArrayReservoir takes ~128 bits per stored > measurement and you can simply calculate required amount of heap. > Example: 10K measurements / sec with reservoir storing time of 1 minute will > take 10000 * 60 * 128 / 8 = 9600000 bytes ~ 9 megabytes Here is more information on the sampling error introduced by Exponential Decay sampling: https://medium.com/expedia-group-tech/your-latency-metrics-could-be-misleading-you-how-hdrhistogram-can-help-9d545b598374 --- go.mod | 3 +- go.sum | 9 +- metrics/chunked_associative_array.go | 221 ++++++++++++++++++++ metrics/chunked_associative_array_test.go | 170 +++++++++++++++ metrics/sample_test.go | 12 ++ metrics/sliding_time_window_array_sample.go | 123 +++++++++++ 6 files changed, 533 insertions(+), 5 deletions(-) create mode 100644 metrics/chunked_associative_array.go create mode 100644 metrics/chunked_associative_array_test.go create mode 100644 metrics/sliding_time_window_array_sample.go diff --git a/go.mod b/go.mod index 79bdc2551abe..71accb557874 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/fjl/gencodec v0.0.0-20230517082657-f9840df7b83e github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 github.com/fsnotify/fsnotify v1.6.0 + github.com/gammazero/deque v0.2.1 github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46 github.com/gofrs/flock v0.8.1 @@ -98,7 +99,7 @@ require ( github.com/consensys/bavard v0.1.13 // indirect 
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect - github.com/deepmap/oapi-codegen v1.6.0 // indirect + github.com/deepmap/oapi-codegen v1.8.2 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61 // indirect github.com/go-ole/go-ole v1.3.0 // indirect diff --git a/go.sum b/go.sum index b692629b6b6a..e7fb7a9b04be 100644 --- a/go.sum +++ b/go.sum @@ -160,8 +160,9 @@ github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= -github.com/deepmap/oapi-codegen v1.6.0 h1:w/d1ntwh91XI0b/8ja7+u5SvA4IFfM0UNNLmiDR1gg0= github.com/deepmap/oapi-codegen v1.6.0/go.mod h1:ryDa9AgbELGeB+YEXE1dR53yAjHwFvE9iAUlWl9Al3M= +github.com/deepmap/oapi-codegen v1.8.2 h1:SegyeYGcdi0jLLrpbCMoJxnUUn8GBXHsvr4rbzjuhfU= +github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= @@ -196,6 +197,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/gammazero/deque v0.2.1 
h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= +github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61 h1:IZqZOB2fydHte3kUgxrzK5E1fW7RQGeDwE8F/ZZnUYc= github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61/go.mod h1:Q0X6pkwTILDlzrGEckF6HKjXe48EgsY/l7K7vhY4MW8= github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc= @@ -204,6 +207,7 @@ github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46 h1:BAIP2GihuqhwdILrV+7GJel5lyPV3u1+PgzrWLc0TkE= github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46/go.mod h1:QNpY22eby74jVhqH4WhDLDwxc/vqsern6pW+u2kbkpc= github.com/getkin/kin-openapi v0.53.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4= +github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM= @@ -221,7 +225,6 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= -github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod 
h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= @@ -777,8 +780,6 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= diff --git a/metrics/chunked_associative_array.go b/metrics/chunked_associative_array.go new file mode 100644 index 000000000000..caaa1af4b1c4 --- /dev/null +++ b/metrics/chunked_associative_array.go @@ -0,0 +1,221 @@ +package metrics + +// Ported from +// https://github.com/dropwizard/metrics/blob/release/4.2.x/metrics-core/src/main/java/com/codahale/metrics/ChunkedAssociativeLongArray.java + +import ( + "github.com/gammazero/deque" + "sort" + "strconv" + "strings" +) + +const ( + ChunkedAssociativeArrayDefaultChunkSize = 512 + ChunkedAssociativeArrayMaxCacheSize = 128 +) + +type ChunkedAssociativeArray struct { + defaultChunkSize int + + /* + * We use this ArrayDeque as cache to store chunks that are expired and removed from main data structure. + * Then instead of allocating new AssociativeArrayChunk immediately we are trying to poll one from this deque. + * So if you have constant or slowly changing load ChunkedAssociativeLongArray will never + * throw away old chunks or allocate new ones which makes this data structure almost garbage free. 
+ */ + chunksCache *deque.Deque[*AssociativeArrayChunk] + chunks *deque.Deque[*AssociativeArrayChunk] +} + +func NewChunkedAssociativeArray(chunkSize int) *ChunkedAssociativeArray { + return &ChunkedAssociativeArray{ + defaultChunkSize: chunkSize, + chunksCache: deque.New[*AssociativeArrayChunk](ChunkedAssociativeArrayMaxCacheSize, ChunkedAssociativeArrayMaxCacheSize), + chunks: deque.New[*AssociativeArrayChunk](), + } +} + +func (caa *ChunkedAssociativeArray) Clear() { + // Pop until empty: Len() shrinks with every pop, so a counting loop would stop halfway and leave chunks behind + for caa.chunks.Len() > 0 { + caa.freeChunk(caa.chunks.PopBack()) + } +} + +func (caa *ChunkedAssociativeArray) AllocateChunk() *AssociativeArrayChunk { + if caa.chunksCache.Len() == 0 { + return NewAssociativeArrayChunk(caa.defaultChunkSize) + } + + chunk := caa.chunksCache.PopBack() + chunk.cursor = 0 + chunk.startIndex = 0 + chunk.chunkSize = len(chunk.keys) + + return chunk +} + +func (caa *ChunkedAssociativeArray) freeChunk(chunk *AssociativeArrayChunk) { + if caa.chunksCache.Len() < ChunkedAssociativeArrayMaxCacheSize { + caa.chunksCache.PushBack(chunk) + } +} + +func (caa *ChunkedAssociativeArray) Put(key int64, value int64) { + var activeChunk *AssociativeArrayChunk + if caa.chunks.Len() > 0 { + activeChunk = caa.chunks.Back() + } + + if activeChunk != nil && activeChunk.cursor != 0 && activeChunk.keys[activeChunk.cursor-1] > key { + // Key must be the same as last inserted or bigger + key = activeChunk.keys[activeChunk.cursor-1] + 1 + } + if activeChunk == nil || activeChunk.cursor-activeChunk.startIndex == activeChunk.chunkSize { + // The last chunk doesn't exist or full + activeChunk = caa.AllocateChunk() + caa.chunks.PushBack(activeChunk) + } + activeChunk.Append(key, value) +} + +func (caa *ChunkedAssociativeArray) Values() []int64 { + valuesSize := caa.Size() + if valuesSize == 0 { + // Empty + return []int64{0} + } + + values := make([]int64, 0, valuesSize) + caa.chunks.Index(func(chunk *AssociativeArrayChunk) bool { + values = append(values,
chunk.values[chunk.startIndex:chunk.cursor]...) + return false + }) + + return values +} + +func (caa *ChunkedAssociativeArray) Size() int { + var result int + caa.chunks.Index(func(chunk *AssociativeArrayChunk) bool { + result += chunk.cursor - chunk.startIndex + return false + }) + return result +} + +func (caa *ChunkedAssociativeArray) String() string { + var builder strings.Builder + first := true + caa.chunks.Index(func(chunk *AssociativeArrayChunk) bool { + if first { + first = false + } else { + builder.WriteString("->") + } + builder.WriteString("[") + for i := chunk.startIndex; i < chunk.cursor; i++ { + builder.WriteString("(") + builder.WriteString(strconv.FormatInt(chunk.keys[i], 10)) + builder.WriteString(": ") + builder.WriteString(strconv.FormatInt(chunk.values[i], 10)) + builder.WriteString(") ") + } + builder.WriteString("]") + return false + }) + + return builder.String() +} + +// Trim tries to trim all beyond specified boundaries +// startKey: the start value for which all elements less than it should be removed. 
+// endKey: the end value for which all elements greater/equals than it should be removed +func (caa *ChunkedAssociativeArray) Trim(startKey int64, endKey int64) { + /* + * [3, 4, 5, 9] -> [10, 13, 14, 15] -> [21, 24, 29, 30] -> [31] :: start layout + * |5______________________________23| :: trim(5, 23) + * [5, 9] -> [10, 13, 14, 15] -> [21] :: result layout + */ + // Remove elements that are too large + indexBeforeEndKey := caa.chunks.RIndex(func(chunk *AssociativeArrayChunk) bool { + if chunk.IsFirstElementEmptyOrGreaterEqualThanKey(endKey) { + return false + } + + chunk.cursor = chunk.FindFirstIndexOfGreaterEqualElements(endKey) + return true + }) + + // Remove chunks that only contain elements that are too large; RIndex returns -1 when no chunk survives, which drops every chunk + for i := caa.chunks.Len() - 1; i > indexBeforeEndKey; i-- { + caa.freeChunk(caa.chunks.PopBack()) + } + + // Remove elements that are too small + indexAfterStartKey := caa.chunks.Index(func(chunk *AssociativeArrayChunk) bool { + if chunk.IsLastElementEmptyOrLessThanKey(startKey) { + return false + } + + newStartIndex := chunk.FindFirstIndexOfGreaterEqualElements(startKey) + if chunk.startIndex != newStartIndex { + chunk.startIndex = newStartIndex + chunk.chunkSize = chunk.cursor - chunk.startIndex + } + return true + }) + + // Remove chunks that only contain elements that are too small + // Index returns -1 when every remaining chunk is older than startKey, so all of them must go + if indexAfterStartKey < 0 { + indexAfterStartKey = caa.chunks.Len() + } + for i := 0; i < indexAfterStartKey; i++ { + caa.freeChunk(caa.chunks.PopFront()) + } +} + +type AssociativeArrayChunk struct { + keys []int64 + values []int64 + + chunkSize int + startIndex int + cursor int +} + +func NewAssociativeArrayChunk(chunkSize int) *AssociativeArrayChunk { + return &AssociativeArrayChunk{ + keys: make([]int64, chunkSize), + values: make([]int64, chunkSize), + chunkSize: chunkSize, + startIndex: 0, + cursor: 0, + } +} + +func (c *AssociativeArrayChunk) Append(key int64, value int64) { + c.keys[c.cursor] = key + c.values[c.cursor] = value + c.cursor++ +} + +func (c
*AssociativeArrayChunk) IsFirstElementEmptyOrGreaterEqualThanKey(key int64) bool { + return c.cursor == c.startIndex || c.keys[c.startIndex] >= key +} + +func (c *AssociativeArrayChunk) IsLastElementEmptyOrLessThanKey(key int64) bool { + return c.cursor == c.startIndex || c.keys[c.cursor-1] < key +} + +func (c *AssociativeArrayChunk) FindFirstIndexOfGreaterEqualElements(minKey int64) int { + if c.cursor == c.startIndex || c.keys[c.startIndex] >= minKey { + return c.startIndex + } + elements := c.keys[c.startIndex:c.cursor] + keyIndex := sort.Search(len(elements), func(i int) bool { return elements[i] >= minKey }) + + return c.startIndex + keyIndex +} diff --git a/metrics/chunked_associative_array_test.go b/metrics/chunked_associative_array_test.go new file mode 100644 index 000000000000..e7b1bc3e8d0f --- /dev/null +++ b/metrics/chunked_associative_array_test.go @@ -0,0 +1,170 @@ +package metrics + +import ( + "testing" +) + +// Ported from +// https://github.com/dropwizard/metrics/blob/release/4.2.x/metrics-core/src/main/java/com/codahale/metrics/ChunkedAssociativeLongArray.java + +func TestChunkedAssociativeArray_Put(t *testing.T) { + array := NewChunkedAssociativeArray(3) + // Test that time cannot go backwards + array.Put(7, 7) + expectedStringBefore := "[(7: 7) ]" + if array.String() != expectedStringBefore { + t.Errorf("initial array string incorrect: %s", array.String()) + } +} + +func TestChunkedAssociativeArray_ValuesEmpty(t *testing.T) { + array := NewChunkedAssociativeArray(3) + + values := array.Values() + if len(values) != 1 { + t.Fatalf("unexpected length of values: %d", len(values)) + } + if values[0] != 0 { + t.Errorf("unexpected value in empty values: %d", values[0]) + } +} + +func TestChunkedAssociativeArray_Trim(t *testing.T) { + array := NewChunkedAssociativeArray(3) + array.Put(-7, 7) + array.Put(-5, 7) + array.Put(-4, 7) + array.Put(-3, 3) + array.Put(-2, 1) + array.Put(0, 5) + array.Put(3, 0) + array.Put(9, 8) + array.Put(15, 0) + 
array.Put(19, 5) + array.Put(21, 5) + array.Put(34, -9) + array.Put(109, 5) + + expectedStringBefore := "[(-7: 7) (-5: 7) (-4: 7) ]->[(-3: 3) (-2: 1) (0: 5) ]->[(3: 0) (9: 8) (15: 0) ]->[(19: 5) (21: 5) (34: -9) ]->[(109: 5) ]" + if array.String() != expectedStringBefore { + t.Errorf("initial array string incorrect: %s", array.String()) + } + valuesBefore := array.Values() + expectedValuesBefore := []int64{7, 7, 7, 3, 1, 5, 0, 8, 0, 5, 5, -9, 5} + if len(valuesBefore) != len(expectedValuesBefore) { + t.Errorf("initial values returned incorrect length: %d", len(valuesBefore)) + } else { + for i, value := range valuesBefore { + if value != expectedValuesBefore[i] { + t.Errorf("unexpected value %d at index %d", value, i) + } + } + } + if array.Size() != 13 { + t.Errorf("initial array size incorrect: %d", array.Size()) + } + + array.Trim(-2, 20) + + expectedStringAfter := "[(-2: 1) (0: 5) ]->[(3: 0) (9: 8) (15: 0) ]->[(19: 5) ]" + if array.String() != expectedStringAfter { + t.Errorf("array string incorrect: %s", array.String()) + } + valuesAfter := array.Values() + expectedValuesAfter := []int64{1, 5, 0, 8, 0, 5} + if len(valuesAfter) != len(expectedValuesAfter) { + t.Errorf("values returned incorrect length: %d", len(valuesAfter)) + } else { + for i, value := range valuesAfter { + if value != expectedValuesAfter[i] { + t.Errorf("unexpected value %d at index %d", value, i) + } + } + } + if array.Size() != 6 { + t.Errorf("array size incorrect: %d", array.Size()) + } + + array.Trim(-2, 16) + expectedStringAfter2 := "[(-2: 1) (0: 5) ]->[(3: 0) (9: 8) (15: 0) ]" + if array.String() != expectedStringAfter2 { + t.Errorf("array string incorrect: %s", array.String()) + } + + initialCacheCount := array.chunksCache.Len() + + // Have AllocateChunk take from cache + array.Put(200, 555) + expectedStringAfter3 := "[(-2: 1) (0: 5) ]->[(3: 0) (9: 8) (15: 0) ]->[(200: 555) ]" + if array.String() != expectedStringAfter3 { + t.Errorf("array string incorrect: %s", array.String()) + } + + 
if array.chunksCache.Len() >= initialCacheCount { + t.Error("cache not used when allocating chunk") + } +} + +func TestAssociativeArrayChunk_IsFirstElementEmptyOrGreaterEqualThanKey(t *testing.T) { + chunk := NewAssociativeArrayChunk(3) + + if !chunk.IsFirstElementEmptyOrGreaterEqualThanKey(5) { + t.Error("empty test failed") + } + + chunk.keys = []int64{41, 42, 43} + chunk.startIndex = 1 + + if chunk.IsFirstElementEmptyOrGreaterEqualThanKey(43) { + t.Error("element less than key test failed") + } + if !chunk.IsFirstElementEmptyOrGreaterEqualThanKey(42) { + t.Error("element greater than or equal to key test failed") + } +} + +func TestAssociativeArrayChunk_IsLastElementIsLessThanKey(t *testing.T) { + chunk := NewAssociativeArrayChunk(3) + + if !chunk.IsLastElementEmptyOrLessThanKey(5) { + t.Error("empty test failed") + } + + chunk.keys = []int64{41, 42, 43} + chunk.cursor = 2 + + if !chunk.IsLastElementEmptyOrLessThanKey(43) { + t.Error("element less than key test failed") + } + if chunk.IsLastElementEmptyOrLessThanKey(42) { + t.Error("element greater than or equal to key test failed") + } +} + +func TestAssociativeArrayChunk_FindFirstIndexOfGreaterEqualElements(t *testing.T) { + chunk := NewAssociativeArrayChunk(7) + + if chunk.FindFirstIndexOfGreaterEqualElements(5) != 0 { + t.Error("empty test failed") + } + + chunk.keys = []int64{41, 42, 43, 44, 45, 46, 48} + chunk.startIndex = 1 + chunk.cursor = 6 + + if chunk.FindFirstIndexOfGreaterEqualElements(41) != chunk.startIndex { + t.Error("minKey less than first element test failed") + } + if chunk.FindFirstIndexOfGreaterEqualElements(42) != chunk.startIndex { + t.Error("minKey greater than or equal to first element test failed") + } + if chunk.FindFirstIndexOfGreaterEqualElements(43) != 2 { + t.Error("2nd element test failed") + } + if chunk.FindFirstIndexOfGreaterEqualElements(46) != 5 { + t.Error("last element test failed") + } + if chunk.FindFirstIndexOfGreaterEqualElements(49) != 6 { + t.Error("past last element 
test failed") + } +} diff --git a/metrics/sample_test.go b/metrics/sample_test.go index 79673570554c..38bf50f51f13 100644 --- a/metrics/sample_test.go +++ b/metrics/sample_test.go @@ -87,6 +87,18 @@ func BenchmarkUniformSample1028(b *testing.B) { benchmarkSample(b, NewUniformSample(1028)) } +func BenchmarkSlidingWindowArraySample257(b *testing.B) { + benchmarkSample(b, NewSlidingTimeWindowArraySample(257)) +} + +func BenchmarkSlidingWindowArraySample514(b *testing.B) { + benchmarkSample(b, NewSlidingTimeWindowArraySample(514)) +} + +func BenchmarkSlidingWindowArraySample1028(b *testing.B) { + benchmarkSample(b, NewSlidingTimeWindowArraySample(1028)) +} + func min(a, b int) int { if a < b { return a diff --git a/metrics/sliding_time_window_array_sample.go b/metrics/sliding_time_window_array_sample.go new file mode 100644 index 000000000000..f50bf5206c03 --- /dev/null +++ b/metrics/sliding_time_window_array_sample.go @@ -0,0 +1,123 @@ +package metrics + +import ( + "math" + "sync" + "time" +) + +// SlidingTimeWindowArraySample is ported from Coda Hale's dropwizard library +// +// A reservoir implementation backed by a sliding window that stores only the +// measurements made in the last given window of time +type SlidingTimeWindowArraySample struct { + startTick int64 + measurements *ChunkedAssociativeArray + window int64 + count int64 + lastTick int64 + mutex sync.Mutex +} + +const ( + // SlidingTimeWindowCollisionBuffer allow this many duplicate ticks + // before overwriting measurements + SlidingTimeWindowCollisionBuffer = 256 + + // SlidingTimeWindowTrimThreshold is number of updates between trimming data + SlidingTimeWindowTrimThreshold = 256 + + // SlidingTimeWindowClearBufferTicks is the number of ticks to keep past the + // requested trim + SlidingTimeWindowClearBufferTicks = int64(time.Hour/time.Nanosecond) * + SlidingTimeWindowCollisionBuffer +) + +// NewSlidingTimeWindowArraySample creates new object with given window of time +func 
NewSlidingTimeWindowArraySample(window time.Duration) Sample { + if !Enabled { + return NilSample{} + } + return &SlidingTimeWindowArraySample{ + startTick: time.Now().UnixNano(), + measurements: NewChunkedAssociativeArray(ChunkedAssociativeArrayDefaultChunkSize), + window: window.Nanoseconds() * SlidingTimeWindowCollisionBuffer, + } +} + +// Clear clears all samples. +func (s *SlidingTimeWindowArraySample) Clear() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.count = 0 + s.measurements.Clear() +} + +// trim requires s.mutex to already be acquired +func (s *SlidingTimeWindowArraySample) trim() { + now := s.getTick() + windowStart := now - s.window + windowEnd := now + SlidingTimeWindowClearBufferTicks + if windowStart < windowEnd { + s.measurements.Trim(windowStart, windowEnd) + } else { + // long overflow handling that can only happen 1 year after class loading + s.measurements.Clear() + } +} + +// getTick requires s.mutex to already be acquired +func (s *SlidingTimeWindowArraySample) getTick() int64 { + oldTick := s.lastTick + tick := (time.Now().UnixNano() - s.startTick) * SlidingTimeWindowCollisionBuffer + var newTick int64 + if tick-oldTick > 0 { + newTick = tick + } else { + newTick = oldTick + 1 + } + s.lastTick = newTick + return newTick +} + +// Snapshot returns a read-only copy of the sample. +func (s *SlidingTimeWindowArraySample) Snapshot() SampleSnapshot { + s.mutex.Lock() + defer s.mutex.Unlock() + s.trim() + var ( + samples = s.measurements.Values() + values = make([]int64, len(samples)) + max int64 = math.MinInt64 + min int64 = math.MaxInt64 + sum int64 + ) + for i, v := range samples { + values[i] = v + sum += v + if v > max { + max = v + } + if v < min { + min = v + } + } + return newSampleSnapshotPrecalculated(s.count, values, min, max, sum) +} + +// Update samples a new value. 
+func (s *SlidingTimeWindowArraySample) Update(v int64) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.count += 1 + if s.count%SlidingTimeWindowTrimThreshold == 0 { + s.trim() + } + // Read the previous tick before getTick overwrites s.lastTick, otherwise the overflow check below can never fire + lastTick := s.lastTick + newTick := s.getTick() + if newTick < lastTick { + s.measurements.Clear() + } + s.measurements.Put(newTick, v) +} From 00e202f693975e5ee054d9e745062c76ae6cf43c Mon Sep 17 00:00:00 2001 From: Joshua Colvin Date: Tue, 29 Nov 2022 18:16:35 -0700 Subject: [PATCH 3/3] Use SlidingTimeWindowArray instead of using resetting ExpDecaySample for latency histograms --- metrics/sample.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/metrics/sample.go b/metrics/sample.go index 3ec7fff13600..fd1e537c19bc 100644 --- a/metrics/sample.go +++ b/metrics/sample.go @@ -33,9 +33,7 @@ type Sample interface { } func NewBoundedHistogramSample() Sample { - return ResettingSample( - NewExpDecaySample(1028, 0.015), - ) + return NewSlidingTimeWindowArraySample(time.Minute * 1) } // ExpDecaySample is an exponentially-decaying sample using a forward-decaying