Skip to content

Commit

Permalink
*: remove limiting process id for auto analyze (pingcap#54902)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei committed Aug 1, 2024
1 parent 6db9685 commit 4e7c007
Show file tree
Hide file tree
Showing 19 changed files with 132 additions and 38 deletions.
13 changes: 2 additions & 11 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2287,7 +2287,7 @@ func (do *Domain) StatsHandle() *handle.Handle {

// CreateStatsHandle is used only for test.
func (do *Domain) CreateStatsHandle(ctx, initStatsCtx sessionctx.Context) error {
h, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.GetAutoAnalyzeProcID)
h, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.NextConnID, do.ReleaseConnID)
if err != nil {
return err
}
Expand Down Expand Up @@ -2364,7 +2364,7 @@ func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context, initStatsCtx
// It should be called only once in BootstrapSession.
func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
statsHandle, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.GetAutoAnalyzeProcID)
statsHandle, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.NextConnID, do.ReleaseConnID)
if err != nil {
return err
}
Expand Down Expand Up @@ -2857,12 +2857,6 @@ func (do *Domain) ReleaseConnID(connID uint64) {
do.connIDAllocator.Release(connID)
}

// GetAutoAnalyzeProcID returns processID for auto analyze
// TODO: support IDs for concurrent auto-analyze
func (do *Domain) GetAutoAnalyzeProcID() uint64 {
return do.connIDAllocator.GetReservedConnID(reservedConnAnalyze)
}

