Skip to content

Commit 9d6a0fd

Browse files
dekimirlosipiuk
authored andcommitted
Make AdaptivePlanner optional in the FTE scheduler
... instead of having a separate boolean field indicating whether the planner is to be used. Reduces potential for inadvertent erroneous use; simplifies method signatures.
1 parent 736ecbf commit 9d6a0fd

File tree

1 file changed

+9
-13
lines changed

1 file changed

+9
-13
lines changed

core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,7 @@ public class EventDrivenFaultTolerantQueryScheduler
219219
private final FailureDetector failureDetector;
220220
private final DynamicFilterService dynamicFilterService;
221221
private final TaskExecutionStats taskExecutionStats;
222-
private final AdaptivePlanner adaptivePlanner;
223-
private final boolean adaptiveQueryPlanningEnabled;
222+
private final Optional<AdaptivePlanner> adaptivePlanner;
224223
private final SubPlan originalPlan;
225224
private final boolean stageEstimationForEagerParentEnabled;
226225

@@ -275,8 +274,9 @@ public EventDrivenFaultTolerantQueryScheduler(
275274
this.failureDetector = requireNonNull(failureDetector, "failureDetector is null");
276275
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
277276
this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null");
278-
this.adaptivePlanner = requireNonNull(adaptivePlanner, "adaptivePlanner is null");
279-
this.adaptiveQueryPlanningEnabled = isFaultTolerantExecutionAdaptiveQueryPlanningEnabled(queryStateMachine.getSession());
277+
this.adaptivePlanner = isFaultTolerantExecutionAdaptiveQueryPlanningEnabled(queryStateMachine.getSession()) ?
278+
Optional.of(requireNonNull(adaptivePlanner, "adaptivePlanner is null")) :
279+
Optional.empty();
280280
this.originalPlan = requireNonNull(originalPlan, "originalPlan is null");
281281

282282
this.stageEstimationForEagerParentEnabled = isFaultTolerantExecutionStageEstimationForEagerParentEnabled(queryStateMachine.getSession());
@@ -361,8 +361,7 @@ public synchronized void start()
361361
getFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount(session),
362362
getFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize(session),
363363
stageEstimationForEagerParentEnabled,
364-
adaptivePlanner,
365-
adaptiveQueryPlanningEnabled);
364+
adaptivePlanner);
366365
queryExecutor.submit(scheduler::run);
367366
}
368367
catch (Throwable t) {
@@ -717,8 +716,7 @@ private static class Scheduler
717716
private SubPlan plan;
718717
private List<SubPlan> planInTopologicalOrder;
719718

720-
private final AdaptivePlanner adaptivePlanner;
721-
private final boolean adaptiveQueryPlanningEnabled;
719+
private final Optional<AdaptivePlanner> adaptivePlanner;
722720

723721
private final Map<StageId, StageExecution> stageExecutions = new HashMap<>();
724722
private final Map<SubPlan, IsReadyForExecutionResult> isReadyForExecutionCache = new HashMap<>();
@@ -764,8 +762,7 @@ public Scheduler(
764762
int runtimeAdaptivePartitioningPartitionCount,
765763
DataSize runtimeAdaptivePartitioningMaxTaskSize,
766764
boolean stageEstimationForEagerParentEnabled,
767-
AdaptivePlanner adaptivePlanner,
768-
boolean adaptiveQueryPlanningEnabled)
765+
Optional<AdaptivePlanner> adaptivePlanner)
769766
{
770767
this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null");
771768
this.metadata = requireNonNull(metadata, "metadata is null");
@@ -798,7 +795,6 @@ public Scheduler(
798795
this.runtimeAdaptivePartitioningPartitionCount = runtimeAdaptivePartitioningPartitionCount;
799796
this.runtimeAdaptivePartitioningMaxTaskSizeInBytes = requireNonNull(runtimeAdaptivePartitioningMaxTaskSize, "runtimeAdaptivePartitioningMaxTaskSize is null").toBytes();
800797
this.adaptivePlanner = requireNonNull(adaptivePlanner, "adaptivePlanner is null");
801-
this.adaptiveQueryPlanningEnabled = adaptiveQueryPlanningEnabled;
802798
this.stageEstimationForEagerParentEnabled = stageEstimationForEagerParentEnabled;
803799
this.schedulerSpan = tracer.spanBuilder("scheduler")
804800
.setParent(Context.current().with(queryStateMachine.getSession().getQuerySpan()))
@@ -1094,7 +1090,7 @@ private SubPlan optimizePlan(SubPlan plan)
10941090
{
10951091
// Re-optimize plan here based on available runtime statistics.
10961092
// Fragments changed due to re-optimization as well as their downstream stages are expected to be assigned new fragment ids.
1097-
if (!adaptiveQueryPlanningEnabled) {
1093+
if (adaptivePlanner.isEmpty()) {
10981094
return plan;
10991095
}
11001096

@@ -1124,7 +1120,7 @@ private SubPlan optimizePlan(SubPlan plan)
11241120
// already ready for execution.
11251121
if (isReadyForExecutionResult.isReadyForExecution()
11261122
&& (oldValue == null || !oldValue.isReadyForExecution())) {
1127-
return adaptivePlanner.optimize(plan, createRuntimeInfoProvider());
1123+
return adaptivePlanner.get().optimize(plan, createRuntimeInfoProvider());
11281124
}
11291125
}
11301126

0 commit comments

Comments
 (0)