Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3872,6 +3872,12 @@ message ContentionEvent {
bool is_latch = 4;
}

// AdmittedWorkDoneEvent is a message that is recorded on traces by the kvadmission
// controller after admitted KV work is executed.
message AdmittedWorkDoneEvent {
uint64 cpu_time = 1 [(gogoproto.customname) = "CPUTime"];
}

// ScanStats is a message that tracks miscellaneous statistics of all Gets,
// Scans, and ReverseScans in a single BatchResponse.
message ScanStats {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
],
Expand Down
62 changes: 36 additions & 26 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)
Expand Down Expand Up @@ -170,7 +171,7 @@ type Controller interface {
) (Handle, error)
// AdmittedKVWorkDone is called after the admitted KV work is done
// executing.
AdmittedKVWorkDone(Handle, *StoreWriteBytes)
AdmittedKVWorkDone(context.Context, Handle, *StoreWriteBytes)
// AdmitRangefeedRequest must be called before serving rangefeed requests.
// If enabled, it returns a non-nil Pacer that's to be used within rangefeed
// catchup scans (typically CPU-intensive and affecting scheduling
Expand Down Expand Up @@ -246,7 +247,9 @@ type Handle struct {
raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta

callAdmittedWorkDoneOnKVAdmissionQ bool
cpuStart time.Duration
// Used for measuring post-admission CPU, even for cases that were not
// subject to admission control.
cpuStart time.Duration
Copy link
Contributor

@angles-n-daemons angles-n-daemons Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so I understand better, can you spot check my understanding of this field?

There's some work from: kvBatchServer.Batch -> KV Admitted, which we want to omit from our KV work time reporting, this work is stored as cpuStart on the Handler, the time spent on the cpu by the goroutine up to the point where this field is set.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, i'm actually not too sure, but i've asked in KV [1] what the difference is between measuring cpu from the potential call sites (node.Batch , ReplicaSend , and finally this here AdmitKVWork -> AdmittedKVWorkDone)

[1] https://cockroachlabs.slack.com/archives/C0KB9Q03D/p1763667326272419

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, nice thanks for the context

}

