Skip to content

Commit 215a341

Browse files
authored
perf: optimizing the unordered file merge self (openGemini#607)
Signed-off-by: fx408 <[email protected]>
1 parent 12f3e33 commit 215a341

File tree

9 files changed

+369
-55
lines changed

9 files changed

+369
-55
lines changed

engine/immutable/chunk_iterators.go

+47-28
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,15 @@ import (
2020
"container/heap"
2121
"sync/atomic"
2222

23-
"github.com/openGemini/openGemini/lib/fileops"
2423
Log "github.com/openGemini/openGemini/lib/logger"
2524
"github.com/openGemini/openGemini/lib/record"
26-
"go.uber.org/zap"
2725
)
2826

2927
type ChunkIterator struct {
3028
*FileIterator
3129
ctx *ReadContext
3230
id uint64
3331
fields record.Schemas
34-
rec *record.Record
3532
merge *record.Record
3633
log *Log.Logger
3734
}
@@ -165,7 +162,6 @@ func NewChunkIterator(r *FileIterator) *ChunkIterator {
165162
FileIterator: r,
166163
ctx: NewReadContext(true),
167164
merge: allocRecord(),
168-
rec: allocRecord(),
169165
}
170166

171167
return itr
@@ -177,7 +173,6 @@ func (c *ChunkIterator) WithLog(log *Log.Logger) {
177173

178174
func (c *ChunkIterator) Close() {
179175
c.FileIterator.Close()
180-
freeRecord(c.rec)
181176
freeRecord(c.merge)
182177
c.ctx.Release()
183178
c.ctx = nil
@@ -208,47 +203,40 @@ func (c *ChunkIterator) Next() bool {
208203
c.fields[i].Type = int(cm.ty)
209204
}
210205

211-
if c.err = c.read(); c.err != nil {
206+
if c.err = c.readRecord(); c.err != nil {
212207
return false
213208
}
214209

215210
return true
216211
}
217212

218-
func (c *ChunkIterator) read() error {
213+
func (c *ChunkIterator) readRecord() error {
219214
var err error
220215
c.id = c.curtChunkMeta.sid
221216
cMeta := c.curtChunkMeta
222217

223218
c.merge.Reset()
224219
c.merge.SetSchema(c.fields)
225220
c.merge.ReserveColVal(len(c.fields))
226-
timeMeta := cMeta.timeMeta()
227-
for i := range timeMeta.entries {
228-
c.rec.Reset()
229-
c.rec.SetSchema(c.fields)
230-
c.rec.ReserveColVal(len(c.fields))
231-
c.rec.ReserveColumnRows(8)
232-
record.CheckRecord(c.rec)
233-
234-
c.rec, err = c.r.ReadAt(cMeta, i, c.rec, c.ctx, fileops.IO_PRIORITY_LOW_READ)
235-
if err != nil {
236-
c.log.Error("read segment error", zap.String("file", c.r.Path()), zap.Error(err))
237-
return err
238-
}
239-
240-
c.segPos++
241221

242-
record.CheckRecord(c.rec)
243-
c.merge.Merge(c.rec)
244-
record.CheckRecord(c.merge)
222+
buf, err := c.readData(cMeta.offset, cMeta.size)
223+
if err != nil {
224+
return err
245225
}
246226

247-
if c.segPos >= len(timeMeta.entries) {
248-
c.curtChunkMeta = nil
249-
c.chunkUsed++
227+
c.merge.Reset()
228+
c.merge.SetSchema(c.fields)
229+
c.merge.ReserveColVal(len(c.fields))
230+
231+
err = decodeRecord(c.ctx, buf, cMeta, c.merge)
232+
if err != nil {
233+
return err
250234
}
251235

236+
record.CheckRecord(c.merge)
237+
c.curtChunkMeta = nil
238+
c.chunkUsed++
239+
252240
return nil
253241
}
254242

@@ -259,3 +247,34 @@ func (c *ChunkIterator) GetSeriesID() uint64 {
259247
// GetRecord returns the iterator's merge record, i.e. the record that
// readRecord fills through decodeRecord. The pointer is the iterator's own
// buffer rather than a copy; Close hands that same record to freeRecord.
func (c *ChunkIterator) GetRecord() *record.Record {
	rec := c.merge
	return rec
}
250+
251+
func decodeRecord(ctx *ReadContext, chunkData []byte, cm *ChunkMeta, dst *record.Record) error {
252+
var err error
253+
254+
schema := dst.Schema
255+
swap := &ctx.col
256+
257+
for i := 0; i < schema.Len(); i++ {
258+
ref := &schema[i]
259+
colMeta := &cm.colMeta[i]
260+
col := dst.Column(i)
261+
262+
for n := range colMeta.entries {
263+
buf := columnData(chunkData, cm.offset, colMeta.entries[n].offset, colMeta.entries[n].size)
264+
265+
if ref.Name == record.TimeField {
266+
err = appendTimeColumnData(buf, swap, ctx, false)
267+
} else {
268+
err = decodeColumnData(ref, buf, swap, ctx, false)
269+
}
270+
271+
if err != nil {
272+
return err
273+
}
274+
275+
col.AppendColVal(swap, ref.Type, 0, swap.Len)
276+
}
277+
}
278+
279+
return nil
280+
}

engine/immutable/merge_self.go

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package immutable
18+
19+
import (
20+
"container/heap"
21+
"sync"
22+
23+
"github.com/openGemini/openGemini/lib/config"
24+
"github.com/openGemini/openGemini/lib/logger"
25+
"github.com/openGemini/openGemini/lib/record"
26+
)
27+
28+
type MergeSelf struct {
29+
once sync.Once
30+
signal chan struct{}
31+
mts *MmsTables
32+
lg *logger.Logger
33+
}
34+
35+
func NewMergeSelf(mts *MmsTables, lg *logger.Logger) *MergeSelf {
36+
return &MergeSelf{
37+
signal: make(chan struct{}),
38+
mts: mts,
39+
lg: lg,
40+
}
41+
}
42+
43+
func (m *MergeSelf) Merge(mst string, files []TSSPFile) (TSSPFile, error) {
44+
builder := m.createMsBuilder(mst, files[0].FileName())
45+
sh := record.NewColumnSortHelper()
46+
defer sh.Release()
47+
48+
itrs := m.createIterators(files)
49+
50+
for {
51+
sid, rec, err := itrs.Next()
52+
if err != nil {
53+
builder.Reset()
54+
return nil, err
55+
}
56+
57+
if rec == nil || sid == 0 {
58+
break
59+
}
60+
61+
record.CheckRecord(rec)
62+
rec = sh.Sort(rec)
63+
itrs.merged = rec
64+
builder, err = builder.WriteRecord(sid, rec, nil)
65+
if err != nil {
66+
builder.Reset()
67+
return nil, err
68+
}
69+
}
70+
71+
itrs.Close()
72+
73+
merged, err := builder.NewTSSPFile(true)
74+
return merged, err
75+
}
76+
77+
func (m *MergeSelf) createIterators(files []TSSPFile) *ChunkIterators {
78+
var dropping int64 = 0
79+
itrs := &ChunkIterators{
80+
dropping: &dropping,
81+
closed: m.signal,
82+
stopCompMerge: m.signal,
83+
itrs: make([]*ChunkIterator, 0, len(files)),
84+
merged: &record.Record{},
85+
}
86+
itrs.WithLog(m.lg)
87+
88+
for _, f := range files {
89+
fi := NewFileIterator(f, m.lg)
90+
itr := NewChunkIterator(fi)
91+
itr.WithLog(m.lg)
92+
ok := itr.Next()
93+
if !ok || itr.err != nil {
94+
itr.Close()
95+
continue
96+
}
97+
itrs.itrs = append(itrs.itrs, itr)
98+
}
99+
100+
heap.Init(itrs)
101+
return itrs
102+
}
103+
104+
func (m *MergeSelf) createMsBuilder(mst string, fileName TSSPFileName) *MsBuilder {
105+
fileName.merge++
106+
fileName.lock = m.mts.lock
107+
builder := NewMsBuilder(m.mts.path, mst, m.mts.lock, m.mts.Conf,
108+
0, fileName, 0, nil, 0, config.TSSTORE, nil, m.mts.shardId)
109+
return builder
110+
}
111+
112+
func (m *MergeSelf) Stop() {
113+
m.once.Do(func() {
114+
close(m.signal)
115+
})
116+
}

engine/immutable/merge_self_test.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package immutable_test
18+
19+
import (
20+
"testing"
21+
22+
"github.com/openGemini/openGemini/engine/immutable"
23+
"github.com/openGemini/openGemini/lib/config"
24+
"github.com/openGemini/openGemini/lib/logger"
25+
"github.com/openGemini/openGemini/lib/record"
26+
"github.com/openGemini/openGemini/lib/util/lifted/vm/protoparser/influx"
27+
"github.com/stretchr/testify/require"
28+
)
29+
30+
func TestMergeSelf(t *testing.T) {
31+
var begin int64 = 1e12
32+
defer beforeTest(t, 0)()
33+
conf := config.GetStoreConfig()
34+
conf.Merge.MergeSelfOnly = true
35+
36+
mh := NewMergeTestHelper(immutable.NewTsStoreConfig())
37+
defer mh.store.Close()
38+
rg := newRecordGenerator(begin, defaultInterval, true)
39+
40+
mh.addRecord(100, rg.generate(getDefaultSchemas(), 10))
41+
require.NoError(t, mh.saveToOrder())
42+
43+
schema := getDefaultSchemas()
44+
for i := 0; i < 14; i++ {
45+
rg.setBegin(begin + int64(i))
46+
mh.addRecord(uint64(100+i%2), rg.generate(schema, 10))
47+
require.NoError(t, mh.saveToUnordered())
48+
}
49+
50+
schema = append(schema, record.Field{Type: influx.Field_Type_Int, Name: "int_1"})
51+
rg.setBegin(begin + 100)
52+
mh.addRecord(100, rg.generate(schema, 10))
53+
require.NoError(t, mh.saveToUnordered())
54+
55+
schema = append(getDefaultSchemas(), record.Field{Type: influx.Field_Type_Float, Name: "float_1"})
56+
rg.setBegin(begin + 101)
57+
mh.addRecord(100, rg.generate(schema, 10))
58+
require.NoError(t, mh.saveToUnordered())
59+
60+
conf.Merge.MergeSelfOnly = false
61+
err := mh.store.MergeOutOfOrder(1, false, true)
62+
require.NoError(t, err)
63+
mh.store.Wait()
64+
65+
require.NoError(t, mh.mergeAndCompact(true))
66+
require.NoError(t, compareRecords(mh.readExpectRecord(), mh.readMergedRecord()))
67+
}
68+
69+
func TestMergeSelf_Stop(t *testing.T) {
70+
var begin int64 = 1e12
71+
defer beforeTest(t, 0)()
72+
conf := config.GetStoreConfig()
73+
conf.Merge.MergeSelfOnly = true
74+
75+
mh := NewMergeTestHelper(immutable.NewTsStoreConfig())
76+
defer mh.store.Close()
77+
rg := newRecordGenerator(begin, defaultInterval, true)
78+
79+
mh.addRecord(100, rg.generate(getDefaultSchemas(), 10))
80+
require.NoError(t, mh.saveToOrder())
81+
82+
schema := getDefaultSchemas()
83+
for i := 0; i < 8; i++ {
84+
rg.setBegin(begin + 1)
85+
mh.addRecord(100, rg.generate(schema, 10))
86+
require.NoError(t, mh.saveToUnordered())
87+
}
88+
89+
var files []immutable.TSSPFile
90+
for _, f := range mh.store.OutOfOrder["mst"].Files() {
91+
files = append(files, f)
92+
}
93+
94+
ms := immutable.NewMergeSelf(mh.store, logger.NewLogger(0))
95+
ms.Stop()
96+
_, err := ms.Merge("mst", files)
97+
require.EqualError(t, err, "compact stopped")
98+
99+
ms = immutable.NewMergeSelf(mh.store, logger.NewLogger(0))
100+
files[1], files[0] = files[0], files[1]
101+
_ = files[1].Close()
102+
_, err = ms.Merge("mst", files)
103+
require.NoError(t, err)
104+
}

0 commit comments

Comments
 (0)