Skip to content

Commit 9a8df94

Browse files
ti-chi-bothawkingrei
authored andcommitted
statistics: support global singleflight for sync load (pingcap#52796) (pingcap#53838)
close pingcap#52797
1 parent 467e4ca commit 9a8df94

File tree

4 files changed

+87
-75
lines changed

4 files changed

+87
-75
lines changed

sessionctx/stmtctx/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ go_library(
2020
"@com_github_pingcap_errors//:errors",
2121
"@com_github_tikv_client_go_v2//tikvrpc",
2222
"@com_github_tikv_client_go_v2//util",
23+
"@org_golang_x_sync//singleflight",
2324
"@org_uber_go_atomic//:atomic",
2425
"@org_uber_go_zap//:zap",
2526
],

sessionctx/stmtctx/stmtctx.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/tikv/client-go/v2/util"
4040
atomic2 "go.uber.org/atomic"
4141
"go.uber.org/zap"
42+
"golang.org/x/sync/singleflight"
4243
)
4344

4445
const (
@@ -330,7 +331,7 @@ type StatementContext struct {
330331
// NeededItems stores the columns/indices whose stats are needed for planner.
331332
NeededItems []model.TableItemID
332333
// ResultCh to receive stats loading results
333-
ResultCh chan StatsLoadResult
334+
ResultCh []<-chan singleflight.Result
334335
// LoadStartTime is to record the load start time to calculate latency
335336
LoadStartTime time.Time
336337
}

statistics/handle/handle_hist.go

+51-51
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import (
3939
// RetryCount is the max retry count for a sync load task.
4040
const RetryCount = 3
4141

42+
var globalStatsSyncLoadSingleFlight singleflight.Group
43+
4244
type statsWrapper struct {
4345
col *statistics.Column
4446
idx *statistics.Index
@@ -79,25 +81,26 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
7981
}
8082
sc.StatsLoad.Timeout = timeout
8183
sc.StatsLoad.NeededItems = remainedItems
82-
sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
83-
tasks := make([]*NeededItemTask, 0)
84+
sc.StatsLoad.ResultCh = make([]<-chan singleflight.Result, 0, len(remainedItems))
8485
for _, item := range remainedItems {
85-
task := &NeededItemTask{
86-
TableItemID: item,
87-
ToTimeout: time.Now().Local().Add(timeout),
88-
ResultCh: sc.StatsLoad.ResultCh,
89-
}
90-
tasks = append(tasks, task)
91-
}
92-
timer := time.NewTimer(timeout)
93-
defer timer.Stop()
94-
for _, task := range tasks {
95-
select {
96-
case h.StatsLoad.NeededItemsCh <- task:
97-
continue
98-
case <-timer.C:
99-
return errors.New("sync load stats channel is full and timeout sending task to channel")
100-
}
86+
localItem := item
87+
resultCh := globalStatsSyncLoadSingleFlight.DoChan(localItem.Key(), func() (any, error) {
88+
timer := time.NewTimer(timeout)
89+
defer timer.Stop()
90+
task := &NeededItemTask{
91+
TableItemID: localItem,
92+
ToTimeout: time.Now().Local().Add(timeout),
93+
ResultCh: make(chan stmtctx.StatsLoadResult, 1),
94+
}
95+
select {
96+
case h.StatsLoad.NeededItemsCh <- task:
97+
result := <-task.ResultCh
98+
return result, nil
99+
case <-timer.C:
100+
return nil, errors.New("sync load stats channel is full and timeout sending task to channel")
101+
}
102+
})
103+
sc.StatsLoad.ResultCh = append(sc.StatsLoad.ResultCh, resultCh)
101104
}
102105
sc.StatsLoad.LoadStartTime = time.Now()
103106
return nil
@@ -123,26 +126,34 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
123126
metrics.SyncLoadCounter.Inc()
124127
timer := time.NewTimer(sc.StatsLoad.Timeout)
125128
defer timer.Stop()
126-
for {
129+
for _, resultCh := range sc.StatsLoad.ResultCh {
127130
select {
128-
case result, ok := <-sc.StatsLoad.ResultCh:
129-
if ok {
130-
if result.HasError() {
131-
errorMsgs = append(errorMsgs, result.ErrorMsg())
132-
}
133-
delete(resultCheckMap, result.Item)
134-
if len(resultCheckMap) == 0 {
135-
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
136-
return nil
137-
}
138-
} else {
131+
case result, ok := <-resultCh:
132+
if !ok {
139133
return errors.New("sync load stats channel closed unexpectedly")
140134
}
135+
// this error is from statsSyncLoad.SendLoadRequests which start to task and send task into worker,
136+
// not the stats loading error
137+
if result.Err != nil {
138+
errorMsgs = append(errorMsgs, result.Err.Error())
139+
} else {
140+
val := result.Val.(stmtctx.StatsLoadResult)
141+
// this error is from the stats loading error
142+
if val.HasError() {
143+
errorMsgs = append(errorMsgs, val.ErrorMsg())
144+
}
145+
delete(resultCheckMap, val.Item)
146+
}
141147
case <-timer.C:
142148
metrics.SyncLoadTimeoutCounter.Inc()
143149
return errors.New("sync load stats timeout")
144150
}
145151
}
152+
if len(resultCheckMap) == 0 {
153+
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
154+
return nil
155+
}
156+
return nil
146157
}
147158

148159
// removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache.
@@ -240,28 +251,17 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
240251
task = lastTask
241252
}
242253
result := stmtctx.StatsLoadResult{Item: task.TableItemID}
243-
resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
244-
err := h.handleOneItemTask(task, readerCtx, ctx)
245-
return nil, err
246-
})
247-
timeout := time.Until(task.ToTimeout)
248-
select {
249-
case sr := <-resultChan:
250-
// sr.Val is always nil.
251-
if sr.Err == nil {
252-
task.ResultCh <- result
253-
return nil, nil
254-
}
255-
if !isVaildForRetry(task) {
256-
result.Error = sr.Err
257-
task.ResultCh <- result
258-
return nil, nil
259-
}
260-
return task, sr.Err
261-
case <-time.After(timeout):
262-
task.ToTimeout.Add(time.Duration(h.mu.ctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
263-
return task, nil
254+
err = h.handleOneItemTask(task, readerCtx, ctx)
255+
if err == nil {
256+
task.ResultCh <- result
257+
return nil, nil
258+
}
259+
if !isVaildForRetry(task) {
260+
result.Error = err
261+
task.ResultCh <- result
262+
return nil, nil
264263
}
264+
return task, err
265265
}
266266

267267
func isVaildForRetry(task *NeededItemTask) bool {

statistics/handle/handle_hist_test.go

+33-23
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,23 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
208208
task1, err1 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
209209
require.Error(t, err1)
210210
require.NotNil(t, task1)
211-
211+
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
212+
select {
213+
case <-resultCh:
214+
t.Logf("stmtCtx1.ResultCh should not get anything")
215+
t.FailNow()
216+
default:
217+
}
218+
}
219+
for _, resultCh := range stmtCtx2.StatsLoad.ResultCh {
220+
select {
221+
case <-resultCh:
222+
t.Logf("stmtCtx1.ResultCh should not get anything")
223+
t.FailNow()
224+
default:
225+
}
226+
}
212227
select {
213-
case <-stmtCtx1.StatsLoad.ResultCh:
214-
t.Logf("stmtCtx1.ResultCh should not get anything")
215-
t.FailNow()
216-
case <-stmtCtx2.StatsLoad.ResultCh:
217-
t.Logf("stmtCtx2.ResultCh should not get anything")
218-
t.FailNow()
219228
case <-task1.ResultCh:
220229
t.Logf("task1.ResultCh should not get anything")
221230
t.FailNow()
@@ -226,17 +235,18 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
226235
task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
227236
require.NoError(t, err3)
228237
require.Nil(t, task3)
229-
230-
task, err3 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
231-
require.NoError(t, err3)
232-
require.Nil(t, task)
233-
234-
rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
235-
require.True(t, ok1)
236-
require.Equal(t, neededColumns[0], rs1.Item)
237-
rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh
238-
require.True(t, ok2)
239-
require.Equal(t, neededColumns[0], rs2.Item)
238+
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
239+
rs1, ok1 := <-resultCh
240+
require.True(t, rs1.Shared)
241+
require.True(t, ok1)
242+
require.Equal(t, neededColumns[0].ID, rs1.Val.(stmtctx.StatsLoadResult).Item.ID)
243+
}
244+
for _, resultCh := range stmtCtx2.StatsLoad.ResultCh {
245+
rs1, ok1 := <-resultCh
246+
require.True(t, rs1.Shared)
247+
require.True(t, ok1)
248+
require.Equal(t, neededColumns[0].ID, rs1.Val.(stmtctx.StatsLoadResult).Item.ID)
249+
}
240250

241251
stat = h.GetTableStats(tableInfo)
242252
hg = stat.Columns[tableInfo.Columns[2].ID].Histogram
@@ -313,11 +323,11 @@ func TestRetry(t *testing.T) {
313323
result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
314324
require.NoError(t, err1)
315325
require.Nil(t, result)
316-
select {
317-
case <-task1.ResultCh:
318-
default:
319-
t.Logf("task1.ResultCh should get nothing")
320-
t.FailNow()
326+
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
327+
rs1, ok1 := <-resultCh
328+
require.True(t, rs1.Shared)
329+
require.True(t, ok1)
330+
require.Error(t, rs1.Val.(stmtctx.StatsLoadResult).Error)
321331
}
322332
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail"))
323333
}

0 commit comments

Comments
 (0)