From 6421f86f84d593fe4b779250898bb5ee41260b31 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 4 Jul 2024 15:49:28 +0800 Subject: [PATCH] domain: refine the runaway code and fix typos (#54435) ref pingcap/tidb#54434 --- pkg/domain/domain.go | 5 ++--- pkg/domain/resourcegroup/runaway.go | 7 +++++-- pkg/domain/runaway.go | 22 +++++++++++----------- pkg/executor/adapter.go | 4 ++-- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index b6c11d1e85561..4ef6a13307aa8 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1064,9 +1064,8 @@ func (do *Domain) Close() { if do.etcdClient != nil { terror.Log(errors.Trace(do.etcdClient.Close())) } - if rm := do.RunawayManager(); rm != nil { - rm.Stop() - } + + do.runawayManager.Stop() if do.unprefixedEtcdCli != nil { terror.Log(errors.Trace(do.unprefixedEtcdCli.Close())) diff --git a/pkg/domain/resourcegroup/runaway.go b/pkg/domain/resourcegroup/runaway.go index 045286236ec80..be4a582ca75bf 100644 --- a/pkg/domain/resourcegroup/runaway.go +++ b/pkg/domain/resourcegroup/runaway.go @@ -327,7 +327,7 @@ func (rm *RunawayManager) addWatchList(record *QuarantineRecord, ttl time.Durati } else { if item == nil { rm.queryLock.Lock() - // When watchlist get record, it will check whether the record is stale, so add new record if returns nil. + // When watchList get record, it will check whether the record is stale, so add new record if returns nil. if rm.watchList.Get(key) == nil { rm.watchList.Set(key, record, ttl) } else { @@ -340,7 +340,7 @@ func (rm *RunawayManager) addWatchList(record *QuarantineRecord, ttl time.Durati defer rm.queryLock.Unlock() rm.watchList.Set(key, record, ttl) } else if item.ID != record.ID { - // check the ID because of the eariler scan. + // check the ID because of the earlier scan. rm.staleQuarantineRecord <- record } } @@ -452,6 +452,9 @@ func (rm *RunawayManager) examineWatchList(resourceGroupName string, convict str // Stop stops the watchList which is a ttlcache. func (rm *RunawayManager) Stop() { + if rm == nil { + return + } if rm.watchList != nil { rm.watchList.Stop() } diff --git a/pkg/domain/runaway.go b/pkg/domain/runaway.go index b67d941bd2d18..1d3315d9ab002 100644 --- a/pkg/domain/runaway.go +++ b/pkg/domain/runaway.go @@ -242,21 +242,21 @@ func (do *Domain) runawayRecordFlushLoop() { // this times is used to batch flushing records, with 1s duration, // we can guarantee a watch record can be seen by the user within 1s. - runawayRecordFluashTimer := time.NewTimer(runawayRecordFlushInterval) + runawayRecordFlushTimer := time.NewTimer(runawayRecordFlushInterval) runawayRecordGCTicker := time.NewTicker(runawayRecordGCInterval) failpoint.Inject("FastRunawayGC", func() { - runawayRecordFluashTimer.Stop() + runawayRecordFlushTimer.Stop() runawayRecordGCTicker.Stop() - runawayRecordFluashTimer = time.NewTimer(time.Millisecond * 50) + runawayRecordFlushTimer = time.NewTimer(time.Millisecond * 50) runawayRecordGCTicker = time.NewTicker(time.Millisecond * 200) }) fired := false - recordCh := do.RunawayManager().RunawayRecordChan() - quarantineRecordCh := do.RunawayManager().QuarantineRecordChan() - staleQuarantineRecordCh := do.RunawayManager().StaleQuarantineRecordChan() - flushThrehold := do.runawayManager.FlushThreshold() - records := make([]*resourcegroup.RunawayRecord, 0, flushThrehold) + recordCh := do.runawayManager.RunawayRecordChan() + quarantineRecordCh := do.runawayManager.QuarantineRecordChan() + staleQuarantineRecordCh := do.runawayManager.StaleQuarantineRecordChan() + flushThreshold := do.runawayManager.FlushThreshold() + records := make([]*resourcegroup.RunawayRecord, 0, flushThreshold) flushRunawayRecords := func() { if len(records) == 0 { @@ -273,7 +273,7 @@ func (do *Domain) runawayRecordFlushLoop() { select { case <-do.exit: return - case <-runawayRecordFluashTimer.C: + case <-runawayRecordFlushTimer.C: flushRunawayRecords() fired = true case r := <-recordCh: @@ -281,12 +281,12 @@ func (do *Domain) runawayRecordFlushLoop() { failpoint.Inject("FastRunawayGC", func() { flushRunawayRecords() }) - if len(records) >= flushThrehold { + if len(records) >= flushThreshold { flushRunawayRecords() } else if fired { fired = false // meet a new record, reset the timer. - runawayRecordFluashTimer.Reset(runawayRecordFlushInterval) + runawayRecordFlushTimer.Reset(runawayRecordFlushInterval) } case <-runawayRecordGCTicker.C: go do.deleteExpiredRows("tidb_runaway_queries", "time", runawayRecordExpiredDuration) diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go index 4561706113bc2..009ae197fb4a2 100644 --- a/pkg/executor/adapter.go +++ b/pkg/executor/adapter.go @@ -537,11 +537,11 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { // must set plan according to the `Execute` plan before getting planDigest a.inheritContextFromExecuteStmt() - if variable.EnableResourceControl.Load() && domain.GetDomain(sctx).RunawayManager() != nil { + if rm := domain.GetDomain(sctx).RunawayManager(); variable.EnableResourceControl.Load() && rm != nil { stmtCtx := sctx.GetSessionVars().StmtCtx _, planDigest := GetPlanDigest(stmtCtx) _, sqlDigest := stmtCtx.SQLDigest() - stmtCtx.RunawayChecker = domain.GetDomain(sctx).RunawayManager().DeriveChecker(sctx.GetSessionVars().StmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String()) + stmtCtx.RunawayChecker = rm.DeriveChecker(sctx.GetSessionVars().StmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String()) if err := stmtCtx.RunawayChecker.BeforeExecutor(); err != nil { return nil, err }