diff --git a/core/trino-main/src/main/java/io/trino/operator/OperationTimer.java b/core/trino-main/src/main/java/io/trino/operator/OperationTimer.java index fe610e97480d..f2a1573f5eda 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperationTimer.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperationTimer.java @@ -15,6 +15,7 @@ import com.google.errorprone.annotations.ThreadSafe; import io.trino.annotation.NotThreadSafe; +import jakarta.annotation.Nullable; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; @@ -104,6 +105,18 @@ static class OperationTiming private final AtomicLong calls = new AtomicLong(); private final AtomicLong wallNanos = new AtomicLong(); private final AtomicLong cpuNanos = new AtomicLong(); + @Nullable + private final ResourceUsageTimeSeriesRecorder timeSeriesRecorder; + + OperationTiming() + { + this.timeSeriesRecorder = null; + } + + OperationTiming(ResourceUsageTimeSeriesRecorder timeSeriesRecorder) + { + this.timeSeriesRecorder = requireNonNull(timeSeriesRecorder, "timeSeriesRecorder is null"); + } long getCalls() { @@ -125,6 +138,9 @@ void record(long wallNanos, long cpuNanos) this.calls.incrementAndGet(); this.wallNanos.addAndGet(wallNanos); this.cpuNanos.addAndGet(cpuNanos); + if (timeSeriesRecorder != null) { + timeSeriesRecorder.record(wallNanos, cpuNanos); + } } @Override diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java index 18926bd39334..b59aa798db52 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java @@ -29,6 +29,7 @@ import io.trino.memory.context.LocalMemoryContext; import io.trino.memory.context.MemoryTrackingContext; import io.trino.operator.OperationTimer.OperationTiming; +import 
io.trino.operator.ResourceUsageTimeSeriesRecorder.ResourceUsageTimeSeriesSnapshot; import io.trino.plugin.base.metrics.TDigestHistogram; import io.trino.spi.Page; import io.trino.spi.TrinoException; @@ -76,11 +77,12 @@ public class OperatorContext private final CounterStat internalNetworkInputDataSize = new CounterStat(); private final CounterStat internalNetworkPositions = new CounterStat(); - private final OperationTiming addInputTiming = new OperationTiming(); + private final ResourceUsageTimeSeriesRecorder cpuTimeSeriesRecorder = new ResourceUsageTimeSeriesRecorder(); + private final OperationTiming addInputTiming = new OperationTiming(cpuTimeSeriesRecorder); private final CounterStat inputDataSize = new CounterStat(); private final CounterStat inputPositions = new CounterStat(); - private final OperationTiming getOutputTiming = new OperationTiming(); + private final OperationTiming getOutputTiming = new OperationTiming(cpuTimeSeriesRecorder); private final CounterStat outputDataSize = new CounterStat(); private final CounterStat outputPositions = new CounterStat(); @@ -97,7 +99,7 @@ public class OperatorContext private final AtomicReference> finishedFuture = new AtomicReference<>(); private final AtomicLong blockedWallNanos = new AtomicLong(); - private final OperationTiming finishTiming = new OperationTiming(); + private final OperationTiming finishTiming = new OperationTiming(cpuTimeSeriesRecorder); private final OperatorSpillContext spillContext; private final AtomicReference> infoSupplier = new AtomicReference<>(); @@ -560,7 +562,8 @@ public OperatorStats getOperatorStats() inputPositionsCount, new Duration(addInputTiming.getCpuNanos() + getOutputTiming.getCpuNanos() + finishTiming.getCpuNanos(), NANOSECONDS).convertTo(SECONDS).getValue(), new Duration(addInputTiming.getWallNanos() + getOutputTiming.getWallNanos() + finishTiming.getWallNanos(), NANOSECONDS).convertTo(SECONDS).getValue(), - new Duration(blockedWallNanos.get(), 
NANOSECONDS).convertTo(SECONDS).getValue()), + new Duration(blockedWallNanos.get(), NANOSECONDS).convertTo(SECONDS).getValue(), + cpuTimeSeriesRecorder.snapshot()), connectorMetrics.get(), Metrics.EMPTY, // will be filled in when aggregating at pipeline level @@ -585,13 +588,19 @@ public OperatorStats getOperatorStats() info); } - private Metrics getOperatorMetrics(long inputPositions, double cpuTimeSeconds, double wallTimeSeconds, double blockedWallSeconds) + private Metrics getOperatorMetrics( + long inputPositions, + double cpuTimeSeconds, + double wallTimeSeconds, + double blockedWallSeconds, + ResourceUsageTimeSeriesSnapshot resourceUsageTimeSeries) { return metrics.get().mergeWith(new Metrics(ImmutableMap.of( "Input rows distribution", TDigestHistogram.fromValue(inputPositions), "CPU time distribution (s)", TDigestHistogram.fromValue(cpuTimeSeconds), "Scheduled time distribution (s)", TDigestHistogram.fromValue(wallTimeSeconds), - "Blocked time distribution (s)", TDigestHistogram.fromValue(blockedWallSeconds)))); + "Blocked time distribution (s)", TDigestHistogram.fromValue(blockedWallSeconds), + "CPU and scheduled time usage over time", resourceUsageTimeSeries))); } private static long nanosBetween(long start, long end) diff --git a/core/trino-main/src/main/java/io/trino/operator/ResourceUsageTimeSeriesRecorder.java b/core/trino-main/src/main/java/io/trino/operator/ResourceUsageTimeSeriesRecorder.java new file mode 100644 index 000000000000..5d76adf8dbf9 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/ResourceUsageTimeSeriesRecorder.java @@ -0,0 +1,395 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableList; +import io.trino.spi.metrics.Metric; + +import java.time.Clock; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.math.IntMath.isPowerOfTwo; +import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; + +/** + * Records CPU and wall time usage over the lifetime of an operator as a + * fixed-size, time-bucketed histogram. + * + *

Samples are accumulated into time buckets of equal width. The initial + * bucket width is one second. When all buckets are full, adjacent pairs of + * buckets are merged and the width is doubled, so the fixed bucket array + * always covers the entire recording period at the coarsest resolution that + * fits. + * + *

