
Commit 2998524

feat(log): add detached_lazy_load_index_reader to optimize limit query (openGemini#617)
Signed-off-by: noFloat <[email protected]>
1 parent 595a435

8 files changed: +771 -54 lines
New file (+295 lines)

@@ -0,0 +1,295 @@
/*
Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
    "github.com/openGemini/openGemini/engine/comm"
    "github.com/openGemini/openGemini/engine/executor"
    "github.com/openGemini/openGemini/engine/hybridqp"
    "github.com/openGemini/openGemini/engine/immutable"
    "github.com/openGemini/openGemini/engine/immutable/colstore"
    "github.com/openGemini/openGemini/engine/index/sparseindex"
    "github.com/openGemini/openGemini/lib/fragment"
    "github.com/openGemini/openGemini/lib/obs"
    "github.com/openGemini/openGemini/lib/record"
    "github.com/openGemini/openGemini/lib/tracing"
    "github.com/openGemini/openGemini/lib/util"
    "github.com/openGemini/openGemini/lib/util/lifted/influx/influxql"
)

// detachedLazyLoadIndexReader is used to reduce the number of bloom filter (BF)
// index reads in the "select ... order by time limit" scenario.
type detachedLazyLoadIndexReader struct {
    dataPath   string
    ctx        *indexContext
    obsOptions *obs.ObsOptions
    readerCtx  *immutable.FileReaderContext
}

func NewDetachedLazyLoadIndexReader(ctx *indexContext, obsOption *obs.ObsOptions, readerCtx *immutable.FileReaderContext) *detachedLazyLoadIndexReader {
    return &detachedLazyLoadIndexReader{
        obsOptions: obsOption,
        ctx:        ctx,
        readerCtx:  readerCtx,
    }
}

func (r *detachedLazyLoadIndexReader) CreateCursors() ([]comm.KeyCursor, int, error) {
    cursors := make([]comm.KeyCursor, 0)
    mst := r.ctx.schema.Options().GetMeasurements()[0]
    r.dataPath = obs.GetBaseMstPath(r.ctx.shardPath, mst.Name)
    c, err := NewStreamDetachedReader(r.readerCtx, sparseindex.NewOBSFilterPath("", r.dataPath, r.obsOptions), r.ctx)
    if err != nil {
        return nil, 0, err
    }
    sortLimitCursor := immutable.NewSortLimitCursor(r.ctx.schema.Options(), r.readerCtx.GetSchemas(), c)
    cursors = append(cursors, sortLimitCursor)
    return cursors, 0, nil
}
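
CreateCursors hands the engine a single chain: a SortLimitCursor that pulls from the streaming reader below it. As a sketch of the contract only (this helper is hypothetical, not part of the commit), a caller could drain any comm.KeyCursor like this, relying on the convention visible in Next below that a nil record signals exhaustion:

func drainCursor(cur comm.KeyCursor) ([]*record.Record, error) {
    var out []*record.Record
    for {
        rec, _, err := cur.Next()
        if err != nil {
            return nil, err
        }
        if rec == nil { // exhausted: the sort-limit cursor has emitted its rows
            return out, nil
        }
        out = append(out, rec)
    }
}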

// StreamDetachedReader implements comm.KeyCursor and comm.TimeCutKeyCursor;
// it streams detached data block by block to reduce BF IO.
type StreamDetachedReader struct {
    isInitDataReader bool
    idx              int
    blockId          uint64
    localPath        string
    dataPath         string
    dataReader       comm.KeyCursor
    tr               util.TimeRange
    skFileReader     []sparseindex.SKFileReader
    info             *executor.DetachedIndexInfo
    options          hybridqp.Options
    path             *sparseindex.OBSFilterPath
    readerCtx        *immutable.FileReaderContext
    ctx              *indexContext
    tempFrs          []*fragment.FragmentRange
}

func NewStreamDetachedReader(readerCtx *immutable.FileReaderContext, path *sparseindex.OBSFilterPath, ctx *indexContext) (*StreamDetachedReader, error) {
    r := &StreamDetachedReader{
        options:   ctx.schema.Options(),
        ctx:       ctx,
        path:      path,
        readerCtx: readerCtx,
        tr:        util.TimeRange{Min: ctx.tr.Min, Max: ctx.tr.Max},
        tempFrs:   make([]*fragment.FragmentRange, 1),
    }
    r.tempFrs[0] = fragment.NewFragmentRange(uint32(0), uint32(0))
    err := r.Init()
    if err != nil {
        return nil, err
    }

    return r, nil
}

func (r *StreamDetachedReader) Init() (err error) {
    mst := r.options.GetMeasurements()[0]
    r.dataPath = obs.GetBaseMstPath(r.ctx.shardPath, mst.Name)
    if immutable.GetDetachedFlushEnabled() {
        r.localPath = obs.GetLocalMstPath(obs.GetPrefixDataPath(), r.dataPath)
    }
    chunkCount, err := immutable.GetMetaIndexChunkCount(r.path.Option(), r.dataPath)
    if err != nil {
        return
    }
    if chunkCount == 0 {
        return
    }

    miChunkIds, miFiltered, err := immutable.GetMetaIndexAndBlockId(r.dataPath, r.path.Option(), chunkCount, r.ctx.tr)

    if err != nil {
        return
    }

    if len(miFiltered) == 0 {
        return nil
    }

    pkMetaInfo, pkItems, err := immutable.GetPKItems(r.dataPath, r.path.Option(), miChunkIds)
    if err != nil {
        return err
    }
    r.info = executor.NewDetachedIndexInfo(miFiltered, pkItems)

    mstInfo := r.ctx.schema.Options().GetMeasurements()[0]
    r.skFileReader, err = r.ctx.skIndexReader.CreateSKFileReaders(r.ctx.schema.Options(), mstInfo, true)
    if err != nil {
        return err
    }
    if r.ctx.keyCondition != nil {
        return
    }

    for j := range r.skFileReader {
        if err = r.skFileReader[j].ReInit(sparseindex.NewOBSFilterPath(r.localPath, r.dataPath, r.path.Option())); err != nil {
            return err
        }
    }

    return initKeyCondition(r.info.Infos()[0].Data.Schema, r.ctx, pkMetaInfo.TCLocation)
}

func (t *StreamDetachedReader) Name() string {
    return "StreamDetachedReader"
}

func (t *StreamDetachedReader) StartSpan(span *tracing.Span) {
}

func (t *StreamDetachedReader) EndSpan() {
}

func (t *StreamDetachedReader) SinkPlan(plan hybridqp.QueryNode) {
}

func (t *StreamDetachedReader) GetSchema() record.Schemas {
    return t.readerCtx.GetSchemas()
}

// UpdateTime narrows the remaining time range (the comm.TimeCutKeyCursor hook),
// allowing blocks beyond the supplied time to be skipped.
func (t *StreamDetachedReader) UpdateTime(time int64) {
    if t.options.IsAscending() {
        t.tr.Max = time
    } else {
        t.tr.Min = time
    }
}
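
UpdateTime is the hook that lets the wrapping sort-limit cursor cut IO short. A sketch of the intended effect, assuming util.TimeRange.Overlaps has the inclusive (min, max) semantics used by initDataReader below; the timestamps are invented:

func exampleTimeCut() {
    // Once an ascending "order by time limit N" scan holds N rows whose newest
    // time is 50, later rows cannot enter the result, so the cursor calls
    // UpdateTime(50) and whole blocks fall out of range without any BF read.
    tr := util.TimeRange{Min: 0, Max: 100}
    tr.Max = 50             // effect of UpdateTime(50) on an ascending query
    _ = tr.Overlaps(60, 70) // false: block skipped entirely
    _ = tr.Overlaps(40, 60) // true: block still overlaps and must be scanned
}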

func (t *StreamDetachedReader) Next() (*record.Record, comm.SeriesInfoIntf, error) {
    for {
        if t.info == nil {
            return nil, nil, nil
        }
        if t.idx >= len(t.info.Files()) {
            return nil, nil, nil
        }
        if !t.isInitDataReader {
            isExist, err := t.initDataReader()
            if err != nil {
                return nil, nil, err
            }
            if !isExist {
                continue
            }
        }
        re, se, err := t.dataReader.Next()
        if err != nil {
            return nil, nil, err
        }
        if re == nil {
            t.isInitDataReader = false
            continue
        }
        return re, se, err
    }
}

func (t *StreamDetachedReader) initDataReader() (bool, error) {
    currIdx := t.idx
    if !t.options.IsAscending() {
        // Descending queries walk the files from the back.
        currIdx = len(t.info.Files()) - 1 - t.idx
    }
    currInfo := t.info.Infos()[currIdx]
    if t.blockId >= currInfo.EndBlockId-currInfo.StartBlockId {
        t.resetIndex()
        return false, nil
    }
    if !t.info.Files()[currIdx].IsExist(t.tr) {
        t.resetIndex()
        return false, nil
    }
    // Collect at most ChunkMetaReadNum block ids that survive the time-range
    // check and the SK index filter; only these blocks are actually read.
    currBlocks := make([]int, 0, immutable.ChunkMetaReadNum)
    for {
        if t.blockId >= currInfo.EndBlockId-currInfo.StartBlockId {
            break
        }
        if len(currBlocks) >= immutable.ChunkMetaReadNum {
            break
        }
        currBlockId := t.blockId
        if !t.options.IsAscending() {
            // Mirror the local block id so the newest block is visited first.
            currBlockId = currInfo.EndBlockId - currInfo.StartBlockId - 1 - t.blockId
        }
        t.blockId += 1
        if !t.tr.Overlaps(currInfo.Data.Time(int(currBlockId)), currInfo.Data.Time(int(currBlockId)+1)) {
            continue
        }
        t.tempFrs[0] = fragment.NewFragmentRange(uint32(currBlockId), uint32(currBlockId+1))
        isExist, err := t.filterBySk(currInfo)
        if err != nil {
            return false, err
        }
        if isExist {
            currBlocks = append(currBlocks, int(t.tempFrs[0].Start))
        }
    }
    if len(currBlocks) == 0 {
        return false, nil
    }

    var unnest *influxql.Unnest
    if t.ctx.schema.HasUnnests() {
        unnest = t.ctx.schema.GetUnnests()[0]
    }

    blocks := make([][]int, 1)
    blocks[0] = currBlocks
    var err error
    t.dataReader, err = immutable.NewTSSPFileDetachedReader(t.info.Files()[currIdx:currIdx+1], blocks, t.readerCtx,
        sparseindex.NewOBSFilterPath("", t.dataPath, t.path.Option()), unnest, true, t.ctx.schema.Options())
    if err != nil {
        return false, err
    }
    return true, nil
}
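
The descending arithmetic above is easy to misread, so here is a standalone, runnable illustration of the mirroring with made-up ids (StartBlockId = 10, EndBlockId = 14, i.e. four blocks in the file):

package main

import "fmt"

func main() {
    start, end := uint64(10), uint64(14) // assumed: file covers global blocks 10..13
    for blockId := uint64(0); blockId < end-start; blockId++ {
        currBlockId := end - start - 1 - blockId // initDataReader's descending branch
        fmt.Println(currBlockId)                 // prints 3, 2, 1, 0: newest block first
    }
}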

// filterBySk filters the current PKInfo fragment through the SK index readers.
func (t *StreamDetachedReader) filterBySk(currInfo *colstore.DetachedPKInfo) (bool, error) {
    isExist := true
    for j := range t.skFileReader {
        // Scan expects file-global fragment ids while tempFrs holds file-local
        // ones: shift by StartBlockId before the scan and shift back afterwards.
        t.tempFrs[0].Start += uint32(currInfo.StartBlockId)
        t.tempFrs[0].End += uint32(currInfo.StartBlockId)
        frs, err := t.ctx.skIndexReader.Scan(t.skFileReader[j], t.tempFrs)
        if err != nil {
            return false, err
        }
        if frs.Empty() {
            isExist = false
            break
        }
        t.tempFrs[0].Start -= uint32(currInfo.StartBlockId)
        t.tempFrs[0].End -= uint32(currInfo.StartBlockId)
    }
    return isExist, nil
}
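
A worked example of that shift, with assumed numbers (a file whose blocks occupy global ids 10..13, probed for local block 2):

func exampleFragmentShift() {
    fr := fragment.NewFragmentRange(2, 3) // local fragment [2, 3) in the file
    fr.Start += 10                        // StartBlockId = 10 -> global [12, 13)
    fr.End += 10
    // skIndexReader.Scan operates on the global ids at this point.
    fr.Start -= 10 // shifted back so initDataReader keeps using local ids
    fr.End -= 10
}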

func (t *StreamDetachedReader) resetIndex() {
    t.blockId = 0
    t.isInitDataReader = false
    t.idx += 1
}

func (t *StreamDetachedReader) Close() error {
    return nil
}

func (t *StreamDetachedReader) SetOps(ops []*comm.CallOption) {

}

func (t *StreamDetachedReader) NextAggData() (*record.Record, *comm.FileInfo, error) {
    return nil, nil, nil
}

engine/hybrid_index_reader.go (+3 -47)

@@ -284,66 +284,22 @@ func (r *detachedIndexReader) Init() (err error) {
 	if chunkCount == 0 {
 		return
 	}
-	startChunkId, endChunkId := int64(0), chunkCount
 
-	// init the obs meta index reader
-	metaIndexReader, err := immutable.NewDetachedMetaIndexReader(r.dataPath, r.obsOptions)
-	if err != nil {
-		return
-	}
-	defer metaIndexReader.Close()
+	miChunkIds, miFiltered, err := immutable.GetMetaIndexAndBlockId(r.dataPath, r.obsOptions, chunkCount, r.ctx.tr)
 
-	var miChunkIds []int64
-	var miFiltered []*immutable.MetaIndex
-
-	// step1: init the meta index
-	offsets, lengths := make([]int64, 0, chunkCount), make([]int64, 0, chunkCount)
-	for i := startChunkId; i < endChunkId; i++ {
-		offset, length := immutable.GetMetaIndexOffsetAndLengthByChunkId(i)
-		offsets, lengths = append(offsets, offset), append(lengths, length)
-	}
-	metaIndexes, err := metaIndexReader.ReadMetaIndex(offsets, lengths)
 	if err != nil {
-		return err
-	}
-	for i := range metaIndexes {
-		if metaIndexes[i].IsExist(r.ctx.tr) {
-			miFiltered = append(miFiltered, metaIndexes[i])
-			miChunkIds = append(miChunkIds, startChunkId+int64(i))
-		}
+		return
 	}
 
 	if len(miFiltered) == 0 {
 		return nil
 	}
 
 	// step2: init the pk items
-	pkMetaInfo, err := immutable.ReadPKMetaInfoAll(r.dataPath, r.obsOptions)
-	if err != nil {
-		return
-	}
-
-	offsets, lengths = offsets[:0], lengths[:0]
-	for _, chunkId := range miChunkIds {
-		offset, length := immutable.GetPKMetaOffsetLengthByChunkId(pkMetaInfo, int(chunkId))
-		offsets, lengths = append(offsets, offset), append(lengths, length)
-	}
-	pkMetas, err := immutable.ReadPKMetaAll(r.dataPath, r.obsOptions, offsets, lengths)
+	pkMetaInfo, pkItems, err := immutable.GetPKItems(r.dataPath, r.obsOptions, miChunkIds)
 	if err != nil {
 		return err
 	}
-	offsets, lengths = offsets[:0], lengths[:0]
-	for i := range pkMetas {
-		offsets, lengths = append(offsets, int64(pkMetas[i].Offset)), append(lengths, int64(pkMetas[i].Length))
-	}
-	pkDatas, err := immutable.ReadPKDataAll(r.dataPath, r.obsOptions, offsets, lengths, pkMetas, pkMetaInfo)
-	if err != nil {
-		return err
-	}
-	var pkItems []*colstore.DetachedPKInfo
-	for i := range pkDatas {
-		pkItems = append(pkItems, colstore.GetPKInfoByPKMetaData(pkMetas[i], pkDatas[i], pkMetaInfo.TCLocation))
-	}
 	r.info = executor.NewDetachedIndexInfo(miFiltered, pkItems)
 
 	// step3: init the key condition and sk file readers
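
The deleted lines are precisely the logic that moved into the new immutable.GetMetaIndexAndBlockId and immutable.GetPKItems helpers, which detachedIndexReader and the new StreamDetachedReader now share. The helpers' bodies are not part of this excerpt; below is a plausible reconstruction of the first one from the removed code, with the signature inferred from the two call sites (treat the body as an assumption, not the committed code):

func GetMetaIndexAndBlockId(dataPath string, opt *obs.ObsOptions, chunkCount int64, tr util.TimeRange) ([]int64, []*MetaIndex, error) {
    // Reconstructed from the removed lines above.
    metaIndexReader, err := NewDetachedMetaIndexReader(dataPath, opt)
    if err != nil {
        return nil, nil, err
    }
    defer metaIndexReader.Close()

    // Read every meta-index chunk in one batch...
    offsets, lengths := make([]int64, 0, chunkCount), make([]int64, 0, chunkCount)
    for i := int64(0); i < chunkCount; i++ {
        offset, length := GetMetaIndexOffsetAndLengthByChunkId(i)
        offsets, lengths = append(offsets, offset), append(lengths, length)
    }
    metaIndexes, err := metaIndexReader.ReadMetaIndex(offsets, lengths)
    if err != nil {
        return nil, nil, err
    }
    // ...then keep only the chunks whose time range overlaps the query.
    var miChunkIds []int64
    var miFiltered []*MetaIndex
    for i := range metaIndexes {
        if metaIndexes[i].IsExist(tr) {
            miFiltered = append(miFiltered, metaIndexes[i])
            miChunkIds = append(miChunkIds, int64(i))
        }
    }
    return miChunkIds, miFiltered, nil
}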

engine/hybrid_store_reader.go (+5 -2)
@@ -208,7 +208,11 @@ func (r *HybridStoreReader) initIndexReader() {
 		r.indexReaders = append(r.indexReaders, NewAttachedIndexReader(ctx, &r.indexInfo.AttachedIndexInfo, r.readerCtx))
 	}
 	if _, err := os.Stat(obs.GetLocalMstPath(obs.GetPrefixDataPath(), ctx.shardPath)); !os.IsNotExist(err) {
-		r.indexReaders = append(r.indexReaders, NewDetachedIndexReader(ctx, r.obsOptions, r.readerCtx))
+		if ctx.schema.Options().CanTimeLimitPushDown() && r.opt.Sources[0].(*influxql.Measurement).IsTimeSorted {
+			r.indexReaders = append(r.indexReaders, NewDetachedLazyLoadIndexReader(ctx, r.obsOptions, r.readerCtx))
+		} else {
+			r.indexReaders = append(r.indexReaders, NewDetachedIndexReader(ctx, r.obsOptions, r.readerCtx))
+		}
 	}
 }
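
Detached data takes the lazy path only when the time-ordered limit can be pushed down (CanTimeLimitPushDown) and the source measurement is flagged time-sorted (IsTimeSorted); presumably both are required because stopping after the first N rows is only correct when blocks can be visited in (reverse) time order. A query of the assumed shape that would qualify:

    select * from mst order by time desc limit 10

Everything else keeps the eager detachedIndexReader.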

@@ -441,7 +445,6 @@ func (r *HybridStoreReader) run(ctx context.Context, reader comm.KeyCursor) (err
 		if rec.RowNums() == 0 {
 			continue
 		}
-
 		r.iterCount++
 		r.rowCountAfterFilter += rec.RowNums()
