Skip to content

Commit 2c302ba

Browse files
committed
Make logic for nonSpeculativeTaskCount more explicit
Previously there was a brittle assumption that addOrUpdate is called with just once for non-speculative task and for speculative task we get exactly two calls - add with speculative priority and with - update with non-speculative priority This PR makes logic more explicit and relaxes assumptions
1 parent 1f10771 commit 2c302ba

File tree

2 files changed

+29
-6
lines changed

2 files changed

+29
-6
lines changed

core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@ public E poll()
8989
return entry.getValue();
9090
}
9191

92+
public Prioritized<E> getPrioritized(E element)
93+
{
94+
Entry<E> entry = index.get(element);
95+
if (entry == null) {
96+
return null;
97+
}
98+
99+
return new Prioritized<>(entry.getValue(), entry.getPriority());
100+
}
101+
92102
public Prioritized<E> pollPrioritized()
93103
{
94104
Entry<E> entry = pollEntry();

core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2049,22 +2049,35 @@ public PrioritizedScheduledTask pollOrThrow()
20492049
{
20502050
IndexedPriorityQueue.Prioritized<ScheduledTask> task = queue.pollPrioritized();
20512051
checkState(task != null, "queue is empty");
2052-
if (nonSpeculativeTaskCount > 0) {
2053-
// non speculative tasks are always pooled first
2052+
PrioritizedScheduledTask prioritizedTask = getPrioritizedTask(task);
2053+
if (!prioritizedTask.isSpeculative()) {
20542054
nonSpeculativeTaskCount--;
20552055
}
2056-
// negate priority to reverse operation we do in addOrUpdate
2057-
return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority()));
2056+
return prioritizedTask;
20582057
}
20592058

20602059
public void addOrUpdate(PrioritizedScheduledTask prioritizedTask)
20612060
{
2062-
if (!prioritizedTask.isSpeculative()) {
2061+
IndexedPriorityQueue.Prioritized<ScheduledTask> previousTask = queue.getPrioritized(prioritizedTask.task());
2062+
PrioritizedScheduledTask previousPrioritizedTask = null;
2063+
if (previousTask != null) {
2064+
previousPrioritizedTask = getPrioritizedTask(previousTask);
2065+
}
2066+
2067+
if (!prioritizedTask.isSpeculative() && (previousPrioritizedTask == null || previousPrioritizedTask.isSpeculative())) {
2068+
// number of non-speculative tasks increased
20632069
nonSpeculativeTaskCount++;
20642070
}
2065-
// using negative priority here as will return entries with lowest pririty first and here we use bigger number for tasks with lower priority
2071+
2072+
// using negative priority here as will return entries with the lowest priority first and here we use bigger number for tasks with lower priority
20662073
queue.addOrUpdate(prioritizedTask.task(), -prioritizedTask.priority());
20672074
}
2075+
2076+
private static PrioritizedScheduledTask getPrioritizedTask(IndexedPriorityQueue.Prioritized<ScheduledTask> task)
2077+
{
2078+
// negate priority to reverse operation we do in addOrUpdate
2079+
return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority()));
2080+
}
20682081
}
20692082

20702083
private static class SchedulingDelayer

0 commit comments

Comments
 (0)