Skip to content

Commit

Permalink
planner: move more methods from StatsHandle to its sub-packages (#47640)
Browse files Browse the repository at this point in the history
ref #46905
  • Loading branch information
qw4990 authored Oct 16, 2023
1 parent 306be73 commit 19470d5
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 171 deletions.
1 change: 0 additions & 1 deletion pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
"dump.go",
"handle.go",
"handle_hist.go",
"update.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle",
visibility = ["//visibility:public"],
Expand Down
41 changes: 39 additions & 2 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,34 @@ import (
// statsAnalyze is used to handle auto-analyze and manage analyze jobs.
type statsAnalyze struct {
pool statsutil.SessionPool

// TODO: use interfaces instead of raw function pointers below
sysProcTracker sessionctx.SysProcTracker
getLockedTables func(tableIDs ...int64) (map[int64]struct{}, error)
getTableStats func(tblInfo *model.TableInfo) *statistics.Table
getPartitionStats func(tblInfo *model.TableInfo, pid int64) *statistics.Table
autoAnalyzeProcIDGetter func() uint64
statsLease time.Duration
}

// NewStatsAnalyze creates a new StatsAnalyze.
func NewStatsAnalyze(pool statsutil.SessionPool) statsutil.StatsAnalyze {
return &statsAnalyze{pool: pool}
func NewStatsAnalyze(pool statsutil.SessionPool,
sysProcTracker sessionctx.SysProcTracker,
getLockedTables func(tableIDs ...int64) (map[int64]struct{}, error),
getTableStats func(tblInfo *model.TableInfo) *statistics.Table,
getPartitionStats func(tblInfo *model.TableInfo, pid int64) *statistics.Table,
autoAnalyzeProcIDGetter func() uint64,
statsLease time.Duration) statsutil.StatsAnalyze {
return &statsAnalyze{pool: pool,
sysProcTracker: sysProcTracker,
getLockedTables: getLockedTables,
getTableStats: getTableStats,
getPartitionStats: getPartitionStats,
autoAnalyzeProcIDGetter: autoAnalyzeProcIDGetter,
statsLease: statsLease}
}

// InsertAnalyzeJob inserts the analyze job to the storage.
func (sa *statsAnalyze) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error {
return statsutil.CallWithSCtx(sa.pool, func(sctx sessionctx.Context) error {
return insertAnalyzeJob(sctx, job, instance, procID)
Expand All @@ -66,6 +87,22 @@ func (sa *statsAnalyze) DeleteAnalyzeJobs(updateTime time.Time) error {
})
}

// HandleAutoAnalyze analyzes the newly created table or index.
func (sa *statsAnalyze) HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bool) {
_ = statsutil.CallWithSCtx(sa.pool, func(sctx sessionctx.Context) error {
analyzed = HandleAutoAnalyze(sctx, &Opt{
StatsLease: sa.statsLease,
GetLockedTables: sa.getLockedTables,
GetTableStats: sa.getTableStats,
GetPartitionStats: sa.getPartitionStats,
SysProcTracker: sa.sysProcTracker,
AutoAnalyzeProcIDGetter: sa.autoAnalyzeProcIDGetter,
}, is)
return nil
})
return
}

func parseAutoAnalyzeRatio(ratio string) float64 {
autoAnalyzeRatio, err := strconv.ParseFloat(ratio, 64)
if err != nil {
Expand Down
16 changes: 2 additions & 14 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package handle

import (
"context"
"strconv"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -57,7 +56,7 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache util.StatsC
tbl := &statistics.Table{
HistColl: newHistColl,
Version: row.GetUint64(0),
Name: getFullTableName(is, tableInfo),
Name: util.GetFullTableName(is, tableInfo),
}
cache.Put(physicalID, tbl) // put this table again since it is updated
}
Expand All @@ -71,7 +70,7 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error
return nil, errors.Trace(err)
}
defer terror.Call(rc.Close)
tables, err := cache.NewStatsCacheImpl()
tables, err := cache.NewStatsCacheImpl(h.pool, h.TableInfoGetter, h.Lease(), h.TableStatsFromStorage)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -544,14 +543,3 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
h.Replace(cache)
return nil
}

func getFullTableName(is infoschema.InfoSchema, tblInfo *model.TableInfo) string {
for _, schema := range is.AllSchemas() {
if t, err := is.TableByName(schema.Name, tblInfo.Name); err == nil {
if t.Meta().ID == tblInfo.ID {
return schema.Name.O + "." + tblInfo.Name.O
}
}
}
return strconv.FormatInt(tblInfo.ID, 10)
}
2 changes: 2 additions & 0 deletions pkg/statistics/handle/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/config",
"//pkg/infoschema",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
Expand All @@ -23,6 +24,7 @@ go_library(
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/syncutil",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],
)
Expand Down
12 changes: 6 additions & 6 deletions pkg/statistics/handle/cache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func BenchmarkStatsCacheLFUCopyAndUpdate(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = true
})
cache, err := NewStatsCacheImpl()
cache, err := NewStatsCacheImplForTest()
if err != nil {
b.Fail()
}
Expand All @@ -109,7 +109,7 @@ func BenchmarkStatsCacheMapCacheCopyAndUpdate(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = false
})
cache, err := NewStatsCacheImpl()
cache, err := NewStatsCacheImplForTest()
if err != nil {
b.Fail()
}
Expand All @@ -122,7 +122,7 @@ func BenchmarkLFUCachePutGet(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = true
})
cache, err := NewStatsCacheImpl()
cache, err := NewStatsCacheImplForTest()
if err != nil {
b.Fail()
}
Expand All @@ -135,7 +135,7 @@ func BenchmarkMapCachePutGet(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = false
})
cache, err := NewStatsCacheImpl()
cache, err := NewStatsCacheImplForTest()
if err != nil {
b.Fail()
}
Expand All @@ -148,7 +148,7 @@ func BenchmarkLFUCacheGet(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = true
})
cache, err := NewStatsCacheImpl()
cache, err := NewStatsCacheImplForTest()
if err != nil {
b.Fail()
}
Expand All @@ -161,7 +161,7 @@ func BenchmarkMapCacheGet(b *testing.B) {
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = false
})
cache, err := NewStatsCacheImpl()
cache, err := NewStatsCacheImplForTest()
if err != nil {
b.Fail()
}
Expand Down
89 changes: 87 additions & 2 deletions pkg/statistics/handle/cache/statscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,116 @@ package cache

