Skip to content

Commit 57f2021

Browse files
kv: Record a new AdmittedWorkDoneEvent
This commit adds a new kvpb.AdmittedWorkDoneEvent, which the kvadmission controller uses to record the total CPU time consumed by KV work. The event is recorded on the trace via span.RecordStructured. The KV CPU time is added to the execstats.QueryLevelStats object and reported in EXPLAIN ANALYZE. Note that CPUTime has been renamed to SQLCPUTime to be more accurate and to better distinguish the various timings recorded on this object. Fixes: https://cockroachlabs.atlassian.net/browse/CRDB-55927 Release note (sql change): KV CPU time is now included in traces and EXPLAIN ANALYZE.
1 parent 680a632 commit 57f2021

File tree

13 files changed

+88
-45
lines changed

13 files changed

+88
-45
lines changed

pkg/kv/kvpb/api.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3872,6 +3872,12 @@ message ContentionEvent {
38723872
bool is_latch = 4;
38733873
}
38743874

3875+
// AdmittedWorkDoneEvent is a message that is recorded on traces by the kvadmission
3876+
// controller after admitted KV work is executed.
3877+
message AdmittedWorkDoneEvent {
3878+
// cpu_time is the CPU time, in nanoseconds, that the kvadmission controller
// measured for the KV work before recording this event.
uint64 cpu_time = 1 [(gogoproto.customname) = "CPUTime"];
3879+
}
3880+
38753881
// ScanStats is a message that tracks miscellaneous statistics of all Gets,
38763882
// Scans, and ReverseScans in a single BatchResponse.
38773883
message ScanStats {

pkg/kv/kvserver/kvadmission/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ go_library(
2121
"//pkg/util/log",
2222
"//pkg/util/stop",
2323
"//pkg/util/timeutil",
24+
"//pkg/util/tracing",
2425
"@com_github_cockroachdb_errors//:errors",
2526
"@com_github_cockroachdb_pebble//:pebble",
2627
],

pkg/kv/kvserver/kvadmission/kvadmission.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cockroachdb/cockroach/pkg/util/log"
2828
"github.com/cockroachdb/cockroach/pkg/util/stop"
2929
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
30+
"github.com/cockroachdb/cockroach/pkg/util/tracing"
3031
"github.com/cockroachdb/errors"
3132
"github.com/cockroachdb/pebble"
3233
)
@@ -170,7 +171,7 @@ type Controller interface {
170171
) (Handle, error)
171172
// AdmittedKVWorkDone is called after the admitted KV work is done
172173
// executing.
173-
AdmittedKVWorkDone(Handle, *StoreWriteBytes)
174+
AdmittedKVWorkDone(context.Context, Handle, *StoreWriteBytes)
174175
// AdmitRangefeedRequest must be called before serving rangefeed requests.
175176
// If enabled, it returns a non-nil Pacer that's to be used within rangefeed
176177
// catchup scans (typically CPU-intensive and affecting scheduling
@@ -246,7 +247,9 @@ type Handle struct {
246247
raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta
247248

248249
callAdmittedWorkDoneOnKVAdmissionQ bool
249-
cpuStart time.Duration
250+
// Used for measuring post-admission CPU, even for cases that were not
251+
// subject to admission control.
252+
cpuStart time.Duration
250253
}
251254

252255
// AnnotateCtx annotates the given context with request-scoped admission
@@ -293,7 +296,7 @@ func (n *controllerImpl) AdmitKVWork(
293296
ba *kvpb.BatchRequest,
294297
) (_ Handle, retErr error) {
295298
if n.kvAdmissionQ == nil {
296-
return Handle{}, nil
299+
return Handle{cpuStart: grunning.Time()}, nil
297300
}
298301
admissionInfo := workInfoForBatch(n.settings, requestTenantID, rangeTenantID, ba)
299302
ah := Handle{tenantID: admissionInfo.TenantID}
@@ -414,31 +417,32 @@ func (n *controllerImpl) AdmitKVWork(
414417
if err != nil {
415418
return Handle{}, err
416419
}
417-
if callAdmittedWorkDoneOnKVAdmissionQ {
418-
// We include the time to do other activities like intent resolution,
419-
// since it is acceptable to charge them to the tenant.
420-
ah.cpuStart = grunning.Time()
421-
}
422420
ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ
423421
}
424422
}
423+
// We include the time to do other activities like intent resolution,
424+
// since it is acceptable to charge them to the tenant.
425+
ah.cpuStart = grunning.Time()
425426
return ah, nil
426427
}
427428

