Skip to content

Commit ce0bb15

Browse files
authored
chore: panic when unordered merge is closed abnormally (openGemini#616)
Signed-off-by: fx408 <[email protected]>
1 parent c4bdfe4 commit ce0bb15

File tree

4 files changed

+68
-8
lines changed

4 files changed

+68
-8
lines changed

engine/immutable/merge_performer.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ func (p *mergePerformer) Finish() (TSSPFile, error) {
191191
}
192192

193193
p.Close()
194+
p.Release()
194195
return file, nil
195196
}
196197

@@ -388,6 +389,9 @@ func (p *mergePerformer) WriteOriginal(fi *FileIterator) error {
388389

389390
func (p *mergePerformer) Close() {
390391
p.itr.Close()
392+
}
393+
394+
func (p *mergePerformer) Release() {
391395
p.itr = nil
392396
p.ur = nil
393397
p.sw = nil
@@ -548,15 +552,21 @@ func (c *MergePerformers) Pop() interface{} {
548552
return v
549553
}
550554

555+
func (c *MergePerformers) Done() {
556+
c.wg.Done()
557+
}
558+
551559
func (c *MergePerformers) Close() {
552560
c.once.Do(func() {
553561
close(c.signal)
554562
c.closed = true
555-
c.wg.Wait()
556-
557563
for _, item := range c.items {
558564
item.Close()
559565
}
566+
c.wg.Wait()
567+
for _, item := range c.items {
568+
item.Release()
569+
}
560570
c.items = nil
561571
c.ur = nil
562572
})

engine/immutable/merge_tool.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (mt *mergeTool) execute(mst string, order, unordered *TSSPFiles) (*TSSPFile
141141
var tempFiles []fileops.File
142142

143143
defer func() {
144-
performers.wg.Done()
144+
performers.Done()
145145
performers.Close()
146146
ur.Close()
147147

engine/immutable/merge_tool_test.go

+38
Original file line numberDiff line numberDiff line change
@@ -1137,5 +1137,43 @@ func BenchmarkChunkMeta_compress(b *testing.B) {
11371137
immutable.SetChunkMetaCompressMode(immutable.ChunkMetaCompressLZ4)
11381138
run(2000)
11391139
})
1140+
}
1141+
1142+
func TestMergePerformers_close(t *testing.T) {
1143+
var begin int64 = 1e12
1144+
defer beforeTest(t, 0)()
1145+
1146+
schemas := getDefaultSchemas()
1147+
mh := NewMergeTestHelper(immutable.NewTsStoreConfig())
1148+
defer mh.store.Close()
1149+
rg := newRecordGenerator(begin, defaultInterval, true)
1150+
1151+
for i := 0; i < 8; i++ {
1152+
mh.addRecord(uint64(100+i), rg.generate(schemas, 10))
1153+
require.NoError(t, mh.saveToOrder())
1154+
}
1155+
1156+
performers := immutable.NewMergePerformers(nil)
1157+
var tempFiles []*immutable.StreamWriteFile
1158+
1159+
defer func() {
1160+
for _, f := range tempFiles {
1161+
f.Close(true)
1162+
}
1163+
}()
1164+
for _, f := range mh.store.Order["mst"].Files() {
1165+
sw := mh.store.NewStreamWriteFile("mst")
1166+
require.NoError(t, sw.InitMergedFile(f))
1167+
sw.SetValidate(true)
1168+
tempFiles = append(tempFiles, sw)
1169+
1170+
p := immutable.NewMergePerformer(nil, nil)
1171+
1172+
itr := immutable.NewColumnIterator(immutable.NewFileIterator(f, nil))
1173+
p.Reset(sw, itr)
1174+
performers.Push(p)
1175+
}
11401176

1177+
performers.Done()
1178+
performers.Close()
11411179
}

lib/util/lifted/influx/coordinator/statement_executor.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -201,23 +201,35 @@ func (e *StatementExecutor) Close() error {
201201
// ExecuteStatement executes the given statement with the given execution context.
202202
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext, seq int) error {
203203
e.MaxQueryParallel = int(atomic.LoadInt32(&syscontrol.QueryParallel))
204+
stmtString := stmt.String()
205+
204206
// Select statements are handled separately so that they can be streamed.
205207
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
208+
begin := time.Now()
206209
err := e.retryExecuteSelectStatement(stmt, ctx, seq)
210+
dur := time.Since(begin)
207211
if err == nil {
212+
if dur.Nanoseconds() > time.Second.Nanoseconds() {
213+
e.StmtExecLogger.GetZapLogger().Warn("slow query",
214+
zap.String("stmt", stmtString),
215+
zap.Float64("duration", dur.Seconds()))
216+
}
208217
return nil
209-
} else if errno.Equal(err, errno.DatabaseNotFound) ||
210-
errno.Equal(err, errno.ErrMeasurementNotFound) {
211-
e.StmtExecLogger.Error("execute select statement 400 error", zap.Any("stmt", stmt), zap.Error(err))
218+
}
219+
220+
if errno.Equal(err, errno.DatabaseNotFound, errno.ErrMeasurementNotFound) {
221+
e.StmtExecLogger.Error("execute select statement 400 error", zap.Any("stmt", stmtString),
222+
zap.Error(err), zap.Float64("duration", dur.Seconds()))
212223
atomic.AddInt64(&statistics.HandlerStat.Query400ErrorStmtCount, 1)
213224
} else {
214-
e.StmtExecLogger.Error("execute select statement 500 error", zap.Any("stmt", stmt), zap.Error(err))
225+
e.StmtExecLogger.Error("execute select statement 500 error", zap.Any("stmt", stmtString),
226+
zap.Error(err), zap.Float64("duration", dur.Seconds()))
215227
atomic.AddInt64(&statistics.HandlerStat.QueryErrorStmtCount, 1)
216228
}
217229
return err
218230
}
219231

220-
e.StmtExecLogger.Info("start execute statement", zap.Any("stmt", stmt))
232+
e.StmtExecLogger.Info("start execute statement", zap.Any("stmt", stmtString))
221233
var rows models.Rows
222234
var messages []*query.Message
223235
var err error

0 commit comments

Comments
 (0)