Skip to content

Commit d3a2c83

Browse files
authored
feat(log): add time clipping push down (openGemini#584)
Signed-off-by: noFloat <[email protected]>
1 parent 7901971 commit d3a2c83

9 files changed

+87
-9
lines changed

engine/comm/cursor.go

+5
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ type KeyCursor interface {
4242
NextAggData() (*record.Record, *FileInfo, error)
4343
}
4444

45+
type TimeCutKeyCursor interface {
46+
KeyCursor
47+
UpdateTime(time int64)
48+
}
49+
4550
type FileInfo struct {
4651
MinTime int64
4752
MaxTime int64

engine/hybrid_store_reader.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ func (r *HybridStoreReader) initDetachedFileReader(frags executor.IndexFrags) (c
247247
if err != nil {
248248
return nil, err
249249
}
250-
if r.schema.Options().CanTimeLimitPushDown() {
250+
if r.schema.Options().CanLimitPushDown() {
251251
sortLimitCursor := immutable.NewSortLimitCursor(r.schema.Options(), r.readerCtx.GetSchemas(), fileReader)
252252
return sortLimitCursor, nil
253253
}

engine/hybridqp/compiler.go

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type Options interface {
6060
GetIterId() int32
6161
IsIncQuery() bool
6262
CanTimeLimitPushDown() bool
63+
CanLimitPushDown() bool
6364
SetPromQuery(bool)
6465
IsPromQuery() bool
6566
IsPromInstantQuery() bool

engine/immutable/detached_metadata.go

-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ type SegmentTask struct {
6161
func (reader *DetachedMetaDataReader) InitReadBatch(s []*SegmentMeta, schema record.Schemas) {
6262
reader.task = make(map[int]*SegmentTask, len(s))
6363
reader.positionMap = make(map[int64]*position, len(s))
64-
6564
offset := make([]int64, 0, len(s)*len(schema))
6665
length := make([]int64, 0, len(s)*len(schema))
6766
startSegmentIndex := -1

engine/immutable/sort_limit_cursor.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ type SortLimitCursor struct {
2929
isRead bool
3030
options hybridqp.Options
3131
schemas record.Schemas
32-
input comm.KeyCursor
32+
input comm.TimeCutKeyCursor
3333
sortHeap *SortLimitRows
3434
}
3535

36-
func NewSortLimitCursor(options hybridqp.Options, schemas record.Schemas, input comm.KeyCursor) *SortLimitCursor {
36+
func NewSortLimitCursor(options hybridqp.Options, schemas record.Schemas, input comm.TimeCutKeyCursor) *SortLimitCursor {
3737
sortIndex := make([]int, len(options.GetSortFields()))
3838
for fk, field := range options.GetSortFields() {
3939
for sk, v := range schemas {
@@ -116,6 +116,10 @@ func (t *SortLimitCursor) Next() (*record.Record, comm.SeriesInfoIntf, error) {
116116
heap.Push(t.sortHeap, data)
117117
}
118118
}
119+
120+
if t.options.CanTimeLimitPushDown() && t.sortHeap.Len() >= t.options.GetLimit() {
121+
t.input.UpdateTime(t.sortHeap.rows[0][len(t.schemas)-1].(int64))
122+
}
119123
}
120124
t.isRead = true
121125
return t.sortHeap.PopToRec(), nil, nil

engine/immutable/sort_limit_cursor_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -253,3 +253,6 @@ func (t *MockCursor) SetOps(ops []*comm.CallOption) {
253253
func (t *MockCursor) NextAggData() (*record.Record, *comm.FileInfo, error) {
254254
return nil, nil, nil
255255
}
256+
257+
func (t *MockCursor) UpdateTime(time int64) {
258+
}

engine/immutable/tssp_file_detached_reader.go

+17-4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type TSSPFileDetachedReader struct {
5050
filterSeqTime int64
5151
filterSeqId int64
5252
filterSeqFunc func(int64, int64) bool
53+
tr util.TimeRange
5354

5455
metaDataQueue MetaControl
5556
metaIndex []*MetaIndex
@@ -74,9 +75,10 @@ func NewTSSPFileDetachedReader(metaIndex []*MetaIndex, blocks [][]int, ctx *File
7475
blocks: blocks,
7576
obsOpts: path.Option(),
7677
ctx: ctx,
77-
metaDataQueue: NewMetaControl(true, chunkMetaReadNum),
78+
metaDataQueue: NewMetaControl(ctx.readCtx.Ascending, chunkMetaReadNum),
7879
unnest: unnest,
7980
seqIndex: -1,
81+
tr: util.TimeRange{Min: ctx.tr.Min, Max: ctx.tr.Max},
8082
}
8183
if config.IsLogKeeper() && options.GetLogQueryCurrId() != "" && options.GetLimit() > 0 {
8284
err := r.parseSeqId(options)
@@ -203,6 +205,14 @@ func (t *TSSPFileDetachedReader) GetSchema() record.Schemas {
203205
return t.ctx.schemas
204206
}
205207

208+
func (t *TSSPFileDetachedReader) UpdateTime(time int64) {
209+
if t.ctx.readCtx.Ascending {
210+
t.tr.Max = time
211+
} else {
212+
t.tr.Min = time
213+
}
214+
}
215+
206216
func (t *TSSPFileDetachedReader) Next() (*record.Record, comm.SeriesInfoIntf, error) {
207217
for {
208218
if !t.isInit {
@@ -251,9 +261,9 @@ func (t *TSSPFileDetachedReader) readBatch() (*record.Record, error) {
251261
func (t *TSSPFileDetachedReader) filterData(rec *record.Record) *record.Record {
252262
if rec != nil && (t.ctx.isOrder || t.isSort) {
253263
if t.ctx.readCtx.Ascending {
254-
rec = FilterByTime(rec, t.ctx.tr)
264+
rec = FilterByTime(rec, t.tr)
255265
} else {
256-
rec = FilterByTimeDescend(rec, t.ctx.tr)
266+
rec = FilterByTimeDescend(rec, t.tr)
257267
}
258268
}
259269

@@ -304,7 +314,10 @@ func (t *TSSPFileDetachedReader) initChunkMeta() (bool, error) {
304314
chunkMetas := make([]*SegmentMeta, 0)
305315
for len(chunkMetas) < chunkReadNum && !t.metaDataQueue.IsEmpty() {
306316
s, _ := t.metaDataQueue.Pop()
307-
chunkMetas = append(chunkMetas, s.(*SegmentMeta))
317+
currTr := s.(*SegmentMeta).chunkMeta.GetTimeRangeBy(s.(*SegmentMeta).id)
318+
if currTr[0] <= t.tr.Max && currTr[1] >= t.tr.Min {
319+
chunkMetas = append(chunkMetas, s.(*SegmentMeta))
320+
}
308321
}
309322
t.dataReader.InitReadBatch(chunkMetas, t.ctx.schemas)
310323
return true, nil

engine/immutable/tssp_file_detached_reader_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -419,3 +419,52 @@ func TestSeqFilterReader(t *testing.T) {
419419

420420
config.SetProductType("basic")
421421
}
422+
423+
func TestSeqFilterReaderWhenUpdataBy(t *testing.T) {
424+
testCompDir := t.TempDir()
425+
config.SetProductType("logkeeper")
426+
_ = fileops.RemoveAll(testCompDir)
427+
sig := interruptsignal.NewInterruptSignal()
428+
defer func() {
429+
sig.Close()
430+
_ = fileops.RemoveAll(testCompDir)
431+
}()
432+
mstName := "mst"
433+
err := writeData(testCompDir, mstName)
434+
if err != nil {
435+
t.Errorf(err.Error())
436+
}
437+
438+
p := path.Join(testCompDir, mstName)
439+
reader, _ := NewDetachedMetaIndexReader(p, nil)
440+
441+
metaIndex, _ := reader.ReadMetaIndex([]int64{16, 56}, []int64{40, 40})
442+
option := query2.ProcessorOptions{
443+
QueryId: 1,
444+
Query: "select * from mst",
445+
}
446+
option.LogQueryCurrId = "1635724829000000000|9^^"
447+
option.Limit = 10
448+
schema3 := []record.Field{
449+
{Name: record.SeqIDField, Type: influx.Field_Type_Int},
450+
{Name: "field2_int", Type: influx.Field_Type_Int},
451+
{Name: "time", Type: influx.Field_Type_Int},
452+
}
453+
decs := NewFileReaderContext(util.TimeRange{Min: 1635724829000000000, Max: 1645724819000000000}, schema3, NewReadContext(true), NewFilterOpts(nil, nil, nil, nil), nil, false)
454+
treader, _ := NewTSSPFileDetachedReader(metaIndex[:1], [][]int{[]int{0, 2}}, decs, sparseindex.NewOBSFilterPath("", p, nil), nil, true, &option)
455+
totalRow := 0
456+
treader.UpdateTime(1635724839000000000)
457+
for {
458+
data, _, _ := treader.Next()
459+
if data == nil {
460+
break
461+
}
462+
assert.Equal(t, data.RowNums(), data.ColVals[0].Len)
463+
totalRow += data.RowNums()
464+
}
465+
if totalRow != 0 {
466+
t.Errorf("tssp_file_detached update time wrong")
467+
}
468+
469+
config.SetProductType("basic")
470+
}

lib/util/lifted/influx/query/select.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -579,10 +579,14 @@ func (opt *ProcessorOptions) GetOptDimension() []string {
579579
return opt.Dimensions
580580
}
581581

582-
func (opt *ProcessorOptions) CanTimeLimitPushDown() bool {
582+
// CanLimitPushDown reports whether the query has a positive Limit and at
// least one sort field.
func (opt *ProcessorOptions) CanLimitPushDown() bool {
	if opt.Limit <= 0 {
		return false
	}
	return len(opt.SortFields) != 0
}
585585

586+
func (opt *ProcessorOptions) CanTimeLimitPushDown() bool {
587+
return opt.CanLimitPushDown() && opt.SortFields[0].Name == "time"
588+
}
589+
586590
// Zone returns the zone information for the given time. The offset is in nanoseconds.
587591
func (opt *ProcessorOptions) Zone(ns int64) (string, int64) {
588592
if opt.Location == nil {

0 commit comments

Comments
 (0)