diff --git a/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java b/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java index af8ae5667e1e..6432c7279b73 100644 --- a/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java +++ b/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java @@ -21,6 +21,7 @@ import io.airlift.concurrent.ThreadPoolExecutorMBean; import io.airlift.log.Logger; import io.airlift.stats.CounterStat; +import io.airlift.stats.DistributionStat; import io.airlift.stats.TimeDistribution; import io.airlift.stats.TimeStat; import io.airlift.units.Duration; @@ -153,6 +154,12 @@ public class TaskExecutor private final TimeStat blockedQuantaWallTime = new TimeStat(MICROSECONDS); private final TimeStat unblockedQuantaWallTime = new TimeStat(MICROSECONDS); + private final DistributionStat leafSplitsSize = new DistributionStat(); + @GuardedBy("this") + private long lastLeafSplitsSizeRecordTime; + @GuardedBy("this") + private long lastLeafSplitsSize; + private volatile boolean closed; @Inject @@ -210,6 +217,7 @@ public TaskExecutor( this.maximumNumberOfDriversPerTask = maximumNumberOfDriversPerTask; this.waitingSplits = requireNonNull(splitQueue, "splitQueue is null"); this.tasks = new LinkedList<>(); + this.lastLeafSplitsSizeRecordTime = ticker.read(); } @PostConstruct @@ -279,6 +287,7 @@ public void removeTask(TaskHandle taskHandle) // replace blocked splits that were terminated addNewEntrants(); + recordLeafSplitsSize(); } private void doRemoveTask(TaskHandle taskHandle) @@ -293,6 +302,7 @@ private void doRemoveTask(TaskHandle taskHandle) intermediateSplits.removeAll(splits); blockedSplits.keySet().removeAll(splits); waitingSplits.removeAll(splits); + recordLeafSplitsSize(); } // call destroy outside of synchronized block as it is expensive and doesn't need a lock on the task executor @@ -343,6 +353,7 @@ else if (intermediate) { finishedFutures.add(prioritizedSplitRunner.getFinishedFuture()); } + recordLeafSplitsSize(); } for (PrioritizedSplitRunner split : splitsToDestroy) { split.destroy(); @@ -378,6 +389,7 @@ private void splitFinished(PrioritizedSplitRunner split) scheduleTaskIfNecessary(taskHandle); addNewEntrants(); + recordLeafSplitsSize(); } // call destroy outside of synchronized block as it is expensive and doesn't need a lock on the task executor split.destroy(); @@ -400,6 +412,7 @@ private synchronized void scheduleTaskIfNecessary(TaskHandle taskHandle) startSplit(split); splitQueuedTime.add(Duration.nanosSince(split.getCreatedNanos())); } + recordLeafSplitsSize(); } private synchronized void addNewEntrants() @@ -459,6 +472,17 @@ private synchronized PrioritizedSplitRunner pollNextSplitWorker() return null; } + private synchronized void recordLeafSplitsSize() + { + long now = ticker.read(); + long timeDifference = now - this.lastLeafSplitsSizeRecordTime; + if (timeDifference > 0) { + this.leafSplitsSize.add(lastLeafSplitsSize, timeDifference); + this.lastLeafSplitsSizeRecordTime = now; + this.lastLeafSplitsSize = allSplits.size() - intermediateSplits.size(); + } + } + private class TaskRunner implements Runnable { @@ -577,6 +601,13 @@ public int getWaitingSplits() return waitingSplits.size(); } + @Managed + @Nested + public DistributionStat getLeafSplitsSize() + { + return leafSplitsSize; + } + @Managed public int getRunningSplits() { diff --git a/core/trino-main/src/test/java/io/trino/execution/executor/TestTaskExecutor.java b/core/trino-main/src/test/java/io/trino/execution/executor/TestTaskExecutor.java index 4c718f6151bc..c5ff6fe528eb 100644 --- a/core/trino-main/src/test/java/io/trino/execution/executor/TestTaskExecutor.java +++ b/core/trino-main/src/test/java/io/trino/execution/executor/TestTaskExecutor.java @@ -30,6 +30,7 @@ import java.util.OptionalInt; import java.util.concurrent.Future; import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -498,6 +499,30 @@ public void testMinDriversPerTaskWhenTargetConcurrencyIncreases() } } + @Test + public void testLeafSplitsSize() + { + MultilevelSplitQueue splitQueue = new MultilevelSplitQueue(2); + TestingTicker ticker = new TestingTicker(); + TaskExecutor taskExecutor = new TaskExecutor(4, 1, 2, 2, splitQueue, ticker); + + TaskHandle testTaskHandle = taskExecutor.addTask(new TaskId(new StageId("test", 0), 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty()); + TestingJob driver1 = new TestingJob(ticker, new Phaser(), new Phaser(), new Phaser(), 1, 500); + TestingJob driver2 = new TestingJob(ticker, new Phaser(), new Phaser(), new Phaser(), 1, 1000 / 500); + + ticker.increment(1, TimeUnit.SECONDS); + taskExecutor.enqueueSplits(testTaskHandle, false, ImmutableList.of(driver1, driver2)); + assertEquals(taskExecutor.getLeafSplitsSize().getAllTime().getMax(), 0.0); + + ticker.increment(1, TimeUnit.SECONDS); + taskExecutor.enqueueSplits(testTaskHandle, false, ImmutableList.of(driver1)); + assertEquals(taskExecutor.getLeafSplitsSize().getAllTime().getMax(), 2.0); + + ticker.increment(1, TimeUnit.SECONDS); + taskExecutor.enqueueSplits(testTaskHandle, true, ImmutableList.of(driver1)); + assertEquals(taskExecutor.getLeafSplitsSize().getAllTime().getMax(), 2.0); + } + private void assertSplitStates(int endIndex, TestingJob[] splits) { // assert that splits up to and including endIndex are all started