import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache/internal/metrics"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

// StatsCacheImpl implements util.StatsCache.
type StatsCacheImpl struct {
pool util.SessionPool
tblInfo util.TableInfoGetter
atomic.Pointer[StatsCache]

tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error)
statsLease time.Duration
}

// NewStatsCacheImpl creates a new StatsCache.
func NewStatsCacheImpl() (util.StatsCache, error) {
func NewStatsCacheImpl(pool util.SessionPool, tblInfo util.TableInfoGetter, statsLease time.Duration,
tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error),
) (util.StatsCache, error) {
newCache, err := NewStatsCache()
if err != nil {
return nil, err
}
result := &StatsCacheImpl{}
result := &StatsCacheImpl{
pool: pool,
tblInfo: tblInfo,
statsLease: statsLease,
tableStatsFromStorage: tableStatsFromStorage,
}
result.Store(newCache)
return result, nil
}

// NewStatsCacheImplForTest creates a new StatsCache for test.
func NewStatsCacheImplForTest() (util.StatsCache, error) {
return NewStatsCacheImpl(nil, nil, 0, nil)
}

// Update reads stats meta from store and updates the stats map.
func (s *StatsCacheImpl) Update(is infoschema.InfoSchema) error {
lastVersion := s.MaxTableStatsVersion()
// We need this because for two tables, the smaller version may write later than the one with larger version.
// Consider the case that there are two tables A and B, their version and commit time is (A0, A1) and (B0, B1),
// and A0 < B0 < B1 < A1. We will first read the stats of B, and update the lastVersion to B0, but we cannot read
// the table stats of A0 if we read stats that greater than lastVersion which is B0.
// We can read the stats if the diff between commit time and version is less than three lease.
offset := util.DurationToTS(3 * s.statsLease)
if s.MaxTableStatsVersion() >= offset {
lastVersion = lastVersion - offset
} else {
lastVersion = 0
}

var rows []chunk.Row
var err error
err = util.CallWithSCtx(s.pool, func(sctx sessionctx.Context) error {
rows, _, err = util.ExecRows(sctx, "SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %? order by version", lastVersion)
return err
})
if err != nil {
return errors.Trace(err)
}
tables := make([]*statistics.Table, 0, len(rows))
deletedTableIDs := make([]int64, 0, len(rows))
for _, row := range rows {
version := row.GetUint64(0)
physicalID := row.GetInt64(1)
modifyCount := row.GetInt64(2)
count := row.GetInt64(3)
table, ok := s.tblInfo.TableInfoByID(is, physicalID)
if !ok {
logutil.BgLogger().Debug("unknown physical ID in stats meta table, maybe it has been dropped", zap.Int64("ID", physicalID))
deletedTableIDs = append(deletedTableIDs, physicalID)
continue
}
tableInfo := table.Meta()
if oldTbl, ok := s.Get(physicalID); ok && oldTbl.Version >= version && tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
continue
}
tbl, err := s.tableStatsFromStorage(tableInfo, physicalID, false, 0)
// Error is not nil may mean that there are some ddl changes on this table, we will not update it.
if err != nil {
logutil.BgLogger().Error("error occurred when read table stats", zap.String("category", "stats"), zap.String("table", tableInfo.Name.O), zap.Error(err))
continue
}
if tbl == nil {
deletedTableIDs = append(deletedTableIDs, physicalID)
continue
}
tbl.Version = version
tbl.RealtimeCount = count
tbl.ModifyCount = modifyCount
tbl.Name = util.GetFullTableName(is, tableInfo)
tbl.TblInfoUpdateTS = tableInfo.UpdateTS
tables = append(tables, tbl)
}
s.UpdateStatsCache(tables, deletedTableIDs)
return nil
}

// Replace replaces this cache.
func (s *StatsCacheImpl) Replace(cache util.StatsCache) {
x := cache.(*StatsCacheImpl)
Expand Down
Loading

0 comments on commit 19470d5

Please sign in to comment.