@@ -214,7 +214,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
214
214
// if the last task is not successfully handled in last round for error or panic, pass it to this round to retry
215
215
var lastTask * NeededItemTask
216
216
for {
217
- task , err := h .HandleOneTask (lastTask , readerCtx , ctx .(sqlexec.RestrictedSQLExecutor ), exit )
217
+ task , err := h .HandleOneTask (ctx , lastTask , readerCtx , ctx .(sqlexec.RestrictedSQLExecutor ), exit )
218
218
lastTask = task
219
219
if err != nil {
220
220
switch err {
@@ -235,7 +235,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
235
235
// - If the task is handled successfully, return nil, nil.
236
236
// - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep.
237
237
// - If the task is failed, return the task, error. The caller should retry the timeout task with sleep.
238
- func (h * Handle ) HandleOneTask (lastTask * NeededItemTask , readerCtx * StatsReaderContext , ctx sqlexec.RestrictedSQLExecutor , exit chan struct {}) (task * NeededItemTask , err error ) {
238
+ func (h * Handle ) HandleOneTask (sctx sessionctx. Context , lastTask * NeededItemTask , readerCtx * StatsReaderContext , ctx sqlexec.RestrictedSQLExecutor , exit chan struct {}) (task * NeededItemTask , err error ) {
239
239
defer func () {
240
240
// recover for each task, worker keeps working
241
241
if r := recover (); r != nil {
@@ -244,7 +244,7 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
244
244
}
245
245
}()
246
246
if lastTask == nil {
247
- task , err = h .drainColTask (exit )
247
+ task , err = h .drainColTask (sctx , exit )
248
248
if err != nil {
249
249
if err != errExit {
250
250
logutil .BgLogger ().Error ("Fail to drain task for stats loading." , zap .Error (err ))
@@ -447,7 +447,7 @@ func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, re
447
447
}
448
448
449
449
// drainColTask will hang until a column task can return, and either task or error will be returned.
450
- func (h * Handle ) drainColTask (exit chan struct {}) (* NeededItemTask , error ) {
450
+ func (h * Handle ) drainColTask (sctx sessionctx. Context , exit chan struct {}) (* NeededItemTask , error ) {
451
451
// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
452
452
for {
453
453
select {
@@ -460,6 +460,7 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
460
460
// if the task has already timeout, no sql is sync-waiting for it,
461
461
// so do not handle it just now, put it to another channel with lower priority
462
462
if time .Now ().After (task .ToTimeout ) {
463
+ task .ToTimeout .Add (time .Duration (sctx .GetSessionVars ().StatsLoadSyncWait .Load ()) * time .Microsecond )
463
464
h .writeToTimeoutChan (h .StatsLoad .TimeoutItemsCh , task )
464
465
continue
465
466
}
0 commit comments