Add CPU and scheduled time timeline to operator metrics #29016
losipiuk merged 1 commit into trinodb:master
Conversation
Introduces ResourceUsageTimeSeriesRecorder, a fixed-size bucketed sampler that doubles its bucket width as operator execution grows. CPU and wall time are recorded across addInput, getOutput, and finish phases, merged into a single snapshot, and surfaced as "CPU and scheduled time usage over time" in operator metrics. JMH benchmark results show small overhead:

```
Benchmark                                                  (operatorCount) (randomBucketWidth) (randomStartTime) (recordDelayMillis) (resourceTimeSeries) (snapshotCount)  Mode  Cnt        Score      Error  Units
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A               false             false                 N/A                  N/A               2  avgt   20      222.676 ±    1.748  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A               false             false                 N/A                  N/A              10  avgt   20      402.062 ±    6.396  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A               false             false                 N/A                  N/A             100  avgt   20     3616.961 ±  210.224  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A               false              true                 N/A                  N/A               2  avgt   20      277.844 ±    1.140  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A               false              true                 N/A                  N/A              10  avgt   20     1024.055 ±    5.733  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A               false              true                 N/A                  N/A             100  avgt   20    11280.448 ±  824.108  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A                true             false                 N/A                  N/A               2  avgt   20      258.718 ±    2.311  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A                true             false                 N/A                  N/A              10  avgt   20      748.176 ±    7.178  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A                true             false                 N/A                  N/A             100  avgt   20     7295.883 ±  295.682  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A                true              true                 N/A                  N/A               2  avgt   20      308.245 ±    3.080  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A                true              true                 N/A                  N/A              10  avgt   20      926.292 ±    3.346  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.merge                         N/A                true              true                 N/A                  N/A             100  avgt   20     9384.697 ±  162.079  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.operatorStatsAdd                2                 N/A               N/A                 N/A                false             N/A  avgt   20     4879.238 ±    5.494  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.operatorStatsAdd                2                 N/A               N/A                 N/A                 true             N/A  avgt   20     5559.705 ±   40.382  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.operatorStatsAdd               10                 N/A               N/A                 N/A                false             N/A  avgt   20    92375.376 ±  233.255  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.operatorStatsAdd               10                 N/A               N/A                 N/A                 true             N/A  avgt   20    94436.608 ±  422.369  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.operatorStatsAdd              100                 N/A               N/A                 N/A                false             N/A  avgt   20  1416322.264 ± 9655.329  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.operatorStatsAdd              100                 N/A               N/A                 N/A                 true             N/A  avgt   20  1421749.822 ± 7786.944  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.record                        N/A                 N/A               N/A                 100                  N/A             N/A  avgt   20       34.609 ±    0.020  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.record                        N/A                 N/A               N/A                 500                  N/A             N/A  avgt   20       34.709 ±    0.048  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.record                        N/A                 N/A               N/A                1000                  N/A             N/A  avgt   20       34.755 ±    0.090  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.record                        N/A                 N/A               N/A                2000                  N/A             N/A  avgt   20       34.771 ±    0.077  ns/op
BenchmarkResourceUsageTimeSeriesRecorder.record                        N/A                 N/A               N/A               32000                  N/A             N/A  avgt   20       34.678 ±    0.030  ns/op
```
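The fixed-size, width-doubling sampling described above can be sketched as follows. This is a minimal, hypothetical illustration of the idea only; the class and method names are assumptions, not Trino's actual ResourceUsageTimeSeriesRecorder code.

```java
import java.util.Arrays;

// Hypothetical sketch: a fixed-size bucketed sampler that doubles its bucket
// width when a sample falls past the last bucket, keeping memory bounded.
class BucketedSampler
{
    private final long[] buckets;
    private long bucketWidthNanos;

    BucketedSampler(int bucketCount, long initialBucketWidthNanos)
    {
        this.buckets = new long[bucketCount];
        this.bucketWidthNanos = initialBucketWidthNanos;
    }

    // Add valueNanos at elapsed time nowNanos (nanos since recording started)
    void record(long nowNanos, long valueNanos)
    {
        int bucket = (int) (nowNanos / bucketWidthNanos);
        while (bucket >= buckets.length) {
            // Out of room: merge adjacent bucket pairs, doubling the width
            // and halving the resolution, so the array size stays fixed
            for (int i = 0; i < buckets.length / 2; i++) {
                buckets[i] = buckets[2 * i] + buckets[2 * i + 1];
            }
            Arrays.fill(buckets, buckets.length / 2, buckets.length, 0);
            bucketWidthNanos *= 2;
            bucket = (int) (nowNanos / bucketWidthNanos);
        }
        buckets[bucket] += valueNanos;
    }

    long bucketWidthNanos()
    {
        return bucketWidthNanos;
    }

    long[] buckets()
    {
        return buckets.clone();
    }

    public static void main(String[] args)
    {
        BucketedSampler sampler = new BucketedSampler(4, 10);
        sampler.record(0, 5);   // lands in bucket 0
        sampler.record(15, 7);  // lands in bucket 1
        sampler.record(45, 3);  // past the end: width doubles to 20, lands in bucket 2
        System.out.println(sampler.bucketWidthNanos()); // 20
        System.out.println(Arrays.toString(sampler.buckets())); // [12, 0, 3, 0]
    }
}
```

Recording stays O(1) amortized with a bounded snapshot size, which is consistent with the flat ~35 ns/op `record` numbers in the table above.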
2951467 to
3690351
Compare
```java
bucket = (int) ((nowNanos - startNanos) / bucketWidthNanos);
int sourceOffset = 0;
int targetOffset = 0;
if (startNanosOffset > 0) {
```
I don't get why it doesn't matter how much we moved startNanos. The logic is not clear to me.
Ok - startNanosOffset will be either 0 or previous bucket width - maybe worth putting in the comment.
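Because each expansion exactly doubles the width, snapping startNanos down to the new grid moves it by either 0 or one previous bucket width. A tiny sketch of that invariant (assumed semantics for illustration, not the PR's actual code):

```java
// Assumed semantics: startNanos sits on the old bucket grid (a multiple of
// previousWidthNanos). After the width doubles, snapping startNanos down to
// the new grid shifts it by 0 or exactly previousWidthNanos, never more.
class BucketAlignment
{
    static long startNanosOffset(long startNanos, long previousWidthNanos)
    {
        long newWidthNanos = 2 * previousWidthNanos;
        return startNanos % newWidthNanos; // 0 or previousWidthNanos
    }

    public static void main(String[] args)
    {
        System.out.println(startNanosOffset(40, 10)); // 0 (already on the new grid)
        System.out.println(startNanosOffset(30, 10)); // 10 (one previous bucket width)
    }
}
```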
@CodeRabbit full-review
The PR description image shows two stacked area charts visualizing the new metrics feature:
✅ Actions performed: Full review triggered.
📝 Walkthrough

This change introduces time-series recording of operator CPU and wall-time usage.
Actionable comments posted: 3
🧹 Nitpick comments (1)
core/trino-main/src/test/java/io/trino/operator/TestResourceUsageTimeSeriesRecorder.java (1)
329-390: These two offset-merge tests currently cover the same case.
`testMergeWithInitialBucketOffset` and `testMergeWithInitialBucketOffsetDoesNotDoubleCountPartialItems` use identical inputs and identical assertions, so the second one is not adding coverage. Either collapse them or change one to exercise a different partial-bucket scenario.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/test/java/io/trino/operator/TestResourceUsageTimeSeriesRecorder.java` around lines 329 - 390, Both tests testMergeWithInitialBucketOffset and testMergeWithInitialBucketOffsetDoesNotDoubleCountPartialItems are identical; remove duplication by either deleting the second test or changing it to exercise a different partial-bucket scenario. Update the second test (testMergeWithInitialBucketOffsetDoesNotDoubleCountPartialItems) to use a different narrow snapshot start (for example start=101 or 101 with bucketWidth 1) via ResourceUsageTimeSeriesSnapshot.create and then recompute and assert the expected merged result from merge(wide, narrow) (adjust startTimeEpochSeconds(), bucketWidthSeconds(), and cpuNanosBuckets() expected values accordingly) so it covers a true partial-bucket merge case rather than duplicating the first test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@core/trino-main/src/main/java/io/trino/operator/ResourceUsageTimeSeriesRecorder.java`:
- Around line 297-311: The constructor ResourceUsageTimeSeriesSnapshot currently
allows cpuNanosBuckets and wallNanosBuckets of differing lengths which later
causes index errors; add a validation in the constructor
(ResourceUsageTimeSeriesSnapshot(...)) that verifies cpuNanosBuckets.length ==
wallNanosBuckets.length and reject mismatch (e.g., via checkArgument or similar)
with a clear message mentioning both array names and their lengths so invalid
payloads from create(...) are rejected early.
- Around line 106-118: The bucket realignment currently uses startNanos %
bucketWidthNanos (Ticker origin), causing misalignment vs wall-clock; change the
offset calculation to use the wall-clock start time: compute startEpochNanos =
startTimeEpochSeconds * 1_000_000_000L (or equivalent) and set startNanosOffset
= startEpochNanos % bucketWidthNanos, then adjust startNanos = startNanos -
startNanosOffset (and keep the existing bucket, sourceOffset/targetOffset
adjustments) so the expansion aligns to startTimeEpochSeconds rather than the
Ticker origin in ResourceUsageTimeSeriesRecorder.
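The epoch-based offset suggested in this finding could look roughly like the helper below. It is a sketch under the finding's assumptions, not the actual fix pushed to the PR.

```java
class WallClockAlignment
{
    // Offset of the wall-clock start time within a bucket, so the expanded
    // timeline aligns to startTimeEpochSeconds rather than the Ticker origin
    static long startOffsetNanos(long startTimeEpochSeconds, long bucketWidthNanos)
    {
        long startEpochNanos = startTimeEpochSeconds * 1_000_000_000L;
        return startEpochNanos % bucketWidthNanos;
    }

    public static void main(String[] args)
    {
        // 101 s into the epoch with 2 s buckets: 1 s into the current bucket
        System.out.println(startOffsetNanos(101, 2_000_000_000L)); // 1000000000
    }
}
```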
In
`@core/trino-main/src/test/java/io/trino/operator/BenchmarkResourceUsageTimeSeriesRecorder.java`:
- Around line 61-100: The benchmark currently reuses the same
RecordData.recorder and TestingTicker across invocations (setup in
RecordData.setup), causing growing timeline and drifting measurements; fix by
reinitializing the recorder and ticker for each benchmark invocation — either
add a new setup method annotated `@Setup(Level.Invocation)` that recreates
ResourceUsageTimeSeriesRecorder and TestingTicker (and resets ticker state and
arrays as needed) or reset recorder/ticker at the start of the record(...)
benchmark so every invocation starts with a fresh
ResourceUsageTimeSeriesRecorder and TestingTicker.
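The length check suggested for the snapshot constructor could be sketched like this. The record shape and names here are assumptions inferred from the field names in the findings, not the actual Trino class.

```java
class ResourceSnapshotSketch
{
    private final long[] cpuNanosBuckets;
    private final long[] wallNanosBuckets;

    ResourceSnapshotSketch(long[] cpuNanosBuckets, long[] wallNanosBuckets)
    {
        // Reject mismatched payloads early, naming both arrays and lengths
        if (cpuNanosBuckets.length != wallNanosBuckets.length) {
            throw new IllegalArgumentException(
                    "cpuNanosBuckets length (%d) does not match wallNanosBuckets length (%d)"
                            .formatted(cpuNanosBuckets.length, wallNanosBuckets.length));
        }
        this.cpuNanosBuckets = cpuNanosBuckets;
        this.wallNanosBuckets = wallNanosBuckets;
    }

    int bucketCount()
    {
        return cpuNanosBuckets.length;
    }

    public static void main(String[] args)
    {
        System.out.println(new ResourceSnapshotSketch(new long[3], new long[3]).bucketCount()); // 3
        try {
            new ResourceSnapshotSketch(new long[2], new long[3]);
        }
        catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast in the constructor means any invalid payload from `create(...)` is rejected at deserialization time instead of surfacing later as an index error during merge.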
📒 Files selected for processing (7)
- core/trino-main/src/main/java/io/trino/operator/OperationTimer.java
- core/trino-main/src/main/java/io/trino/operator/OperatorContext.java
- core/trino-main/src/main/java/io/trino/operator/ResourceUsageTimeSeriesRecorder.java
- core/trino-main/src/test/java/io/trino/operator/BenchmarkResourceUsageTimeSeriesRecorder.java
- core/trino-main/src/test/java/io/trino/operator/TestResourceUsageTimeSeriesRecorder.java
- core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorOperatorAdapter.java
- core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorSourceOperatorAdapter.java
lukasz-stec left a comment:

Comments addressed
Description
The operator metric serializes to JSON like:
It enables visualizations like this (screenshot: two stacked area charts of the new per-operator metrics; image not captured):
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
(x) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: