Skip to content

Commit

Permalink
domain: refine the runaway code and fix typos (#54435)
Browse files Browse the repository at this point in the history
ref #54434
  • Loading branch information
JmPotato authored Jul 4, 2024
1 parent be16d49 commit 6421f86
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 18 deletions.
5 changes: 2 additions & 3 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,9 +1064,8 @@ func (do *Domain) Close() {
if do.etcdClient != nil {
terror.Log(errors.Trace(do.etcdClient.Close()))
}
if rm := do.RunawayManager(); rm != nil {
rm.Stop()
}

do.runawayManager.Stop()

if do.unprefixedEtcdCli != nil {
terror.Log(errors.Trace(do.unprefixedEtcdCli.Close()))
Expand Down
7 changes: 5 additions & 2 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (rm *RunawayManager) addWatchList(record *QuarantineRecord, ttl time.Durati
} else {
if item == nil {
rm.queryLock.Lock()
// When watchlist get record, it will check whether the record is stale, so add new record if returns nil.
// When watchList get record, it will check whether the record is stale, so add new record if returns nil.
if rm.watchList.Get(key) == nil {
rm.watchList.Set(key, record, ttl)
} else {
Expand All @@ -340,7 +340,7 @@ func (rm *RunawayManager) addWatchList(record *QuarantineRecord, ttl time.Durati
defer rm.queryLock.Unlock()
rm.watchList.Set(key, record, ttl)
} else if item.ID != record.ID {
// check the ID because of the eariler scan.
// check the ID because of the earlier scan.
rm.staleQuarantineRecord <- record
}
}
Expand Down Expand Up @@ -452,6 +452,9 @@ func (rm *RunawayManager) examineWatchList(resourceGroupName string, convict str

// Stop stops the watchList which is a ttlcache.
func (rm *RunawayManager) Stop() {
if rm == nil {
return
}
if rm.watchList != nil {
rm.watchList.Stop()
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/domain/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,21 +242,21 @@ func (do *Domain) runawayRecordFlushLoop() {

// this times is used to batch flushing records, with 1s duration,
// we can guarantee a watch record can be seen by the user within 1s.
runawayRecordFluashTimer := time.NewTimer(runawayRecordFlushInterval)
runawayRecordFlushTimer := time.NewTimer(runawayRecordFlushInterval)
runawayRecordGCTicker := time.NewTicker(runawayRecordGCInterval)
failpoint.Inject("FastRunawayGC", func() {
runawayRecordFluashTimer.Stop()
runawayRecordFlushTimer.Stop()
runawayRecordGCTicker.Stop()
runawayRecordFluashTimer = time.NewTimer(time.Millisecond * 50)
runawayRecordFlushTimer = time.NewTimer(time.Millisecond * 50)
runawayRecordGCTicker = time.NewTicker(time.Millisecond * 200)
})

fired := false
recordCh := do.RunawayManager().RunawayRecordChan()
quarantineRecordCh := do.RunawayManager().QuarantineRecordChan()
staleQuarantineRecordCh := do.RunawayManager().StaleQuarantineRecordChan()
flushThrehold := do.runawayManager.FlushThreshold()
records := make([]*resourcegroup.RunawayRecord, 0, flushThrehold)
recordCh := do.runawayManager.RunawayRecordChan()
quarantineRecordCh := do.runawayManager.QuarantineRecordChan()
staleQuarantineRecordCh := do.runawayManager.StaleQuarantineRecordChan()
flushThreshold := do.runawayManager.FlushThreshold()
records := make([]*resourcegroup.RunawayRecord, 0, flushThreshold)

flushRunawayRecords := func() {
if len(records) == 0 {
Expand All @@ -273,20 +273,20 @@ func (do *Domain) runawayRecordFlushLoop() {
select {
case <-do.exit:
return
case <-runawayRecordFluashTimer.C:
case <-runawayRecordFlushTimer.C:
flushRunawayRecords()
fired = true
case r := <-recordCh:
records = append(records, r)
failpoint.Inject("FastRunawayGC", func() {
flushRunawayRecords()
})
if len(records) >= flushThrehold {
if len(records) >= flushThreshold {
flushRunawayRecords()
} else if fired {
fired = false
// meet a new record, reset the timer.
runawayRecordFluashTimer.Reset(runawayRecordFlushInterval)
runawayRecordFlushTimer.Reset(runawayRecordFlushInterval)
}
case <-runawayRecordGCTicker.C:
go do.deleteExpiredRows("tidb_runaway_queries", "time", runawayRecordExpiredDuration)
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,11 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {

// must set plan according to the `Execute` plan before getting planDigest
a.inheritContextFromExecuteStmt()
if variable.EnableResourceControl.Load() && domain.GetDomain(sctx).RunawayManager() != nil {
if rm := domain.GetDomain(sctx).RunawayManager(); variable.EnableResourceControl.Load() && rm != nil {
stmtCtx := sctx.GetSessionVars().StmtCtx
_, planDigest := GetPlanDigest(stmtCtx)
_, sqlDigest := stmtCtx.SQLDigest()
stmtCtx.RunawayChecker = domain.GetDomain(sctx).RunawayManager().DeriveChecker(sctx.GetSessionVars().StmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String())
stmtCtx.RunawayChecker = rm.DeriveChecker(sctx.GetSessionVars().StmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String())
if err := stmtCtx.RunawayChecker.BeforeExecutor(); err != nil {
return nil, err
}
Expand Down

0 comments on commit 6421f86

Please sign in to comment.