diff --git a/queryable/parquet_queryable.go b/queryable/parquet_queryable.go index a674c96..82176ef 100644 --- a/queryable/parquet_queryable.go +++ b/queryable/parquet_queryable.go @@ -357,7 +357,7 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage for rgi := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - cs, err := search.MatchersToConstraint(matchers...) + cs, err := search.MatchersToConstraints(matchers...) if err != nil { return err } @@ -411,7 +411,7 @@ func (b queryableShard) LabelNames(ctx context.Context, limit int64, matchers [] for rgi := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - cs, err := search.MatchersToConstraint(matchers...) + cs, err := search.MatchersToConstraints(matchers...) if err != nil { return err } @@ -451,7 +451,7 @@ func (b queryableShard) LabelValues(ctx context.Context, name string, limit int6 for rgi := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - cs, err := search.MatchersToConstraint(matchers...) + cs, err := search.MatchersToConstraints(matchers...) if err != nil { return err } diff --git a/queryable/parquet_queryable_test.go b/queryable/parquet_queryable_test.go index 132bdbd..e174010 100644 --- a/queryable/parquet_queryable_test.go +++ b/queryable/parquet_queryable_test.go @@ -101,7 +101,7 @@ func (st *acceptanceTestStorage) Querier(from, to int64) (prom_storage.Querier, h := st.st.Head() data := util.TestData{MinTime: h.MinTime(), MaxTime: h.MaxTime()} - block := convertToParquet(st.t, context.Background(), bkt, data, h) + block := convertToParquet(st.t, context.Background(), bkt, data, h, nil) q, err := createQueryable(block) if err != nil { @@ -163,25 +163,37 @@ func TestQueryable(t *testing.T) { require.NoError(t, err) testCases := map[string]struct { - ops []storage.FileOption + storageOpts []storage.FileOption + convertOpts []convert.ConvertOption }{ "default": { - ops: []storage.FileOption{}, + storageOpts: []storage.FileOption{}, + convertOpts: defaultConvertOpts, }, "skipBloomFilters": { - ops: []storage.FileOption{ + storageOpts: []storage.FileOption{ storage.WithFileOptions( parquet.SkipBloomFilters(true), parquet.OptimisticRead(true), ), }, + convertOpts: defaultConvertOpts, + }, + "multipleSortingColumns": { + storageOpts: []storage.FileOption{}, + convertOpts: []convert.ConvertOption{ + convert.WithName("shard"), + convert.WithColDuration(time.Hour), + convert.WithRowGroupSize(500), + convert.WithPageBufferSize(300), + convert.WithSortBy(fmt.Sprintf("%s,%s", labels.MetricName, "label_name_1")), + }, }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - // Convert to Parquet - shard := convertToParquet(t, ctx, bkt, data, st.Head(), tc.ops...) + shard := convertToParquet(t, ctx, bkt, data, st.Head(), tc.convertOpts, tc.storageOpts...) t.Run("QueryByUniqueLabel", func(t *testing.T) { matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")} @@ -210,6 +222,25 @@ func TestQueryable(t *testing.T) { } }) + t.Run("QueryByMultipleLabels", func(t *testing.T) { + for i := 0; i < 50; i++ { + name := fmt.Sprintf("metric_%d", rand.Int()%cfg.TotalMetricNames) + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, name), + labels.MustNewMatcher(labels.MatchEqual, "label_name_1", "label_value_1"), + } + sFound := queryWithQueryable(t, data.MinTime, data.MaxTime, shard, nil, matchers...) 
+					totalFound := 0
+					for _, series := range sFound {
+						totalFound++
+						require.Equal(t, name, series.Labels().Get(labels.MetricName))
+						require.Equal(t, "label_value_1", series.Labels().Get("label_name_1"))
+						require.Contains(t, data.SeriesHash, series.Labels().Hash())
+					}
+					require.Equal(t, cfg.MetricsPerMetricName, totalFound)
+				}
+			})
+
 			t.Run("QueryByUniqueLabel and SkipChunks=true", func(t *testing.T) {
 				matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")}
 				hints := &prom_storage.SelectHints{
@@ -718,7 +749,7 @@ func BenchmarkSelect(b *testing.B) {
 	cbkt := newCountingBucket(bkt)
 
 	data := util.TestData{MinTime: h.MinTime(), MaxTime: h.MaxTime()}
-	block := convertToParquetForBenchWithCountingBucket(b, ctx, bkt, cbkt, data, h)
+	block := convertToParquetForBenchWithCountingBucket(b, ctx, bkt, cbkt, data, h, nil)
 
 	queryable, err := createQueryable(block)
 	require.NoError(b, err, "unable to create queryable")
@@ -754,18 +785,25 @@ func BenchmarkSelect(b *testing.B) {
 	}
 }
 
-func convertToParquet(t *testing.T, ctx context.Context, bkt *bucket, data util.TestData, h convert.Convertible, opts ...storage.FileOption) storage.ParquetShard {
-	colDuration := time.Hour
+var defaultConvertOpts = []convert.ConvertOption{
+	convert.WithName("shard"),
+	convert.WithColDuration(time.Hour),
+	convert.WithRowGroupSize(500),
+	convert.WithPageBufferSize(300),
+}
+
+func convertToParquet(t *testing.T, ctx context.Context, bkt *bucket, data util.TestData, h convert.Convertible, convertOpts []convert.ConvertOption, opts ...storage.FileOption) storage.ParquetShard {
+	if convertOpts == nil {
+		convertOpts = defaultConvertOpts
+	}
+
 	shards, err := convert.ConvertTSDBBlock(
 		ctx,
 		bkt,
 		data.MinTime,
 		data.MaxTime,
 		[]convert.Convertible{h},
-		convert.WithName("shard"),
-		convert.WithColDuration(colDuration),
-		convert.WithRowGroupSize(500),
-		convert.WithPageBufferSize(300),
+		convertOpts...,
 	)
 	if err != nil {
 		t.Fatalf("error converting to parquet: %v", err)
@@ -785,18 +823,18 @@ func convertToParquet(t *testing.T, ctx context.Context, bkt *bucket, data util.
return shard } -func convertToParquetForBenchWithCountingBucket(tb testing.TB, ctx context.Context, bkt *bucket, cbkt *countingBucket, data util.TestData, h convert.Convertible, opts ...storage.FileOption) storage.ParquetShard { - colDuration := time.Hour +func convertToParquetForBenchWithCountingBucket(tb testing.TB, ctx context.Context, bkt *bucket, cbkt *countingBucket, data util.TestData, h convert.Convertible, convertOpts []convert.ConvertOption, opts ...storage.FileOption) storage.ParquetShard { + if convertOpts == nil { + convertOpts = defaultConvertOpts + } + shards, err := convert.ConvertTSDBBlock( ctx, bkt, data.MinTime, data.MaxTime, []convert.Convertible{h}, - convert.WithName("shard"), - convert.WithColDuration(colDuration), - convert.WithRowGroupSize(500), - convert.WithPageBufferSize(300), + convertOpts..., ) if err != nil { tb.Fatalf("error converting to parquet: %v", err) diff --git a/search/constraint.go b/search/constraint.go index f77bcf4..dfd26e9 100644 --- a/search/constraint.go +++ b/search/constraint.go @@ -40,7 +40,7 @@ type Constraint interface { path() string } -func MatchersToConstraint(matchers ...*labels.Matcher) ([]Constraint, error) { +func MatchersToConstraints(matchers ...*labels.Matcher) ([]Constraint, error) { r := make([]Constraint, 0, len(matchers)) for _, matcher := range matchers { switch matcher.Type { @@ -76,23 +76,44 @@ func Initialize(f *storage.ParquetFile, cs ...Constraint) error { return nil } +// sortConstraintsBySortingColumns reorders constraints to prioritize those that match sorting columns. +// Constraints matching sorting columns are moved to the front, ordered by the sorting column priority. +// Other constraints maintain their original relative order. +func sortConstraintsBySortingColumns(cs []Constraint, sc []parquet.SortingColumn) { + if len(sc) == 0 { + return // No sorting columns, nothing to do + } + + sortingPaths := make(map[string]int, len(sc)) + for i, col := range sc { + sortingPaths[col.Path()[0]] = i + } + + // Sort constraints: sorting column constraints first (by their order in sc), then others + slices.SortStableFunc(cs, func(a, b Constraint) int { + aIdx, aIsSorting := sortingPaths[a.path()] + bIdx, bIsSorting := sortingPaths[b.path()] + + if aIsSorting && bIsSorting { + return aIdx - bIdx // Sort by sorting column order + } + if aIsSorting { + return -1 // a comes first + } + if bIsSorting { + return 1 // b comes first + } + return 0 // preserve original order for non-sorting constraints + }) +} + func Filter(ctx context.Context, s storage.ParquetShard, rgIdx int, cs ...Constraint) ([]RowRange, error) { rg := s.LabelsFile().RowGroups()[rgIdx] // Constraints for sorting columns are cheaper to evaluate, so we sort them first. 
sc := rg.SortingColumns() - var n int - for i := range sc { - if n == len(cs) { - break - } - for j := range cs { - if cs[j].path() == sc[i].Path()[0] { - cs[n], cs[j] = cs[j], cs[n] - n++ - } - } - } + sortConstraintsBySortingColumns(cs, sc) + var err error rr := []RowRange{{From: int64(0), Count: rg.NumRows()}} for i := range cs { diff --git a/search/constraint_test.go b/search/constraint_test.go index 6295731..e8b0643 100644 --- a/search/constraint_test.go +++ b/search/constraint_test.go @@ -507,3 +507,92 @@ func TestFilter(t *testing.T) { } }) } + +// Mock constraint for testing constraint ordering +type mockConstraint struct { + pathName string +} + +func (m *mockConstraint) String() string { return fmt.Sprintf("mock(%s)", m.pathName) } +func (m *mockConstraint) path() string { return m.pathName } +func (m *mockConstraint) init(f *storage.ParquetFile) error { return nil } +func (m *mockConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) { + return rr, nil +} + +type mockSortingColumn struct { + pathName string +} + +func (m *mockSortingColumn) Path() []string { return []string{m.pathName} } +func (m *mockSortingColumn) Descending() bool { return false } +func (m *mockSortingColumn) NullsFirst() bool { return false } + +func TestSortConstraintsBySortingColumns(t *testing.T) { + tests := []struct { + name string + sortingColumns []string + constraints []string + expectedOrder []string + }{ + { + name: "no sorting columns", + sortingColumns: []string{}, + constraints: []string{"a", "b", "c"}, + expectedOrder: []string{"a", "b", "c"}, // original order preserved + }, + { + name: "single sorting column with matching constraint", + sortingColumns: []string{"b"}, + constraints: []string{"a", "b", "c"}, + expectedOrder: []string{"b", "a", "c"}, // b moved to front + }, + { + name: "multiple sorting columns with matching constraints", + sortingColumns: []string{"c", "a"}, + constraints: []string{"a", "b", "c", "d"}, + expectedOrder: []string{"c", "a", "b", "d"}, // c first (sc[0]), then a (sc[1]) + }, + { + name: "multiple constraints per sorting column", + sortingColumns: []string{"x", "y"}, + constraints: []string{"a", "x", "b", "x", "y", "c"}, + expectedOrder: []string{"x", "x", "y", "a", "b", "c"}, // all x constraints first, then y, then others + }, + { + name: "sorting columns with no matching constraints", + sortingColumns: []string{"x", "y"}, + constraints: []string{"a", "b", "c"}, + expectedOrder: []string{"a", "b", "c"}, // original order preserved + }, + { + name: "mixed scenario", + sortingColumns: []string{"col1", "col2", "col3"}, + constraints: []string{"other1", "col2", "col1", "other2", "col1", "col3"}, + expectedOrder: []string{"col1", "col1", "col2", "col3", "other1", "other2"}, // sorting cols by priority, then others + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var sortingColumns []parquet.SortingColumn + for _, colName := range tt.sortingColumns { + sortingColumns = append(sortingColumns, &mockSortingColumn{pathName: colName}) + } + + var constraints []Constraint + for _, constraintPath := range tt.constraints { + constraints = append(constraints, &mockConstraint{pathName: constraintPath}) + } + + sortConstraintsBySortingColumns(constraints, sortingColumns) + + var actualOrder []string + for _, c := range constraints { + actualOrder = append(actualOrder, c.path()) + } + + require.Equal(t, tt.expectedOrder, actualOrder, "expected order %v, got %v", tt.expectedOrder, actualOrder) + }) + } +}
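
Reviewer note, not part of the patch: a minimal standalone sketch of the stable-reordering idea behind sortConstraintsBySortingColumns, with plain strings standing in for the package's Constraint and parquet.SortingColumn types. The names sortByPriority and the program below are hypothetical illustrations; the example data mirrors the "mixed scenario" case from TestSortConstraintsBySortingColumns.

// Hypothetical sketch (not shipped with this change): same reordering
// behavior as sortConstraintsBySortingColumns, on plain strings.
package main

import (
	"fmt"
	"slices"
)

// sortByPriority moves items whose key appears in priority to the front,
// ordered by their index in priority; all remaining items keep their
// original relative order because the sort is stable.
func sortByPriority(items, priority []string) {
	if len(priority) == 0 {
		return
	}
	rank := make(map[string]int, len(priority))
	for i, p := range priority {
		rank[p] = i
	}
	slices.SortStableFunc(items, func(a, b string) int {
		ai, aok := rank[a]
		bi, bok := rank[b]
		switch {
		case aok && bok:
			return ai - bi // both match sorting columns: order by priority
		case aok:
			return -1 // only a matches a sorting column: move it forward
		case bok:
			return 1 // only b matches a sorting column: move it forward
		default:
			return 0 // neither matches: stable sort keeps original order
		}
	})
}

func main() {
	cs := []string{"other1", "col2", "col1", "other2", "col1", "col3"}
	sortByPriority(cs, []string{"col1", "col2", "col3"})
	fmt.Println(cs) // [col1 col1 col2 col3 other1 other2]
}

Unlike the swap loop it replaces, the stable sort also guarantees that constraints not matching any sorting column keep their original relative order, which is exactly what the "mixed scenario" test case asserts.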