@@ -26,9 +26,11 @@ import (
26
26
"github.com/milvus-io/milvus/internal/log"
27
27
"github.com/milvus-io/milvus/internal/proto/querypb"
28
28
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
29
+ . "github.com/milvus-io/milvus/internal/querycoordv2/params"
29
30
"github.com/milvus-io/milvus/internal/querycoordv2/session"
30
31
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
31
32
"github.com/milvus-io/milvus/internal/util/tsoutil"
33
+ "go.uber.org/atomic"
32
34
"go.uber.org/zap"
33
35
)
34
36
@@ -45,7 +47,8 @@ type Executor struct {
45
47
// Merge load segment requests
46
48
merger * Merger [segmentIndex , * querypb.LoadSegmentsRequest ]
47
49
48
- executingTasks sync.Map
50
+ executingTasks sync.Map
51
+ executingTaskNum atomic.Int32
49
52
}
50
53
51
54
func NewExecutor (meta * meta.Meta ,
@@ -82,10 +85,14 @@ func (ex *Executor) Stop() {
82
85
// does nothing and returns false if the action is already committed,
83
86
// returns true otherwise.
84
87
func (ex * Executor ) Execute (task Task , step int ) bool {
88
+ if ex .executingTaskNum .Load () > Params .QueryCoordCfg .TaskExecutionCap {
89
+ return false
90
+ }
85
91
_ , exist := ex .executingTasks .LoadOrStore (task .ID (), struct {}{})
86
92
if exist {
87
93
return false
88
94
}
95
+ ex .executingTaskNum .Inc ()
89
96
90
97
log := log .With (
91
98
zap .Int64 ("taskID" , task .ID ()),
@@ -137,7 +144,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
137
144
defer func () {
138
145
for i := range mergeTask .tasks {
139
146
mergeTask .tasks [i ].SetErr (task .Err ())
140
- ex .removeAction (mergeTask .tasks [i ], mergeTask .steps [i ])
147
+ ex .removeTask (mergeTask .tasks [i ], mergeTask .steps [i ])
141
148
}
142
149
}()
143
150
@@ -180,7 +187,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
180
187
log .Info ("load segments done" , zap .Int64 ("taskID" , task .ID ()), zap .Duration ("timeTaken" , elapsed ))
181
188
}
182
189
183
- func (ex * Executor ) removeAction (task Task , step int ) {
190
+ func (ex * Executor ) removeTask (task Task , step int ) {
184
191
if task .Err () != nil {
185
192
log .Info ("excute action done, remove it" ,
186
193
zap .Int64 ("taskID" , task .ID ()),
@@ -189,6 +196,7 @@ func (ex *Executor) removeAction(task Task, step int) {
189
196
}
190
197
191
198
ex .executingTasks .Delete (task .ID ())
199
+ ex .executingTaskNum .Dec ()
192
200
}
193
201
194
202
func (ex * Executor ) executeSegmentAction (task * SegmentTask , step int ) {
@@ -218,7 +226,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
218
226
if err != nil {
219
227
task .SetErr (err )
220
228
task .Cancel ()
221
- ex .removeAction (task , step )
229
+ ex .removeTask (task , step )
222
230
}
223
231
}()
224
232
@@ -270,7 +278,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
270
278
}
271
279
272
280
func (ex * Executor ) releaseSegment (task * SegmentTask , step int ) {
273
- defer ex .removeAction (task , step )
281
+ defer ex .removeTask (task , step )
274
282
startTs := time .Now ()
275
283
action := task .Actions ()[step ].(* SegmentAction )
276
284
defer action .isReleaseCommitted .Store (true )
@@ -343,7 +351,7 @@ func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
343
351
}
344
352
345
353
func (ex * Executor ) subDmChannel (task * ChannelTask , step int ) error {
346
- defer ex .removeAction (task , step )
354
+ defer ex .removeTask (task , step )
347
355
startTs := time .Now ()
348
356
action := task .Actions ()[step ].(* ChannelAction )
349
357
log := log .With (
@@ -415,7 +423,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
415
423
}
416
424
417
425
func (ex * Executor ) unsubDmChannel (task * ChannelTask , step int ) error {
418
- defer ex .removeAction (task , step )
426
+ defer ex .removeTask (task , step )
419
427
startTs := time .Now ()
420
428
action := task .Actions ()[step ].(* ChannelAction )
421
429
log := log .With (
0 commit comments