Skip to content

Commit

Permalink
statistics: split async load package (#53318)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored May 16, 2024
1 parent 71821b7 commit a3e6d1c
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 116 deletions.
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ go_library(
"//pkg/sessiontxn",
"//pkg/sessiontxn/staleread",
"//pkg/statistics",
"//pkg/statistics/asyncload",
"//pkg/table",
"//pkg/table/tables",
"//pkg/table/temptable",
Expand Down
4 changes: 2 additions & 2 deletions pkg/planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/asyncload"
"github.com/pingcap/tidb/pkg/util/intset"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -436,7 +436,7 @@ func CollectColumnStatsUsage(lp base.LogicalPlan, predicate, histNeeded bool) (
if histNeeded {
collector.histNeededCols[*colToTriggerLoad] = true
} else {
statistics.HistogramNeededItems.Insert(*colToTriggerLoad, true)
asyncload.AsyncLoadHistogramNeededItems.Insert(*colToTriggerLoad, true)
}
})
var (
Expand Down
3 changes: 2 additions & 1 deletion pkg/planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/asyncload"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
Expand Down Expand Up @@ -282,7 +283,7 @@ func (ds *DataSource) initStats(colGroups [][]*expression.Column) {
// If we enable lite stats init or we just found out the meta info of the column is missed, we need to register columns for async load.
_, isLoadNeeded, _ := ds.statisticTable.ColumnIsLoadNeeded(col.ID, false)
if isLoadNeeded {
statistics.HistogramNeededItems.Insert(model.TableItemID{
asyncload.AsyncLoadHistogramNeededItems.Insert(model.TableItemID{
TableID: ds.tableInfo.ID,
ID: col.ID,
IsIndex: false,
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/statistics/asyncload",
"//pkg/statistics/handle/logutil",
"//pkg/tablecodec",
"//pkg/types",
Expand Down
9 changes: 9 additions & 0 deletions pkg/statistics/asyncload/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "asyncload",
srcs = ["async_load.go"],
importpath = "github.com/pingcap/tidb/pkg/statistics/asyncload",
visibility = ["//visibility:public"],
deps = ["//pkg/parser/model"],
)
120 changes: 120 additions & 0 deletions pkg/statistics/asyncload/async_load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package asyncload

import (
"sync"

"github.com/pingcap/tidb/pkg/parser/model"
)

// AsyncLoadHistogramNeededItems stores the columns/indices whose Histograms need to be loaded from physical kv layer.
// Currently, we only load index/pk's Histogram from kv automatically. Columns' are loaded by needs.
var AsyncLoadHistogramNeededItems = newNeededStatsMap()

type neededStatsInternalMap struct {
// the bool value indicates whether is a full load or not.
items map[model.TableItemID]bool
m sync.RWMutex
}

func (n *neededStatsInternalMap) AllItems() []model.StatsLoadItem {
n.m.RLock()
keys := make([]model.StatsLoadItem, 0, len(n.items))
for key, val := range n.items {
keys = append(keys, model.StatsLoadItem{
TableItemID: key,
FullLoad: val,
})
}
n.m.RUnlock()
return keys
}

func (n *neededStatsInternalMap) Insert(col model.TableItemID, fullLoad bool) {
n.m.Lock()
cur := n.items[col]
if cur {
// If the existing one is full load. We don't need to update it.
n.m.Unlock()
return
}
n.items[col] = fullLoad
// Otherwise, we could safely update it.
n.m.Unlock()
}

func (n *neededStatsInternalMap) Delete(col model.TableItemID) {
n.m.Lock()
delete(n.items, col)
n.m.Unlock()
}

func (n *neededStatsInternalMap) Length() int {
n.m.RLock()
defer n.m.RUnlock()
return len(n.items)
}

const shardCnt = 128

type neededStatsMap struct {
items [shardCnt]neededStatsInternalMap
}

func getIdx(tbl model.TableItemID) int64 {
var id int64
if tbl.ID < 0 {
id = -tbl.ID
} else {
id = tbl.ID
}
return id % shardCnt
}

func newNeededStatsMap() *neededStatsMap {
result := neededStatsMap{}
for i := 0; i < shardCnt; i++ {
result.items[i] = neededStatsInternalMap{
items: make(map[model.TableItemID]bool),
}
}
return &result
}

func (n *neededStatsMap) AllItems() []model.StatsLoadItem {
var result []model.StatsLoadItem
for i := 0; i < shardCnt; i++ {
keys := n.items[i].AllItems()
result = append(result, keys...)
}
return result
}

func (n *neededStatsMap) Insert(col model.TableItemID, fullLoad bool) {
n.items[getIdx(col)].Insert(col, fullLoad)
}

func (n *neededStatsMap) Delete(col model.TableItemID) {
n.items[getIdx(col)].Delete(col)
}

func (n *neededStatsMap) Length() int {
var result int
for i := 0; i < shardCnt; i++ {
result += n.items[i].Length()
}
return result
}
7 changes: 2 additions & 5 deletions pkg/statistics/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/context"
"github.com/pingcap/tidb/pkg/planner/util/debugtrace"
"github.com/pingcap/tidb/pkg/statistics/asyncload"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
)
Expand Down Expand Up @@ -132,10 +133,6 @@ func (c *Column) MemoryUsage() CacheItemMemoryUsage {
return columnMemUsage
}

// HistogramNeededItems stores the columns/indices whose Histograms need to be loaded from physical kv layer.
// Currently, we only load index/pk's Histogram from kv automatically. Columns' are loaded by needs.
var HistogramNeededItems = newNeededStatsMap()

// ColumnStatsIsInvalid checks if this column is invalid.
// If this column has histogram but not loaded yet,
// then we mark it as need histogram.
Expand All @@ -161,7 +158,7 @@ func ColumnStatsIsInvalid(colStats *Column, sctx context.PlanContext, histColl *
if (colStats == nil || !colStats.IsStatsInitialized() || colStats.IsLoadNeeded()) &&
stmtctx != nil &&
!histColl.CanNotTriggerLoad {
HistogramNeededItems.Insert(model.TableItemID{
asyncload.AsyncLoadHistogramNeededItems.Insert(model.TableItemID{
TableID: histColl.PhysicalID,
ID: cid,
IsIndex: false,
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/asyncload",
"//pkg/statistics/handle/cache",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
Expand Down
21 changes: 11 additions & 10 deletions pkg/statistics/handle/storage/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/asyncload"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
Expand Down Expand Up @@ -539,7 +540,7 @@ func LoadHistogram(sctx sessionctx.Context, tableID int64, isIndex int, histID i

// LoadNeededHistograms will load histograms for those needed columns/indices.
func LoadNeededHistograms(sctx sessionctx.Context, statsCache statstypes.StatsCache, loadFMSketch bool) (err error) {
items := statistics.HistogramNeededItems.AllItems()
items := asyncload.AsyncLoadHistogramNeededItems.AllItems()
for _, item := range items {
if !item.IsIndex {
err = loadNeededColumnHistograms(sctx, statsCache, item.TableItemID, loadFMSketch, item.FullLoad)
Expand All @@ -556,12 +557,12 @@ func LoadNeededHistograms(sctx sessionctx.Context, statsCache statstypes.StatsCa

// CleanFakeItemsForShowHistInFlights cleans the invalid inserted items.
func CleanFakeItemsForShowHistInFlights(statsCache statstypes.StatsCache) int {
items := statistics.HistogramNeededItems.AllItems()
items := asyncload.AsyncLoadHistogramNeededItems.AllItems()
reallyNeeded := 0
for _, item := range items {
tbl, ok := statsCache.Get(item.TableID)
if !ok {
statistics.HistogramNeededItems.Delete(item.TableItemID)
asyncload.AsyncLoadHistogramNeededItems.Delete(item.TableItemID)
continue
}
loadNeeded := false
Expand All @@ -573,7 +574,7 @@ func CleanFakeItemsForShowHistInFlights(statsCache statstypes.StatsCache) int {
loadNeeded = loadNeeded && analyzed
}
if !loadNeeded {
statistics.HistogramNeededItems.Delete(item.TableItemID)
asyncload.AsyncLoadHistogramNeededItems.Delete(item.TableItemID)
continue
}
reallyNeeded++
Expand All @@ -589,13 +590,13 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.S
var colInfo *model.ColumnInfo
_, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(col.ID, true)
if !loadNeeded || !analyzed {
statistics.HistogramNeededItems.Delete(col)
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
return nil
}
colInfo = tbl.ColAndIdxExistenceMap.GetCol(col.ID)
hg, _, statsVer, _, err := HistMetaFromStorage(sctx, &col, colInfo)
if hg == nil || err != nil {
statistics.HistogramNeededItems.Delete(col)
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
return err
}
var (
Expand Down Expand Up @@ -649,7 +650,7 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.S
}
tbl.Columns[col.ID] = colHist
statsCache.UpdateStatsCache([]*statistics.Table{tbl}, nil)
statistics.HistogramNeededItems.Delete(col)
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
if col.IsSyncLoadFailed {
logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.",
zap.Int64("table_id", colHist.PhysicalID),
Expand All @@ -666,12 +667,12 @@ func loadNeededIndexHistograms(sctx sessionctx.Context, statsCache statstypes.St
}
_, loadNeeded := tbl.IndexIsLoadNeeded(idx.ID)
if !loadNeeded {
statistics.HistogramNeededItems.Delete(idx)
asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
return nil
}
hgMeta, lastAnalyzePos, statsVer, flag, err := HistMetaFromStorage(sctx, &idx, nil)
if hgMeta == nil || err != nil {
statistics.HistogramNeededItems.Delete(idx)
asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
return err
}
idxInfo := tbl.ColAndIdxExistenceMap.GetIndex(idx.ID)
Expand Down Expand Up @@ -713,7 +714,7 @@ func loadNeededIndexHistograms(sctx sessionctx.Context, statsCache statstypes.St
zap.Int64("index_id", idxHist.Info.ID),
zap.String("index_name", idxHist.Info.Name.O))
}
statistics.HistogramNeededItems.Delete(idx)
asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/storage/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestLoadStats(t *testing.T) {
idx, ok = stat.Indices[idxBID]
require.True(t, !ok || (float64(idx.CMSketch.TotalCount())+float64(idx.TopN.TotalCount())+idx.Histogram.TotalRowCount() == 0))
require.False(t, ok && idx.IsEssentialStatsLoaded())
// IsInvalid adds the index to HistogramNeededItems.
// IsInvalid adds the index to AsyncLoadHistogramNeededItems.
statistics.IndexStatsIsInvalid(testKit.Session().GetPlanCtx(), idx, &stat.HistColl, idxBID)
require.NoError(t, h.LoadNeededHistograms())
stat = h.GetTableStats(tableInfo)
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/context"
"github.com/pingcap/tidb/pkg/planner/util/debugtrace"
"github.com/pingcap/tidb/pkg/statistics/asyncload"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/twmb/murmur3"
Expand Down Expand Up @@ -144,7 +145,7 @@ func IndexStatsIsInvalid(sctx context.PlanContext, idxStats *Index, coll *HistCo
// If the given index statistics is nil or we found that the index's statistics hasn't been fully loaded, we add this index to NeededItems.
// Also, we need to check that this HistColl has its physical ID and it is permitted to trigger the stats loading.
if (idxStats == nil || !idxStats.IsFullLoad()) && !coll.CanNotTriggerLoad {
HistogramNeededItems.Insert(model.TableItemID{
asyncload.AsyncLoadHistogramNeededItems.Insert(model.TableItemID{
TableID: coll.PhysicalID,
ID: cid,
IsIndex: true,
Expand Down
Loading

0 comments on commit a3e6d1c

Please sign in to comment.