statistics: add upper bound of retry for sync load (#52658) (#52817)
close #52657
ti-chi-bot authored Aug 28, 2024
1 parent cc0412c commit 3764806
Showing 3 changed files with 133 additions and 33 deletions.
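The gist of the change: a sync-load task that keeps failing is no longer retried forever. Each NeededItemTask now carries a Retry counter, and once it exceeds RetryCount (3) the error is delivered to the waiting statement instead of the task being requeued. A minimal, self-contained sketch of that bounded-retry pattern follows; the names are simplified stand-ins (the commit spells the helper isVaildForRetry and the task type is NeededItemTask), not the real code:

package main

import (
    "errors"
    "fmt"
)

// RetryCount mirrors the constant added by this commit: a sync-load task is
// retried at most this many times before its error is handed back to the
// waiting SQL statement.
const RetryCount = 3

// task is a stand-in for NeededItemTask; only the retry counter matters here.
type task struct {
    retry int
}

// isValidForRetry bumps the counter and reports whether another retry is allowed.
func isValidForRetry(t *task) bool {
    t.retry++
    return t.retry <= RetryCount
}

func main() {
    t := &task{}
    loadErr := errors.New("mock stats read failure")
    for {
        // Every attempt fails in this sketch, as if the stats read errored.
        if !isValidForRetry(t) {
            fmt.Printf("giving up after %d attempts: %v\n", RetryCount, loadErr)
            return
        }
        fmt.Printf("attempt %d failed, retrying\n", t.retry)
    }
}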
3 changes: 1 addition & 2 deletions statistics/handle/BUILD.bazel
@@ -74,13 +74,12 @@ go_test(
     embed = [":handle"],
     flaky = True,
     race = "on",
-    shard_count = 34,
+    shard_count = 35,
     deps = [
         "//config",
         "//domain",
         "//parser/model",
         "//parser/mysql",
-        "//sessionctx",
         "//sessionctx/stmtctx",
         "//sessionctx/variable",
         "//statistics",
67 changes: 40 additions & 27 deletions statistics/handle/handle_hist.go
@@ -37,6 +37,9 @@ import (
     "golang.org/x/sync/singleflight"
 )
 
+// RetryCount is the max retry count for a sync load task.
+const RetryCount = 3
+
 type statsWrapper struct {
     col *statistics.Column
     idx *statistics.Index
@@ -56,6 +59,7 @@ type NeededItemTask struct {
     TableItemID model.TableItemID
     ToTimeout   time.Time
     ResultCh    chan stmtctx.StatsLoadResult
+    Retry       int
 }
 
 // SendLoadRequests send neededColumns requests
@@ -199,7 +203,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
     // if the last task is not successfully handled in last round for error or panic, pass it to this round to retry
     var lastTask *NeededItemTask
     for {
-        task, err := h.HandleOneTask(ctx, lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
+        task, err := h.HandleOneTask(lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
         lastTask = task
         if err != nil {
             switch err {
@@ -217,7 +221,10 @@
 }
 
 // HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere.
-func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
+//   - If the task is handled successfully, return nil, nil.
+//   - If the task times out, return the task and nil. The caller should retry the task without sleep.
+//   - If the task fails, return the task and the error. The caller should retry the task after a sleep.
+func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
     defer func() {
         // recover for each task, worker keeps working
         if r := recover(); r != nil {
@@ -226,7 +233,7 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
         }
     }()
     if lastTask == nil {
-        task, err = h.drainColTask(sctx, exit)
+        task, err = h.drainColTask(exit)
         if err != nil {
             if err != errExit {
                 logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
@@ -236,52 +243,60 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
     } else {
         task = lastTask
     }
+    result := stmtctx.StatsLoadResult{Item: task.TableItemID}
     resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
-        return h.handleOneItemTask(task, readerCtx, ctx)
+        err := h.handleOneItemTask(task, readerCtx, ctx)
+        return nil, err
     })
     timeout := time.Until(task.ToTimeout)
     select {
-    case result := <-resultChan:
-        if result.Err == nil {
-            slr := result.Val.(*stmtctx.StatsLoadResult)
-            if slr.Error != nil {
-                return task, slr.Error
-            }
-            task.ResultCh <- *slr
+    case sr := <-resultChan:
+        // sr.Val is always nil.
+        if sr.Err == nil {
+            task.ResultCh <- result
             return nil, nil
         }
-        return task, result.Err
+        if !isVaildForRetry(task) {
+            result.Error = sr.Err
+            task.ResultCh <- result
+            return nil, nil
+        }
+        return task, sr.Err
     case <-time.After(timeout):
         return task, nil
     }
 }
 
-func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (result *stmtctx.StatsLoadResult, err error) {
+func isVaildForRetry(task *NeededItemTask) bool {
+    task.Retry++
+    return task.Retry <= RetryCount
+}
+
+func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (err error) {
     defer func() {
         // recover for each task, worker keeps working
         if r := recover(); r != nil {
             logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
             err = errors.Errorf("stats loading panicked: %v", r)
         }
     }()
-    result = &stmtctx.StatsLoadResult{Item: task.TableItemID}
-    item := result.Item
+    item := task.TableItemID
     oldCache := h.statsCache.Load().(statsCache)
     tbl, ok := oldCache.Get(item.TableID)
     if !ok {
-        return result, nil
+        return nil
     }
     wrapper := &statsWrapper{}
     if item.IsIndex {
         index, ok := tbl.Indices[item.ID]
         if !ok || index.IsFullLoad() {
-            return result, nil
+            return nil
         }
         wrapper.idx = index
     } else {
         col, ok := tbl.Columns[item.ID]
         if !ok || col.IsFullLoad() {
-            return result, nil
+            return nil
         }
         wrapper.col = col
     }
@@ -291,8 +306,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
     needUpdate := false
     wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
     if err != nil {
-        result.Error = err
-        return result, err
+        return err
     }
     if item.IsIndex {
         if wrapper.idx != nil {
@@ -304,10 +318,10 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
         }
     }
     metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
-    if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
-        return result, nil
+    if needUpdate {
+        h.updateCachedItem(item, wrapper.col, wrapper.idx)
     }
-    return nil, nil
+    return nil
 }
 
 func (h *Handle) loadFreshStatsReader(readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) {
@@ -422,7 +436,7 @@ func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, re
 }
 
 // drainColTask will hang until a column task can return, and either task or error will be returned.
-func (h *Handle) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*NeededItemTask, error) {
+func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
     // select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
     for {
         select {
@@ -435,7 +449,6 @@ func (h *Handle) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*Nee
             // if the task has already timeout, no sql is sync-waiting for it,
             // so do not handle it just now, put it to another channel with lower priority
             if time.Now().After(task.ToTimeout) {
-                task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
                 h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
                 continue
             }
@@ -504,12 +517,12 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
     oldCache := h.statsCache.Load().(statsCache)
     tbl, ok := oldCache.Get(item.TableID)
     if !ok {
-        return true
+        return false
     }
     if !item.IsIndex && colHist != nil {
         c, ok := tbl.Columns[item.ID]
         if !ok || c.IsFullLoad() {
-            return true
+            return false
         }
         tbl = tbl.Copy()
         tbl.Columns[c.ID] = colHist
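For orientation, the select in HandleOneTask keeps the pre-existing pattern: concurrent tasks for the same TableItemID share one underlying load through singleflight.DoChan, and the caller stops waiting once the task's deadline expires. A standalone sketch of that pattern — the key string, timings, and messages are made up for illustration, not TiDB's real values:

package main

import (
    "fmt"
    "time"

    "golang.org/x/sync/singleflight"
)

func main() {
    var g singleflight.Group

    // Stand-in for handleOneItemTask: after this change it only reports an
    // error, so DoChan's Val is always nil.
    load := func() (any, error) {
        time.Sleep(50 * time.Millisecond) // pretend to read histograms from storage
        return nil, nil
    }

    ch := g.DoChan("table:1#col:2", load) // the key plays the role of TableItemID.Key()
    select {
    case sr := <-ch:
        if sr.Err != nil {
            fmt.Println("load failed; the caller decides whether the task may retry:", sr.Err)
            return
        }
        fmt.Println("load finished; shared with concurrent waiters:", sr.Shared)
    case <-time.After(30 * time.Millisecond):
        fmt.Println("deadline hit; the task is requeued and retried without sleep")
    }
}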
96 changes: 92 additions & 4 deletions statistics/handle/handle_hist_test.go
@@ -21,7 +21,6 @@ import (
     "github.com/pingcap/failpoint"
     "github.com/pingcap/tidb/config"
     "github.com/pingcap/tidb/parser/model"
-    "github.com/pingcap/tidb/sessionctx"
     "github.com/pingcap/tidb/sessionctx/stmtctx"
     "github.com/pingcap/tidb/statistics/handle"
     "github.com/pingcap/tidb/testkit"
@@ -206,16 +205,28 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
     exitCh := make(chan struct{})
     require.NoError(t, failpoint.Enable(fp.failPath, fp.inTerms))
 
-    task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
+    task1, err1 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
     require.Error(t, err1)
     require.NotNil(t, task1)
+    select {
+    case <-stmtCtx1.StatsLoad.ResultCh:
+        t.Logf("stmtCtx1.ResultCh should not get anything")
+        t.FailNow()
+    case <-stmtCtx2.StatsLoad.ResultCh:
+        t.Logf("stmtCtx2.ResultCh should not get anything")
+        t.FailNow()
+    case <-task1.ResultCh:
+        t.Logf("task1.ResultCh should not get anything")
+        t.FailNow()
+    default:
+    }
 
     require.NoError(t, failpoint.Disable(fp.failPath))
-    task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
+    task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
     require.NoError(t, err3)
     require.Nil(t, task3)
 
-    task, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
+    task, err3 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
     require.NoError(t, err3)
     require.Nil(t, task)
 
Expand All @@ -232,3 +243,80 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
require.Greater(t, hg.Len()+topn.Num(), 0)
}
}

func TestRetry(t *testing.T) {
originConfig := config.GetGlobalConfig()
newConfig := config.NewConfig()
newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
config.StoreGlobalConfig(newConfig)
defer config.StoreGlobalConfig(originConfig)
store, dom := testkit.CreateMockStoreAndDomain(t)

testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
testKit.MustExec("set @@session.tidb_analyze_version=2")
testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

oriLease := dom.StatsHandle().Lease()
dom.StatsHandle().SetLease(1)
defer func() {
dom.StatsHandle().SetLease(oriLease)
}()
testKit.MustExec("analyze table t")

is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()

h := dom.StatsHandle()

neededColumns := make([]model.TableItemID, 1)
neededColumns[0] = model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}
timeout := time.Nanosecond * mathutil.MaxInt

// clear statsCache
h.Clear()
require.NoError(t, dom.StatsHandle().Update(is))

// no stats at beginning
stat := h.GetTableStats(tableInfo)
c, ok := stat.Columns[tableInfo.Columns[2].ID]
require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))

stmtCtx1 := &stmtctx.StatementContext{}
h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
stmtCtx2 := &stmtctx.StatementContext{}
h.SendLoadRequests(stmtCtx2, neededColumns, timeout)

exitCh := make(chan struct{})
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail", "return(true)"))
var (
task1 *handle.NeededItemTask
err1 error
)
readerCtx := &handle.StatsReaderContext{}
for i := 0; i < handle.RetryCount; i++ {
task1, err1 = h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get nothing")
t.FailNow()
default:
}
}
result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err1)
require.Nil(t, result)
select {
case <-task1.ResultCh:
default:
t.Logf("task1.ResultCh should get nothing")
t.FailNow()
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail"))
}
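Both tests assert on channel state with non-blocking receives. A standalone illustration of that idiom — the channel here is a toy, not the real ResultCh:

package main

import "fmt"

func main() {
    results := make(chan struct{}, 1)

    // Non-blocking receive: the default branch runs when the channel is empty,
    // which is how the tests check that no result leaked out while retries remain.
    select {
    case <-results:
        fmt.Println("unexpected: got a result too early")
    default:
        fmt.Println("channel empty, as expected during retries")
    }

    results <- struct{}{} // simulate the final, post-retry delivery

    select {
    case <-results:
        fmt.Println("got the final result after retries were exhausted")
    default:
        fmt.Println("unexpected: no result delivered")
    }
}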
