From c2f8d83f5682bf9d6eb1e2b541385387c1e9a73a Mon Sep 17 00:00:00 2001
From: Vladimir Kononov
Date: Tue, 16 Mar 2021 00:13:51 +0300
Subject: [PATCH 01/10] Store: add benchmark for querying downsampled data.

Signed-off-by: Vladimir Kononov
---
 pkg/store/bucket_test.go             | 81 +++++++++++++---------
 pkg/store/storepb/testutil/series.go | 25 ++++++---
 2 files changed, 56 insertions(+), 50 deletions(-)

diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index ddad30d5d41..158b4e39a78 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -40,6 +40,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/indexheader"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"github.com/thanos-io/thanos/pkg/compact"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/filesystem"
@@ -2297,6 +2298,17 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) {
 }
 
 func BenchmarkBlockSeries(b *testing.B) {
+	blk, blockMeta := prepareBucket(b, compact.ResolutionLevelRaw)
+
+	aggrs := []storepb.Aggr{storepb.Aggr_RAW}
+	for _, concurrency := range []int{1, 2, 4, 8, 16, 32} {
+		b.Run(fmt.Sprintf("concurrency: %d", concurrency), func(b *testing.B) {
+			benchmarkBlockSeriesWithConcurrency(b, concurrency, blockMeta, blk, aggrs)
+		})
+	}
+}
+
+func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*bucketBlock, *metadata.Meta) {
 	var (
 		ctx    = context.Background()
 		logger = log.NewNopLogger()
@@ -2318,13 +2330,13 @@ func BenchmarkBlockSeries(b *testing.B) {
 	head, _ := storetestutil.CreateHeadWithSeries(b, 0, storetestutil.HeadGenOptions{
 		TSDBDir:          filepath.Join(tmpDir, "head"),
 		SamplesPerSeries: 86400 / 15, // Simulate 1 day block with 15s scrape interval.
+		ScrapeInterval:   15 * 1000,
 		Series:           1000,
 		PrependLabels:    nil,
 		Random:           rand.New(rand.NewSource(120)),
 		SkipChunks:       true,
 	})
 	blockID := createBlockFromHead(b, tmpDir, head)
-	testutil.Ok(b, head.Close())
 
 	// Upload the block to the bucket.
 	thanosMeta := metadata.Thanos{
@@ -2338,6 +2350,17 @@ func BenchmarkBlockSeries(b *testing.B) {
 
 	testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))
 
+	if resolutionLevel > 0 {
+		// Downsample newly-created block
+		blockID, err = downsample.Downsample(logger, blockMeta, head, tmpDir, int64(resolutionLevel))
+		testutil.Ok(b, err)
+		blockMeta, err = metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String()))
+		testutil.Ok(b, err)
+
+		testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))
+	}
+	testutil.Ok(b, head.Close())
 
 	// Create chunk pool and partitioner using the same production settings.
 	chunkPool, err := NewDefaultChunkBytesPool(64 * 1024 * 1024 * 1024)
 	testutil.Ok(b, err)
@@ -2353,15 +2376,10 @@ func BenchmarkBlockSeries(b *testing.B) {
 
 	// Create a bucket block with only the dependencies we need for the benchmark.
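 	// Everything else the store gateway would normally wire up (gRPC service,
 	// metadata fetcher, syncer) is deliberately left out so the benchmark
 	// exercises block reads in isolation.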
blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner) testutil.Ok(b, err) - - for _, concurrency := range []int{1, 2, 4, 8, 16, 32} { - b.Run(fmt.Sprintf("concurrency: %d", concurrency), func(b *testing.B) { - benchmarkBlockSeriesWithConcurrency(b, concurrency, blockMeta, blk) - }) - } + return blk, blockMeta } -func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMeta *metadata.Meta, blk *bucketBlock) { +func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMeta *metadata.Meta, blk *bucketBlock, aggrs []storepb.Aggr) { ctx := context.Background() // Run the same number of queries per goroutine. @@ -2392,6 +2410,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet {Type: storepb.LabelMatcher_RE, Name: "i", Value: labelMatcher}, }, SkipChunks: false, + Aggregates: aggrs, } matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) @@ -2415,41 +2434,15 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet wg.Wait() } -func TestChunkOffsetsToByteRanges(t *testing.T) { - tests := map[string]struct { - offsets []uint32 - start uint32 - expected byteRanges - }{ - "no offsets in input": { - offsets: nil, - expected: byteRanges{}, - }, - "no overlapping ranges in input": { - offsets: []uint32{1000, 20000, 45000}, - start: 1000, - expected: byteRanges{ - {offset: 0, length: 16000}, - {offset: 19000, length: 16000}, - {offset: 44000, length: 16000}, - }, - }, - "overlapping ranges in input": { - offsets: []uint32{1000, 5000, 9500, 30000}, - start: 1000, - expected: byteRanges{ - {offset: 0, length: 4000}, - {offset: 4000, length: 4500}, - {offset: 8500, length: 16000}, - {offset: 29000, length: 16000}, - }, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - testutil.Equals(t, len(testData.offsets), len(testData.expected)) - testutil.Equals(t, testData.expected, chunkOffsetsToByteRanges(testData.offsets, testData.start)) - }) +func BenchmarkDownsampledBlockSeries(b *testing.B) { + blk, blockMeta := prepareBucket(b, compact.ResolutionLevel5m) + aggrs := []storepb.Aggr{} + for i := 1; i < int(storepb.Aggr_COUNTER); i++ { + aggrs = append(aggrs, storepb.Aggr(i)) + for _, concurrency := range []int{1, 2, 4, 8, 16, 32} { + b.Run(fmt.Sprintf("aggregates: %v, concurrency: %d", aggrs, concurrency), func(b *testing.B) { + benchmarkBlockSeriesWithConcurrency(b, concurrency, blockMeta, blk, aggrs) + }) + } } } diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index e7dae990edf..fb1b2cb199a 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -39,8 +39,8 @@ func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { } type HeadGenOptions struct { - TSDBDir string - SamplesPerSeries, Series int + TSDBDir string + SamplesPerSeries, Series, ScrapeInterval int WithWAL bool PrependLabels labels.Labels @@ -57,8 +57,17 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, if opts.SamplesPerSeries < 1 || opts.Series < 1 { t.Fatal("samples and series has to be 1 or more") } + if opts.ScrapeInterval == 0 { + opts.ScrapeInterval = 1 + } - fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, opts.TSDBDir) + fmt.Printf( + "Creating %d %d-sample series with %d ms interval in %s\n", + opts.Series, + 
opts.SamplesPerSeries, + opts.ScrapeInterval, + opts.TSDBDir, + ) var w *wal.WAL var err error @@ -76,12 +85,16 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, app := h.Appender(context.Background()) for i := 0; i < opts.Series; i++ { - ts := int64(j*opts.Series*opts.SamplesPerSeries + i*opts.SamplesPerSeries) - ref, err := app.Add(labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", ts, LabelLongSuffix)), ts, opts.Random.Float64()) + tsLabel := j*opts.Series*opts.SamplesPerSeries + i*opts.SamplesPerSeries + ref, err := app.Add( + labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)), + int64(tsLabel*opts.ScrapeInterval), + opts.Random.Float64(), + ) testutil.Ok(t, err) for is := 1; is < opts.SamplesPerSeries; is++ { - testutil.Ok(t, app.AddFast(ref, ts+int64(is), opts.Random.Float64())) + testutil.Ok(t, app.AddFast(ref, int64((tsLabel+is)*opts.ScrapeInterval), opts.Random.Float64())) } } testutil.Ok(t, app.Commit()) From 1c5db5359fee355aeeed7bb01b7a0918b93aec44 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 16 Mar 2021 03:44:50 +0300 Subject: [PATCH 02/10] pkg/pool: fix data race Signed-off-by: Vladimir Kononov --- pkg/pool/pool.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index cbd034e9e7c..f0bd580042f 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -112,13 +112,12 @@ func (p *BucketedBytes) Put(b *[]byte) { continue } *b = (*b)[:0] + p.mtx.Lock() + defer p.mtx.Unlock() p.buckets[i].Put(b) break } - p.mtx.Lock() - defer p.mtx.Unlock() - // We could assume here that our users will not make the slices larger // but lets be on the safe side to avoid an underflow of p.usedTotal. sz := uint64(cap(*b)) From 194d23411954ec79f65a814c98c3d9d8afc604b2 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 16 Mar 2021 03:49:20 +0300 Subject: [PATCH 03/10] pkg/testutil: fix data race Signed-off-by: Vladimir Kononov --- pkg/testutil/testutil.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 984d137c927..c477281319f 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -9,6 +9,7 @@ import ( "reflect" "runtime" "runtime/debug" + "sync" "testing" "github.com/davecgh/go-spew/spew" @@ -19,8 +20,12 @@ import ( "go.uber.org/goleak" ) +var mu sync.Mutex + // Assert fails the test if the condition is false. func Assert(tb testing.TB, condition bool, v ...interface{}) { + mu.Lock() + defer mu.Unlock() tb.Helper() if condition { return @@ -36,6 +41,8 @@ func Assert(tb testing.TB, condition bool, v ...interface{}) { // Ok fails the test if an err is not nil. func Ok(tb testing.TB, err error, v ...interface{}) { + mu.Lock() + defer mu.Unlock() tb.Helper() if err == nil { return @@ -51,6 +58,8 @@ func Ok(tb testing.TB, err error, v ...interface{}) { // NotOk fails the test if an err is nil. func NotOk(tb testing.TB, err error, v ...interface{}) { + mu.Lock() + defer mu.Unlock() tb.Helper() if err != nil { return @@ -66,6 +75,8 @@ func NotOk(tb testing.TB, err error, v ...interface{}) { // Equals fails the test if exp is not equal to act. 
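 // Like the helpers above, Equals takes the package-level mutex first, so
 // assertions made from concurrent goroutines are serialized.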
func Equals(tb testing.TB, exp, act interface{}, v ...interface{}) { + mu.Lock() + defer mu.Unlock() tb.Helper() if reflect.DeepEqual(exp, act) { return From 8c0d7c61886ede378ce641e0f48865b5c11fa72f Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 16 Mar 2021 03:57:21 +0300 Subject: [PATCH 04/10] store: reduce allocated memory amount for chunks queries Signed-off-by: Vladimir Kononov --- pkg/store/bucket.go | 265 ++++++++++++++++----------- pkg/store/bucket_test.go | 12 +- pkg/store/storepb/testutil/series.go | 2 +- 3 files changed, 161 insertions(+), 118 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 4fabe004c2d..888025b3ee3 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -4,10 +4,12 @@ package store import ( + "bufio" "bytes" "context" "encoding/binary" "fmt" + "io" "io/ioutil" "math" "os" @@ -64,6 +66,7 @@ const ( // EstimatedMaxChunkSize is average max of chunk size. This can be exceeded though in very rare (valid) cases. EstimatedMaxChunkSize = 16000 maxSeriesSize = 64 * 1024 + chunkBytesPoolMinSize = 8 // CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility // with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels. @@ -758,8 +761,9 @@ func blockSeries( // Schedule loading chunks. s.refs = make([]uint64, 0, len(chks)) s.chks = make([]storepb.AggrChunk, 0, len(chks)) - for _, meta := range chks { - if err := chunkr.addPreload(meta.Ref); err != nil { + for j, meta := range chks { + // s is appended to res, but not at every iteration, hence len(res) is the index we need. + if err := chunkr.addPreload(meta.Ref, len(res), j); err != nil { return nil, nil, errors.Wrap(err, "add chunk preload") } s.chks = append(s.chks, storepb.AggrChunk{ @@ -786,29 +790,20 @@ func blockSeries( return newBucketSeriesSet(res), indexr.stats, nil } - // Preload all chunks that were marked in the previous stage. - if err := chunkr.preload(); err != nil { + if err := chunkr.preload(res, req.Aggregates); err != nil { return nil, nil, errors.Wrap(err, "preload chunks") } - // Transform all chunks into the response format. 
- for _, s := range res { - for i, ref := range s.refs { - chk, err := chunkr.Chunk(ref) - if err != nil { - return nil, nil, errors.Wrap(err, "get chunk") - } - if err := populateChunk(&s.chks[i], chk, req.Aggregates); err != nil { - return nil, nil, errors.Wrap(err, "populate chunk") - } - } - } return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } -func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error { +func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, saviour func([]byte) ([]byte, error)) error { if in.Encoding() == chunkenc.EncXOR { - out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()} + b, err := saviour(in.Bytes()) + if err != nil { + return err + } + out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} return nil } if in.Encoding() != downsample.ChunkEncAggr { @@ -824,31 +819,51 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrCount) } - out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: x.Bytes()} + b, err := saviour(x.Bytes()) + if err != nil { + return err + } + out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} case storepb.Aggr_SUM: x, err := ac.Get(downsample.AggrSum) if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrSum) } - out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: x.Bytes()} + b, err := saviour(x.Bytes()) + if err != nil { + return err + } + out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} case storepb.Aggr_MIN: x, err := ac.Get(downsample.AggrMin) if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrMin) } - out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: x.Bytes()} + b, err := saviour(x.Bytes()) + if err != nil { + return err + } + out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} case storepb.Aggr_MAX: x, err := ac.Get(downsample.AggrMax) if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrMax) } - out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: x.Bytes()} + b, err := saviour(x.Bytes()) + if err != nil { + return err + } + out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} case storepb.Aggr_COUNTER: x, err := ac.Get(downsample.AggrCounter) if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrCounter) } - out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: x.Bytes()} + b, err := saviour(x.Bytes()) + if err != nil { + return err + } + out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} } } return nil @@ -1520,6 +1535,14 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i return chunkBuffer, nil } +func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length int64) (io.ReadCloser, error) { + if seq < 0 || seq >= len(b.chunkObjs) { + return nil, errors.Errorf("unknown segment file for index %d", seq) + } + + return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length) +} + func (b *bucketBlock) indexReader(ctx context.Context) *bucketIndexReader { b.pendingReaders.Add(1) return newBucketIndexReader(ctx, b) @@ -2069,7 +2092,7 @@ type Partitioner interface { // Partition partitions length entries into n <= length ranges that cover all // input ranges // It supports overlapping ranges. - // NOTE: It expects range to be ted by start time. 
+ // NOTE: It expects range to be sorted by start time. Partition(length int, rng func(int) (uint64, uint64)) []Part } @@ -2216,11 +2239,17 @@ func decodeSeriesForTime(b []byte, lset *[]symbolizedLabel, chks *[]chunks.Meta, return len(*chks) > 0, d.Err() } +type preloadIdx struct { + off uint32 + i int + j int +} + type bucketChunkReader struct { ctx context.Context block *bucketBlock - preloads [][]uint32 + preloads [][]preloadIdx // Mutex protects access to following fields, when updated from chunks-loading goroutines. // After chunks are loaded, mutex is no longer used. @@ -2235,13 +2264,34 @@ func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkR ctx: ctx, block: block, stats: &queryStats{}, - preloads: make([][]uint32, len(block.chunkObjs)), + preloads: make([][]preloadIdx, len(block.chunkObjs)), chunks: map[uint64]chunkenc.Chunk{}, } } -// addPreload adds the chunk with id to the data set that will be fetched on calling preload. -func (r *bucketChunkReader) addPreload(id uint64) error { +func (r *bucketChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) { + c, ok := r.chunks[id] + if !ok { + return nil, errors.Errorf("chunk with ID %d not found", id) + } + + r.stats.chunksTouched++ + r.stats.chunksTouchedSizeSum += len(c.Bytes()) + + return c, nil +} + +func (r *bucketChunkReader) Close() error { + r.block.pendingReaders.Done() + + for _, b := range r.chunkBytes { + r.block.chunkPool.Put(b) + } + return nil +} + +// appPreload adds the chunk with id to the data set that will be fetched on calling preload. +func (r *bucketChunkReader) addPreload(id uint64, i, j int) error { var ( seq = int(id >> 32) off = uint32(id) @@ -2249,31 +2299,28 @@ func (r *bucketChunkReader) addPreload(id uint64) error { if seq >= len(r.preloads) { return errors.Errorf("reference sequence %d out of range", seq) } - r.preloads[seq] = append(r.preloads[seq], off) + r.preloads[seq] = append(r.preloads[seq], preloadIdx{off, i, j}) return nil } // preload all added chunk IDs. Must be called before the first call to Chunk is made. -func (r *bucketChunkReader) preload() error { +func (r *bucketChunkReader) preload(res []seriesEntry, aggrs []storepb.Aggr) error { g, ctx := errgroup.WithContext(r.ctx) - for seq, offsets := range r.preloads { - sort.Slice(offsets, func(i, j int) bool { - return offsets[i] < offsets[j] + for seq, pIdxs := range r.preloads { + sort.Slice(pIdxs, func(i, j int) bool { + return pIdxs[i].off < pIdxs[j].off }) - parts := r.block.partitioner.Partition(len(offsets), func(i int) (start, end uint64) { - return uint64(offsets[i]), uint64(offsets[i]) + EstimatedMaxChunkSize + parts := r.block.partitioner.Partition(len(pIdxs), func(i int) (start, end uint64) { + return uint64(pIdxs[i].off), uint64(pIdxs[i].off) + EstimatedMaxChunkSize }) - seq := seq - offsets := offsets - for _, p := range parts { - s, e := uint32(p.Start), uint32(p.End) - m, n := p.ElemRng[0], p.ElemRng[1] - + seq := seq + p := p + indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]] g.Go(func() error { - return r.loadChunks(ctx, offsets[m:n], seq, s, e) + return r.loadChunks(ctx, res, aggrs, seq, p, indices) }) } } @@ -2282,17 +2329,16 @@ func (r *bucketChunkReader) preload() error { // loadChunks will read range [start, end] from the segment file with sequence number seq. // This data range covers chunks starting at supplied offsets. 
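 // The updated implementation streams each partition through a single buffered
 // reader sized to EstimatedMaxChunkSize; gaps between the requested chunk
 // offsets are discarded with io.CopyN instead of being buffered.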
-func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq int, start, end uint32) error { +func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []preloadIdx) error { fetchBegin := time.Now() - // Compute the byte ranges of chunks we actually need. The total read data may be bigger - // than required because of the partitioner. - chunkRanges := chunkOffsetsToByteRanges(offs, start) - - b, err := r.block.readChunkRange(ctx, seq, int64(start), int64(end-start), chunkRanges) + // Get a reader for the required range. + reader, err := r.block.chunkRangeReader(ctx, seq, int64(part.Start), int64(part.End-part.Start)) if err != nil { - return errors.Wrapf(err, "read range for %d", seq) + return errors.Wrap(err, "get range reader") } + defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader") + bufReader := bufio.NewReaderSize(reader, EstimatedMaxChunkSize) locked := true r.mtx.Lock() @@ -2303,32 +2349,59 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i } }() - r.chunkBytes = append(r.chunkBytes, b) r.stats.chunksFetchCount++ - r.stats.chunksFetched += len(offs) + r.stats.chunksFetched += len(pIdxs) r.stats.chunksFetchDurationSum += time.Since(fetchBegin) - r.stats.chunksFetchedSizeSum += int(end - start) + r.stats.chunksFetchedSizeSum += int(part.End - part.Start) - readOffset := 0 - for idx, o := range offs { - chunkRange := chunkRanges[idx] + var ( + buf = make([]byte, EstimatedMaxChunkSize) + readOffset = int(pIdxs[0].off) + + // Save a few allocations. + written int64 + diff uint32 + chunkLen int + n int + ) - // The chunks byte ranges are stored contiguously in the data buffer. - cb := (*b)[readOffset : readOffset+chunkRange.length] - readOffset += chunkRange.length + for i, pIdx := range pIdxs { + // Fast forward range reader to the next chunk start in case of sparse (for our purposes) byte range. + for readOffset < int(pIdx.off) { + written, err = io.CopyN(ioutil.Discard, bufReader, int64(pIdx.off)-int64(readOffset)) + if err != nil { + return errors.Wrap(err, "fast forward range reader") + } + readOffset += int(written) + } + // Presume chunk length. + chunkLen = EstimatedMaxChunkSize + if i+1 < len(pIdxs) { + if diff = pIdxs[i+1].off - pIdx.off; int(diff) < chunkLen { + chunkLen = int(diff) + } + } + cb := buf[:chunkLen] + n, err = io.ReadFull(bufReader, cb) + readOffset += n + if errors.Is(err, io.ErrUnexpectedEOF) && i == len(pIdxs)-1 { + // This may be a valid case. + } else if err != nil { + return errors.Wrapf(err, "read range for seq %d offset %x", seq, pIdx.off) + } l, n := binary.Uvarint(cb) if n < 1 { return errors.New("reading chunk length failed") } - chunkRef := uint64(seq<<32) | uint64(o) - // Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and l for actual chunk data. // There is also crc32 after the chunk, but we ignore that. - chLen := n + 1 + int(l) - if len(cb) >= chLen { - r.chunks[chunkRef] = rawChunk(cb[n:chLen]) + chunkLen = n + 1 + int(l) + if chunkLen <= len(cb) { + if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk(cb[n:chunkLen]), aggrs, r.saviour); err != nil { + return errors.Wrap(err, "populate chunk") + } continue } @@ -2339,62 +2412,39 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i fetchBegin = time.Now() // Read entire chunk into new buffer. 
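 		// Slow path: the presumed length was too small for this chunk, so it is
 		// fetched again from the bucket using the exact length decoded above.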
- nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen), []byteRange{{offset: 0, length: chLen}}) + nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.off), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}) if err != nil { - return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chLen) + return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen) } - - cb = *nb - if len(cb) != chLen { - return errors.Errorf("preloaded chunk too small, expecting %d", chLen) + if len(*nb) != chunkLen { + return errors.Errorf("preloaded chunk too small, expecting %d", chunkLen) } r.mtx.Lock() locked = true - r.chunkBytes = append(r.chunkBytes, nb) r.stats.chunksFetchCount++ r.stats.chunksFetchDurationSum += time.Since(fetchBegin) - r.stats.chunksFetchedSizeSum += len(cb) + r.stats.chunksFetchedSizeSum += len(*nb) - r.chunks[chunkRef] = rawChunk(cb[n:]) - } - return nil -} - -// chunkOffsetsToByteRanges returns non-overlapping byte ranges with each range offset -// relative to start. The provided input offsets must be sorted. -func chunkOffsetsToByteRanges(offsets []uint32, start uint32) byteRanges { - ranges := make([]byteRange, len(offsets)) - - for idx := 0; idx < len(offsets); idx++ { - ranges[idx] = byteRange{ - // The byte range offset is required to be relative to the start of the read slice. - offset: int(offsets[idx] - start), - length: EstimatedMaxChunkSize, + if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk((*nb)[n:]), aggrs, r.saviour); err != nil { + return errors.Wrap(err, "populate chunk") } - if idx > 0 { - // Ensure ranges are non overlapping. - if prev := ranges[idx-1]; prev.length > ranges[idx].offset-prev.offset { - ranges[idx-1].length = ranges[idx].offset - prev.offset - } - } + r.block.chunkPool.Put(nb) } - - return ranges + return nil } -func (r *bucketChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) { - c, ok := r.chunks[id] - if !ok { - return nil, errors.Errorf("chunk with ID %d not found", id) +func (r *bucketChunkReader) saviour(b []byte) ([]byte, error) { + cb, err := r.block.chunkPool.Get(len(b)) + if err != nil { + return nil, errors.Wrap(err, "allocate chunk bytes") } + *cb = append(*cb, b...) - r.stats.chunksTouched++ - r.stats.chunksTouchedSizeSum += len(c.Bytes()) - - return c, nil + r.chunkBytes = append(r.chunkBytes, cb) + return *cb, nil } // rawChunk is a helper type that wraps a chunk's raw bytes and implements the chunkenc.Chunk @@ -2423,15 +2473,6 @@ func (b rawChunk) NumSamples() int { panic("invalid call") } -func (r *bucketChunkReader) Close() error { - r.block.pendingReaders.Done() - - for _, b := range r.chunkBytes { - r.block.chunkPool.Put(b) - } - return nil -} - type queryStats struct { blocksQueried int @@ -2515,5 +2556,5 @@ func (s queryStats) merge(o *queryStats) *queryStats { // NewDefaultChunkBytesPool returns a chunk bytes pool with default settings. 
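 // The pool's minimum bucket size is relevant here: chunk payloads are now
 // copied into pool-allocated slices one at a time, and most of them are far
 // smaller than the 16 kB EstimatedMaxChunkSize the buckets started at before.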
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Bytes, error) { - return pool.NewBucketedBytes(EstimatedMaxChunkSize, 50e6, 2, maxChunkPoolBytes) + return pool.NewBucketedBytes(chunkBytesPoolMinSize, 50e6, 2, maxChunkPoolBytes) } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 158b4e39a78..f5522adc6f9 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1256,7 +1256,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer f, err := block.NewRawMetaFetcher(logger, ibkt) testutil.Ok(t, err) - chunkPool, err := pool.NewBucketedBytes(EstimatedMaxChunkSize, 50e6, 2, 1e9) // 1GB. + chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, 50e6, 2, 1e9) // 1GB. testutil.Ok(t, err) st, err := NewBucketStore( @@ -1319,8 +1319,10 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer if !t.IsBenchmark() { if !skipChunk { + chunksPerSeriesPerBlock := int(math.Ceil(float64(samplesPerSeriesPerBlock) / float64(MaxSamplesPerChunk))) + expectedChunks := numOfBlocks * seriesPerBlock * chunksPerSeriesPerBlock // Make sure the pool is correctly used. This is expected for 200k numbers. - testutil.Equals(t, numOfBlocks, int(st.chunkPool.(*mockedPool).gets.Load())) + testutil.Equals(t, expectedChunks, int(st.chunkPool.(*mockedPool).gets.Load())) // TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate. testutil.Equals(t, 0, int(st.chunkPool.(*mockedPool).balance.Load())) st.chunkPool.(*mockedPool).gets.Store(0) @@ -1382,7 +1384,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { Source: metadata.TestSource, } - chunkPool, err := pool.NewBucketedBytes(EstimatedMaxChunkSize, 50e6, 2, 100e7) + chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, 50e6, 2, 100e7) testutil.Ok(t, err) indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{ @@ -2276,8 +2278,8 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) { testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - // Create a chunk pool with buckets between 1KB and 32KB. - chunkPool, err := pool.NewBucketedBytes(1024, 32*1024, 2, 1e10) + // Create a chunk pool with buckets between 8B and 32KB. + chunkPool, err := pool.NewBucketedBytes(8, 32*1024, 2, 1e10) testutil.Ok(b, err) // Create a bucket block with only the dependencies we need for the benchmark. diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index fb1b2cb199a..46faece2a62 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -52,7 +52,7 @@ type HeadGenOptions struct { // CreateHeadWithSeries returns head filled with given samples and same series returned in separate list for assertion purposes. // Returned series list has "ext1"="1" prepended. Each series looks as follows: // {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} where number indicate sample number from 0. -// Returned series are frame in same way as remote read would frame them. +// Returned series are framed in the same way as remote read would frame them. 
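+// A zero ScrapeInterval falls back to 1, preserving the previous spacing of
+// one timestamp unit per sample for callers that do not set it.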
func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []*storepb.Series) { if opts.SamplesPerSeries < 1 || opts.Series < 1 { t.Fatal("samples and series has to be 1 or more") From 6818d9a8c0d0d4dac9920199fe5c1f45ee9d5571 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 16 Mar 2021 04:44:40 +0300 Subject: [PATCH 05/10] pkg/store/bucket: translate variable name from English to US Linter failed with: `saviour` is a misspelling of `savior` (misspell) Signed-off-by: Vladimir Kononov --- pkg/store/bucket.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 888025b3ee3..02e799097b2 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -797,9 +797,9 @@ func blockSeries( return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } -func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, saviour func([]byte) ([]byte, error)) error { +func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, savior func([]byte) ([]byte, error)) error { if in.Encoding() == chunkenc.EncXOR { - b, err := saviour(in.Bytes()) + b, err := savior(in.Bytes()) if err != nil { return err } @@ -819,7 +819,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrCount) } - b, err := saviour(x.Bytes()) + b, err := savior(x.Bytes()) if err != nil { return err } @@ -829,7 +829,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrSum) } - b, err := saviour(x.Bytes()) + b, err := savior(x.Bytes()) if err != nil { return err } @@ -839,7 +839,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrMin) } - b, err := saviour(x.Bytes()) + b, err := savior(x.Bytes()) if err != nil { return err } @@ -849,7 +849,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrMax) } - b, err := saviour(x.Bytes()) + b, err := savior(x.Bytes()) if err != nil { return err } @@ -859,7 +859,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrCounter) } - b, err := saviour(x.Bytes()) + b, err := savior(x.Bytes()) if err != nil { return err } @@ -2399,7 +2399,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a // There is also crc32 after the chunk, but we ignore that. 
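 		// Layout so far: cb[:n] holds the uvarint data length, cb[n] is the
 		// encoding byte, and the chunk data follows it.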
chunkLen = n + 1 + int(l) if chunkLen <= len(cb) { - if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk(cb[n:chunkLen]), aggrs, r.saviour); err != nil { + if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk(cb[n:chunkLen]), aggrs, r.savior); err != nil { return errors.Wrap(err, "populate chunk") } continue @@ -2427,7 +2427,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a r.stats.chunksFetchDurationSum += time.Since(fetchBegin) r.stats.chunksFetchedSizeSum += len(*nb) - if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk((*nb)[n:]), aggrs, r.saviour); err != nil { + if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk((*nb)[n:]), aggrs, r.savior); err != nil { return errors.Wrap(err, "populate chunk") } @@ -2436,7 +2436,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a return nil } -func (r *bucketChunkReader) saviour(b []byte) ([]byte, error) { +func (r *bucketChunkReader) savior(b []byte) ([]byte, error) { cb, err := r.block.chunkPool.Get(len(b)) if err != nil { return nil, errors.Wrap(err, "allocate chunk bytes") From 38d6b4942687a0a1d5f1bf75e27e5829ed1430b7 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 16 Mar 2021 14:24:41 +0300 Subject: [PATCH 06/10] store: use slab allocations for chunks Signed-off-by: Vladimir Kononov --- pkg/store/bucket.go | 24 +++++++++++++++--------- pkg/store/bucket_test.go | 8 ++------ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 02e799097b2..318e0b1a8d5 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -66,7 +66,9 @@ const ( // EstimatedMaxChunkSize is average max of chunk size. This can be exceeded though in very rare (valid) cases. EstimatedMaxChunkSize = 16000 maxSeriesSize = 64 * 1024 - chunkBytesPoolMinSize = 8 + // Relatively large in order to reduce memory waste, yet small enough to avoid excessive allocations. + chunkBytesPoolMinSize = 64 * 1024 // 64 KiB + chunkBytesPoolMaxSize = 64 * 1024 * 1024 // 64 MiB // CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility // with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels. @@ -2437,14 +2439,18 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a } func (r *bucketChunkReader) savior(b []byte) ([]byte, error) { - cb, err := r.block.chunkPool.Get(len(b)) - if err != nil { - return nil, errors.Wrap(err, "allocate chunk bytes") + // Ensure we never grow slab beyond original capacity. + if len(r.chunkBytes) == 0 || + cap(*r.chunkBytes[len(r.chunkBytes)-1])-len(*r.chunkBytes[len(r.chunkBytes)-1]) < len(b) { + s, err := r.block.chunkPool.Get(len(b)) + if err != nil { + return nil, errors.Wrap(err, "allocate chunk bytes") + } + r.chunkBytes = append(r.chunkBytes, s) } - *cb = append(*cb, b...) - - r.chunkBytes = append(r.chunkBytes, cb) - return *cb, nil + slab := r.chunkBytes[len(r.chunkBytes)-1] + *slab = append(*slab, b...) + return (*slab)[len(*slab)-len(b):], nil } // rawChunk is a helper type that wraps a chunk's raw bytes and implements the chunkenc.Chunk @@ -2556,5 +2562,5 @@ func (s queryStats) merge(o *queryStats) *queryStats { // NewDefaultChunkBytesPool returns a chunk bytes pool with default settings. 
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Bytes, error) { - return pool.NewBucketedBytes(chunkBytesPoolMinSize, 50e6, 2, maxChunkPoolBytes) + return pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, maxChunkPoolBytes) } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index f5522adc6f9..ecad9567fd9 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1256,7 +1256,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer f, err := block.NewRawMetaFetcher(logger, ibkt) testutil.Ok(t, err) - chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, 50e6, 2, 1e9) // 1GB. + chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 1e9) // 1GB. testutil.Ok(t, err) st, err := NewBucketStore( @@ -1319,10 +1319,6 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer if !t.IsBenchmark() { if !skipChunk { - chunksPerSeriesPerBlock := int(math.Ceil(float64(samplesPerSeriesPerBlock) / float64(MaxSamplesPerChunk))) - expectedChunks := numOfBlocks * seriesPerBlock * chunksPerSeriesPerBlock - // Make sure the pool is correctly used. This is expected for 200k numbers. - testutil.Equals(t, expectedChunks, int(st.chunkPool.(*mockedPool).gets.Load())) // TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate. testutil.Equals(t, 0, int(st.chunkPool.(*mockedPool).balance.Load())) st.chunkPool.(*mockedPool).gets.Store(0) @@ -1384,7 +1380,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { Source: metadata.TestSource, } - chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, 50e6, 2, 100e7) + chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 100e7) testutil.Ok(t, err) indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{ From c8218ecdf050d8658042a9b2c936c44ed04fdc01 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 16 Mar 2021 15:10:00 +0300 Subject: [PATCH 07/10] Update changelog Signed-off-by: Vladimir Kononov --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb71db7d434..c735b8b95c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,8 +19,10 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ### Fixed - [#3204](https://github.com/thanos-io/thanos/pull/3204) Mixin: Use sidecar's metric timestamp for healthcheck. - [#3922](https://github.com/thanos-io/thanos/pull/3922) Fix panic in http logging middleware. +- [#3937](https://github.com/thanos-io/thanos/pull/3937) Store: Fix race condition in chunk pool. ### Changed +- [#3937](https://github.com/thanos-io/thanos/pull/3937) Store: Reduce memory usage for range queries. ### Removed From 07978c9b9129bb18f9d4ee97098481aa2ace16c8 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Thu, 18 Mar 2021 00:20:41 +0300 Subject: [PATCH 08/10] Revert "pkg/testutil: fix data race" This reverts commit 194d23411954ec79f65a814c98c3d9d8afc604b2. 
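The mutex serialized every assertion but did not address the underlying
constraint: these helpers end up calling tb.Fatal, which is only safe on the
goroutine running the test or benchmark (a TODO in a later patch of this
series notes the same about b.Fatalf). A race-free pattern, sketched below
with a hypothetical doQuery stand-in and assuming "sync" is imported, keeps
all assertions on the benchmark goroutine:

func benchmarkConcurrently(b *testing.B, concurrency int, doQuery func() error) {
	errCh := make(chan error, concurrency)
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Workers only report errors; they never fail the benchmark themselves.
			if err := doQuery(); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	close(errCh)
	for err := range errCh {
		testutil.Ok(b, err) // Asserted on the benchmark goroutine only.
	}
}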
Signed-off-by: Vladimir Kononov --- pkg/testutil/testutil.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index c477281319f..984d137c927 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -9,7 +9,6 @@ import ( "reflect" "runtime" "runtime/debug" - "sync" "testing" "github.com/davecgh/go-spew/spew" @@ -20,12 +19,8 @@ import ( "go.uber.org/goleak" ) -var mu sync.Mutex - // Assert fails the test if the condition is false. func Assert(tb testing.TB, condition bool, v ...interface{}) { - mu.Lock() - defer mu.Unlock() tb.Helper() if condition { return @@ -41,8 +36,6 @@ func Assert(tb testing.TB, condition bool, v ...interface{}) { // Ok fails the test if an err is not nil. func Ok(tb testing.TB, err error, v ...interface{}) { - mu.Lock() - defer mu.Unlock() tb.Helper() if err == nil { return @@ -58,8 +51,6 @@ func Ok(tb testing.TB, err error, v ...interface{}) { // NotOk fails the test if an err is nil. func NotOk(tb testing.TB, err error, v ...interface{}) { - mu.Lock() - defer mu.Unlock() tb.Helper() if err != nil { return @@ -75,8 +66,6 @@ func NotOk(tb testing.TB, err error, v ...interface{}) { // Equals fails the test if exp is not equal to act. func Equals(tb testing.TB, exp, act interface{}, v ...interface{}) { - mu.Lock() - defer mu.Unlock() tb.Helper() if reflect.DeepEqual(exp, act) { return From e38d3490c3536486c63c20ccefd6da822b4c32c4 Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 23 Mar 2021 18:42:41 +0300 Subject: [PATCH 09/10] Apply code review suggestions Signed-off-by: Vladimir Kononov --- pkg/store/bucket.go | 102 +++++++++++++-------------- pkg/store/bucket_test.go | 8 ++- pkg/store/storepb/testutil/series.go | 16 +++-- 3 files changed, 65 insertions(+), 61 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 318e0b1a8d5..57a76369a42 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -764,9 +764,10 @@ func blockSeries( s.refs = make([]uint64, 0, len(chks)) s.chks = make([]storepb.AggrChunk, 0, len(chks)) for j, meta := range chks { - // s is appended to res, but not at every iteration, hence len(res) is the index we need. - if err := chunkr.addPreload(meta.Ref, len(res), j); err != nil { - return nil, nil, errors.Wrap(err, "add chunk preload") + // seriesEntry s is appended to res, but not at every outer loop iteration, + // therefore len(res) is the index we need here, not outer loop iteration number. 
+ if err := chunkr.addLoad(meta.Ref, len(res), j); err != nil { + return nil, nil, errors.Wrap(err, "add chunk load") } s.chks = append(s.chks, storepb.AggrChunk{ MinTime: meta.MinTime, @@ -792,16 +793,16 @@ func blockSeries( return newBucketSeriesSet(res), indexr.stats, nil } - if err := chunkr.preload(res, req.Aggregates); err != nil { - return nil, nil, errors.Wrap(err, "preload chunks") + if err := chunkr.load(res, req.Aggregates); err != nil { + return nil, nil, errors.Wrap(err, "load chunks") } return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } -func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, savior func([]byte) ([]byte, error)) error { +func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error)) error { if in.Encoding() == chunkenc.EncXOR { - b, err := savior(in.Bytes()) + b, err := save(in.Bytes()) if err != nil { return err } @@ -821,7 +822,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrCount) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -831,7 +832,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrSum) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -841,7 +842,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrMin) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -851,7 +852,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrMax) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -861,7 +862,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrCounter) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -2242,9 +2243,10 @@ func decodeSeriesForTime(b []byte, lset *[]symbolizedLabel, chks *[]chunks.Meta, } type preloadIdx struct { - off uint32 - i int - j int + offset uint32 + // Indices, not actual entries and chunks. + seriesEntry int + chunk int } type bucketChunkReader struct { @@ -2267,20 +2269,7 @@ func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkR block: block, stats: &queryStats{}, preloads: make([][]preloadIdx, len(block.chunkObjs)), - chunks: map[uint64]chunkenc.Chunk{}, - } -} - -func (r *bucketChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) { - c, ok := r.chunks[id] - if !ok { - return nil, errors.Errorf("chunk with ID %d not found", id) } - - r.stats.chunksTouched++ - r.stats.chunksTouchedSizeSum += len(c.Bytes()) - - return c, nil } func (r *bucketChunkReader) Close() error { @@ -2292,8 +2281,9 @@ func (r *bucketChunkReader) Close() error { return nil } -// appPreload adds the chunk with id to the data set that will be fetched on calling preload. -func (r *bucketChunkReader) addPreload(id uint64, i, j int) error { +// addLoad adds the chunk with id to the data set to be fetched. 
+// Chunk will be fetched and saved to res[seriesEntry][chunk] upon r.load(res, <...>) call. +func (r *bucketChunkReader) addLoad(id uint64, seriesEntry, chunk int) error { var ( seq = int(id >> 32) off = uint32(id) @@ -2301,20 +2291,20 @@ func (r *bucketChunkReader) addPreload(id uint64, i, j int) error { if seq >= len(r.preloads) { return errors.Errorf("reference sequence %d out of range", seq) } - r.preloads[seq] = append(r.preloads[seq], preloadIdx{off, i, j}) + r.preloads[seq] = append(r.preloads[seq], preloadIdx{off, seriesEntry, chunk}) return nil } -// preload all added chunk IDs. Must be called before the first call to Chunk is made. -func (r *bucketChunkReader) preload(res []seriesEntry, aggrs []storepb.Aggr) error { +// load loads all added chunks and saves resulting aggrs to res. +func (r *bucketChunkReader) load(res []seriesEntry, aggrs []storepb.Aggr) error { g, ctx := errgroup.WithContext(r.ctx) for seq, pIdxs := range r.preloads { sort.Slice(pIdxs, func(i, j int) bool { - return pIdxs[i].off < pIdxs[j].off + return pIdxs[i].offset < pIdxs[j].offset }) parts := r.block.partitioner.Partition(len(pIdxs), func(i int) (start, end uint64) { - return uint64(pIdxs[i].off), uint64(pIdxs[i].off) + EstimatedMaxChunkSize + return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize }) for _, p := range parts { @@ -2358,7 +2348,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a var ( buf = make([]byte, EstimatedMaxChunkSize) - readOffset = int(pIdxs[0].off) + readOffset = int(pIdxs[0].offset) // Save a few allocations. written int64 @@ -2369,41 +2359,44 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a for i, pIdx := range pIdxs { // Fast forward range reader to the next chunk start in case of sparse (for our purposes) byte range. - for readOffset < int(pIdx.off) { - written, err = io.CopyN(ioutil.Discard, bufReader, int64(pIdx.off)-int64(readOffset)) + for readOffset < int(pIdx.offset) { + written, err = io.CopyN(ioutil.Discard, bufReader, int64(pIdx.offset)-int64(readOffset)) if err != nil { return errors.Wrap(err, "fast forward range reader") } readOffset += int(written) } - // Presume chunk length. + // Presume chunk length to be reasonably large for common use cases. + // However, declaration for EstimatedMaxChunkSize warns us some chunks could be larger in some rare cases. + // This is handled further down below. chunkLen = EstimatedMaxChunkSize if i+1 < len(pIdxs) { - if diff = pIdxs[i+1].off - pIdx.off; int(diff) < chunkLen { + if diff = pIdxs[i+1].offset - pIdx.offset; int(diff) < chunkLen { chunkLen = int(diff) } } cb := buf[:chunkLen] n, err = io.ReadFull(bufReader, cb) readOffset += n - if errors.Is(err, io.ErrUnexpectedEOF) && i == len(pIdxs)-1 { - // This may be a valid case. - } else if err != nil { - return errors.Wrapf(err, "read range for seq %d offset %x", seq, pIdx.off) + // Unexpected EOF for last chunk could be a valid case. Any other errors are definitely real. + if err != nil && !(errors.Is(err, io.ErrUnexpectedEOF) && i == len(pIdxs)-1) { + return errors.Wrapf(err, "read range for seq %d offset %x", seq, pIdx.offset) } - l, n := binary.Uvarint(cb) + chunkDataLen, n := binary.Uvarint(cb) if n < 1 { return errors.New("reading chunk length failed") } - // Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and l for actual chunk data. 
+ // Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and chunkDataLen for actual chunk data. // There is also crc32 after the chunk, but we ignore that. - chunkLen = n + 1 + int(l) + chunkLen = n + 1 + int(chunkDataLen) if chunkLen <= len(cb) { - if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk(cb[n:chunkLen]), aggrs, r.savior); err != nil { + if err := populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save); err != nil { return errors.Wrap(err, "populate chunk") } + r.stats.chunksTouched++ + r.stats.chunksTouchedSizeSum += int(chunkDataLen) continue } @@ -2414,7 +2407,8 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a fetchBegin = time.Now() // Read entire chunk into new buffer. - nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.off), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}) + // TODO: readChunkRange call could be avoided for any chunk but last in this particular part. + nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}) if err != nil { return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen) } @@ -2428,17 +2422,21 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a r.stats.chunksFetchCount++ r.stats.chunksFetchDurationSum += time.Since(fetchBegin) r.stats.chunksFetchedSizeSum += len(*nb) - - if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk((*nb)[n:]), aggrs, r.savior); err != nil { + if err := populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save); err != nil { + r.block.chunkPool.Put(nb) return errors.Wrap(err, "populate chunk") } + r.stats.chunksTouched++ + r.stats.chunksTouchedSizeSum += int(chunkDataLen) r.block.chunkPool.Put(nb) } return nil } -func (r *bucketChunkReader) savior(b []byte) ([]byte, error) { +// save saves a copy of b's payload to a memory pool of its own and returns a new byte slice referencing said copy. +// Returned slice becomes invalid once r.block.chunkPool.Put() is called. +func (r *bucketChunkReader) save(b []byte) ([]byte, error) { // Ensure we never grow slab beyond original capacity. if len(r.chunkBytes) == 0 || cap(*r.chunkBytes[len(r.chunkBytes)-1])-len(*r.chunkBytes[len(r.chunkBytes)-1]) < len(b) { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index ecad9567fd9..f165a6785de 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -20,6 +20,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/go-kit/kit/log" "github.com/gogo/protobuf/proto" @@ -1053,6 +1054,7 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in }, nil) testutil.Ok(t, err) testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) return id } @@ -2328,7 +2330,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck head, _ := storetestutil.CreateHeadWithSeries(b, 0, storetestutil.HeadGenOptions{ TSDBDir: filepath.Join(tmpDir, "head"), SamplesPerSeries: 86400 / 15, // Simulate 1 day block with 15s scrape interval. 
- ScrapeInterval: 15 * 1000, + ScrapeInterval: 15 * time.Second, Series: 1000, PrependLabels: nil, Random: rand.New(rand.NewSource(120)), @@ -2349,7 +2351,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) if resolutionLevel > 0 { - // Downsample newly-created block + // Downsample newly-created block. blockID, err = downsample.Downsample(logger, blockMeta, head, tmpDir, int64(resolutionLevel)) testutil.Ok(b, err) blockMeta, err = metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) @@ -2412,6 +2414,8 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet } matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) + // TODO FIXME! testutil.Ok calls b.Fatalf under the hood, which + // must be called only from the goroutine running the Benchmark function. testutil.Ok(b, err) indexReader := blk.indexReader(ctx) diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index 46faece2a62..cb9305cfaaa 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -13,6 +13,7 @@ import ( "runtime" "sort" "testing" + "time" "github.com/gogo/protobuf/types" "github.com/prometheus/prometheus/pkg/labels" @@ -39,8 +40,9 @@ func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { } type HeadGenOptions struct { - TSDBDir string - SamplesPerSeries, Series, ScrapeInterval int + TSDBDir string + SamplesPerSeries, Series int + ScrapeInterval time.Duration WithWAL bool PrependLabels labels.Labels @@ -58,14 +60,14 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, t.Fatal("samples and series has to be 1 or more") } if opts.ScrapeInterval == 0 { - opts.ScrapeInterval = 1 + opts.ScrapeInterval = 1 * time.Millisecond } fmt.Printf( - "Creating %d %d-sample series with %d ms interval in %s\n", + "Creating %d %d-sample series with %s interval in %s\n", opts.Series, opts.SamplesPerSeries, - opts.ScrapeInterval, + opts.ScrapeInterval.String(), opts.TSDBDir, ) @@ -88,13 +90,13 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, tsLabel := j*opts.Series*opts.SamplesPerSeries + i*opts.SamplesPerSeries ref, err := app.Add( labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)), - int64(tsLabel*opts.ScrapeInterval), + int64(tsLabel)*opts.ScrapeInterval.Milliseconds(), opts.Random.Float64(), ) testutil.Ok(t, err) for is := 1; is < opts.SamplesPerSeries; is++ { - testutil.Ok(t, app.AddFast(ref, int64((tsLabel+is)*opts.ScrapeInterval), opts.Random.Float64())) + testutil.Ok(t, app.AddFast(ref, int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), opts.Random.Float64())) } } testutil.Ok(t, app.Commit()) From 4b426441e1aa5f62d7b1ab54cab9269f2d2eaf4a Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Mon, 29 Mar 2021 17:24:54 +0300 Subject: [PATCH 10/10] pkg/store: rename variables Signed-off-by: Vladimir Kononov --- pkg/store/bucket.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 413695f8837..c97bd61e1a6 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -170,7 +170,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { }) m.seriesGetAllDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: 
"thanos_bucket_store_series_get_all_duration_seconds", - Help: "Time it takes until all per-block prepares and preloads for a query are finished.", + Help: "Time it takes until all per-block prepares and loads for a query are finished.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, }) m.seriesMergeDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ @@ -2242,7 +2242,7 @@ func decodeSeriesForTime(b []byte, lset *[]symbolizedLabel, chks *[]chunks.Meta, return len(*chks) > 0, d.Err() } -type preloadIdx struct { +type loadIdx struct { offset uint32 // Indices, not actual entries and chunks. seriesEntry int @@ -2253,7 +2253,7 @@ type bucketChunkReader struct { ctx context.Context block *bucketBlock - preloads [][]preloadIdx + toLoad [][]loadIdx // Mutex protects access to following fields, when updated from chunks-loading goroutines. // After chunks are loaded, mutex is no longer used. @@ -2264,10 +2264,10 @@ type bucketChunkReader struct { func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkReader { return &bucketChunkReader{ - ctx: ctx, - block: block, - stats: &queryStats{}, - preloads: make([][]preloadIdx, len(block.chunkObjs)), + ctx: ctx, + block: block, + stats: &queryStats{}, + toLoad: make([][]loadIdx, len(block.chunkObjs)), } } @@ -2287,10 +2287,10 @@ func (r *bucketChunkReader) addLoad(id uint64, seriesEntry, chunk int) error { seq = int(id >> 32) off = uint32(id) ) - if seq >= len(r.preloads) { + if seq >= len(r.toLoad) { return errors.Errorf("reference sequence %d out of range", seq) } - r.preloads[seq] = append(r.preloads[seq], preloadIdx{off, seriesEntry, chunk}) + r.toLoad[seq] = append(r.toLoad[seq], loadIdx{off, seriesEntry, chunk}) return nil } @@ -2298,7 +2298,7 @@ func (r *bucketChunkReader) addLoad(id uint64, seriesEntry, chunk int) error { func (r *bucketChunkReader) load(res []seriesEntry, aggrs []storepb.Aggr) error { g, ctx := errgroup.WithContext(r.ctx) - for seq, pIdxs := range r.preloads { + for seq, pIdxs := range r.toLoad { sort.Slice(pIdxs, func(i, j int) bool { return pIdxs[i].offset < pIdxs[j].offset }) @@ -2320,7 +2320,7 @@ func (r *bucketChunkReader) load(res []seriesEntry, aggrs []storepb.Aggr) error // loadChunks will read range [start, end] from the segment file with sequence number seq. // This data range covers chunks starting at supplied offsets. -func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []preloadIdx) error { +func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx) error { fetchBegin := time.Now() // Get a reader for the required range. @@ -2391,7 +2391,8 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a // There is also crc32 after the chunk, but we ignore that. 
chunkLen = n + 1 + int(chunkDataLen) if chunkLen <= len(cb) { - if err := populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save); err != nil { + err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save) + if err != nil { return errors.Wrap(err, "populate chunk") } r.stats.chunksTouched++ @@ -2421,7 +2422,8 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a r.stats.chunksFetchCount++ r.stats.chunksFetchDurationSum += time.Since(fetchBegin) r.stats.chunksFetchedSizeSum += len(*nb) - if err := populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save); err != nil { + err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save) + if err != nil { r.block.chunkPool.Put(nb) return errors.Wrap(err, "populate chunk") }
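For readers following the slab change from patch 06: save packs many small
chunk copies into large pooled slices so that each chunk does not cost a pool
round-trip of its own. A minimal standalone sketch of the same idea follows
(the names here are illustrative, not the Thanos API):

package main

import "fmt"

// slab appends small payloads into fixed-capacity backing arrays so that each
// payload does not need its own allocation.
type slab struct {
	blocks   [][]byte
	blockCap int
}

// save copies b into the current block, starting a new block when the current
// one cannot hold b without growing (growing would reallocate the block and
// invalidate slices returned from it earlier).
func (s *slab) save(b []byte) []byte {
	if len(s.blocks) == 0 ||
		cap(s.blocks[len(s.blocks)-1])-len(s.blocks[len(s.blocks)-1]) < len(b) {
		s.blocks = append(s.blocks, make([]byte, 0, s.blockCap))
	}
	last := append(s.blocks[len(s.blocks)-1], b...)
	s.blocks[len(s.blocks)-1] = last
	return last[len(last)-len(b):]
}

func main() {
	s := &slab{blockCap: 64 * 1024}
	c := s.save([]byte("chunk-bytes"))
	fmt.Println(string(c), len(s.blocks)) // chunk-bytes 1
}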