Skip to content

Commit 9adcfb2

Browse files
authored
refactor(log): refactor the code for generating cursors (openGemini#597)
Signed-off-by: noFloat <[email protected]>
1 parent 1246a0c commit 9adcfb2

File tree

3 files changed

+140
-102
lines changed

3 files changed

+140
-102
lines changed

engine/hybrid_index_reader.go

+103-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"math"
2222
"strconv"
2323

24+
"github.com/openGemini/openGemini/engine/comm"
2425
"github.com/openGemini/openGemini/engine/executor"
2526
"github.com/openGemini/openGemini/engine/hybridqp"
2627
"github.com/openGemini/openGemini/engine/immutable"
@@ -42,7 +43,7 @@ const (
4243
)
4344

4445
type IndexReader interface {
45-
Next() (executor.IndexFrags, error)
46+
CreateCursors() ([]comm.KeyCursor, int, error)
4647
}
4748

4849
type indexContext struct {
@@ -77,12 +78,14 @@ type attachedIndexReader struct {
7778
ctx *indexContext
7879
info *executor.AttachedIndexInfo
7980
skFileReader []sparseindex.SKFileReader
81+
readerCtx *immutable.FileReaderContext
8082
}
8183

82-
func NewAttachedIndexReader(ctx *indexContext, info *executor.AttachedIndexInfo) *attachedIndexReader {
84+
func NewAttachedIndexReader(ctx *indexContext, info *executor.AttachedIndexInfo, readerCtx *immutable.FileReaderContext) *attachedIndexReader {
8385
return &attachedIndexReader{
84-
info: info,
85-
ctx: ctx,
86+
info: info,
87+
ctx: ctx,
88+
readerCtx: readerCtx,
8689
}
8790
}
8891

@@ -96,6 +99,46 @@ func (r *attachedIndexReader) Init() (err error) {
9699
return
97100
}
98101

102+
func (r *attachedIndexReader) CreateCursors() ([]comm.KeyCursor, int, error) {
103+
var fragCount int
104+
cursors := make([]comm.KeyCursor, 0)
105+
for {
106+
frags, err := r.Next()
107+
if err != nil {
108+
return nil, 0, err
109+
}
110+
if frags == nil {
111+
break
112+
}
113+
fragCount += int(frags.FragCount())
114+
reader, err := r.initFileReader(frags)
115+
if err != nil {
116+
return nil, 0, err
117+
}
118+
cursors = append(cursors, reader)
119+
}
120+
return cursors, fragCount, nil
121+
}
122+
123+
func (r *attachedIndexReader) initFileReader(frags executor.IndexFrags) (comm.KeyCursor, error) {
124+
files, ok := frags.Indexes().([]immutable.TSSPFile)
125+
if !ok {
126+
return nil, fmt.Errorf("invalid index info for attached file reader")
127+
}
128+
129+
var unnest *influxql.Unnest
130+
if r.ctx.schema.HasUnnests() {
131+
unnest = r.ctx.schema.GetUnnests()[0]
132+
}
133+
134+
fragRanges := frags.FragRanges()
135+
fileReader, err := immutable.NewTSSPFileAttachedReader(files, fragRanges, r.readerCtx, r.ctx.schema.Options(), unnest)
136+
if err != nil {
137+
return nil, err
138+
}
139+
return fileReader, nil
140+
}
141+
99142
func (r *attachedIndexReader) Next() (executor.IndexFrags, error) {
100143
if r.info == nil || len(r.info.Files()) == 0 {
101144
return nil, nil
@@ -164,13 +207,68 @@ type detachedIndexReader struct {
164207
ctx *indexContext
165208
skFileReader []sparseindex.SKFileReader
166209
obsOptions *obs.ObsOptions
210+
readerCtx *immutable.FileReaderContext
167211
}
168212

169-
func NewDetachedIndexReader(ctx *indexContext, obsOption *obs.ObsOptions) *detachedIndexReader {
213+
func NewDetachedIndexReader(ctx *indexContext, obsOption *obs.ObsOptions, readerCtx *immutable.FileReaderContext) *detachedIndexReader {
170214
return &detachedIndexReader{
171215
obsOptions: obsOption,
172216
ctx: ctx,
217+
readerCtx: readerCtx,
218+
}
219+
}
220+
221+
func (r *detachedIndexReader) CreateCursors() ([]comm.KeyCursor, int, error) {
222+
var fragCount int
223+
cursors := make([]comm.KeyCursor, 0)
224+
for {
225+
frags, err := r.Next()
226+
if err != nil {
227+
return nil, 0, err
228+
}
229+
if frags == nil {
230+
break
231+
}
232+
fragCount += int(frags.FragCount())
233+
reader, err := r.initFileReader(frags)
234+
if err != nil {
235+
return nil, 0, err
236+
}
237+
cursors = append(cursors, reader)
238+
}
239+
return cursors, fragCount, nil
240+
}
241+
242+
func (r *detachedIndexReader) initFileReader(frags executor.IndexFrags) (comm.KeyCursor, error) {
243+
metaIndexes, ok := frags.Indexes().([]*immutable.MetaIndex)
244+
if !ok {
245+
return nil, fmt.Errorf("invalid index info for detached file reader")
246+
}
247+
fragRanges := frags.FragRanges()
248+
blocks := make([][]int, len(fragRanges))
249+
for i, frs := range fragRanges {
250+
for j := range frs {
251+
for k := frs[j].Start; k < frs[j].End; k++ {
252+
blocks[i] = append(blocks[i], int(k))
253+
}
254+
}
255+
}
256+
257+
var unnest *influxql.Unnest
258+
if r.ctx.schema.HasUnnests() {
259+
unnest = r.ctx.schema.GetUnnests()[0]
260+
}
261+
262+
fileReader, err := immutable.NewTSSPFileDetachedReader(metaIndexes, blocks, r.readerCtx,
263+
sparseindex.NewOBSFilterPath("", frags.BasePath(), r.obsOptions), unnest, true, r.ctx.schema.Options())
264+
if err != nil {
265+
return nil, err
266+
}
267+
if r.ctx.schema.Options().CanLimitPushDown() {
268+
sortLimitCursor := immutable.NewSortLimitCursor(r.ctx.schema.Options(), r.readerCtx.GetSchemas(), fileReader)
269+
return sortLimitCursor, nil
173270
}
271+
return fileReader, nil
174272
}
175273

176274
func (r *detachedIndexReader) Init() (err error) {

engine/hybrid_store_reader.go

+7-79
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/openGemini/openGemini/engine/executor"
2929
"github.com/openGemini/openGemini/engine/hybridqp"
3030
"github.com/openGemini/openGemini/engine/immutable"
31-
"github.com/openGemini/openGemini/engine/index/sparseindex"
3231
"github.com/openGemini/openGemini/lib/binaryfilterfunc"
3332
"github.com/openGemini/openGemini/lib/bitmap"
3433
"github.com/openGemini/openGemini/lib/config"
@@ -203,75 +202,13 @@ func (r *HybridStoreReader) initQueryCtx() (err error) {
203202
return
204203
}
205204

206-
func (r *HybridStoreReader) initAttachedFileReader(frags executor.IndexFrags) (comm.KeyCursor, error) {
207-
files, ok := frags.Indexes().([]immutable.TSSPFile)
208-
if !ok {
209-
return nil, fmt.Errorf("invalid index info for attached file reader")
210-
}
211-
212-
var unnest *influxql.Unnest
213-
if r.schema.HasUnnests() {
214-
unnest = r.schema.GetUnnests()[0]
215-
}
216-
217-
fragRanges := frags.FragRanges()
218-
fileReader, err := immutable.NewTSSPFileAttachedReader(files, fragRanges, r.readerCtx, r.schema.Options(), unnest)
219-
if err != nil {
220-
return nil, err
221-
}
222-
return fileReader, nil
223-
}
224-
225-
func (r *HybridStoreReader) initDetachedFileReader(frags executor.IndexFrags) (comm.KeyCursor, error) {
226-
metaIndexes, ok := frags.Indexes().([]*immutable.MetaIndex)
227-
if !ok {
228-
return nil, fmt.Errorf("invalid index info for detached file reader")
229-
}
230-
fragRanges := frags.FragRanges()
231-
blocks := make([][]int, len(fragRanges))
232-
for i, frs := range fragRanges {
233-
for j := range frs {
234-
for k := frs[j].Start; k < frs[j].End; k++ {
235-
blocks[i] = append(blocks[i], int(k))
236-
}
237-
}
238-
}
239-
240-
var unnest *influxql.Unnest
241-
if r.schema.HasUnnests() {
242-
unnest = r.schema.GetUnnests()[0]
243-
}
244-
245-
fileReader, err := immutable.NewTSSPFileDetachedReader(metaIndexes, blocks, r.readerCtx,
246-
sparseindex.NewOBSFilterPath("", frags.BasePath(), r.obsOptions), unnest, true, r.schema.Options())
247-
if err != nil {
248-
return nil, err
249-
}
250-
if r.schema.Options().CanLimitPushDown() {
251-
sortLimitCursor := immutable.NewSortLimitCursor(r.schema.Options(), r.readerCtx.GetSchemas(), fileReader)
252-
return sortLimitCursor, nil
253-
}
254-
return fileReader, nil
255-
}
256-
257-
func (r *HybridStoreReader) initFileReader(frags executor.IndexFrags) (comm.KeyCursor, error) {
258-
switch frags.FileMode() {
259-
case executor.Attached:
260-
return r.initAttachedFileReader(frags)
261-
case executor.Detached:
262-
return r.initDetachedFileReader(frags)
263-
default:
264-
return nil, fmt.Errorf("invalid file reader")
265-
}
266-
}
267-
268205
func (r *HybridStoreReader) initIndexReader() {
269206
ctx := NewIndexContext(!r.opt.IsIncQuery(), SegmentBatchCount, r.schema, r.indexInfo.ShardPath())
270207
if !r.opt.IsIncQuery() || r.opt.IterID == 0 {
271-
r.indexReaders = append(r.indexReaders, NewAttachedIndexReader(ctx, &r.indexInfo.AttachedIndexInfo))
208+
r.indexReaders = append(r.indexReaders, NewAttachedIndexReader(ctx, &r.indexInfo.AttachedIndexInfo, r.readerCtx))
272209
}
273210
if _, err := os.Stat(obs.GetLocalMstPath(obs.GetPrefixDataPath(), ctx.shardPath)); !os.IsNotExist(err) {
274-
r.indexReaders = append(r.indexReaders, NewDetachedIndexReader(ctx, r.obsOptions))
211+
r.indexReaders = append(r.indexReaders, NewDetachedIndexReader(ctx, r.obsOptions, r.readerCtx))
275212
}
276213
}
277214

@@ -419,21 +356,12 @@ func (r *HybridStoreReader) CreateCursors() ([]comm.KeyCursor, error) {
419356
cursors := make([]comm.KeyCursor, 0)
420357
r.initIndexReader()
421358
for i := range r.indexReaders {
422-
for {
423-
frags, err := r.indexReaders[i].Next()
424-
if err != nil {
425-
return nil, err
426-
}
427-
if frags == nil {
428-
break
429-
}
430-
r.fragCount += int(frags.FragCount())
431-
reader, err := r.initFileReader(frags)
432-
if err != nil {
433-
return nil, err
434-
}
435-
cursors = append(cursors, reader)
359+
keyCursors, fagCount, err := r.indexReaders[i].CreateCursors()
360+
if err != nil {
361+
return nil, err
436362
}
363+
r.fragCount += fagCount
364+
cursors = append(cursors, keyCursors...)
437365
}
438366
return cursors, nil
439367
}

engine/hybrid_store_reader_test.go

+30-18
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,6 @@ func TestHybridStoreReaderFunctions(t *testing.T) {
173173
reader.schema = schema
174174
err = reader.initQueryCtx()
175175
assert2.Equal(t, err, nil)
176-
_, err = reader.initAttachedFileReader(executor.NewDetachedFrags("", 0))
177-
assert2.Equal(t, strings.Contains(err.Error(), "invalid index info for attached file reader"), true)
178-
_, err = reader.initDetachedFileReader(executor.NewAttachedFrags("", 0))
179-
assert2.Equal(t, strings.Contains(err.Error(), "invalid index info for detached file reader"), true)
180-
_, err = reader.initFileReader(&executor.DetachedFrags{BaseFrags: *executor.NewBaseFrags("", 3)})
181-
assert2.Equal(t, strings.Contains(err.Error(), "invalid file reader"), true)
182176
frag := executor.NewBaseFrags("", 3)
183177
assert2.Equal(t, 72, frag.Size())
184178
err = reader.initSchema()
@@ -244,27 +238,45 @@ func TestHybridStoreReaderFunctionsForFullText(t *testing.T) {
244238
reader.schema = schema
245239
err = reader.initQueryCtx()
246240
assert2.Equal(t, err, nil)
247-
_, err = reader.initAttachedFileReader(executor.NewDetachedFrags("", 0))
248-
assert2.Equal(t, strings.Contains(err.Error(), "invalid index info for attached file reader"), true)
249-
_, err = reader.initDetachedFileReader(executor.NewAttachedFrags("", 0))
250-
assert2.Equal(t, strings.Contains(err.Error(), "invalid index info for detached file reader"), true)
251-
_, err = reader.initFileReader(&executor.DetachedFrags{BaseFrags: *executor.NewBaseFrags("", 3)})
252-
assert2.Equal(t, strings.Contains(err.Error(), "invalid file reader"), true)
253241
frag := executor.NewBaseFrags("", 3)
254242
assert2.Equal(t, 72, frag.Size())
255243
err = reader.initSchema()
256244
assert2.Equal(t, err, nil)
257245
}
258246

259247
func TestHybridIndexReaderFunctions(t *testing.T) {
260-
indexReader := NewDetachedIndexReader(NewIndexContext(true, 8, createSortQuerySchema(), ""), &obs.ObsOptions{})
261-
err := indexReader.Init()
248+
unnest := &influxql.Unnest{
249+
Expr: &influxql.Call{
250+
Name: "match_all",
251+
Args: []influxql.Expr{
252+
&influxql.VarRef{Val: "([a-z]+),([0-9]+)", Type: influxql.String},
253+
&influxql.VarRef{Val: "field1", Type: influxql.String},
254+
},
255+
},
256+
Aliases: []string{"key1", "value1"},
257+
DstType: []influxql.DataType{influxql.String, influxql.String},
258+
}
259+
schema := createSortQuerySchema()
260+
schema.SetUnnests([]*influxql.Unnest{unnest})
261+
readCtx := immutable.NewFileReaderContext(util.TimeRange{}, nil, immutable.NewReadContext(true), nil, nil, true)
262+
indexReader := NewDetachedIndexReader(NewIndexContext(true, 8, schema, ""), &obs.ObsOptions{}, readCtx)
263+
_, _, err := indexReader.CreateCursors()
264+
assert2.Equal(t, strings.Contains(err.Error(), "endpoint is not set"), true)
265+
_, err = indexReader.initFileReader(executor.NewDetachedFrags("", 0))
266+
assert2.Equal(t, strings.Contains(err.Error(), "endpoint is not set"), true)
267+
268+
indexReader = NewDetachedIndexReader(NewIndexContext(true, 8, createSortQuerySchema(), ""), &obs.ObsOptions{}, nil)
269+
err = indexReader.Init()
262270
assert2.Equal(t, strings.Contains(err.Error(), "endpoint is not set"), true)
263271
_, err = indexReader.Next()
264272
assert2.Equal(t, strings.Contains(err.Error(), "endpoint is not set"), true)
265-
indexReader1 := NewAttachedIndexReader(NewIndexContext(true, 8, createSortQuerySchema(), ""), &executor.AttachedIndexInfo{})
273+
indexReader1 := NewAttachedIndexReader(NewIndexContext(true, 8, createSortQuerySchema(), ""), &executor.AttachedIndexInfo{}, nil)
266274
_, err = indexReader1.Next()
267275
assert2.Equal(t, err, nil)
276+
277+
indexReader1 = NewAttachedIndexReader(NewIndexContext(true, 8, createSortQuerySchema(), ""), &executor.AttachedIndexInfo{}, nil)
278+
_, err = indexReader1.initFileReader(executor.NewDetachedFrags("", 0))
279+
assert2.Equal(t, strings.Contains(err.Error(), "invalid index info for attached file reader"), true)
268280
db, rp, mst, ptId := "db0", "rp0", "mst", uint32(0)
269281
testDir := t.TempDir()
270282
defer func() {
@@ -287,7 +299,7 @@ func TestHybridIndexReaderFunctions(t *testing.T) {
287299
_, err = sh.GetIndexInfo(querySchema)
288300
assert2.Equal(t, err, nil)
289301
immutable.SetDetachedFlushEnabled(true)
290-
indexReader = NewDetachedIndexReader(NewIndexContext(false, 0, querySchema, ""), &obs.ObsOptions{})
302+
indexReader = NewDetachedIndexReader(NewIndexContext(false, 0, querySchema, ""), &obs.ObsOptions{}, nil)
291303
err = indexReader.Init()
292304
assert2.Equal(t, strings.Contains(err.Error(), "endpoint is not set"), true)
293305
immutable.SetDetachedFlushEnabled(false)
@@ -625,7 +637,7 @@ func TestHybridStoreReaderForInc(t *testing.T) {
625637
schema := executor.NewQuerySchema(fields2, names, &opt, nil)
626638
schema.AddTable(m, schema.MakeRefs())
627639
for {
628-
indexReader := NewDetachedIndexReader(NewIndexContext(true, 8, schema, sh.filesPath), nil)
640+
indexReader := NewDetachedIndexReader(NewIndexContext(true, 8, schema, sh.filesPath), nil, nil)
629641
schema.Options().(*query.ProcessorOptions).IterID += 1
630642
indexReader.Init()
631643
frag, err := indexReader.Next()
@@ -639,7 +651,7 @@ func TestHybridStoreReaderForInc(t *testing.T) {
639651
}
640652
opt.EndTime = 0
641653
schema = executor.NewQuerySchema(fields2, names, &opt, nil)
642-
indexReader := NewDetachedIndexReader(NewIndexContext(true, 8, schema, sh.filesPath), nil)
654+
indexReader := NewDetachedIndexReader(NewIndexContext(true, 8, schema, sh.filesPath), nil, nil)
643655
schema.Options().(*query.ProcessorOptions).IterID += 1
644656
indexReader.Init()
645657
_, ok := immutable.GetDetachedSegmentTask(sh.filesPath + queryID)

0 commit comments

Comments
 (0)