Skip to content

Commit 0019e89

Browse files
kv: Record a new AdmittedWorkDoneEvent
This commit creates a new kvpb.AdmittedWorkDoneEvent, which the kvadmission controller uses to record the total CPU time consumed by KV work. The event is recorded on the trace via span.RecordStructured. The KV CPU time is added to the execstats.QueryLevelStats object and reported in EXPLAIN ANALYZE. Note that CPUTime has been renamed to SQLCPUTime to be more accurate and to better distinguish the various timings recorded on this object. Fixes: https://cockroachlabs.atlassian.net/browse/CRDB-55927 Release note (sql change): KV CPU time is now included in traces and EXPLAIN ANALYZE.
1 parent 680a632 commit 0019e89

File tree

13 files changed

+94
-45
lines changed

13 files changed

+94
-45
lines changed

pkg/kv/kvpb/api.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3872,6 +3872,12 @@ message ContentionEvent {
38723872
bool is_latch = 4;
38733873
}
38743874

3875+
// AdmittedWorkDoneEvent is a message that is recorded on traces by the kvadmission
3876+
// controller after admitted KV work is executed.
3877+
message AdmittedWorkDoneEvent {
3878+
uint64 cpu_time = 1 [(gogoproto.customname) = "CPUTime"];
3879+
}
3880+
38753881
// ScanStats is a message that tracks miscellaneous statistics of all Gets,
38763882
// Scans, and ReverseScans in a single BatchResponse.
38773883
message ScanStats {

pkg/kv/kvserver/kvadmission/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ go_library(
2121
"//pkg/util/log",
2222
"//pkg/util/stop",
2323
"//pkg/util/timeutil",
24+
"//pkg/util/tracing",
2425
"@com_github_cockroachdb_errors//:errors",
2526
"@com_github_cockroachdb_pebble//:pebble",
2627
],

pkg/kv/kvserver/kvadmission/kvadmission.go

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cockroachdb/cockroach/pkg/util/log"
2828
"github.com/cockroachdb/cockroach/pkg/util/stop"
2929
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
30+
"github.com/cockroachdb/cockroach/pkg/util/tracing"
3031
"github.com/cockroachdb/errors"
3132
"github.com/cockroachdb/pebble"
3233
)
@@ -170,7 +171,7 @@ type Controller interface {
170171
) (Handle, error)
171172
// AdmittedKVWorkDone is called after the admitted KV work is done
172173
// executing.
173-
AdmittedKVWorkDone(Handle, *StoreWriteBytes)
174+
AdmittedKVWorkDone(context.Context, Handle, *StoreWriteBytes)
174175
// AdmitRangefeedRequest must be called before serving rangefeed requests.
175176
// If enabled, it returns a non-nil Pacer that's to be used within rangefeed
176177
// catchup scans (typically CPU-intensive and affecting scheduling
@@ -246,7 +247,9 @@ type Handle struct {
246247
raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta
247248

248249
callAdmittedWorkDoneOnKVAdmissionQ bool
249-
cpuStart time.Duration
250+
// Used for measuring post-admission CPU, even for cases that were not
251+
// subject to admission control.
252+
cpuStart time.Duration
250253
}
251254

252255
// AnnotateCtx annotates the given context with request-scoped admission
@@ -293,7 +296,7 @@ func (n *controllerImpl) AdmitKVWork(
293296
ba *kvpb.BatchRequest,
294297
) (_ Handle, retErr error) {
295298
if n.kvAdmissionQ == nil {
296-
return Handle{}, nil
299+
return Handle{cpuStart: grunning.Time()}, nil
297300
}
298301
admissionInfo := workInfoForBatch(n.settings, requestTenantID, rangeTenantID, ba)
299302
ah := Handle{tenantID: admissionInfo.TenantID}
@@ -414,31 +417,34 @@ func (n *controllerImpl) AdmitKVWork(
414417
if err != nil {
415418
return Handle{}, err
416419
}
417-
if callAdmittedWorkDoneOnKVAdmissionQ {
418-
// We include the time to do other activities like intent resolution,
419-
// since it is acceptable to charge them to the tenant.
420-
ah.cpuStart = grunning.Time()
421-
}
422420
ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ
423421
}
424422
}
423+
// We include the time to do other activities like intent resolution,
424+
// since it is acceptable to charge them to the tenant.
425+
ah.cpuStart = grunning.Time()
425426
return ah, nil
426427
}
427428

