Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: remove limiting process id for auto analyze #54902

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2270,7 +2270,7 @@ func (do *Domain) StatsHandle() *handle.Handle {

// CreateStatsHandle is used only for test.
func (do *Domain) CreateStatsHandle(ctx, initStatsCtx sessionctx.Context) error {
h, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.GetAutoAnalyzeProcID)
h, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.NextConnID, do.ReleaseConnID)
if err != nil {
return err
}
Expand Down Expand Up @@ -2347,7 +2347,7 @@ func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context, initStatsCtx
// It should be called only once in BootstrapSession.
func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
statsHandle, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.GetAutoAnalyzeProcID)
statsHandle, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.NextConnID, do.ReleaseConnID)
if err != nil {
return err
}
Expand Down Expand Up @@ -2840,12 +2840,6 @@ func (do *Domain) ReleaseConnID(connID uint64) {
do.connIDAllocator.Release(connID)
}

// GetAutoAnalyzeProcID returns processID for auto analyze
// TODO: support IDs for concurrent auto-analyze
func (do *Domain) GetAutoAnalyzeProcID() uint64 {
return do.connIDAllocator.GetReservedConnID(reservedConnAnalyze)
}

const (
serverIDEtcdPath = "/tidb/server_id"
refreshServerIDRetryCnt = 3
Expand All @@ -2854,9 +2848,6 @@ const (
retrieveServerIDSessionTimeout = 10 * time.Second

acquire32BitsServerIDRetryCnt = 3

// reservedConnXXX must be within [0, globalconn.ReservedCount)
reservedConnAnalyze = 0
)

var (
Expand Down
8 changes: 6 additions & 2 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
failpoint.Inject("mockKillPendingAnalyzeJob", func() {
dom := domain.GetDomain(e.Ctx())
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
TASKLOOP:
for _, task := range tasks {
Expand Down Expand Up @@ -157,7 +159,9 @@ TASKLOOP:

failpoint.Inject("mockKillFinishedAnalyzeJob", func() {
dom := domain.GetDomain(e.Ctx())
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
// If we enabled dynamic prune mode, then we need to generate global stats here for partition tables.
if needGlobalStats {
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/pkg/planner/core"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/statistics"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
Expand Down Expand Up @@ -176,7 +177,9 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
for {
failpoint.Inject("mockKillRunningV1AnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
if err := e.ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
return nil, nil, nil, nil, nil, err
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -854,7 +855,9 @@ func readDataAndSendTask(ctx sessionctx.Context, handler *tableResultHandler, me
for {
failpoint.Inject("mockKillRunningV2AnalyzeJob", func() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are the tests.

dom := domain.GetDomain(ctx)
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
if err := ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/analyze_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/ranger"
Expand Down Expand Up @@ -199,7 +200,9 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
for {
failpoint.Inject("mockKillRunningAnalyzeIndexJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() {
dom.SysProcTracker().KillSysProcess(id)
}
})
if err := e.ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
return nil, nil, nil, nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ go_library(
"//pkg/sessiontxn/staleread",
"//pkg/statistics",
"//pkg/statistics/asyncload",
"//pkg/statistics/handle/util",
"//pkg/table",
"//pkg/table/tables",
"//pkg/table/temptable",
Expand Down
3 changes: 2 additions & 1 deletion pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
"github.com/pingcap/tidb/pkg/statistics"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/table/temptable"
Expand Down Expand Up @@ -3404,7 +3405,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base.
b.visitInfo = appendDynamicVisitInfo(b.visitInfo, []string{"CONNECTION_ADMIN"}, false, err)
b.visitInfo = appendVisitInfoIsRestrictedUser(b.visitInfo, b.ctx, &auth.UserIdentity{Username: pi.User, Hostname: pi.Host}, "RESTRICTED_CONNECTION_ADMIN")
}
} else if raw.ConnectionID == domain.GetDomain(b.ctx).GetAutoAnalyzeProcID() {
} else if handleutil.GlobalAutoAnalyzeProcessList.Contains(raw.ConnectionID) {
// Only the users with SUPER or CONNECTION_ADMIN privilege can kill auto analyze.
err := plannererrors.ErrSpecificAccessDenied.GenWithStackByArgs("SUPER or CONNECTION_ADMIN")
b.visitInfo = appendDynamicVisitInfo(b.visitInfo, []string{"CONNECTION_ADMIN"}, false, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/statistics/handle",
"//pkg/statistics/handle/util",
"//pkg/store",
"//pkg/store/driver/error",
"//pkg/store/helper",
Expand Down
9 changes: 2 additions & 7 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/session/txninfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/fastrand"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -1032,11 +1033,6 @@ func (s *Server) ServerID() uint64 {
return s.dom.ServerID()
}

// GetAutoAnalyzeProcID implements SessionManager interface.
func (s *Server) GetAutoAnalyzeProcID() uint64 {
return s.dom.GetAutoAnalyzeProcID()
}

// StoreInternalSession implements SessionManager interface.
// @param addr The address of a session.session struct variable
func (s *Server) StoreInternalSession(se any) {
Expand All @@ -1058,10 +1054,9 @@ func (s *Server) GetInternalSessionStartTSList() []uint64 {
s.sessionMapMutex.Lock()
defer s.sessionMapMutex.Unlock()
tsList := make([]uint64, 0, len(s.internalSessions))
analyzeProcID := s.GetAutoAnalyzeProcID()
for se := range s.internalSessions {
if ts, processInfoID := session.GetStartTSFromSession(se); ts != 0 {
if processInfoID == analyzeProcID {
if statsutil.GlobalAutoAnalyzeProcessList.Contains(processInfoID) {
continue
}
tsList = append(tsList, ts)
Expand Down
11 changes: 10 additions & 1 deletion pkg/statistics/handle/autoanalyze/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,13 +82,21 @@ func execAnalyzeStmt(
) ([]chunk.Row, []*ast.ResultField, error) {
pruneMode := sctx.GetSessionVars().PartitionPruneMode.Load()
analyzeSnapshot := sctx.GetSessionVars().EnableAnalyzeSnapshot
autoAnalyzeTracker := statsutil.NewAutoAnalyzeTracker(sysProcTracker.Track, sysProcTracker.UnTrack)
autoAnalyzeProcID := statsHandle.AutoAnalyzeProcID()
optFuncs := []sqlexec.OptionFuncAlias{
execOptionForAnalyze[statsVer],
sqlexec.GetAnalyzeSnapshotOption(analyzeSnapshot),
sqlexec.GetPartitionPruneModeOption(pruneMode),
sqlexec.ExecOptionUseCurSession,
sqlexec.ExecOptionWithSysProcTrack(statsHandle.AutoAnalyzeProcID(), sysProcTracker.Track, sysProcTracker.UnTrack),
sqlexec.ExecOptionWithSysProcTrack(autoAnalyzeProcID, autoAnalyzeTracker.Track, autoAnalyzeTracker.UnTrack),
}
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Warn("panic in execAnalyzeStmt", zap.Any("error", r), zap.Stack("stack"))
}
statsHandle.ReleaseAutoAnalyzeProcID(autoAnalyzeProcID)
}()
return statsutil.ExecWithOpts(sctx, optFuncs, sql, params...)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func NewHandle(
pool util.SessionPool,
tracker sysproctrack.Tracker,
autoAnalyzeProcIDGetter func() uint64,
releaseAutoAnalyzeProcID func(uint64),
) (*Handle, error) {
handle := &Handle{
InitStatsDone: make(chan struct{}),
Expand All @@ -128,7 +129,7 @@ func NewHandle(
return nil, err
}
handle.Pool = util.NewPool(pool)
handle.AutoAnalyzeProcIDGenerator = util.NewGenerator(autoAnalyzeProcIDGetter)
handle.AutoAnalyzeProcIDGenerator = util.NewGenerator(autoAnalyzeProcIDGetter, releaseAutoAnalyzeProcID)
handle.LeaseGetter = util.NewLeaseGetter(lease)
handle.StatsCache = statsCache
handle.StatsHistory = history.NewStatsHistory(handle)
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/handletest/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestVersion(t *testing.T) {
tbl1, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t1"))
require.NoError(t, err)
tableInfo1 := tbl1.Meta()
h, err := handle.NewHandle(testKit.Session(), testKit2.Session(), time.Millisecond, do.SysSessionPool(), do.SysProcTracker(), do.GetAutoAnalyzeProcID)
h, err := handle.NewHandle(testKit.Session(), testKit2.Session(), time.Millisecond, do.SysSessionPool(), do.SysProcTracker(), do.NextConnID, do.ReleaseConnID)
defer func() {
h.Close()
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/statistics/handle/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/parser/model",
"//pkg/parser/terror",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/logutil",
"//pkg/table",
Expand All @@ -32,6 +33,7 @@ go_library(
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tiancaiamao_gp//:gp",
"@com_github_tikv_client_go_v2//oracle",
"@org_golang_x_exp//maps",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
90 changes: 87 additions & 3 deletions pkg/statistics/handle/util/auto_analyze_proc_id_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,111 @@

package util

import (
"sync"

"github.com/pingcap/tidb/pkg/sessionctx/sysproctrack"
"golang.org/x/exp/maps"
)

// AutoAnalyzeProcIDGenerator is used to generate auto analyze proc ID.
type AutoAnalyzeProcIDGenerator interface {
// AutoAnalyzeProcID generates an analyze ID.
AutoAnalyzeProcID() uint64
ReleaseAutoAnalyzeProcID(uint64)
}

var _ AutoAnalyzeProcIDGenerator = (*generator)(nil)

type generator struct {
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
// autoAnalyzeProcIDGetter is used to generate auto analyze ID.
autoAnalyzeProcIDGetter func() uint64
autoAnalyzeProcIDGetter func() uint64
releaseAutoAnalyzeProcID func(uint64)
}

// NewGenerator creates a new Generator.
func NewGenerator(autoAnalyzeProcIDGetter func() uint64) AutoAnalyzeProcIDGenerator {
func NewGenerator(autoAnalyzeProcIDGetter func() uint64, releaseAutoAnalyzeProcID func(uint64)) AutoAnalyzeProcIDGenerator {
return &generator{
autoAnalyzeProcIDGetter: autoAnalyzeProcIDGetter,
autoAnalyzeProcIDGetter: autoAnalyzeProcIDGetter,
releaseAutoAnalyzeProcID: releaseAutoAnalyzeProcID,
}
}

// AutoAnalyzeProcID implements AutoAnalyzeProcIDGenerator.
func (g *generator) AutoAnalyzeProcID() uint64 {
return g.autoAnalyzeProcIDGetter()
}

// ReleaseAutoAnalyzeProcID implements AutoAnalyzeProcIDGenerator.
func (g *generator) ReleaseAutoAnalyzeProcID(id uint64) {
g.releaseAutoAnalyzeProcID(id)
}

// GlobalAutoAnalyzeProcessList is used to track the auto analyze process.
var GlobalAutoAnalyzeProcessList = newGlobalAutoAnalyzeProcessList()

type globalAutoAnalyzeProcessList struct {
processes map[uint64]struct{}
Copy link
Member Author

@hawkingrei hawkingrei Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use global thread-safe map to record the running auto analyze task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use a sync.map

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.

so two common use cases are not the same as us.

mu sync.RWMutex
}

func newGlobalAutoAnalyzeProcessList() *globalAutoAnalyzeProcessList {
return &globalAutoAnalyzeProcessList{
processes: make(map[uint64]struct{}),
}
}

// Tracker is used to track the auto analyze process.
func (g *globalAutoAnalyzeProcessList) Tracker(id uint64) {
g.mu.Lock()
defer g.mu.Unlock()
g.processes[id] = struct{}{}
}

// Untracker is used to untrack the auto analyze process.
func (g *globalAutoAnalyzeProcessList) Untracker(id uint64) {
g.mu.Lock()
defer g.mu.Unlock()
delete(g.processes, id)
}

// AutoAnalyzeTracker is used to track the auto analyze process.
type AutoAnalyzeTracker struct {
track func(id uint64, ctx sysproctrack.TrackProc) error
untrack func(id uint64)
}

// All returns all the auto analyze process IDs.
func (g *globalAutoAnalyzeProcessList) All() []uint64 {
g.mu.RLock()
defer g.mu.RUnlock()
return maps.Keys(g.processes)
}

// Contains checks whether the auto analyze process ID is in the list.
func (g *globalAutoAnalyzeProcessList) Contains(id uint64) bool {
g.mu.RLock()
defer g.mu.RUnlock()
_, ok := g.processes[id]
return ok
}

// NewAutoAnalyzeTracker creates a new AutoAnalyzeTracker.
func NewAutoAnalyzeTracker(track func(id uint64, ctx sysproctrack.TrackProc) error, untrack func(id uint64)) *AutoAnalyzeTracker {
return &AutoAnalyzeTracker{
track: track,
untrack: untrack,
}
}

// Track is used to track the auto analyze process.
func (t *AutoAnalyzeTracker) Track(id uint64, ctx sysproctrack.TrackProc) error {
GlobalAutoAnalyzeProcessList.Tracker(id)
return t.track(id, ctx)
}

// UnTrack is used to untrack the auto analyze process.
func (t *AutoAnalyzeTracker) UnTrack(id uint64) {
GlobalAutoAnalyzeProcessList.Untracker(id)
t.untrack(id)
}
5 changes: 0 additions & 5 deletions pkg/testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,6 @@ func (msm *MockSessionManager) ServerID() uint64 {
return msm.SerID
}

// GetAutoAnalyzeProcID implement SessionManager interface.
func (msm *MockSessionManager) GetAutoAnalyzeProcID() uint64 {
return uint64(1)
}

// StoreInternalSession is to store internal session.
func (msm *MockSessionManager) StoreInternalSession(s any) {
msm.mu.Lock()
Expand Down
1 change: 1 addition & 0 deletions pkg/util/expensivequery/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/metrics",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/logutil",
"@com_github_pingcap_log//:log",
Expand Down
Loading