From e0b2d068a94ef2eb46b00a15b1559dd155caa0d1 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 24 Nov 2023 16:56:07 +0100 Subject: [PATCH] Add or update code comments in EventDrivenFaultTolerantQueryScheduler --- .../EventDrivenFaultTolerantQueryScheduler.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 4af6fe5d9f18..d1b1dd7f4017 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -286,7 +286,7 @@ public synchronized void start() } }); - // when query is done or any time a stage completes, attempt to transition query to "final query info ready" + // when query is done, attempt to transition query to "final query info ready" queryStateMachine.addStateChangeListener(state -> { if (!state.isDone()) { return; @@ -629,6 +629,8 @@ public void run() Optional failure = Optional.empty(); try { + // schedule() is the main logic, but expensive, so we do not want to call it after every event. + // Process events for some time (measured by schedulingDelayer) before invoking schedule() next time. if (schedule()) { while (processEvents()) { if (schedulingDelayer.getRemainingDelayInMillis() > 0) { @@ -672,6 +674,9 @@ private Optional closeAndAddSuppressed(Optional existingFa return existingFailure; } + /** + * @return whether processing should continue + */ private boolean processEvents() { try { @@ -705,6 +710,9 @@ private boolean processEvents() } } + /** + * @return whether processing should continue + */ private boolean schedule() { if (checkComplete()) { @@ -1452,6 +1460,7 @@ else if (taskState == TaskState.FAILED) { List replacementTasks = stageExecution.taskFailed(taskId, failureInfo, taskStatus); replacementTasks.forEach(schedulingQueue::addOrUpdate); + // When tasks fail for some intermittent reason, delay scheduling retries if (shouldDelayScheduling(failureInfo.getErrorCode())) { schedulingDelayer.startOrProlongDelayIfNecessary(); scheduledExecutorService.schedule(() -> eventQueue.add(Event.WAKE_UP), schedulingDelayer.getRemainingDelayInMillis(), MILLISECONDS);