428429
// AdmittedKVWorkDone implements the Controller interface.
429-
func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) {
430+
func (n *controllerImpl) AdmittedKVWorkDone(ctx context.Context, ah Handle, writeBytes *StoreWriteBytes) {
430431
n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.elasticCPUWorkHandle)
431-
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
432-
cpuTime := grunning.Time() - ah.cpuStart
433-
if cpuTime < 0 {
434-
// We sometimes see cpuTime to be negative. We use 1ns here, arbitrarily.
435-
// This issue is tracked by
436-
// https://github.com/cockroachdb/cockroach/issues/126681.
437-
if buildutil.CrdbTestBuild {
438-
log.KvDistribution.Warningf(context.Background(), "grunning.Time() should be non-decreasing, cpuTime=%s", cpuTime)
439-
}
440-
cpuTime = 1
432+
cpuTime := grunning.Time() - ah.cpuStart
433+
if cpuTime < 0 {
434+
// We sometimes see cpuTime to be negative. We use 1ns here, arbitrarily.
435+
// This issue is tracked by
436+
// https://github.com/cockroachdb/cockroach/issues/126681.
437+
if buildutil.CrdbTestBuild {
438+
log.KvDistribution.Warningf(context.Background(), "grunning.Time() should be non-decreasing, cpuTime=%s", cpuTime)
441439
}
440+
cpuTime = 1
441+
}
442+
if span := tracing.SpanFromContext(ctx); span != nil {
443+
span.RecordStructured(&kvpb.AdmittedWorkDoneEvent{CPUTime: uint64(cpuTime.Nanoseconds())})
444+
}
445+
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
442446
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID, cpuTime)
443447
}
444448
if ah.storeAdmissionQ != nil {

pkg/server/node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1637,7 +1637,7 @@ func (n *Node) batchInternal(
16371637

16381638
var writeBytes *kvadmission.StoreWriteBytes
16391639
defer func() {
1640-
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
1640+
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(ctx, handle, writeBytes)
16411641
writeBytes.Release()
16421642
}()
16431643

pkg/sql/exec_log.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ func (p *planner) maybeLogStatementInternal(
404404
KvTimeNanos: queryLevelStats.KVTime.Nanoseconds(),
405405
KvGrpcCalls: queryLevelStats.KVBatchRequestsIssued,
406406
NetworkMessages: queryLevelStats.DistSQLNetworkMessages,
407-
CpuTimeNanos: queryLevelStats.CPUTime.Nanoseconds(),
407+
CpuTimeNanos: queryLevelStats.SQLCPUTime.Nanoseconds(),
408408
IndexRecommendations: indexRecs,
409409
// TODO(mgartner): Use a slice of struct{uint64, uint64} instead of
410410
// converting to strings.
@@ -506,7 +506,7 @@ func (p *planner) logTransaction(
506506
ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()),
507507
NetworkMessages: txnStats.ExecStats.DistSQLNetworkMessages,
508508
MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage,
509-
CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(),
509+
CPUSQLNanos: txnStats.ExecStats.SQLCPUTime.Nanoseconds(),
510510
MVCCIteratorStats: eventpb.MVCCIteratorStats{
511511
StepCount: txnStats.ExecStats.MvccSteps,
512512
StepCountInternal: txnStats.ExecStats.MvccStepsInternal,

pkg/sql/execstats/traceanalyzer.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,20 @@ func NewFlowsMetadata(flows map[base.SQLInstanceID]*execinfrapb.FlowSpec) *Flows
9595
// given traces and flow metadata.
9696
// NOTE: When adding fields to this struct, be sure to update Accumulate.
9797
type QueryLevelStats struct {
98-
DistSQLNetworkBytesSent int64
99-
MaxMemUsage int64
100-
MaxDiskUsage int64
101-
KVBytesRead int64
102-
KVPairsRead int64
103-
KVRowsRead int64
104-
KVBatchRequestsIssued int64
105-
KVTime time.Duration
98+
DistSQLNetworkBytesSent int64
99+
MaxMemUsage int64
100+
MaxDiskUsage int64
101+
KVBytesRead int64
102+
KVPairsRead int64
103+
KVRowsRead int64
104+
KVBatchRequestsIssued int64
105+
// KVTime is the cumulative time spent waiting for a KV request. This includes disk IO time
106+
// and potentially network time (if any of the keys are not local).
107+
KVTime time.Duration
108+
// KVCPUTime is the total amount of CPU time spent by nodes doing KV work.
109+
KVCPUTime time.Duration
110+
// SQLCPUTime is the total amount of CPU time spent by nodes doing SQL work.
111+
SQLCPUTime time.Duration
106112
MvccSteps int64
107113
MvccStepsInternal int64
108114
MvccSeeks int64
@@ -122,7 +128,6 @@ type QueryLevelStats struct {
122128
LatchWaitTime time.Duration
123129
ContentionEvents []kvpb.ContentionEvent
124130
RUEstimate float64
125-
CPUTime time.Duration
126131
// SQLInstanceIDs is an ordered list of SQL instances that were involved in
127132
// query processing.
128133
SQLInstanceIDs []int32
@@ -187,7 +192,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
187192
s.LatchWaitTime += other.LatchWaitTime
188193
s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...)
189194
s.RUEstimate += other.RUEstimate
190-
s.CPUTime += other.CPUTime
195+
s.SQLCPUTime += other.SQLCPUTime
191196
s.SQLInstanceIDs = util.CombineUnique(s.SQLInstanceIDs, other.SQLInstanceIDs)
192197
s.KVNodeIDs = util.CombineUnique(s.KVNodeIDs, other.KVNodeIDs)
193198
s.Regions = util.CombineUnique(s.Regions, other.Regions)
@@ -287,7 +292,7 @@ func (a *TraceAnalyzer) ProcessStats() {
287292
s.LockWaitTime += stats.KV.LockWaitTime.Value()
288293
s.LatchWaitTime += stats.KV.LatchWaitTime.Value()
289294
s.RUEstimate += float64(stats.Exec.ConsumedRU.Value())
290-
s.CPUTime += stats.Exec.CPUTime.Value()
295+
s.SQLCPUTime += stats.Exec.CPUTime.Value()
291296
}
292297

293298
// Process streamStats.
@@ -397,6 +402,25 @@ func getAllContentionEvents(trace []tracingpb.RecordedSpan) []kvpb.ContentionEve
397402
return contentionEvents
398403
}
399404

405+
// getKVCPUTime returns the sum of CPUTimes from all AdmittedWorkDoneEvents
406+
// found in the given trace.
//
// Each event's CPUTime is treated as a nanosecond count: the values are summed
// as int64 and the total is converted directly to a time.Duration. Structured
// payloads of other types, and payloads that fail to unmarshal, contribute
// nothing to the sum.
407+
func getKVCPUTime(trace []tracingpb.RecordedSpan) time.Duration {
408+
var totalCPUTimeNanos int64
409+
// ev is a single scratch message, reused both for the type check and as the
// decode target for every matching payload.
var ev kvpb.AdmittedWorkDoneEvent
410+
for i := range trace {
411+
trace[i].Structured(func(any *pbtypes.Any, _ time.Time) {
412+
// Skip structured payloads that are not AdmittedWorkDoneEvents.
if !pbtypes.Is(any, &ev) {
413+
return
414+
}
415+
// Best effort: a payload that cannot be decoded is dropped silently rather
// than surfaced as an error.
if err := pbtypes.UnmarshalAny(any, &ev); err != nil {
416+
return
417+
}
418+
totalCPUTimeNanos += int64(ev.CPUTime)
419+
})
420+
}
421+
return time.Duration(totalCPUTimeNanos)
422+
}
423+
400424
// GetQueryLevelStats returns all the top-level stats in a QueryLevelStats
401425
// struct. GetQueryLevelStats tries to process as many stats as possible. If
402426
// errors occur while processing stats, GetQueryLevelStats returns the combined
@@ -417,5 +441,6 @@ func GetQueryLevelStats(
417441
queryLevelStats.Accumulate(analyzer.GetQueryLevelStats())
418442
}
419443
queryLevelStats.ContentionEvents = getAllContentionEvents(trace)
444+
queryLevelStats.KVCPUTime = getKVCPUTime(trace)
420445
return queryLevelStats, errs
421446
}

pkg/sql/execstats/traceanalyzer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
247247
ContentionEvents: []kvpb.ContentionEvent{aEvent},
248248
MaxDiskUsage: 8,
249249
RUEstimate: 9,
250-
CPUTime: 10 * time.Second,
250+
SQLCPUTime: 10 * time.Second,
251251
MvccSteps: 11,
252252
MvccStepsInternal: 12,
253253
MvccSeeks: 13,
@@ -283,7 +283,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
283283
ContentionEvents: []kvpb.ContentionEvent{bEvent},
284284
MaxDiskUsage: 15,
285285
RUEstimate: 16,
286-
CPUTime: 17 * time.Second,
286+
SQLCPUTime: 17 * time.Second,
287287
MvccSteps: 18,
288288
MvccStepsInternal: 19,
289289
MvccSeeks: 20,
@@ -318,7 +318,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
318318
ContentionEvents: []kvpb.ContentionEvent{aEvent, bEvent},
319319
MaxDiskUsage: 15,
320320
RUEstimate: 25,
321-
CPUTime: 27 * time.Second,
321+
SQLCPUTime: 27 * time.Second,
322322
MvccSteps: 29,
323323
MvccStepsInternal: 31,
324324
MvccSeeks: 33,

pkg/sql/instrumentation.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -905,7 +905,8 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder(
905905
// is only collected for vectorized plans since it is gathered by the
906906
// vectorizedStatsCollector operator.
907907
// TODO(drewk): lift these restrictions.
908-
ob.AddCPUTime(queryStats.CPUTime)
908+
ob.AddCPUTime(queryStats.SQLCPUTime)
909+
ob.AddKVCPUTime(queryStats.KVCPUTime)
909910
}
910911
if ih.isTenant && ih.vectorized {
911912
// Only output RU estimate if this is a tenant. Additionally, RUs aren't

pkg/sql/opt/exec/explain/output.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,12 @@ func (ob *OutputBuilder) AddCPUTime(cpuTime time.Duration) {
457457
}
458458
}
459459

460+
// AddKVCPUTime adds a top-level "kv cpu time" field holding the given duration
// in humanized form. The field is omitted when the DeflakeVolatile flag is set.
func (ob *OutputBuilder) AddKVCPUTime(cpuTime time.Duration) {
461+
// NOTE(review): presumably suppressed under DeflakeVolatile because the
// measured value differs from run to run — confirm against AddCPUTime.
if !ob.flags.Deflake.HasAny(DeflakeVolatile) {
462+
ob.AddTopLevelField("kv cpu time", string(humanizeutil.Duration(cpuTime)))
463+
}
464+
}
465+
460466
// AddRUEstimate adds a top-level field for the estimated number of RUs consumed
461467
// by the query.
462468
func (ob *OutputBuilder) AddRUEstimate(ru float64) {

pkg/sql/sqlstats/insights/util.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ func makeTxnInsight(value *sqlstats.RecordedTxnStats) *insightspb.Transaction {
2222
}
2323

2424
var cpuSQLNanos int64
25-
if value.ExecStats.CPUTime.Nanoseconds() >= 0 {
26-
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
25+
if value.ExecStats.SQLCPUTime.Nanoseconds() >= 0 {
26+
cpuSQLNanos = value.ExecStats.SQLCPUTime.Nanoseconds()
2727
}
2828

2929
var errorCode string
@@ -71,7 +71,7 @@ func makeStmtInsight(value *sqlstats.RecordedStmtStats) *insightspb.Statement {
7171
var cpuSQLNanos int64
7272
if value.ExecStats != nil {
7373
contention = &value.ExecStats.ContentionTime
74-
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
74+
cpuSQLNanos = value.ExecStats.SQLCPUTime.Nanoseconds()
7575
}
7676

7777
var errorCode string

0 commit comments

Comments
 (0)