diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java index 1721016c82c7..f367fea6757e 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java @@ -69,6 +69,7 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Lists.reverse; +import static io.airlift.concurrent.MoreFutures.addExceptionCallback; import static io.airlift.concurrent.MoreFutures.addSuccessCallback; import static io.airlift.concurrent.MoreFutures.tryGetFutureValue; import static io.airlift.concurrent.MoreFutures.whenAnyComplete; @@ -287,7 +288,8 @@ private Scheduler createScheduler() .map(Exchange::getSourceHandles) .map(MoreFutures::toListenableFuture) .collect(toImmutableList()); - addSuccessCallback(Futures.allAsList(futures), result -> { + ListenableFuture>> allFuture = Futures.allAsList(futures); + addSuccessCallback(allFuture, result -> { List handles = result.stream() .flatMap(List::stream) .collect(toImmutableList()); @@ -297,6 +299,7 @@ private Scheduler createScheduler() } queryStateMachine.updateInputsForQueryResults(inputs.build(), true); }); + addExceptionCallback(allFuture, queryStateMachine::transitionToFailed); } return new Scheduler(