A point-in-time view of the recorded data is obtained via + * {@link #snapshot()}. Multiple snapshots produced by independent recorders + * (for example, one per operator instance) can be combined into a single + * aligned histogram with {@link #merge(ResourceUsageTimeSeriesSnapshot...)}. + * Merging finds the union time range and the coarsest bucket width across all + * inputs, then re-bins each snapshot into that common grid before summing. + * + */ +public class ResourceUsageTimeSeriesRecorder +{ + private static final long ONE_SECOND = 1_000_000_000L; + private static final int DEFAULT_BUCKET_COUNT = 32; + + private final Ticker ticker; + private final Clock clock; + private final long[] cpuNanosBuckets; + private final long[] wallNanosBuckets; + private long startTimeEpochSeconds = -1; + private long startNanos = -1; + private long bucketWidthNanos = ONE_SECOND; + private int size; + + public ResourceUsageTimeSeriesRecorder() + { + this(DEFAULT_BUCKET_COUNT, Ticker.systemTicker(), Clock.systemUTC()); + } + + @VisibleForTesting + ResourceUsageTimeSeriesRecorder(Ticker ticker) + { + this(DEFAULT_BUCKET_COUNT, ticker, Clock.systemUTC()); + } + + @VisibleForTesting + ResourceUsageTimeSeriesRecorder(int bucketCount, Ticker ticker) + { + this(bucketCount, ticker, Clock.systemUTC()); + } + + @VisibleForTesting + ResourceUsageTimeSeriesRecorder(int bucketCount, Ticker ticker, Clock clock) + { + checkArgument(bucketCount > 0, "bucketCount must be positive"); + this.ticker = requireNonNull(ticker, "ticker is null"); + this.clock = requireNonNull(clock, "clock is null"); + this.cpuNanosBuckets = new long[bucketCount]; + this.wallNanosBuckets = new long[bucketCount]; + } + + public synchronized void record(long wallNanos, long cpuNanos) + { + long nowNanos = ticker.read(); + if (startTimeEpochSeconds < 0) { + Instant now = clock.instant(); + startTimeEpochSeconds = now.getEpochSecond(); + // Set the startNanos at the beginning of the second to match startTime + startNanos 
= nowNanos - now.getNano(); + } + int bucket = (int) ((nowNanos - startNanos) / bucketWidthNanos); + // The bucket width expansion could be implemented without a loop, but it is unlikely + // that the loop will have more than 1 iteration and expanding by 2 is both simpler and more efficient. + while (bucket >= cpuNanosBuckets.length) { + bucketWidthNanos = bucketWidthNanos * 2; + // We need to start counting on the same "unit" to match different time series, + // so we move back start time to a multiple of bucket width + long startNanosOffset = startNanos % bucketWidthNanos; + startNanos = startNanos - startNanosOffset; + bucket = (int) ((nowNanos - startNanos) / bucketWidthNanos); + int sourceOffset = 0; + int targetOffset = 0; + if (startNanosOffset > 0) { + // Because we moved back startNanos, the new first bucket is merged virtually with an empty past. + sourceOffset = 1; + targetOffset = 1; + } + for (; sourceOffset < size; sourceOffset += 2, targetOffset++) { + cpuNanosBuckets[targetOffset] = cpuNanosBuckets[sourceOffset]; + wallNanosBuckets[targetOffset] = wallNanosBuckets[sourceOffset]; + if (sourceOffset + 1 < size) { + cpuNanosBuckets[targetOffset] += cpuNanosBuckets[sourceOffset + 1]; + wallNanosBuckets[targetOffset] += wallNanosBuckets[sourceOffset + 1]; + } + } + for (int i = targetOffset; i < cpuNanosBuckets.length; i++) { + cpuNanosBuckets[i] = 0; + wallNanosBuckets[i] = 0; + } + size = targetOffset; + } + cpuNanosBuckets[bucket] += cpuNanos; + wallNanosBuckets[bucket] += wallNanos; + size = bucket + 1; + } + + public synchronized ResourceUsageTimeSeriesSnapshot snapshot() + { + if (size == 0) { + return ResourceUsageTimeSeriesSnapshot.EMPTY; + } + int bucketWidthSeconds = toIntExact(bucketWidthNanos / ONE_SECOND); + return new ResourceUsageTimeSeriesSnapshot( + truncateTo(startTimeEpochSeconds, bucketWidthSeconds), + bucketWidthSeconds, + Arrays.copyOf(cpuNanosBuckets, size), + Arrays.copyOf(wallNanosBuckets, size)); + } + + public static
ResourceUsageTimeSeriesSnapshot merge(ResourceUsageTimeSeriesSnapshot... snapshots) + { + return merge(ImmutableList.copyOf(snapshots)); + } + + public static ResourceUsageTimeSeriesSnapshot merge(List snapshots) + { + try { + // Each time-series snapshot may start and end at a different time, and have a different bucket width. + // To merge them, we first calculate the earliest start time, the latest end time, and the biggest bucket width. Those will be the parameters of the merged result. + int biggestBucketWidthSeconds = 0; + long earliestStartTimeSeconds = Long.MAX_VALUE; + long latestEndTimeSeconds = -1; + for (ResourceUsageTimeSeriesSnapshot snapshot : snapshots) { + if (snapshot.isEmpty()) { + continue; + } + if (biggestBucketWidthSeconds < snapshot.bucketWidthSeconds) { + biggestBucketWidthSeconds = snapshot.bucketWidthSeconds; + } + if (earliestStartTimeSeconds > snapshot.startTimeEpochSeconds) { + earliestStartTimeSeconds = snapshot.startTimeEpochSeconds; + } + long endTimeSeconds = snapshot.startTimeEpochSeconds + ((long) snapshot.cpuNanosBuckets.length * snapshot.bucketWidthSeconds); + if (endTimeSeconds > latestEndTimeSeconds) { + latestEndTimeSeconds = endTimeSeconds; + } + } + if (biggestBucketWidthSeconds < 1) { + return ResourceUsageTimeSeriesSnapshot.EMPTY; + } + // Start time must be a multiple of the bucket width + long adjustedStartTimeSeconds = earliestStartTimeSeconds - earliestStartTimeSeconds % biggestBucketWidthSeconds; + + // Use ceiling division: the last snapshot bucket may fall in the middle of a merged bucket, + // requiring one extra merged bucket to cover it. 
+ int bucketCount = toIntExact((latestEndTimeSeconds - adjustedStartTimeSeconds + biggestBucketWidthSeconds - 1) / biggestBucketWidthSeconds); + + long[] finalCpuNanosBuckets = new long[bucketCount]; + long[] finalWallNanosBuckets = new long[bucketCount]; + + for (ResourceUsageTimeSeriesSnapshot snapshot : snapshots) { + if (snapshot.isEmpty()) { + continue; + } + if (snapshot.bucketWidthSeconds == biggestBucketWidthSeconds && snapshot.startTimeEpochSeconds == adjustedStartTimeSeconds) { + // Fast path for the simple case + for (int i = 0; i < snapshot.cpuNanosBuckets.length; i++) { + finalCpuNanosBuckets[i] += snapshot.cpuNanosBuckets[i]; + finalWallNanosBuckets[i] += snapshot.wallNanosBuckets[i]; + } + } + else { + // The mergeSize defines how many snapshot buckets we need to merge to the final bucket + int mergeSize = biggestBucketWidthSeconds / snapshot.bucketWidthSeconds; + int firstBucket = toIntExact((snapshot.startTimeEpochSeconds - adjustedStartTimeSeconds) / biggestBucketWidthSeconds); + // The first bucket merge size may be smaller than mergeSize when the snapshot starts in the middle of a merged bucket. + // offsetSeconds is how far into the first merged bucket the snapshot starts. + // The remaining part of that merged bucket holds (biggestBucketWidthSeconds - offsetSeconds) / snapshot.bucketWidthSeconds snapshot buckets. + // We need to clamp the firstBucketMergeSize to snapshot length in case the snapshot is shorter than the remaining capacity of the + // first merged bucket. + int offsetSeconds = toIntExact((snapshot.startTimeEpochSeconds - adjustedStartTimeSeconds) % biggestBucketWidthSeconds); + int firstBucketMergeSize = offsetSeconds > 0 ? 
+ Math.min((biggestBucketWidthSeconds - offsetSeconds) / snapshot.bucketWidthSeconds, snapshot.cpuNanosBuckets.length) : 0; + + if (firstBucketMergeSize > 0) { + for (int i = 0; i < firstBucketMergeSize; i++) { + finalCpuNanosBuckets[firstBucket] += snapshot.cpuNanosBuckets[i]; + finalWallNanosBuckets[firstBucket] += snapshot.wallNanosBuckets[i]; + } + firstBucket += 1; + } + for (int targetIndex = firstBucket, sourceIndex = firstBucketMergeSize; sourceIndex < snapshot.cpuNanosBuckets.length; sourceIndex += mergeSize, targetIndex++) { + // The last bucket can have currentMergeSize smaller than mergeSize + int currentMergeSize = Math.min(snapshot.cpuNanosBuckets.length - sourceIndex, mergeSize); + for (int i = 0; i < currentMergeSize; i++) { + finalCpuNanosBuckets[targetIndex] += snapshot.cpuNanosBuckets[sourceIndex + i]; + finalWallNanosBuckets[targetIndex] += snapshot.wallNanosBuckets[sourceIndex + i]; + } + } + } + } + return new ResourceUsageTimeSeriesSnapshot(adjustedStartTimeSeconds, biggestBucketWidthSeconds, finalCpuNanosBuckets, finalWallNanosBuckets); + } + catch (RuntimeException e) { + throw new RuntimeException("merge failed for: " + snapshots, e); + } + } + + private static long truncateTo(long epochSeconds, int seconds) + { + if (seconds == 1) { + return epochSeconds; + } + return epochSeconds - epochSeconds % seconds; + } + + private static boolean truncatedTo(long epochSeconds, int seconds) + { + return epochSeconds % seconds == 0; + } + + /** + * An immutable, point-in-time view of a {@link ResourceUsageTimeSeriesRecorder}. + * + *

Each element {@code i} of {@link #cpuNanosBuckets()} and + * {@link #wallNanosBuckets()} represents the total nanoseconds spent in CPU + * or wall time during the half-open interval + * {@code [startTimeEpochSeconds + i * bucketWidthSeconds, + * startTimeEpochSeconds + (i+1) * bucketWidthSeconds)}. + * + *

{@code startTimeEpochSeconds} is always truncated to a multiple of + * {@code bucketWidthSeconds}, and {@code bucketWidthSeconds} is always a + * power of two, so snapshots from different recorders can be aligned and + * merged without remainder arithmetic. + * + *

Implements {@link io.trino.spi.metrics.Metric} so it can be reported + * directly as an operator metric and aggregated across tasks. + * + *

It is implemented as a class and not record to have control over construction + * and avoid unnecessary array copies + */ + public static class ResourceUsageTimeSeriesSnapshot + implements Metric + { + private final long startTimeEpochSeconds; + private final int bucketWidthSeconds; + private final long[] cpuNanosBuckets; + private final long[] wallNanosBuckets; + + public static final ResourceUsageTimeSeriesSnapshot EMPTY = new ResourceUsageTimeSeriesSnapshot(-1, 1, new long[0], new long[0]); + + @JsonCreator + public static ResourceUsageTimeSeriesSnapshot create( + @JsonProperty("startTimeEpochSeconds") long startTimeEpochSeconds, + @JsonProperty("bucketWidthSeconds") int bucketWidthSeconds, + @JsonProperty("cpuNanosBuckets") long[] cpuNanosBuckets, + @JsonProperty("wallNanosBuckets") long[] wallNanosBuckets) + { + return new ResourceUsageTimeSeriesSnapshot( + startTimeEpochSeconds, + bucketWidthSeconds, + cpuNanosBuckets.clone(), + wallNanosBuckets.clone()); + } + + private ResourceUsageTimeSeriesSnapshot( + long startTimeEpochSeconds, + int bucketWidthSeconds, + long[] cpuNanosBuckets, + long[] wallNanosBuckets) + { + checkArgument(bucketWidthSeconds >= 1, "bucketWidthSeconds must be >= 1"); + checkArgument(isPowerOfTwo(bucketWidthSeconds), "bucketWidthSeconds must be a power of 2"); + checkArgument(truncatedTo(startTimeEpochSeconds, bucketWidthSeconds), "startTime must be truncated to bucket width (%ss) but was %s" + .formatted(bucketWidthSeconds, startTimeEpochSeconds)); + this.startTimeEpochSeconds = startTimeEpochSeconds; + this.bucketWidthSeconds = bucketWidthSeconds; + this.cpuNanosBuckets = requireNonNull(cpuNanosBuckets, "cpuNanosBuckets is null"); + this.wallNanosBuckets = requireNonNull(wallNanosBuckets, "wallNanosBuckets is null"); + } + + @JsonProperty + public long startTimeEpochSeconds() + { + return startTimeEpochSeconds; + } + + @JsonProperty + public int bucketWidthSeconds() + { + return bucketWidthSeconds; + } + + @JsonProperty + public long[] 
cpuNanosBuckets() + { + return cpuNanosBuckets.clone(); + } + + @JsonProperty + public long[] wallNanosBuckets() + { + return wallNanosBuckets.clone(); + } + + public boolean isEmpty() + { + return cpuNanosBuckets.length == 0; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) { + return false; + } + ResourceUsageTimeSeriesSnapshot that = (ResourceUsageTimeSeriesSnapshot) o; + return bucketWidthSeconds == that.bucketWidthSeconds && startTimeEpochSeconds == that.startTimeEpochSeconds + && Arrays.equals(cpuNanosBuckets, that.cpuNanosBuckets) && Arrays.equals(wallNanosBuckets, that.wallNanosBuckets); + } + + @Override + public int hashCode() + { + return Objects.hash(startTimeEpochSeconds, bucketWidthSeconds, Arrays.hashCode(cpuNanosBuckets), Arrays.hashCode(wallNanosBuckets)); + } + + @Override + public ResourceUsageTimeSeriesSnapshot mergeWith(ResourceUsageTimeSeriesSnapshot other) + { + if (!this.isEmpty() && !other.isEmpty()) { + return merge(ImmutableList.of(this, other)); + } + if (this.isEmpty()) { + return other; + } + return this; + } + + @Override + public ResourceUsageTimeSeriesSnapshot mergeWith(List others) + { + if (!this.isEmpty()) { + return merge(ImmutableList.builderWithExpectedSize(others.size() + 1) + .add(this) + .addAll(others) + .build()); + } + + return merge(others); + } + + @Override + public String toString() + { + return "ResourceUsageTimeSeriesSnapshot{" + + "startTimeEpochSeconds=" + startTimeEpochSeconds + + ", bucketWidthSeconds=" + bucketWidthSeconds + + ", cpuNanosBuckets=" + Arrays.toString(cpuNanosBuckets) + + ", wallNanosBuckets=" + Arrays.toString(wallNanosBuckets) + + '}'; + } + } +} diff --git a/core/trino-main/src/test/java/io/trino/operator/BenchmarkResourceUsageTimeSeriesRecorder.java b/core/trino-main/src/test/java/io/trino/operator/BenchmarkResourceUsageTimeSeriesRecorder.java new file mode 100644 index 000000000000..7e7a47b085be --- /dev/null +++ 
b/core/trino-main/src/test/java/io/trino/operator/BenchmarkResourceUsageTimeSeriesRecorder.java @@ -0,0 +1,261 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.stats.TDigest; +import io.airlift.testing.TestingTicker; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.trino.operator.ResourceUsageTimeSeriesRecorder.ResourceUsageTimeSeriesSnapshot; +import io.trino.plugin.base.metrics.TDigestHistogram; +import io.trino.spi.metrics.Metric; +import io.trino.spi.metrics.Metrics; +import io.trino.sql.planner.plan.PlanNodeId; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.RunnerException; + +import java.util.List; +import java.util.Optional; +import java.util.Random; +import 
java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static io.trino.jmh.Benchmarks.benchmark; + +@SuppressWarnings("MethodMayBeStatic") +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(2) +@Warmup(iterations = 5, time = 500, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) +@BenchmarkMode(Mode.AverageTime) +public class BenchmarkResourceUsageTimeSeriesRecorder +{ + private static final int OPERATIONS = 1_000; + + @State(Scope.Thread) + public static class RecordData + { + @Param({"100", "500", "1000", "2000", "32000"}) + private int recordDelayMillis = 1000; + + private ResourceUsageTimeSeriesRecorder recorder; + private TestingTicker ticker; + private int[] tickerDelay; + private long[] cpuTime; + private long[] wallTime; + + @Setup + public void setup() + { + ticker = new TestingTicker(); + ticker.increment(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + recorder = new ResourceUsageTimeSeriesRecorder(ticker); + tickerDelay = new int[OPERATIONS]; + cpuTime = new long[OPERATIONS]; + wallTime = new long[OPERATIONS]; + Random random = ThreadLocalRandom.current(); + for (int i = 0; i < OPERATIONS; i++) { + tickerDelay[i] = random.nextInt(recordDelayMillis); + cpuTime[i] = random.nextLong(recordDelayMillis) * 1000_000L; + wallTime[i] = random.nextLong(recordDelayMillis) * 1000_000L; + } + } + } + + @Benchmark + @OperationsPerInvocation(OPERATIONS) + public ResourceUsageTimeSeriesRecorder record(RecordData data) + { + ResourceUsageTimeSeriesRecorder recorder = data.recorder; + for (int i = 0; i < OPERATIONS; i++) { + data.ticker.increment(data.tickerDelay[i], TimeUnit.MILLISECONDS); + recorder.record(data.wallTime[i], data.cpuTime[i]); + } + return recorder; + } + + @State(Scope.Thread) + public static class MergeData + { + @Param({"2", "10", "100"}) + private int snapshotCount = 2; + + @Param({"false", "true"}) + private boolean randomBucketWidth; 
+ + @Param({"false", "true"}) + private boolean randomStartTime; + + private List> snapshots; + + @Setup + public void setup() + { + ImmutableList.Builder> snapshots = ImmutableList.builder(); + Random random = ThreadLocalRandom.current(); + for (int i = 0; i < OPERATIONS; i++) { + ImmutableList.Builder batch = ImmutableList.builder(); + for (int j = 0; j < snapshotCount; j++) { + batch.add(randomSnapshot(random, randomBucketWidth, randomStartTime)); + } + snapshots.add(batch.build()); + } + + this.snapshots = snapshots.build(); + } + } + + private static ResourceUsageTimeSeriesSnapshot randomSnapshot(Random random, boolean randomBucketWidth, boolean randomStartTime) + { + long[] cpuNanosBuckets = new long[32]; + long[] wallNanosBuckets = new long[32]; + for (int i = 0; i < 32; i++) { + cpuNanosBuckets[i] = random.nextLong(1_000_000_000); + wallNanosBuckets[i] = random.nextLong(1_000_000_000); + } + int bucketWidthSeconds = randomBucketWidth ? Math.powExact(2, random.nextInt(3)) : 1; + long startTimeEpochSeconds = randomStartTime ? 
random.nextLong(4) * bucketWidthSeconds : 0; + return ResourceUsageTimeSeriesSnapshot.create(startTimeEpochSeconds, bucketWidthSeconds, cpuNanosBuckets, wallNanosBuckets); + } + + @Benchmark + @OperationsPerInvocation(OPERATIONS) + public void merge(MergeData data, Blackhole blackhole) + { + for (List batch : data.snapshots) { + blackhole.consume(ResourceUsageTimeSeriesRecorder.merge(batch)); + } + } + + @State(Scope.Thread) + public static class OperatorStatsAddData + { + @Param({"2", "10", "100"}) + private int operatorCount = 10; + + @Param({"false", "true"}) + private boolean resourceTimeSeries; + + private List baseStats; + private List> otherStats; + + @Setup + public void setup() + { + Random random = ThreadLocalRandom.current(); + ImmutableList.Builder baseStats = ImmutableList.builder(); + ImmutableList.Builder> otherStats = ImmutableList.builder(); + for (int i = 0; i < OPERATIONS; i++) { + baseStats.add(createOperatorStats(random)); + ImmutableList.Builder stats = ImmutableList.builder(); + for (int j = 1; j < operatorCount; j++) { + stats.add(createOperatorStats(random)); + } + otherStats.add(stats.build()); + } + this.baseStats = baseStats.build(); + this.otherStats = otherStats.build(); + } + + private OperatorStats createOperatorStats(Random random) + { + ImmutableMap.Builder> metricsBuilder = ImmutableMap.>builder().putAll(ImmutableMap.of( + "Input rows distribution", new TDigestHistogram(radomTDigest(random)), + "CPU time distribution (s)", new TDigestHistogram(radomTDigest(random)), + "Scheduled time distribution (s)", new TDigestHistogram(radomTDigest(random)), + "Blocked time distribution (s)", new TDigestHistogram(radomTDigest(random)))); + if (resourceTimeSeries) { + metricsBuilder.put("CPU and scheduled time usage over time", randomSnapshot(random, true, true)); + } + Metrics metrics = new Metrics(metricsBuilder.buildOrThrow()); + return new OperatorStats( + 0, 0, 0, + new PlanNodeId("test"), + Optional.empty(), + "test", + 1, + 0, 
Duration.ZERO, Duration.ZERO, + DataSize.ofBytes(0), 0, Duration.ZERO, + DataSize.ofBytes(0), 0, + DataSize.ofBytes(0), 0, 0d, + 0, Duration.ZERO, Duration.ZERO, + DataSize.ofBytes(0), 0, + 0, metrics, Metrics.EMPTY, Metrics.EMPTY, + DataSize.ofBytes(0), + Duration.ZERO, + 0, Duration.ZERO, Duration.ZERO, + DataSize.ofBytes(0), DataSize.ofBytes(0), + DataSize.ofBytes(0), DataSize.ofBytes(0), DataSize.ofBytes(0), + DataSize.ofBytes(0), + Optional.empty(), + null); + } + + private static TDigest radomTDigest(Random random) + { + TDigest tdigest = new TDigest(); + for (int i = 0; i < 20; i++) { + tdigest.add(random.nextLong(1_000_000_000)); + } + return tdigest; + } + } + + @Benchmark + @OperationsPerInvocation(OPERATIONS) + public void operatorStatsAdd(OperatorStatsAddData data, Blackhole blackhole) + { + for (int i = 0; i < OPERATIONS; i++) { + blackhole.consume(data.baseStats.get(i).add(data.otherStats.get(i))); + } + } + + static void main() + throws RunnerException + { + // assure the benchmarks are valid before running + RecordData recordData = new RecordData(); + recordData.setup(); + BenchmarkResourceUsageTimeSeriesRecorder benchmarkInstance = new BenchmarkResourceUsageTimeSeriesRecorder(); + benchmarkInstance.record(recordData); + + MergeData mergeData = new MergeData(); + mergeData.setup(); + Blackhole blackhole = new Blackhole("Today's password is swordfish. 
I understand instantiating Blackholes directly is dangerous."); + benchmarkInstance.merge(mergeData, blackhole); + + OperatorStatsAddData operatorStatsAddData = new OperatorStatsAddData(); + operatorStatsAddData.setup(); + benchmarkInstance.operatorStatsAdd(operatorStatsAddData, blackhole); + + benchmark(BenchmarkResourceUsageTimeSeriesRecorder.class) + .withOptions(options -> options.jvmArgs("-Xmx4g") + ).run(); + } +} diff --git a/core/trino-main/src/test/java/io/trino/operator/TestResourceUsageTimeSeriesRecorder.java b/core/trino-main/src/test/java/io/trino/operator/TestResourceUsageTimeSeriesRecorder.java new file mode 100644 index 000000000000..1d68c1f6660b --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/operator/TestResourceUsageTimeSeriesRecorder.java @@ -0,0 +1,459 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package io.trino.operator;

import io.airlift.testing.TestingTicker;
import io.trino.operator.ResourceUsageTimeSeriesRecorder.ResourceUsageTimeSeriesSnapshot;
import org.junit.jupiter.api.Test;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

import static io.trino.operator.ResourceUsageTimeSeriesRecorder.ResourceUsageTimeSeriesSnapshot.EMPTY;
import static io.trino.operator.ResourceUsageTimeSeriesRecorder.merge;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
 * Tests for {@link ResourceUsageTimeSeriesRecorder}: recording wall/CPU nanos into
 * time buckets driven by a {@link TestingTicker}, capacity-driven bucket-width
 * expansion, and static {@code merge} of snapshots with differing start times and
 * bucket widths.
 */
public class TestResourceUsageTimeSeriesRecorder
{
    @Test
    public void testBasicRecording()
    {
        TestingTicker ticker = new TestingTicker();
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(ticker);

        // record(wallNanos, cpuNanos) — a single record lands in the first bucket
        recorder.record(100, 50);

        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        assertThat(snapshot.bucketWidthSeconds()).isEqualTo(1);
        assertThat(snapshot.cpuNanosBuckets()[0]).isEqualTo(50);
        assertThat(snapshot.wallNanosBuckets()[0]).isEqualTo(100);
    }

    @Test
    public void testRecordingInDifferentBuckets()
    {
        TestingTicker ticker = new TestingTicker();
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(ticker);

        recorder.record(100, 50);

        // advancing the ticker by one bucket width moves subsequent records to new buckets
        ticker.increment(1, SECONDS);
        recorder.record(200, 80);

        ticker.increment(1, SECONDS);
        recorder.record(300, 120);

        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        assertThat(snapshot.bucketWidthSeconds()).isEqualTo(1);
        assertThat(snapshot.cpuNanosBuckets()).containsExactly(50, 80, 120);
        assertThat(snapshot.wallNanosBuckets()).containsExactly(100, 200, 300);
    }

    @Test
    public void testMultipleRecordsInSameBucket()
    {
        TestingTicker ticker = new TestingTicker();
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(ticker);

        // two records without advancing the ticker accumulate in the same bucket
        recorder.record(100, 50);
        recorder.record(200, 80);

        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        assertThat(snapshot.cpuNanosBuckets()).containsExactly(130);
        assertThat(snapshot.wallNanosBuckets()).containsExactly(300);
    }

    @Test
    public void testBucketExpansion()
    {
        TestingTicker ticker = new TestingTicker();
        // capacity of 4 buckets so expansion triggers quickly
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(4, ticker);

        recorder.record(100, 10);
        ticker.increment(1, SECONDS);
        recorder.record(200, 20);
        ticker.increment(1, SECONDS);
        recorder.record(300, 30);
        ticker.increment(1, SECONDS);
        recorder.record(400, 40);

        // Advance beyond capacity - triggers bucket width doubling
        ticker.increment(1, SECONDS);
        recorder.record(500, 50);

        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        assertThat(snapshot.bucketWidthSeconds()).isEqualTo(2);
        // Buckets 0+1 merged, buckets 2+3 merged, then new record in bucket 2
        assertThat(snapshot.cpuNanosBuckets()).containsExactly(10 + 20, 30 + 40, 50);
        assertThat(snapshot.wallNanosBuckets()).containsExactly(100 + 200, 300 + 400, 500);
    }

    @Test
    public void testDoubleExpansion()
    {
        TestingTicker ticker = new TestingTicker();
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(4, ticker);

        // fill the initial 4 one-second buckets
        for (int i = 0; i < 4; i++) {
            recorder.record(100, 10);
            if (i < 3) {
                ticker.increment(1, SECONDS);
            }
        }

        // Jump far enough to trigger double expansion
        ticker.increment(5, SECONDS);
        recorder.record(500, 50);

        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        // width doubled twice: 1s -> 2s -> 4s
        assertThat(snapshot.bucketWidthSeconds()).isEqualTo(4);
        assertThat(snapshot.cpuNanosBuckets()).containsExactly(40, 0, 50);
    }

    @Test
    public void testBucketExpansionWithStartNanosOffset()
    {
        // Ticker starts at 1.5S. Clock reports 0.5S into the second, so startNanos = 1.5S - 0.5S = 1.0S.
        // On expansion, bucketWidthNanos doubles to 2S, giving startNanosOffset = 1.0S % 2.0S = 1.0S > 0,
        // exercising the branch where the first old bucket is kept solo and subsequent pairs are merged.
        TestingTicker ticker = new TestingTicker();
        ticker.increment(1500, MILLISECONDS);
        Clock clock = Clock.fixed(Instant.ofEpochSecond(1000, 500_000_000), ZoneOffset.UTC);
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(4, ticker, clock);

        recorder.record(100, 10);
        ticker.increment(1, SECONDS);
        recorder.record(200, 20);
        ticker.increment(1, SECONDS);
        recorder.record(300, 30);
        ticker.increment(1, SECONDS);
        recorder.record(400, 40);

        // Trigger expansion — with startNanosOffset > 0, the merge loop starts at
        // sourceOffset=1: bucket[0] is kept solo, buckets [1+2] are merged, and bucket[3] is
        // carried forward as an unpaired singleton into the next target slot.
        ticker.increment(1, SECONDS);
        recorder.record(500, 50);

        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        assertThat(snapshot.bucketWidthSeconds()).isGreaterThanOrEqualTo(2);
        assertThat(snapshot.cpuNanosBuckets()).containsExactly(10, 50, 90);
    }

    @Test
    public void testExpansionPreservesAllRecordedData()
    {
        // Ticker starts at 1.5S so that startNanos = 1.5S - Instant.now().getNano() ∈ (0.5S, 1.5S).
        // This guarantees startNanosOffset > 0 on the first expansion, which causes the expansion
        // loop to start merging pairs from sourceOffset=1, leaving the last bucket (cpu[3]=10)
        // without a pair. The total CPU nanos must still equal the sum of all recorded values.
        TestingTicker ticker = new TestingTicker();
        ticker.increment(1500, MILLISECONDS);
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(4, ticker);

        long cpuPerRecord = 10;
        int recordCount = 5; // one more than bucket capacity to trigger exactly one expansion

        for (int i = 0; i < recordCount; i++) {
            recorder.record(cpuPerRecord, cpuPerRecord);
            if (i < recordCount - 1) {
                ticker.increment(1, SECONDS);
            }
        }

        // conservation check: expansion must never lose or double-count recorded nanos
        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        long totalCpuNanos = 0;
        for (long v : snapshot.cpuNanosBuckets()) {
            totalCpuNanos += v;
        }
        assertThat(totalCpuNanos).isEqualTo(cpuPerRecord * recordCount);
    }

    @Test
    public void testSnapshotStartTime()
    {
        TestingTicker ticker = new TestingTicker();
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(ticker);
        recorder.record(100, 10);

        // start time is taken from the wall clock, so only positivity is asserted here
        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        assertThat(snapshot.startTimeEpochSeconds()).isGreaterThan(0);
    }

    @Test
    public void testEmptySnapshot()
    {
        TestingTicker ticker = new TestingTicker();
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(ticker);

        // a recorder with no records yields the sentinel start time -1 and empty buckets
        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        assertThat(snapshot.startTimeEpochSeconds()).isEqualTo(-1);
        assertThat(snapshot.isEmpty()).isTrue();
        assertThat(snapshot.bucketWidthSeconds()).isEqualTo(1);
        assertThat(snapshot.cpuNanosBuckets()).isEmpty();
        assertThat(snapshot.wallNanosBuckets()).isEmpty();
    }

    @Test
    public void testMergeSameTimeline()
    {
        TestingTicker ticker = new TestingTicker();
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(ticker);

        recorder.record(100, 50);
        ticker.increment(1, SECONDS);
        recorder.record(200, 80);

        // merging a snapshot with itself must double every bucket, bucket-for-bucket
        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        ResourceUsageTimeSeriesSnapshot merged = merge(snapshot, snapshot);

        assertThat(merged.bucketWidthSeconds()).isEqualTo(snapshot.bucketWidthSeconds());
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(snapshot.startTimeEpochSeconds());
        for (int i = 0; i < snapshot.cpuNanosBuckets().length; i++) {
            assertThat(merged.cpuNanosBuckets()[i]).isEqualTo(snapshot.cpuNanosBuckets()[i] * 2);
            assertThat(merged.wallNanosBuckets()[i]).isEqualTo(snapshot.wallNanosBuckets()[i] * 2);
        }
    }

    @Test
    public void testMergeWithDifferentBucketWidths()
    {
        // merge coarsens to the wider bucket width: narrow buckets are folded pairwise
        long startTime = 1000;
        ResourceUsageTimeSeriesSnapshot narrow = ResourceUsageTimeSeriesSnapshot.create(
                startTime, 1, new long[] {10, 20, 30, 40}, new long[] {100, 200, 300, 400});
        ResourceUsageTimeSeriesSnapshot wide = ResourceUsageTimeSeriesSnapshot.create(
                startTime, 2, new long[] {50, 60}, new long[] {700, 800});

        ResourceUsageTimeSeriesSnapshot merged = merge(narrow, wide);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(2);
        assertThat(merged.cpuNanosBuckets()).containsExactly(80, 130);
        assertThat(merged.wallNanosBuckets()).containsExactly(1000, 1500);
    }

    @Test
    public void testMergeWithDifferentStartTimes()
    {
        // second starts 2s later; its buckets land after the first's without overlap
        ResourceUsageTimeSeriesSnapshot first = ResourceUsageTimeSeriesSnapshot.create(
                100, 1, new long[] {10, 20}, new long[] {100, 200});
        ResourceUsageTimeSeriesSnapshot second = ResourceUsageTimeSeriesSnapshot.create(
                102, 1, new long[] {30, 40}, new long[] {300, 400});

        ResourceUsageTimeSeriesSnapshot merged = merge(first, second);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(100);
        assertThat(merged.cpuNanosBuckets()).containsExactly(10, 20, 30, 40);
    }

    @Test
    public void testMergeOverlappingTimelines()
    {
        // second's buckets at t=102,103 overlap first's buckets 2 and 3 and are summed
        ResourceUsageTimeSeriesSnapshot first = ResourceUsageTimeSeriesSnapshot.create(
                100, 1, new long[] {10, 20, 30, 40}, new long[] {100, 200, 300, 400});
        ResourceUsageTimeSeriesSnapshot second = ResourceUsageTimeSeriesSnapshot.create(
                102, 1, new long[] {50, 60}, new long[] {500, 600});

        ResourceUsageTimeSeriesSnapshot merged = merge(first, second);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(100);
        assertThat(merged.cpuNanosBuckets()).containsExactly(10, 20, 30 + 50, 40 + 60);
    }

    @Test
    public void testMergeNonOverlappingTimelines()
    {
        // a 2-second gap between the timelines must surface as zero-filled buckets
        ResourceUsageTimeSeriesSnapshot first = ResourceUsageTimeSeriesSnapshot.create(
                10, 1, new long[] {10, 20}, new long[] {100, 200});
        ResourceUsageTimeSeriesSnapshot second = ResourceUsageTimeSeriesSnapshot.create(
                14, 1, new long[] {50, 60}, new long[] {500, 600});

        ResourceUsageTimeSeriesSnapshot merged = merge(first, second);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(10);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(1);
        assertThat(merged.cpuNanosBuckets()).containsExactly(10, 20, 0, 0, 50, 60);
    }

    @Test
    public void testMergeNonOverlappingTimelinesWithDifferentWidths()
    {
        // start time is truncated down to the coarser width (10 -> 8) before folding
        ResourceUsageTimeSeriesSnapshot first = ResourceUsageTimeSeriesSnapshot.create(
                10, 1, new long[] {10, 20, 40}, new long[] {100, 200, 400});
        ResourceUsageTimeSeriesSnapshot second = ResourceUsageTimeSeriesSnapshot.create(
                16, 4, new long[] {50, 60}, new long[] {500, 600});

        ResourceUsageTimeSeriesSnapshot merged = merge(first, second);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(8);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(4);
        assertThat(merged.cpuNanosBuckets()).containsExactly(30, 40, 50, 60);
    }

    @Test
    public void testSnapshotInvalidBucketWidth()
    {
        // bucket width must be positive
        assertThatThrownBy(() -> ResourceUsageTimeSeriesSnapshot.create(0, 0, new long[] {}, new long[] {}))
                .isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    public void testSnapshotStartTimeMustBeTruncated()
    {
        // start time 3 is not a multiple of bucket width 2, so creation is rejected
        assertThatThrownBy(() -> ResourceUsageTimeSeriesSnapshot.create(3, 2, new long[] {}, new long[] {}))
                .isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    public void testMergeWithAdjustedStartTime()
    {
        // earliest start time 5 is realigned down to 4 to fit the coarser 2s width
        ResourceUsageTimeSeriesSnapshot a = ResourceUsageTimeSeriesSnapshot.create(
                5, 1, new long[] {10, 20}, new long[] {100, 200});
        ResourceUsageTimeSeriesSnapshot b = ResourceUsageTimeSeriesSnapshot.create(
                6, 2, new long[] {50}, new long[] {500});

        ResourceUsageTimeSeriesSnapshot merged = merge(a, b);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(2);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(4);
    }

    @Test
    public void testMergeWithInitialBucketOffset()
    {
        // narrow starts 3s after wide: its 1s buckets straddle wide's 2s buckets 1 and 2
        ResourceUsageTimeSeriesSnapshot wide = ResourceUsageTimeSeriesSnapshot.create(
                100, 2, new long[] {50, 60, 70, 80}, new long[] {500, 600, 700, 800});
        ResourceUsageTimeSeriesSnapshot narrow = ResourceUsageTimeSeriesSnapshot.create(
                103, 1, new long[] {10, 20}, new long[] {100, 200});

        ResourceUsageTimeSeriesSnapshot merged = merge(wide, narrow);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(2);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(100);
        assertThat(merged.cpuNanosBuckets()).hasSize(4);
        assertThat(merged.cpuNanosBuckets()).containsExactly(50, 60 + 10, 70 + 20, 80);
    }

    @Test
    public void testMergeThreeSnapshots()
    {
        // varargs merge: a and b share a timeline, c is shifted by one bucket
        ResourceUsageTimeSeriesSnapshot a = ResourceUsageTimeSeriesSnapshot.create(
                100, 1, new long[] {10, 20}, new long[] {100, 200});
        ResourceUsageTimeSeriesSnapshot b = ResourceUsageTimeSeriesSnapshot.create(
                100, 1, new long[] {30, 40}, new long[] {300, 400});
        ResourceUsageTimeSeriesSnapshot c = ResourceUsageTimeSeriesSnapshot.create(
                101, 1, new long[] {50}, new long[] {500});

        ResourceUsageTimeSeriesSnapshot merged = merge(a, b, c);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(100);
        assertThat(merged.cpuNanosBuckets()).hasSize(2);
        assertThat(merged.cpuNanosBuckets()).containsExactly(10 + 30, 20 + 40 + 50);
    }

    @Test
    public void testSnapshotAfterExpansionTruncatesStartTime()
    {
        TestingTicker ticker = new TestingTicker();
        ResourceUsageTimeSeriesRecorder recorder = new ResourceUsageTimeSeriesRecorder(4, ticker);

        // Fill and expand to get bucketWidthSeconds > 1
        for (int i = 0; i < 4; i++) {
            recorder.record(100, 10);
            ticker.increment(1, SECONDS);
        }
        recorder.record(100, 10);

        ResourceUsageTimeSeriesSnapshot snapshot = recorder.snapshot();
        assertThat(snapshot.bucketWidthSeconds()).isEqualTo(2);
        // Start time should be truncated to bucket width
        assertThat(snapshot.startTimeEpochSeconds() % snapshot.bucketWidthSeconds()).isZero();
    }

    @Test
    public void testMergeWithInitialBucketOffsetDoesNotDoubleCountPartialItems()
    {
        // NOTE(review): this body is identical to testMergeWithInitialBucketOffset above —
        // consider consolidating or differentiating the scenario it is meant to cover.
        ResourceUsageTimeSeriesSnapshot wide = ResourceUsageTimeSeriesSnapshot.create(
                100, 2, new long[] {50, 60, 70, 80}, new long[] {500, 600, 700, 800});
        ResourceUsageTimeSeriesSnapshot narrow = ResourceUsageTimeSeriesSnapshot.create(
                103, 1, new long[] {10, 20}, new long[] {100, 200});

        ResourceUsageTimeSeriesSnapshot merged = merge(wide, narrow);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(2);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(100);
        assertThat(merged.cpuNanosBuckets()).containsExactly(50, 60 + 10, 70 + 20, 80);
    }

    @Test
    public void testMergeWithUnadjustedEarliestStartTime()
    {
        // earliest start 1 is truncated to 0 for width 2; a's buckets at t=1,2 fold into [0,2) and [2,4)
        ResourceUsageTimeSeriesSnapshot a = ResourceUsageTimeSeriesSnapshot.create(
                1, 1, new long[] {10, 20}, new long[] {100, 200});
        ResourceUsageTimeSeriesSnapshot b = ResourceUsageTimeSeriesSnapshot.create(
                2, 2, new long[] {50}, new long[] {500});

        ResourceUsageTimeSeriesSnapshot merged = merge(a, b);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(2);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(0);
        assertThat(merged.cpuNanosBuckets()).containsExactly(10, 70);
    }

    @Test
    public void testMergeWhenFineSnapshotExtendsOneBucketPastCoarseEnd()
    {
        // the fine snapshot's third bucket lies beyond the coarse snapshot's range,
        // so the merged series must grow a trailing bucket rather than drop it
        ResourceUsageTimeSeriesSnapshot wide = ResourceUsageTimeSeriesSnapshot.create(
                0, 2, new long[] {50}, new long[] {500});
        ResourceUsageTimeSeriesSnapshot narrow = ResourceUsageTimeSeriesSnapshot.create(
                0, 1, new long[] {10, 20, 30}, new long[] {100, 200, 300});

        ResourceUsageTimeSeriesSnapshot merged = merge(wide, narrow);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(2);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(0);
        assertThat(merged.cpuNanosBuckets()).containsExactly(80, 30);
        assertThat(merged.wallNanosBuckets()).containsExactly(800, 300);
    }

    @Test
    public void testMergeWithMisalignedFineSnapshotAndCoarseBucketWidth()
    {
        // narrow (2s buckets at t=2,4,6) is misaligned with wide's 4s grid:
        // its first bucket joins wide[0], the remaining two join wide[1]
        ResourceUsageTimeSeriesSnapshot wide = ResourceUsageTimeSeriesSnapshot.create(
                0, 4, new long[] {50, 60}, new long[] {500, 600});
        ResourceUsageTimeSeriesSnapshot narrow = ResourceUsageTimeSeriesSnapshot.create(
                2, 2, new long[] {10, 20, 30}, new long[] {100, 200, 300});

        ResourceUsageTimeSeriesSnapshot merged = merge(wide, narrow);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(4);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(0);
        assertThat(merged.cpuNanosBuckets()).containsExactly(60, 110);
        assertThat(merged.wallNanosBuckets()).containsExactly(600, 1100);
    }

    @Test
    public void testMergeWithShortFineSnapshotInFirstPartialBucket()
    {
        // narrow contributes a single 1s bucket at t=2, entirely inside wide's first 4s bucket
        ResourceUsageTimeSeriesSnapshot wide = ResourceUsageTimeSeriesSnapshot.create(
                0, 4, new long[] {50, 60, 70}, new long[] {500, 600, 700});
        ResourceUsageTimeSeriesSnapshot narrow = ResourceUsageTimeSeriesSnapshot.create(
                2, 1, new long[] {10}, new long[] {100});

        ResourceUsageTimeSeriesSnapshot merged = merge(wide, narrow);
        assertThat(merged.bucketWidthSeconds()).isEqualTo(4);
        assertThat(merged.startTimeEpochSeconds()).isEqualTo(0);
        assertThat(merged.cpuNanosBuckets()).containsExactly(60, 60, 70);
        assertThat(merged.wallNanosBuckets()).containsExactly(600, 600, 700);
    }

    @Test
    public void testMergeEmptySnapshots()
    {
        // EMPTY is the identity element of merge
        assertThat(merge(EMPTY, EMPTY)).isEqualTo(EMPTY);
        ResourceUsageTimeSeriesSnapshot nonEmpty = ResourceUsageTimeSeriesSnapshot.create(1, 1, new long[] {6}, new long[] {67});
        assertThat(merge(EMPTY, nonEmpty, EMPTY)).isEqualTo(nonEmpty);
    }
}
diff --git
a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorOperatorAdapter.java b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorOperatorAdapter.java index 65472c3de947..fc4b2275db5e 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorOperatorAdapter.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorOperatorAdapter.java @@ -68,13 +68,13 @@ public void testMetrics() operator.getOutput(); assertThat(operator.isFinished()).isFalse(); assertThat(context.getOperatorStats().getMetrics().getMetrics()) - .hasSize(5) + .hasSize(6) .containsEntry("testOperatorMetric", new LongCount(1)); operator.getOutput(); assertThat(operator.isFinished()).isTrue(); assertThat(context.getOperatorStats().getMetrics().getMetrics()) - .hasSize(5) + .hasSize(6) .containsEntry("testOperatorMetric", new LongCount(2)); } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorSourceOperatorAdapter.java b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorSourceOperatorAdapter.java index aaa42490bbf2..69efc005e1bf 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorSourceOperatorAdapter.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorSourceOperatorAdapter.java @@ -73,7 +73,7 @@ public void testMetrics() operator.getOutput(); assertThat(operator.isFinished()).isFalse(); assertThat(context.getOperatorStats().getMetrics().getMetrics()) - .hasSize(5) + .hasSize(6) .containsEntry("testOperatorMetric", new LongCount(1)); assertThat(context.getOperatorStats().getConnectorMetrics().getMetrics()).isEqualTo(ImmutableMap.of( "testConnectorMetric", new LongCount(2))); @@ -83,7 +83,7 @@ public void testMetrics() operator.getOutput(); assertThat(operator.isFinished()).isTrue(); assertThat(context.getOperatorStats().getMetrics().getMetrics()) - .hasSize(5) + .hasSize(6) .containsEntry("testOperatorMetric", new LongCount(2)); 
assertThat(context.getOperatorStats().getConnectorMetrics().getMetrics()).isEqualTo(ImmutableMap.of( "testConnectorMetric", new LongCount(3)));