// AnnotateCtx annotates the given context with request-scoped admission
Expand Down Expand Up @@ -291,12 +294,19 @@ func (n *controllerImpl) AdmitKVWork(
requestTenantID roachpb.TenantID,
rangeTenantID roachpb.TenantID,
ba *kvpb.BatchRequest,
) (_ Handle, retErr error) {
) (ah Handle, retErr error) {
defer func() {
if retErr == nil {
// We include the time to do other activities like intent resolution,
// since it is acceptable to charge them to the tenant.
ah.cpuStart = grunning.Time()
}
}()
if n.kvAdmissionQ == nil {
return Handle{}, nil
return ah, nil
}
admissionInfo := workInfoForBatch(n.settings, requestTenantID, rangeTenantID, ba)
ah := Handle{tenantID: admissionInfo.TenantID}
ah.tenantID = admissionInfo.TenantID
admissionEnabled := true
// Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though
// it would bypass admission, it would consume a slot. When writes are
Expand All @@ -310,13 +320,13 @@ func (n *controllerImpl) AdmitKVWork(
if attemptFlowControl && !admissionInfo.BypassAdmission {
kvflowHandle, found := n.kvflowHandles.LookupReplicationAdmissionHandle(ba.RangeID)
if !found {
return Handle{}, nil
return ah, nil
}
var err error
admitted, err = kvflowHandle.Admit(
ctx, admissionInfo.Priority, timeutil.FromUnixNanos(admissionInfo.CreateTime))
if err != nil {
return Handle{}, err
return ah, err
} else if admitted {
// NB: It's possible for us to be waiting for available flow tokens
// for a different set of streams that the ones we'll eventually
Expand Down Expand Up @@ -346,7 +356,7 @@ func (n *controllerImpl) AdmitKVWork(
storeWorkHandle, err := storeAdmissionQ.Admit(
ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo})
if err != nil {
return Handle{}, err
return ah, err
}
admissionEnabled = storeWorkHandle.UseAdmittedWorkDone()
if admissionEnabled {
Expand Down Expand Up @@ -399,7 +409,7 @@ func (n *controllerImpl) AdmitKVWork(
ctx, admitDuration, admissionInfo,
)
if err != nil {
return Handle{}, err
return ah, err
}
ah.elasticCPUWorkHandle = elasticWorkHandle
defer func() {
Expand All @@ -412,12 +422,7 @@ func (n *controllerImpl) AdmitKVWork(
// Use the slots-based mechanism for everything else.
callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
return Handle{}, err
}
if callAdmittedWorkDoneOnKVAdmissionQ {
// We include the time to do other activities like intent resolution,
// since it is acceptable to charge them to the tenant.
ah.cpuStart = grunning.Time()
return ah, err
}
ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ
}
Expand All @@ -426,19 +431,24 @@ func (n *controllerImpl) AdmitKVWork(
}

// AdmittedKVWorkDone implements the Controller interface.
func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) {
func (n *controllerImpl) AdmittedKVWorkDone(
ctx context.Context, ah Handle, writeBytes *StoreWriteBytes,
) {
n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.elasticCPUWorkHandle)
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
cpuTime := grunning.Time() - ah.cpuStart
if cpuTime < 0 {
// We sometimes see cpuTime to be negative. We use 1ns here, arbitrarily.
// This issue is tracked by
// https://github.com/cockroachdb/cockroach/issues/126681.
if buildutil.CrdbTestBuild {
log.KvDistribution.Warningf(context.Background(), "grunning.Time() should be non-decreasing, cpuTime=%s", cpuTime)
}
cpuTime = 1
cpuTime := grunning.Time() - ah.cpuStart
if cpuTime < 0 {
// We sometimes see cpuTime to be negative. We use 1ns here, arbitrarily.
// This issue is tracked by
// https://github.com/cockroachdb/cockroach/issues/126681.
if buildutil.CrdbTestBuild {
log.KvDistribution.Warningf(context.Background(), "grunning.Time() should be non-decreasing, cpuTime=%s", cpuTime)
}
cpuTime = 1
}
if span := tracing.SpanFromContext(ctx); span != nil {
span.RecordStructured(&kvpb.AdmittedWorkDoneEvent{CPUTime: uint64(cpuTime.Nanoseconds())})
}
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID, cpuTime)
}
if ah.storeAdmissionQ != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,7 @@ func (n *Node) batchInternal(

var writeBytes *kvadmission.StoreWriteBytes
defer func() {
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(ctx, handle, writeBytes)
writeBytes.Release()
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (p *planner) maybeLogStatementInternal(
KvTimeNanos: queryLevelStats.KVTime.Nanoseconds(),
KvGrpcCalls: queryLevelStats.KVBatchRequestsIssued,
NetworkMessages: queryLevelStats.DistSQLNetworkMessages,
CpuTimeNanos: queryLevelStats.CPUTime.Nanoseconds(),
CpuTimeNanos: queryLevelStats.SQLCPUTime.Nanoseconds(),
IndexRecommendations: indexRecs,
// TODO(mgartner): Use a slice of struct{uint64, uint64} instead of
// converting to strings.
Expand Down Expand Up @@ -506,7 +506,7 @@ func (p *planner) logTransaction(
ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()),
NetworkMessages: txnStats.ExecStats.DistSQLNetworkMessages,
MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage,
CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(),
CPUSQLNanos: txnStats.ExecStats.SQLCPUTime.Nanoseconds(),
MVCCIteratorStats: eventpb.MVCCIteratorStats{
StepCount: txnStats.ExecStats.MvccSteps,
StepCountInternal: txnStats.ExecStats.MvccStepsInternal,
Expand Down
48 changes: 37 additions & 11 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,20 @@ func NewFlowsMetadata(flows map[base.SQLInstanceID]*execinfrapb.FlowSpec) *Flows
// given traces and flow metadata.
// NOTE: When adding fields to this struct, be sure to update Accumulate.
type QueryLevelStats struct {
DistSQLNetworkBytesSent int64
MaxMemUsage int64
MaxDiskUsage int64
KVBytesRead int64
KVPairsRead int64
KVRowsRead int64
KVBatchRequestsIssued int64
KVTime time.Duration
DistSQLNetworkBytesSent int64
MaxMemUsage int64
MaxDiskUsage int64
KVBytesRead int64
KVPairsRead int64
KVRowsRead int64
KVBatchRequestsIssued int64
// KVTime is the cumulative time spent waiting for a KV request. This includes disk IO time
// and potentially network time (if any of the keys are not local).
KVTime time.Duration
// KVCPUTime is the total amount of CPU time spent by nodes doing KV work.
KVCPUTime time.Duration
// SQLCPUTime is the total amount of CPU time spent by nodes doing SQL work.
SQLCPUTime time.Duration
MvccSteps int64
MvccStepsInternal int64
MvccSeeks int64
Expand All @@ -122,7 +128,6 @@ type QueryLevelStats struct {
LatchWaitTime time.Duration
ContentionEvents []kvpb.ContentionEvent
RUEstimate float64
CPUTime time.Duration
// SQLInstanceIDs is an ordered list of SQL instances that were involved in
// query processing.
SQLInstanceIDs []int32
Expand Down Expand Up @@ -168,6 +173,8 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.KVRowsRead += other.KVRowsRead
s.KVBatchRequestsIssued += other.KVBatchRequestsIssued
s.KVTime += other.KVTime
s.KVCPUTime += other.KVCPUTime
s.SQLCPUTime += other.SQLCPUTime
s.MvccSteps += other.MvccSteps
s.MvccStepsInternal += other.MvccStepsInternal
s.MvccSeeks += other.MvccSeeks
Expand All @@ -187,7 +194,6 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.LatchWaitTime += other.LatchWaitTime
s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...)
s.RUEstimate += other.RUEstimate
s.CPUTime += other.CPUTime
s.SQLInstanceIDs = util.CombineUnique(s.SQLInstanceIDs, other.SQLInstanceIDs)
s.KVNodeIDs = util.CombineUnique(s.KVNodeIDs, other.KVNodeIDs)
s.Regions = util.CombineUnique(s.Regions, other.Regions)
Expand Down Expand Up @@ -287,7 +293,7 @@ func (a *TraceAnalyzer) ProcessStats() {
s.LockWaitTime += stats.KV.LockWaitTime.Value()
s.LatchWaitTime += stats.KV.LatchWaitTime.Value()
s.RUEstimate += float64(stats.Exec.ConsumedRU.Value())
s.CPUTime += stats.Exec.CPUTime.Value()
s.SQLCPUTime += stats.Exec.CPUTime.Value()
}

// Process streamStats.
Expand Down Expand Up @@ -397,6 +403,25 @@ func getAllContentionEvents(trace []tracingpb.RecordedSpan) []kvpb.ContentionEve
return contentionEvents
}

// getKVCPUTime returns the sum of CPUTimes from all AdmittedWorkDoneEvents
// found in the given trace.
func getKVCPUTime(trace []tracingpb.RecordedSpan) time.Duration {
var totalCPUTimeNanos int64
var ev kvpb.AdmittedWorkDoneEvent
for i := range trace {
trace[i].Structured(func(any *pbtypes.Any, _ time.Time) {
if !pbtypes.Is(any, &ev) {
return
}
if err := pbtypes.UnmarshalAny(any, &ev); err != nil {
return
}
totalCPUTimeNanos += int64(ev.CPUTime)
})
}
return time.Duration(totalCPUTimeNanos)
}

// GetQueryLevelStats returns all the top-level stats in a QueryLevelStats
// struct. GetQueryLevelStats tries to process as many stats as possible. If
// errors occur while processing stats, GetQueryLevelStats returns the combined
Expand All @@ -417,5 +442,6 @@ func GetQueryLevelStats(
queryLevelStats.Accumulate(analyzer.GetQueryLevelStats())
}
queryLevelStats.ContentionEvents = getAllContentionEvents(trace)
queryLevelStats.KVCPUTime = getKVCPUTime(trace)
return queryLevelStats, errs
}
9 changes: 6 additions & 3 deletions pkg/sql/execstats/traceanalyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
ContentionEvents: []kvpb.ContentionEvent{aEvent},
MaxDiskUsage: 8,
RUEstimate: 9,
CPUTime: 10 * time.Second,
SQLCPUTime: 10 * time.Second,
KVCPUTime: 12 * time.Second,
MvccSteps: 11,
MvccStepsInternal: 12,
MvccSeeks: 13,
Expand Down Expand Up @@ -283,7 +284,8 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
ContentionEvents: []kvpb.ContentionEvent{bEvent},
MaxDiskUsage: 15,
RUEstimate: 16,
CPUTime: 17 * time.Second,
SQLCPUTime: 17 * time.Second,
KVCPUTime: 19 * time.Second,
MvccSteps: 18,
MvccStepsInternal: 19,
MvccSeeks: 20,
Expand Down Expand Up @@ -318,7 +320,8 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
ContentionEvents: []kvpb.ContentionEvent{aEvent, bEvent},
MaxDiskUsage: 15,
RUEstimate: 25,
CPUTime: 27 * time.Second,
SQLCPUTime: 27 * time.Second,
KVCPUTime: 31 * time.Second,
MvccSteps: 29,
MvccStepsInternal: 31,
MvccSeeks: 33,
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,8 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder(
// is only collected for vectorized plans since it is gathered by the
// vectorizedStatsCollector operator.
// TODO(drewk): lift these restrictions.
ob.AddCPUTime(queryStats.CPUTime)
ob.AddSQLCPUTime(queryStats.SQLCPUTime)
ob.AddKVCPUTime(queryStats.KVCPUTime)
}
if ih.isTenant && ih.vectorized {
// Only output RU estimate if this is a tenant. Additionally, RUs aren't
Expand Down
10 changes: 8 additions & 2 deletions pkg/sql/opt/exec/explain/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,16 +447,22 @@ func (ob *OutputBuilder) AddMaxDiskUsage(bytes int64) {
}
}

// AddCPUTime adds a top-level field for the cumulative cpu time spent by SQL
// AddSQLCPUTime adds a top-level field for the cumulative cpu time spent by SQL
// execution. If we're redacting, we leave this out to keep test outputs
// independent of platform because the grunning library isn't currently
// supported on all platforms.
func (ob *OutputBuilder) AddCPUTime(cpuTime time.Duration) {
func (ob *OutputBuilder) AddSQLCPUTime(cpuTime time.Duration) {
if !ob.flags.Deflake.HasAny(DeflakeVolatile) {
ob.AddTopLevelField("sql cpu time", string(humanizeutil.Duration(cpuTime)))
}
}

func (ob *OutputBuilder) AddKVCPUTime(cpuTime time.Duration) {
if !ob.flags.Deflake.HasAny(DeflakeVolatile) {
ob.AddTopLevelField("kv cpu time", string(humanizeutil.Duration(cpuTime)))
}
}

// AddRUEstimate adds a top-level field for the estimated number of RUs consumed
// by the query.
func (ob *OutputBuilder) AddRUEstimate(ru float64) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/sqlstats/insights/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func makeTxnInsight(value *sqlstats.RecordedTxnStats) *insightspb.Transaction {
}

var cpuSQLNanos int64
if value.ExecStats.CPUTime.Nanoseconds() >= 0 {
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
if value.ExecStats.SQLCPUTime.Nanoseconds() >= 0 {
cpuSQLNanos = value.ExecStats.SQLCPUTime.Nanoseconds()
}

var errorCode string
Expand Down Expand Up @@ -71,7 +71,7 @@ func makeStmtInsight(value *sqlstats.RecordedStmtStats) *insightspb.Statement {
var cpuSQLNanos int64
if value.ExecStats != nil {
contention = &value.ExecStats.ContentionTime
cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds()
cpuSQLNanos = value.ExecStats.SQLCPUTime.Nanoseconds()
}

var errorCode string
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (s *stmtStats) recordExecStatsLocked(stats execstats.QueryLevelStats) {
s.mu.data.ExecStats.ContentionTime.Record(count, stats.ContentionTime.Seconds())
s.mu.data.ExecStats.NetworkMessages.Record(count, float64(stats.DistSQLNetworkMessages))
s.mu.data.ExecStats.MaxDiskUsage.Record(count, float64(stats.MaxDiskUsage))
s.mu.data.ExecStats.CPUSQLNanos.Record(count, float64(stats.CPUTime.Nanoseconds()))
s.mu.data.ExecStats.CPUSQLNanos.Record(count, float64(stats.SQLCPUTime.Nanoseconds()))

s.mu.data.ExecStats.MVCCIteratorStats.StepCount.Record(count, float64(stats.MvccSteps))
s.mu.data.ExecStats.MVCCIteratorStats.StepCountInternal.Record(count, float64(stats.MvccStepsInternal))
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *Container) RecordTransaction(ctx context.Context, value *sqlstats.Recor
stats.mu.data.ExecStats.ContentionTime.Record(stats.mu.data.ExecStats.Count, value.ExecStats.ContentionTime.Seconds())
stats.mu.data.ExecStats.NetworkMessages.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.DistSQLNetworkMessages))
stats.mu.data.ExecStats.MaxDiskUsage.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.MaxDiskUsage))
stats.mu.data.ExecStats.CPUSQLNanos.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.CPUTime.Nanoseconds()))
stats.mu.data.ExecStats.CPUSQLNanos.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.SQLCPUTime.Nanoseconds()))

stats.mu.data.ExecStats.MVCCIteratorStats.StepCount.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.MvccSteps))
stats.mu.data.ExecStats.MVCCIteratorStats.StepCountInternal.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.MvccStepsInternal))
Expand Down
Loading