428429
// AdmittedKVWorkDone implements the Controller interface.
429-
func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) {
430+
func (n *controllerImpl) AdmittedKVWorkDone(
431+
ctx context.Context, ah Handle, writeBytes *StoreWriteBytes,
432+
) {
430433
n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.elasticCPUWorkHandle)
431-
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
432-
cpuTime := grunning.Time() - ah.cpuStart
433-
if cpuTime < 0 {
434-
// We sometimes see cpuTime to be negative. We use 1ns here, arbitrarily.
435-
// This issue is tracked by
436-
// https://github.com/cockroachdb/cockroach/issues/126681.
437-
if buildutil.CrdbTestBuild {
438-
log.KvDistribution.Warningf(context.Background(), "grunning.Time() should be non-decreasing, cpuTime=%s", cpuTime)
439-
}
440-
cpuTime = 1
434+
cpuTime := grunning.Time() - ah.cpuStart
435+
if cpuTime < 0 {
436+
// We sometimes see cpuTime to be negative. We use 1ns here, arbitrarily.
437+
// This issue is tracked by
438+
// https://github.com/cockroachdb/cockroach/issues/126681.
439+
if buildutil.CrdbTestBuild {
440+
log.KvDistribution.Warningf(context.Background(), "grunning.Time() should be non-decreasing, cpuTime=%s", cpuTime)
441441
}
442+
cpuTime = 1
443+
}
444+
if span := tracing.SpanFromContext(ctx); span != nil {
445+
span.RecordStructured(&kvpb.AdmittedWorkDoneEvent{CPUTime: uint64(cpuTime.Nanoseconds())})
446+
}
447+
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
442448
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID, cpuTime)
443449
}
444450
if ah.storeAdmissionQ != nil {

pkg/server/node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1637,7 +1637,7 @@ func (n *Node) batchInternal(
16371637

16381638
var writeBytes *kvadmission.StoreWriteBytes
16391639
defer func() {
1640-
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
1640+
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(ctx, handle, writeBytes)
16411641
writeBytes.Release()
16421642
}()
16431643

pkg/sql/exec_log.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ func (p *planner) maybeLogStatementInternal(
404404
KvTimeNanos: queryLevelStats.KVTime.Nanoseconds(),
405405
KvGrpcCalls: queryLevelStats.KVBatchRequestsIssued,
406406
NetworkMessages: queryLevelStats.DistSQLNetworkMessages,
407-
CpuTimeNanos: queryLevelStats.CPUTime.Nanoseconds(),
407+
CpuTimeNanos: queryLevelStats.SQLCPUTime.Nanoseconds(),
408408
IndexRecommendations: indexRecs,
409409
// TODO(mgartner): Use a slice of struct{uint64, uint64} instead of
410410
// converting to strings.
@@ -506,7 +506,7 @@ func (p *planner) logTransaction(
506506
ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()),
507507
NetworkMessages: txnStats.ExecStats.DistSQLNetworkMessages,
508508
MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage,
509-
CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(),
509+
CPUSQLNanos: txnStats.ExecStats.SQLCPUTime.Nanoseconds(),
510510
MVCCIteratorStats: eventpb.MVCCIteratorStats{
511511
StepCount: txnStats.ExecStats.MvccSteps,
512512
StepCountInternal: txnStats.ExecStats.MvccStepsInternal,

pkg/sql/execstats/traceanalyzer.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,20 @@ func NewFlowsMetadata(flows map[base.SQLInstanceID]*execinfrapb.FlowSpec) *Flows
9595
// given traces and flow metadata.
9696
// NOTE: When adding fields to this struct, be sure to update Accumulate.
9797
type QueryLevelStats struct {
98-
DistSQLNetworkBytesSent int64
99-
MaxMemUsage int64
100-
MaxDiskUsage int64
101-
KVBytesRead int64
102-
KVPairsRead int64
103-
KVRowsRead int64
104-
KVBatchRequestsIssued int64
105-
KVTime time.Duration
98+
DistSQLNetworkBytesSent int64
99+
MaxMemUsage int64
100+
MaxDiskUsage int64
101+
KVBytesRead int64
102+
KVPairsRead int64
103+
KVRowsRead int64
104+
KVBatchRequestsIssued int64
105+
// KVTime is the cumulative time spent waiting for a KV request. This includes disk IO time
106+
// and potentially network time (if any of the keys are not local).
107+
KVTime time.Duration
108+
// KVCPUTime is the total amount of CPU time spent by nodes doing KV work.
109+
KVCPUTime time.Duration
110+
// SQLCPUTime is the total amount of CPU time spent by nodes doing SQL work.
111+
SQLCPUTime time.Duration
106112
MvccSteps int64
107113
MvccStepsInternal int64
108114
MvccSeeks int64
@@ -122,7 +128,6 @@ type QueryLevelStats struct {
122128
LatchWaitTime time.Duration
123129
ContentionEvents []kvpb.ContentionEvent
124130
RUEstimate float64
125-
CPUTime time.Duration
126131
// SQLInstanceIDs is an ordered list of SQL instances that were involved in
127132
// query processing.
128133
SQLInstanceIDs []int32
@@ -168,6 +173,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
168173
s.KVRowsRead += other.KVRowsRead
169174
s.KVBatchRequestsIssued += other.KVBatchRequestsIssued
170175
s.KVTime += other.KVTime
176+
s.KVCPUTime += other.KVCPUTime
171177
s.MvccSteps += other.MvccSteps
172178
s.MvccStepsInternal += other.MvccStepsInternal
173179
s.MvccSeeks += other.MvccSeeks
@@ -187,7 +193,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
187193
s.LatchWaitTime += other.LatchWaitTime
188194
s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...)
189195
s.RUEstimate += other.RUEstimate
190-
s.CPUTime += other.CPUTime
196+
s.SQLCPUTime += other.SQLCPUTime
191197
s.SQLInstanceIDs = util.CombineUnique(s.SQLInstanceIDs, other.SQLInstanceIDs)
192198
s.KVNodeIDs = util.CombineUnique(s.KVNodeIDs, other.KVNodeIDs)
193199
s.Regions = util.CombineUnique(s.Regions, other.Regions)
@@ -287,7 +293,7 @@ func (a *TraceAnalyzer) ProcessStats() {
287293
s.LockWaitTime += stats.KV.LockWaitTime.Value()
288294
s.LatchWaitTime += stats.KV.LatchWaitTime.Value()
289295
s.RUEstimate += float64(stats.Exec.ConsumedRU.Value())
290-
s.CPUTime += stats.Exec.CPUTime.Value()
296+
s.SQLCPUTime += stats.Exec.CPUTime.Value()
291297
}
292298

293299
// Process streamStats.
@@ -397,6 +403,25 @@ func getAllContentionEvents(trace []tracingpb.RecordedSpan) []kvpb.ContentionEve
397403
return contentionEvents
398404
}
399405

406+
// getKVCPUTime returns the sum of CPUTimes from all AdmittedWorkDoneEvents
407+
// found in the given trace.
408+
func getKVCPUTime(trace []tracingpb.RecordedSpan) time.Duration {
409+
var totalCPUTimeNanos int64
410+
var ev kvpb.AdmittedWorkDoneEvent
411+
for i := range trace {
412+
trace[i].Structured(func(any *pbtypes.Any, _ time.Time) {
413+
if !pbtypes.Is(any, &ev) {
414+
return
415+
}
416+
if err := pbtypes.UnmarshalAny(any, &ev); err != nil {
417+
return
418+
}
419+
totalCPUTimeNanos += int64(ev.CPUTime)
420+
})
421+
}
422+
return time.Duration(totalCPUTimeNanos)
423+
}
424+
400425
// GetQueryLevelStats returns all the top-level stats in a QueryLevelStats
401426
// struct. GetQueryLevelStats tries to process as many stats as possible. If
402427
// errors occur while processing stats, GetQueryLevelStats returns the combined
@@ -417,5 +442,6 @@ func GetQueryLevelStats(
417442
queryLevelStats.Accumulate(analyzer.GetQueryLevelStats())
418443
}
419444
queryLevelStats.ContentionEvents = getAllContentionEvents(trace)
445+
queryLevelStats.KVCPUTime = getKVCPUTime(trace)
420446
return queryLevelStats, errs
421447
}

pkg/sql/execstats/traceanalyzer_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
247247
ContentionEvents: []kvpb.ContentionEvent{aEvent},
248248
MaxDiskUsage: 8,
249249
RUEstimate: 9,
250-
CPUTime: 10 * time.Second,
250+
SQLCPUTime: 10 * time.Second,
251+
KVCPUTime: 12 * time.Second,
251252
MvccSteps: 11,
252253
MvccStepsInternal: 12,
253254
MvccSeeks: 13,
@@ -283,7 +284,8 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
283284
ContentionEvents: []kvpb.ContentionEvent{bEvent},
284285
MaxDiskUsage: 15,
285286
RUEstimate: 16,
286-
CPUTime: 17 * time.Second,
287+
SQLCPUTime: 17 * time.Second,
288+
KVCPUTime: 19 * time.Second,
287289
MvccSteps: 18,
288290
MvccStepsInternal: 19,
289291
MvccSeeks: 20,
@@ -318,7 +320,8 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
318320
ContentionEvents: []kvpb.ContentionEvent{aEvent, bEvent},
319321
MaxDiskUsage: 15,
320322
RUEstimate: 25,
321-
CPUTime: 27 * time.Second,
323+
SQLCPUTime: 27 * time.Second,
324+
KVCPUTime: 31 * time.Second,
322325
MvccSteps: 29,
323326
MvccStepsInternal: 31,
324327
MvccSeeks: 33,

pkg/sql/instrumentation.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -905,7 +905,8 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder(
905905
// is only collected for vectorized plans since it is gathered by the
906906
// vectorizedStatsCollector operator.
907907
// TODO(drewk): lift these restrictions.
908-
ob.AddCPUTime(queryStats.CPUTime)
908+
ob.AddCPUTime(queryStats.SQLCPUTime)
909+
ob.AddKVCPUTime(queryStats.KVCPUTime)
909910
}
910911
if ih.isTenant && ih.vectorized {
911912
// Only output RU estimate if this is a tenant. Additionally, RUs aren't

pkg/sql/opt/exec/explain/output.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,12 @@ func (ob *OutputBuilder) AddCPUTime(cpuTime time.Duration) {
457457
}
458458
}
459459

460+
func (ob *OutputBuilder) AddKVCPUTime(cpuTime time.Duration) {
461+
if !ob.flags.Deflake.HasAny(DeflakeVolatile) {
462+
ob.AddTopLevelField("kv cpu time", string(humanizeutil.Duration(cpuTime)))
463+
}
464+
}
465+
460466
// AddRUEstimate adds a top-level field for the estimated number of RUs consumed
461467
// by the query.
462468
func (ob *OutputBuilder) AddRUEstimate(ru float64) {

pkg/sql/sqlstats/insights/util.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ func makeTxnInsight(value *sqlstats.RecordedTxnStats) *insightspb.Transaction {
2222
}
2323

2424
var cpuSQLNanos int64
25-
if value.ExecStats.CPUTime.Nanoseconds() >= 0 {
26-
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
25+
if value.ExecStats.SQLCPUTime.Nanoseconds() >= 0 {
26+
cpuSQLNanos = value.ExecStats.SQLCPUTime.Nanoseconds()
2727
}
2828

2929
var errorCode string
@@ -71,7 +71,7 @@ func makeStmtInsight(value *sqlstats.RecordedStmtStats) *insightspb.Statement {
7171
var cpuSQLNanos int64
7272
if value.ExecStats != nil {
7373
contention = &value.ExecStats.ContentionTime
74-
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
74+
cpuSQLNanos = value.ExecStats.SQLCPUTime.Nanoseconds()
7575
}
7676

7777
var errorCode string

0 commit comments

Comments
 (0)