diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index 8c941109eb33..28b2f91aa17e 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -1129,7 +1129,7 @@ private StageExecution getStageExecution(StageId stageId) private static List sortPlanInTopologicalOrder(SubPlan subPlan) { ImmutableList.Builder result = ImmutableList.builder(); - Traverser.forTree(SubPlan::getChildren).depthFirstPreOrder(subPlan).forEach(result::add); + Traverser.forTree(SubPlan::getChildren).depthFirstPostOrder(subPlan).forEach(result::add); return result.build(); } @@ -1295,7 +1295,7 @@ public Optional sealPartition(int partitionId) } StagePartition partition = getStagePartition(partitionId); - partition.seal(partitionId); + partition.seal(); if (!partition.isRunning()) { // if partition is not yet running update its priority as it is no longer speculative @@ -1769,7 +1769,7 @@ private boolean isFinalOutputSelectorDelivered(PlanNodeId planNodeId) return finalSelectors.contains(planNodeId); } - public void seal(int partitionId) + public void seal() { checkState(openTaskDescriptor.isPresent(), "openTaskDescriptor is empty"); TaskDescriptor taskDescriptor = openTaskDescriptor.get().createTaskDescriptor(partitionId); @@ -2006,6 +2006,12 @@ public boolean isNonSpeculative() { return priority < SPECULATIVE_EXECUTION_PRIORITY; } + + @Override + public String toString() + { + return "" + task.stageId() + "/" + task.partitionId() + "[" + priority + "]"; + } } private static class SchedulingQueue @@ -2039,7 +2045,8 @@ public void addOrUpdate(PrioritizedScheduledTask prioritizedTask) if (prioritizedTask.isNonSpeculative()) { nonSpeculativeTaskCount++; } - queue.addOrUpdate(prioritizedTask.task(), prioritizedTask.priority()); + // using negative priority here as will return entries with lowest pririty first and here we use bigger number for tasks with lower priority + queue.addOrUpdate(prioritizedTask.task(), -prioritizedTask.priority()); } }