Skip to content

Commit

Permalink
statistics: upgrade stats timeout checkpoint after it timeouts (#52424)…
Browse files Browse the repository at this point in the history
… (#52439)

close #52425
  • Loading branch information
ti-chi-bot authored Apr 15, 2024
1 parent 1fec74b commit 6ee2141
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 11 deletions.
2 changes: 1 addition & 1 deletion planner/core/plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPlan
return plan, nil
}
predicateNeeded := variable.EnableColumnTracking.Load()
syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait * time.Millisecond.Nanoseconds()
syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait.Load() * time.Millisecond.Nanoseconds()
histNeeded := syncWait > 0
predicateColumns, histNeededColumns := CollectColumnStatsUsage(plan, predicateNeeded, histNeeded)
if len(predicateColumns) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ type SessionVars struct {
ReadConsistency ReadConsistencyLevel

// StatsLoadSyncWait indicates how long to wait for stats load before timeout.
StatsLoadSyncWait int64
StatsLoadSyncWait atomic.Int64

// SysdateIsNow indicates whether Sysdate is an alias of Now function
SysdateIsNow bool
Expand Down Expand Up @@ -1944,7 +1944,6 @@ func NewSessionVars(hctx HookContext) *SessionVars {
TMPTableSize: DefTiDBTmpTableMaxSize,
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
Rng: mathutil.NewWithTime(),
StatsLoadSyncWait: StatsLoadSyncWait.Load(),
EnableLegacyInstanceScope: DefEnableLegacyInstanceScope,
RemoveOrderbyInSubquery: DefTiDBRemoveOrderbyInSubquery,
EnableSkewDistinctAgg: DefTiDBSkewDistinctAgg,
Expand Down Expand Up @@ -2003,6 +2002,7 @@ func NewSessionVars(hctx HookContext) *SessionVars {
vars.DiskTracker = disk.NewTracker(memory.LabelForSession, -1)
vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery)
vars.MemTracker.IsRootTrackerOfSess = true
vars.StatsLoadSyncWait.Store(StatsLoadSyncWait.Load())

for _, engine := range config.GetGlobalConfig().IsolationRead.Engines {
switch engine {
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2073,7 +2073,7 @@ var defaultSysVars = []*SysVar{
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBStatsLoadSyncWait, Value: strconv.Itoa(DefTiDBStatsLoadSyncWait), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt32,
SetSession: func(s *SessionVars, val string) error {
s.StatsLoadSyncWait = TidbOptInt64(val, DefTiDBStatsLoadSyncWait)
s.StatsLoadSyncWait.Store(TidbOptInt64(val, DefTiDBStatsLoadSyncWait))
return nil
},
GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ go_test(
"//domain",
"//parser/model",
"//parser/mysql",
"//sessionctx",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics",
Expand Down
9 changes: 5 additions & 4 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
// if the last task is not successfully handled in last round for error or panic, pass it to this round to retry
var lastTask *NeededItemTask
for {
task, err := h.HandleOneTask(lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
task, err := h.HandleOneTask(ctx, lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
lastTask = task
if err != nil {
switch err {
Expand All @@ -216,7 +216,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
}

// HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere.
func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
Expand All @@ -225,7 +225,7 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
}
}()
if lastTask == nil {
task, err = h.drainColTask(exit)
task, err = h.drainColTask(sctx, exit)
if err != nil {
if err != errExit {
logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
Expand Down Expand Up @@ -408,7 +408,7 @@ func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, re
}

// drainColTask will hang until a column task can return, and either task or error will be returned.
func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
func (h *Handle) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*NeededItemTask, error) {
// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
for {
select {
Expand All @@ -421,6 +421,7 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
// if the task has already timeout, no sql is sync-waiting for it,
// so do not handle it just now, put it to another channel with lower priority
if time.Now().After(task.ToTimeout) {
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
continue
}
Expand Down
7 changes: 4 additions & 3 deletions statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -205,15 +206,15 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
exitCh := make(chan struct{})
require.NoError(t, failpoint.Enable(fp.failPath, fp.inTerms))

task1, err1 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
list, ok := h.StatsLoad.WorkingColMap[neededColumns[0]]
require.True(t, ok)
require.Len(t, list, 1)
require.Equal(t, stmtCtx1.StatsLoad.ResultCh, list[0])

task2, err2 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
task2, err2 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Nil(t, err2)
require.Nil(t, task2)
list, ok = h.StatsLoad.WorkingColMap[neededColumns[0]]
Expand All @@ -222,7 +223,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Equal(t, stmtCtx2.StatsLoad.ResultCh, list[1])

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
require.Nil(t, task3)

Expand Down

0 comments on commit 6ee2141

Please sign in to comment.