diff --git a/pkg/engine/basic_engine.go b/pkg/engine/basic_engine.go index d49bcab8fb972..570c1157850ea 100644 --- a/pkg/engine/basic_engine.go +++ b/pkg/engine/basic_engine.go @@ -209,7 +209,7 @@ func (e *Basic) Execute(ctx context.Context, params logql.Params) (logqlmodel.Re var builder ResultBuilder switch params.GetExpression().(type) { case syntax.LogSelectorExpr: - builder = newStreamsResultBuilder() + builder = newStreamsResultBuilder(params.Direction()) case syntax.SampleExpr: if params.Step() > 0 { builder = newMatrixResultBuilder() diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go index 7a92eb44d42ff..8293cec599f6e 100644 --- a/pkg/engine/compat.go +++ b/pkg/engine/compat.go @@ -31,8 +31,9 @@ var ( _ ResultBuilder = &matrixResultBuilder{} ) -func newStreamsResultBuilder() *streamsResultBuilder { +func newStreamsResultBuilder(dir logproto.Direction) *streamsResultBuilder { return &streamsResultBuilder{ + direction: dir, data: make(logqlmodel.Streams, 0), streams: make(map[string]int), rowBuilders: nil, @@ -40,6 +41,8 @@ func newStreamsResultBuilder() *streamsResultBuilder { } type streamsResultBuilder struct { + direction logproto.Direction + streams map[string]int data logqlmodel.Streams count int @@ -213,7 +216,7 @@ func (b *streamsResultBuilder) resetRowBuilder(i int) { } func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) { - for rowIdx := 0; rowIdx < numRows; rowIdx++ { + for rowIdx := range numRows { if col.IsNull(rowIdx) { continue } @@ -222,6 +225,19 @@ func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) } func (b *streamsResultBuilder) Build(s stats.Result, md *metadata.Context) logqlmodel.Result { + // Executor does not guarantee order of entries, so we sort them here. 
+	for _, stream := range b.data {
+		if b.direction == logproto.BACKWARD {
+			sort.Slice(stream.Entries, func(i, j int) bool {
+				return stream.Entries[i].Timestamp.After(stream.Entries[j].Timestamp)
+			})
+		} else {
+			sort.Slice(stream.Entries, func(i, j int) bool {
+				return stream.Entries[i].Timestamp.Before(stream.Entries[j].Timestamp)
+			})
+		}
+	}
+	sort.Sort(b.data)
 	return logqlmodel.Result{
 		Data: b.data,
diff --git a/pkg/engine/compat_bench_test.go b/pkg/engine/compat_bench_test.go
index f8b8fcd34ca7e..5c37f2311dc7b 100644
--- a/pkg/engine/compat_bench_test.go
+++ b/pkg/engine/compat_bench_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
 	"github.com/grafana/loki/v3/pkg/engine/internal/types"
+	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/util/arrowtest"
 )
 
@@ -68,7 +69,7 @@ func BenchmarkStreamsResultBuilder(b *testing.B) {
 	b.ReportAllocs()
 
 	for i := 0; i < b.N; i++ {
-		rb := newStreamsResultBuilder()
+		rb := newStreamsResultBuilder(logproto.BACKWARD)
 		// Collect records twice on purpose to see how efficient CollectRecord is when the builder already has
 		// some data
 		rb.CollectRecord(record1)
diff --git a/pkg/engine/compat_test.go b/pkg/engine/compat_test.go
index ca526cc7ee610..4dac7ecad70f9 100644
--- a/pkg/engine/compat_test.go
+++ b/pkg/engine/compat_test.go
@@ -25,7 +25,7 @@ import (
 
 func TestStreamsResultBuilder(t *testing.T) {
 	t.Run("empty builder returns non-nil result", func(t *testing.T) {
-		builder := newStreamsResultBuilder()
+		builder := newStreamsResultBuilder(logproto.BACKWARD)
 		md, _ := metadata.NewContext(t.Context())
 		require.NotNil(t, builder.Build(stats.Result{}, md).Data)
 	})
@@ -66,7 +66,7 @@ func TestStreamsResultBuilder(t *testing.T) {
 		pipeline := executor.NewBufferedPipeline(record)
 		defer pipeline.Close()
 
-		builder := newStreamsResultBuilder()
+		builder := newStreamsResultBuilder(logproto.BACKWARD)
 		err := collectResult(context.Background(), pipeline, builder)
 		require.NoError(t, err)
 
@@ -147,7 +147,7 @@ func TestStreamsResultBuilder(t *testing.T) {
 		pipeline := executor.NewBufferedPipeline(record)
 		defer pipeline.Close()
 
-		builder := newStreamsResultBuilder()
+		builder := newStreamsResultBuilder(logproto.BACKWARD)
 		err := collectResult(context.Background(), pipeline, builder)
 		require.NoError(t, err)
 
@@ -244,7 +244,7 @@ func TestStreamsResultBuilder(t *testing.T) {
 		record2 := rows2.Record(memory.DefaultAllocator, schema)
 		defer record2.Release()
 
-		builder := newStreamsResultBuilder()
+		builder := newStreamsResultBuilder(logproto.FORWARD)
 
 		// Collect first record
 		builder.CollectRecord(record1)
@@ -300,7 +300,7 @@ func TestStreamsResultBuilder(t *testing.T) {
 			nil,
 		)
 
-		builder := newStreamsResultBuilder()
+		builder := newStreamsResultBuilder(logproto.BACKWARD)
 
 		// First record: 5 rows (buffer grows to 5)
 		rows1 := make(arrowtest.Rows, 5)
@@ -369,7 +369,7 @@ func TestStreamsResultBuilder(t *testing.T) {
 			nil,
 		)
 
-		builder := newStreamsResultBuilder()
+		builder := newStreamsResultBuilder(logproto.BACKWARD)
 
 		// First record: 3 valid rows
 		rows1 := make(arrowtest.Rows, 3)
diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
index cf38cf295e0ad..f047984d307cf 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -358,7 +358,7 @@ func (e *Engine) collectResult(ctx context.Context, logger log.Logger, params lo
 	var builder ResultBuilder
 	switch params.GetExpression().(type) {
 	case syntax.LogSelectorExpr:
-		builder = newStreamsResultBuilder()
+		builder = newStreamsResultBuilder(params.Direction())
 	case syntax.SampleExpr:
 		if params.Step() > 0 {
 			builder = newMatrixResultBuilder()
diff --git a/pkg/engine/internal/arrowagg/mapper.go b/pkg/engine/internal/arrowagg/mapper.go
index a550131554ad0..06384fb462e84 100644
--- a/pkg/engine/internal/arrowagg/mapper.go
+++ b/pkg/engine/internal/arrowagg/mapper.go
@@ -74,15 +74,21 @@ func newMapping(schema *arrow.Schema, to []arrow.Field) *mapping {
 		checked: make(map[*arrow.Schema]struct{}),
 	}
 
-	for i, field := range to {
+	for i, target := range to {
 		// Default to -1 for fields that are not found in the schema.
 		mapping.lookups[i] = -1
 
-		for j, schemaField := range schema.Fields() {
-			if field.Equal(schemaField) {
-				mapping.lookups[i] = j
-				break
-			}
+		fieldIdxs := schema.FieldIndices(target.Name)
+		if len(fieldIdxs) == 0 {
+			continue
+		} else if len(fieldIdxs) > 1 {
+			// This should not occur, as FQNs keep field names unique.
+			panic("mapper: multiple fields with the same name in schema")
+		}
+
+		// Match the full field, not just the name, before recording the mapping.
+		if schema.Field(fieldIdxs[0]).Equal(target) {
+			mapping.lookups[i] = fieldIdxs[0]
 		}
 	}
diff --git a/pkg/engine/internal/executor/arrow_scalar_compare.go b/pkg/engine/internal/executor/arrow_compare.go
similarity index 61%
rename from pkg/engine/internal/executor/arrow_scalar_compare.go
rename to pkg/engine/internal/executor/arrow_compare.go
index c759c4381d3d6..0e5a9c414e5b6 100644
--- a/pkg/engine/internal/executor/arrow_scalar_compare.go
+++ b/pkg/engine/internal/executor/arrow_compare.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 
 	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
 	"github.com/apache/arrow-go/v18/arrow/scalar"
 )
 
@@ -136,3 +137,88 @@ func compareScalars(left, right scalar.Scalar, nullsFirst bool) (int, error) {
 
 	return 0, nil
}
+
+// compareArrays compares values at the given indices from two arrays, returning:
+//
+// - -1 if left < right
+// - 0 if left == right
+// - 1 if left > right
+//
+// If nullsFirst is true, then null values are considered to sort before
+// non-null values.
+//
+// compareArrays returns an error if the two arrays are of different types.
+// Values of unsupported types are treated as equal rather than returning an error.
+func compareArrays(left, right arrow.Array, leftIdx, rightIdx int, nullsFirst bool) (int, error) {
+	leftNull := left == nil || !left.IsValid(leftIdx)
+	rightNull := right == nil || !right.IsValid(rightIdx)
+
+	// First, handle one or both of the values being null.
+	switch {
+	case leftNull && rightNull:
+		return 0, nil
+
+	case leftNull && !rightNull:
+		if nullsFirst {
+			return -1, nil
+		}
+		return 1, nil
+
+	case !leftNull && rightNull:
+		if nullsFirst {
+			return 1, nil
+		}
+		return -1, nil
+	}
+
+	if !arrow.TypeEqual(left.DataType(), right.DataType()) {
+		// We should never hit this, since compareArrays is only called for two
+		// arrays coming from the same [arrow.Field].
+		return 0, errors.New("received arrays of different types")
+	}
+
+	// Fast path: if both sides are the same array at the same index, they are
+	// trivially equal.
+	if left == right && leftIdx == rightIdx {
+		return 0, nil
+	}
+
+	// Switch on the array type to compare the values. This is only composed of
+	// types we know the query engine uses, and types that we know have clear
+	// sorting semantics.
+	//
+	// Unsupported types are treated as equal for consistent sorting, but
+	// otherwise it's up to the caller to detect unexpected sort types and reject
+	// the query.
+	switch left := left.(type) {
+	case *array.Binary:
+		right := right.(*array.Binary)
+		return bytes.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil
+
+	case *array.Duration:
+		right := right.(*array.Duration)
+		return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil
+
+	case *array.Float64:
+		right := right.(*array.Float64)
+		return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil
+
+	case *array.Uint64:
+		right := right.(*array.Uint64)
+		return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil
+
+	case *array.Int64:
+		right := right.(*array.Int64)
+		return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil
+
+	case *array.String:
+		right := right.(*array.String)
+		return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil
+
+	case *array.Timestamp:
+		right := right.(*array.Timestamp)
+		return cmp.Compare(left.Value(leftIdx), right.Value(rightIdx)), nil
+	}
+
+	return 0, nil
+}
diff --git a/pkg/engine/internal/executor/topk_batch.go b/pkg/engine/internal/executor/topk_batch.go
index c806b0711d40a..9cbaad33c0800 100644
--- a/pkg/engine/internal/executor/topk_batch.go
+++ b/pkg/engine/internal/executor/topk_batch.go
@@ -6,17 +6,20 @@ import (
 
 	"github.com/apache/arrow-go/v18/arrow"
 	"github.com/apache/arrow-go/v18/arrow/memory"
-	"github.com/apache/arrow-go/v18/arrow/scalar"
 
 	"github.com/grafana/loki/v3/pkg/engine/internal/arrowagg"
 	"github.com/grafana/loki/v3/pkg/util/topk"
 )
 
 // topkBatch calculates the top K rows from a stream of [arrow.Record]s, where
-// rows are sorted by the specified Fields.
+// rows are ranked by the specified Fields.
 //
-// Rows with equal values for all sort fields are sorted by the order in which
+// Rows with equal values for all sort fields are ranked by the order in which
 // they were appended.
+//
+// topkBatch only identifies which rows belong in the top K; it does not
+// guarantee any specific ordering of those rows in the compacted output.
+// Callers should sort the result if a specific order is required.
 type topkBatch struct {
 	// Fields holds the list of fields to sort by, in order of precedence. If an
 	// incoming record is missing one of these fields, the value for that field
@@ -144,25 +147,17 @@ func (b *topkBatch) less(left, right *topkReference) bool {
 		leftArray := b.findRecordArray(left.Record, b.mapper, fieldIndex)
 		rightArray := b.findRecordArray(right.Record, b.mapper, fieldIndex)
 
-		var leftScalar, rightScalar scalar.Scalar
-
-		if leftArray != nil {
-			leftScalar, _ = scalar.GetScalar(leftArray, left.Row)
-		}
-		if rightArray != nil {
-			rightScalar, _ = scalar.GetScalar(rightArray, right.Row)
-		}
-
-		res, err := compareScalars(leftScalar, rightScalar, b.NullsFirst)
+		// Compare directly from the arrays without creating scalars, to avoid allocations.
+		res, err := compareArrays(leftArray, rightArray, left.Row, right.Row, b.NullsFirst)
 		if err != nil {
-			// Treat failure to compare scalars as equal, so that the sort order is
+			// Treat values that fail to compare as equal, so that the sort order is
 			// consistent. This should only happen when given invalid values to
-			// compare, as we know leftScalar and rightScalar are of the same type.
+			// compare, as we know leftArray and rightArray are of the same type.
 			continue
 		}
 
 		switch res {
 		case 0: // left == right
-			// Continue to the next field if two scalars are equal.
+			// Continue to the next field if two values are equal.
 			continue
 		case -1: // left < right
 			return true
@@ -215,9 +210,9 @@ func (b *topkBatch) Size() (rows int, unused int) {
 // the current top K rows.
 //
 // The returned record will have a combined schema from all of the input
-// records. The sort order of fields in the returned record is not guaranteed.
-// Rows that did not have one of the combined fields will be filled with null
-// values for those fields.
+// records. Neither the order of fields nor the order of rows in the returned
+// record is guaranteed. Rows that did not have one of the combined fields
+// will be filled with null values for those fields.
 //
 // Compact returns nil if no rows are in the top K.
 func (b *topkBatch) Compact() arrow.Record {
@@ -228,13 +223,23 @@ func (b *topkBatch) Compact() arrow.Record {
 
 	// Get all row references to compact.
 	rowRefs := b.heap.PopAll()
-	slices.Reverse(rowRefs)
 
-	compactor := arrowagg.NewRecords(memory.DefaultAllocator)
+	recordRows := make(map[arrow.Record][]int, len(b.usedCount))
 	for _, ref := range rowRefs {
-		compactor.AppendSlice(ref.Record, int64(ref.Row), int64(ref.Row)+1)
+		recordRows[ref.Record] = append(recordRows[ref.Record], ref.Row)
+	}
+
+	// Rows are grouped by their source record and appended in contiguous
+	// ranges for efficiency.
+	compactor := arrowagg.NewRecords(memory.DefaultAllocator)
+	for rec, rows := range recordRows {
+		slices.Sort(rows)
+		iterContiguousRanges(rows, func(start, end int) bool {
+			compactor.AppendSlice(rec, int64(start), int64(end))
+			return true
+		})
 	}
 
 	compacted, err := compactor.Aggregate()
 	if err != nil {
 		// Aggregate should only fail if we didn't aggregate anything, which we
@@ -257,3 +262,30 @@ func (b *topkBatch) Reset() {
 	clear(b.usedCount)
 	clear(b.usedSchemas)
 }
+
+// iterContiguousRanges iterates over contiguous ranges of row indices in a
+// slice. Rows must be sorted in ascending order.
+//
+// For example, if rows is [1, 2, 3, 5, 6, 7], it will yield two ranges:
+// [1, 4) and [5, 8), representing the contiguous sequences.
+//
+// The function calls yield for each contiguous range found. If yield returns false,
+// iteration stops.
+func iterContiguousRanges(rows []int, yield func(start, end int) bool) {
+	if len(rows) == 0 {
+		return
+	}
+
+	startRow := rows[0]
+	for i := 1; i < len(rows); i++ {
+		// If the current row is not contiguous with the previous, yield the previous range.
+		if rows[i] != rows[i-1]+1 {
+			if !yield(startRow, rows[i-1]+1) {
+				return
+			}
+			startRow = rows[i]
+		}
+	}
+	// Yield the final contiguous range.
+	yield(startRow, rows[len(rows)-1]+1)
+}
diff --git a/pkg/engine/internal/executor/topk_batch_test.go b/pkg/engine/internal/executor/topk_batch_test.go
index dc42be5b4e59f..3c9e3a917925f 100644
--- a/pkg/engine/internal/executor/topk_batch_test.go
+++ b/pkg/engine/internal/executor/topk_batch_test.go
@@ -39,7 +39,7 @@ var (
 		{
 			// This record contains a nil sort key to test the behaviour of
 			// NullsFirst.
-			{"ts": nil, "table": "D", "line": "line A"},
+			{"table": "D", "line": "line A"},
 		},
 	}
 )
@@ -122,7 +122,8 @@ func Test_topkBatch(t *testing.T) {
 			actual, err := arrowtest.RecordRows(output)
 			require.NoError(t, err)
 
-			require.Equal(t, tc.expect, actual)
+			require.Len(t, actual, len(tc.expect))
+			require.ElementsMatch(t, tc.expect, actual, "rows should match (order may differ)")
 		})
 	}
 }
@@ -202,3 +203,57 @@ func Test_topkBatch_MaxUnused(t *testing.T) {
 		assert.Equal(t, tc.expectUnused, unused, "unexpected number of unused rows after record %d", i)
 	}
 }
+
+func Test_iterContiguousRanges(t *testing.T) {
+	tt := []struct {
+		name   string
+		rows   []int
+		ranges []struct{ start, end int }
+	}{
+		{
+			name: "empty slice",
+			rows: []int{},
+		},
+		{
+			name:   "single row",
+			rows:   []int{5},
+			ranges: []struct{ start, end int }{{5, 6}},
+		},
+		{
+			name:   "single contiguous range",
+			rows:   []int{1, 2, 3},
+			ranges: []struct{ start, end int }{{1, 4}},
+		},
+		{
+			name:   "multiple contiguous ranges",
+			rows:   []int{1, 2, 3, 5, 6, 7},
+			ranges: []struct{ start, end int }{{1, 4}, {5, 8}},
+		},
+		{
+			name:   "non-contiguous single rows",
+			rows:   []int{1, 3, 5},
+			ranges: []struct{ start, end int }{{1, 2}, {3, 4}, {5, 6}},
+		},
+		{
+			name:   "all rows",
+			rows:   []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
+			ranges: []struct{ start, end int }{{0, 10}},
+		},
+		{
+			name:   "first and last row",
+			rows:   []int{0, 9},
+			ranges: []struct{ start, end int }{{0, 1}, {9, 10}},
+		},
+	}
+
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			var got []struct{ start, end int }
+			iterContiguousRanges(tc.rows, func(start, end int) bool {
+				got = append(got, struct{ start, end int }{start, end})
+				return true
+			})
+			require.Equal(t, tc.ranges, got, "ranges should match")
+		})
+	}
+}
diff --git a/pkg/engine/internal/executor/topk_test.go b/pkg/engine/internal/executor/topk_test.go
index d3f6debc8006f..f969d9cd27a98 100644
--- a/pkg/engine/internal/executor/topk_test.go
+++ b/pkg/engine/internal/executor/topk_test.go
@@ -72,7 +72,7 @@ func Test_topk(t *testing.T) {
 	rows, err := arrowtest.RecordRows(rec)
 	require.NoError(t, err, "should be able to convert record back to rows")
 
-	require.Equal(t, expect, rows, "should return the top 3 rows in ascending order by timestamp")
+	require.ElementsMatch(t, expect, rows, "should return the top 3 rows")
 }
 
 func Test_topk_emptyPipelines(t *testing.T) {
diff --git a/pkg/engine/internal/planner/physical/topk.go b/pkg/engine/internal/planner/physical/topk.go
index f763177af5f3e..8a39432e052e0 100644
--- a/pkg/engine/internal/planner/physical/topk.go
+++ b/pkg/engine/internal/planner/physical/topk.go
@@ -5,8 +5,8 @@ import (
 )
 
 // TopK represents a physical plan node that performs topK operation.
-// It sorts rows based on sort expressions and limits the result to the top K rows.
-// This is equivalent to a SORT followed by a LIMIT operation.
+// It ranks rows based on sort expressions and limits the result to the top K rows.
+// The top K rows are not guaranteed to be emitted in any particular order.
 type TopK struct {
 	id string
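
Illustration, not part of the diff: the nullsFirst semantics of compareArrays. This is a minimal sketch that assumes access to the unexported compareArrays, e.g. from a file in the same executor package; the demo function name and sample values are made up.

package executor

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

// demoCompareArraysNulls shows how compareArrays orders null values against
// non-null values depending on the nullsFirst flag.
func demoCompareArraysNulls() {
	b := array.NewInt64Builder(memory.DefaultAllocator)
	defer b.Release()

	b.Append(42)   // index 0: non-null
	b.AppendNull() // index 1: null
	arr := b.NewInt64Array()
	defer arr.Release()

	// Non-null left vs. null right: with nullsFirst=true the null sorts
	// first, so the non-null left side compares as greater.
	res, _ := compareArrays(arr, arr, 0, 1, true)
	fmt.Println(res) // 1

	// With nullsFirst=false the null sorts last instead.
	res, _ = compareArrays(arr, arr, 0, 1, false)
	fmt.Println(res) // -1
}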
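
Also for illustration, not part of the diff: a standalone sketch of the grouping that Compact now performs before appending slices. The record names and row numbers are made up; iterContiguousRanges mirrors the helper added in topk_batch.go.

package main

import (
	"fmt"
	"slices"
)

// iterContiguousRanges mirrors the helper from topk_batch.go: it yields
// half-open [start, end) ranges over ascending row indices.
func iterContiguousRanges(rows []int, yield func(start, end int) bool) {
	if len(rows) == 0 {
		return
	}
	startRow := rows[0]
	for i := 1; i < len(rows); i++ {
		if rows[i] != rows[i-1]+1 {
			if !yield(startRow, rows[i-1]+1) {
				return
			}
			startRow = rows[i]
		}
	}
	yield(startRow, rows[len(rows)-1]+1)
}

func main() {
	// Top-K row references grouped by source record, as Compact builds them.
	// Here the records are just names; in the real code they are arrow.Records.
	recordRows := map[string][]int{
		"rec1": {3, 1, 2, 7},
		"rec2": {0, 4, 5},
	}

	for rec, rows := range recordRows {
		slices.Sort(rows) // iterContiguousRanges requires ascending order
		iterContiguousRanges(rows, func(start, end int) bool {
			// The real code calls compactor.AppendSlice(rec, start, end) here,
			// appending a whole contiguous slice instead of one row at a time.
			fmt.Printf("%s: append rows [%d, %d)\n", rec, start, end)
			return true
		})
	}
	// rec1 yields [1, 4) and [7, 8); rec2 yields [0, 1) and [4, 6). Map
	// iteration order is not deterministic, which matches Compact's documented
	// lack of row-order guarantees.
}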