Skip to content

Commit 8cbcb29

Browse files
authored
feat(promql): sampling based on the intersection time of the query and shard (openGemini#598)
Signed-off-by: Jack Liu <[email protected]>
1 parent 9adcfb2 commit 8cbcb29

7 files changed

+108
-25
lines changed

engine/engine_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1625,7 +1625,7 @@ func TestRangeVectorCursorFunc(t *testing.T) {
16251625
opt := &query.ProcessorOptions{Step: 0}
16261626
schema := executor.NewQuerySchema(nil, nil, opt, nil)
16271627
pool := record.NewRecordPool(record.AggPool)
1628-
c := NewRangeVectorCursor(ic, schema, pool)
1628+
c := NewRangeVectorCursor(ic, schema, pool, util.TimeRange{})
16291629
rec := record.NewRecord([]record.Field{{Name: "time", Type: influx.Field_Type_Int}}, false)
16301630
c.getIntervalIndex(rec)
16311631
assert2.Equal(t, c.Name(), "range_vector_cursor")

engine/iterators.go

+32-12
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,17 @@ func (s *shard) CreateCursor(ctx context.Context, schema *executor.QuerySchema)
182182
if (startTime >= shardStartTime && startTime <= shardEndTime) || (endTime >= shardStartTime && endTime <= shardEndTime) {
183183
hasTimeFilter = true
184184
}
185-
185+
iTr := util.TimeRange{}
186+
if schema.Options().IsPromQuery() {
187+
iTr = GetIntersectTimeRange(startTime, endTime, shardStartTime, shardEndTime)
188+
}
186189
immutableReader, mutableReader := s.cloneReaders(schema.Options().OptionsName(), hasTimeFilter, tr)
187190
if cloneMsSpan != nil {
188191
cloneMsSpan.SetNameValue(fmt.Sprintf("order=%d,unorder=%d", len(immutableReader.Orders), len(immutableReader.OutOfOrders)))
189192
cloneMsSpan.Finish()
190193
}
191194

192-
groupCursors, err := s.createGroupCursors(span, schema, lazyInit, result, immutableReader, mutableReader)
195+
groupCursors, err := s.createGroupCursors(span, schema, lazyInit, result, immutableReader, mutableReader, iTr)
193196

194197
// unref file(no need lock here), series iterator will ref/unref file itself
195198
unRefReaders(immutableReader, mutableReader)
@@ -225,7 +228,7 @@ func (s *shard) GetTSSPFiles(mm string, isOrder bool) (*immutable.TSSPFiles, boo
225228
}
226229

227230
func (s *shard) initGroupCursors(querySchema *executor.QuerySchema, parallelism int,
228-
readers *immutable.MmsReaders, memTables *mutable.MemTables) (comm.KeyCursors, error) {
231+
readers *immutable.MmsReaders, memTables *mutable.MemTables, iTr util.TimeRange) (comm.KeyCursors, error) {
229232
var schema record.Schemas
230233
var filterFieldsIdx []int
231234
var filterTags []string
@@ -255,6 +258,7 @@ func (s *shard) initGroupCursors(querySchema *executor.QuerySchema, parallelism
255258
seriesPool: SeriesPool,
256259
tmsMergePool: TsmMergePool,
257260
querySchema: querySchema,
261+
interTr: util.TimeRange{Min: iTr.Min, Max: iTr.Max},
258262
},
259263
querySchema: querySchema,
260264
}
@@ -350,7 +354,7 @@ func getQueryTimeRange(readers *immutable.MmsReaders, querySchema *executor.Quer
350354
}
351355

352356
func (s *shard) createGroupCursors(span *tracing.Span, schema *executor.QuerySchema, lazyInit bool, tagSets []*tsi.TagSetInfo,
353-
readers *immutable.MmsReaders, memTables *mutable.MemTables) ([]comm.KeyCursor, error) {
357+
readers *immutable.MmsReaders, memTables *mutable.MemTables, iTr util.TimeRange) ([]comm.KeyCursor, error) {
354358

355359
parallelism, totalSid := getParallelismNumAndSidNum(schema, tagSets)
356360

@@ -363,7 +367,7 @@ func (s *shard) createGroupCursors(span *tracing.Span, schema *executor.QuerySch
363367
defer groupSpan.Finish()
364368
}
365369

366-
cursors, err := s.initGroupCursors(schema, parallelism, readers, memTables)
370+
cursors, err := s.initGroupCursors(schema, parallelism, readers, memTables, iTr)
367371
if err != nil {
368372
return nil, err
369373
}
@@ -383,7 +387,6 @@ func (s *shard) createGroupCursors(span *tracing.Span, schema *executor.QuerySch
383387
cursors[i].(*groupCursor).span = subGroupSpan
384388
}
385389
}
386-
387390
enableFileCursor := executor.IsEnableFileCursor(schema)
388391
var startGroupIdx int
389392
errs := make([]error, parallelism)
@@ -537,6 +540,7 @@ type idKeyCursorContext struct {
537540
engineType config.EngineType
538541
tr util.TimeRange
539542
queryTr util.TimeRange
543+
interTr util.TimeRange // intersection of the query time and shard time
540544
auxTags []string
541545
schema record.Schemas
542546
readers *immutable.MmsReaders
@@ -772,10 +776,10 @@ func (s *shard) iteratorInit(ctx *idKeyCursorContext, span *tracing.Span, schema
772776
return NewAggregateCursor(itr, schema, ctx.aggPool, ctx.hasAuxTags()), nil
773777
}
774778
if schema.Options().IsInstantVectorSelector() {
775-
return NewInstantVectorCursor(itr, schema, ctx.aggPool), nil
779+
return NewInstantVectorCursor(itr, schema, ctx.aggPool, ctx.interTr), nil
776780
}
777781
if schema.Options().IsRangeVectorSelector() && len(schema.Calls()) > 0 {
778-
return NewRangeVectorCursor(itr, schema, ctx.aggPool), nil
782+
return NewRangeVectorCursor(itr, schema, ctx.aggPool, ctx.interTr), nil
779783
}
780784
}
781785
return itr, nil
@@ -809,9 +813,9 @@ func itrsInit(ctx *idKeyCursorContext, span *tracing.Span, schema *executor.Quer
809813
if !canNotAggOnSeries && (len(schema.Calls()) > 0 && !havePreAgg) && !schema.Options().IsPromQuery() {
810814
itrAgg = NewAggregateCursor(itr, schema, ctx.aggPool, ctx.hasAuxTags())
811815
} else if schema.Options().IsInstantVectorSelector() {
812-
itrAgg = NewInstantVectorCursor(itr, schema, ctx.aggPool)
816+
itrAgg = NewInstantVectorCursor(itr, schema, ctx.aggPool, ctx.interTr)
813817
} else if schema.Options().IsRangeVectorSelector() && len(schema.Calls()) > 0 {
814-
itrAgg = NewRangeVectorCursor(itr, schema, ctx.aggPool)
818+
itrAgg = NewRangeVectorCursor(itr, schema, ctx.aggPool, ctx.interTr)
815819
}
816820
var itrLimit *limitCursor
817821

@@ -868,9 +872,9 @@ func itrsInitWithLimit(ctx *idKeyCursorContext, span *tracing.Span, schema *exec
868872
if !schema.Options().IsPromQuery() {
869873
itrAgg = NewAggregateCursor(itr, schema, ctx.aggPool, ctx.hasAuxTags())
870874
} else if schema.Options().IsInstantVectorSelector() {
871-
itrAgg = NewInstantVectorCursor(itr, schema, ctx.aggPool)
875+
itrAgg = NewInstantVectorCursor(itr, schema, ctx.aggPool, ctx.interTr)
872876
} else if schema.Options().IsRangeVectorSelector() && len(schema.Calls()) > 0 {
873-
itrAgg = NewRangeVectorCursor(itr, schema, ctx.aggPool)
877+
itrAgg = NewRangeVectorCursor(itr, schema, ctx.aggPool, ctx.interTr)
874878
}
875879
}
876880
var itrLimit *limitCursor
@@ -1026,3 +1030,19 @@ func getParallelismNumAndSidNum(schema *executor.QuerySchema, tagSets []*tsi.Tag
10261030

10271031
return parallelism, totalSid
10281032
}
1033+
1034+
// GetIntersectTimeRange used to get intersection of the query time and shard time
1035+
func GetIntersectTimeRange(queryStartTime, queryEndTime, shardStartTime, shardEndTime int64) util.TimeRange {
1036+
tr := util.TimeRange{}
1037+
if queryStartTime <= shardStartTime {
1038+
tr.Min = shardStartTime
1039+
} else {
1040+
tr.Min = queryStartTime
1041+
}
1042+
if queryEndTime <= shardEndTime {
1043+
tr.Max = queryEndTime
1044+
} else {
1045+
tr.Max = shardEndTime
1046+
}
1047+
return tr
1048+
}

engine/iterators_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/openGemini/openGemini/engine/hybridqp"
3131
"github.com/openGemini/openGemini/lib/record"
3232
"github.com/openGemini/openGemini/lib/tracing"
33+
"github.com/openGemini/openGemini/lib/util"
3334
"github.com/openGemini/openGemini/lib/util/lifted/influx/influxql"
3435
"github.com/openGemini/openGemini/lib/util/lifted/influx/query"
3536
"github.com/openGemini/openGemini/lib/util/lifted/vm/protoparser/influx"
@@ -3141,3 +3142,43 @@ func TestGetCtx(t *testing.T) {
31413142
t.Fatal("get wrong ctx")
31423143
}
31433144
}
3145+
3146+
func TestGetIntersectTimeRange(t *testing.T) {
3147+
type args struct {
3148+
queryStartTime int64
3149+
queryEndTime int64
3150+
shardStartTime int64
3151+
shardEndTime int64
3152+
}
3153+
tests := []struct {
3154+
name string
3155+
args args
3156+
want util.TimeRange
3157+
}{
3158+
{
3159+
name: "full contain",
3160+
args: args{queryStartTime: 1, queryEndTime: 5, shardStartTime: 2, shardEndTime: 3},
3161+
want: util.TimeRange{Min: 2, Max: 3},
3162+
},
3163+
{
3164+
name: "left contain",
3165+
args: args{queryStartTime: 1, queryEndTime: 5, shardStartTime: 2, shardEndTime: 6},
3166+
want: util.TimeRange{Min: 2, Max: 5},
3167+
},
3168+
{
3169+
name: "right contain",
3170+
args: args{queryStartTime: 2, queryEndTime: 5, shardStartTime: 1, shardEndTime: 3},
3171+
want: util.TimeRange{Min: 2, Max: 3},
3172+
},
3173+
}
3174+
for _, tt := range tests {
3175+
t.Run(tt.name, func(t *testing.T) {
3176+
assert.Equal(
3177+
t,
3178+
tt.want,
3179+
engine.GetIntersectTimeRange(tt.args.queryStartTime, tt.args.queryEndTime, tt.args.shardStartTime, tt.args.shardEndTime),
3180+
"GetIntersectTimeRange(%v, %v, %v, %v)", tt.args.queryStartTime, tt.args.queryEndTime, tt.args.shardStartTime, tt.args.shardEndTime,
3181+
)
3182+
})
3183+
}
3184+
}

engine/prom_instant_vector_cursor.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/openGemini/openGemini/engine/executor"
2424
"github.com/openGemini/openGemini/engine/hybridqp"
2525
"github.com/openGemini/openGemini/lib/record"
26+
"github.com/openGemini/openGemini/lib/util"
2627
"github.com/openGemini/openGemini/lib/util/lifted/influx/influxql"
2728
"github.com/openGemini/openGemini/lib/util/lifted/vm/protoparser/influx"
2829
)
@@ -48,7 +49,7 @@ type InstantVectorCursor struct {
4849
firstStep int64 // first step of each record for prom sampling
4950
}
5051

51-
func NewInstantVectorCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool) *InstantVectorCursor {
52+
func NewInstantVectorCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool, tr util.TimeRange) *InstantVectorCursor {
5253
c := &InstantVectorCursor{}
5354
c.aggregateCursor = *NewAggregateCursor(input, schema, globalPool, false)
5455
c.aggregateCursor.r = c
@@ -61,11 +62,13 @@ func NewInstantVectorCursor(input comm.KeyCursor, schema *executor.QuerySchema,
6162
c.startSample = c.start + c.lookUpDelta
6263
if c.step == 0 {
6364
c.endSample = c.startSample
65+
c.firstStep = c.startSample
66+
c.reducerParams.lastStep = c.endSample
6467
} else {
6568
c.endSample = c.start + c.lookUpDelta + (c.end-(c.start+c.lookUpDelta))/c.step*c.step
69+
c.firstStep = getCurrStep(c.startSample, c.endSample, c.step, tr.Min)
70+
c.reducerParams.lastStep = getPrevStep(c.startSample, c.endSample, c.step, tr.Max)
6671
}
67-
c.firstStep = c.startSample
68-
c.reducerParams.lastStep = c.endSample
6972
return c
7073
}
7174

@@ -202,6 +205,17 @@ func isSameWindow(
202205
return prevStep == nextStep
203206
}
204207

208+
func getPrevStep(startSample, endSample, step, t int64) int64 {
209+
if t <= startSample {
210+
return startSample
211+
}
212+
if t == endSample {
213+
return t
214+
}
215+
n := (t - startSample) / step
216+
return hybridqp.MinInt64(startSample+n*step, endSample)
217+
}
218+
205219
func getCurrStep(startSample, endSample, step, t int64) int64 {
206220
if t <= startSample {
207221
return startSample

engine/prom_instant_vector_cursor_test.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/openGemini/openGemini/engine/executor"
2626
"github.com/openGemini/openGemini/engine/hybridqp"
2727
"github.com/openGemini/openGemini/lib/record"
28+
"github.com/openGemini/openGemini/lib/util"
2829
"github.com/openGemini/openGemini/lib/util/lifted/influx/influxql"
2930
"github.com/openGemini/openGemini/lib/util/lifted/influx/query"
3031
"github.com/openGemini/openGemini/lib/util/lifted/vm/protoparser/influx"
@@ -45,7 +46,7 @@ func TestInstantVectorCursorSinkPlan(t *testing.T) {
4546
series := executor.NewLogicalSeries(schema)
4647
srcCursor := newReaderKeyCursor(nil)
4748
srcCursor.schema = []record.Field{{Name: "cpu", Type: influx.Field_Type_Float}, {Name: "time", Type: influx.Field_Type_Int}}
48-
promCursor := engine.NewInstantVectorCursor(srcCursor, schema, AggPool)
49+
promCursor := engine.NewInstantVectorCursor(srcCursor, schema, AggPool, util.TimeRange{})
4950
promCursor.SinkPlan(series)
5051
assert.True(t, len(promCursor.GetSchema()) == 2)
5152
schema.Visit(&influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "cpu", Type: influxql.Integer}}})
@@ -65,7 +66,8 @@ func testInstantVectorCursor(
6566
) {
6667
outRecords := make([]*record.Record, 0, len(dstRecords))
6768
srcCursor := newReaderKeyCursor(srcRecords)
68-
promCursor := engine.NewInstantVectorCursor(srcCursor, querySchema, AggPool)
69+
tr := util.TimeRange{Min: querySchema.Options().GetStartTime(), Max: querySchema.Options().GetEndTime()}
70+
promCursor := engine.NewInstantVectorCursor(srcCursor, querySchema, AggPool, tr)
6971
promCursor.SetSchema(inSchema, outSchema, exprOpt)
7072

7173
for {
@@ -282,13 +284,13 @@ func TestInstantVectorCursor(t *testing.T) {
282284

283285
func benchmarkInstantVectorCursor(b *testing.B, recordCount, recordSize, tagPerRecord, intervalPerRecord int,
284286
exprOpt []hybridqp.ExprOptions, schema *executor.QuerySchema, inSchema, outSchema record.Schemas) {
285-
287+
tr := util.TimeRange{Min: schema.Options().GetStartTime(), Max: schema.Options().GetEndTime()}
286288
b.ReportAllocs()
287289
b.ResetTimer()
288290
for i := 0; i < b.N; i++ {
289291
srcRecords := buildBenchSrcRecords(inSchema, recordCount, recordSize, tagPerRecord, intervalPerRecord)
290292
srcCursor := newReaderKeyCursor(srcRecords)
291-
sampleCursor := engine.NewInstantVectorCursor(srcCursor, schema, AggPool)
293+
sampleCursor := engine.NewInstantVectorCursor(srcCursor, schema, AggPool, tr)
292294
sampleCursor.SetSchema(inSchema, outSchema, exprOpt)
293295
b.StartTimer()
294296
for {

engine/prom_range_vector_cursor.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/openGemini/openGemini/engine/executor"
2222
"github.com/openGemini/openGemini/engine/hybridqp"
2323
"github.com/openGemini/openGemini/lib/record"
24+
"github.com/openGemini/openGemini/lib/util"
2425
)
2526

2627
// RangeVectorCursor is used to process the calculation of the function with range duration for
@@ -46,7 +47,7 @@ type RangeVectorCursor struct {
4647
firstStep int64 // first step of each record for prom sampling
4748
}
4849

49-
func NewRangeVectorCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool) *RangeVectorCursor {
50+
func NewRangeVectorCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool, tr util.TimeRange) *RangeVectorCursor {
5051
c := &RangeVectorCursor{}
5152
c.aggregateCursor = *NewAggregateCursor(input, schema, globalPool, false)
5253
c.aggregateCursor.r = c
@@ -60,11 +61,13 @@ func NewRangeVectorCursor(input comm.KeyCursor, schema *executor.QuerySchema, gl
6061
c.startSample = c.start + c.rangeDuration
6162
if c.step == 0 {
6263
c.endSample = c.startSample
64+
c.firstStep = c.startSample
65+
c.reducerParams.lastStep = c.endSample
6366
} else {
6467
c.endSample = c.start + c.rangeDuration + (c.end-(c.start+c.rangeDuration))/c.step*c.step
68+
c.firstStep = getCurrStep(c.startSample, c.endSample, c.step, tr.Min)
69+
c.reducerParams.lastStep = getPrevStep(c.startSample, c.endSample, c.step, tr.Max)
6570
}
66-
c.firstStep = c.startSample
67-
c.reducerParams.lastStep = c.endSample
6871
return c
6972
}
7073

engine/prom_range_vector_cursor_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/openGemini/openGemini/engine/executor"
2626
"github.com/openGemini/openGemini/engine/hybridqp"
2727
"github.com/openGemini/openGemini/lib/record"
28+
"github.com/openGemini/openGemini/lib/util"
2829
"github.com/openGemini/openGemini/lib/util/lifted/influx/influxql"
2930
"github.com/openGemini/openGemini/lib/util/lifted/influx/query"
3031
"github.com/openGemini/openGemini/lib/util/lifted/vm/protoparser/influx"
@@ -126,7 +127,8 @@ func TestRangeVectorCursorSinkPlan(t *testing.T) {
126127
srcCursor := newReaderKeyCursor(nil)
127128
srcCursor.schema = []record.Field{{Name: "cpu", Type: influx.Field_Type_Float}, {Name: "time", Type: influx.Field_Type_Int}}
128129
schema.Visit(&influxql.Call{Name: "rate_prom", Args: []influxql.Expr{&influxql.VarRef{Val: "cpu", Type: influxql.Float}}})
129-
promCursor := engine.NewRangeVectorCursor(srcCursor, schema, AggPool)
130+
tr := util.TimeRange{Min: opt.StartTime, Max: opt.EndTime}
131+
promCursor := engine.NewRangeVectorCursor(srcCursor, schema, AggPool, tr)
130132
assert.Equal(t, promCursor.Name(), "range_vector_cursor")
131133
agg := executor.NewLogicalAggregate(series, schema)
132134
promCursor.SinkPlan(agg)
@@ -144,7 +146,8 @@ func testRangeVectorCursor(
144146
) {
145147
outRecords := make([]*record.Record, 0, len(dstRecords))
146148
srcCursor := newReaderKeyCursor(srcRecords)
147-
promCursor := engine.NewRangeVectorCursor(srcCursor, querySchema, AggPool)
149+
tr := util.TimeRange{Min: querySchema.Options().GetStartTime(), Max: querySchema.Options().GetEndTime()}
150+
promCursor := engine.NewRangeVectorCursor(srcCursor, querySchema, AggPool, tr)
148151
promCursor.SetSchema(inSchema, outSchema, exprOpt)
149152

150153
for {

0 commit comments

Comments
 (0)