2222import io .trino .execution .RemoteTask ;
2323import io .trino .execution .TableExecuteContextManager ;
2424import io .trino .execution .scheduler .ScheduleResult .BlockedReason ;
25- import io .trino .execution .scheduler .group .DynamicLifespanScheduler ;
26- import io .trino .execution .scheduler .group .FixedLifespanScheduler ;
27- import io .trino .execution .scheduler .group .LifespanScheduler ;
2825import io .trino .metadata .InternalNode ;
2926import io .trino .metadata .Split ;
30- import io .trino .operator .StageExecutionDescriptor ;
3127import io .trino .server .DynamicFilterService ;
3228import io .trino .spi .connector .ConnectorPartitionHandle ;
3329import io .trino .split .SplitSource ;
4339import java .util .function .Supplier ;
4440
4541import static com .google .common .base .Preconditions .checkArgument ;
46- import static com .google .common .base .Preconditions .checkState ;
4742import static com .google .common .base .Verify .verify ;
4843import static io .airlift .concurrent .MoreFutures .whenAnyComplete ;
4944import static io .trino .execution .scheduler .SourcePartitionedScheduler .newSourcePartitionedSchedulerAsSourceScheduler ;
@@ -59,15 +54,13 @@ public class FixedSourcePartitionedScheduler
5954 private final List <InternalNode > nodes ;
6055 private final List <SourceScheduler > sourceSchedulers ;
6156 private final List <ConnectorPartitionHandle > partitionHandles ;
62- private final Optional <LifespanScheduler > groupedLifespanScheduler ;
6357
6458 private final PartitionIdAllocator partitionIdAllocator ;
6559 private final Map <InternalNode , RemoteTask > scheduledTasks ;
6660
6761 public FixedSourcePartitionedScheduler (
6862 StageExecution stageExecution ,
6963 Map <PlanNodeId , SplitSource > splitSources ,
70- StageExecutionDescriptor stageExecutionDescriptor ,
7164 List <PlanNodeId > schedulingOrder ,
7265 List <InternalNode > nodes ,
7366 BucketNodeMap bucketNodeMap ,
@@ -93,20 +86,14 @@ public FixedSourcePartitionedScheduler(
9386 BucketedSplitPlacementPolicy splitPlacementPolicy = new BucketedSplitPlacementPolicy (nodeSelector , nodes , bucketNodeMap , stageExecution ::getAllTasks );
9487
9588 ArrayList <SourceScheduler > sourceSchedulers = new ArrayList <>();
96- checkArgument (
97- partitionHandles .equals (ImmutableList .of (NOT_PARTITIONED )) != stageExecutionDescriptor .isStageGroupedExecution (),
98- "PartitionHandles should be [NOT_PARTITIONED] if and only if all scan nodes use ungrouped execution strategy" );
99- int nodeCount = nodes .size ();
10089 int concurrentLifespans = partitionHandles .size ();
10190
10291 boolean firstPlanNode = true ;
103- Optional <LifespanScheduler > groupedLifespanScheduler = Optional .empty ();
10492
10593 partitionIdAllocator = new PartitionIdAllocator ();
10694 scheduledTasks = new HashMap <>();
10795 for (PlanNodeId planNodeId : schedulingOrder ) {
10896 SplitSource splitSource = splitSources .get (planNodeId );
109- boolean groupedExecutionForScanNode = stageExecutionDescriptor .isScanGroupedExecution (planNodeId );
11097 // TODO : change anySourceTaskBlocked to accommodate the correct blocked status of source tasks
11198 // (ref : https://github.com/trinodb/trino/issues/4713)
11299 SourceScheduler sourceScheduler = newSourcePartitionedSchedulerAsSourceScheduler (
@@ -115,47 +102,21 @@ public FixedSourcePartitionedScheduler(
115102 splitSource ,
116103 splitPlacementPolicy ,
117104 Math .max (splitBatchSize / concurrentLifespans , 1 ),
118- groupedExecutionForScanNode ,
105+ false ,
119106 dynamicFilterService ,
120107 tableExecuteContextManager ,
121108 () -> true ,
122109 partitionIdAllocator ,
123110 scheduledTasks );
124111
125- if (stageExecutionDescriptor .isStageGroupedExecution () && !groupedExecutionForScanNode ) {
126- sourceScheduler = new AsGroupedSourceScheduler (sourceScheduler );
127- }
128112 sourceSchedulers .add (sourceScheduler );
129113
130114 if (firstPlanNode ) {
131115 firstPlanNode = false ;
132- if (!stageExecutionDescriptor .isStageGroupedExecution ()) {
133- sourceScheduler .startLifespan (Lifespan .taskWide (), NOT_PARTITIONED );
134- sourceScheduler .noMoreLifespans ();
135- }
136- else {
137- LifespanScheduler lifespanScheduler ;
138- if (bucketNodeMap .isDynamic ()) {
139- // Callee of the constructor guarantees dynamic bucket node map will only be
140- // used when the stage has no remote source.
141- //
142- // When the stage has no remote source, any scan is grouped execution guarantees
143- // all scan is grouped execution.
144- lifespanScheduler = new DynamicLifespanScheduler (bucketNodeMap , nodes , partitionHandles );
145- }
146- else {
147- lifespanScheduler = new FixedLifespanScheduler (bucketNodeMap , partitionHandles );
148- }
149-
150- // Schedule the first few lifespans
151- lifespanScheduler .scheduleInitial (sourceScheduler );
152- // Schedule new lifespans for finished ones
153- stageExecution .addCompletedDriverGroupsChangedListener (lifespanScheduler ::onLifespanFinished );
154- groupedLifespanScheduler = Optional .of (lifespanScheduler );
155- }
116+ sourceScheduler .startLifespan (Lifespan .taskWide (), NOT_PARTITIONED );
117+ sourceScheduler .noMoreLifespans ();
156118 }
157119 }
158- this .groupedLifespanScheduler = groupedLifespanScheduler ;
159120 this .sourceSchedulers = sourceSchedulers ;
160121 }
161122
@@ -188,15 +149,6 @@ public ScheduleResult schedule()
188149 List <ListenableFuture <Void >> blocked = new ArrayList <>();
189150 BlockedReason blockedReason = BlockedReason .NO_ACTIVE_DRIVER_GROUP ;
190151
191- if (groupedLifespanScheduler .isPresent ()) {
192- // Start new driver groups on the first scheduler if necessary,
193- // i.e. when previous ones have finished execution (not finished scheduling).
194- //
195- // Invoke schedule method to get a new SettableFuture every time.
196- // Reusing previously returned SettableFuture could lead to the ListenableFuture retaining too many listeners.
197- blocked .add (groupedLifespanScheduler .get ().schedule (sourceSchedulers .get (0 )));
198- }
199-
200152 int splitsScheduled = 0 ;
201153 Iterator <SourceScheduler > schedulerIterator = sourceSchedulers .iterator ();
202154 List <Lifespan > driverGroupsToStart = ImmutableList .of ();
@@ -299,77 +251,4 @@ public InternalNode getNodeForBucket(int bucketId)
299251 return bucketNodeMap .getAssignedNode (bucketId ).get ();
300252 }
301253 }
302-
303- private static class AsGroupedSourceScheduler
304- implements SourceScheduler
305- {
306- private final SourceScheduler sourceScheduler ;
307- private boolean started ;
308- private boolean completed ;
309- private final List <Lifespan > pendingCompleted ;
310-
311- public AsGroupedSourceScheduler (SourceScheduler sourceScheduler )
312- {
313- this .sourceScheduler = requireNonNull (sourceScheduler , "sourceScheduler is null" );
314- pendingCompleted = new ArrayList <>();
315- }
316-
317- @ Override
318- public void start ()
319- {
320- sourceScheduler .start ();
321- }
322-
323- @ Override
324- public ScheduleResult schedule ()
325- {
326- return sourceScheduler .schedule ();
327- }
328-
329- @ Override
330- public void close ()
331- {
332- sourceScheduler .close ();
333- }
334-
335- @ Override
336- public PlanNodeId getPlanNodeId ()
337- {
338- return sourceScheduler .getPlanNodeId ();
339- }
340-
341- @ Override
342- public void startLifespan (Lifespan lifespan , ConnectorPartitionHandle partitionHandle )
343- {
344- pendingCompleted .add (lifespan );
345- if (started ) {
346- return ;
347- }
348- started = true ;
349- sourceScheduler .startLifespan (Lifespan .taskWide (), NOT_PARTITIONED );
350- sourceScheduler .noMoreLifespans ();
351- }
352-
353- @ Override
354- public void noMoreLifespans ()
355- {
356- checkState (started );
357- }
358-
359- @ Override
360- public List <Lifespan > drainCompletedLifespans ()
361- {
362- if (!completed ) {
363- List <Lifespan > lifespans = sourceScheduler .drainCompletedLifespans ();
364- if (lifespans .isEmpty ()) {
365- return ImmutableList .of ();
366- }
367- checkState (ImmutableList .of (Lifespan .taskWide ()).equals (lifespans ));
368- completed = true ;
369- }
370- List <Lifespan > result = ImmutableList .copyOf (pendingCompleted );
371- pendingCompleted .clear ();
372- return result ;
373- }
374- }
375254}
0 commit comments