Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/disttask/framework/scheduler/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
llog "github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/util/intest"
Expand Down Expand Up @@ -116,6 +117,9 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible
// managed nodes, subtasks of task might not be balanced.
adjustedNodes := filterNodesWithEnoughSlots(b.currUsedSlots, b.slotMgr.getCapacity(),
eligibleNodes, subtasks[0].Concurrency)
failpoint.Inject("mockNoEnoughSlots", func(_ failpoint.Value) {
adjustedNodes = []string{}
})
if len(adjustedNodes) == 0 {
// no node has enough slots to run the subtasks, skip balance and skip
// update used slots.
Expand Down
5 changes: 4 additions & 1 deletion pkg/disttask/framework/taskexecutor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
zap.Int64("subtaskID", e.currSubtaskID.Load()))
// cancels runStep, but leave the subtask state unchanged.
e.cancelRunStepWith(nil)
failpoint.InjectCall("afterCancelRunningSubtask")
return
}

Expand Down Expand Up @@ -307,7 +308,9 @@ func (e *BaseTaskExecutor) Run() {
// reset it when we get a subtask
checkInterval, noSubtaskCheckCnt = SubtaskCheckInterval, 0

if e.stepExec != nil && e.stepExec.GetStep() != subtask.Step {
if e.stepExec != nil &&
(e.stepExec.GetStep() != subtask.Step ||
e.stepCtx.Err() != nil) { // Previous step ctx is done, cleanup and use a new one.
e.cleanStepExecutor()
}
if e.stepExec == nil {
Expand Down
45 changes: 45 additions & 0 deletions tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,48 @@ func TestAddIndexDistLockAcquireFailed(t *testing.T) {
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/owner/mockAcquireDistLockFailed", "1*return(true)")
tk.MustExec("alter table t add index idx(b);")
}

func TestAddIndexScheduleAway(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_dist_task = on;")
t.Cleanup(func() {
tk.MustExec("set global tidb_enable_dist_task = off;")
})
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1);")

var jobID atomic.Int64
// Acquire the job ID.
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
if job.Type == model.ActionAddIndex {
jobID.Store(job.ID)
}
})
// Do not balance subtasks automatically.
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockNoEnoughSlots", "return")
afterCancel := make(chan struct{})
// Capture the cancel operation from checkBalanceLoop.
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterCancelRunningSubtask", func() {
close(afterCancel)
})
var once sync.Once
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish", func() {
once.Do(func() {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
updateExecID := fmt.Sprintf(`
update mysql.tidb_background_subtask set exec_id = 'other' where task_key in
(select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load())
tk1.MustExec(updateExecID)
<-afterCancel
updateExecID = fmt.Sprintf(`
update mysql.tidb_background_subtask set exec_id = ':4000' where task_key in
(select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load())
tk1.MustExec(updateExecID)
})
})
tk.MustExec("alter table t add index idx(b);")
require.NotEqual(t, int64(0), jobID.Load())
}