4 changes: 4 additions & 0 deletions pkg/disttask/framework/scheduler/balancer.go
@@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
llog "github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/util/intest"
@@ -116,6 +117,9 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible
// managed nodes, subtasks of task might not be balanced.
adjustedNodes := filterNodesWithEnoughSlots(b.currUsedSlots, b.slotMgr.getCapacity(),
eligibleNodes, subtasks[0].Concurrency)
failpoint.Inject("mockNoEnoughSlots", func(_ failpoint.Value) {
adjustedNodes = []string{}
})
if len(adjustedNodes) == 0 {
// no node has enough slots to run the subtasks, skip balance and skip
// update used slots.
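For context: failpoint.Inject is a no-op unless the binary is built with failpoints enabled and the named failpoint has been activated. A minimal sketch of how a test flips the new mockNoEnoughSlots hook on, using the same pattern TestAddIndexScheduleAway uses further down (the testfailpoint helper is assumed to deregister the failpoint on test cleanup):

// Sketch: activating the scheduler failpoint from a test. The failpoint name is
// the package import path plus the name passed to failpoint.Inject; the term
// "return" activates it without a value.
testfailpoint.Enable(t,
	"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockNoEnoughSlots",
	"return")
// While it is active, doBalanceSubtasks sees an empty adjustedNodes slice and
// skips balancing, so subtasks stay where they are until the test moves them by hand.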
21 changes: 15 additions & 6 deletions pkg/disttask/framework/taskexecutor/task_executor.go
@@ -122,7 +122,7 @@ func NewBaseTaskExecutor(ctx context.Context, task *proto.Task, param Param) *Ba
// `pending` state, to make sure subtasks can be balanced later when node scale out.
// - If current running subtask are scheduled away from this node, i.e. this node
// is taken as down, cancel running.
func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCancelCtx context.CancelFunc) {
ticker := time.NewTicker(checkBalanceSubtaskInterval)
defer ticker.Stop()
for {
@@ -143,7 +143,8 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
e.logger.Info("subtask is scheduled away, cancel running",
zap.Int64("subtaskID", e.currSubtaskID.Load()))
// cancels runStep, but leave the subtask state unchanged.
e.cancelRunStepWith(nil)
subtaskCancelCtx()
failpoint.InjectCall("afterCancelSubtaskExec")
return
}

@@ -317,6 +318,12 @@ func (e *BaseTaskExecutor) Run() {
continue
}
}
if err := e.stepCtx.Err(); err != nil {
e.logger.Error("step executor context is done, the task should have been reverted",
zap.String("step", proto.Step2Str(task.Type, task.Step)),
zap.Error(err))
continue
}
err = e.runSubtask(subtask)
if err != nil {
// task executor keeps running its subtasks even though some subtask
@@ -418,23 +425,25 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
logTask := llog.BeginTask(logger, "run subtask")
subtaskErr := func() error {
e.currSubtaskID.Store(subtask.ID)
subtaskCtx, subtaskCancelCtx := context.WithCancel(e.stepCtx)

var wg util.WaitGroupWrapper
checkCtx, checkCancel := context.WithCancel(e.stepCtx)
checkCtx, checkCancel := context.WithCancel(subtaskCtx)
wg.RunWithLog(func() {
e.checkBalanceSubtask(checkCtx)
e.checkBalanceSubtask(checkCtx, subtaskCancelCtx)
})

if e.hasRealtimeSummary(e.stepExec) {
wg.RunWithLog(func() {
e.updateSubtaskSummaryLoop(checkCtx, e.stepCtx, e.stepExec)
e.updateSubtaskSummaryLoop(checkCtx, subtaskCtx, e.stepExec)
})
}
defer func() {
checkCancel()
subtaskCancelCtx()
wg.Wait()
}()
return e.stepExec.RunSubtask(e.stepCtx, subtask)
return e.stepExec.RunSubtask(subtaskCtx, subtask)
}()
failpoint.InjectCall("afterRunSubtask", e, &subtaskErr)
logTask.End2(zap.InfoLevel, subtaskErr)
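For context: the net effect of the runSubtask changes above is a three-level context hierarchy. A simplified sketch using the names from the diff, with summary reporting and error handling omitted:

// stepCtx    - lives for the whole step; cancelled when the task is reverted.
// subtaskCtx - lives for one subtask; checkBalanceSubtask cancels it when the
//              subtask is scheduled to another node, so only RunSubtask stops
//              and Run() can move on to the next subtask.
// checkCtx   - lives for the background goroutines of this subtask; always
//              cancelled when runSubtask returns.
subtaskCtx, subtaskCancelCtx := context.WithCancel(e.stepCtx)
checkCtx, checkCancel := context.WithCancel(subtaskCtx)

var wg util.WaitGroupWrapper
wg.RunWithLog(func() {
	e.checkBalanceSubtask(checkCtx, subtaskCancelCtx)
})
defer func() {
	checkCancel()
	subtaskCancelCtx()
	wg.Wait()
}()
return e.stepExec.RunSubtask(subtaskCtx, subtask)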
16 changes: 8 additions & 8 deletions pkg/disttask/framework/taskexecutor/task_executor_test.go
@@ -315,7 +315,7 @@ func TestTaskExecutorRun(t *testing.T) {
// mock for checkBalanceSubtask, returns empty subtask list
e.taskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id",
e.task1.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil)
// this subtask is scheduled awsy during running
// this subtask is scheduled away during running
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil)
e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil)
@@ -326,6 +326,7 @@
<-ctx.Done()
return ctx.Err()
})
e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true)
// keep running next subtask
nextSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{
ID: 2, Type: e.task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}
@@ -889,18 +890,17 @@ func TestCheckBalanceSubtask(t *testing.T) {
// context canceled
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
taskExecutor.checkBalanceSubtask(canceledCtx)
taskExecutor.checkBalanceSubtask(canceledCtx, nil)
})

t.Run("subtask scheduled away", func(t *testing.T) {
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, errors.New("error"))
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil)
runCtx, cancelCause := context.WithCancelCause(ctx)
taskExecutor.mu.runtimeCancel = cancelCause
runCtx, cancel := context.WithCancel(ctx)
require.NoError(t, runCtx.Err())
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, cancel)
require.ErrorIs(t, runCtx.Err(), context.Canceled)
require.True(t, ctrl.Satisfied())
})
@@ -913,7 +913,7 @@ func TestCheckBalanceSubtask(t *testing.T) {
mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false)
mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1",
subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil)
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, nil)
require.True(t, ctrl.Satisfied())

// if we failed to change state of non-idempotent subtask, will retry
@@ -930,7 +930,7 @@ func TestCheckBalanceSubtask(t *testing.T) {
mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false)
mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1",
subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil)
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, nil)
require.True(t, ctrl.Satisfied())
})

@@ -945,7 +945,7 @@ func TestCheckBalanceSubtask(t *testing.T) {
// used to break the loop
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, nil)
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, nil)
require.True(t, ctrl.Satisfied())
})
}
45 changes: 45 additions & 0 deletions tests/realtikvtest/addindextest1/disttask_test.go
@@ -391,3 +391,48 @@ func TestAddIndexDistLockAcquireFailed(t *testing.T) {
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/owner/mockAcquireDistLockFailed", "1*return(true)")
tk.MustExec("alter table t add index idx(b);")
}

func TestAddIndexScheduleAway(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_dist_task = on;")
t.Cleanup(func() {
tk.MustExec("set global tidb_enable_dist_task = off;")
})
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1);")

var jobID atomic.Int64
// Acquire the job ID.
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
if job.Type == model.ActionAddIndex {
jobID.Store(job.ID)
}
})
// Do not balance subtasks automatically.
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockNoEnoughSlots", "return")
afterCancel := make(chan struct{})
// Capture the cancel operation from checkBalanceLoop.
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterCancelSubtaskExec", func() {
close(afterCancel)
})
var once sync.Once
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish", func() {
once.Do(func() {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
updateExecID := fmt.Sprintf(`
update mysql.tidb_background_subtask set exec_id = 'other' where task_key in
(select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load())
tk1.MustExec(updateExecID)
<-afterCancel
updateExecID = fmt.Sprintf(`
update mysql.tidb_background_subtask set exec_id = ':4000' where task_key in
(select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load())
tk1.MustExec(updateExecID)
})
})
tk.MustExec("alter table t add index idx(b);")
require.NotEqual(t, int64(0), jobID.Load())
}
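For context: flipping exec_id is enough to simulate scheduling the subtask away because checkBalanceSubtask polls the subtask table for subtasks still assigned to this executor; once the UPDATE runs, that query returns an empty slice, which is treated as "scheduled away" and triggers the new subtask-level cancel (observed here through the afterCancelSubtaskExec failpoint). Restoring exec_id to ':4000' afterwards lets the task proceed to completion on this node. A condensed sketch of the detection path, with identifiers approximated rather than copied from the real code:

// Sketch of the check inside checkBalanceSubtask (simplified; see the real
// implementation in task_executor.go above). execID/taskID/step stand in for
// the executor's own fields.
subtasks, err := e.taskTable.GetSubtasksByExecIDAndStepAndStates(
	ctx, execID, taskID, step, proto.SubtaskStateRunning)
if err == nil && len(subtasks) == 0 {
	// The running subtask no longer belongs to this node: cancel only the
	// subtask context, leave the subtask state unchanged, and return.
	subtaskCancelCtx()
	return
}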