From a1aadecb1b3fd4777a16a4588efadbcfb455b318 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 29 Jul 2024 16:30:19 +0200 Subject: [PATCH 01/16] feat: Introduce wal segment read path. --- pkg/ingester-rf1/flush.go | 2 +- pkg/ingester-rf1/metastore/metastore.go | 2 +- pkg/querier-rf1/wal/chunks.go | 228 +++++++++++++++ pkg/querier-rf1/wal/querier.go | 149 ++++++++++ .../indexshipper/tsdb/index/postings.go | 7 + pkg/storage/wal/chunks/chunks.go | 91 +++++- pkg/storage/wal/index/index.go | 273 ++++++++++++++++++ pkg/storage/wal/segment.go | 6 +- pkg/storage/wal/segment_test.go | 4 +- 9 files changed, 754 insertions(+), 8 deletions(-) create mode 100644 pkg/querier-rf1/wal/chunks.go create mode 100644 pkg/querier-rf1/wal/querier.go diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go index 2242569c2c25..ce53fdac0a58 100644 --- a/pkg/ingester-rf1/flush.go +++ b/pkg/ingester-rf1/flush.go @@ -112,7 +112,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter } id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String() - if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil { + if err := i.store.PutObject(ctx, fmt.Sprintf(wal.Dir+id), buf); err != nil { i.metrics.flushFailuresTotal.Inc() return fmt.Errorf("failed to put object: %w", err) } diff --git a/pkg/ingester-rf1/metastore/metastore.go b/pkg/ingester-rf1/metastore/metastore.go index 6d999b8edd51..8c30dd003848 100644 --- a/pkg/ingester-rf1/metastore/metastore.go +++ b/pkg/ingester-rf1/metastore/metastore.go @@ -25,7 +25,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/raftleader" ) -const metastoreRaftLeaderHealthServiceName = "metastorepb.MetastoreService.RaftLeader" +const metastoreRaftLeaderHealthServiceName = "metastorepb.MetastoreService" type Config struct { DataDir string `yaml:"data_dir"` diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go new file mode 100644 index 
000000000000..ba008bdf1ea7 --- /dev/null +++ b/pkg/querier-rf1/wal/chunks.go @@ -0,0 +1,228 @@ +package wal + +import ( + "bytes" + "context" + "sort" + "sync" + + "github.com/grafana/loki/pkg/push" + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/storage/wal" + "github.com/grafana/loki/v3/pkg/storage/wal/chunks" + "github.com/grafana/loki/v3/pkg/storage/wal/index" + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/sync/errgroup" +) + +const batchSize = 16 + +var _ iter.EntryIterator = (*lazyChunks)(nil) + +type lazyChunk struct { + meta *chunks.Meta + labels labels.Labels + id string +} + +func newLazyChunk(id string, lbs *labels.ScratchBuilder, meta *chunks.Meta) lazyChunk { + lbs.Sort() + return lazyChunk{ + id: id, + meta: meta, + labels: lbs.Labels(), + } +} + +type lazyChunks struct { + chunks []lazyChunk + direction logproto.Direction + pipeline log.Pipeline + minT, maxT int64 + storage BlockStorage + ctx context.Context + + current iter.EntryIterator + batch []lazyChunk + err error +} + +// todo: Support SampleIterator. +func NewChunksEntryIterator( + ctx context.Context, + storage BlockStorage, + chunks []lazyChunk, + pipeline log.Pipeline, + direction logproto.Direction, + minT, maxT int64, +) *lazyChunks { + // sort by time and then by labels following the direction. 
+	sort.Slice(chunks, func(i, j int) bool {
+		if direction == logproto.FORWARD {
+			t1, t2 := chunks[i].meta.MinTime, chunks[j].meta.MinTime
+			if t1 != t2 {
+				return t1 < t2
+			}
+			return labels.Compare(chunks[i].labels, chunks[j].labels) < 0
+		}
+		t1, t2 := chunks[i].meta.MaxTime, chunks[j].meta.MaxTime
+		if t1 != t2 {
+			return t1 > t2
+		}
+		return labels.Compare(chunks[i].labels, chunks[j].labels) < 0
+	})
+	return &lazyChunks{
+		ctx:       ctx,
+		chunks:    chunks,
+		direction: direction,
+		pipeline:  pipeline,
+		storage:   storage,
+		batch:     make([]lazyChunk, 0, batchSize),
+		minT:      minT,
+		maxT:      maxT,
+	}
+}
+
+// At implements iter.EntryIterator.
+func (l *lazyChunks) At() push.Entry {
+	if l.current == nil {
+		return push.Entry{}
+	}
+	return l.current.At()
+}
+
+func (l *lazyChunks) Labels() string {
+	if l.current == nil {
+		return ""
+	}
+	return l.current.Labels()
+}
+
+func (l *lazyChunks) StreamHash() uint64 {
+	if l.current == nil {
+		return 0
+	}
+	return l.current.StreamHash()
+}
+
+// Close implements iter.EntryIterator.
+func (l *lazyChunks) Close() error {
+	if l.current == nil {
+		return nil
+	}
+	return l.current.Close()
+}
+
+// Err implements iter.EntryIterator.
+// NOTE(review): guard was inverted in the original patch
+// (`if l.current == nil { return l.current.Err() }`), which calls Err on a
+// nil interface and panics whenever Err is consulted before the first batch
+// is materialized. Delegate only when a current iterator exists, matching
+// the nil checks in At/Labels/StreamHash/Close above.
+func (l *lazyChunks) Err() error {
+	if l.err != nil {
+		return l.err
+	}
+	if l.current != nil {
+		return l.current.Err()
+	}
+	return nil
+}
+
+// Next implements iter.EntryIterator.
+func (l *lazyChunks) Next() bool { + if l.current != nil && l.current.Next() { + return true + } + if l.current != nil { + if err := l.current.Close(); err != nil { + l.err = err + return false + } + } + if len(l.chunks) == 0 { + return false + } + // take the next batch of chunks + if err := l.nextBatch(); err != nil { + l.err = err + return false + } + return l.current.Next() +} + +func (l *lazyChunks) nextBatch() error { + l.batch = l.batch[:0] + for len(l.chunks) > 0 && + (len(l.batch) < batchSize || + isOverlapping(l.batch[len(l.batch)-1], l.chunks[0], l.direction)) { + l.batch = append(l.batch, l.chunks[0]) + l.chunks = l.chunks[1:] + } + // todo: error if the batch is too big. + // todo: reuse previous sortIterator array + // todo: Try to use iter.NonOverlappingEntryIterator if possible which can reduce the amount of work. + var ( + iters []iter.EntryIterator + mtx sync.Mutex + ) + g, ctx := errgroup.WithContext(l.ctx) + g.SetLimit(64) + for _, c := range l.batch { + c := c + g.Go(func() error { + iter, err := fetchChunkEntries(ctx, c, l.minT, l.maxT, l.direction, l.pipeline, l.storage) + if err != nil { + return err + } + mtx.Lock() + iters = append(iters, iter) + mtx.Unlock() + return nil + }) + } + if err := g.Wait(); err != nil { + return err + } + l.current = iter.NewSortEntryIterator(iters, l.direction) + return nil +} + +func fetchChunkEntries( + ctx context.Context, + c lazyChunk, + from, through int64, + direction logproto.Direction, + pipeline log.Pipeline, + storage BlockStorage, +) (iter.EntryIterator, error) { + offset, size := c.meta.Ref.Unpack() + reader, err := storage.GetRangeObject(ctx, wal.Dir+c.id, int64(offset), int64(size)) + if err != nil { + return nil, err + } + defer reader.Close() + // todo: use a pool + buf := bytes.NewBuffer(make([]byte, 0, size)) + _, err = buf.ReadFrom(reader) + if err != nil { + return nil, err + } + // create logql pipeline and remove tenantID + // todo: we might want to create a single pipeline for all 
chunks from the same series.
+	streamPipeline := pipeline.ForStream(
+		labels.NewBuilder(c.labels).Del(index.TenantLabel).Labels(),
+	)
+	it, err := chunks.NewEntryIterator(buf.Bytes(), streamPipeline, direction, from, through)
+	if err != nil {
+		return nil, err
+	}
+	return iter.EntryIteratorWithClose(it, func() error {
+		// todo: return buffer to pool.
+		return nil
+	}), nil
+}
+
+// isOverlapping reports whether two chunks overlap in time, given the batch
+// order implied by direction (FORWARD: ascending MinTime; BACKWARD:
+// descending MaxTime). nextBatch keeps extending the batch while this holds.
+// NOTE(review): the FORWARD case originally read
+// `first.meta.MaxTime < second.meta.MinTime`, i.e. true exactly when the
+// chunks do NOT overlap, which splits overlapping chunks across batches and
+// breaks the global ordering produced by NewSortEntryIterator. Overlap in
+// forward order means the next chunk starts at or before the previous chunk
+// ends, hence `>=`.
+func isOverlapping(first, second lazyChunk, direction logproto.Direction) bool {
+	if direction == logproto.BACKWARD {
+		return first.meta.MinTime <= second.meta.MaxTime
+	}
+	return first.meta.MaxTime >= second.meta.MinTime
+}
diff --git a/pkg/querier-rf1/wal/querier.go b/pkg/querier-rf1/wal/querier.go
new file mode 100644
index 000000000000..956b7567e6f5
--- /dev/null
+++ b/pkg/querier-rf1/wal/querier.go
@@ -0,0 +1,149 @@
+package wal
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"sync"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"golang.org/x/sync/errgroup"
+	grpc "google.golang.org/grpc"
+
+	"github.com/grafana/dskit/tenant"
+	"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
+	"github.com/grafana/loki/v3/pkg/iter"
+	"github.com/grafana/loki/v3/pkg/logql"
+	"github.com/grafana/loki/v3/pkg/storage/wal"
+	"github.com/grafana/loki/v3/pkg/storage/wal/chunks"
+	"github.com/grafana/loki/v3/pkg/storage/wal/index"
+)
+
+var _ logql.Querier = (*Querier)(nil)
+
+type BlockStorage interface {
+	GetRangeObject(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error)
+}
+
+type Metastore interface {
+	ListBlocksForQuery(ctx context.Context, in *metastorepb.ListBlocksForQueryRequest, opts ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error)
+}
+
+type Querier struct {
+	blockStorage BlockStorage
+	metaStore    Metastore
+}
+
+func New(
+	metaStore Metastore,
+	blockStorage BlockStorage,
+) (*Querier, error) {
+	return &Querier{
+		blockStorage: blockStorage,
+		metaStore:    metaStore,
+	}, nil
+}
+
+func (q *Querier) SelectLogs(ctx
context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { + // todo request validation and delete markers. + tenantID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + expr, err := req.LogSelector() + if err != nil { + return nil, err + } + matchers := expr.Matchers() + // todo: not sure if Pipeline is thread safe + pipeline, err := expr.Pipeline() + if err != nil { + return nil, err + } + // todo support sharding + var ( + lazyChunks []lazyChunk + mtx sync.Mutex + ) + + err = q.forSeries(ctx, &metastorepb.ListBlocksForQueryRequest{ + TenantId: tenantID, + StartTime: req.Start.UnixNano(), + EndTime: req.End.UnixNano(), + }, func(id string, lbs *labels.ScratchBuilder, chk *chunks.Meta) error { + mtx.Lock() + lazyChunks = append(lazyChunks, newLazyChunk(id, lbs, chk)) + mtx.Unlock() + return nil + }, matchers...) + + return NewChunksEntryIterator(ctx, + q.blockStorage, + lazyChunks, + pipeline, + req.Direction, + req.Start.UnixNano(), + req.End.UnixNano()), err +} + +func (q *Querier) SelectSamples(context.Context, logql.SelectSampleParams) (iter.SampleIterator, error) { + // todo: implement + return nil, nil +} + +func (q *Querier) forSeries(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(string, *labels.ScratchBuilder, *chunks.Meta) error, ms ...*labels.Matcher) error { + return q.forIndices(ctx, req, func(ir *index.Reader, id string) error { + bufLbls := labels.ScratchBuilder{} + chunks := make([]chunks.Meta, 0, 1) + p, err := ir.PostingsForMatchers(ctx, req.TenantId, ms...) 
+ if err != nil { + return err + } + for p.Next() { + err := ir.Series(p.At(), &bufLbls, &chunks) + if err != nil { + return err + } + if err := fn(id, &bufLbls, &chunks[0]); err != nil { + return err + } + } + return p.Err() + }) +} + +func (q *Querier) forIndices(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(ir *index.Reader, id string) error) error { + resp, err := q.metaStore.ListBlocksForQuery(ctx, req) + if err != nil { + return err + } + metas := resp.Blocks + if len(metas) == 0 { + return nil + } + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(32) + for _, meta := range metas { + + meta := meta + g.Go(func() error { + reader, err := q.blockStorage.GetRangeObject(ctx, wal.Dir+meta.Id, meta.IndexRef.Offset, meta.IndexRef.Length) + if err != nil { + return err + } + defer reader.Close() + // todo: use a buffer pool + buf := bytes.NewBuffer(make([]byte, 0, meta.IndexRef.Length)) + _, err = buf.ReadFrom(reader) + if err != nil { + return err + } + index, err := index.NewReader(index.RealByteSlice(buf.Bytes())) + if err != nil { + return err + } + return fn(index, meta.Id) + }) + } + return g.Wait() +} diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go index 248cd523dab5..7c2dd99023b7 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/postings.go @@ -424,6 +424,13 @@ func EmptyPostings() Postings { return emptyPostings } +// IsEmptyPostingsType returns true if the postings are an empty postings list. +// When this function returns false, it doesn't mean that the postings isn't empty +// (it could be an empty intersection of two non-empty postings, for example). +func IsEmptyPostingsType(p Postings) bool { + return p == emptyPostings +} + // ErrPostings returns new postings that immediately error. 
func ErrPostings(err error) Postings { return errPostings{err} diff --git a/pkg/storage/wal/chunks/chunks.go b/pkg/storage/wal/chunks/chunks.go index f0f2625596f5..5c09962a0550 100644 --- a/pkg/storage/wal/chunks/chunks.go +++ b/pkg/storage/wal/chunks/chunks.go @@ -9,13 +9,17 @@ import ( "hash/crc32" "io" "sync" + "time" "unsafe" "github.com/golang/snappy" "github.com/klauspost/compress/s2" + "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" ) // EncodingType defines the type for encoding enums @@ -27,7 +31,10 @@ const ( ) // Initialize the CRC32 table -var castagnoliTable *crc32.Table +var ( + castagnoliTable *crc32.Table + _ iter.EntryIterator = (*entryBufferedIterator)(nil) +) func init() { castagnoliTable = crc32.MakeTable(crc32.Castagnoli) @@ -332,3 +339,85 @@ func (r *ChunkReader) readChunkHeader() error { func unsafeGetBytes(s string) []byte { return unsafe.Slice(unsafe.StringData(s), len(s)) } + +type entryBufferedIterator struct { + reader *ChunkReader + pipeline log.StreamPipeline + from, through int64 + + cur logproto.Entry + currLabels log.LabelsResult +} + +func NewEntryIterator( + chunkData []byte, + pipeline log.StreamPipeline, + direction logproto.Direction, + from, through int64, +) (iter.EntryIterator, error) { + chkReader, err := NewChunkReader(chunkData) + if err != nil { + return nil, err + } + it := &entryBufferedIterator{ + reader: chkReader, + pipeline: pipeline, + from: from, + through: through, + } + if direction == logproto.FORWARD { + return it, nil + } + return iter.NewEntryReversedIter(it) +} + +// At implements iter.EntryIterator. +func (e *entryBufferedIterator) At() push.Entry { + return e.cur +} + +// Close implements iter.EntryIterator. +func (e *entryBufferedIterator) Close() error { + return e.reader.Close() +} + +// Err implements iter.EntryIterator. 
+func (e *entryBufferedIterator) Err() error { + return e.reader.Err() +} + +// Labels implements iter.EntryIterator. +func (e *entryBufferedIterator) Labels() string { + return e.currLabels.String() +} + +// Next implements iter.EntryIterator. +func (e *entryBufferedIterator) Next() bool { + for e.reader.Next() { + ts, line := e.reader.At() + // check if the timestamp is within the range before applying the pipeline. + if ts < e.from { + continue + } + if ts >= e.through { + return false + } + // todo: structured metadata. + newLine, lbs, matches := e.pipeline.Process(ts, line) + if !matches { + continue + } + e.currLabels = lbs + e.cur.Timestamp = time.Unix(0, ts) + e.cur.Line = string(newLine) + e.cur.StructuredMetadata = logproto.FromLabelsToLabelAdapters(lbs.StructuredMetadata()) + e.cur.Parsed = logproto.FromLabelsToLabelAdapters(lbs.Parsed()) + return true + } + return false +} + +// StreamHash implements iter.EntryIterator. +func (e *entryBufferedIterator) StreamHash() uint64 { + return e.pipeline.BaseLabels().Hash() +} diff --git a/pkg/storage/wal/index/index.go b/pkg/storage/wal/index/index.go index 29436bd2044b..d4d35c0266ed 100644 --- a/pkg/storage/wal/index/index.go +++ b/pkg/storage/wal/index/index.go @@ -24,6 +24,8 @@ import ( "math" "slices" "sort" + "strings" + "unicode/utf8" "unsafe" "github.com/prometheus/prometheus/model/labels" @@ -51,8 +53,24 @@ const ( // checkContextEveryNIterations is used in some tight loops to check if the context is done. checkContextEveryNIterations = 128 + + TenantLabel = "__loki_tenant__" ) +// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped. +var regexMetaCharacterBytes [16]byte + +// isRegexMetaCharacter reports whether byte b needs to be escaped. 
+func isRegexMetaCharacter(b byte) bool { + return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0 +} + +func init() { + for _, b := range []byte(`.+*?()|[]{}^$`) { + regexMetaCharacterBytes[b%16] |= 1 << (b / 16) + } +} + var AllPostingsKey = labels.Label{} type indexWriterSeries struct { @@ -1908,3 +1926,258 @@ func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chu func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } + +// PostingsForMatchers assembles a single postings iterator against the index reader +// based on the given matchers. The resulting postings are not ordered by series. +func (r *Reader) PostingsForMatchers(ctx context.Context, tenantID string, ms ...*labels.Matcher) (index.Postings, error) { + ms = append(ms, labels.MustNewMatcher(labels.MatchEqual, TenantLabel, tenantID)) + + var its, notIts []index.Postings + // See which label must be non-empty. + // Optimization for case like {l=~".", l!="1"}. + labelMustBeSet := make(map[string]bool, len(ms)) + for _, m := range ms { + if !m.Matches("") { + labelMustBeSet[m.Name] = true + } + } + isSubtractingMatcher := func(m *labels.Matcher) bool { + if !labelMustBeSet[m.Name] { + return true + } + return (m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp) && m.Matches("") + } + hasSubtractingMatchers, hasIntersectingMatchers := false, false + for _, m := range ms { + if isSubtractingMatcher(m) { + hasSubtractingMatchers = true + } else { + hasIntersectingMatchers = true + } + } + + if hasSubtractingMatchers && !hasIntersectingMatchers { + // If there's nothing to subtract from, add in everything and remove the notIts later. + // We prefer to get AllPostings so that the base of subtraction (i.e. allPostings) + // doesn't include series that may be added to the index reader during this function call. 
+ k, v := index.AllPostingsKey() + allPostings, err := r.Postings(ctx, k, v) + if err != nil { + return nil, err + } + its = append(its, allPostings) + } + + // Sort matchers to have the intersecting matchers first. + // This way the base for subtraction is smaller and + // there is no chance that the set we subtract from + // contains postings of series that didn't exist when + // we constructed the set we subtract by. + slices.SortStableFunc(ms, func(i, j *labels.Matcher) int { + if !isSubtractingMatcher(i) && isSubtractingMatcher(j) { + return -1 + } + + return +1 + }) + + for _, m := range ms { + if ctx.Err() != nil { + return nil, ctx.Err() + } + switch { + case m.Name == "" && m.Value == "": // Special-case for AllPostings, used in tests at least. + k, v := index.AllPostingsKey() + allPostings, err := r.Postings(ctx, k, v) + if err != nil { + return nil, err + } + its = append(its, allPostings) + case labelMustBeSet[m.Name]: + // If this matcher must be non-empty, we can be smarter. + matchesEmpty := m.Matches("") + isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp + switch { + case isNot && matchesEmpty: // l!="foo" + // If the label can't be empty and is a Not and the inner matcher + // doesn't match empty, then subtract it out at the end. + inverse, err := m.Inverse() + if err != nil { + return nil, err + } + + it, err := postingsForMatcher(ctx, r, inverse) + if err != nil { + return nil, err + } + notIts = append(notIts, it) + case isNot && !matchesEmpty: // l!="" + // If the label can't be empty and is a Not, but the inner matcher can + // be empty we need to use inversePostingsForMatcher. + inverse, err := m.Inverse() + if err != nil { + return nil, err + } + + it, err := inversePostingsForMatcher(ctx, r, inverse) + if err != nil { + return nil, err + } + if index.IsEmptyPostingsType(it) { + return index.EmptyPostings(), nil + } + its = append(its, it) + default: // l="a" + // Non-Not matcher, use normal postingsForMatcher. 
+ it, err := postingsForMatcher(ctx, r, m) + if err != nil { + return nil, err + } + if index.IsEmptyPostingsType(it) { + return index.EmptyPostings(), nil + } + its = append(its, it) + } + default: // l="" + // If the matchers for a labelname selects an empty value, it selects all + // the series which don't have the label name set too. See: + // https://github.com/prometheus/prometheus/issues/3575 and + // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + it, err := inversePostingsForMatcher(ctx, r, m) + if err != nil { + return nil, err + } + notIts = append(notIts, it) + } + } + + it := index.Intersect(its...) + + for _, n := range notIts { + it = index.Without(it, n) + } + + return it, nil +} + +// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. +func inversePostingsForMatcher(ctx context.Context, ix *Reader, m *labels.Matcher) (index.Postings, error) { + // Fast-path for MatchNotRegexp matching. + // Inverse of a MatchNotRegexp is MatchRegexp (double negation). + // Fast-path for set matching. + if m.Type == labels.MatchNotRegexp { + setMatches := findSetMatches(m.GetRegexString()) + if len(setMatches) > 0 { + return ix.Postings(ctx, m.Name, setMatches...) + } + } + + // Fast-path for MatchNotEqual matching. + // Inverse of a MatchNotEqual is MatchEqual (double negation). + if m.Type == labels.MatchNotEqual { + return ix.Postings(ctx, m.Name, m.Value) + } + + vals, err := ix.LabelValues(ctx, m.Name) + if err != nil { + return nil, err + } + + var res []string + // If the inverse match is ="", we just want all the values. + if m.Type == labels.MatchEqual && m.Value == "" { + res = vals + } else { + for _, val := range vals { + if !m.Matches(val) { + res = append(res, val) + } + } + } + + return ix.Postings(ctx, m.Name, res...) 
+} + +func postingsForMatcher(ctx context.Context, ix *Reader, m *labels.Matcher) (index.Postings, error) { + // This method will not return postings for missing labels. + + // Fast-path for equal matching. + if m.Type == labels.MatchEqual { + return ix.Postings(ctx, m.Name, m.Value) + } + + // Fast-path for set matching. + if m.Type == labels.MatchRegexp { + setMatches := findSetMatches(m.GetRegexString()) + if len(setMatches) > 0 { + return ix.Postings(ctx, m.Name, setMatches...) + } + } + + vals, err := ix.LabelValues(ctx, m.Name) + if err != nil { + return nil, err + } + + var res []string + for _, val := range vals { + if m.Matches(val) { + res = append(res, val) + } + } + + if len(res) == 0 { + return index.EmptyPostings(), nil + } + + return ix.Postings(ctx, m.Name, res...) +} + +func findSetMatches(pattern string) []string { + // Return empty matches if the wrapper from Prometheus is missing. + if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" { + return nil + } + escaped := false + sets := []*strings.Builder{{}} + init := 4 + end := len(pattern) - 2 + // If the regex is wrapped in a group we can remove the first and last parentheses + if pattern[init] == '(' && pattern[end-1] == ')' { + init++ + end-- + } + for i := init; i < end; i++ { + if escaped { + switch { + case isRegexMetaCharacter(pattern[i]): + sets[len(sets)-1].WriteByte(pattern[i]) + case pattern[i] == '\\': + sets[len(sets)-1].WriteByte('\\') + default: + return nil + } + escaped = false + } else { + switch { + case isRegexMetaCharacter(pattern[i]): + if pattern[i] == '|' { + sets = append(sets, &strings.Builder{}) + } else { + return nil + } + case pattern[i] == '\\': + escaped = true + default: + sets[len(sets)-1].WriteByte(pattern[i]) + } + } + } + matches := make([]string, 0, len(sets)) + for _, s := range sets { + if s.Len() > 0 { + matches = append(matches, s.String()) + } + } + return matches +} diff --git a/pkg/storage/wal/segment.go 
b/pkg/storage/wal/segment.go index 57ad096fac30..9aecead70bea 100644 --- a/pkg/storage/wal/segment.go +++ b/pkg/storage/wal/segment.go @@ -34,7 +34,7 @@ var ( } }, } - tenantLabel = "__loki_tenant__" + Dir = "loki-v2/wal/anon/" ) func init() { @@ -101,8 +101,8 @@ func (b *SegmentWriter) getOrCreateStream(id streamID, lbls labels.Labels) *stre if ok { return s } - if lbls.Get(tenantLabel) == "" { - lbls = labels.NewBuilder(lbls).Set(tenantLabel, id.tenant).Labels() + if lbls.Get(index.TenantLabel) == "" { + lbls = labels.NewBuilder(lbls).Set(index.TenantLabel, id.tenant).Labels() } s = streamSegmentPool.Get().(*streamSegment) s.lbls = lbls diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go index 34b8d78b2d58..fbf751e8dde1 100644 --- a/pkg/storage/wal/segment_test.go +++ b/pkg/storage/wal/segment_test.go @@ -122,7 +122,7 @@ func TestWalSegmentWriter_Append(t *testing.T) { require.True(t, ok) lbs, err := syntax.ParseLabels(expected.labels) require.NoError(t, err) - lbs = append(lbs, labels.Label{Name: tenantLabel, Value: expected.tenant}) + lbs = append(lbs, labels.Label{Name: index.TenantLabel, Value: expected.tenant}) sort.Sort(lbs) require.Equal(t, lbs, stream.lbls) require.Equal(t, expected.entries, stream.entries) @@ -168,7 +168,7 @@ func TestMultiTenantWrite(t *testing.T) { for _, tenant := range tenants { for _, lbl := range lbls { - expectedSeries = append(expectedSeries, labels.NewBuilder(lbl).Set(tenantLabel, tenant).Labels().String()) + expectedSeries = append(expectedSeries, labels.NewBuilder(lbl).Set(index.TenantLabel, tenant).Labels().String()) } } From 52b1a317b9224ec4c66e89bbc81d2cfb00caa231 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 29 Jul 2024 17:48:35 +0200 Subject: [PATCH 02/16] revert accidental change on metastore service name --- pkg/ingester-rf1/metastore/metastore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester-rf1/metastore/metastore.go 
b/pkg/ingester-rf1/metastore/metastore.go index 8c30dd003848..6d999b8edd51 100644 --- a/pkg/ingester-rf1/metastore/metastore.go +++ b/pkg/ingester-rf1/metastore/metastore.go @@ -25,7 +25,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/raftleader" ) -const metastoreRaftLeaderHealthServiceName = "metastorepb.MetastoreService" +const metastoreRaftLeaderHealthServiceName = "metastorepb.MetastoreService.RaftLeader" type Config struct { DataDir string `yaml:"data_dir"` From 4a51badb9dd8d4b30d78caba8d53b970da6c8957 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 29 Jul 2024 18:18:32 +0200 Subject: [PATCH 03/16] Add more todos --- pkg/querier-rf1/wal/chunks.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go index ba008bdf1ea7..e54e33273e0a 100644 --- a/pkg/querier-rf1/wal/chunks.go +++ b/pkg/querier-rf1/wal/chunks.go @@ -194,6 +194,9 @@ func fetchChunkEntries( storage BlockStorage, ) (iter.EntryIterator, error) { offset, size := c.meta.Ref.Unpack() + // todo: We should be able to avoid many IOPS to object storage + // if chunks are next to each other and we should be able to pack range request + // together. 
reader, err := storage.GetRangeObject(ctx, wal.Dir+c.id, int64(offset), int64(size)) if err != nil { return nil, err From 35e7a5c8af3d197b6c20ae0694774aae93566414 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 30 Jul 2024 12:56:58 +0200 Subject: [PATCH 04/16] Add support for SampleIterator and refactoring --- pkg/querier-rf1/wal/chunks.go | 358 +++++++++++------- pkg/querier-rf1/wal/querier.go | 78 +++- pkg/storage/wal/chunks/chunks.go | 85 ----- pkg/storage/wal/chunks/doc.go | 20 + pkg/storage/wal/chunks/entry_iterator.go | 114 ++++++ pkg/storage/wal/chunks/entry_iterator_test.go | 143 +++++++ pkg/storage/wal/chunks/sample_iterator.go | 87 +++++ .../wal/chunks/sample_iterator_test.go | 201 ++++++++++ 8 files changed, 849 insertions(+), 237 deletions(-) create mode 100644 pkg/storage/wal/chunks/doc.go create mode 100644 pkg/storage/wal/chunks/entry_iterator.go create mode 100644 pkg/storage/wal/chunks/entry_iterator_test.go create mode 100644 pkg/storage/wal/chunks/sample_iterator.go create mode 100644 pkg/storage/wal/chunks/sample_iterator_test.go diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go index e54e33273e0a..be52b819f400 100644 --- a/pkg/querier-rf1/wal/chunks.go +++ b/pkg/querier-rf1/wal/chunks.go @@ -1,8 +1,8 @@ package wal import ( - "bytes" "context" + "fmt" "sort" "sync" @@ -10,55 +10,99 @@ import ( "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" - "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/storage/wal/chunks" - "github.com/grafana/loki/v3/pkg/storage/wal/index" "github.com/prometheus/prometheus/model/labels" "golang.org/x/sync/errgroup" ) const batchSize = 16 -var _ iter.EntryIterator = (*lazyChunks)(nil) - -type lazyChunk struct { +type ChunkData struct { meta *chunks.Meta labels labels.Labels id string } -func newLazyChunk(id string, lbs *labels.ScratchBuilder, meta *chunks.Meta) lazyChunk { +func 
newChunkData(id string, lbs *labels.ScratchBuilder, meta *chunks.Meta) ChunkData { lbs.Sort() - return lazyChunk{ + return ChunkData{ id: id, meta: meta, labels: lbs.Labels(), } } -type lazyChunks struct { - chunks []lazyChunk - direction logproto.Direction - pipeline log.Pipeline - minT, maxT int64 - storage BlockStorage - ctx context.Context +// ChunksEntryIterator iterates over log entries +type ChunksEntryIterator[T iter.EntryIterator] struct { + baseChunksIterator[T] - current iter.EntryIterator - batch []lazyChunk - err error + pipeline log.Pipeline + current iter.EntryIterator +} + +// ChunksSampleIterator iterates over metric samples +type ChunksSampleIterator[T iter.SampleIterator] struct { + baseChunksIterator[T] + current iter.SampleIterator + extractor log.SampleExtractor } -// todo: Support SampleIterator. func NewChunksEntryIterator( ctx context.Context, storage BlockStorage, - chunks []lazyChunk, + chunks []ChunkData, pipeline log.Pipeline, direction logproto.Direction, minT, maxT int64, -) *lazyChunks { - // sort by time and then by labels following the direction. 
+) *ChunksEntryIterator[iter.EntryIterator] { + sortChunks(chunks, direction) + return &ChunksEntryIterator[iter.EntryIterator]{ + baseChunksIterator: baseChunksIterator[iter.EntryIterator]{ + ctx: ctx, + chunks: chunks, + direction: direction, + storage: storage, + batch: make([]ChunkData, 0, batchSize), + minT: minT, + maxT: maxT, + + iteratorFactory: func(chunks []ChunkData) (iter.EntryIterator, error) { + return createNextEntryIterator(ctx, chunks, direction, pipeline, storage, minT, maxT) + }, + isNil: func(it iter.EntryIterator) bool { return it == nil }, + }, + pipeline: pipeline, + } +} + +func NewChunksSampleIterator( + ctx context.Context, + storage BlockStorage, + chunks []ChunkData, + extractor log.SampleExtractor, + minT, maxT int64, +) *ChunksSampleIterator[iter.SampleIterator] { + sortChunks(chunks, logproto.FORWARD) + return &ChunksSampleIterator[iter.SampleIterator]{ + baseChunksIterator: baseChunksIterator[iter.SampleIterator]{ + ctx: ctx, + chunks: chunks, + direction: logproto.FORWARD, + storage: storage, + batch: make([]ChunkData, 0, batchSize), + minT: minT, + maxT: maxT, + + iteratorFactory: func(chunks []ChunkData) (iter.SampleIterator, error) { + return createNextSampleIterator(ctx, chunks, extractor, storage, minT, maxT) + }, + isNil: func(it iter.SampleIterator) bool { return it == nil }, + }, + extractor: extractor, + } +} + +func sortChunks(chunks []ChunkData, direction logproto.Direction) { sort.Slice(chunks, func(i, j int) bool { if direction == logproto.FORWARD { t1, t2 := chunks[i].meta.MinTime, chunks[j].meta.MinTime @@ -73,159 +117,205 @@ func NewChunksEntryIterator( } return labels.Compare(chunks[i].labels, chunks[j].labels) < 0 }) - return &lazyChunks{ - ctx: ctx, - chunks: chunks, - direction: direction, - pipeline: pipeline, - storage: storage, - batch: make([]lazyChunk, 0, batchSize), - minT: minT, - maxT: maxT, - } -} - -// At implements iter.EntryIterator. 
-func (l *lazyChunks) At() push.Entry { - if l.current == nil { - return push.Entry{} - } - return l.current.At() -} - -func (l *lazyChunks) Labels() string { - if l.current == nil { - return "" - } - return l.current.Labels() } -func (l *lazyChunks) StreamHash() uint64 { - if l.current == nil { - return 0 - } - return l.current.StreamHash() -} +// baseChunksIterator contains common fields and methods for both entry and sample iterators +type baseChunksIterator[T interface { + Next() bool + Close() error + Err() error + StreamHash() uint64 + Labels() string +}] struct { + chunks []ChunkData + direction logproto.Direction + minT, maxT int64 + storage BlockStorage + ctx context.Context + iteratorFactory func([]ChunkData) (T, error) + isNil func(T) bool -// Close implements iter.EntryIterator. -func (l *lazyChunks) Close() error { - if l.current == nil { - return nil - } - return l.current.Close() + batch []ChunkData + current T + err error } -// Err implements iter.EntryIterator. -func (l *lazyChunks) Err() error { - if l.err != nil { - return l.err - } - if l.current == nil { - return l.current.Err() +func (b *baseChunksIterator[T]) nextBatch() error { + b.batch = b.batch[:0] + for len(b.chunks) > 0 && + (len(b.batch) < batchSize || + isOverlapping(b.batch[len(b.batch)-1], b.chunks[0], b.direction)) { + b.batch = append(b.batch, b.chunks[0]) + b.chunks = b.chunks[1:] } + // todo: error if the batch is too big. return nil } -// Next implements iter.EntryIterator. 
-func (l *lazyChunks) Next() bool { - if l.current != nil && l.current.Next() { - return true - } - if l.current != nil { - if err := l.current.Close(); err != nil { - l.err = err +// todo: better chunk batch iterator +func (b *baseChunksIterator[T]) Next() bool { + for b.isNil(b.current) || !b.current.Next() { + if !b.isNil(b.current) { + if err := b.current.Close(); err != nil { + b.err = err + return false + } + } + if len(b.chunks) == 0 { + return false + } + if err := b.nextBatch(); err != nil { + b.err = err + return false + } + var err error + b.current, err = b.iteratorFactory(b.batch) + if err != nil { + b.err = err return false } } - if len(l.chunks) == 0 { - return false + return true +} + +func createNextEntryIterator( + ctx context.Context, + batch []ChunkData, + direction logproto.Direction, + pipeline log.Pipeline, + storage BlockStorage, + minT, maxT int64, +) (iter.EntryIterator, error) { + var ( + iterators []iter.EntryIterator + mtx sync.Mutex + ) + + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(64) + for _, chunk := range batch { + chunk := chunk // https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + chunkData, err := readChunkData(ctx, storage, chunk) + if err != nil { + return fmt.Errorf("error reading chunk data: %w", err) + } + + streamPipeline := pipeline.ForStream(chunk.labels) + chunkIterator, err := chunks.NewEntryIterator(chunkData, streamPipeline, direction, minT, maxT) + if err != nil { + return fmt.Errorf("error creating entry iterator: %w", err) + } + + mtx.Lock() + iterators = append(iterators, chunkIterator) + mtx.Unlock() + + return nil + }) } - // take the next batch of chunks - if err := l.nextBatch(); err != nil { - l.err = err - return false + + if err := g.Wait(); err != nil { + return nil, err } - return l.current.Next() + + return iter.NewSortEntryIterator(iterators, direction), nil } -func (l *lazyChunks) nextBatch() error { - l.batch = l.batch[:0] - for len(l.chunks) > 0 && - (len(l.batch) < 
batchSize || - isOverlapping(l.batch[len(l.batch)-1], l.chunks[0], l.direction)) { - l.batch = append(l.batch, l.chunks[0]) - l.chunks = l.chunks[1:] - } - // todo: error if the batch is too big. - // todo: reuse previous sortIterator array - // todo: Try to use iter.NonOverlappingEntryIterator if possible which can reduce the amount of work. +func createNextSampleIterator( + ctx context.Context, + batch []ChunkData, + pipeline log.SampleExtractor, + storage BlockStorage, + minT, maxT int64, +) (iter.SampleIterator, error) { var ( - iters []iter.EntryIterator - mtx sync.Mutex + iterators []iter.SampleIterator + mtx sync.Mutex ) - g, ctx := errgroup.WithContext(l.ctx) + + g, ctx := errgroup.WithContext(ctx) g.SetLimit(64) - for _, c := range l.batch { - c := c + for _, chunk := range batch { + chunk := chunk // https://golang.org/doc/faq#closures_and_goroutines g.Go(func() error { - iter, err := fetchChunkEntries(ctx, c, l.minT, l.maxT, l.direction, l.pipeline, l.storage) + chunkData, err := readChunkData(ctx, storage, chunk) + if err != nil { + return fmt.Errorf("error reading chunk data: %w", err) + } + + streamPipeline := pipeline.ForStream(chunk.labels) + chunkIterator, err := chunks.NewSampleIterator(chunkData, streamPipeline, minT, maxT) if err != nil { - return err + return fmt.Errorf("error creating sample iterator: %w", err) } + mtx.Lock() - iters = append(iters, iter) + iterators = append(iterators, chunkIterator) mtx.Unlock() + return nil }) } + if err := g.Wait(); err != nil { - return err + return nil, err + } + + return iter.NewSortSampleIterator(iterators), nil +} + +func (b *baseChunksIterator[T]) Close() error { + if !b.isNil(b.current) { + return b.current.Close() } - l.current = iter.NewSortEntryIterator(iters, l.direction) return nil } -func fetchChunkEntries( - ctx context.Context, - c lazyChunk, - from, through int64, - direction logproto.Direction, - pipeline log.Pipeline, - storage BlockStorage, -) (iter.EntryIterator, error) { - offset, size 
:= c.meta.Ref.Unpack() +func (b *baseChunksIterator[T]) Err() error { + if b.err != nil { + return b.err + } + if !b.isNil(b.current) { + return b.current.Err() + } + return nil +} + +func (b *baseChunksIterator[T]) Labels() string { + return b.current.Labels() +} + +func (b *baseChunksIterator[T]) StreamHash() uint64 { + return b.current.StreamHash() +} + +func (c *ChunksEntryIterator[T]) At() push.Entry { return c.current.At() } +func (c *ChunksSampleIterator[T]) At() logproto.Sample { return c.current.At() } + +func isOverlapping(first, second ChunkData, direction logproto.Direction) bool { + if direction == logproto.BACKWARD { + return first.meta.MinTime <= second.meta.MaxTime + } + return first.meta.MaxTime < second.meta.MinTime +} + +func readChunkData(ctx context.Context, storage BlockStorage, chunk ChunkData) ([]byte, error) { + offset, size := chunk.meta.Ref.Unpack() // todo: We should be able to avoid many IOPS to object storage // if chunks are next to each other and we should be able to pack range request // together. - reader, err := storage.GetRangeObject(ctx, wal.Dir+c.id, int64(offset), int64(size)) + reader, err := storage.GetRangeObject(ctx, chunk.id, int64(offset), int64(size)) if err != nil { return nil, err } defer reader.Close() - // todo: use a pool - buf := bytes.NewBuffer(make([]byte, 0, size)) - _, err = buf.ReadFrom(reader) - if err != nil { - return nil, err - } - // create logql pipeline and remove tenantID - // todo: we might want to create a single pipeline for all chunks from the same series. - streamPipeline := pipeline.ForStream( - labels.NewBuilder(c.labels).Del(index.TenantLabel).Labels(), - ) - it, err := chunks.NewEntryIterator(buf.Bytes(), streamPipeline, direction, from, through) + + data := make([]byte, size) + _, err = reader.Read(data) if err != nil { return nil, err } - return iter.EntryIteratorWithClose(it, func() error { - // todo: return buffer to pool. 
- return nil - }), nil -} -func isOverlapping(first, second lazyChunk, direction logproto.Direction) bool { - if direction == logproto.BACKWARD { - return first.meta.MinTime <= second.meta.MaxTime - } - return first.meta.MaxTime < second.meta.MinTime + return data, nil } diff --git a/pkg/querier-rf1/wal/querier.go b/pkg/querier-rf1/wal/querier.go index 956b7567e6f5..a7ac9976cd32 100644 --- a/pkg/querier-rf1/wal/querier.go +++ b/pkg/querier-rf1/wal/querier.go @@ -60,35 +60,77 @@ func (q *Querier) SelectLogs(ctx context.Context, req logql.SelectLogParams) (it if err != nil { return nil, err } + + chks, err := q.matchingChunks(ctx, tenantID, req.Start.UnixNano(), req.End.UnixNano(), matchers...) + if err != nil { + return nil, err + } + + return NewChunksEntryIterator(ctx, + q.blockStorage, + chks, + pipeline, + req.Direction, + req.Start.UnixNano(), + req.End.UnixNano()), nil +} + +func (q *Querier) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { + // todo request validation and delete markers. + tenantID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + expr, err := req.Expr() + if err != nil { + return nil, err + } + selector, err := expr.Selector() + if err != nil { + return nil, err + } + matchers := selector.Matchers() + // todo: not sure if Extractor is thread safe + + extractor, err := expr.Extractor() + if err != nil { + return nil, err + } + + chks, err := q.matchingChunks(ctx, tenantID, req.Start.UnixNano(), req.End.UnixNano(), matchers...) 
+ if err != nil { + return nil, err + } + + return NewChunksSampleIterator(ctx, + q.blockStorage, + chks, + extractor, + req.Start.UnixNano(), + req.End.UnixNano()), nil +} + +func (q *Querier) matchingChunks(ctx context.Context, tenantID string, from, through int64, matchers ...*labels.Matcher) ([]ChunkData, error) { // todo support sharding var ( - lazyChunks []lazyChunk + lazyChunks []ChunkData mtx sync.Mutex ) - err = q.forSeries(ctx, &metastorepb.ListBlocksForQueryRequest{ + err := q.forSeries(ctx, &metastorepb.ListBlocksForQueryRequest{ TenantId: tenantID, - StartTime: req.Start.UnixNano(), - EndTime: req.End.UnixNano(), + StartTime: from, + EndTime: through, }, func(id string, lbs *labels.ScratchBuilder, chk *chunks.Meta) error { mtx.Lock() - lazyChunks = append(lazyChunks, newLazyChunk(id, lbs, chk)) + lazyChunks = append(lazyChunks, newChunkData(id, lbs, chk)) mtx.Unlock() return nil }, matchers...) - - return NewChunksEntryIterator(ctx, - q.blockStorage, - lazyChunks, - pipeline, - req.Direction, - req.Start.UnixNano(), - req.End.UnixNano()), err -} - -func (q *Querier) SelectSamples(context.Context, logql.SelectSampleParams) (iter.SampleIterator, error) { - // todo: implement - return nil, nil + if err != nil { + return nil, err + } + return lazyChunks, nil } func (q *Querier) forSeries(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(string, *labels.ScratchBuilder, *chunks.Meta) error, ms ...*labels.Matcher) error { diff --git a/pkg/storage/wal/chunks/chunks.go b/pkg/storage/wal/chunks/chunks.go index 5c09962a0550..60b7b6612446 100644 --- a/pkg/storage/wal/chunks/chunks.go +++ b/pkg/storage/wal/chunks/chunks.go @@ -9,17 +9,14 @@ import ( "hash/crc32" "io" "sync" - "time" "unsafe" "github.com/golang/snappy" "github.com/klauspost/compress/s2" - "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" - 
"github.com/grafana/loki/v3/pkg/logql/log" ) // EncodingType defines the type for encoding enums @@ -339,85 +336,3 @@ func (r *ChunkReader) readChunkHeader() error { func unsafeGetBytes(s string) []byte { return unsafe.Slice(unsafe.StringData(s), len(s)) } - -type entryBufferedIterator struct { - reader *ChunkReader - pipeline log.StreamPipeline - from, through int64 - - cur logproto.Entry - currLabels log.LabelsResult -} - -func NewEntryIterator( - chunkData []byte, - pipeline log.StreamPipeline, - direction logproto.Direction, - from, through int64, -) (iter.EntryIterator, error) { - chkReader, err := NewChunkReader(chunkData) - if err != nil { - return nil, err - } - it := &entryBufferedIterator{ - reader: chkReader, - pipeline: pipeline, - from: from, - through: through, - } - if direction == logproto.FORWARD { - return it, nil - } - return iter.NewEntryReversedIter(it) -} - -// At implements iter.EntryIterator. -func (e *entryBufferedIterator) At() push.Entry { - return e.cur -} - -// Close implements iter.EntryIterator. -func (e *entryBufferedIterator) Close() error { - return e.reader.Close() -} - -// Err implements iter.EntryIterator. -func (e *entryBufferedIterator) Err() error { - return e.reader.Err() -} - -// Labels implements iter.EntryIterator. -func (e *entryBufferedIterator) Labels() string { - return e.currLabels.String() -} - -// Next implements iter.EntryIterator. -func (e *entryBufferedIterator) Next() bool { - for e.reader.Next() { - ts, line := e.reader.At() - // check if the timestamp is within the range before applying the pipeline. - if ts < e.from { - continue - } - if ts >= e.through { - return false - } - // todo: structured metadata. 
- newLine, lbs, matches := e.pipeline.Process(ts, line) - if !matches { - continue - } - e.currLabels = lbs - e.cur.Timestamp = time.Unix(0, ts) - e.cur.Line = string(newLine) - e.cur.StructuredMetadata = logproto.FromLabelsToLabelAdapters(lbs.StructuredMetadata()) - e.cur.Parsed = logproto.FromLabelsToLabelAdapters(lbs.Parsed()) - return true - } - return false -} - -// StreamHash implements iter.EntryIterator. -func (e *entryBufferedIterator) StreamHash() uint64 { - return e.pipeline.BaseLabels().Hash() -} diff --git a/pkg/storage/wal/chunks/doc.go b/pkg/storage/wal/chunks/doc.go new file mode 100644 index 000000000000..23baa49c11cd --- /dev/null +++ b/pkg/storage/wal/chunks/doc.go @@ -0,0 +1,20 @@ +// Package chunks provides functionality for efficient storage and retrieval of log data. +// +// The chunks package implements a compact and performant way to store and access +// log entries. It uses various compression and encoding techniques to minimize +// storage requirements while maintaining fast access times. +// +// Key features: +// - Efficient chunk writing with multiple encoding options +// - Fast chunk reading with iterators for forward and backward traversal +// - Support for time-based filtering of log entries +// - Integration with Loki's log query language (LogQL) for advanced filtering and processing +// +// Main types and functions: +// - WriteChunk: Writes log entries to a compressed chunk format +// - NewChunkReader: Creates a reader for parsing and accessing chunk data +// - NewEntryIterator: Provides an iterator for efficient traversal of log entries in a chunk +// +// This package is designed to work seamlessly with other components of the Loki +// log aggregation system, providing a crucial layer for data storage and retrieval. 
+package chunks diff --git a/pkg/storage/wal/chunks/entry_iterator.go b/pkg/storage/wal/chunks/entry_iterator.go new file mode 100644 index 000000000000..bc9731d31d76 --- /dev/null +++ b/pkg/storage/wal/chunks/entry_iterator.go @@ -0,0 +1,114 @@ +package chunks + +import ( + "time" + + "github.com/grafana/loki/pkg/push" + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" +) + +type entryBufferedIterator struct { + reader *ChunkReader + pipeline log.StreamPipeline + from, through int64 + + cur logproto.Entry + currLabels log.LabelsResult +} + +// NewEntryIterator creates an iterator for efficiently traversing log entries in a chunk. +// It takes compressed chunk data, a processing pipeline, iteration direction, and a time range. +// The returned iterator filters entries based on the time range and applies the given pipeline. +// It handles both forward and backward iteration. +// +// Parameters: +// - chunkData: Compressed chunk data containing log entries +// - pipeline: StreamPipeline for processing and filtering entries +// - direction: Direction of iteration (FORWARD or BACKWARD) +// - from: Start timestamp (inclusive) for filtering entries +// - through: End timestamp (exclusive) for filtering entries +// +// Returns an EntryIterator and an error if creation fails. +func NewEntryIterator( + chunkData []byte, + pipeline log.StreamPipeline, + direction logproto.Direction, + from, through int64, +) (iter.EntryIterator, error) { + chkReader, err := NewChunkReader(chunkData) + if err != nil { + return nil, err + } + it := &entryBufferedIterator{ + reader: chkReader, + pipeline: pipeline, + from: from, + through: through, + } + if direction == logproto.FORWARD { + return it, nil + } + return iter.NewEntryReversedIter(it) +} + +// At implements iter.EntryIterator. +func (e *entryBufferedIterator) At() push.Entry { + return e.cur +} + +// Close implements iter.EntryIterator. 
+func (e *entryBufferedIterator) Close() error { + return e.reader.Close() +} + +// Err implements iter.EntryIterator. +func (e *entryBufferedIterator) Err() error { + return e.reader.Err() +} + +// Labels implements iter.EntryIterator. +func (e *entryBufferedIterator) Labels() string { + return e.currLabels.String() +} + +// Next implements iter.EntryIterator. +func (e *entryBufferedIterator) Next() bool { + for e.reader.Next() { + ts, line := e.reader.At() + // check if the timestamp is within the range before applying the pipeline. + if ts < e.from { + continue + } + if ts >= e.through { + return false + } + // todo: structured metadata. + newLine, lbs, matches := e.pipeline.Process(ts, line) + if !matches { + continue + } + e.currLabels = lbs + e.cur.Timestamp = time.Unix(0, ts) + e.cur.Line = string(newLine) + e.cur.StructuredMetadata = logproto.FromLabelsToLabelAdapters(lbs.StructuredMetadata()) + e.cur.Parsed = logproto.FromLabelsToLabelAdapters(lbs.Parsed()) + return true + } + return false +} + +// StreamHash implements iter.EntryIterator. 
+func (e *entryBufferedIterator) StreamHash() uint64 { + return e.pipeline.BaseLabels().Hash() +} + +type sampleBufferedIterator struct { + reader *ChunkReader + pipeline log.StreamSampleExtractor + from, through int64 + + cur logproto.Sample + currLabels log.LabelsResult +} diff --git a/pkg/storage/wal/chunks/entry_iterator_test.go b/pkg/storage/wal/chunks/entry_iterator_test.go new file mode 100644 index 000000000000..a098161134a5 --- /dev/null +++ b/pkg/storage/wal/chunks/entry_iterator_test.go @@ -0,0 +1,143 @@ +package chunks + +import ( + "bytes" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/logql/syntax" +) + +func TestNewEntryIterator(t *testing.T) { + tests := []struct { + name string + entries []*logproto.Entry + direction logproto.Direction + from int64 + through int64 + pipeline log.StreamPipeline + expected []*logproto.Entry + }{ + { + name: "Forward direction, all entries within range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + }, + direction: logproto.FORWARD, + from: 0, + through: 4, + pipeline: noopStreamPipeline(), + expected: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + }, + }, + { + name: "Backward direction, all entries within range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + }, + direction: logproto.BACKWARD, + from: 0, + through: 4, + pipeline: noopStreamPipeline(), + expected: []*logproto.Entry{ + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + {Timestamp: time.Unix(0, 2), 
Line: "line 2"}, + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + }, + }, + { + name: "Forward direction, partial range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "line 1"}, + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + {Timestamp: time.Unix(0, 4), Line: "line 4"}, + }, + direction: logproto.FORWARD, + from: 2, + through: 4, + pipeline: noopStreamPipeline(), + expected: []*logproto.Entry{ + {Timestamp: time.Unix(0, 2), Line: "line 2"}, + {Timestamp: time.Unix(0, 3), Line: "line 3"}, + }, + }, + { + name: "Forward direction with logql pipeline filter", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1).UTC(), Line: "error: something went wrong"}, + {Timestamp: time.Unix(0, 2).UTC(), Line: "info: operation successful"}, + {Timestamp: time.Unix(0, 3).UTC(), Line: "error: another error occurred"}, + {Timestamp: time.Unix(0, 4).UTC(), Line: "debug: checking status"}, + }, + direction: logproto.FORWARD, + from: 1, + through: 5, + pipeline: mustNewPipeline(t, `{foo="bar"} | line_format "foo {{ __line__ }}" |= "error"`), + expected: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "foo error: something went wrong"}, + {Timestamp: time.Unix(0, 3), Line: "foo error: another error occurred"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + + // Write the chunk + _, err := WriteChunk(&buf, tt.entries, EncodingSnappy) + require.NoError(t, err, "WriteChunk failed") + + // Create the iterator + iter, err := NewEntryIterator(buf.Bytes(), tt.pipeline, tt.direction, tt.from, tt.through) + require.NoError(t, err, "NewEntryIterator failed") + defer iter.Close() + + // Read entries using the iterator + var actualEntries []*logproto.Entry + for iter.Next() { + entry := iter.At() + actualEntries = append(actualEntries, &logproto.Entry{ + Timestamp: entry.Timestamp, + Line: entry.Line, + }) + } + require.NoError(t, iter.Err(), "Iterator 
encountered an error") + + // Compare actual entries with expected entries + require.Equal(t, tt.expected, actualEntries, "Entries do not match expected values") + }) + } +} + +// mustNewPipeline creates a new pipeline or fails the test +func mustNewPipeline(t *testing.T, query string) log.StreamPipeline { + t.Helper() + if query == "" { + return log.NewNoopPipeline().ForStream(labels.Labels{}) + } + expr, err := syntax.ParseLogSelector(query, true) + require.NoError(t, err) + + pipeline, err := expr.Pipeline() + require.NoError(t, err) + + return pipeline.ForStream(labels.Labels{}) +} + +func noopStreamPipeline() log.StreamPipeline { + return log.NewNoopPipeline().ForStream(labels.Labels{}) +} diff --git a/pkg/storage/wal/chunks/sample_iterator.go b/pkg/storage/wal/chunks/sample_iterator.go new file mode 100644 index 000000000000..07c9d026f1d6 --- /dev/null +++ b/pkg/storage/wal/chunks/sample_iterator.go @@ -0,0 +1,87 @@ +package chunks + +import ( + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" +) + +// NewSampleIterator creates an iterator for efficiently traversing samples in a chunk. +// It takes compressed chunk data, a processing pipeline, iteration direction, and a time range. +// The returned iterator filters samples based on the time range and applies the given pipeline. +// It handles both forward and backward iteration. +// +// Parameters: +// - chunkData: Compressed chunk data containing samples +// - pipeline: StreamSampleExtractor for processing and filtering samples +// - from: Start timestamp (inclusive) for filtering samples +// - through: End timestamp (exclusive) for filtering samples +// +// Returns a SampleIterator and an error if creation fails. 
+func NewSampleIterator( + chunkData []byte, + pipeline log.StreamSampleExtractor, + from, through int64, +) (iter.SampleIterator, error) { + chkReader, err := NewChunkReader(chunkData) + if err != nil { + return nil, err + } + it := &sampleBufferedIterator{ + reader: chkReader, + pipeline: pipeline, + from: from, + through: through, + } + return it, nil +} + +// At implements iter.SampleIterator. +func (s *sampleBufferedIterator) At() logproto.Sample { + return s.cur +} + +// Close implements iter.SampleIterator. +func (s *sampleBufferedIterator) Close() error { + return s.reader.Close() +} + +// Err implements iter.SampleIterator. +func (s *sampleBufferedIterator) Err() error { + return s.reader.Err() +} + +// Labels implements iter.SampleIterator. +func (s *sampleBufferedIterator) Labels() string { + return s.currLabels.String() +} + +// Next implements iter.SampleIterator. +func (e *sampleBufferedIterator) Next() bool { + for e.reader.Next() { + // todo: Only use length columns for bytes_over_time without filter. + ts, line := e.reader.At() + // check if the timestamp is within the range before applying the pipeline. + if ts < e.from { + continue + } + if ts >= e.through { + return false + } + // todo: structured metadata. + val, lbs, matches := e.pipeline.Process(ts, line) + if !matches { + continue + } + e.currLabels = lbs + e.cur.Value = val + e.cur.Timestamp = ts + return true + } + return false +} + +// StreamHash implements iter.SampleIterator. 
+func (s *sampleBufferedIterator) StreamHash() uint64 { + return s.pipeline.BaseLabels().Hash() +} diff --git a/pkg/storage/wal/chunks/sample_iterator_test.go b/pkg/storage/wal/chunks/sample_iterator_test.go new file mode 100644 index 000000000000..e3693461d44d --- /dev/null +++ b/pkg/storage/wal/chunks/sample_iterator_test.go @@ -0,0 +1,201 @@ +package chunks + +import ( + "bytes" + "testing" + "time" + + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestNewSampleIterator(t *testing.T) { + tests := []struct { + name string + entries []*logproto.Entry + from int64 + through int64 + extractor log.StreamSampleExtractor + expected []logproto.Sample + expectErr bool + }{ + { + name: "All samples within range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1.0"}, + {Timestamp: time.Unix(0, 2), Line: "2.0"}, + {Timestamp: time.Unix(0, 3), Line: "3.0"}, + }, + from: 0, + through: 4, + extractor: mustNewExtractor(t, ""), + expected: []logproto.Sample{ + {Timestamp: 1, Value: 1.0, Hash: 0}, + {Timestamp: 2, Value: 1.0, Hash: 0}, + {Timestamp: 3, Value: 1.0, Hash: 0}, + }, + }, + { + name: "Partial range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1.0"}, + {Timestamp: time.Unix(0, 2), Line: "2.0"}, + {Timestamp: time.Unix(0, 3), Line: "3.0"}, + {Timestamp: time.Unix(0, 4), Line: "4.0"}, + }, + from: 2, + through: 4, + extractor: mustNewExtractor(t, ""), + expected: []logproto.Sample{ + {Timestamp: 2, Value: 1.0, Hash: 0}, + {Timestamp: 3, Value: 1.0, Hash: 0}, + }, + }, + { + name: "Pipeline filter", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "error: 1.0"}, + {Timestamp: time.Unix(0, 2), Line: "info: 2.0"}, + {Timestamp: time.Unix(0, 3), Line: "error: 3.0"}, + {Timestamp: time.Unix(0, 4), Line: "debug: 4.0"}, + }, + 
from: 1, + through: 5, + extractor: mustNewExtractor(t, `count_over_time({foo="bar"} |= "error"[1m])`), + expected: []logproto.Sample{ + {Timestamp: 1, Value: 1.0, Hash: 0}, + {Timestamp: 3, Value: 1.0, Hash: 0}, + }, + }, + { + name: "Pipeline filter with bytes_over_time", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "error: 1.0"}, + {Timestamp: time.Unix(0, 2), Line: "info: 2.0"}, + {Timestamp: time.Unix(0, 3), Line: "error: 3.0"}, + {Timestamp: time.Unix(0, 4), Line: "debug: 4.0"}, + }, + from: 1, + through: 5, + extractor: mustNewExtractor(t, `bytes_over_time({foo="bar"} |= "error"[1m])`), + expected: []logproto.Sample{ + {Timestamp: 1, Value: 10, Hash: 0}, + {Timestamp: 3, Value: 10, Hash: 0}, + }, + }, + { + name: "No samples within range", + entries: []*logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1.0"}, + {Timestamp: time.Unix(0, 2), Line: "2.0"}, + }, + from: 3, + through: 5, + extractor: mustNewExtractor(t, ""), + expected: nil, + }, + { + name: "Empty chunk", + entries: []*logproto.Entry{}, + from: 0, + through: 5, + extractor: mustNewExtractor(t, ""), + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + + // Write the chunk + _, err := WriteChunk(&buf, tt.entries, EncodingSnappy) + require.NoError(t, err, "WriteChunk failed") + + // Create the iterator + iter, err := NewSampleIterator(buf.Bytes(), tt.extractor, tt.from, tt.through) + if tt.expectErr { + require.Error(t, err, "Expected an error but got none") + return + } + require.NoError(t, err, "NewSampleIterator failed") + defer iter.Close() + + // Read samples using the iterator + var actualSamples []logproto.Sample + for iter.Next() { + actualSamples = append(actualSamples, iter.At()) + } + require.NoError(t, iter.Err(), "Iterator encountered an error") + + // Compare actual samples with expected samples + require.Equal(t, tt.expected, actualSamples, "Samples do not match expected values") + + // Check 
labels + if len(actualSamples) > 0 { + require.Equal(t, tt.extractor.BaseLabels().String(), iter.Labels(), "Unexpected labels") + } + + // Check StreamHash + if len(actualSamples) > 0 { + require.Equal(t, tt.extractor.BaseLabels().Hash(), iter.StreamHash(), "Unexpected StreamHash") + } + }) + } +} + +func TestNewSampleIteratorErrors(t *testing.T) { + tests := []struct { + name string + chunkData []byte + extractor log.StreamSampleExtractor + from int64 + through int64 + }{ + { + name: "Invalid chunk data", + chunkData: []byte("invalid chunk data"), + extractor: mustNewExtractor(t, ""), + from: 0, + through: 10, + }, + { + name: "Nil extractor", + chunkData: []byte{}, // valid empty chunk + extractor: nil, + from: 0, + through: 10, + }, + { + name: "Invalid time range", + chunkData: []byte{}, // valid empty chunk + extractor: mustNewExtractor(t, ""), + from: 10, + through: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewSampleIterator(tt.chunkData, tt.extractor, tt.from, tt.through) + require.Error(t, err, "Expected an error but got none") + }) + } +} + +func mustNewExtractor(t *testing.T, query string) log.StreamSampleExtractor { + t.Helper() + if query == `` { + query = `count_over_time({foo="bar"}[1m])` + } + expr, err := syntax.ParseSampleExpr(query) + require.NoError(t, err) + + extractor, err := expr.Extractor() + require.NoError(t, err) + + return extractor.ForStream(labels.Labels{}) +} From 9fb50057892a1df79f8adf2904680e740142712e Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 30 Jul 2024 12:59:59 +0200 Subject: [PATCH 05/16] correct doc --- pkg/storage/wal/chunks/doc.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/storage/wal/chunks/doc.go b/pkg/storage/wal/chunks/doc.go index 23baa49c11cd..b3b50ed818c6 100644 --- a/pkg/storage/wal/chunks/doc.go +++ b/pkg/storage/wal/chunks/doc.go @@ -1,20 +1,37 @@ -// Package chunks provides functionality for 
efficient storage and retrieval of log data. +// Package chunks provides functionality for efficient storage and retrieval of log data and metrics. // // The chunks package implements a compact and performant way to store and access -// log entries. It uses various compression and encoding techniques to minimize +// log entries and metric samples. It uses various compression and encoding techniques to minimize // storage requirements while maintaining fast access times. // // Key features: // - Efficient chunk writing with multiple encoding options // - Fast chunk reading with iterators for forward and backward traversal -// - Support for time-based filtering of log entries +// - Support for time-based filtering of log entries and metric samples // - Integration with Loki's log query language (LogQL) for advanced filtering and processing +// - Separate iterators for log entries and metric samples // // Main types and functions: // - WriteChunk: Writes log entries to a compressed chunk format // - NewChunkReader: Creates a reader for parsing and accessing chunk data // - NewEntryIterator: Provides an iterator for efficient traversal of log entries in a chunk +// - NewSampleIterator: Provides an iterator for efficient traversal of metric samples in a chunk +// +// Entry Iterator: +// The EntryIterator allows efficient traversal of log entries within a chunk. It supports +// both forward and backward iteration, time-based filtering, and integration with LogQL pipelines +// for advanced log processing. +// +// Sample Iterator: +// The SampleIterator enables efficient traversal of metric samples within a chunk. It supports +// time-based filtering and integration with LogQL extractors for advanced metric processing. +// This iterator is particularly useful for handling numeric data extracted from logs or +// pre-aggregated metrics. 
+// +// Both iterators implement methods for accessing the current entry or sample, checking for errors, +// and retrieving associated labels and stream hashes. // // This package is designed to work seamlessly with other components of the Loki -// log aggregation system, providing a crucial layer for data storage and retrieval. +// log aggregation system, providing a crucial layer for data storage and retrieval of +// both logs and metrics. package chunks From d554e4d15a139b60f884939516d56d8816f2d3b0 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 31 Jul 2024 14:40:47 +0200 Subject: [PATCH 06/16] lint/tests --- pkg/querier-rf1/wal/chunks.go | 8 +++++--- pkg/querier-rf1/wal/querier.go | 1 + pkg/storage/wal/chunks/entry_iterator.go | 3 ++- pkg/storage/wal/chunks/sample_iterator.go | 18 +++++++++--------- pkg/storage/wal/chunks/sample_iterator_test.go | 5 +++-- 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go index be52b819f400..e784a00d1648 100644 --- a/pkg/querier-rf1/wal/chunks.go +++ b/pkg/querier-rf1/wal/chunks.go @@ -6,13 +6,15 @@ import ( "sort" "sync" - "github.com/grafana/loki/pkg/push" + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/sync/errgroup" + "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/storage/wal/chunks" - "github.com/prometheus/prometheus/model/labels" - "golang.org/x/sync/errgroup" + + "github.com/grafana/loki/pkg/push" ) const batchSize = 16 diff --git a/pkg/querier-rf1/wal/querier.go b/pkg/querier-rf1/wal/querier.go index a7ac9976cd32..8b7fea0ac7b5 100644 --- a/pkg/querier-rf1/wal/querier.go +++ b/pkg/querier-rf1/wal/querier.go @@ -11,6 +11,7 @@ import ( grpc "google.golang.org/grpc" "github.com/grafana/dskit/tenant" + "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" "github.com/grafana/loki/v3/pkg/iter" 
"github.com/grafana/loki/v3/pkg/logql" diff --git a/pkg/storage/wal/chunks/entry_iterator.go b/pkg/storage/wal/chunks/entry_iterator.go index bc9731d31d76..9a127266b07a 100644 --- a/pkg/storage/wal/chunks/entry_iterator.go +++ b/pkg/storage/wal/chunks/entry_iterator.go @@ -3,10 +3,11 @@ package chunks import ( "time" - "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" + + "github.com/grafana/loki/pkg/push" ) type entryBufferedIterator struct { diff --git a/pkg/storage/wal/chunks/sample_iterator.go b/pkg/storage/wal/chunks/sample_iterator.go index 07c9d026f1d6..4d4b397b1dd5 100644 --- a/pkg/storage/wal/chunks/sample_iterator.go +++ b/pkg/storage/wal/chunks/sample_iterator.go @@ -57,25 +57,25 @@ func (s *sampleBufferedIterator) Labels() string { } // Next implements iter.SampleIterator. -func (e *sampleBufferedIterator) Next() bool { - for e.reader.Next() { +func (s *sampleBufferedIterator) Next() bool { + for s.reader.Next() { // todo: Only use length columns for bytes_over_time without filter. - ts, line := e.reader.At() + ts, line := s.reader.At() // check if the timestamp is within the range before applying the pipeline. - if ts < e.from { + if ts < s.from { continue } - if ts >= e.through { + if ts >= s.through { return false } // todo: structured metadata. 
- val, lbs, matches := e.pipeline.Process(ts, line) + val, lbs, matches := s.pipeline.Process(ts, line) if !matches { continue } - e.currLabels = lbs - e.cur.Value = val - e.cur.Timestamp = ts + s.currLabels = lbs + s.cur.Value = val + s.cur.Timestamp = ts return true } return false diff --git a/pkg/storage/wal/chunks/sample_iterator_test.go b/pkg/storage/wal/chunks/sample_iterator_test.go index e3693461d44d..4e208301ab9d 100644 --- a/pkg/storage/wal/chunks/sample_iterator_test.go +++ b/pkg/storage/wal/chunks/sample_iterator_test.go @@ -5,11 +5,12 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logql/syntax" - "github.com/prometheus/prometheus/model/labels" - "github.com/stretchr/testify/require" ) func TestNewSampleIterator(t *testing.T) { From a43e976c6addb6057e3b31fd4eb857558c9f9a0c Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Wed, 31 Jul 2024 11:34:42 +0100 Subject: [PATCH 07/16] Connect RF1 querier with WAL querier --- pkg/loki/modules.go | 7 ++++++- pkg/querier-rf1/querier.go | 13 ++++++++++--- pkg/querier-rf1/wal/chunks.go | 3 ++- pkg/querier-rf1/wal/querier.go | 6 +++--- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1ede7ee806b7..b04d2b197086 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -52,6 +52,7 @@ import ( metastoreclient "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/client" "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/health" "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" + "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" @@ -415,7 +416,11 @@ func (t *Loki) initQuerier() (services.Service, error) { if 
t.Cfg.QuerierRF1.Enabled { logger.Log("Using RF-1 querier implementation") - t.Querier, err = querierrf1.New(t.Cfg.QuerierRF1, t.Store, t.Overrides, deleteStore, logger) + store, err := objstore.New(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics) + if err != nil { + return nil, err + } + t.Querier, err = querierrf1.New(t.Cfg.QuerierRF1, t.Store, t.Overrides, deleteStore, t.MetastoreClient, store, logger) if err != nil { return nil, err } diff --git a/pkg/querier-rf1/querier.go b/pkg/querier-rf1/querier.go index 9504fe23482a..c4a9dd76ba5f 100644 --- a/pkg/querier-rf1/querier.go +++ b/pkg/querier-rf1/querier.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/querier" + "github.com/grafana/loki/v3/pkg/querier-rf1/wal" querier_limits "github.com/grafana/loki/v3/pkg/querier/limits" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/storage" @@ -97,6 +98,7 @@ type Rf1Querier struct { deleteGetter deleteGetter logger log.Logger patternQuerier PatterQuerier + walQuerier logql.Querier } type deleteGetter interface { @@ -104,12 +106,17 @@ type deleteGetter interface { } // New makes a new Querier for RF1 work. 
-func New(cfg Config, store Store, limits Limits, d deleteGetter, logger log.Logger) (*Rf1Querier, error) { +func New(cfg Config, store Store, limits Limits, d deleteGetter, metastore wal.Metastore, b wal.BlockStorage, logger log.Logger) (*Rf1Querier, error) { + querier, err := wal.New(metastore, b) + if err != nil { + return nil, err + } return &Rf1Querier{ cfg: cfg, store: store, limits: limits, deleteGetter: d, + walQuerier: querier, logger: logger, }, nil } @@ -134,7 +141,7 @@ func (q *Rf1Querier) SelectLogs(ctx context.Context, params logql.SelectLogParam "msg", "querying rf1 store", "params", params) } - storeIter, err := q.store.SelectLogs(ctx, params) + storeIter, err := q.walQuerier.SelectLogs(ctx, params) if err != nil { return nil, err } @@ -164,7 +171,7 @@ func (q *Rf1Querier) SelectSamples(ctx context.Context, params logql.SelectSampl "msg", "querying rf1 store for samples", "params", params) } - storeIter, err := q.store.SelectSamples(ctx, params) + storeIter, err := q.walQuerier.SelectSamples(ctx, params) if err != nil { return nil, err } diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go index e784a00d1648..500667d4b7f2 100644 --- a/pkg/querier-rf1/wal/chunks.go +++ b/pkg/querier-rf1/wal/chunks.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/storage/wal/chunks" "github.com/grafana/loki/pkg/push" @@ -307,7 +308,7 @@ func readChunkData(ctx context.Context, storage BlockStorage, chunk ChunkData) ( // todo: We should be able to avoid many IOPS to object storage // if chunks are next to each other and we should be able to pack range request // together. 
- reader, err := storage.GetRangeObject(ctx, chunk.id, int64(offset), int64(size)) + reader, err := storage.GetObjectRange(ctx, wal.Dir+chunk.id, int64(offset), int64(size)) if err != nil { return nil, err } diff --git a/pkg/querier-rf1/wal/querier.go b/pkg/querier-rf1/wal/querier.go index 8b7fea0ac7b5..5c9152f2584d 100644 --- a/pkg/querier-rf1/wal/querier.go +++ b/pkg/querier-rf1/wal/querier.go @@ -8,7 +8,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "golang.org/x/sync/errgroup" - grpc "google.golang.org/grpc" + "google.golang.org/grpc" "github.com/grafana/dskit/tenant" @@ -23,7 +23,7 @@ import ( var _ logql.Querier = (*Querier)(nil) type BlockStorage interface { - GetRangeObject(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) + GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) } type Metastore interface { @@ -170,7 +170,7 @@ func (q *Querier) forIndices(ctx context.Context, req *metastorepb.ListBlocksFor meta := meta g.Go(func() error { - reader, err := q.blockStorage.GetRangeObject(ctx, wal.Dir+meta.Id, meta.IndexRef.Offset, meta.IndexRef.Length) + reader, err := q.blockStorage.GetObjectRange(ctx, wal.Dir+meta.Id, meta.IndexRef.Offset, meta.IndexRef.Length) if err != nil { return err } From 7a3a47f67674cefa2edd0b84d529f7f9f61cd574 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 1 Aug 2024 14:49:48 +0200 Subject: [PATCH 08/16] add more tests --- pkg/querier-rf1/wal/chunks.go | 129 ++++---- pkg/querier-rf1/wal/chunks_test.go | 515 +++++++++++++++++++++++++++++ 2 files changed, 572 insertions(+), 72 deletions(-) create mode 100644 pkg/querier-rf1/wal/chunks_test.go diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go index e784a00d1648..7f7d4fe3eb1d 100644 --- a/pkg/querier-rf1/wal/chunks.go +++ b/pkg/querier-rf1/wal/chunks.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sort" - "sync" "github.com/prometheus/prometheus/model/labels" 
"golang.org/x/sync/errgroup" @@ -17,7 +16,7 @@ import ( "github.com/grafana/loki/pkg/push" ) -const batchSize = 16 +const defaultBatchSize = 16 type ChunkData struct { meta *chunks.Meta @@ -37,16 +36,11 @@ func newChunkData(id string, lbs *labels.ScratchBuilder, meta *chunks.Meta) Chun // ChunksEntryIterator iterates over log entries type ChunksEntryIterator[T iter.EntryIterator] struct { baseChunksIterator[T] - - pipeline log.Pipeline - current iter.EntryIterator } // ChunksSampleIterator iterates over metric samples type ChunksSampleIterator[T iter.SampleIterator] struct { baseChunksIterator[T] - current iter.SampleIterator - extractor log.SampleExtractor } func NewChunksEntryIterator( @@ -64,7 +58,8 @@ func NewChunksEntryIterator( chunks: chunks, direction: direction, storage: storage, - batch: make([]ChunkData, 0, batchSize), + bachSize: defaultBatchSize, + batch: make([]ChunkData, 0, defaultBatchSize), minT: minT, maxT: maxT, @@ -73,7 +68,6 @@ func NewChunksEntryIterator( }, isNil: func(it iter.EntryIterator) bool { return it == nil }, }, - pipeline: pipeline, } } @@ -91,7 +85,8 @@ func NewChunksSampleIterator( chunks: chunks, direction: logproto.FORWARD, storage: storage, - batch: make([]ChunkData, 0, batchSize), + bachSize: defaultBatchSize, + batch: make([]ChunkData, 0, defaultBatchSize), minT: minT, maxT: maxT, @@ -100,7 +95,6 @@ func NewChunksSampleIterator( }, isNil: func(it iter.SampleIterator) bool { return it == nil }, }, - extractor: extractor, } } @@ -137,15 +131,16 @@ type baseChunksIterator[T interface { iteratorFactory func([]ChunkData) (T, error) isNil func(T) bool - batch []ChunkData - current T - err error + bachSize int + batch []ChunkData + current T + err error } func (b *baseChunksIterator[T]) nextBatch() error { b.batch = b.batch[:0] for len(b.chunks) > 0 && - (len(b.batch) < batchSize || + (len(b.batch) < b.bachSize || isOverlapping(b.batch[len(b.batch)-1], b.chunks[0], b.direction)) { b.batch = append(b.batch, b.chunks[0]) b.chunks = 
b.chunks[1:] @@ -188,39 +183,23 @@ func createNextEntryIterator( storage BlockStorage, minT, maxT int64, ) (iter.EntryIterator, error) { - var ( - iterators []iter.EntryIterator - mtx sync.Mutex - ) + iterators := make([]iter.EntryIterator, 0, len(batch)) - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(64) - for _, chunk := range batch { - chunk := chunk // https://golang.org/doc/faq#closures_and_goroutines - g.Go(func() error { - chunkData, err := readChunkData(ctx, storage, chunk) - if err != nil { - return fmt.Errorf("error reading chunk data: %w", err) - } - - streamPipeline := pipeline.ForStream(chunk.labels) - chunkIterator, err := chunks.NewEntryIterator(chunkData, streamPipeline, direction, minT, maxT) - if err != nil { - return fmt.Errorf("error creating entry iterator: %w", err) - } - - mtx.Lock() - iterators = append(iterators, chunkIterator) - mtx.Unlock() - - return nil - }) + data, err := downloadChunks(ctx, storage, batch) + if err != nil { + return nil, err } - if err := g.Wait(); err != nil { - return nil, err + for i, chunk := range batch { + streamPipeline := pipeline.ForStream(chunk.labels) + chunkIterator, err := chunks.NewEntryIterator(data[i], streamPipeline, direction, minT, maxT) + if err != nil { + return nil, fmt.Errorf("error creating entry iterator: %w", err) + } + iterators = append(iterators, chunkIterator) } + // todo: Use NonOverlapping iterator when possible. This will reduce the amount of entries processed during iteration. 
return iter.NewSortEntryIterator(iterators, direction), nil } @@ -231,37 +210,20 @@ func createNextSampleIterator( storage BlockStorage, minT, maxT int64, ) (iter.SampleIterator, error) { - var ( - iterators []iter.SampleIterator - mtx sync.Mutex - ) - - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(64) - for _, chunk := range batch { - chunk := chunk // https://golang.org/doc/faq#closures_and_goroutines - g.Go(func() error { - chunkData, err := readChunkData(ctx, storage, chunk) - if err != nil { - return fmt.Errorf("error reading chunk data: %w", err) - } + iterators := make([]iter.SampleIterator, 0, len(batch)) - streamPipeline := pipeline.ForStream(chunk.labels) - chunkIterator, err := chunks.NewSampleIterator(chunkData, streamPipeline, minT, maxT) - if err != nil { - return fmt.Errorf("error creating sample iterator: %w", err) - } - - mtx.Lock() - iterators = append(iterators, chunkIterator) - mtx.Unlock() - - return nil - }) + data, err := downloadChunks(ctx, storage, batch) + if err != nil { + return nil, err } - if err := g.Wait(); err != nil { - return nil, err + for i, chunk := range batch { + streamPipeline := pipeline.ForStream(chunk.labels) + chunkIterator, err := chunks.NewSampleIterator(data[i], streamPipeline, minT, maxT) + if err != nil { + return nil, fmt.Errorf("error creating sample iterator: %w", err) + } + iterators = append(iterators, chunkIterator) } return iter.NewSortSampleIterator(iterators), nil @@ -299,7 +261,30 @@ func isOverlapping(first, second ChunkData, direction logproto.Direction) bool { if direction == logproto.BACKWARD { return first.meta.MinTime <= second.meta.MaxTime } - return first.meta.MaxTime < second.meta.MinTime + return first.meta.MaxTime >= second.meta.MinTime +} + +func downloadChunks(ctx context.Context, storage BlockStorage, chks []ChunkData) ([][]byte, error) { + data := make([][]byte, len(chks)) + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(64) + for i, chunk := range chks { + chunk := chunk + i := i + 
g.Go(func() error { + chunkData, err := readChunkData(ctx, storage, chunk) + if err != nil { + return fmt.Errorf("error reading chunk data: %w", err) + } + data[i] = chunkData + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + return data, nil } func readChunkData(ctx context.Context, storage BlockStorage, chunk ChunkData) ([]byte, error) { diff --git a/pkg/querier-rf1/wal/chunks_test.go b/pkg/querier-rf1/wal/chunks_test.go new file mode 100644 index 000000000000..6438fb2ca1aa --- /dev/null +++ b/pkg/querier-rf1/wal/chunks_test.go @@ -0,0 +1,515 @@ +package wal + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" + walchunks "github.com/grafana/loki/v3/pkg/storage/wal/chunks" +) + +type mockBlockStorage struct { + data map[string][]byte +} + +func (m *mockBlockStorage) GetRangeObject(_ context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { + data := m.data[objectKey] + return io.NopCloser(bytes.NewReader(data[off : off+length])), nil +} + +func TestChunksEntryIterator(t *testing.T) { + ctx := context.Background() + storage := &mockBlockStorage{data: make(map[string][]byte)} + + // Generate test data with multiple batches + chunkData := generateTestChunkData(5 * defaultBatchSize) + chks := writeChunksToStorage(t, storage, chunkData) + + tests := []struct { + name string + direction logproto.Direction + start time.Time + end time.Time + expected []logproto.Entry + }{ + { + name: "forward direction, all entries", + direction: logproto.FORWARD, + start: time.Unix(0, 0), + end: time.Unix(int64(5*defaultBatchSize+1), 0), + expected: flattenEntries(chunkData), + }, + { + name: "backward direction, all entries", + direction: logproto.BACKWARD, + start: time.Unix(0, 0), + end: 
time.Unix(int64(5*defaultBatchSize+1), 0), + expected: reverseEntries(flattenEntries(chunkData)), + }, + { + name: "forward direction, partial range", + direction: logproto.FORWARD, + start: time.Unix(int64(defaultBatchSize), 0), + end: time.Unix(int64(3*defaultBatchSize), 0), + expected: selectEntries(flattenEntries(chunkData), defaultBatchSize, 3*defaultBatchSize), + }, + { + name: "backward direction, partial range", + direction: logproto.BACKWARD, + start: time.Unix(int64(defaultBatchSize), 0), + end: time.Unix(int64(3*defaultBatchSize), 0), + expected: reverseEntries(selectEntries(flattenEntries(chunkData), defaultBatchSize, 3*defaultBatchSize)), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + expr, err := syntax.ParseLogSelector(`{app=~".+"}`, false) + require.NoError(t, err) + + pipeline, err := expr.Pipeline() + require.NoError(t, err) + + iterator := NewChunksEntryIterator(ctx, storage, chks, pipeline, tt.direction, tt.start.UnixNano(), tt.end.UnixNano()) + + result := iterateEntries(iterator) + require.NoError(t, iterator.Close()) + require.NoError(t, iterator.Err()) + + assertEqualEntries(t, tt.expected, result) + }) + } +} + +func TestChunksSampleIterator(t *testing.T) { + ctx := context.Background() + storage := &mockBlockStorage{data: make(map[string][]byte)} + + // Generate test data with multiple batches + chunkData := generateTestChunkData(5 * defaultBatchSize) + chks := writeChunksToStorage(t, storage, chunkData) + + tests := []struct { + name string + start time.Time + end time.Time + expected []logproto.Sample + }{ + { + name: "all samples", + start: time.Unix(0, 0), + end: time.Unix(int64(5*defaultBatchSize+1), 0), + expected: entriesToSamples(flattenEntries(chunkData)), + }, + { + name: "partial range", + start: time.Unix(int64(defaultBatchSize), 0), + end: time.Unix(int64(3*defaultBatchSize), 0), + expected: entriesToSamples(selectEntries(flattenEntries(chunkData), defaultBatchSize, 3*defaultBatchSize)), + }, + 
} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + expr, err := syntax.ParseSampleExpr(`count_over_time({app=~".+"} [1m])`) + require.NoError(t, err) + + extractor, err := expr.Extractor() + require.NoError(t, err) + iterator := NewChunksSampleIterator(ctx, storage, chks, extractor, tt.start.UnixNano(), tt.end.UnixNano()) + + result := iterateSamples(iterator) + require.NoError(t, iterator.Close()) + require.NoError(t, iterator.Err()) + + assertEqualSamples(t, tt.expected, result) + }) + } +} + +func TestSortChunks(t *testing.T) { + chks := []ChunkData{ + { + meta: &walchunks.Meta{MinTime: 2, MaxTime: 4}, + labels: labels.FromStrings("app", "test1"), + }, + { + meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}, + labels: labels.FromStrings("app", "test2"), + }, + { + meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}, + labels: labels.FromStrings("app", "test1"), + }, + } + + t.Run("forward direction", func(t *testing.T) { + sortChunks(chks, logproto.FORWARD) + require.Equal(t, int64(1), chks[0].meta.MinTime) + require.Equal(t, "test1", chks[0].labels.Get("app")) + require.Equal(t, int64(1), chks[1].meta.MinTime) + require.Equal(t, "test2", chks[1].labels.Get("app")) + require.Equal(t, int64(2), chks[2].meta.MinTime) + }) + + t.Run("backward direction", func(t *testing.T) { + sortChunks(chks, logproto.BACKWARD) + require.Equal(t, int64(4), chks[0].meta.MaxTime) + require.Equal(t, "test1", chks[0].labels.Get("app")) + require.Equal(t, int64(3), chks[1].meta.MaxTime) + require.Equal(t, "test1", chks[1].labels.Get("app")) + require.Equal(t, int64(3), chks[2].meta.MaxTime) + require.Equal(t, "test2", chks[2].labels.Get("app")) + }) +} + +func TestIsOverlapping(t *testing.T) { + tests := []struct { + name string + first ChunkData + second ChunkData + direction logproto.Direction + expected bool + }{ + { + name: "overlapping forward", + first: ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}}, + second: ChunkData{meta: &walchunks.Meta{MinTime: 2, 
MaxTime: 4}}, + direction: logproto.FORWARD, + expected: true, + }, + { + name: "non-overlapping forward", + first: ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 2}}, + second: ChunkData{meta: &walchunks.Meta{MinTime: 3, MaxTime: 4}}, + direction: logproto.FORWARD, + expected: false, + }, + { + name: "overlapping backward", + first: ChunkData{meta: &walchunks.Meta{MinTime: 2, MaxTime: 4}}, + second: ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 3}}, + direction: logproto.BACKWARD, + expected: true, + }, + { + name: "non-overlapping backward", + first: ChunkData{meta: &walchunks.Meta{MinTime: 3, MaxTime: 4}}, + second: ChunkData{meta: &walchunks.Meta{MinTime: 1, MaxTime: 2}}, + direction: logproto.BACKWARD, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isOverlapping(tt.first, tt.second, tt.direction) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestBaseChunkIterator(t *testing.T) { + ctx := context.Background() + + testCases := []struct { + name string + chunks []ChunkData + direction logproto.Direction + expected [][]ChunkData + }{ + { + name: "Forward, non-overlapping", + chunks: []ChunkData{ + newTestChunkData("1", 100, 200), + newTestChunkData("2", 300, 400), + newTestChunkData("3", 500, 600), + newTestChunkData("4", 700, 800), + }, + direction: logproto.FORWARD, + expected: [][]ChunkData{ + {newTestChunkData("1", 100, 200), newTestChunkData("2", 300, 400)}, + {newTestChunkData("3", 500, 600), newTestChunkData("4", 700, 800)}, + }, + }, + { + name: "Backward, non-overlapping", + chunks: []ChunkData{ + newTestChunkData("4", 700, 800), + newTestChunkData("3", 500, 600), + newTestChunkData("2", 300, 400), + newTestChunkData("1", 100, 200), + }, + direction: logproto.BACKWARD, + expected: [][]ChunkData{ + {newTestChunkData("4", 700, 800), newTestChunkData("3", 500, 600)}, + {newTestChunkData("2", 300, 400), newTestChunkData("1", 100, 200)}, + }, + }, + { + name: "Forward, 
overlapping", + chunks: []ChunkData{ + newTestChunkData("1", 100, 300), + newTestChunkData("2", 200, 400), + newTestChunkData("3", 350, 550), + newTestChunkData("4", 600, 800), + }, + direction: logproto.FORWARD, + expected: [][]ChunkData{ + {newTestChunkData("1", 100, 300), newTestChunkData("2", 200, 400), newTestChunkData("3", 350, 550)}, + {newTestChunkData("4", 600, 800)}, + }, + }, + { + name: "Backward, overlapping", + chunks: []ChunkData{ + newTestChunkData("4", 600, 800), + newTestChunkData("3", 350, 550), + newTestChunkData("2", 200, 400), + newTestChunkData("1", 100, 300), + newTestChunkData("0", 10, 20), + }, + direction: logproto.BACKWARD, + expected: [][]ChunkData{ + {newTestChunkData("4", 600, 800), newTestChunkData("3", 350, 550), newTestChunkData("2", 200, 400), newTestChunkData("1", 100, 300)}, + {newTestChunkData("0", 10, 20)}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + iter := &testBaseChunkIterator{ + baseChunksIterator: baseChunksIterator[*testIterator]{ + ctx: ctx, + chunks: tc.chunks, + direction: tc.direction, + bachSize: 2, + batch: make([]ChunkData, 0, 2), + iteratorFactory: func(chunks []ChunkData) (*testIterator, error) { + return &testIterator{chunks: chunks}, nil + }, + isNil: func(it *testIterator) bool { return it == nil }, + }, + } + var batches [][]ChunkData + for len(iter.chunks) > 0 { + err := iter.nextBatch() + require.NoError(t, err) + + batch := make([]ChunkData, len(iter.batch)) + copy(batch, iter.batch) + batches = append(batches, batch) + } + + require.Equal(t, tc.expected, batches) + }) + } +} + +// Helper functions and types + +type testBaseChunkIterator struct { + baseChunksIterator[*testIterator] +} + +type testIterator struct { + chunks []ChunkData + index int +} + +func (t *testIterator) Next() bool { + t.index++ + return t.index < len(t.chunks) +} + +func (t *testIterator) Close() error { return nil } +func (t *testIterator) Err() error { return nil } +func (t 
*testIterator) StreamHash() uint64 { return 0 } +func (t *testIterator) Labels() string { return "" } +func (t *testIterator) At() logproto.Entry { return logproto.Entry{} } + +func newTestChunkData(id string, minTime, maxTime int64) ChunkData { + return ChunkData{ + id: id, + meta: &walchunks.Meta{ + MinTime: minTime, + MaxTime: maxTime, + }, + labels: labels.Labels{}, + } +} + +func createChunk(minTime, maxTime int64, labelName, labelValue string) ChunkData { + return ChunkData{ + meta: &walchunks.Meta{ + MinTime: minTime, + MaxTime: maxTime, + }, + labels: labels.FromStrings(labelName, labelValue), + } +} + +func assertEqualChunks(t *testing.T, expected, actual ChunkData) { + require.Equal(t, expected.meta.MinTime, actual.meta.MinTime, "MinTime mismatch") + require.Equal(t, expected.meta.MaxTime, actual.meta.MaxTime, "MaxTime mismatch") + require.Equal(t, expected.labels, actual.labels, "Labels mismatch") +} + +func generateTestChunkData(totalEntries int) []struct { + labels labels.Labels + entries []*logproto.Entry +} { + var chunkData []struct { + labels labels.Labels + entries []*logproto.Entry + } + + entriesPerChunk := defaultBatchSize * 2 // Each chunk will contain 2 batches worth of entries + numChunks := (totalEntries + entriesPerChunk - 1) / entriesPerChunk + + for i := 0; i < numChunks; i++ { + startIndex := i * entriesPerChunk + endIndex := (i + 1) * entriesPerChunk + if endIndex > totalEntries { + endIndex = totalEntries + } + + chunkData = append(chunkData, struct { + labels labels.Labels + entries []*logproto.Entry + }{ + labels: labels.FromStrings("app", fmt.Sprintf("test%d", i)), + entries: generateEntries(startIndex, endIndex-1), + }) + } + + return chunkData +} + +func writeChunksToStorage(t *testing.T, storage *mockBlockStorage, chunkData []struct { + labels labels.Labels + entries []*logproto.Entry +}, +) []ChunkData { + chks := make([]ChunkData, 0, len(chunkData)) + for i, cd := range chunkData { + var buf bytes.Buffer + chunkID := 
fmt.Sprintf("chunk%d", i) + _, err := walchunks.WriteChunk(&buf, cd.entries, walchunks.EncodingSnappy) + require.NoError(t, err) + + storage.data[chunkID] = buf.Bytes() + chks = append(chks, newChunkData(chunkID, labelsToScratchBuilder(cd.labels), &walchunks.Meta{ + Ref: walchunks.NewChunkRef(0, uint64(buf.Len())), + MinTime: cd.entries[0].Timestamp.UnixNano(), + MaxTime: cd.entries[len(cd.entries)-1].Timestamp.UnixNano(), + })) + } + return chks +} + +func generateEntries(start, end int) []*logproto.Entry { + var entries []*logproto.Entry + for i := start; i <= end; i++ { + entries = append(entries, &logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf("line%d", i), + }) + } + return entries +} + +func flattenEntries(chunkData []struct { + labels labels.Labels + entries []*logproto.Entry +}, +) []logproto.Entry { + var result []logproto.Entry + for _, cd := range chunkData { + for _, e := range cd.entries { + result = append(result, logproto.Entry{Timestamp: e.Timestamp, Line: e.Line}) + } + } + return result +} + +func reverseEntries(entries []logproto.Entry) []logproto.Entry { + for i := 0; i < len(entries)/2; i++ { + j := len(entries) - 1 - i + entries[i], entries[j] = entries[j], entries[i] + } + return entries +} + +func selectEntries(entries []logproto.Entry, start, end int) []logproto.Entry { + var result []logproto.Entry + for _, e := range entries { + if e.Timestamp.Unix() >= int64(start) && e.Timestamp.Unix() < int64(end) { + result = append(result, e) + } + } + return result +} + +func entriesToSamples(entries []logproto.Entry) []logproto.Sample { + var samples []logproto.Sample + for _, e := range entries { + samples = append(samples, logproto.Sample{ + Timestamp: e.Timestamp.UnixNano(), + Value: float64(1), // Use timestamp as value for simplicity + }) + } + return samples +} + +func iterateEntries(iterator *ChunksEntryIterator[iter.EntryIterator]) []logproto.Entry { + var result []logproto.Entry + for iterator.Next() { + entry := 
iterator.At() + result = append(result, logproto.Entry{Timestamp: entry.Timestamp, Line: entry.Line}) + } + return result +} + +func iterateSamples(iterator *ChunksSampleIterator[iter.SampleIterator]) []logproto.Sample { + var result []logproto.Sample + for iterator.Next() { + result = append(result, iterator.At()) + } + return result +} + +func assertEqualEntries(t *testing.T, expected, actual []logproto.Entry) { + require.Equal(t, len(expected), len(actual), "Number of entries mismatch") + for i := range expected { + require.Equal(t, expected[i].Timestamp, actual[i].Timestamp, "Timestamp mismatch at index %d", i) + require.Equal(t, expected[i].Line, actual[i].Line, "Line mismatch at index %d", i) + } +} + +func assertEqualSamples(t *testing.T, expected, actual []logproto.Sample) { + require.Equal(t, len(expected), len(actual), "Number of samples mismatch") + for i := range expected { + require.Equal(t, expected[i].Timestamp, actual[i].Timestamp, "Timestamp mismatch at index %d", i) + require.Equal(t, expected[i].Value, actual[i].Value, "Value mismatch at index %d", i) + } +} + +func labelsToScratchBuilder(lbs labels.Labels) *labels.ScratchBuilder { + sb := labels.NewScratchBuilder(len(lbs)) + sb.Reset() + for i := 0; i < len(lbs); i++ { + sb.Add(lbs[i].Name, lbs[i].Value) + } + return &sb +} From 7163c9cfbae6d91566b028848a528ab5b356e1ac Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 1 Aug 2024 14:51:19 +0200 Subject: [PATCH 09/16] fixes merge --- pkg/querier-rf1/wal/chunks_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier-rf1/wal/chunks_test.go b/pkg/querier-rf1/wal/chunks_test.go index 6438fb2ca1aa..a67c2fc6f1cc 100644 --- a/pkg/querier-rf1/wal/chunks_test.go +++ b/pkg/querier-rf1/wal/chunks_test.go @@ -21,7 +21,7 @@ type mockBlockStorage struct { data map[string][]byte } -func (m *mockBlockStorage) GetRangeObject(_ context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { +func (m 
*mockBlockStorage) GetObjectRange(_ context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { data := m.data[objectKey] return io.NopCloser(bytes.NewReader(data[off : off+length])), nil } From aa397505a7128ea8e434c01de4efc953476ea3da Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 1 Aug 2024 16:43:24 +0200 Subject: [PATCH 10/16] fixes tests --- pkg/querier-rf1/wal/querier_test.go | 278 ++++++++++++++++++++++++++++ 1 file changed, 278 insertions(+) create mode 100644 pkg/querier-rf1/wal/querier_test.go diff --git a/pkg/querier-rf1/wal/querier_test.go b/pkg/querier-rf1/wal/querier_test.go new file mode 100644 index 000000000000..9ce40e359686 --- /dev/null +++ b/pkg/querier-rf1/wal/querier_test.go @@ -0,0 +1,278 @@ +package wal + +import ( + "bytes" + "context" + "fmt" + "io" + "sort" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/grafana/dskit/user" + "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" + "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/querier/plan" + "github.com/grafana/loki/v3/pkg/storage/wal" +) + +// InMemoryStorage implements BlockStorage interface +type InMemoryStorage struct { + data map[string][]byte +} + +func NewInMemoryStorage() *InMemoryStorage { + return &InMemoryStorage{ + data: make(map[string][]byte), + } +} + +func (s *InMemoryStorage) GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { + data, ok := s.data[objectKey] + if !ok { + return nil, fmt.Errorf("object not found: %s", objectKey) + } + if off < 0 || length < 0 || off+length > int64(len(data)) { + return nil, fmt.Errorf("invalid range: off=%d, length=%d, dataLen=%d", off, length, len(data)) 
+ } + return io.NopCloser(bytes.NewReader(data[off : off+length])), nil +} + +func (s *InMemoryStorage) PutObject(objectKey string, data []byte) { + s.data[objectKey] = data +} + +// InMemoryMetastore implements Metastore interface +type InMemoryMetastore struct { + blocks map[string][]*metastorepb.BlockMeta +} + +func NewInMemoryMetastore() *InMemoryMetastore { + return &InMemoryMetastore{ + blocks: make(map[string][]*metastorepb.BlockMeta), + } +} + +func (m *InMemoryMetastore) ListBlocksForQuery(ctx context.Context, in *metastorepb.ListBlocksForQueryRequest, opts ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error) { + blocks := m.blocks[in.TenantId] + var result []*metastorepb.BlockMeta + for _, block := range blocks { + if block.MinTime <= in.EndTime && block.MaxTime >= in.StartTime { + result = append(result, block) + } + } + return &metastorepb.ListBlocksForQueryResponse{Blocks: result}, nil +} + +func (m *InMemoryMetastore) AddBlock(tenantID string, block *metastorepb.BlockMeta) { + m.blocks[tenantID] = append(m.blocks[tenantID], block) +} + +func TestQuerier_Integration(t *testing.T) { + storage := NewInMemoryStorage() + metastore := NewInMemoryMetastore() + + querier, err := New(metastore, storage) + require.NoError(t, err) + + tenantID := "test-tenant" + ctx := user.InjectOrgID(context.Background(), tenantID) + + testData := generateTestData() + setupTestData(t, storage, metastore, tenantID, testData) + + t.Run("SelectLogs", func(t *testing.T) { + testSelectLogs(t, ctx, querier) + }) + + t.Run("SelectLogs_FilterByApp", func(t *testing.T) { + testSelectLogsFilterByApp(t, ctx, querier) + }) + + t.Run("SelectSamples", func(t *testing.T) { + testSelectSamples(t, ctx, querier) + }) +} + +func generateTestData() []struct { + labels labels.Labels + entries []*logproto.Entry +} { + return []struct { + labels labels.Labels + entries []*logproto.Entry + }{ + { + labels: labels.FromStrings("app", "test1", "env", "prod"), + entries: 
generateEntries(1000, 1200), + }, + { + labels: labels.FromStrings("app", "test2", "env", "dev"), + entries: generateEntries(1100, 1300), + }, + { + labels: labels.FromStrings("app", "test3", "env", "staging"), + entries: generateEntries(1200, 1400), + }, + } +} + +func setupTestData(t *testing.T, storage *InMemoryStorage, metastore *InMemoryMetastore, tenantID string, testData []struct { + labels labels.Labels + entries []*logproto.Entry +}, +) { + for i, data := range testData { + chunkSize := 50 // This will create multiple chunks per set of test data + for j := 0; j < len(data.entries); j += chunkSize { + end := j + chunkSize + if end > len(data.entries) { + end = len(data.entries) + } + + segmentID := fmt.Sprintf("segment%d_%d", i, j/chunkSize) + writer, err := wal.NewWalSegmentWriter() + require.NoError(t, err) + + writer.Append(tenantID, data.labels.String(), data.labels, data.entries[j:end], time.Now()) + + var buf bytes.Buffer + _, err = writer.WriteTo(&buf) + require.NoError(t, err) + + segmentData := buf.Bytes() + storage.PutObject(wal.Dir+segmentID, segmentData) + + blockMeta := writer.Meta(segmentID) + metastore.AddBlock(tenantID, blockMeta) + } + } +} + +func testSelectLogs(t *testing.T, ctx context.Context, querier *Querier) { + query := `{app=~"test.*"}` + expr, err := syntax.ParseExpr(query) + require.NoError(t, err) + req := logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: query, + Start: time.Unix(1000, 0), + End: time.Unix(1400, 0), + Limit: 1000, + Direction: logproto.FORWARD, + Plan: &plan.QueryPlan{ + AST: expr, + }, + }, + } + + iter, err := querier.SelectLogs(ctx, req) + require.NoError(t, err) + + results := collectLogEntries(t, iter) + + assert.Len(t, results, 601, "Expected 601 log entries") + validateLogEntries(t, results) +} + +func testSelectLogsFilterByApp(t *testing.T, ctx context.Context, querier *Querier) { + query := `{app="test2"}` + expr, err := syntax.ParseExpr(query) + require.NoError(t, err) + req 
:= logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: query, + Start: time.Unix(1000, 0), + End: time.Unix(1400, 0), + Limit: 1000, + Direction: logproto.FORWARD, + Plan: &plan.QueryPlan{ + AST: expr, + }, + }, + } + + iter, err := querier.SelectLogs(ctx, req) + require.NoError(t, err) + + results := collectLogEntries(t, iter) + + assert.Len(t, results, 201, "Expected 201 log entries for test2") + for _, entry := range results { + assert.Contains(t, entry.Line, "line", "Log line should contain 'line'") + assert.True(t, entry.Timestamp.Unix() >= 1100 && entry.Timestamp.Unix() <= 1300, "Timestamp should be between 1100 and 1300") + } +} + +func testSelectSamples(t *testing.T, ctx context.Context, querier *Querier) { + query := `rate({app=~"test.*"}[5m])` + expr, err := syntax.ParseExpr(query) + require.NoError(t, err) + + req := logql.SelectSampleParams{ + SampleQueryRequest: &logproto.SampleQueryRequest{ + Selector: query, + Start: time.Unix(1000, 0), + End: time.Unix(1400, 0), + Plan: &plan.QueryPlan{ + AST: expr, + }, + }, + } + + iter, err := querier.SelectSamples(ctx, req) + require.NoError(t, err) + + results := collectSamples(t, iter) + + assert.NotEmpty(t, results, "Should have sample results") + for _, sample := range results { + assert.True(t, sample.Value > 0 && sample.Value <= 0.2, "Rate should be between 0 and 0.2 logs/second") + } +} + +func collectLogEntries(t *testing.T, iter iter.EntryIterator) []logproto.Entry { + var results []logproto.Entry + for iter.Next() { + entry := iter.At() + results = append(results, logproto.Entry{ + Timestamp: entry.Timestamp, + Line: entry.Line, + }) + } + require.NoError(t, iter.Close()) + return results +} + +func collectSamples(t *testing.T, iter iter.SampleIterator) []logproto.Sample { + var results []logproto.Sample + for iter.Next() { + results = append(results, iter.At()) + } + require.NoError(t, iter.Close()) + return results +} + +func validateLogEntries(t *testing.T, entries 
[]logproto.Entry) { + sort.Slice(entries, func(i, j int) bool { + return entries[i].Timestamp.Before(entries[j].Timestamp) + }) + + var prevTimestamp time.Time + for _, entry := range entries { + assert.True(t, entry.Timestamp.After(prevTimestamp) || entry.Timestamp.Equal(prevTimestamp), + "Timestamps should be in non-decreasing order") + prevTimestamp = entry.Timestamp + + assert.Contains(t, entry.Line, "line", "Log line should contain 'line'") + assert.True(t, entry.Timestamp.Unix() >= 1000 && entry.Timestamp.Unix() <= 1400, "Timestamp should be between 1000 and 1400") + } +} From 83370fd31079d302df7edf3c39935ee026589bae Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 1 Aug 2024 16:45:56 +0200 Subject: [PATCH 11/16] fixes tests --- pkg/querier-rf1/wal/chunks_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/querier-rf1/wal/chunks_test.go b/pkg/querier-rf1/wal/chunks_test.go index a67c2fc6f1cc..0d0192c04b17 100644 --- a/pkg/querier-rf1/wal/chunks_test.go +++ b/pkg/querier-rf1/wal/chunks_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/storage/wal" walchunks "github.com/grafana/loki/v3/pkg/storage/wal/chunks" ) @@ -408,7 +409,7 @@ func writeChunksToStorage(t *testing.T, storage *mockBlockStorage, chunkData []s _, err := walchunks.WriteChunk(&buf, cd.entries, walchunks.EncodingSnappy) require.NoError(t, err) - storage.data[chunkID] = buf.Bytes() + storage.data[wal.Dir+chunkID] = buf.Bytes() chks = append(chks, newChunkData(chunkID, labelsToScratchBuilder(cd.labels), &walchunks.Meta{ Ref: walchunks.NewChunkRef(0, uint64(buf.Len())), MinTime: cd.entries[0].Timestamp.UnixNano(), From ed56fe2b83b2680bafa736b70320e69a200f4ca6 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 2 Aug 2024 01:15:20 +0200 Subject: [PATCH 12/16] Add more tests for querier code --- 
pkg/querier-rf1/wal/chunks.go | 12 +- pkg/querier-rf1/wal/querier_test.go | 712 ++++++++++++++++++++++------ 2 files changed, 576 insertions(+), 148 deletions(-) diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go index 5dd196c18f79..0216fdabc9aa 100644 --- a/pkg/querier-rf1/wal/chunks.go +++ b/pkg/querier-rf1/wal/chunks.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/storage/wal" "github.com/grafana/loki/v3/pkg/storage/wal/chunks" + "github.com/grafana/loki/v3/pkg/storage/wal/index" "github.com/grafana/loki/pkg/push" ) @@ -27,10 +28,19 @@ type ChunkData struct { func newChunkData(id string, lbs *labels.ScratchBuilder, meta *chunks.Meta) ChunkData { lbs.Sort() + newLbs := lbs.Labels() + j := 0 + for _, l := range newLbs { + if l.Name != index.TenantLabel { + newLbs[j] = l + j++ + } + } + newLbs = newLbs[:j] return ChunkData{ id: id, meta: meta, - labels: lbs.Labels(), + labels: newLbs, } } diff --git a/pkg/querier-rf1/wal/querier_test.go b/pkg/querier-rf1/wal/querier_test.go index 9ce40e359686..be6d5191df78 100644 --- a/pkg/querier-rf1/wal/querier_test.go +++ b/pkg/querier-rf1/wal/querier_test.go @@ -22,63 +22,57 @@ import ( "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/storage/wal" + "github.com/grafana/loki/v3/pkg/storage/wal/chunks" ) -// InMemoryStorage implements BlockStorage interface -type InMemoryStorage struct { +// MockStorage is a simple in-memory storage for testing +type MockStorage struct { data map[string][]byte } -func NewInMemoryStorage() *InMemoryStorage { - return &InMemoryStorage{ - data: make(map[string][]byte), - } +func NewMockStorage() *MockStorage { + return &MockStorage{data: make(map[string][]byte)} } -func (s *InMemoryStorage) GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { - data, ok := s.data[objectKey] +func (m *MockStorage) 
GetObjectRange(_ context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { + data, ok := m.data[objectKey] if !ok { return nil, fmt.Errorf("object not found: %s", objectKey) } - if off < 0 || length < 0 || off+length > int64(len(data)) { - return nil, fmt.Errorf("invalid range: off=%d, length=%d, dataLen=%d", off, length, len(data)) - } return io.NopCloser(bytes.NewReader(data[off : off+length])), nil } -func (s *InMemoryStorage) PutObject(objectKey string, data []byte) { - s.data[objectKey] = data +func (m *MockStorage) PutObject(objectKey string, data []byte) { + m.data[objectKey] = data } -// InMemoryMetastore implements Metastore interface -type InMemoryMetastore struct { +// MockMetastore is a simple in-memory metastore for testing +type MockMetastore struct { blocks map[string][]*metastorepb.BlockMeta } -func NewInMemoryMetastore() *InMemoryMetastore { - return &InMemoryMetastore{ - blocks: make(map[string][]*metastorepb.BlockMeta), - } +func NewMockMetastore() *MockMetastore { + return &MockMetastore{blocks: make(map[string][]*metastorepb.BlockMeta)} } -func (m *InMemoryMetastore) ListBlocksForQuery(ctx context.Context, in *metastorepb.ListBlocksForQueryRequest, opts ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error) { - blocks := m.blocks[in.TenantId] +func (m *MockMetastore) ListBlocksForQuery(_ context.Context, req *metastorepb.ListBlocksForQueryRequest, _ ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error) { + blocks := m.blocks[req.TenantId] var result []*metastorepb.BlockMeta for _, block := range blocks { - if block.MinTime <= in.EndTime && block.MaxTime >= in.StartTime { + if block.MinTime <= req.EndTime && block.MaxTime >= req.StartTime { result = append(result, block) } } return &metastorepb.ListBlocksForQueryResponse{Blocks: result}, nil } -func (m *InMemoryMetastore) AddBlock(tenantID string, block *metastorepb.BlockMeta) { +func (m *MockMetastore) AddBlock(tenantID string, block 
*metastorepb.BlockMeta) { m.blocks[tenantID] = append(m.blocks[tenantID], block) } -func TestQuerier_Integration(t *testing.T) { - storage := NewInMemoryStorage() - metastore := NewInMemoryMetastore() +func TestQuerier_SelectLogs(t *testing.T) { + storage := NewMockStorage() + metastore := NewMockMetastore() querier, err := New(metastore, storage) require.NoError(t, err) @@ -86,193 +80,617 @@ func TestQuerier_Integration(t *testing.T) { tenantID := "test-tenant" ctx := user.InjectOrgID(context.Background(), tenantID) - testData := generateTestData() - setupTestData(t, storage, metastore, tenantID, testData) - - t.Run("SelectLogs", func(t *testing.T) { - testSelectLogs(t, ctx, querier) - }) - - t.Run("SelectLogs_FilterByApp", func(t *testing.T) { - testSelectLogsFilterByApp(t, ctx, querier) - }) - - t.Run("SelectSamples", func(t *testing.T) { - testSelectSamples(t, ctx, querier) - }) -} - -func generateTestData() []struct { - labels labels.Labels - entries []*logproto.Entry -} { - return []struct { + // Create expanded test data + testData := []struct { labels labels.Labels entries []*logproto.Entry }{ { labels: labels.FromStrings("app", "test1", "env", "prod"), - entries: generateEntries(1000, 1200), + entries: generateEntries(1000, 1050), }, { - labels: labels.FromStrings("app", "test2", "env", "dev"), - entries: generateEntries(1100, 1300), + labels: labels.FromStrings("app", "test2", "env", "staging"), + entries: generateEntries(1025, 1075), }, { - labels: labels.FromStrings("app", "test3", "env", "staging"), - entries: generateEntries(1200, 1400), + labels: labels.FromStrings("app", "test3", "env", "dev", "version", "v1"), + entries: generateEntries(1050, 1100), + }, + { + labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + entries: generateEntries(1075, 1125), }, } -} -func setupTestData(t *testing.T, storage *InMemoryStorage, metastore *InMemoryMetastore, tenantID string, testData []struct { - labels labels.Labels - entries 
[]*logproto.Entry -}, -) { - for i, data := range testData { - chunkSize := 50 // This will create multiple chunks per set of test data - for j := 0; j < len(data.entries); j += chunkSize { - end := j + chunkSize - if end > len(data.entries) { - end = len(data.entries) - } + // Setup test data + setupTestData(t, storage, metastore, tenantID, testData) - segmentID := fmt.Sprintf("segment%d_%d", i, j/chunkSize) - writer, err := wal.NewWalSegmentWriter() + // Test cases + testCases := []struct { + name string + query string + expectedCount int + expectedFirst logproto.Entry + expectedLast logproto.Entry + }{ + { + name: "Query all logs", + query: `{app=~"test.*"}`, + expectedCount: 204, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1000, 0), + Line: "line1000", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1125, 0), + Line: "line1125", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test4"}, + {Name: "env", Value: "prod"}, + {Name: "version", Value: "v2"}, + }, + }, + }, + { + name: "Query specific app", + query: `{app="test1"}`, + expectedCount: 51, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1000, 0), + Line: "line1000", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1050, 0), + Line: "line1050", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + }, + { + name: "Query with multiple label equality", + query: `{app="test4", env="prod"}`, + expectedCount: 51, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1075, 0), + Line: "line1075", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test4"}, + {Name: "env", Value: "prod"}, + {Name: "version", Value: "v2"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1125, 
0), + Line: "line1125", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test4"}, + {Name: "env", Value: "prod"}, + {Name: "version", Value: "v2"}, + }, + }, + }, + { + name: "Query with negative regex", + query: `{app=~"test.*", env!~"stag.*|dev"}`, + expectedCount: 102, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1000, 0), + Line: "line1000", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1125, 0), + Line: "line1125", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test4"}, + {Name: "env", Value: "prod"}, + {Name: "version", Value: "v2"}, + }, + }, + }, + { + name: "Query with label presence", + query: `{app=~"test.*", version=""}`, + expectedCount: 102, + expectedFirst: logproto.Entry{ + Timestamp: time.Unix(1000, 0), + Line: "line1000", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test1"}, + {Name: "env", Value: "prod"}, + }, + }, + expectedLast: logproto.Entry{ + Timestamp: time.Unix(1075, 0), + Line: "line1075", + Parsed: []logproto.LabelAdapter{ + {Name: "app", Value: "test2"}, + {Name: "env", Value: "staging"}, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + expr, err := syntax.ParseExpr(tc.query) require.NoError(t, err) - writer.Append(tenantID, data.labels.String(), data.labels, data.entries[j:end], time.Now()) + req := logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: tc.query, + Start: time.Unix(1000, 0), + End: time.Unix(1126, 0), + Limit: 10000, + Direction: logproto.FORWARD, + Plan: &plan.QueryPlan{ + AST: expr, + }, + }, + } - var buf bytes.Buffer - _, err = writer.WriteTo(&buf) + iter, err := querier.SelectLogs(ctx, req) require.NoError(t, err) - segmentData := buf.Bytes() - storage.PutObject(wal.Dir+segmentID, segmentData) + results := collectPushEntries(t, iter) - blockMeta := writer.Meta(segmentID) - 
metastore.AddBlock(tenantID, blockMeta) - } + assert.Len(t, results, tc.expectedCount, "Unexpected number of log entries") + if len(results) > 0 { + assert.Equal(t, tc.expectedFirst, results[0], "First log entry mismatch") + assert.Equal(t, tc.expectedLast, results[len(results)-1], "Last log entry mismatch") + } + }) } } -func testSelectLogs(t *testing.T, ctx context.Context, querier *Querier) { - query := `{app=~"test.*"}` - expr, err := syntax.ParseExpr(query) - require.NoError(t, err) - req := logql.SelectLogParams{ - QueryRequest: &logproto.QueryRequest{ - Selector: query, - Start: time.Unix(1000, 0), - End: time.Unix(1400, 0), - Limit: 1000, - Direction: logproto.FORWARD, - Plan: &plan.QueryPlan{ - AST: expr, - }, - }, - } +// SampleWithLabels is a new struct to hold both the sample and its labels +type SampleWithLabels struct { + Sample logproto.Sample + Labels labels.Labels +} - iter, err := querier.SelectLogs(ctx, req) +func TestQuerier_SelectSamples(t *testing.T) { + storage := NewMockStorage() + metastore := NewMockMetastore() + + querier, err := New(metastore, storage) require.NoError(t, err) - results := collectLogEntries(t, iter) + tenantID := "test-tenant" + ctx := user.InjectOrgID(context.Background(), tenantID) - assert.Len(t, results, 601, "Expected 601 log entries") - validateLogEntries(t, results) -} + // Create test data + testData := []struct { + labels labels.Labels + samples []logproto.Sample + }{ + { + labels: labels.FromStrings("app", "test1", "env", "prod"), + samples: generateSamples(1000, 1050, 1), + }, + { + labels: labels.FromStrings("app", "test2", "env", "staging"), + samples: generateSamples(1025, 1075, 2), + }, + { + labels: labels.FromStrings("app", "test3", "env", "dev", "version", "v1"), + samples: generateSamples(1050, 1100, 3), + }, + { + labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + samples: generateSamples(1075, 1125, 4), + }, + } -func testSelectLogsFilterByApp(t *testing.T, ctx 
context.Context, querier *Querier) { - query := `{app="test2"}` - expr, err := syntax.ParseExpr(query) - require.NoError(t, err) - req := logql.SelectLogParams{ - QueryRequest: &logproto.QueryRequest{ - Selector: query, - Start: time.Unix(1000, 0), - End: time.Unix(1400, 0), - Limit: 1000, - Direction: logproto.FORWARD, - Plan: &plan.QueryPlan{ - AST: expr, + // Setup test data + setupTestSampleData(t, storage, metastore, tenantID, testData) + + // Test cases + testCases := []struct { + name string + query string + expectedCount int + expectedFirst SampleWithLabels + expectedLast SampleWithLabels + }{ + { + name: "Query all samples", + query: `sum_over_time({app=~"test.*"} | label_format v="{{__line__}}" | unwrap v[1s])`, + expectedCount: 204, + expectedFirst: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1000, 0).UnixNano(), + Value: 1, + }, + Labels: labels.FromStrings("app", "test1", "env", "prod"), + }, + expectedLast: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1125, 0).UnixNano(), + Value: 4, + }, + Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + }, + }, + { + name: "Query specific app", + query: `sum_over_time({app="test1"}| label_format v="{{__line__}}" | unwrap v[1s])`, + expectedCount: 51, + expectedFirst: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1000, 0).UnixNano(), + Value: 1, + }, + Labels: labels.FromStrings("app", "test1", "env", "prod"), + }, + expectedLast: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1050, 0).UnixNano(), + Value: 1, + }, + Labels: labels.FromStrings("app", "test1", "env", "prod"), + }, + }, + { + name: "Query with multiple label equality", + query: `sum_over_time({app="test4", env="prod"}| label_format v="{{__line__}}" | unwrap v[1s])`, + expectedCount: 51, + expectedFirst: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1075, 0).UnixNano(), + Value: 4, + }, + Labels: 
labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + }, + expectedLast: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1125, 0).UnixNano(), + Value: 4, + }, + Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), + }, + }, + { + name: "Query with negative regex", + query: `sum_over_time({app=~"test.*", env!~"stag.*|dev"}| label_format v="{{__line__}}" | unwrap v[1s])`, + expectedCount: 102, + expectedFirst: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1000, 0).UnixNano(), + Value: 1, + }, + Labels: labels.FromStrings("app", "test1", "env", "prod"), + }, + expectedLast: SampleWithLabels{ + Sample: logproto.Sample{ + Timestamp: time.Unix(1125, 0).UnixNano(), + Value: 4, + }, + Labels: labels.FromStrings("app", "test4", "env", "prod", "version", "v2"), }, }, } - iter, err := querier.SelectLogs(ctx, req) - require.NoError(t, err) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + expr, err := syntax.ParseExpr(tc.query) + require.NoError(t, err) - results := collectLogEntries(t, iter) + req := logql.SelectSampleParams{ + SampleQueryRequest: &logproto.SampleQueryRequest{ + Selector: tc.query, + Start: time.Unix(1000, 0), + End: time.Unix(1126, 0), + Plan: &plan.QueryPlan{ + AST: expr, + }, + }, + } + + iter, err := querier.SelectSamples(ctx, req) + require.NoError(t, err) - assert.Len(t, results, 201, "Expected 201 log entries for test2") - for _, entry := range results { - assert.Contains(t, entry.Line, "line", "Log line should contain 'line'") - assert.True(t, entry.Timestamp.Unix() >= 1100 && entry.Timestamp.Unix() <= 1300, "Timestamp should be between 1100 and 1300") + results := collectSamplesWithLabels(t, iter) + + assert.Len(t, results, tc.expectedCount, "Unexpected number of samples") + if len(results) > 0 { + assert.Equal(t, tc.expectedFirst.Sample, results[0].Sample, "First sample mismatch") + assert.Equal(t, tc.expectedFirst.Labels, results[0].Labels, 
"First sample labels mismatch") + assert.Equal(t, tc.expectedLast.Sample, results[len(results)-1].Sample, "Last sample mismatch") + assert.Equal(t, tc.expectedLast.Labels, results[len(results)-1].Labels, "Last sample labels mismatch") + } + }) } } -func testSelectSamples(t *testing.T, ctx context.Context, querier *Querier) { - query := `rate({app=~"test.*"}[5m])` - expr, err := syntax.ParseExpr(query) +func TestQuerier_matchingChunks(t *testing.T) { + storage := NewMockStorage() + metastore := NewMockMetastore() + + querier, err := New(metastore, storage) require.NoError(t, err) - req := logql.SelectSampleParams{ - SampleQueryRequest: &logproto.SampleQueryRequest{ - Selector: query, - Start: time.Unix(1000, 0), - End: time.Unix(1400, 0), - Plan: &plan.QueryPlan{ - AST: expr, + tenantID := "test-tenant" + ctx := user.InjectOrgID(context.Background(), tenantID) + + // Create test data + testData := []struct { + labels labels.Labels + entries []*logproto.Entry + }{ + { + labels: labels.FromStrings("app", "app1", "env", "prod"), + entries: generateEntries(1000, 1050), + }, + { + labels: labels.FromStrings("app", "app2", "env", "staging"), + entries: generateEntries(1025, 1075), + }, + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + entries: generateEntries(1050, 1100), + }, + } + + // Setup test data + setupTestData(t, storage, metastore, tenantID, testData) + + // Test cases + testCases := []struct { + name string + matchers []*labels.Matcher + start int64 + end int64 + expectedChunks []ChunkData + }{ + { + name: "Equality matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "app1"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app1", "env", "prod"), + meta: &chunks.Meta{MinTime: time.Unix(1000, 0).UnixNano(), MaxTime: time.Unix(1050, 0).UnixNano()}, + }, + }, + }, + { + name: "Negative matcher", + matchers: 
[]*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "app", "app1"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app2", "env", "staging"), + meta: &chunks.Meta{MinTime: time.Unix(1025, 0).UnixNano(), MaxTime: time.Unix(1075, 0).UnixNano()}, + }, + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + meta: &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()}, + }, + }, + }, + { + name: "Regex matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "app", "app[12]"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app1", "env", "prod"), + meta: &chunks.Meta{MinTime: time.Unix(1000, 0).UnixNano(), MaxTime: time.Unix(1050, 0).UnixNano()}, + }, + { + labels: labels.FromStrings("app", "app2", "env", "staging"), + meta: &chunks.Meta{MinTime: time.Unix(1025, 0).UnixNano(), MaxTime: time.Unix(1075, 0).UnixNano()}, + }, + }, + }, + { + name: "Not regex matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotRegexp, "app", "app[12]"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + meta: &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()}, + }, + }, + }, + { + name: "Multiple matchers", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "app", "app.*"), + labels.MustNewMatcher(labels.MatchNotEqual, "env", "prod"), + }, + start: time.Unix(1000, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app2", "env", "staging"), + meta: &chunks.Meta{MinTime: time.Unix(1025, 0).UnixNano(), 
MaxTime: time.Unix(1075, 0).UnixNano()}, + }, + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + meta: &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()}, + }, + }, + }, + { + name: "Time range filter", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "app", "app.*"), + }, + start: time.Unix(1080, 0).UnixNano(), + end: time.Unix(1100, 0).UnixNano(), + expectedChunks: []ChunkData{ + { + labels: labels.FromStrings("app", "app3", "env", "dev"), + meta: &chunks.Meta{MinTime: time.Unix(1050, 0).UnixNano(), MaxTime: time.Unix(1100, 0).UnixNano()}, + }, }, }, } - iter, err := querier.SelectSamples(ctx, req) - require.NoError(t, err) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + chunks, err := querier.matchingChunks(ctx, tenantID, tc.start, tc.end, tc.matchers...) + require.NoError(t, err) - results := collectSamples(t, iter) + sort.Slice(tc.expectedChunks, func(i, j int) bool { + return tc.expectedChunks[i].labels.String() < tc.expectedChunks[j].labels.String() + }) + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].labels.String() < chunks[j].labels.String() + }) + assert.Equal(t, len(tc.expectedChunks), len(chunks), "Unexpected number of matching chunks") + + // Verify that all returned chunks match the expected chunks + for i, expectedChunk := range tc.expectedChunks { + if i < len(chunks) { + assert.Equal(t, expectedChunk.labels, chunks[i].labels, "Labels mismatch for chunk %d", i) + assert.Equal(t, expectedChunk.meta.MinTime, chunks[i].meta.MinTime, "MinTime mismatch for chunk %d", i) + assert.Equal(t, expectedChunk.meta.MaxTime, chunks[i].meta.MaxTime, "MaxTime mismatch for chunk %d", i) + } + } - assert.NotEmpty(t, results, "Should have sample results") - for _, sample := range results { - assert.True(t, sample.Value > 0 && sample.Value <= 0.2, "Rate should be between 0 and 0.2 logs/second") + // Additional checks for time range and matchers + for 
_, chunk := range chunks { + for _, matcher := range tc.matchers { + assert.True(t, matcher.Matches(chunk.labels.Get(matcher.Name)), + "Chunk labels %v do not match criteria %v", chunk.labels, matcher) + } + } + }) } } -func collectLogEntries(t *testing.T, iter iter.EntryIterator) []logproto.Entry { +func setupTestData(t *testing.T, storage *MockStorage, metastore *MockMetastore, tenantID string, testData []struct { + labels labels.Labels + entries []*logproto.Entry +}, +) { + total := 0 + for i, data := range testData { + segmentID := fmt.Sprintf("segment%d", i) + writer, err := wal.NewWalSegmentWriter() + require.NoError(t, err) + total += len(data.entries) + writer.Append(tenantID, data.labels.String(), data.labels, data.entries, time.Now()) + + var buf bytes.Buffer + _, err = writer.WriteTo(&buf) + require.NoError(t, err) + + segmentData := buf.Bytes() + storage.PutObject(wal.Dir+segmentID, segmentData) + + blockMeta := writer.Meta(segmentID) + metastore.AddBlock(tenantID, blockMeta) + } + t.Log("Total entries in storage:", total) +} + +func collectPushEntries(t *testing.T, iter iter.EntryIterator) []logproto.Entry { var results []logproto.Entry for iter.Next() { entry := iter.At() + lbs := iter.Labels() + parsed, err := syntax.ParseLabels(lbs) + require.NoError(t, err) results = append(results, logproto.Entry{ Timestamp: entry.Timestamp, Line: entry.Line, + Parsed: logproto.FromLabelsToLabelAdapters(parsed), }) } require.NoError(t, iter.Close()) return results } -func collectSamples(t *testing.T, iter iter.SampleIterator) []logproto.Sample { - var results []logproto.Sample +func collectSamplesWithLabels(t *testing.T, iter iter.SampleIterator) []SampleWithLabels { + var results []SampleWithLabels for iter.Next() { - results = append(results, iter.At()) + sample := iter.At() + labelString := iter.Labels() + parsedLabels, err := syntax.ParseLabels(labelString) + require.NoError(t, err) + results = append(results, SampleWithLabels{ + Sample: sample, + Labels: 
parsedLabels, + }) } require.NoError(t, iter.Close()) return results } -func validateLogEntries(t *testing.T, entries []logproto.Entry) { - sort.Slice(entries, func(i, j int) bool { - return entries[i].Timestamp.Before(entries[j].Timestamp) - }) +func generateSamples(start, end int64, value float64) []logproto.Sample { + var samples []logproto.Sample + for i := start; i <= end; i++ { + samples = append(samples, logproto.Sample{ + Timestamp: time.Unix(i, 0).UnixNano(), + Value: value, + }) + } + return samples +} + +func setupTestSampleData(t *testing.T, storage *MockStorage, metastore *MockMetastore, tenantID string, testData []struct { + labels labels.Labels + samples []logproto.Sample +}, +) { + total := 0 + for i, data := range testData { + segmentID := fmt.Sprintf("segment%d", i) + writer, err := wal.NewWalSegmentWriter() + require.NoError(t, err) + total += len(data.samples) + + // Convert samples to entries for the WAL writer + entries := make([]*logproto.Entry, len(data.samples)) + for i, sample := range data.samples { + entries[i] = &logproto.Entry{ + Timestamp: time.Unix(0, sample.Timestamp), + Line: fmt.Sprintf("%f", sample.Value), + } + } + + writer.Append(tenantID, data.labels.String(), data.labels, entries, time.Now()) + + var buf bytes.Buffer + _, err = writer.WriteTo(&buf) + require.NoError(t, err) - var prevTimestamp time.Time - for _, entry := range entries { - assert.True(t, entry.Timestamp.After(prevTimestamp) || entry.Timestamp.Equal(prevTimestamp), - "Timestamps should be in non-decreasing order") - prevTimestamp = entry.Timestamp + segmentData := buf.Bytes() + storage.PutObject(wal.Dir+segmentID, segmentData) - assert.Contains(t, entry.Line, "line", "Log line should contain 'line'") - assert.True(t, entry.Timestamp.Unix() >= 1000 && entry.Timestamp.Unix() <= 1400, "Timestamp should be between 1000 and 1400") + blockMeta := writer.Meta(segmentID) + metastore.AddBlock(tenantID, blockMeta) } + t.Log("Total samples in storage:", total) } From 
cfd0fbdc353a4eebcc296604bedb16bc3e3e9926 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 2 Aug 2024 09:02:25 +0200 Subject: [PATCH 13/16] make format --- pkg/querier-rf1/wal/querier_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/querier-rf1/wal/querier_test.go b/pkg/querier-rf1/wal/querier_test.go index be6d5191df78..5d446b851590 100644 --- a/pkg/querier-rf1/wal/querier_test.go +++ b/pkg/querier-rf1/wal/querier_test.go @@ -15,6 +15,7 @@ import ( "google.golang.org/grpc" "github.com/grafana/dskit/user" + "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" From 3bd8918465d232487000bd42c547722607f661bf Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 2 Aug 2024 10:34:03 +0200 Subject: [PATCH 14/16] fix: Make sure we don't overwrite matchers while sorting --- pkg/loki/modules.go | 2 +- pkg/querier-rf1/wal/querier.go | 9 +++++++-- pkg/storage/wal/index/index.go | 4 +--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index b04d2b197086..d823f5cedb5c 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1823,7 +1823,7 @@ func (t *Loki) initMetastore() (services.Service, error) { return nil, nil } if t.Cfg.isTarget(All) { - t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%s", t.Cfg.Server.GRPCListenAddress) + t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%d", t.Cfg.Server.GRPCListenPort) } m, err := metastore.New(t.Cfg.Metastore, log.With(util_log.Logger, "component", "metastore"), prometheus.DefaultRegisterer, t.health) if err != nil { diff --git a/pkg/querier-rf1/wal/querier.go b/pkg/querier-rf1/wal/querier.go index 5c9152f2584d..ed43a671e606 100644 --- a/pkg/querier-rf1/wal/querier.go +++ b/pkg/querier-rf1/wal/querier.go @@ -134,11 +134,16 @@ func (q *Querier) matchingChunks(ctx context.Context, tenantID string, from, thr return lazyChunks, nil } 
-func (q *Querier) forSeries(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(string, *labels.ScratchBuilder, *chunks.Meta) error, ms ...*labels.Matcher) error { +func (q *Querier) forSeries(ctx context.Context, req *metastorepb.ListBlocksForQueryRequest, fn func(string, *labels.ScratchBuilder, *chunks.Meta) error, matchers ...*labels.Matcher) error { + // copy matchers to avoid modifying the original slice. + ms := make([]*labels.Matcher, 0, len(matchers)+1) + ms = append(ms, matchers...) + ms = append(ms, labels.MustNewMatcher(labels.MatchEqual, index.TenantLabel, req.TenantId)) + return q.forIndices(ctx, req, func(ir *index.Reader, id string) error { bufLbls := labels.ScratchBuilder{} chunks := make([]chunks.Meta, 0, 1) - p, err := ir.PostingsForMatchers(ctx, req.TenantId, ms...) + p, err := ir.PostingsForMatchers(ctx, ms...) if err != nil { return err } diff --git a/pkg/storage/wal/index/index.go b/pkg/storage/wal/index/index.go index d4d35c0266ed..8959824c9278 100644 --- a/pkg/storage/wal/index/index.go +++ b/pkg/storage/wal/index/index.go @@ -1929,9 +1929,7 @@ func yoloString(b []byte) string { // PostingsForMatchers assembles a single postings iterator against the index reader // based on the given matchers. The resulting postings are not ordered by series. -func (r *Reader) PostingsForMatchers(ctx context.Context, tenantID string, ms ...*labels.Matcher) (index.Postings, error) { - ms = append(ms, labels.MustNewMatcher(labels.MatchEqual, TenantLabel, tenantID)) - +func (r *Reader) PostingsForMatchers(ctx context.Context, ms ...*labels.Matcher) (index.Postings, error) { var its, notIts []index.Postings // See which label must be non-empty. // Optimization for case like {l=~".", l!="1"}. 
From 3b192050cdd9b15a479c262396010a7f903e4db5 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Fri, 2 Aug 2024 14:15:44 +0100 Subject: [PATCH 15/16] Copy chunks.Meta in lazy chunk --- pkg/querier-rf1/wal/chunks.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/querier-rf1/wal/chunks.go b/pkg/querier-rf1/wal/chunks.go index 0216fdabc9aa..76070006aff4 100644 --- a/pkg/querier-rf1/wal/chunks.go +++ b/pkg/querier-rf1/wal/chunks.go @@ -38,8 +38,12 @@ func newChunkData(id string, lbs *labels.ScratchBuilder, meta *chunks.Meta) Chun } newLbs = newLbs[:j] return ChunkData{ - id: id, - meta: meta, + id: id, + meta: &chunks.Meta{ // incoming Meta is from a shared buffer, so create a new one + Ref: meta.Ref, + MinTime: meta.MinTime, + MaxTime: meta.MaxTime, + }, labels: newLbs, } } From 1d135fbc3830bd6394d7766c2608f97311a08cf8 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Fri, 2 Aug 2024 15:16:18 +0100 Subject: [PATCH 16/16] Add span for matchingChunks --- pkg/querier-rf1/wal/querier.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/querier-rf1/wal/querier.go b/pkg/querier-rf1/wal/querier.go index ed43a671e606..0fb2cc23dc52 100644 --- a/pkg/querier-rf1/wal/querier.go +++ b/pkg/querier-rf1/wal/querier.go @@ -6,6 +6,7 @@ import ( "io" "sync" + "github.com/opentracing/opentracing-go" "github.com/prometheus/prometheus/model/labels" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -112,6 +113,8 @@ func (q *Querier) SelectSamples(ctx context.Context, req logql.SelectSampleParam } func (q *Querier) matchingChunks(ctx context.Context, tenantID string, from, through int64, matchers ...*labels.Matcher) ([]ChunkData, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "matchingChunks") + defer sp.Finish() // todo support sharding var ( lazyChunks []ChunkData @@ -131,6 +134,9 @@ func (q *Querier) matchingChunks(ctx context.Context, tenantID string, from, thr if err != nil { return nil, err } + if sp != nil { + 
sp.LogKV("matchedChunks", len(lazyChunks)) + } return lazyChunks, nil }