diff --git a/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java b/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java index 900a2331bd84..3bf50f16a3ef 100644 --- a/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java +++ b/core/trino-main/src/main/java/io/trino/execution/executor/TaskExecutor.java @@ -287,8 +287,10 @@ public void removeTask(TaskHandle taskHandle) } // replace blocked splits that were terminated - addNewEntrants(); - recordLeafSplitsSize(); + synchronized (this) { + addNewEntrants(); + recordLeafSplitsSize(); + } } private void doRemoveTask(TaskHandle taskHandle) @@ -480,8 +482,10 @@ private synchronized void recordLeafSplitsSize() if (timeDifference > 0) { this.leafSplitsSize.add(lastLeafSplitsSize, timeDifference); this.lastLeafSplitsSizeRecordTime = now; - this.lastLeafSplitsSize = allSplits.size() - intermediateSplits.size(); } + // always record new lastLeafSplitsSize as it might have changed + // even if timeDifference is 0 + this.lastLeafSplitsSize = allSplits.size() - intermediateSplits.size(); } private class TaskRunner diff --git a/core/trino-main/src/test/java/io/trino/execution/executor/TestTaskExecutor.java b/core/trino-main/src/test/java/io/trino/execution/executor/TestTaskExecutor.java index c5ff6fe528eb..93fefba26d33 100644 --- a/core/trino-main/src/test/java/io/trino/execution/executor/TestTaskExecutor.java +++ b/core/trino-main/src/test/java/io/trino/execution/executor/TestTaskExecutor.java @@ -40,6 +40,7 @@ import static io.airlift.testing.Assertions.assertLessThan; import static io.trino.execution.executor.MultilevelSplitQueue.LEVEL_CONTRIBUTION_CAP; import static io.trino.execution.executor.MultilevelSplitQueue.LEVEL_THRESHOLD_SECONDS; +import static java.lang.Double.isNaN; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -510,9 +511,9 @@ public void testLeafSplitsSize() TestingJob driver1 = new TestingJob(ticker, new Phaser(), new Phaser(), new Phaser(), 1, 500); TestingJob driver2 = new TestingJob(ticker, new Phaser(), new Phaser(), new Phaser(), 1, 1000 / 500); - ticker.increment(1, TimeUnit.SECONDS); + ticker.increment(0, TimeUnit.SECONDS); taskExecutor.enqueueSplits(testTaskHandle, false, ImmutableList.of(driver1, driver2)); - assertEquals(taskExecutor.getLeafSplitsSize().getAllTime().getMax(), 0.0); + assertTrue(isNaN(taskExecutor.getLeafSplitsSize().getAllTime().getMax())); ticker.increment(1, TimeUnit.SECONDS); taskExecutor.enqueueSplits(testTaskHandle, false, ImmutableList.of(driver1));