Skip to content

Commit d085e62

Browse files
jesusvazquez and francoposa
authored and committed
fix(search): constraint filter panics out of index (#92)
Updated the filter function to sort with slices.SortStableFunc instead of a custom loop that led to an out-of-index panic. Also updated unit tests for queryable and constraint filter. Also renamed MatchersToConstraint to MatchersToConstraints. Signed-off-by: Jesus Vazquez <[email protected]> Signed-off-by: francoposa <[email protected]>
1 parent b98fda4 commit d085e62

File tree

4 files changed

+183
-35
lines changed

4 files changed

+183
-35
lines changed

queryable/parquet_queryable.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage
359359

360360
for rgi := range rowGroupCount {
361361
errGroup.Go(func() error {
362-
cs, err := search.MatchersToConstraint(matchers...)
362+
cs, err := search.MatchersToConstraints(matchers...)
363363
if err != nil {
364364
return err
365365
}
@@ -421,7 +421,7 @@ func (b queryableShard) LabelNames(ctx context.Context, limit int64, matchers []
421421

422422
for rgi := range b.shard.LabelsFile().RowGroups() {
423423
errGroup.Go(func() error {
424-
cs, err := search.MatchersToConstraint(matchers...)
424+
cs, err := search.MatchersToConstraints(matchers...)
425425
if err != nil {
426426
return err
427427
}
@@ -461,7 +461,7 @@ func (b queryableShard) LabelValues(ctx context.Context, name string, limit int6
461461

462462
for rgi := range b.shard.LabelsFile().RowGroups() {
463463
errGroup.Go(func() error {
464-
cs, err := search.MatchersToConstraint(matchers...)
464+
cs, err := search.MatchersToConstraints(matchers...)
465465
if err != nil {
466466
return err
467467
}

queryable/parquet_queryable_test.go

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (st *acceptanceTestStorage) Querier(from, to int64) (prom_storage.Querier,
101101

102102
h := st.st.Head()
103103
data := util.TestData{MinTime: h.MinTime(), MaxTime: h.MaxTime()}
104-
block := convertToParquet(st.t, context.Background(), bkt, data, h)
104+
block := convertToParquet(st.t, context.Background(), bkt, data, h, nil)
105105

106106
q, err := createQueryable(block)
107107
if err != nil {
@@ -163,25 +163,37 @@ func TestQueryable(t *testing.T) {
163163
require.NoError(t, err)
164164

165165
testCases := map[string]struct {
166-
ops []storage.FileOption
166+
storageOpts []storage.FileOption
167+
convertOpts []convert.ConvertOption
167168
}{
168169
"default": {
169-
ops: []storage.FileOption{},
170+
storageOpts: []storage.FileOption{},
171+
convertOpts: defaultConvertOpts,
170172
},
171173
"skipBloomFilters": {
172-
ops: []storage.FileOption{
174+
storageOpts: []storage.FileOption{
173175
storage.WithFileOptions(
174176
parquet.SkipBloomFilters(true),
175177
parquet.OptimisticRead(true),
176178
),
177179
},
180+
convertOpts: defaultConvertOpts,
181+
},
182+
"multipleSortingColumns": {
183+
storageOpts: []storage.FileOption{},
184+
convertOpts: []convert.ConvertOption{
185+
convert.WithName("shard"),
186+
convert.WithColDuration(time.Hour),
187+
convert.WithRowGroupSize(500),
188+
convert.WithPageBufferSize(300),
189+
convert.WithSortBy(fmt.Sprintf("%s,%s", labels.MetricName, "label_name_1")),
190+
},
178191
},
179192
}
180193

181194
for n, tc := range testCases {
182195
t.Run(n, func(t *testing.T) {
183-
// Convert to Parquet
184-
shard := convertToParquet(t, ctx, bkt, data, st.Head(), tc.ops...)
196+
shard := convertToParquet(t, ctx, bkt, data, st.Head(), tc.convertOpts, tc.storageOpts...)
185197

186198
t.Run("QueryByUniqueLabel", func(t *testing.T) {
187199
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")}
@@ -210,6 +222,25 @@ func TestQueryable(t *testing.T) {
210222
}
211223
})
212224

225+
t.Run("QueryByMultipleLabels", func(t *testing.T) {
226+
for i := 0; i < 50; i++ {
227+
name := fmt.Sprintf("metric_%d", rand.Int()%cfg.TotalMetricNames)
228+
matchers := []*labels.Matcher{
229+
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, name),
230+
labels.MustNewMatcher(labels.MatchEqual, "label_name_1", "label_value_1"),
231+
}
232+
sFound := queryWithQueryable(t, data.MinTime, data.MaxTime, shard, nil, matchers...)
233+
totalFound := 0
234+
for _, series := range sFound {
235+
totalFound++
236+
require.Equal(t, series.Labels().Get(labels.MetricName), name)
237+
require.Equal(t, series.Labels().Get("label_name_1"), "label_value_1")
238+
require.Contains(t, data.SeriesHash, series.Labels().Hash())
239+
}
240+
require.Equal(t, cfg.MetricsPerMetricName, totalFound)
241+
}
242+
})
243+
213244
t.Run("QueryByUniqueLabel and SkipChunks=true", func(t *testing.T) {
214245
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")}
215246
hints := &prom_storage.SelectHints{
@@ -718,7 +749,7 @@ func BenchmarkSelect(b *testing.B) {
718749

719750
cbkt := newCountingBucket(bkt)
720751
data := util.TestData{MinTime: h.MinTime(), MaxTime: h.MaxTime()}
721-
block := convertToParquetForBenchWithCountingBucket(b, ctx, bkt, cbkt, data, h)
752+
block := convertToParquetForBenchWithCountingBucket(b, ctx, bkt, cbkt, data, h, nil)
722753
queryable, err := createQueryable(block)
723754
require.NoError(b, err, "unable to create queryable")
724755

@@ -754,18 +785,25 @@ func BenchmarkSelect(b *testing.B) {
754785
}
755786
}
756787

757-
func convertToParquet(t *testing.T, ctx context.Context, bkt *bucket, data util.TestData, h convert.Convertible, opts ...storage.FileOption) storage.ParquetShard {
758-
colDuration := time.Hour
788+
var defaultConvertOpts = []convert.ConvertOption{
789+
convert.WithName("shard"),
790+
convert.WithColDuration(time.Hour),
791+
convert.WithRowGroupSize(500),
792+
convert.WithPageBufferSize(300),
793+
}
794+
795+
func convertToParquet(t *testing.T, ctx context.Context, bkt *bucket, data util.TestData, h convert.Convertible, convertOpts []convert.ConvertOption, opts ...storage.FileOption) storage.ParquetShard {
796+
if convertOpts == nil {
797+
convertOpts = defaultConvertOpts
798+
}
799+
759800
shards, err := convert.ConvertTSDBBlock(
760801
ctx,
761802
bkt,
762803
data.MinTime,
763804
data.MaxTime,
764805
[]convert.Convertible{h},
765-
convert.WithName("shard"),
766-
convert.WithColDuration(colDuration),
767-
convert.WithRowGroupSize(500),
768-
convert.WithPageBufferSize(300),
806+
convertOpts...,
769807
)
770808
if err != nil {
771809
t.Fatalf("error converting to parquet: %v", err)
@@ -785,18 +823,18 @@ func convertToParquet(t *testing.T, ctx context.Context, bkt *bucket, data util.
785823
return shard
786824
}
787825

788-
func convertToParquetForBenchWithCountingBucket(tb testing.TB, ctx context.Context, bkt *bucket, cbkt *countingBucket, data util.TestData, h convert.Convertible, opts ...storage.FileOption) storage.ParquetShard {
789-
colDuration := time.Hour
826+
func convertToParquetForBenchWithCountingBucket(tb testing.TB, ctx context.Context, bkt *bucket, cbkt *countingBucket, data util.TestData, h convert.Convertible, convertOpts []convert.ConvertOption, opts ...storage.FileOption) storage.ParquetShard {
827+
if convertOpts == nil {
828+
convertOpts = defaultConvertOpts
829+
}
830+
790831
shards, err := convert.ConvertTSDBBlock(
791832
ctx,
792833
bkt,
793834
data.MinTime,
794835
data.MaxTime,
795836
[]convert.Convertible{h},
796-
convert.WithName("shard"),
797-
convert.WithColDuration(colDuration),
798-
convert.WithRowGroupSize(500),
799-
convert.WithPageBufferSize(300),
837+
convertOpts...,
800838
)
801839
if err != nil {
802840
tb.Fatalf("error converting to parquet: %v", err)

search/constraint.go

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type Constraint interface {
4040
path() string
4141
}
4242

43-
func MatchersToConstraint(matchers ...*labels.Matcher) ([]Constraint, error) {
43+
func MatchersToConstraints(matchers ...*labels.Matcher) ([]Constraint, error) {
4444
r := make([]Constraint, 0, len(matchers))
4545
for _, matcher := range matchers {
4646
switch matcher.Type {
@@ -76,23 +76,44 @@ func Initialize(f storage.ParquetFileView, cs ...Constraint) error {
7676
return nil
7777
}
7878

79+
// sortConstraintsBySortingColumns reorders constraints to prioritize those that match sorting columns.
80+
// Constraints matching sorting columns are moved to the front, ordered by the sorting column priority.
81+
// Other constraints maintain their original relative order.
82+
func sortConstraintsBySortingColumns(cs []Constraint, sc []parquet.SortingColumn) {
83+
if len(sc) == 0 {
84+
return // No sorting columns, nothing to do
85+
}
86+
87+
sortingPaths := make(map[string]int, len(sc))
88+
for i, col := range sc {
89+
sortingPaths[col.Path()[0]] = i
90+
}
91+
92+
// Sort constraints: sorting column constraints first (by their order in sc), then others
93+
slices.SortStableFunc(cs, func(a, b Constraint) int {
94+
aIdx, aIsSorting := sortingPaths[a.path()]
95+
bIdx, bIsSorting := sortingPaths[b.path()]
96+
97+
if aIsSorting && bIsSorting {
98+
return aIdx - bIdx // Sort by sorting column order
99+
}
100+
if aIsSorting {
101+
return -1 // a comes first
102+
}
103+
if bIsSorting {
104+
return 1 // b comes first
105+
}
106+
return 0 // preserve original order for non-sorting constraints
107+
})
108+
}
109+
79110
func Filter(ctx context.Context, s storage.ParquetShard, rgIdx int, cs ...Constraint) ([]RowRange, error) {
80111
rg := s.LabelsFile().RowGroups()[rgIdx]
81112
// Constraints for sorting columns are cheaper to evaluate, so we sort them first.
82113
sc := rg.SortingColumns()
83114

84-
var n int
85-
for i := range sc {
86-
if n == len(cs) {
87-
break
88-
}
89-
for j := range cs {
90-
if cs[j].path() == sc[i].Path()[0] {
91-
cs[n], cs[j] = cs[j], cs[n]
92-
n++
93-
}
94-
}
95-
}
115+
sortConstraintsBySortingColumns(cs, sc)
116+
96117
var err error
97118
rr := []RowRange{{From: int64(0), Count: rg.NumRows()}}
98119
for i := range cs {

search/constraint_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,3 +507,92 @@ func TestFilter(t *testing.T) {
507507
}
508508
})
509509
}
510+
511+
// Mock constraint for testing constraint ordering
512+
type mockConstraint struct {
513+
pathName string
514+
}
515+
516+
func (m *mockConstraint) String() string { return fmt.Sprintf("mock(%s)", m.pathName) }
517+
func (m *mockConstraint) path() string { return m.pathName }
518+
func (m *mockConstraint) init(f *storage.ParquetFile) error { return nil }
519+
func (m *mockConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) {
520+
return rr, nil
521+
}
522+
523+
type mockSortingColumn struct {
524+
pathName string
525+
}
526+
527+
func (m *mockSortingColumn) Path() []string { return []string{m.pathName} }
528+
func (m *mockSortingColumn) Descending() bool { return false }
529+
func (m *mockSortingColumn) NullsFirst() bool { return false }
530+
531+
func TestSortConstraintsBySortingColumns(t *testing.T) {
532+
tests := []struct {
533+
name string
534+
sortingColumns []string
535+
constraints []string
536+
expectedOrder []string
537+
}{
538+
{
539+
name: "no sorting columns",
540+
sortingColumns: []string{},
541+
constraints: []string{"a", "b", "c"},
542+
expectedOrder: []string{"a", "b", "c"}, // original order preserved
543+
},
544+
{
545+
name: "single sorting column with matching constraint",
546+
sortingColumns: []string{"b"},
547+
constraints: []string{"a", "b", "c"},
548+
expectedOrder: []string{"b", "a", "c"}, // b moved to front
549+
},
550+
{
551+
name: "multiple sorting columns with matching constraints",
552+
sortingColumns: []string{"c", "a"},
553+
constraints: []string{"a", "b", "c", "d"},
554+
expectedOrder: []string{"c", "a", "b", "d"}, // c first (sc[0]), then a (sc[1])
555+
},
556+
{
557+
name: "multiple constraints per sorting column",
558+
sortingColumns: []string{"x", "y"},
559+
constraints: []string{"a", "x", "b", "x", "y", "c"},
560+
expectedOrder: []string{"x", "x", "y", "a", "b", "c"}, // all x constraints first, then y, then others
561+
},
562+
{
563+
name: "sorting columns with no matching constraints",
564+
sortingColumns: []string{"x", "y"},
565+
constraints: []string{"a", "b", "c"},
566+
expectedOrder: []string{"a", "b", "c"}, // original order preserved
567+
},
568+
{
569+
name: "mixed scenario",
570+
sortingColumns: []string{"col1", "col2", "col3"},
571+
constraints: []string{"other1", "col2", "col1", "other2", "col1", "col3"},
572+
expectedOrder: []string{"col1", "col1", "col2", "col3", "other1", "other2"}, // sorting cols by priority, then others
573+
},
574+
}
575+
576+
for _, tt := range tests {
577+
t.Run(tt.name, func(t *testing.T) {
578+
var sortingColumns []parquet.SortingColumn
579+
for _, colName := range tt.sortingColumns {
580+
sortingColumns = append(sortingColumns, &mockSortingColumn{pathName: colName})
581+
}
582+
583+
var constraints []Constraint
584+
for _, constraintPath := range tt.constraints {
585+
constraints = append(constraints, &mockConstraint{pathName: constraintPath})
586+
}
587+
588+
sortConstraintsBySortingColumns(constraints, sortingColumns)
589+
590+
var actualOrder []string
591+
for _, c := range constraints {
592+
actualOrder = append(actualOrder, c.path())
593+
}
594+
595+
require.Equal(t, tt.expectedOrder, actualOrder, "expected order %v, got %v", tt.expectedOrder, actualOrder)
596+
})
597+
}
598+
}

0 commit comments

Comments
 (0)