
Commit aa868ed

tangenta authored and ti-chi-bot committed

ddl: fix runnable ingest job checking (pingcap#52503)

close pingcap#52475

1 parent e34e65f · commit aa868ed

File tree

12 files changed: +126 -87 lines changed

pkg/ddl/ddl.go (+1 -7)

@@ -831,6 +831,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 		if err != nil {
 			logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
 		}
+		d.runningJobs.clear()
 	})

 	d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
@@ -869,13 +870,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 	defer d.sessPool.Put(ctx)

 	ingest.InitGlobalLightningEnv()
-	d.ownerManager.SetRetireOwnerHook(func() {
-		// Since this instance is not DDL owner anymore, we clean up the processing job info.
-		if ingest.LitBackCtxMgr != nil {
-			ingest.LitBackCtxMgr.MarkJobFinish()
-		}
-		d.runningJobs.clear()
-	})

 	return nil
 }
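
Taken together, the two hunks move the cleanup of owner-local job state from the retire path to the acquire path: `d.runningJobs.clear()` now runs inside the callback installed in `Start` (presumably the be-owner hook, given the `SetRetireOwnerHook` removal here and in pkg/owner/manager.go below), so the bookkeeping is reset even when the previous owner never retired cleanly, for example after a crash or a lost etcd session. A minimal runnable sketch of that pattern; `ownerManager` and `jobSet` here are hypothetical stand-ins, not TiDB's types:

```go
// Sketch: reset owner-local bookkeeping when *becoming* owner instead of
// relying on a retire hook that may never fire.
package main

import (
	"fmt"
	"sync"
)

type jobSet struct {
	sync.Mutex
	ids map[int64]struct{}
}

func (s *jobSet) clear() {
	s.Lock()
	defer s.Unlock()
	s.ids = make(map[int64]struct{})
}

type ownerManager struct {
	beOwnerHook func() // called right before this node starts acting as owner
}

func (m *ownerManager) becomeOwner() {
	if m.beOwnerHook != nil {
		m.beOwnerHook() // stale state from a previous term is dropped here
	}
	fmt.Println("now acting as DDL owner")
}

func main() {
	jobs := &jobSet{ids: map[int64]struct{}{1: {}}} // leftovers from a past term
	m := &ownerManager{beOwnerHook: jobs.clear}
	m.becomeOwner() // jobs is empty again before any new job is dispatched
}
```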

pkg/ddl/ddl_running_jobs.go (+32)

@@ -22,8 +22,11 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"

 	"github.com/pingcap/tidb/pkg/parser/model"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"go.uber.org/zap"
 )

 type runningJobs struct {
@@ -36,6 +39,11 @@ type runningJobs struct {
 	// It is not necessarily being processed by a worker.
 	unfinishedIDs    map[int64]struct{}
 	unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}
+
+	// processingIngestJobID records the ID of the ingest job that is being processed by a worker.
+	// TODO(tangenta): remove this when we support running multiple concurrent ingest jobs.
+	processingIngestJobID int64
+	lastLoggingTime       time.Time
 }

 func newRunningJobs() *runningJobs {
@@ -47,6 +55,8 @@ func newRunningJobs() *runningJobs {
 }

 func (j *runningJobs) clear() {
+	j.Lock()
+	defer j.Unlock()
 	j.unfinishedIDs = make(map[int64]struct{})
 	j.unfinishedSchema = make(map[string]map[string]struct{})
 }
@@ -56,6 +66,9 @@ func (j *runningJobs) add(job *model.Job) {
 	defer j.Unlock()
 	j.processingIDs[job.ID] = struct{}{}
 	j.updateInternalRunningJobIDs()
+	if isIngestJob(job) {
+		j.processingIngestJobID = job.ID
+	}

 	if _, ok := j.unfinishedIDs[job.ID]; ok {
 		// Already exists, no need to add it again.
@@ -75,6 +88,9 @@ func (j *runningJobs) remove(job *model.Job) {
 	defer j.Unlock()
 	delete(j.processingIDs, job.ID)
 	j.updateInternalRunningJobIDs()
+	if isIngestJob(job) && job.ID == j.processingIngestJobID {
+		j.processingIngestJobID = 0
+	}

 	if job.IsFinished() || job.IsSynced() {
 		delete(j.unfinishedIDs, job.ID)
@@ -115,6 +131,16 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
 		// Already processing by a worker. Skip running it again.
 		return false
 	}
+	if isIngestJob(job) && j.processingIngestJobID != 0 {
+		// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
+		if time.Since(j.lastLoggingTime) > 1*time.Minute {
+			logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
+				zap.String("category", "ddl-ingest"),
+				zap.Int64("processing job ID", j.processingIngestJobID))
+			j.lastLoggingTime = time.Now()
+		}
+		return false
+	}
 	for _, info := range job.GetInvolvingSchemaInfo() {
 		if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
 			return false
@@ -136,3 +162,9 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
 	}
 	return true
 }
+
+func isIngestJob(job *model.Job) bool {
+	return (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
+		job.ReorgMeta != nil &&
+		job.ReorgMeta.IsFastReorg
+}
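
This file now owns the "one ingest job at a time" rule end to end: `add` records the ingest job under the `runningJobs` lock, `checkRunnable` rejects other ingest jobs (throttling the log line to once a minute), and `remove` frees the slot. A runnable distillation of that gate, simplified to plain IDs and collapsing check and acquire into one method for brevity:

```go
// Distillation of the gate above (the real code keys off model.Job and
// isIngestJob): one ingest job may hold the slot, others are rejected, and
// the rejection is logged at most once per minute.
package main

import (
	"fmt"
	"sync"
	"time"
)

type ingestGate struct {
	sync.Mutex
	processingID    int64
	lastLoggingTime time.Time
}

func (g *ingestGate) tryRun(jobID int64) bool {
	g.Lock()
	defer g.Unlock()
	if g.processingID != 0 && g.processingID != jobID {
		if time.Since(g.lastLoggingTime) > time.Minute {
			fmt.Printf("ingest in use by job %d, job %d must wait\n", g.processingID, jobID)
			g.lastLoggingTime = time.Now()
		}
		return false
	}
	g.processingID = jobID
	return true
}

func (g *ingestGate) finish(jobID int64) {
	g.Lock()
	defer g.Unlock()
	if g.processingID == jobID {
		g.processingID = 0
	}
}

func main() {
	var g ingestGate
	fmt.Println(g.tryRun(1)) // true: slot acquired
	fmt.Println(g.tryRun(2)) // false: job 1 still holds the slot
	g.finish(1)
	fmt.Println(g.tryRun(2)) // true: slot released, as remove() does above
}
```

Because the slot now lives next to the rest of the job bookkeeping and is wiped by `clear()` on an owner change, it can no longer leak across owner transitions the way the old cross-package `MarkJobProcessing` state could (the stuck-job symptom of pingcap#52475).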

pkg/ddl/ddl_worker.go (-11)

@@ -27,7 +27,6 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
-	"github.com/pingcap/tidb/pkg/ddl/ingest"
 	sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
 	"github.com/pingcap/tidb/pkg/ddl/util"
 	"github.com/pingcap/tidb/pkg/kv"
@@ -694,7 +693,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 	startTime := time.Now()
 	defer func() {
 		metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
-		markJobFinish(job)
 	}()

 	if JobNeedGC(job) {
@@ -744,15 +742,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 	return errors.Trace(err)
 }

-func markJobFinish(job *model.Job) {
-	if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
-		job.ReorgMeta != nil &&
-		job.ReorgMeta.IsFastReorg &&
-		ingest.LitBackCtxMgr != nil {
-		ingest.LitBackCtxMgr.MarkJobFinish()
-	}
-}
-
 func (w *worker) writeDDLSeqNum(job *model.Job) {
 	w.ddlSeqNumMu.Lock()
 	w.ddlSeqNumMu.seqNum++
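
With `markJobFinish` gone, the deferred function in `finishDDLJob` does exactly one thing: record the job's duration in a histogram. For readers unfamiliar with the pattern, a minimal sketch of deferred histogram observation with client_golang; the metric and label names below are invented for illustration, not TiDB's:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var ddlWorkerHist = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "ddl_worker_duration_seconds", // illustrative name
		Help: "Time spent finishing a DDL job.",
	},
	[]string{"action", "result"},
)

func init() {
	prometheus.MustRegister(ddlWorkerHist)
}

func finishJob(action string) (err error) {
	startTime := time.Now()
	defer func() {
		// err is the named return value, so the label reflects the final outcome.
		result := "ok"
		if err != nil {
			result = "error"
		}
		ddlWorkerHist.WithLabelValues(action, result).Observe(time.Since(startTime).Seconds())
	}()
	// ... do the actual work ...
	return nil
}
```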

pkg/ddl/index.go (+4)

@@ -41,6 +41,7 @@ import (
 	"github.com/pingcap/tidb/pkg/disttask/framework/storage"
 	"github.com/pingcap/tidb/pkg/infoschema"
 	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/lightning/backend/local"
 	"github.com/pingcap/tidb/pkg/lightning/common"
 	"github.com/pingcap/tidb/pkg/meta"
 	"github.com/pingcap/tidb/pkg/metrics"
@@ -869,6 +870,9 @@ func cleanupSortPath(ctx context.Context, currentJobID int64) error {
 				logutil.Logger(ctx).Warn(ingest.LitErrCleanSortPath, zap.Error(err))
 				return nil
 			}
+			failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
+				close(local.WaitRMFolderChForTest)
+			})
 		}
 	}
 	return nil
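
The injected callback closes `local.WaitRMFolderChForTest`, a one-shot broadcast: closing a channel releases every goroutine currently (or later) selecting on it, while the one-second `time.After` arm in `slowCreateFS` (added in engine_mgr.go below) keeps runs from hanging if nobody ever closes it. A standalone sketch of that close-as-broadcast idiom:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var ready = make(chan struct{})

func waiter(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	select {
	case <-ready: // a closed channel always receives immediately
		fmt.Printf("waiter %d released\n", id)
	case <-time.After(1 * time.Second):
		fmt.Printf("waiter %d timed out\n", id) // fallback, like slowCreateFS
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go waiter(i, &wg)
	}
	close(ready) // one close unblocks all three waiters at once
	wg.Wait()
}
```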

pkg/ddl/ingest/backend_mgr.go (+10 -33)

@@ -19,16 +19,17 @@ import (
 	"fmt"
 	"math"
 	"strconv"
-	"sync"
 	"time"

+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/lightning/backend/local"
 	"github.com/pingcap/tidb/pkg/lightning/config"
 	"github.com/pingcap/tidb/pkg/util/generic"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	kvutil "github.com/tikv/client-go/v2/util"
 	pd "github.com/tikv/pd/client"
 	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )

@@ -48,18 +49,12 @@ type BackendCtxMgr interface {
 	) (BackendCtx, error)
 	Unregister(jobID int64)
 	Load(jobID int64) (BackendCtx, bool)
-
-	MarkJobProcessing(jobID int64) (ok bool)
-	MarkJobFinish()
 }

 type litBackendCtxMgr struct {
 	generic.SyncMap[int64, *litBackendCtx]
-	memRoot         MemRoot
-	diskRoot        DiskRoot
-	processingJobID int64
-	lastLoggingTime time.Time
-	mu              sync.Mutex
+	memRoot  MemRoot
+	diskRoot DiskRoot
 }

 func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
@@ -80,30 +75,6 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
 	return mgr
 }

-// MarkJobProcessing marks ingest backfill is processing.
-func (m *litBackendCtxMgr) MarkJobProcessing(jobID int64) bool {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	if m.processingJobID == 0 || m.processingJobID == jobID {
-		m.processingJobID = jobID
-		return true
-	}
-	if time.Since(m.lastLoggingTime) > 1*time.Minute {
-		logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
-			zap.String("category", "ddl-ingest"),
-			zap.Int64("processing job ID", m.processingJobID))
-		m.lastLoggingTime = time.Now()
-	}
-	return false
-}
-
-// MarkJobFinish marks ingest backfill is finished.
-func (m *litBackendCtxMgr) MarkJobFinish() {
-	m.mu.Lock()
-	m.processingJobID = 0
-	m.mu.Unlock()
-}
-
 // CheckAvailable checks if the ingest backfill is available.
 func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
 	if err := m.diskRoot.PreCheckUsage(); err != nil {
@@ -113,6 +84,9 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
 	return true, nil
 }

+// ResignOwnerForTest is only used for test.
+var ResignOwnerForTest = atomic.NewBool(false)
+
 // Register creates a new backend and registers it to the backend context.
 func (m *litBackendCtxMgr) Register(
 	ctx context.Context,
@@ -137,6 +111,9 @@ func (m *litBackendCtxMgr) Register(
 		logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
 		return nil, err
 	}
+	failpoint.Inject("beforeCreateLocalBackend", func() {
+		ResignOwnerForTest.Store(true)
+	})
 	bd, err := createLocalBackend(ctx, cfg, pdSvcDiscovery)
 	if err != nil {
 		logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
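
`ResignOwnerForTest` is a typed atomic flag from go.uber.org/atomic, set inside the failpoint here and consumed by the dispatch loop in pkg/ddl/job_table.go below. A minimal sketch of that flip-and-consume handshake; the real consumer does Load-then-Store, which is fine with a single consumer, while `CompareAndSwap` would make the consume step atomic:

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// ResignOwnerForTest mirrors the variable added in this file: exported so
// another package (the DDL dispatch loop) can observe it.
var ResignOwnerForTest = atomic.NewBool(false)

func producer() {
	// In the real code this runs inside the beforeCreateLocalBackend failpoint.
	ResignOwnerForTest.Store(true)
}

func consumer() {
	if ResignOwnerForTest.Load() {
		fmt.Println("would resign DDL ownership now")
		ResignOwnerForTest.Store(false) // consume the signal
	}
}

func main() {
	producer()
	consumer()
}
```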

pkg/ddl/ingest/mock.go (-9)

@@ -43,15 +43,6 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBackendCtxMgr {
 	}
 }

-// MarkJobProcessing implements BackendCtxMgr.MarkJobProcessing interface.
-func (*MockBackendCtxMgr) MarkJobProcessing(_ int64) bool {
-	return true
-}
-
-// MarkJobFinish implements BackendCtxMgr.MarkJobFinish interface.
-func (*MockBackendCtxMgr) MarkJobFinish() {
-}
-
 // CheckAvailable implements BackendCtxMgr.Available interface.
 func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
 	return len(m.runningJobs) == 0, nil

pkg/ddl/job_table.go (+9 -11)

@@ -228,17 +228,6 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
 		if !d.runningJobs.checkRunnable(job) {
 			return false, nil
 		}
-		if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
-			job.State == model.JobStateQueueing &&
-			job.ReorgMeta != nil &&
-			job.ReorgMeta.IsFastReorg &&
-			ingest.LitBackCtxMgr != nil {
-			succeed := ingest.LitBackCtxMgr.MarkJobProcessing(job.ID)
-			if !succeed {
-				// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
-				return false, nil
-			}
-		}
 		// Check if there is any block ddl running, like drop schema and flashback cluster.
 		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
 			"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
@@ -292,6 +281,15 @@ func (d *ddl) startDispatchLoop() {
 			time.Sleep(dispatchLoopWaitingDuration)
 			continue
 		}
+		failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
+			if ingest.ResignOwnerForTest.Load() {
+				err2 := d.ownerManager.ResignOwner(context.Background())
+				if err2 != nil {
+					logutil.BgLogger().Info("resign meet error", zap.Error(err2))
+				}
+				ingest.ResignOwnerForTest.Store(false)
+			}
+		})
 		select {
 		case <-d.ddlJobCh:
 		case <-ticker.C:
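
The two failpoints compose into a regression scenario for pingcap#52475: `beforeCreateLocalBackend` raises the flag just before an ingest backend is created, and `ownerResignAfterDispatchLoopCheck` makes the dispatch loop resign DDL ownership when it sees the flag. The commit's actual tests live in files not shown in this excerpt; a hypothetical skeleton of how such a test would arm them (failpoint names are the full package path plus the failpoint name):

```go
package ddl_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestOwnerChangeDuringIngest(t *testing.T) {
	fpBackend := "github.com/pingcap/tidb/pkg/ddl/ingest/beforeCreateLocalBackend"
	fpResign := "github.com/pingcap/tidb/pkg/ddl/ownerResignAfterDispatchLoopCheck"
	require.NoError(t, failpoint.Enable(fpBackend, "return(true)"))
	require.NoError(t, failpoint.Enable(fpResign, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(fpBackend))
		require.NoError(t, failpoint.Disable(fpResign))
	}()
	// ... run an ADD INDEX with fast reorg and assert it still completes
	// after the forced owner resignation ...
}
```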

pkg/lightning/backend/local/BUILD.bazel (+1)

@@ -60,6 +60,7 @@ go_library(
         "//pkg/util/compress",
         "//pkg/util/engine",
         "//pkg/util/hack",
+        "//pkg/util/logutil",
         "//pkg/util/mathutil",
         "//pkg/util/ranger",
         "@com_github_cockroachdb_pebble//:pebble",

pkg/lightning/backend/local/engine_mgr.go (+26)

@@ -19,9 +19,12 @@ import (
 	"math"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
+	"time"

 	"github.com/cockroachdb/pebble"
+	"github.com/cockroachdb/pebble/vfs"
 	"github.com/docker/go-units"
 	"github.com/google/uuid"
 	"github.com/pingcap/errors"
@@ -33,6 +36,7 @@ import (
 	"github.com/pingcap/tidb/pkg/lightning/common"
 	"github.com/pingcap/tidb/pkg/lightning/log"
 	"github.com/pingcap/tidb/pkg/lightning/manual"
+	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/tikv/client-go/v2/oracle"
 	tikvclient "github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/atomic"
@@ -578,6 +582,25 @@ func (em *engineManager) getBufferPool() *membuf.Pool {
 	return em.bufferPool
 }

+// only used in tests
+type slowCreateFS struct {
+	vfs.FS
+}
+
+// WaitRMFolderChForTest is a channel for testing.
+var WaitRMFolderChForTest = make(chan struct{})
+
+func (s slowCreateFS) Create(name string) (vfs.File, error) {
+	if strings.Contains(name, "temporary") {
+		select {
+		case <-WaitRMFolderChForTest:
+		case <-time.After(1 * time.Second):
+			logutil.BgLogger().Info("no one removes folder")
+		}
+	}
+	return s.FS.Create(name)
+}
+
 func openDuplicateDB(storeDir string) (*pebble.DB, error) {
 	dbPath := filepath.Join(storeDir, duplicateDBName)
 	// TODO: Optimize the opts for better write.
@@ -586,6 +609,9 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
 			newRangePropertiesCollector,
 		},
 	}
+	failpoint.Inject("slowCreateFS", func() {
+		opts.FS = slowCreateFS{vfs.Default}
+	})
 	return pebble.Open(dbPath, opts)
 }
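
`slowCreateFS` works because pebble takes its filesystem as a value: anything satisfying `vfs.FS` can be set on `pebble.Options.FS`, and struct embedding lets the wrapper override only `Create` while delegating every other method. A standalone sketch of the same wrapper shape, logging instead of delaying:

```go
package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

// loggingFS delegates everything to the embedded FS but logs file creation,
// the same shape as slowCreateFS above (which delays instead of logging).
type loggingFS struct {
	vfs.FS
}

func (fs loggingFS) Create(name string) (vfs.File, error) {
	log.Printf("creating %s", name)
	return fs.FS.Create(name)
}

func main() {
	db, err := pebble.Open("demo-db", &pebble.Options{FS: loggingFS{vfs.Default}})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```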

pkg/owner/manager.go (+1 -11)

@@ -65,8 +65,6 @@ type Manager interface {

 	// SetBeOwnerHook sets a hook. The hook is called before becoming an owner.
 	SetBeOwnerHook(hook func())
-	// SetRetireOwnerHook will be called after retiring the owner.
-	SetRetireOwnerHook(hook func())
 }

 const (
@@ -118,8 +116,7 @@ type ownerManager struct {
 	wg             sync.WaitGroup
 	campaignCancel context.CancelFunc

-	beOwnerHook     func()
-	retireOwnerHook func()
+	beOwnerHook func()
 }

 // NewOwnerManager creates a new Manager.
@@ -164,10 +161,6 @@ func (m *ownerManager) SetBeOwnerHook(hook func()) {
 	m.beOwnerHook = hook
 }

-func (m *ownerManager) SetRetireOwnerHook(hook func()) {
-	m.retireOwnerHook = hook
-}
-
 // ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
 var ManagerSessionTTL = 60

@@ -230,9 +223,6 @@

 // RetireOwner make the manager to be a not owner.
 func (m *ownerManager) RetireOwner() {
-	if m.retireOwnerHook != nil {
-		m.retireOwnerHook()
-	}
 	atomic.StorePointer(&m.elec, nil)
 }
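
With the retire hook gone, `RetireOwner` is reduced to a single atomic pointer store, the lock-free way `ownerManager` publishes whether it still holds the election. A sketch of that publish-and-check pattern, with a plain struct standing in for the etcd `concurrency.Election`:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

type election struct{ key string }

type manager struct {
	elec unsafe.Pointer // *election, accessed only via atomic ops
}

func (m *manager) isOwner() bool {
	return atomic.LoadPointer(&m.elec) != nil
}

func (m *manager) toBeOwner(e *election) {
	atomic.StorePointer(&m.elec, unsafe.Pointer(e))
}

func (m *manager) retireOwner() {
	atomic.StorePointer(&m.elec, nil) // the same store the diff keeps
}

func main() {
	m := &manager{}
	m.toBeOwner(&election{key: "ddl-owner"})
	fmt.Println(m.isOwner()) // true
	m.retireOwner()
	fmt.Println(m.isOwner()) // false
}
```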
