diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 22a34933a117..c8fcca351dab 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -137,6 +137,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -1659,6 +1660,33 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns nodeLease.release(); } }); + + // we observed it may happen that final task info notification may be lost. + // in such case query progression will be blocked. + // the code below is a stop-gap to mitigate this issue and unblock or fail query + // until we find and fix the bug + AtomicBoolean finalTaskInfoReceived = new AtomicBoolean(); + task.addStateChangeListener(taskStatus -> { + if (!taskStatus.getState().isDone()) { + return; + } + switch (taskStatus.getState()) { + case FINISHED -> scheduledExecutorService.schedule(() -> { + if (!finalTaskInfoReceived.get()) { + log.error("Did not receive final task info for task %s after it FINISHED; internal inconsistency; failing query", task.getTaskId()); + queryStateMachine.transitionToFailed(new TrinoException(GENERIC_INTERNAL_ERROR, "Did not receive final task info for task after it finished; failing query")); + } + }, 1, MINUTES); + case CANCELED, ABORTED, FAILED -> scheduledExecutorService.schedule(() -> { + if (!finalTaskInfoReceived.get()) { + log.error("Did not receive final task info for task %s after it %s; internal inconsistency; marking task failed in scheduler to unblock query progression", taskStatus.getState(), task.getTaskId()); + eventQueue.add(new RemoteTaskCompletedEvent(taskStatus)); + } + }, 1, MINUTES); + default -> throw new IllegalStateException("Unexpected task state: " + taskStatus.getState()); + } + }); + task.addFinalTaskInfoListener(taskExecutionStats::update); task.addFinalTaskInfoListener(taskInfo -> eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.taskStatus()))); nodeLease.attachTaskId(task.getTaskId());