const (
serverIDEtcdPath = "/tidb/server_id"
refreshServerIDRetryCnt = 3
Expand All @@ -2871,9 +2865,6 @@ const (
retrieveServerIDSessionTimeout = 10 * time.Second

acquire32BitsServerIDRetryCnt = 3

// reservedConnXXX must be within [0, globalconn.ReservedCount)
reservedConnAnalyze = 0
)

var (
Expand Down
8 changes: 6 additions & 2 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
failpoint.Inject("mockKillPendingAnalyzeJob", func() {
dom := domain.GetDomain(e.Ctx())
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
TASKLOOP:
for _, task := range tasks {
Expand Down Expand Up @@ -157,7 +159,9 @@ TASKLOOP:

failpoint.Inject("mockKillFinishedAnalyzeJob", func() {
dom := domain.GetDomain(e.Ctx())
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
// If we enabled dynamic prune mode, then we need to generate global stats here for partition tables.
if needGlobalStats {
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/pkg/planner/core"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/statistics"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
Expand Down Expand Up @@ -176,7 +177,9 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
for {
failpoint.Inject("mockKillRunningV1AnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
if err := e.ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
return nil, nil, nil, nil, nil, err
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -854,7 +855,9 @@ func readDataAndSendTask(ctx sessionctx.Context, handler *tableResultHandler, me
for {
failpoint.Inject("mockKillRunningV2AnalyzeJob", func() {
dom := domain.GetDomain(ctx)
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
if err := ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/analyze_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/ranger"
Expand Down Expand Up @@ -199,7 +200,9 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
for {
failpoint.Inject("mockKillRunningAnalyzeIndexJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
if err := e.ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
return nil, nil, nil, nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ go_library(
"//pkg/sessiontxn/staleread",
"//pkg/statistics",
"//pkg/statistics/asyncload",
"//pkg/statistics/handle/util",
"//pkg/table",
"//pkg/table/tables",
"//pkg/table/temptable",
Expand Down
3 changes: 2 additions & 1 deletion pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
"github.com/pingcap/tidb/pkg/statistics"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/table/temptable"
Expand Down Expand Up @@ -3405,7 +3406,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base.
b.visitInfo = appendDynamicVisitInfo(b.visitInfo, []string{"CONNECTION_ADMIN"}, false, err)
b.visitInfo = appendVisitInfoIsRestrictedUser(b.visitInfo, b.ctx, &auth.UserIdentity{Username: pi.User, Hostname: pi.Host}, "RESTRICTED_CONNECTION_ADMIN")
}
} else if raw.ConnectionID == domain.GetDomain(b.ctx).GetAutoAnalyzeProcID() {
} else if handleutil.GlobalAutoAnalyzeProcessList.Contains(raw.ConnectionID) {
// Only the users with SUPER or CONNECTION_ADMIN privilege can kill auto analyze.
err := plannererrors.ErrSpecificAccessDenied.GenWithStackByArgs("SUPER or CONNECTION_ADMIN")
b.visitInfo = appendDynamicVisitInfo(b.visitInfo, []string{"CONNECTION_ADMIN"}, false, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/statistics/handle",
"//pkg/statistics/handle/util",
"//pkg/store",
"//pkg/store/driver/error",
"//pkg/store/helper",
Expand Down
9 changes: 2 additions & 7 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/session/txninfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/fastrand"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -1032,11 +1033,6 @@ func (s *Server) ServerID() uint64 {
return s.dom.ServerID()
}

// GetAutoAnalyzeProcID implements SessionManager interface.
func (s *Server) GetAutoAnalyzeProcID() uint64 {
return s.dom.GetAutoAnalyzeProcID()
}

// StoreInternalSession implements SessionManager interface.
// @param addr The address of a session.session struct variable
func (s *Server) StoreInternalSession(se any) {
Expand All @@ -1058,10 +1054,9 @@ func (s *Server) GetInternalSessionStartTSList() []uint64 {
s.sessionMapMutex.Lock()
defer s.sessionMapMutex.Unlock()
tsList := make([]uint64, 0, len(s.internalSessions))
analyzeProcID := s.GetAutoAnalyzeProcID()
for se := range s.internalSessions {
if ts, processInfoID := session.GetStartTSFromSession(se); ts != 0 {
if processInfoID == analyzeProcID {
if statsutil.GlobalAutoAnalyzeProcessList.Contains(processInfoID) {
continue
}
tsList = append(tsList, ts)
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/autoanalyze/exec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/sqlescape",
"//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors",
Expand Down
11 changes: 10 additions & 1 deletion pkg/statistics/handle/autoanalyze/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,13 +82,21 @@ func execAnalyzeStmt(
) ([]chunk.Row, []*ast.ResultField, error) {
pruneMode := sctx.GetSessionVars().PartitionPruneMode.Load()
analyzeSnapshot := sctx.GetSessionVars().EnableAnalyzeSnapshot
autoAnalyzeTracker := statsutil.NewAutoAnalyzeTracker(sysProcTracker.Track, sysProcTracker.UnTrack)
autoAnalyzeProcID := statsHandle.AutoAnalyzeProcID()
optFuncs := []sqlexec.OptionFuncAlias{
execOptionForAnalyze[statsVer],
sqlexec.GetAnalyzeSnapshotOption(analyzeSnapshot),
sqlexec.GetPartitionPruneModeOption(pruneMode),
sqlexec.ExecOptionUseCurSession,
sqlexec.ExecOptionWithSysProcTrack(statsHandle.AutoAnalyzeProcID(), sysProcTracker.Track, sysProcTracker.UnTrack),
sqlexec.ExecOptionWithSysProcTrack(autoAnalyzeProcID, autoAnalyzeTracker.Track, autoAnalyzeTracker.UnTrack),
}
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Warn("panic in execAnalyzeStmt", zap.Any("error", r), zap.Stack("stack"))
}
statsHandle.ReleaseAutoAnalyzeProcID(autoAnalyzeProcID)
}()
return statsutil.ExecWithOpts(sctx, optFuncs, sql, params...)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func NewHandle(
pool util.SessionPool,
tracker sysproctrack.Tracker,
autoAnalyzeProcIDGetter func() uint64,
releaseAutoAnalyzeProcID func(uint64),
) (*Handle, error) {
handle := &Handle{
InitStatsDone: make(chan struct{}),
Expand All @@ -128,7 +129,7 @@ func NewHandle(
return nil, err
}
handle.Pool = util.NewPool(pool)
handle.AutoAnalyzeProcIDGenerator = util.NewGenerator(autoAnalyzeProcIDGetter)
handle.AutoAnalyzeProcIDGenerator = util.NewGenerator(autoAnalyzeProcIDGetter, releaseAutoAnalyzeProcID)
handle.LeaseGetter = util.NewLeaseGetter(lease)
handle.StatsCache = statsCache
handle.StatsHistory = history.NewStatsHistory(handle)
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/handletest/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestVersion(t *testing.T) {
tbl1, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t1"))
require.NoError(t, err)
tableInfo1 := tbl1.Meta()
h, err := handle.NewHandle(testKit.Session(), testKit2.Session(), time.Millisecond, do.SysSessionPool(), do.SysProcTracker(), do.GetAutoAnalyzeProcID)
h, err := handle.NewHandle(testKit.Session(), testKit2.Session(), time.Millisecond, do.SysSessionPool(), do.SysProcTracker(), do.NextConnID, do.ReleaseConnID)
defer func() {
h.Close()
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/statistics/handle/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/parser/model",
"//pkg/parser/terror",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/logutil",
"//pkg/table",
Expand All @@ -32,6 +33,7 @@ go_library(
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tiancaiamao_gp//:gp",
"@com_github_tikv_client_go_v2//oracle",
"@org_golang_x_exp//maps",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
90 changes: 87 additions & 3 deletions pkg/statistics/handle/util/auto_analyze_proc_id_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,111 @@

package util

import (
"sync"

"github.com/pingcap/tidb/pkg/sessionctx/sysproctrack"
"golang.org/x/exp/maps"
)

// AutoAnalyzeProcIDGenerator is used to generate auto analyze proc ID.
type AutoAnalyzeProcIDGenerator interface {
// AutoAnalyzeProcID generates an analyze ID.
AutoAnalyzeProcID() uint64
ReleaseAutoAnalyzeProcID(uint64)
}

var _ AutoAnalyzeProcIDGenerator = (*generator)(nil)

type generator struct {
// autoAnalyzeProcIDGetter is used to generate auto analyze ID.
autoAnalyzeProcIDGetter func() uint64
autoAnalyzeProcIDGetter func() uint64
releaseAutoAnalyzeProcID func(uint64)
}

// NewGenerator creates a new Generator.
func NewGenerator(autoAnalyzeProcIDGetter func() uint64) AutoAnalyzeProcIDGenerator {
func NewGenerator(autoAnalyzeProcIDGetter func() uint64, releaseAutoAnalyzeProcID func(uint64)) AutoAnalyzeProcIDGenerator {
return &generator{
autoAnalyzeProcIDGetter: autoAnalyzeProcIDGetter,
autoAnalyzeProcIDGetter: autoAnalyzeProcIDGetter,
releaseAutoAnalyzeProcID: releaseAutoAnalyzeProcID,
}
}

// AutoAnalyzeProcID implements AutoAnalyzeProcIDGenerator.
func (g *generator) AutoAnalyzeProcID() uint64 {
return g.autoAnalyzeProcIDGetter()
}

// ReleaseAutoAnalyzeProcID implements AutoAnalyzeProcIDGenerator.
func (g *generator) ReleaseAutoAnalyzeProcID(id uint64) {
g.releaseAutoAnalyzeProcID(id)
}

// GlobalAutoAnalyzeProcessList is used to track the auto analyze process.
var GlobalAutoAnalyzeProcessList = newGlobalAutoAnalyzeProcessList()

type globalAutoAnalyzeProcessList struct {
processes map[uint64]struct{}
mu sync.RWMutex
}

func newGlobalAutoAnalyzeProcessList() *globalAutoAnalyzeProcessList {
return &globalAutoAnalyzeProcessList{
processes: make(map[uint64]struct{}),
}
}

// Tracker is used to track the auto analyze process.
func (g *globalAutoAnalyzeProcessList) Tracker(id uint64) {
g.mu.Lock()
defer g.mu.Unlock()
g.processes[id] = struct{}{}
}

// Untracker is used to untrack the auto analyze process.
func (g *globalAutoAnalyzeProcessList) Untracker(id uint64) {
g.mu.Lock()
defer g.mu.Unlock()
delete(g.processes, id)
}

// AutoAnalyzeTracker is used to track the auto analyze process.
type AutoAnalyzeTracker struct {
track func(id uint64, ctx sysproctrack.TrackProc) error
untrack func(id uint64)
}

// All returns all the auto analyze process IDs.
func (g *globalAutoAnalyzeProcessList) All() []uint64 {
g.mu.RLock()
defer g.mu.RUnlock()
return maps.Keys(g.processes)
}

// Contains checks whether the auto analyze process ID is in the list.
func (g *globalAutoAnalyzeProcessList) Contains(id uint64) bool {
g.mu.RLock()
defer g.mu.RUnlock()
_, ok := g.processes[id]
return ok
}

// NewAutoAnalyzeTracker creates a new AutoAnalyzeTracker.
func NewAutoAnalyzeTracker(track func(id uint64, ctx sysproctrack.TrackProc) error, untrack func(id uint64)) *AutoAnalyzeTracker {
return &AutoAnalyzeTracker{
track: track,
untrack: untrack,
}
}

// Track is used to track the auto analyze process.
func (t *AutoAnalyzeTracker) Track(id uint64, ctx sysproctrack.TrackProc) error {
GlobalAutoAnalyzeProcessList.Tracker(id)
return t.track(id, ctx)
}

// UnTrack is used to untrack the auto analyze process.
func (t *AutoAnalyzeTracker) UnTrack(id uint64) {
GlobalAutoAnalyzeProcessList.Untracker(id)
t.untrack(id)
}
5 changes: 0 additions & 5 deletions pkg/testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,6 @@ func (msm *MockSessionManager) ServerID() uint64 {
return msm.SerID
}

// GetAutoAnalyzeProcID implement SessionManager interface.
func (msm *MockSessionManager) GetAutoAnalyzeProcID() uint64 {
return uint64(1)
}

// StoreInternalSession is to store internal session.
func (msm *MockSessionManager) StoreInternalSession(s any) {
msm.mu.Lock()
Expand Down
Loading

0 comments on commit 4e7c007

Please sign in to comment.