From 53c34dae61485759fb83e316c1be56c58d97dcab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 29 Mar 2023 16:15:31 +0300 Subject: [PATCH 1/6] store: pool input to snappy.Decode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pool input to snappy.Decode to avoid allocations. Signed-off-by: Giedrius Statkevičius --- pkg/store/bucket.go | 25 ++++++++++++------- pkg/store/postings_codec.go | 41 ++++++++++++++++++++++++++------ pkg/store/postings_codec_test.go | 4 ++-- 3 files changed, 53 insertions(+), 17 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 6726991f7c5..c87b00db9ba 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2162,10 +2162,15 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M keys = append(keys, allPostingsLabel) } - fetchedPostings, err := r.fetchPostings(ctx, keys, bytesLimiter) + fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) if err != nil { return nil, errors.Wrap(err, "get postings") } + defer func() { + for _, closeFn := range closeFns { + closeFn() + } + }() // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys // again, and this is exactly the same order as before (when building the groups), so we can simply @@ -2302,7 +2307,8 @@ type postingPtr struct { // fetchPostings fill postings requested by posting groups. // It returns one postings for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. -func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, error) { +func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) { + var closeFns []func() timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration) defer timer.ObserveDuration() @@ -2314,7 +2320,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys) for _, dataFromCache := range fromCache { if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { - return nil, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache") + return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache") } } @@ -2335,18 +2341,21 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab ) if isDiffVarintSnappyEncodedPostings(b) { s := time.Now() - l, err = diffVarintSnappyDecode(b) + clPostings, err := diffVarintSnappyDecode(b) r.stats.cachedPostingsDecompressions += 1 r.stats.CachedPostingsDecompressionTimeSum += time.Since(s) if err != nil { r.stats.cachedPostingsDecompressionErrors += 1 + } else { + closeFns = append(closeFns, clPostings.close) + l = clPostings } } else { _, l, err = r.dec.Postings(b) } if err != nil { - return nil, errors.Wrap(err, "decode postings") + return nil, closeFns, errors.Wrap(err, "decode postings") } output[ix] = l @@ -2362,7 +2371,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab } if err != nil { - return nil, errors.Wrap(err, "index header PostingsOffset") + return nil, nil, errors.Wrap(err, "index header PostingsOffset") } r.stats.postingsToFetch++ @@ -2384,7 +2393,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab length := int64(part.End) - start if err := bytesLimiter.Reserve(uint64(length)); err != nil { - return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings") + return nil, nil, errors.Wrap(err, "bytes limit exceeded while fetching postings") } } @@ -2462,7 +2471,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab }) } - return output, g.Wait() + return output, closeFns, g.Wait() } func resizePostings(b []byte) ([]byte, error) { diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 3920049227c..8574391d94e 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -5,6 +5,7 @@ package store import ( "bytes" + "sync" "github.com/golang/snappy" "github.com/pkg/errors" @@ -82,27 +83,53 @@ func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) { return buf.B, nil } -func diffVarintSnappyDecode(input []byte) (index.Postings, error) { +var snappyDecodePool sync.Pool + +type closeablePostings interface { + index.Postings + close() +} + +func diffVarintSnappyDecode(input []byte) (closeablePostings, error) { if !isDiffVarintSnappyEncodedPostings(input) { return nil, errors.New("header not found") } - raw, err := snappy.Decode(nil, input[len(codecHeaderSnappy):]) + var dstBuf []byte + decodeBuf := snappyDecodePool.Get() + if decodeBuf != nil { + dstBuf = *(decodeBuf.(*[]byte)) + } + + raw, err := snappy.Decode(dstBuf, input[len(codecHeaderSnappy):]) if err != nil { return nil, errors.Wrap(err, "snappy decode") } - return newDiffVarintPostings(raw), nil + biggerSlice := raw + if cap(dstBuf) > cap(biggerSlice) { + biggerSlice = dstBuf + } + + return newDiffVarintPostings(raw, biggerSlice), nil } -func newDiffVarintPostings(input []byte) *diffVarintPostings { - return &diffVarintPostings{buf: &encoding.Decbuf{B: input}} +func newDiffVarintPostings(input, freeSlice []byte) *diffVarintPostings { + return &diffVarintPostings{freeSlice: freeSlice, buf: &encoding.Decbuf{B: input}} } // diffVarintPostings is an implementation of index.Postings based on diff+varint encoded data. type diffVarintPostings struct { - buf *encoding.Decbuf - cur storage.SeriesRef + buf *encoding.Decbuf + cur storage.SeriesRef + freeSlice []byte +} + +func (it *diffVarintPostings) close() { + if it.freeSlice == nil { + return + } + snappyDecodePool.Put(&it.freeSlice) } func (it *diffVarintPostings) At() storage.SeriesRef { diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index be5cce4f915..8ac86008b5f 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -55,9 +55,9 @@ func TestDiffVarintCodec(t *testing.T) { codecs := map[string]struct { codingFunction func(index.Postings, int) ([]byte, error) - decodingFunction func([]byte) (index.Postings, error) + decodingFunction func([]byte) (closeablePostings, error) }{ - "raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) { return newDiffVarintPostings(bytes), nil }}, + "raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }}, "snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode}, } From c01c82f5d0b986ac2e7435f81e61e89ffa2ca069 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 29 Mar 2023 16:16:30 +0300 Subject: [PATCH 2/6] store: use s2 for decoding snappy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It's faster hence use it. Signed-off-by: Giedrius Statkevičius --- pkg/store/postings_codec.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 8574391d94e..2b2a340734a 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/golang/snappy" + "github.com/klauspost/compress/s2" "github.com/pkg/errors" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/encoding" @@ -101,7 +102,7 @@ func diffVarintSnappyDecode(input []byte) (closeablePostings, error) { dstBuf = *(decodeBuf.(*[]byte)) } - raw, err := snappy.Decode(dstBuf, input[len(codecHeaderSnappy):]) + raw, err := s2.Decode(dstBuf, input[len(codecHeaderSnappy):]) if err != nil { return nil, errors.Wrap(err, "snappy decode") } From 93d9abdd1dca033f07ec7b1a6c3b6ee65b4425dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 29 Mar 2023 16:18:11 +0300 Subject: [PATCH 3/6] store: small code style adjustment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/store/bucket.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index c87b00db9ba..e8b3e995b77 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2309,6 +2309,7 @@ type postingPtr struct { // If postings for given key is not fetched, entry at given index will be nil. func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) { var closeFns []func() + timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration) defer timer.ObserveDuration() From 6c67dc1284ee867cd3c74677603ce460f5a4395f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 30 Mar 2023 13:14:10 +0300 Subject: [PATCH 4/6] store: call closefns before returning err MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/store/bucket.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e8b3e995b77..0ed03e4de4c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2163,14 +2163,14 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M } fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) - if err != nil { - return nil, errors.Wrap(err, "get postings") - } defer func() { for _, closeFn := range closeFns { closeFn() } }() + if err != nil { + return nil, errors.Wrap(err, "get postings") + } // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys // again, and this is exactly the same order as before (when building the groups), so we can simply From 9d05c09442a2b88fe58518751319c0e1bee4684f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 11 Apr 2023 14:43:01 +0300 Subject: [PATCH 5/6] store/postings_codec: return both if possible MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/store/postings_codec.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 2b2a340734a..f60fe7c1b26 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -91,15 +91,24 @@ type closeablePostings interface { close() } +// alias returns true if given slices have the same both backing array. +// See: https://groups.google.com/g/golang-nuts/c/C6ufGl73Uzk. +func alias(x, y []byte) bool { + return cap(x) > 0 && cap(y) > 0 && &x[0:cap(x)][cap(x)-1] == &y[0:cap(y)][cap(y)-1] +} + func diffVarintSnappyDecode(input []byte) (closeablePostings, error) { if !isDiffVarintSnappyEncodedPostings(input) { return nil, errors.New("header not found") } + toFree := make([][]byte, 0, 2) + var dstBuf []byte decodeBuf := snappyDecodePool.Get() if decodeBuf != nil { dstBuf = *(decodeBuf.(*[]byte)) + toFree = append(toFree, dstBuf) } raw, err := s2.Decode(dstBuf, input[len(codecHeaderSnappy):]) @@ -107,30 +116,28 @@ func diffVarintSnappyDecode(input []byte) (closeablePostings, error) { return nil, errors.Wrap(err, "snappy decode") } - biggerSlice := raw - if cap(dstBuf) > cap(biggerSlice) { - biggerSlice = dstBuf + if !alias(raw, dstBuf) { + toFree = append(toFree, raw) } - return newDiffVarintPostings(raw, biggerSlice), nil + return newDiffVarintPostings(raw, toFree), nil } -func newDiffVarintPostings(input, freeSlice []byte) *diffVarintPostings { - return &diffVarintPostings{freeSlice: freeSlice, buf: &encoding.Decbuf{B: input}} +func newDiffVarintPostings(input []byte, freeSlices [][]byte) *diffVarintPostings { + return &diffVarintPostings{freeSlices: freeSlices, buf: &encoding.Decbuf{B: input}} } // diffVarintPostings is an implementation of index.Postings based on diff+varint encoded data. type diffVarintPostings struct { - buf *encoding.Decbuf - cur storage.SeriesRef - freeSlice []byte + buf *encoding.Decbuf + cur storage.SeriesRef + freeSlices [][]byte } func (it *diffVarintPostings) close() { - if it.freeSlice == nil { - return + for i := range it.freeSlices { + snappyDecodePool.Put(&it.freeSlices[i]) } - snappyDecodePool.Put(&it.freeSlice) } func (it *diffVarintPostings) At() storage.SeriesRef { From 92d095f9a48e6893b4d23267ccf9748bd8e4e9db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 11 Apr 2023 15:02:57 +0300 Subject: [PATCH 6/6] store/bucket: always call close fns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/store/bucket.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 0ed03e4de4c..05e82607e3d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2372,7 +2372,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab } if err != nil { - return nil, nil, errors.Wrap(err, "index header PostingsOffset") + return nil, closeFns, errors.Wrap(err, "index header PostingsOffset") } r.stats.postingsToFetch++ @@ -2394,7 +2394,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab length := int64(part.End) - start if err := bytesLimiter.Reserve(uint64(length)); err != nil { - return nil, nil, errors.Wrap(err, "bytes limit exceeded while fetching postings") + return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while fetching postings") } }