150150import static java .lang .Math .max ;
151151import static java .lang .Math .min ;
152152import static java .lang .Math .round ;
153+ import static java .lang .Math .toIntExact ;
153154import static java .lang .String .format ;
154155import static java .util .Objects .requireNonNull ;
155156import static java .util .concurrent .TimeUnit .MILLISECONDS ;
@@ -883,12 +884,12 @@ private void scheduleTasks()
883884 long tasksWaitingForNode = preSchedulingTaskContexts .values ().stream ().filter (context -> !context .getNodeLease ().getNode ().isDone ()).count ();
884885
885886 while (tasksWaitingForNode < maxTasksWaitingForNode && !schedulingQueue .isEmpty ()) {
886- ScheduledTask scheduledTask = schedulingQueue .pollOrThrow ();
887- StageExecution stageExecution = getStageExecution (scheduledTask .stageId ());
887+ PrioritizedScheduledTask scheduledTask = schedulingQueue .pollOrThrow ();
888+ StageExecution stageExecution = getStageExecution (scheduledTask .task (). stageId ());
888889 if (stageExecution .getState ().isDone ()) {
889890 continue ;
890891 }
891- int partitionId = scheduledTask .partitionId ();
892+ int partitionId = scheduledTask .task (). partitionId ();
892893 Optional <NodeRequirements > nodeRequirements = stageExecution .getNodeRequirements (partitionId );
893894 if (nodeRequirements .isEmpty ()) {
894895 // execution finished
@@ -897,7 +898,7 @@ private void scheduleTasks()
897898 MemoryRequirements memoryRequirements = stageExecution .getMemoryRequirements (partitionId );
898899 NodeLease lease = nodeAllocator .acquire (nodeRequirements .get (), memoryRequirements .getRequiredMemory ());
899900 lease .getNode ().addListener (() -> eventQueue .add (Event .WAKE_UP ), queryExecutor );
900- preSchedulingTaskContexts .put (scheduledTask , new PreSchedulingTaskContext (lease ));
901+ preSchedulingTaskContexts .put (scheduledTask . task () , new PreSchedulingTaskContext (lease , scheduledTask . isSpeculative () ));
901902 tasksWaitingForNode ++;
902903 }
903904 }
@@ -964,7 +965,7 @@ public void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns
964965
965966 try {
966967 InternalNode node = getDone (nodeLease .getNode ());
967- Optional <RemoteTask > remoteTask = stageExecution .schedule (partitionId , sinkInstanceHandle , attempt , node );
968+ Optional <RemoteTask > remoteTask = stageExecution .schedule (partitionId , sinkInstanceHandle , attempt , node , context . isSpeculative () );
968969 remoteTask .ifPresent (task -> {
969970 task .addStateChangeListener (createExchangeSinkInstanceHandleUpdateRequiredListener ());
970971 task .addStateChangeListener (taskStatus -> {
@@ -1111,8 +1112,10 @@ public void onSplitAssignment(SplitAssignmentEvent event)
11111112 assignment .sealedPartitions ().forEach (partitionId -> {
11121113 Optional <PrioritizedScheduledTask > scheduledTask = stageExecution .sealPartition (partitionId );
11131114 scheduledTask .ifPresent (prioritizedTask -> {
1114- if (preSchedulingTaskContexts .containsKey (prioritizedTask .task ())) {
1115+ PreSchedulingTaskContext context = preSchedulingTaskContexts .get (prioritizedTask .task ());
1116+ if (context != null ) {
11151117 // task is already waiting for node or for sink instance handle
1118+ context .setSpeculative (prioritizedTask .isSpeculative ()); // update speculative flag
11161119 return ;
11171120 }
11181121 schedulingQueue .addOrUpdate (prioritizedTask );
@@ -1362,7 +1365,7 @@ public Optional<EventDrivenFaultTolerantQueryScheduler.GetExchangeSinkInstanceHa
13621365 attempt ));
13631366 }
13641367
1365- public Optional <RemoteTask > schedule (int partitionId , ExchangeSinkInstanceHandle exchangeSinkInstanceHandle , int attempt , InternalNode node )
1368+ public Optional <RemoteTask > schedule (int partitionId , ExchangeSinkInstanceHandle exchangeSinkInstanceHandle , int attempt , InternalNode node , boolean speculative )
13661369 {
13671370 if (getState ().isDone ()) {
13681371 return Optional .empty ();
@@ -1404,7 +1407,7 @@ public Optional<RemoteTask> schedule(int partitionId, ExchangeSinkInstanceHandle
14041407 splits ,
14051408 noMoreSplits ,
14061409 Optional .of (partition .getMemoryRequirements ().getRequiredMemory ()),
1407- false ); // TODO pass correct value here
1410+ speculative );
14081411 task .ifPresent (remoteTask -> {
14091412 partition .addTask (remoteTask , outputBuffers );
14101413 runningPartitions .add (partitionId );
@@ -2042,15 +2045,16 @@ public int getNonSpeculativeTaskCount()
20422045 return nonSpeculativeTaskCount ;
20432046 }
20442047
2045- public ScheduledTask pollOrThrow ()
2048+ public PrioritizedScheduledTask pollOrThrow ()
20462049 {
2047- ScheduledTask task = queue .poll ();
2050+ IndexedPriorityQueue . Prioritized < ScheduledTask > task = queue .pollPrioritized ();
20482051 checkState (task != null , "queue is empty" );
20492052 if (nonSpeculativeTaskCount > 0 ) {
20502053 // non speculative tasks are always pooled first
20512054 nonSpeculativeTaskCount --;
20522055 }
2053- return task ;
2056+ // negate priority to reverse operation we do in addOrUpdate
2057+ return new PrioritizedScheduledTask (task .getValue (), toIntExact (-task .getPriority ()));
20542058 }
20552059
20562060 public void addOrUpdate (PrioritizedScheduledTask prioritizedTask )
@@ -2332,18 +2336,31 @@ private record GetExchangeSinkInstanceHandleResult(CompletableFuture<ExchangeSin
23322336 private static class PreSchedulingTaskContext
23332337 {
23342338 private final NodeLease nodeLease ;
2339+ private boolean speculative ;
23352340 private boolean waitingForSinkInstanceHandle ;
23362341
2337- public PreSchedulingTaskContext (NodeLease nodeLease )
2342+ public PreSchedulingTaskContext (NodeLease nodeLease , boolean speculative )
23382343 {
23392344 this .nodeLease = requireNonNull (nodeLease , "nodeLease is null" );
2345+ this .speculative = speculative ;
23402346 }
23412347
23422348 public NodeLease getNodeLease ()
23432349 {
23442350 return nodeLease ;
23452351 }
23462352
2353+ public boolean isSpeculative ()
2354+ {
2355+ return speculative ;
2356+ }
2357+
2358+ public void setSpeculative (boolean speculative )
2359+ {
2360+ checkArgument (!speculative || this .speculative , "cannot change speculative flag false -> true" );
2361+ this .speculative = speculative ;
2362+ }
2363+
23472364 public boolean isWaitingForSinkInstanceHandle ()
23482365 {
23492366 return waitingForSinkInstanceHandle ;
0 commit comments