diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index 6867b0e1519c..4086831bb33f 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -658,6 +658,10 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool {
 	return lbs.Get("log_stream") == "dispatcher"
 }
 
+func (t *testFilter) RequiredLabelNames() []string {
+	return []string{"log_stream"}
+}
+
 func Test_ChunkFilter(t *testing.T) {
 	instance := defaultInstance(t)
 	instance.chunkFilter = &testFilter{}
diff --git a/pkg/storage/chunk/interface.go b/pkg/storage/chunk/interface.go
index 8da4312c6039..1fbb2beb5207 100644
--- a/pkg/storage/chunk/interface.go
+++ b/pkg/storage/chunk/interface.go
@@ -67,4 +67,5 @@ type RequestChunkFilterer interface {
 // Filterer filters chunks based on the metric.
 type Filterer interface {
 	ShouldFilter(metric labels.Labels) bool
+	RequiredLabelNames() []string
 }
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index 3c9acdfa5a63..101c906b8b4f 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -866,6 +866,10 @@ func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool {
 	return metric.Get("foo") == "bazz"
 }
 
+func (f fakeChunkFilterer) RequiredLabelNames() []string {
+	return []string{"foo"}
+}
+
 func Test_ChunkFilterer(t *testing.T) {
 	s := &LokiStore{
 		Store: storeFixture,
diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go b/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go
index 3a0cf3cdbfc7..cf709e7bd97c 100644
--- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go
+++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go
@@ -146,14 +146,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int6
 	return s.fp, nil
 }
 
-func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, index.ChunkStats, error) {
+func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) {
 	s := h.head.series.getByID(uint64(ref))
 	if s == nil {
 		h.head.metrics.seriesNotFound.Inc()
 		return 0, index.ChunkStats{}, storage.ErrNotFound
 	}
-	*lbls = append((*lbls)[:0], s.ls...)
+	if len(by) == 0 {
+		*lbls = append((*lbls)[:0], s.ls...)
+	} else {
+		*lbls = (*lbls)[:0]
+		for _, l := range s.ls {
+			if _, ok := by[l.Name]; ok {
+				*lbls = append(*lbls, l)
+			}
+		}
+	}
 
 	queryBounds := newBounds(model.Time(from), model.Time(through))
diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
index 123750aea3de..f3cb7653cbe9 100644
--- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
+++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
@@ -1832,7 +1832,7 @@ func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *l
 	return fprint, nil
 }
 
-func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
+func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
 	offset := id
 	// In version 2+ series IDs are no longer exact references but series are 16-byte padded
 	// and the ID is the multiple of 16 of the actual position.
@@ -1844,7 +1844,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab
 		return 0, ChunkStats{}, d.Err()
 	}
 
-	return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls)
+	return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls, by)
 }
 
 func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) {
@@ -2216,11 +2216,53 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
 		if d.Err() != nil {
 			return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
 		}
 
+		// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
+		ln, err := dec.LookupSymbol(lno)
+		if err != nil {
+			return nil, 0, errors.Wrap(err, "lookup label name")
+		}
+		lv, err := dec.LookupSymbol(lvo)
+		if err != nil {
+			return nil, 0, errors.Wrap(err, "lookup label value")
+		}
+
+		*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
+	}
+	return &d, fprint, nil
+}
+
+// prepSeriesBy returns the series labels and chunks for a series, keeping only the label names selected in `by`.
+// If `by` is nil, it returns all labels for the series.
+func (dec *Decoder) prepSeriesBy(b []byte, lbls *labels.Labels, chks *[]ChunkMeta, by map[string]struct{}) (*encoding.Decbuf, uint64, error) {
+	if by == nil {
+		return dec.prepSeries(b, lbls, chks)
+	}
+	*lbls = (*lbls)[:0]
+	if chks != nil {
+		*chks = (*chks)[:0]
+	}
+	d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})
+
+	fprint := d.Be64()
+	k := d.Uvarint()
+
+	for i := 0; i < k; i++ {
+		lno := uint32(d.Uvarint())
+		lvo := uint32(d.Uvarint())
+
+		if d.Err() != nil {
+			return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
+		}
+		// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
 		ln, err := dec.LookupSymbol(lno)
 		if err != nil {
 			return nil, 0, errors.Wrap(err, "lookup label name")
 		}
+		if _, ok := by[ln]; !ok {
+			continue
+		}
+
 		lv, err := dec.LookupSymbol(lvo)
 		if err != nil {
 			return nil, 0, errors.Wrap(err, "lookup label value")
@@ -2231,8 +2273,8 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
 	return &d, fprint, nil
 }
 
-func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
-	d, fp, err := dec.prepSeries(b, lbls, nil)
+func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
+	d, fp, err := dec.prepSeriesBy(b, lbls, nil, by)
 	if err != nil {
 		return 0, ChunkStats{}, err
 	}
diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go
index b29556c348cf..60ec32ee954b 100644
--- a/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go
+++ b/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go
@@ -69,7 +69,7 @@ type IndexReader interface {
 	Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)
 
 	// ChunkStats returns the stats for the chunks in the given series.
-	ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels) (uint64, index.ChunkStats, error)
+	ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error)
 
 	// LabelNames returns all the unique label names present in the index in sorted order.
 	LabelNames(matchers ...*labels.Matcher) ([]string, error)
diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go
index 7934b952ba88..255425b286f2 100644
--- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go
+++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go
@@ -293,12 +293,18 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim
 	// TODO(owen-d): use pool
 	var ls labels.Labels
 	var filterer chunk.Filterer
+	by := make(map[string]struct{})
 	if i.chunkFilter != nil {
 		filterer = i.chunkFilter.ForRequest(ctx)
+		if filterer != nil {
+			for _, k := range filterer.RequiredLabelNames() {
+				by[k] = struct{}{}
+			}
+		}
 	}
 
 	for p.Next() {
-		fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
+		fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
 		if err != nil {
 			return err
 		}
@@ -362,16 +368,29 @@ func (i *TSDBIndex) Volume(
 
 	seriesLabels := labels.Labels(make([]labels.Label, 0, len(labelsToMatch)))
 	aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == ""
+	var by map[string]struct{}
+	var filterer chunk.Filterer
+	if i.chunkFilter != nil {
+		filterer = i.chunkFilter.ForRequest(ctx)
+	}
+	if !includeAll && (aggregateBySeries || len(targetLabels) > 0) {
+		by = make(map[string]struct{}, len(labelsToMatch))
+		for k := range labelsToMatch {
+			by[k] = struct{}{}
+		}
 
-	return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
-		var ls labels.Labels
-		var filterer chunk.Filterer
-		if i.chunkFilter != nil {
-			filterer = i.chunkFilter.ForRequest(ctx)
+		// If we are aggregating by series, we also need to include any labels required for filtering chunks.
+		if filterer != nil {
+			for _, k := range filterer.RequiredLabelNames() {
+				by[k] = struct{}{}
+			}
 		}
+	}
 
+	return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error {
+		var ls labels.Labels
 		for p.Next() {
-			fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
+			fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by)
 			if err != nil {
 				return fmt.Errorf("series volume: %w", err)
 			}
diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go
index 068630c553a0..9784475091bf 100644
--- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go
+++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go
@@ -2,6 +2,7 @@ package tsdb
 
 import (
 	"context"
+	"fmt"
 	"math/rand"
 	"sort"
 	"testing"
@@ -140,7 +141,6 @@ func TestSingleIdx(t *testing.T) {
 					End:      10,
 					Checksum: 3,
 				}}, shardedRefs)
-
 			})
 
 			t.Run("Series", func(t *testing.T) {
@@ -202,10 +202,8 @@ func TestSingleIdx(t *testing.T) {
 				require.Nil(t, err)
 				require.Equal(t, []string{"bar"}, vs)
 			})
-
 		})
 	}
-
 }
 
 func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) {
@@ -743,18 +741,62 @@ func TestTSDBIndex_Volume(t *testing.T) {
 					Limit:    10,
 				}, acc.Volumes())
 			})
+			// todo(cyriltovena): tests with chunk filterer
 		})
 	})
 }
 
+func BenchmarkTSDBIndex_Volume(b *testing.B) {
+	var series []LoadableSeries
+	for i := 0; i < 1000; i++ {
+		series = append(series, LoadableSeries{
+			Labels: mustParseLabels(fmt.Sprintf(`{foo="bar", fizz="fizz%d", buzz="buzz%d",bar="bar%d", bozz="bozz%d"}`, i, i, i, i)),
+			Chunks: []index.ChunkMeta{
+				{
+					MinTime:  0,
+					MaxTime:  10,
+					Checksum: uint32(i),
+					KB:       10,
+					Entries:  10,
+				},
+				{
+					MinTime:  10,
+					MaxTime:  20,
+					Checksum: uint32(i),
+					KB:       10,
+					Entries:  10,
+				},
+			},
+		})
+	}
+	ctx := context.Background()
+	from := model.Earliest
+	through := model.Latest
+	// Create the TSDB index
+	tempDir := b.TempDir()
+	tsdbIndex := BuildIndex(b, tempDir, series)
+
+	b.ResetTimer()
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		acc := seriesvolume.NewAccumulator(10, 10)
+		err := tsdbIndex.Volume(ctx, "fake", from, through, acc, nil, nil, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"))
+		require.NoError(b, err)
+	}
+}
+
 type filterAll struct{}
 
 func (f *filterAll) ForRequest(_ context.Context) chunk.Filterer {
 	return &filterAllFilterer{}
 }
 
 type filterAllFilterer struct{}
 
 func (f *filterAllFilterer) ShouldFilter(_ labels.Labels) bool {
 	return true
 }
+
+func (f *filterAllFilterer) RequiredLabelNames() []string {
+	return nil
+}
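
Reviewer note (not part of the patch): the contract this change introduces is that a `chunk.Filterer` must advertise, via `RequiredLabelNames`, every label its `ShouldFilter` method reads. Index readers can then pass that set as `by` to `ChunkStats` and decode only those labels instead of the full series label set. Below is a minimal sketch of a conforming implementation, assuming the slice-backed `labels.Labels` used throughout this diff; the `streamFilter` type and the sample label set are illustrative, not part of the change.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// streamFilter drops chunks belonging to one log stream. It only ever reads
// the "log_stream" label and says so via RequiredLabelNames, mirroring the
// testFilter added in pkg/ingester/instance_test.go.
type streamFilter struct{ stream string }

func (f streamFilter) ShouldFilter(metric labels.Labels) bool {
	return metric.Get("log_stream") == f.stream
}

func (f streamFilter) RequiredLabelNames() []string {
	return []string{"log_stream"}
}

func main() {
	f := streamFilter{stream: "dispatcher"}

	// The `by` set an index reader would derive from the filterer,
	// as in the loops added to single_file_index.go.
	by := make(map[string]struct{})
	for _, k := range f.RequiredLabelNames() {
		by[k] = struct{}{}
	}

	full := labels.FromStrings("foo", "bar", "log_stream", "dispatcher")

	// Decode only the required labels, as prepSeriesBy and the head
	// reader's ChunkStats now do.
	var reduced labels.Labels
	for _, l := range full {
		if _, ok := by[l.Name]; ok {
			reduced = append(reduced, l)
		}
	}

	fmt.Println(f.ShouldFilter(reduced)) // true: the chunk would be skipped
}
```

One detail worth flagging in review: the two readers treat an empty but non-nil `by` set differently. `head_read.go` falls back to copying all labels when `len(by) == 0`, while `prepSeriesBy` only falls back to `prepSeries` when `by` is nil, so an empty set there yields an empty label set. This is harmless today because `Stats` only consults the labels through the filterer, but callers should not rely on the two paths being interchangeable.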