diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java index c636b20aa1fb..18680cd7e2cf 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java @@ -1089,7 +1089,7 @@ public ListenableFuture processFor(Duration duration) driver = this.driver; } - return driver.processFor(duration); + return driver.processForDuration(duration); } @Override diff --git a/core/trino-main/src/main/java/io/trino/operator/Driver.java b/core/trino-main/src/main/java/io/trino/operator/Driver.java index 9b107ff5c540..b2cd721a5237 100644 --- a/core/trino-main/src/main/java/io/trino/operator/Driver.java +++ b/core/trino-main/src/main/java/io/trino/operator/Driver.java @@ -54,6 +54,7 @@ import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static java.lang.Boolean.TRUE; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.NANOSECONDS; // // NOTE: As a general strategy the methods should "stage" a change and only @@ -66,6 +67,8 @@ public class Driver { private static final Logger log = Logger.get(Driver.class); + private static final Duration UNLIMITED_DURATION = new Duration(Long.MAX_VALUE, NANOSECONDS); + private final DriverContext driverContext; private final List activeOperators; // this is present only for debugging @@ -268,11 +271,28 @@ private void processNewSources() currentSplitAssignment = newAssignment; } - public ListenableFuture processFor(Duration duration) + public ListenableFuture processForDuration(Duration duration) + { + return process(duration, Integer.MAX_VALUE); + } + + public ListenableFuture processForNumberOfIterations(int maxIterations) + { + return process(UNLIMITED_DURATION, maxIterations); + } + + public ListenableFuture processUntilBlocked() + { + return process(UNLIMITED_DURATION, Integer.MAX_VALUE); + } + + @VisibleForTesting + public ListenableFuture process(Duration maxRuntime, int maxIterations) { checkLockNotHeld("Cannot process for a duration while holding the driver lock"); - requireNonNull(duration, "duration is null"); + requireNonNull(maxRuntime, "maxRuntime is null"); + checkArgument(maxIterations > 0, "maxIterations must be greater than zero"); // if the driver is blocked we don't need to continue SettableFuture blockedFuture = driverBlockedFuture.get(); @@ -280,21 +300,41 @@ public ListenableFuture processFor(Duration duration) return blockedFuture; } - long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS); + long maxRuntimeInNanos = maxRuntime.roundTo(TimeUnit.NANOSECONDS); Optional> result = tryWithLock(100, TimeUnit.MILLISECONDS, true, () -> { OperationTimer operationTimer = createTimer(); driverContext.startProcessTimer(); - driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor()); + driverContext.getYieldSignal().setWithDelay(maxRuntimeInNanos, driverContext.getYieldExecutor()); try { long start = System.nanoTime(); - do { + int iterations = 0; + while (!isFinishedInternal()) { ListenableFuture future = processInternal(operationTimer); + iterations++; if (!future.isDone()) { return updateDriverBlockedFuture(future); } + if (System.nanoTime() - start >= maxRuntimeInNanos || iterations >= maxIterations) { + break; + } + } + } + catch (Throwable t) { + List interrupterStack = exclusiveLock.getInterrupterStack(); + if (interrupterStack == null) { + driverContext.failed(t); + throw t; } - while (System.nanoTime() - start < maxRuntime && !isFinishedInternal()); + + // Driver thread was interrupted which should only happen if the task is already finished. + // If this becomes the actual cause of a failed query there is a bug in the task state machine. + Exception exception = new Exception("Interrupted By"); + exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new)); + TrinoException newException = new TrinoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception); + newException.addSuppressed(t); + driverContext.failed(newException); + throw newException; } finally { driverContext.getYieldSignal().reset(); @@ -305,23 +345,6 @@ public ListenableFuture processFor(Duration duration) return result.orElse(NOT_BLOCKED); } - public ListenableFuture process() - { - checkLockNotHeld("Cannot process while holding the driver lock"); - - // if the driver is blocked we don't need to continue - SettableFuture blockedFuture = driverBlockedFuture.get(); - if (!blockedFuture.isDone()) { - return blockedFuture; - } - - Optional> result = tryWithLock(100, TimeUnit.MILLISECONDS, true, () -> { - ListenableFuture future = processInternal(createTimer()); - return updateDriverBlockedFuture(future); - }); - return result.orElse(NOT_BLOCKED); - } - private OperationTimer createTimer() { return new OperationTimer( @@ -359,119 +382,101 @@ private ListenableFuture processInternal(OperationTimer operationTimer) handleMemoryRevoke(); - try { - processNewSources(); - - // If there is only one operator, finish it - // Some operators (LookupJoinOperator and HashBuildOperator) are broken and requires finish to be called continuously - // TODO remove the second part of the if statement, when these operators are fixed - // Note: finish should not be called on the natural source of the pipeline as this could cause the task to finish early - if (!activeOperators.isEmpty() && activeOperators.size() != allOperators.size()) { - Operator rootOperator = activeOperators.get(0); - rootOperator.finish(); - rootOperator.getOperatorContext().recordFinish(operationTimer); - } + processNewSources(); - boolean movedPage = false; - for (int i = 0; i < activeOperators.size() - 1 && !driverContext.isDone(); i++) { - Operator current = activeOperators.get(i); - Operator next = activeOperators.get(i + 1); + // If there is only one operator, finish it + // Some operators (LookupJoinOperator and HashBuildOperator) are broken and requires finish to be called continuously + // TODO remove the second part of the if statement, when these operators are fixed + // Note: finish should not be called on the natural source of the pipeline as this could cause the task to finish early + if (!activeOperators.isEmpty() && activeOperators.size() != allOperators.size()) { + Operator rootOperator = activeOperators.get(0); + rootOperator.finish(); + rootOperator.getOperatorContext().recordFinish(operationTimer); + } - // skip blocked operator - if (getBlockedFuture(current).isPresent()) { - continue; - } + boolean movedPage = false; + for (int i = 0; i < activeOperators.size() - 1 && !driverContext.isDone(); i++) { + Operator current = activeOperators.get(i); + Operator next = activeOperators.get(i + 1); - // if the current operator is not finished and next operator isn't blocked and needs input... - if (!current.isFinished() && getBlockedFuture(next).isEmpty() && next.needsInput()) { - // get an output page from current operator - Page page = current.getOutput(); - current.getOperatorContext().recordGetOutput(operationTimer, page); - - // if we got an output page, add it to the next operator - if (page != null && page.getPositionCount() != 0) { - next.addInput(page); - next.getOperatorContext().recordAddInput(operationTimer, page); - movedPage = true; - } + // skip blocked operator + if (getBlockedFuture(current).isPresent()) { + continue; + } - if (current instanceof SourceOperator) { - movedPage = true; - } + // if the current operator is not finished and next operator isn't blocked and needs input... + if (!current.isFinished() && getBlockedFuture(next).isEmpty() && next.needsInput()) { + // get an output page from current operator + Page page = current.getOutput(); + current.getOperatorContext().recordGetOutput(operationTimer, page); + + // if we got an output page, add it to the next operator + if (page != null && page.getPositionCount() != 0) { + next.addInput(page); + next.getOperatorContext().recordAddInput(operationTimer, page); + movedPage = true; } - // if current operator is finished... - if (current.isFinished()) { - // let next operator know there will be no more data - next.finish(); - next.getOperatorContext().recordFinish(operationTimer); + if (current instanceof SourceOperator) { + movedPage = true; } } - for (int index = activeOperators.size() - 1; index >= 0; index--) { - if (activeOperators.get(index).isFinished()) { - // close and remove this operator and all source operators - List finishedOperators = this.activeOperators.subList(0, index + 1); - Throwable throwable = closeAndDestroyOperators(finishedOperators); - finishedOperators.clear(); - if (throwable != null) { - throwIfUnchecked(throwable); - throw new RuntimeException(throwable); - } - // Finish the next operator, which is now the first operator. - if (!activeOperators.isEmpty()) { - Operator newRootOperator = activeOperators.get(0); - newRootOperator.finish(); - newRootOperator.getOperatorContext().recordFinish(operationTimer); - } - break; - } + // if current operator is finished... + if (current.isFinished()) { + // let next operator know there will be no more data + next.finish(); + next.getOperatorContext().recordFinish(operationTimer); } + } - // if we did not move any pages, check if we are blocked - if (!movedPage) { - List blockedOperators = new ArrayList<>(); - List> blockedFutures = new ArrayList<>(); - for (Operator operator : activeOperators) { - Optional> blocked = getBlockedFuture(operator); - if (blocked.isPresent()) { - blockedOperators.add(operator); - blockedFutures.add(blocked.get()); - } + for (int index = activeOperators.size() - 1; index >= 0; index--) { + if (activeOperators.get(index).isFinished()) { + // close and remove this operator and all source operators + List finishedOperators = this.activeOperators.subList(0, index + 1); + Throwable throwable = closeAndDestroyOperators(finishedOperators); + finishedOperators.clear(); + if (throwable != null) { + throwIfUnchecked(throwable); + throw new RuntimeException(throwable); } - - if (!blockedFutures.isEmpty()) { - // unblock when the first future is complete - ListenableFuture blocked = firstFinishedFuture(blockedFutures); - // driver records serial blocked time - driverContext.recordBlocked(blocked); - // each blocked operator is responsible for blocking the execution - // until one of the operators can continue - for (Operator operator : blockedOperators) { - operator.getOperatorContext().recordBlocked(blocked); - } - return blocked; + // Finish the next operator, which is now the first operator. + if (!activeOperators.isEmpty()) { + Operator newRootOperator = activeOperators.get(0); + newRootOperator.finish(); + newRootOperator.getOperatorContext().recordFinish(operationTimer); } + break; } - - return NOT_BLOCKED; } - catch (Throwable t) { - List interrupterStack = exclusiveLock.getInterrupterStack(); - if (interrupterStack == null) { - driverContext.failed(t); - throw t; + + // if we did not move any pages, check if we are blocked + if (!movedPage) { + List blockedOperators = new ArrayList<>(); + List> blockedFutures = new ArrayList<>(); + for (Operator operator : activeOperators) { + Optional> blocked = getBlockedFuture(operator); + if (blocked.isPresent()) { + blockedOperators.add(operator); + blockedFutures.add(blocked.get()); + } } - // Driver thread was interrupted which should only happen if the task is already finished. - // If this becomes the actual cause of a failed query there is a bug in the task state machine. - Exception exception = new Exception("Interrupted By"); - exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new)); - TrinoException newException = new TrinoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception); - newException.addSuppressed(t); - driverContext.failed(newException); - throw newException; + if (!blockedFutures.isEmpty()) { + // unblock when the first future is complete + ListenableFuture blocked = firstFinishedFuture(blockedFutures); + // driver records serial blocked time + driverContext.recordBlocked(blocked); + // each blocked operator is responsible for blocking the execution + // until one of the operators can continue + for (Operator operator : blockedOperators) { + operator.getOperatorContext().recordBlocked(blocked); + } + return blocked; + } } + + return NOT_BLOCKED; } @GuardedBy("exclusiveLock") diff --git a/core/trino-main/src/main/java/io/trino/operator/TableFinishOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableFinishOperator.java index ab4a8a232670..d2c8836a60ca 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableFinishOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableFinishOperator.java @@ -30,6 +30,7 @@ import io.trino.spi.type.Type; import io.trino.sql.planner.plan.PlanNodeId; import io.trino.sql.planner.plan.StatisticAggregationsDescriptor; +import io.trino.util.AutoCloseableCloser; import java.util.Collection; import java.util.List; @@ -360,7 +361,10 @@ private static Supplier createTableFinishInfoSupplier(AtomicRef public void close() throws Exception { - statisticsAggregationOperator.close(); + AutoCloseableCloser closer = AutoCloseableCloser.create(); + closer.register(() -> statisticsAggregationOperator.getOperatorContext().destroy()); + closer.register(statisticsAggregationOperator); + closer.close(); } public interface TableFinisher diff --git a/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java index 133843a16618..5907cb1e34f1 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java @@ -378,6 +378,7 @@ public void close() closer.register(pageSink::abort); } } + closer.register(() -> statisticAggregationOperator.getOperatorContext().destroy()); closer.register(statisticAggregationOperator); closer.register(pageSinkMemoryContext::close); closer.close(); diff --git a/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java b/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java index 826780540841..66ca68882254 100644 --- a/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java +++ b/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java @@ -340,7 +340,7 @@ public boolean load(List requests) ScheduledSplit split = new ScheduledSplit(0, sourcePlanNodeId, new Split(INDEX_CONNECTOR_ID, new IndexSplit(recordSetForLookupSource), Lifespan.taskWide())); driver.updateSplitAssignment(new SplitAssignment(sourcePlanNodeId, ImmutableSet.of(split), true)); while (!driver.isFinished()) { - ListenableFuture process = driver.process(); + ListenableFuture process = driver.processUntilBlocked(); checkState(process.isDone(), "Driver should never block"); } } diff --git a/core/trino-main/src/main/java/io/trino/operator/index/StreamingIndexedData.java b/core/trino-main/src/main/java/io/trino/operator/index/StreamingIndexedData.java index 0a245fde7a67..535822eeb1ef 100644 --- a/core/trino-main/src/main/java/io/trino/operator/index/StreamingIndexedData.java +++ b/core/trino-main/src/main/java/io/trino/operator/index/StreamingIndexedData.java @@ -101,7 +101,7 @@ private boolean loadNextPage() if (driver.isFinished()) { return false; } - driver.process(); + driver.processForNumberOfIterations(1); nextPage = extractNonEmptyPage(pageBuffer); } currentPage = nextPage; diff --git a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java index d548b3b8e583..977be2069e65 100644 --- a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java +++ b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java @@ -831,7 +831,7 @@ private MaterializedResultWithPlan executeInternal(Session session, @Language("S } if (!driver.isFinished()) { - driver.process(); + driver.processForNumberOfIterations(1); processed = true; } } diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryBlocking.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryBlocking.java index b343f4975ab0..1ea01b2dbbcd 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryBlocking.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryBlocking.java @@ -118,7 +118,7 @@ public void testTableScanMemoryBlocking() Split testSplit = new Split(new CatalogName("test"), new TestSplit(), Lifespan.taskWide()); driver.updateSplitAssignment(new SplitAssignment(sourceId, ImmutableSet.of(new ScheduledSplit(0, sourceId, testSplit)), true)); - ListenableFuture blocked = driver.processFor(new Duration(1, NANOSECONDS)); + ListenableFuture blocked = driver.processForDuration(new Duration(1, NANOSECONDS)); // the driver shouldn't block in the first call as it will be able to move a page between source and the sink operator // but the operator should be blocked @@ -128,7 +128,7 @@ public void testTableScanMemoryBlocking() // in the subsequent calls both the driver and the operator should be blocked // and they should stay blocked until more memory becomes available for (int i = 0; i < 10; i++) { - blocked = driver.processFor(new Duration(1, NANOSECONDS)); + blocked = driver.processForDuration(new Duration(1, NANOSECONDS)); assertFalse(blocked.isDone()); assertFalse(source.getOperatorContext().isWaitingForMemory().isDone()); } @@ -140,7 +140,7 @@ public void testTableScanMemoryBlocking() assertTrue(source.getOperatorContext().isWaitingForMemory().isDone()); // the driver shouldn't be blocked - blocked = driver.processFor(new Duration(1, NANOSECONDS)); + blocked = driver.processForDuration(new Duration(1, NANOSECONDS)); assertTrue(blocked.isDone()); } diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java index 6e4fecd81eba..1dadb3e76a97 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java @@ -436,7 +436,7 @@ private long runDriversUntilBlocked(Predicate reason) // run driver, until it blocks while (!isOperatorBlocked(drivers, reason)) { for (Driver driver : drivers) { - driver.process(); + driver.processForNumberOfIterations(1); } iterationsCount++; } @@ -454,7 +454,7 @@ private void assertDriversProgress(Predicate reason) assertFalse(isOperatorBlocked(drivers, reason)); boolean progress = false; for (Driver driver : drivers) { - ListenableFuture blocked = driver.process(); + ListenableFuture blocked = driver.processUntilBlocked(); progress = progress | blocked.isDone(); } // query should not block diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDriver.java b/core/trino-main/src/test/java/io/trino/operator/TestDriver.java index 42a1210a9a37..b4ea218fad8a 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDriver.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDriver.java @@ -106,7 +106,7 @@ public void testNormalFinish() assertSame(driver.getDriverContext(), driverContext); assertFalse(driver.isFinished()); - ListenableFuture blocked = driver.processFor(new Duration(1, TimeUnit.SECONDS)); + ListenableFuture blocked = driver.processForDuration(new Duration(1, TimeUnit.SECONDS)); assertTrue(blocked.isDone()); assertTrue(driver.isFinished()); @@ -127,7 +127,7 @@ public void testConcurrentClose() Operator sink = createSinkOperator(types); Driver driver = Driver.createDriver(driverContext, source, sink); // let these threads race - scheduledExecutor.submit(() -> driver.processFor(new Duration(1, TimeUnit.NANOSECONDS))); // don't want to call isFinishedInternal in processFor + scheduledExecutor.submit(() -> driver.processForDuration(new Duration(1, TimeUnit.NANOSECONDS))); // don't want to call isFinishedInternal in processFor scheduledExecutor.submit(driver::close); while (!driverContext.isDone()) { Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); @@ -179,13 +179,13 @@ public void testAddSourceFinish() assertSame(driver.getDriverContext(), driverContext); assertFalse(driver.isFinished()); - assertFalse(driver.processFor(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); + assertFalse(driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); assertFalse(driver.isFinished()); driver.updateSplitAssignment(new SplitAssignment(sourceId, ImmutableSet.of(new ScheduledSplit(0, sourceId, newMockSplit())), true)); assertFalse(driver.isFinished()); - assertTrue(driver.processFor(new Duration(1, TimeUnit.SECONDS)).isDone()); + assertTrue(driver.processForDuration(new Duration(1, TimeUnit.SECONDS)).isDone()); assertTrue(driver.isFinished()); assertTrue(sink.isFinished()); @@ -202,7 +202,7 @@ public void testBrokenOperatorCloseWhileProcessing() assertSame(driver.getDriverContext(), driverContext); // block thread in operator processing - Future driverProcessFor = executor.submit(() -> driver.processFor(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); + Future driverProcessFor = executor.submit(() -> driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); brokenOperator.waitForLocked(); driver.close(); @@ -229,7 +229,7 @@ public void testBrokenOperatorProcessWhileClosing() }); brokenOperator.waitForLocked(); - assertTrue(driver.processFor(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); + assertTrue(driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); assertTrue(driver.isFinished()); brokenOperator.unlock(); @@ -253,7 +253,7 @@ public void testMemoryRevocationRace() // the table scan operator will request memory revocation with requestMemoryRevoking() // while the driver is still not done with the processFor() method and before it moves to // updateDriverBlockedFuture() method. - assertTrue(driver.processFor(new Duration(100, TimeUnit.MILLISECONDS)).isDone()); + assertTrue(driver.processForDuration(new Duration(100, TimeUnit.MILLISECONDS)).isDone()); } @Test @@ -275,21 +275,21 @@ public void testBrokenOperatorAddSource() Driver driver = Driver.createDriver(driverContext, source, brokenOperator); // block thread in operator processing - Future driverProcessFor = executor.submit(() -> driver.processFor(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); + Future driverProcessFor = executor.submit(() -> driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); brokenOperator.waitForLocked(); assertSame(driver.getDriverContext(), driverContext); assertFalse(driver.isFinished()); // processFor always returns NOT_BLOCKED, because DriveLockResult was not acquired - assertTrue(driver.processFor(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); + assertTrue(driver.processForDuration(new Duration(1, TimeUnit.MILLISECONDS)).isDone()); assertFalse(driver.isFinished()); driver.updateSplitAssignment(new SplitAssignment(sourceId, ImmutableSet.of(new ScheduledSplit(0, sourceId, newMockSplit())), true)); assertFalse(driver.isFinished()); // processFor always returns NOT_BLOCKED, because DriveLockResult was not acquired - assertTrue(driver.processFor(new Duration(1, TimeUnit.SECONDS)).isDone()); + assertTrue(driver.processForDuration(new Duration(1, TimeUnit.SECONDS)).isDone()); assertFalse(driver.isFinished()); driver.close(); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestHashSemiJoinOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestHashSemiJoinOperator.java index b124080c2280..984bd4453044 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestHashSemiJoinOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestHashSemiJoinOperator.java @@ -121,7 +121,7 @@ public void testSemiJoin(boolean hashEnabled) Driver driver = Driver.createDriver(driverContext, buildOperator, setBuilderOperator); while (!driver.isFinished()) { - driver.process(); + driver.processUntilBlocked(); } // probe @@ -186,7 +186,7 @@ public void testSemiJoinOnVarcharType(boolean hashEnabled) Driver driver = Driver.createDriver(driverContext, buildOperator, setBuilderOperator); while (!driver.isFinished()) { - driver.process(); + driver.processUntilBlocked(); } // probe @@ -281,7 +281,7 @@ public void testBuildSideNulls(boolean hashEnabled) Driver driver = Driver.createDriver(driverContext, buildOperator, setBuilderOperator); while (!driver.isFinished()) { - driver.process(); + driver.processUntilBlocked(); } // probe @@ -337,7 +337,7 @@ public void testProbeSideNulls(boolean hashEnabled) Driver driver = Driver.createDriver(driverContext, buildOperator, setBuilderOperator); while (!driver.isFinished()) { - driver.process(); + driver.processUntilBlocked(); } // probe @@ -397,7 +397,7 @@ public void testProbeAndBuildNulls(boolean hashEnabled) Driver driver = Driver.createDriver(driverContext, buildOperator, setBuilderOperator); while (!driver.isFinished()) { - driver.process(); + driver.processUntilBlocked(); } // probe @@ -455,7 +455,7 @@ public void testMemoryLimit(boolean hashEnabled) Driver driver = Driver.createDriver(driverContext, buildOperator, setBuilderOperator); while (!driver.isFinished()) { - driver.process(); + driver.processUntilBlocked(); } } } diff --git a/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java b/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java index 1ac64b847d15..ecb5d812798e 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java @@ -168,7 +168,7 @@ public static BuildSideSetup setupBuildSide( sinkOperatorFactory.noMoreOperators(); while (!sourceDriver.isFinished()) { - sourceDriver.process(); + sourceDriver.processUntilBlocked(); } // build side operator factories @@ -214,7 +214,7 @@ public static void buildLookupSource(ExecutorService executor, BuildSideSetup bu while (!lookupSourceProvider.isDone()) { for (Driver buildDriver : buildDrivers) { - buildDriver.process(); + buildDriver.processForNumberOfIterations(1); } } getFutureValue(lookupSourceProvider).close(); @@ -232,7 +232,7 @@ public static void runDriverInThread(ExecutorService executor, Driver driver) executor.execute(() -> { if (!driver.isFinished()) { try { - driver.process(); + driver.processUntilBlocked(); } catch (TrinoException e) { driver.getDriverContext().failed(e); diff --git a/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java b/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java index 6d7163e01e97..5f2bfb7f147b 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slices; import io.airlift.units.DataSize; +import io.airlift.units.Duration; import io.trino.ExceededMemoryLimitException; import io.trino.RowPagesBuilder; import io.trino.execution.Lifespan; @@ -78,7 +79,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -111,6 +111,7 @@ import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -528,7 +529,7 @@ private void innerJoinWithSpill(boolean probeHashEnabled, List whenSp while (!lookupSourceProvider.isDone()) { for (int i = 0; i < buildOperatorCount; i++) { checkErrors(taskStateMachine); - buildDrivers.get(i).process(); + buildDrivers.get(i).processForNumberOfIterations(1); HashBuilderOperator buildOperator = buildSideSetup.getBuildOperators().get(i); if (whenSpill.get(i) == WhenSpill.DURING_BUILD && buildOperator.getOperatorContext().getReservedRevocableBytes() > 0) { checkState(!lookupSourceProvider.isDone(), "Too late, LookupSource already done"); @@ -598,9 +599,7 @@ private void innerJoinWithSpill(boolean probeHashEnabled, List whenSp private static void processRow(Driver joinDriver, TaskStateMachine taskStateMachine) { - joinDriver.getDriverContext().getYieldSignal().setWithDelay(TimeUnit.SECONDS.toNanos(1), joinDriver.getDriverContext().getYieldExecutor()); - joinDriver.process(); - joinDriver.getDriverContext().getYieldSignal().reset(); + joinDriver.process(new Duration(1, NANOSECONDS), 1); checkErrors(taskStateMachine); } diff --git a/core/trino-main/src/test/java/io/trino/operator/join/TestNestedLoopJoinOperator.java b/core/trino-main/src/test/java/io/trino/operator/join/TestNestedLoopJoinOperator.java index a31683fe89c9..8dbce0e28ef6 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/TestNestedLoopJoinOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/TestNestedLoopJoinOperator.java @@ -528,7 +528,7 @@ private static NestedLoopJoinOperatorFactory newJoinOperatorFactoryWithCompleted nestedLoopBuildOperatorFactory.noMoreOperators(); while (nestedLoopBuildOperator.isBlocked().isDone()) { - driver.process(); + driver.processUntilBlocked(); } return joinOperatorFactory; diff --git a/lib/trino-memory-context/src/main/java/io/trino/memory/context/ChildAggregatedMemoryContext.java b/lib/trino-memory-context/src/main/java/io/trino/memory/context/ChildAggregatedMemoryContext.java index a9400f0043b4..aa1affaad8f1 100644 --- a/lib/trino-memory-context/src/main/java/io/trino/memory/context/ChildAggregatedMemoryContext.java +++ b/lib/trino-memory-context/src/main/java/io/trino/memory/context/ChildAggregatedMemoryContext.java @@ -43,6 +43,7 @@ synchronized ListenableFuture updateBytes(String allocationTag, long delta @Override synchronized boolean tryUpdateBytes(String allocationTag, long delta) { + checkState(!isClosed(), "ChildAggregatedMemoryContext is already closed"); if (parentMemoryContext.tryUpdateBytes(allocationTag, delta)) { addBytes(delta); return true; diff --git a/lib/trino-memory-context/src/main/java/io/trino/memory/context/RootAggregatedMemoryContext.java b/lib/trino-memory-context/src/main/java/io/trino/memory/context/RootAggregatedMemoryContext.java index 8b8170256a7d..b132e36ac26b 100644 --- a/lib/trino-memory-context/src/main/java/io/trino/memory/context/RootAggregatedMemoryContext.java +++ b/lib/trino-memory-context/src/main/java/io/trino/memory/context/RootAggregatedMemoryContext.java @@ -46,6 +46,7 @@ synchronized ListenableFuture updateBytes(String allocationTag, long delta @Override synchronized boolean tryUpdateBytes(String allocationTag, long delta) { + checkState(!isClosed(), "RootAggregatedMemoryContext is already closed"); if (reservationHandler.tryReserveMemory(allocationTag, delta)) { addBytes(delta); return true; diff --git a/lib/trino-memory-context/src/main/java/io/trino/memory/context/SimpleAggregatedMemoryContext.java b/lib/trino-memory-context/src/main/java/io/trino/memory/context/SimpleAggregatedMemoryContext.java index a42cab089ec4..d0aeac5dc2d1 100644 --- a/lib/trino-memory-context/src/main/java/io/trino/memory/context/SimpleAggregatedMemoryContext.java +++ b/lib/trino-memory-context/src/main/java/io/trino/memory/context/SimpleAggregatedMemoryContext.java @@ -34,6 +34,7 @@ synchronized ListenableFuture updateBytes(String allocationTag, long delta @Override synchronized boolean tryUpdateBytes(String allocationTag, long delta) { + checkState(!isClosed(), "SimpleAggregatedMemoryContext is already closed"); addBytes(delta); return true; } diff --git a/plugin/trino-geospatial/src/test/java/io/trino/plugin/geospatial/TestSpatialJoinOperator.java b/plugin/trino-geospatial/src/test/java/io/trino/plugin/geospatial/TestSpatialJoinOperator.java index 2844a55d21aa..0850cb1f9b6d 100644 --- a/plugin/trino-geospatial/src/test/java/io/trino/plugin/geospatial/TestSpatialJoinOperator.java +++ b/plugin/trino-geospatial/src/test/java/io/trino/plugin/geospatial/TestSpatialJoinOperator.java @@ -497,7 +497,7 @@ private PagesSpatialIndexFactory buildIndex(DriverContext driverContext, Spatial ListenableFuture pagesSpatialIndex = pagesSpatialIndexFactory.createPagesSpatialIndex(); while (!pagesSpatialIndex.isDone()) { - driver.process(); + driver.processUntilBlocked(); } runDriverInThread(executor, driver); @@ -512,7 +512,7 @@ private static void runDriverInThread(ExecutorService executor, Driver driver) executor.execute(() -> { if (!driver.isFinished()) { try { - driver.process(); + driver.processUntilBlocked(); } catch (TrinoException e) { driver.getDriverContext().failed(e); diff --git a/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java b/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java index 5d6caa88a6b9..bc6416efceba 100644 --- a/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java +++ b/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java @@ -81,6 +81,7 @@ import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount; import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageSize; +import static io.trino.execution.executor.PrioritizedSplitRunner.SPLIT_RUN_QUANTA; import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING; import static io.trino.spi.connector.Constraint.alwaysTrue; import static io.trino.spi.connector.DynamicFilter.EMPTY; @@ -273,7 +274,7 @@ protected Map execute(TaskContext taskContext) boolean processed = false; for (Driver driver : drivers) { if (!driver.isFinished()) { - driver.process(); + driver.processForDuration(SPLIT_RUN_QUANTA); long lastPeakMemory = peakMemory; peakMemory = taskContext.getTaskStats().getUserMemoryReservation().toBytes(); if (peakMemory <= lastPeakMemory) { diff --git a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashJoinBenchmark.java b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashJoinBenchmark.java index d981cca46e2b..a906834775ed 100644 --- a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashJoinBenchmark.java +++ b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashJoinBenchmark.java @@ -44,6 +44,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.benchmark.BenchmarkQueryRunner.createLocalQueryRunner; +import static io.trino.execution.executor.PrioritizedSplitRunner.SPLIT_RUN_QUANTA; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; import static io.trino.operator.PipelineExecutionStrategy.UNGROUPED_EXECUTION; import static io.trino.spiller.PartitioningSpillerFactory.unsupportedPartitioningSpillerFactory; @@ -130,7 +131,7 @@ protected List createDrivers(TaskContext taskContext) Driver driver = buildDriverFactory.createDriver(driverContext); Future lookupSourceProvider = lookupSourceFactoryManager.getJoinBridge(Lifespan.taskWide()).createLookupSourceProvider(); while (!lookupSourceProvider.isDone()) { - driver.process(); + driver.processForDuration(SPLIT_RUN_QUANTA); } getFutureValue(lookupSourceProvider).close(); } diff --git a/testing/trino-benchmark/src/test/java/io/trino/benchmark/MemoryLocalQueryRunner.java b/testing/trino-benchmark/src/test/java/io/trino/benchmark/MemoryLocalQueryRunner.java index 488e51a1c3fd..1b010819ded8 100644 --- a/testing/trino-benchmark/src/test/java/io/trino/benchmark/MemoryLocalQueryRunner.java +++ b/testing/trino-benchmark/src/test/java/io/trino/benchmark/MemoryLocalQueryRunner.java @@ -98,7 +98,7 @@ public List execute(@Language("SQL") String query) boolean processed = false; for (Driver driver : drivers) { if (!driver.isFinished()) { - driver.process(); + driver.processForNumberOfIterations(1); processed = true; } } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java index 3b53251d7739..63641c66f160 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java @@ -25,6 +25,8 @@ public static Map getExtraProperties() { return ImmutableMap.builder() .put("retry-policy", "TASK") + .put("retry-initial-delay", "50ms") + .put("retry-max-delay", "100ms") .put("fault-tolerant-execution-partition-count", "5") .put("fault-tolerant-execution-target-task-input-size", "10MB") .put("fault-tolerant-execution-target-task-split-count", "4")