diff --git a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestJdbcConnection.java b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestJdbcConnection.java index 16e302983f17..e342be2a0e9b 100644 --- a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestJdbcConnection.java +++ b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestJdbcConnection.java @@ -419,13 +419,13 @@ private void testRole(String roleParameterValue, ClientSelectedRole clientSelect public void testSessionProperties() throws SQLException { - try (Connection connection = createConnection("roles=hive:admin&sessionProperties=hive.temporary_staging_directory_path:/tmp;execution_policy:phased")) { + try (Connection connection = createConnection("roles=hive:admin&sessionProperties=hive.temporary_staging_directory_path:/tmp;execution_policy:legacy-phased")) { TrinoConnection trinoConnection = connection.unwrap(TrinoConnection.class); assertThat(trinoConnection.getSessionProperties()) .extractingByKeys("hive.temporary_staging_directory_path", "execution_policy") - .containsExactly("/tmp", "phased"); + .containsExactly("/tmp", "legacy-phased"); assertThat(listSession(connection)).containsAll(ImmutableSet.of( - "execution_policy|phased|all-at-once", + "execution_policy|legacy-phased|phased", "hive.temporary_staging_directory_path|/tmp|/tmp/presto-${USER}")); } } diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 346f6353c030..f462dcc6a0e6 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -62,7 +62,7 @@ public class QueryManagerConfig private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES); private int remoteTaskMaxCallbackThreads = 1000; - private String queryExecutionPolicy = "all-at-once"; + private String queryExecutionPolicy = "phased"; private Duration 
queryMaxRunTime = new Duration(100, TimeUnit.DAYS); private Duration queryMaxExecutionTime = new Duration(100, TimeUnit.DAYS); private Duration queryMaxPlanningTime = new Duration(10, TimeUnit.MINUTES); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PrioritizeUtilizationExecutionPolicy.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/LegacyPhasedExecutionPolicy.java similarity index 87% rename from core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PrioritizeUtilizationExecutionPolicy.java rename to core/trino-main/src/main/java/io/trino/execution/scheduler/policy/LegacyPhasedExecutionPolicy.java index db854b685ae9..e970a16f8c07 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PrioritizeUtilizationExecutionPolicy.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/LegacyPhasedExecutionPolicy.java @@ -17,12 +17,12 @@ import java.util.Collection; -public class PrioritizeUtilizationExecutionPolicy +public class LegacyPhasedExecutionPolicy implements ExecutionPolicy { @Override public ExecutionSchedule createExecutionSchedule(Collection stages) { - return PrioritizeUtilizationExecutionSchedule.forStages(stages); + return new LegacyPhasedExecutionSchedule(stages); } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/LegacyPhasedExecutionSchedule.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/LegacyPhasedExecutionSchedule.java new file mode 100644 index 000000000000..dd85684d11db --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/LegacyPhasedExecutionSchedule.java @@ -0,0 +1,330 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler.policy; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.trino.execution.scheduler.StageExecution; +import io.trino.sql.planner.PlanFragment; +import io.trino.sql.planner.plan.ExchangeNode; +import io.trino.sql.planner.plan.IndexJoinNode; +import io.trino.sql.planner.plan.JoinNode; +import io.trino.sql.planner.plan.PlanFragmentId; +import io.trino.sql.planner.plan.PlanNode; +import io.trino.sql.planner.plan.PlanVisitor; +import io.trino.sql.planner.plan.RemoteSourceNode; +import io.trino.sql.planner.plan.SemiJoinNode; +import io.trino.sql.planner.plan.SpatialJoinNode; +import io.trino.sql.planner.plan.UnionNode; +import org.jgrapht.DirectedGraph; +import org.jgrapht.alg.StrongConnectivityInspector; +import org.jgrapht.graph.DefaultDirectedGraph; +import org.jgrapht.graph.DefaultEdge; +import org.jgrapht.traverse.TopologicalOrderIterator; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static 
io.trino.execution.scheduler.StageExecution.State.FLUSHING; +import static io.trino.execution.scheduler.StageExecution.State.RUNNING; +import static io.trino.execution.scheduler.StageExecution.State.SCHEDULED; +import static io.trino.sql.planner.plan.ExchangeNode.Scope.LOCAL; +import static java.util.function.Function.identity; + +@NotThreadSafe +public class LegacyPhasedExecutionSchedule + implements ExecutionSchedule +{ + private final List> schedulePhases; + private final Set activeSources = new HashSet<>(); + + public LegacyPhasedExecutionSchedule(Collection stages) + { + List> phases = extractPhases(stages.stream().map(StageExecution::getFragment).collect(toImmutableList())); + + Map stagesByFragmentId = stages.stream().collect(toImmutableMap(stage -> stage.getFragment().getId(), identity())); + + // create a mutable list of mutable sets of stages, so we can remove completed stages + schedulePhases = new ArrayList<>(); + for (Set phase : phases) { + schedulePhases.add(phase.stream() + .map(stagesByFragmentId::get) + .collect(Collectors.toCollection(HashSet::new))); + } + } + + @Override + public StagesScheduleResult getStagesToSchedule() + { + removeCompletedStages(); + addPhasesIfNecessary(); + if (isFinished()) { + return new StagesScheduleResult(ImmutableSet.of()); + } + return new StagesScheduleResult(activeSources); + } + + private void removeCompletedStages() + { + for (Iterator stageIterator = activeSources.iterator(); stageIterator.hasNext(); ) { + StageExecution.State state = stageIterator.next().getState(); + if (state == SCHEDULED || state == RUNNING || state == FLUSHING || state.isDone()) { + stageIterator.remove(); + } + } + } + + private void addPhasesIfNecessary() + { + // we want at least one source distributed phase in the active sources + if (hasSourceDistributedStage(activeSources)) { + return; + } + + while (!schedulePhases.isEmpty()) { + Set phase = schedulePhases.remove(0); + activeSources.addAll(phase); + if 
(hasSourceDistributedStage(phase)) { + return; + } + } + } + + private static boolean hasSourceDistributedStage(Set phase) + { + return phase.stream().anyMatch(stage -> !stage.getFragment().getPartitionedSources().isEmpty()); + } + + @Override + public boolean isFinished() + { + return activeSources.isEmpty() && schedulePhases.isEmpty(); + } + + @VisibleForTesting + static List> extractPhases(Collection fragments) + { + // Build a graph where the plan fragments are vertexes and the edges represent + // a before -> after relationship. For example, a join hash build has an edge + // to the join probe. + DirectedGraph graph = new DefaultDirectedGraph<>(DefaultEdge.class); + fragments.forEach(fragment -> graph.addVertex(fragment.getId())); + + Visitor visitor = new Visitor(fragments, graph); + for (PlanFragment fragment : fragments) { + visitor.processFragment(fragment.getId()); + } + + // Computes all the strongly connected components of the directed graph. + // These are the "phases" which hold the set of fragments that must be started + // at the same time to avoid deadlock. 
+ List> components = new StrongConnectivityInspector<>(graph).stronglyConnectedSets(); + + Map> componentMembership = new HashMap<>(); + for (Set component : components) { + for (PlanFragmentId planFragmentId : component) { + componentMembership.put(planFragmentId, component); + } + } + + // build graph of components (phases) + DirectedGraph, DefaultEdge> componentGraph = new DefaultDirectedGraph<>(DefaultEdge.class); + components.forEach(componentGraph::addVertex); + for (DefaultEdge edge : graph.edgeSet()) { + PlanFragmentId source = graph.getEdgeSource(edge); + PlanFragmentId target = graph.getEdgeTarget(edge); + + Set from = componentMembership.get(source); + Set to = componentMembership.get(target); + if (!from.equals(to)) { // the topological order iterator below doesn't include vertices that have self-edges, so don't add them + componentGraph.addEdge(from, to); + } + } + + List> schedulePhases = ImmutableList.copyOf(new TopologicalOrderIterator<>(componentGraph)); + return schedulePhases; + } + + private static class Visitor + extends PlanVisitor, PlanFragmentId> + { + private final Map fragments; + private final DirectedGraph graph; + private final Map> fragmentSources = new HashMap<>(); + + public Visitor(Collection fragments, DirectedGraph graph) + { + this.fragments = fragments.stream() + .collect(toImmutableMap(PlanFragment::getId, identity())); + this.graph = graph; + } + + public Set processFragment(PlanFragmentId planFragmentId) + { + if (fragmentSources.containsKey(planFragmentId)) { + return fragmentSources.get(planFragmentId); + } + + Set fragment = processFragment(fragments.get(planFragmentId)); + fragmentSources.put(planFragmentId, fragment); + return fragment; + } + + private Set processFragment(PlanFragment fragment) + { + Set sources = fragment.getRoot().accept(this, fragment.getId()); + return ImmutableSet.builder().add(fragment.getId()).addAll(sources).build(); + } + + @Override + public Set visitJoin(JoinNode node, PlanFragmentId 
currentFragmentId) + { + return processJoin(node.getRight(), node.getLeft(), currentFragmentId); + } + + @Override + public Set visitSpatialJoin(SpatialJoinNode node, PlanFragmentId currentFragmentId) + { + return processJoin(node.getRight(), node.getLeft(), currentFragmentId); + } + + @Override + public Set visitSemiJoin(SemiJoinNode node, PlanFragmentId currentFragmentId) + { + return processJoin(node.getFilteringSource(), node.getSource(), currentFragmentId); + } + + @Override + public Set visitIndexJoin(IndexJoinNode node, PlanFragmentId currentFragmentId) + { + return processJoin(node.getIndexSource(), node.getProbeSource(), currentFragmentId); + } + + private Set processJoin(PlanNode build, PlanNode probe, PlanFragmentId currentFragmentId) + { + Set buildSources = build.accept(this, currentFragmentId); + Set probeSources = probe.accept(this, currentFragmentId); + + for (PlanFragmentId buildSource : buildSources) { + for (PlanFragmentId probeSource : probeSources) { + graph.addEdge(buildSource, probeSource); + } + } + + return ImmutableSet.builder() + .addAll(buildSources) + .addAll(probeSources) + .build(); + } + + @Override + public Set visitRemoteSource(RemoteSourceNode node, PlanFragmentId currentFragmentId) + { + ImmutableSet.Builder sources = ImmutableSet.builder(); + + Set previousFragmentSources = ImmutableSet.of(); + for (PlanFragmentId remoteFragment : node.getSourceFragmentIds()) { + // this current fragment depends on the remote fragment + graph.addEdge(currentFragmentId, remoteFragment); + + // get all sources for the remote fragment + Set remoteFragmentSources = processFragment(remoteFragment); + sources.addAll(remoteFragmentSources); + + // For UNION there can be multiple sources. + // Link the previous source to the current source, so we only + // schedule one at a time. 
+ addEdges(previousFragmentSources, remoteFragmentSources); + + previousFragmentSources = remoteFragmentSources; + } + + return sources.build(); + } + + @Override + public Set visitExchange(ExchangeNode node, PlanFragmentId currentFragmentId) + { + checkArgument(node.getScope() == LOCAL, "Only local exchanges are supported in the phased execution scheduler"); + ImmutableSet.Builder allSources = ImmutableSet.builder(); + + // Link the source fragments together, so we only schedule one at a time. + Set previousSources = ImmutableSet.of(); + for (PlanNode subPlanNode : node.getSources()) { + Set currentSources = subPlanNode.accept(this, currentFragmentId); + allSources.addAll(currentSources); + + addEdges(previousSources, currentSources); + + previousSources = currentSources; + } + + return allSources.build(); + } + + @Override + public Set visitUnion(UnionNode node, PlanFragmentId currentFragmentId) + { + ImmutableSet.Builder allSources = ImmutableSet.builder(); + + // Link the source fragments together, so we only schedule one at a time. 
+ Set previousSources = ImmutableSet.of(); + for (PlanNode subPlanNode : node.getSources()) { + Set currentSources = subPlanNode.accept(this, currentFragmentId); + allSources.addAll(currentSources); + + addEdges(previousSources, currentSources); + + previousSources = currentSources; + } + + return allSources.build(); + } + + @Override + protected Set visitPlan(PlanNode node, PlanFragmentId currentFragmentId) + { + List sources = node.getSources(); + if (sources.isEmpty()) { + return ImmutableSet.of(currentFragmentId); + } + if (sources.size() == 1) { + return sources.get(0).accept(this, currentFragmentId); + } + throw new UnsupportedOperationException("not yet implemented: " + node.getClass().getName()); + } + + private void addEdges(Set sourceFragments, Set targetFragments) + { + for (PlanFragmentId targetFragment : targetFragments) { + for (PlanFragmentId sourceFragment : sourceFragments) { + graph.addEdge(sourceFragment, targetFragment); + } + } + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PhasedExecutionPolicy.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PhasedExecutionPolicy.java index 2515f9c7da9e..3e49e510814f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PhasedExecutionPolicy.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PhasedExecutionPolicy.java @@ -23,6 +23,6 @@ public class PhasedExecutionPolicy @Override public ExecutionSchedule createExecutionSchedule(Collection stages) { - return new PhasedExecutionSchedule(stages); + return PhasedExecutionSchedule.forStages(stages); } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PhasedExecutionSchedule.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PhasedExecutionSchedule.java index 135dd8ac593d..6e4ce692471f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PhasedExecutionSchedule.java +++ 
b/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PhasedExecutionSchedule.java @@ -14,10 +14,13 @@ package io.trino.execution.scheduler.policy; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.trino.execution.scheduler.StageExecution; +import io.trino.execution.scheduler.StageExecution.State; import io.trino.sql.planner.PlanFragment; +import io.trino.sql.planner.plan.AggregationNode; import io.trino.sql.planner.plan.ExchangeNode; import io.trino.sql.planner.plan.IndexJoinNode; import io.trino.sql.planner.plan.JoinNode; @@ -27,304 +30,532 @@ import io.trino.sql.planner.plan.RemoteSourceNode; import io.trino.sql.planner.plan.SemiJoinNode; import io.trino.sql.planner.plan.SpatialJoinNode; -import io.trino.sql.planner.plan.UnionNode; import org.jgrapht.DirectedGraph; +import org.jgrapht.EdgeFactory; import org.jgrapht.alg.StrongConnectivityInspector; import org.jgrapht.graph.DefaultDirectedGraph; -import org.jgrapht.graph.DefaultEdge; -import org.jgrapht.traverse.TopologicalOrderIterator; +import oshi.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.NotThreadSafe; - -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static 
com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.execution.scheduler.StageExecution.State.FLUSHING; import static io.trino.execution.scheduler.StageExecution.State.RUNNING; import static io.trino.execution.scheduler.StageExecution.State.SCHEDULED; +import static io.trino.sql.planner.plan.AggregationNode.Step.FINAL; +import static io.trino.sql.planner.plan.AggregationNode.Step.SINGLE; import static io.trino.sql.planner.plan.ExchangeNode.Scope.LOCAL; +import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; -@NotThreadSafe +/** + * Schedules stages choosing to order to provide the best resource utilization. + * This means that stages which output won't be consumed (e.g. join probe side) will + * not be scheduled until dependent stages finish (e.g. join build source stages). + * Contrary to {@link LegacyPhasedExecutionPolicy}, {@link PhasedExecutionSchedule} will + * schedule multiple source stages in order to fully utilize IO. + */ public class PhasedExecutionSchedule implements ExecutionSchedule { - private final List> schedulePhases; - private final Set activeSources = new HashSet<>(); - - public PhasedExecutionSchedule(Collection stages) + /** + * Graph representing a before -> after relationship between fragments. + * Destination fragment should be started only when source stage is completed. + */ + private final DirectedGraph fragmentDependency; + /** + * Graph representing topology between fragments (e.g. child -> parent relationship). 
+ */ + private final DirectedGraph fragmentTopology; + private final Map stagesByFragmentId; + private final Set activeStages = new HashSet<>(); + + @GuardedBy("this") + private SettableFuture rescheduleFuture = SettableFuture.create(); + + public static PhasedExecutionSchedule forStages(Collection stages) { - List> phases = extractPhases(stages.stream().map(StageExecution::getFragment).collect(toImmutableList())); + PhasedExecutionSchedule schedule = new PhasedExecutionSchedule(stages); + schedule.init(stages); + return schedule; + } - Map stagesByFragmentId = stages.stream().collect(toImmutableMap(stage -> stage.getFragment().getId(), identity())); + private PhasedExecutionSchedule(Collection stages) + { + fragmentDependency = new DefaultDirectedGraph<>(new FragmentsEdgeFactory()); + fragmentTopology = new DefaultDirectedGraph<>(new FragmentsEdgeFactory()); + stagesByFragmentId = stages.stream() + .collect(toImmutableMap(stage -> stage.getFragment().getId(), identity())); + } - // create a mutable list of mutable sets of stages, so we can remove completed stages - schedulePhases = new ArrayList<>(); - for (Set phase : phases) { - schedulePhases.add(phase.stream() - .map(stagesByFragmentId::get) - .collect(Collectors.toCollection(HashSet::new))); - } + private void init(Collection stages) + { + extractDependenciesAndReturnNonLazyFragments( + stages.stream() + .map(StageExecution::getFragment) + .collect(toImmutableList())).stream() + // start non-lazy stages + .map(stagesByFragmentId::get) + .forEach(this::selectForExecution); + // start stages without any dependencies + fragmentDependency.vertexSet().stream() + .filter(fragmentId -> fragmentDependency.inDegreeOf(fragmentId) == 0) + .map(stagesByFragmentId::get) + .forEach(this::selectForExecution); } @Override public StagesScheduleResult getStagesToSchedule() { - removeCompletedStages(); - addPhasesIfNecessary(); - if (isFinished()) { - return new StagesScheduleResult(ImmutableSet.of()); - } - return new 
StagesScheduleResult(activeSources); + // obtain reschedule future before actual scheduling, so that state change + // notifications from previously started stages are not lost + Optional> rescheduleFuture = getRescheduleFuture(); + schedule(); + return new StagesScheduleResult(activeStages, rescheduleFuture); } - private void removeCompletedStages() + @Override + public boolean isFinished() { - for (Iterator stageIterator = activeSources.iterator(); stageIterator.hasNext(); ) { - StageExecution.State state = stageIterator.next().getState(); - if (state == SCHEDULED || state == RUNNING || state == FLUSHING || state.isDone()) { - stageIterator.remove(); - } - } + // dependency graph contains both running and not started fragments + return fragmentDependency.vertexSet().isEmpty(); } - private void addPhasesIfNecessary() + @VisibleForTesting + synchronized Optional> getRescheduleFuture() { - // we want at least one source distributed phase in the active sources - if (hasSourceDistributedStage(activeSources)) { - return; - } - - while (!schedulePhases.isEmpty()) { - Set phase = schedulePhases.remove(0); - activeSources.addAll(phase); - if (hasSourceDistributedStage(phase)) { - return; - } - } + return Optional.of(rescheduleFuture); } - private static boolean hasSourceDistributedStage(Set phase) + @VisibleForTesting + void schedule() { - return phase.stream().anyMatch(stage -> !stage.getFragment().getPartitionedSources().isEmpty()); + removeCompletedStages(); + unblockStagesWithFullOutputBuffer(); } - @Override - public boolean isFinished() + @VisibleForTesting + DirectedGraph getFragmentDependency() { - return activeSources.isEmpty() && schedulePhases.isEmpty(); + return fragmentDependency; } @VisibleForTesting - static List> extractPhases(Collection fragments) + Set getActiveStages() { - // Build a graph where the plan fragments are vertexes and the edges represent - // a before -> after relationship. For example, a join hash build has an edge - // to the join probe. 
- DirectedGraph graph = new DefaultDirectedGraph<>(DefaultEdge.class); - fragments.forEach(fragment -> graph.addVertex(fragment.getId())); + return activeStages; + } - Visitor visitor = new Visitor(fragments, graph); - for (PlanFragment fragment : fragments) { - visitor.processFragment(fragment.getId()); - } + private void removeCompletedStages() + { + Set completedStages = activeStages.stream() + .filter(this::isStageCompleted) + .collect(toImmutableSet()); + // remove completed stages outside of Java stream to prevent concurrent modification + completedStages.forEach(this::removeCompletedStage); + } - // Computes all the strongly connected components of the directed graph. - // These are the "phases" which hold the set of fragments that must be started - // at the same time to avoid deadlock. - List> components = new StrongConnectivityInspector<>(graph).stronglyConnectedSets(); + private void removeCompletedStage(StageExecution stage) + { + // start all stages that depend on completed stage + PlanFragmentId fragmentId = stage.getFragment().getId(); + fragmentDependency.outgoingEdgesOf(fragmentId).stream() + .map(FragmentsEdge::getTarget) + // filter stages that depend on completed stage only + .filter(dependentFragmentId -> fragmentDependency.inDegreeOf(dependentFragmentId) == 1) + .map(stagesByFragmentId::get) + .forEach(this::selectForExecution); + fragmentDependency.removeVertex(fragmentId); + fragmentTopology.removeVertex(fragmentId); + activeStages.remove(stage); + } - Map> componentMembership = new HashMap<>(); - for (Set component : components) { - for (PlanFragmentId planFragmentId : component) { - componentMembership.put(planFragmentId, component); - } + private void unblockStagesWithFullOutputBuffer() + { + // find stages that are blocked on full task output buffer + Set blockedFragments = activeStages.stream() + .filter(StageExecution::isAnyTaskBlocked) + .map(stage -> stage.getFragment().getId()) + .collect(toImmutableSet()); + // start immediate 
downstream stages so that data can be consumed + blockedFragments.stream() + .flatMap(fragmentId -> fragmentTopology.outgoingEdgesOf(fragmentId).stream()) + .map(FragmentsEdge::getTarget) + .map(stagesByFragmentId::get) + .forEach(this::selectForExecution); + } + + private void selectForExecution(StageExecution stage) + { + if (fragmentDependency.outDegreeOf(stage.getFragment().getId()) > 0) { + // if there are any dependent stages then reschedule when stage is completed + stage.addStateChangeListener(state -> { + if (isStageCompleted(stage)) { + notifyReschedule(); + } + }); } + activeStages.add(stage); + } - // build graph of components (phases) - DirectedGraph, DefaultEdge> componentGraph = new DefaultDirectedGraph<>(DefaultEdge.class); - components.forEach(componentGraph::addVertex); - for (DefaultEdge edge : graph.edgeSet()) { - PlanFragmentId source = graph.getEdgeSource(edge); - PlanFragmentId target = graph.getEdgeTarget(edge); - - Set from = componentMembership.get(source); - Set to = componentMembership.get(target); - if (!from.equals(to)) { // the topological order iterator below doesn't include vertices that have self-edges, so don't add them - componentGraph.addEdge(from, to); - } + private void notifyReschedule() + { + SettableFuture rescheduleFuture; + synchronized (this) { + rescheduleFuture = this.rescheduleFuture; + this.rescheduleFuture = SettableFuture.create(); } + // notify listeners outside of the critical section + rescheduleFuture.set(null); + } - List> schedulePhases = ImmutableList.copyOf(new TopologicalOrderIterator<>(componentGraph)); - return schedulePhases; + private boolean isStageCompleted(StageExecution stage) + { + State state = stage.getState(); + return state == SCHEDULED || state == RUNNING || state == FLUSHING || state.isDone(); + } + + private Set extractDependenciesAndReturnNonLazyFragments(Collection fragments) + { + // Build a graph where the plan fragments are vertexes and the edges represent + // a before -> after 
relationship. Destination fragment should be started only + // when source fragment is completed. For example, a join hash build has an edge + // to the join probe. + Visitor visitor = new Visitor(fragments); + visitor.processAllFragments(); + + // Make sure there are no strongly connected components as it would mean circular dependency between stages + List> components = new StrongConnectivityInspector<>(fragmentDependency).stronglyConnectedSets(); + verify(components.size() == fragmentDependency.vertexSet().size(), "circular dependency between stages"); + + return visitor.getNonLazyFragments(); } - private static class Visitor - extends PlanVisitor, PlanFragmentId> + private class Visitor + extends PlanVisitor { private final Map fragments; - private final DirectedGraph graph; - private final Map> fragmentSources = new HashMap<>(); + private final ImmutableSet.Builder nonLazyFragments = ImmutableSet.builder(); + private final Map fragmentSubGraphs = new HashMap<>(); - public Visitor(Collection fragments, DirectedGraph graph) + public Visitor(Collection fragments) { - this.fragments = fragments.stream() + this.fragments = requireNonNull(fragments, "fragments is null").stream() .collect(toImmutableMap(PlanFragment::getId, identity())); - this.graph = graph; } - public Set processFragment(PlanFragmentId planFragmentId) + public Set getNonLazyFragments() + { + return nonLazyFragments.build(); + } + + public void processAllFragments() + { + fragments.forEach((fragmentId, fragment) -> { + fragmentDependency.addVertex(fragmentId); + fragmentTopology.addVertex(fragmentId); + }); + fragments.forEach((fragmentId, fragment) -> processFragment(fragmentId)); + } + + public FragmentSubGraph processFragment(PlanFragmentId planFragmentId) { - if (fragmentSources.containsKey(planFragmentId)) { - return fragmentSources.get(planFragmentId); + if (fragmentSubGraphs.containsKey(planFragmentId)) { + return fragmentSubGraphs.get(planFragmentId); } - Set fragment = 
processFragment(fragments.get(planFragmentId)); - fragmentSources.put(planFragmentId, fragment); - return fragment; + FragmentSubGraph subGraph = processFragment(fragments.get(planFragmentId)); + verify(fragmentSubGraphs.put(planFragmentId, subGraph) == null, "fragment %s was already processed", planFragmentId); + return subGraph; } - private Set processFragment(PlanFragment fragment) + private FragmentSubGraph processFragment(PlanFragment fragment) { - Set sources = fragment.getRoot().accept(this, fragment.getId()); - return ImmutableSet.builder().add(fragment.getId()).addAll(sources).build(); + FragmentSubGraph subGraph = fragment.getRoot().accept(this, fragment.getId()); + // append current fragment to set of upstream fragments as it is no longer being visited + Set upstreamFragments = ImmutableSet.builder() + .addAll(subGraph.getUpstreamFragments()) + .add(fragment.getId()) + .build(); + Set lazyUpstreamFragments; + if (subGraph.isCurrentFragmentLazy()) { + // append current fragment as a lazy fragment as it is no longer being visited + lazyUpstreamFragments = ImmutableSet.builder() + .addAll(subGraph.getLazyUpstreamFragments()) + .add(fragment.getId()) + .build(); + } + else { + lazyUpstreamFragments = subGraph.getLazyUpstreamFragments(); + nonLazyFragments.add(fragment.getId()); + } + return new FragmentSubGraph( + upstreamFragments, + lazyUpstreamFragments, + // no longer relevant as we have finished visiting given fragment + false); } @Override - public Set visitJoin(JoinNode node, PlanFragmentId currentFragmentId) + public FragmentSubGraph visitJoin(JoinNode node, PlanFragmentId currentFragmentId) { - return processJoin(node.getRight(), node.getLeft(), currentFragmentId); + return processJoin( + node.getDistributionType().orElseThrow() == JoinNode.DistributionType.REPLICATED, + node.getLeft(), + node.getRight(), + currentFragmentId); } @Override - public Set visitSpatialJoin(SpatialJoinNode node, PlanFragmentId currentFragmentId) + public FragmentSubGraph 
visitSpatialJoin(SpatialJoinNode node, PlanFragmentId currentFragmentId) { - return processJoin(node.getRight(), node.getLeft(), currentFragmentId); + return processJoin( + node.getDistributionType() == SpatialJoinNode.DistributionType.REPLICATED, + node.getLeft(), + node.getRight(), + currentFragmentId); } @Override - public Set visitSemiJoin(SemiJoinNode node, PlanFragmentId currentFragmentId) + public FragmentSubGraph visitSemiJoin(SemiJoinNode node, PlanFragmentId currentFragmentId) { - return processJoin(node.getFilteringSource(), node.getSource(), currentFragmentId); + return processJoin( + node.getDistributionType().orElseThrow() == SemiJoinNode.DistributionType.REPLICATED, + node.getSource(), + node.getFilteringSource(), + currentFragmentId); } @Override - public Set visitIndexJoin(IndexJoinNode node, PlanFragmentId currentFragmentId) + public FragmentSubGraph visitIndexJoin(IndexJoinNode node, PlanFragmentId currentFragmentId) { - return processJoin(node.getIndexSource(), node.getProbeSource(), currentFragmentId); + return processJoin( + true, + node.getProbeSource(), + node.getIndexSource(), + currentFragmentId); } - private Set processJoin(PlanNode build, PlanNode probe, PlanFragmentId currentFragmentId) + private FragmentSubGraph processJoin(boolean replicated, PlanNode probe, PlanNode build, PlanFragmentId currentFragmentId) { - Set buildSources = build.accept(this, currentFragmentId); - Set probeSources = probe.accept(this, currentFragmentId); - - for (PlanFragmentId buildSource : buildSources) { - for (PlanFragmentId probeSource : probeSources) { - graph.addEdge(buildSource, probeSource); - } + FragmentSubGraph probeSubGraph = probe.accept(this, currentFragmentId); + FragmentSubGraph buildSubGraph = build.accept(this, currentFragmentId); + + // start probe source stages after all build source stages finish + addDependencyEdges(buildSubGraph.getUpstreamFragments(), probeSubGraph.getLazyUpstreamFragments()); + + boolean currentFragmentLazy = 
probeSubGraph.isCurrentFragmentLazy() && buildSubGraph.isCurrentFragmentLazy(); + if (replicated && currentFragmentLazy) { + // Do not start join stage (which can also be a source stage with table scans) + // for replicated join until build source stage enters FLUSHING state. + // Broadcast join limit for CBO is set in such a way that build source data should + // fit into task output buffer. + // In case build source stage is blocked on full task buffer then join stage + // will be started automatically regardless of dependency. This is handled by + // unblockStagesWithFullOutputBuffer method. + addDependencyEdges(buildSubGraph.getUpstreamFragments(), ImmutableSet.of(currentFragmentId)); + } + else { + // start current fragment immediately since for partitioned join + // build source data won't be able to fit into task output buffer. + currentFragmentLazy = false; } - return ImmutableSet.builder() - .addAll(buildSources) - .addAll(probeSources) - .build(); + return new FragmentSubGraph( + ImmutableSet.builder() + .addAll(probeSubGraph.getUpstreamFragments()) + .addAll(buildSubGraph.getUpstreamFragments()) + .build(), + // only probe source fragments can be considered lazy + // since build source stages should be started immediately + probeSubGraph.getLazyUpstreamFragments(), + currentFragmentLazy); } @Override - public Set visitRemoteSource(RemoteSourceNode node, PlanFragmentId currentFragmentId) + public FragmentSubGraph visitAggregation(AggregationNode node, PlanFragmentId currentFragmentId) { - ImmutableSet.Builder sources = ImmutableSet.builder(); - - Set previousFragmentSources = ImmutableSet.of(); - for (PlanFragmentId remoteFragment : node.getSourceFragmentIds()) { - // this current fragment depends on the remote fragment - graph.addEdge(currentFragmentId, remoteFragment); - - // get all sources for the remote fragment - Set remoteFragmentSources = processFragment(remoteFragment); - sources.addAll(remoteFragmentSources); - - // For UNION there can be
multiple sources. - // Link the previous source to the current source, so we only - // schedule one at a time. - addEdges(previousFragmentSources, remoteFragmentSources); - - previousFragmentSources = remoteFragmentSources; + FragmentSubGraph subGraph = node.getSource().accept(this, currentFragmentId); + if (node.getStep() != FINAL && node.getStep() != SINGLE) { + return subGraph; } - return sources.build(); + // start current fragment immediately since final/single aggregation will fully + // consume input before producing output data (aggregation shouldn't get blocked) + return new FragmentSubGraph( + subGraph.getUpstreamFragments(), + ImmutableSet.of(), + false); } @Override - public Set visitExchange(ExchangeNode node, PlanFragmentId currentFragmentId) + public FragmentSubGraph visitRemoteSource(RemoteSourceNode node, PlanFragmentId currentFragmentId) { - checkArgument(node.getScope() == LOCAL, "Only local exchanges are supported in the phased execution scheduler"); - ImmutableSet.Builder allSources = ImmutableSet.builder(); + List subGraphs = node.getSourceFragmentIds().stream() + .map(this::processFragment) + .collect(toImmutableList()); + node.getSourceFragmentIds() + .forEach(sourceFragmentId -> fragmentTopology.addEdge(sourceFragmentId, currentFragmentId)); + return new FragmentSubGraph( + subGraphs.stream() + .flatMap(source -> source.getUpstreamFragments().stream()) + .collect(toImmutableSet()), + subGraphs.stream() + .flatMap(source -> source.getLazyUpstreamFragments().stream()) + .collect(toImmutableSet()), + // initially current fragment is considered to be lazy unless there exists + an operator that can fully consume input data without producing any output + // (e.g. final aggregation) + true); + }

 - // Link the source fragments together, so we only schedule one at a time.
- Set previousSources = ImmutableSet.of(); - for (PlanNode subPlanNode : node.getSources()) { - Set currentSources = subPlanNode.accept(this, currentFragmentId); - allSources.addAll(currentSources); + @Override + public FragmentSubGraph visitExchange(ExchangeNode node, PlanFragmentId currentFragmentId) + { + checkArgument(node.getScope() == LOCAL, "Only local exchanges are supported in the phased execution scheduler"); + return visitPlan(node, currentFragmentId); + } - addEdges(previousSources, currentSources); + @Override + protected FragmentSubGraph visitPlan(PlanNode node, PlanFragmentId currentFragmentId) + { + List sourceSubGraphs = node.getSources().stream() + .map(subPlanNode -> subPlanNode.accept(this, currentFragmentId)) + .collect(toImmutableList()); + + return new FragmentSubGraph( + sourceSubGraphs.stream() + .flatMap(source -> source.getUpstreamFragments().stream()) + .collect(toImmutableSet()), + sourceSubGraphs.stream() + .flatMap(source -> source.getLazyUpstreamFragments().stream()) + .collect(toImmutableSet()), + sourceSubGraphs.stream() + .allMatch(FragmentSubGraph::isCurrentFragmentLazy)); + } - previousSources = currentSources; + private void addDependencyEdges(Set sourceFragments, Set targetFragments) + { + for (PlanFragmentId targetFragment : targetFragments) { + for (PlanFragmentId sourceFragment : sourceFragments) { + fragmentDependency.addEdge(sourceFragment, targetFragment); + } } + } + } + + private static class FragmentSubGraph + { + /** + * All upstream fragments (excluding currently visited fragment) + */ + private final Set upstreamFragments; + /** + * All upstream lazy fragments (excluding currently visited fragment). + * Lazy fragments don't have to be started immediately. + */ + private final Set lazyUpstreamFragments; + /** + * Is currently visited fragment lazy? 
+ */ + private final boolean currentFragmentLazy; + + public FragmentSubGraph( + Set upstreamFragments, + Set lazyUpstreamFragments, + boolean currentFragmentLazy) + { + this.upstreamFragments = requireNonNull(upstreamFragments, "upstreamFragments is null"); + this.lazyUpstreamFragments = requireNonNull(lazyUpstreamFragments, "lazyUpstreamFragments is null"); + this.currentFragmentLazy = currentFragmentLazy; + } + + public Set getUpstreamFragments() + { + return upstreamFragments; + } - return allSources.build(); + public Set getLazyUpstreamFragments() + { + return lazyUpstreamFragments; } + public boolean isCurrentFragmentLazy() + { + return currentFragmentLazy; + } + } + + private static class FragmentsEdgeFactory + implements EdgeFactory + { @Override - public Set visitUnion(UnionNode node, PlanFragmentId currentFragmentId) + public FragmentsEdge createEdge(PlanFragmentId sourceVertex, PlanFragmentId targetVertex) { - ImmutableSet.Builder allSources = ImmutableSet.builder(); + return new FragmentsEdge(sourceVertex, targetVertex); + } + } - // Link the source fragments together, so we only schedule one at a time. 
- Set previousSources = ImmutableSet.of(); - for (PlanNode subPlanNode : node.getSources()) { - Set currentSources = subPlanNode.accept(this, currentFragmentId); - allSources.addAll(currentSources); + @VisibleForTesting + static class FragmentsEdge + { + private final PlanFragmentId source; + private final PlanFragmentId target; - addEdges(previousSources, currentSources); + public FragmentsEdge(PlanFragmentId source, PlanFragmentId target) + { + this.source = requireNonNull(source, "source is null"); + this.target = requireNonNull(target, "target is null"); + } - previousSources = currentSources; - } + public PlanFragmentId getSource() + { + return source; + } + + public PlanFragmentId getTarget() + { + return target; + } - return allSources.build(); + @Override + public String toString() + { + return toStringHelper(this) + .add("source", source) + .add("target", target) + .toString(); } @Override - protected Set visitPlan(PlanNode node, PlanFragmentId currentFragmentId) + public boolean equals(Object o) { - List sources = node.getSources(); - if (sources.isEmpty()) { - return ImmutableSet.of(currentFragmentId); + if (this == o) { + return true; } - if (sources.size() == 1) { - return sources.get(0).accept(this, currentFragmentId); + if (o == null || getClass() != o.getClass()) { + return false; } - throw new UnsupportedOperationException("not yet implemented: " + node.getClass().getName()); + FragmentsEdge that = (FragmentsEdge) o; + return source.equals(that.source) && target.equals(that.target); } - private void addEdges(Set sourceFragments, Set targetFragments) + @Override + public int hashCode() { - for (PlanFragmentId targetFragment : targetFragments) { - for (PlanFragmentId sourceFragment : sourceFragments) { - graph.addEdge(sourceFragment, targetFragment); - } - } + return Objects.hash(source, target); } } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PrioritizeUtilizationExecutionSchedule.java 
b/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PrioritizeUtilizationExecutionSchedule.java deleted file mode 100644 index a2411751921b..000000000000 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PrioritizeUtilizationExecutionSchedule.java +++ /dev/null @@ -1,561 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.execution.scheduler.policy; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import io.trino.execution.scheduler.StageExecution; -import io.trino.execution.scheduler.StageExecution.State; -import io.trino.sql.planner.PlanFragment; -import io.trino.sql.planner.plan.AggregationNode; -import io.trino.sql.planner.plan.ExchangeNode; -import io.trino.sql.planner.plan.IndexJoinNode; -import io.trino.sql.planner.plan.JoinNode; -import io.trino.sql.planner.plan.PlanFragmentId; -import io.trino.sql.planner.plan.PlanNode; -import io.trino.sql.planner.plan.PlanVisitor; -import io.trino.sql.planner.plan.RemoteSourceNode; -import io.trino.sql.planner.plan.SemiJoinNode; -import io.trino.sql.planner.plan.SpatialJoinNode; -import org.jgrapht.DirectedGraph; -import org.jgrapht.EdgeFactory; -import org.jgrapht.alg.StrongConnectivityInspector; -import org.jgrapht.graph.DefaultDirectedGraph; -import 
oshi.annotation.concurrent.GuardedBy; - -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; - -import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Verify.verify; -import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static io.trino.execution.scheduler.StageExecution.State.FLUSHING; -import static io.trino.execution.scheduler.StageExecution.State.RUNNING; -import static io.trino.execution.scheduler.StageExecution.State.SCHEDULED; -import static io.trino.sql.planner.plan.AggregationNode.Step.FINAL; -import static io.trino.sql.planner.plan.AggregationNode.Step.SINGLE; -import static io.trino.sql.planner.plan.ExchangeNode.Scope.LOCAL; -import static java.util.Objects.requireNonNull; -import static java.util.function.Function.identity; - -/** - * Schedules stages choosing to order to provide the best resource utilization. - * This means that stages which output won't be consumed (e.g. join probe side) will - * not be scheduled until dependent stages finish (e.g. join build source stages). - * Contrary to {@link PhasedExecutionPolicy}, {@link PrioritizeUtilizationExecutionSchedule} will - * schedule multiple source stages in order to fully utilize IO. - */ -public class PrioritizeUtilizationExecutionSchedule - implements ExecutionSchedule -{ - /** - * Graph representing a before -> after relationship between fragments. - * Destination fragment should be started only when source stage is completed. - */ - private final DirectedGraph fragmentDependency; - /** - * Graph representing topology between fragments (e.g. 
child -> parent relationship). - */ - private final DirectedGraph fragmentTopology; - private final Map stagesByFragmentId; - private final Set activeStages = new HashSet<>(); - - @GuardedBy("this") - private SettableFuture rescheduleFuture = SettableFuture.create(); - - public static PrioritizeUtilizationExecutionSchedule forStages(Collection stages) - { - PrioritizeUtilizationExecutionSchedule schedule = new PrioritizeUtilizationExecutionSchedule(stages); - schedule.init(stages); - return schedule; - } - - private PrioritizeUtilizationExecutionSchedule(Collection stages) - { - fragmentDependency = new DefaultDirectedGraph<>(new FragmentsEdgeFactory()); - fragmentTopology = new DefaultDirectedGraph<>(new FragmentsEdgeFactory()); - stagesByFragmentId = stages.stream() - .collect(toImmutableMap(stage -> stage.getFragment().getId(), identity())); - } - - private void init(Collection stages) - { - extractDependenciesAndReturnNonLazyFragments( - stages.stream() - .map(StageExecution::getFragment) - .collect(toImmutableList())).stream() - // start non-lazy stages - .map(stagesByFragmentId::get) - .forEach(this::selectForExecution); - // start stages without any dependencies - fragmentDependency.vertexSet().stream() - .filter(fragmentId -> fragmentDependency.inDegreeOf(fragmentId) == 0) - .map(stagesByFragmentId::get) - .forEach(this::selectForExecution); - } - - @Override - public StagesScheduleResult getStagesToSchedule() - { - // obtain reschedule future before actual scheduling, so that state change - // notifications from previously started stages are not lost - Optional> rescheduleFuture = getRescheduleFuture(); - schedule(); - return new StagesScheduleResult(activeStages, rescheduleFuture); - } - - @Override - public boolean isFinished() - { - // dependency graph contains both running and not started fragments - return fragmentDependency.vertexSet().isEmpty(); - } - - @VisibleForTesting - synchronized Optional> getRescheduleFuture() - { - return 
Optional.of(rescheduleFuture); - } - - @VisibleForTesting - void schedule() - { - removeCompletedStages(); - unblockStagesWithFullOutputBuffer(); - } - - @VisibleForTesting - DirectedGraph getFragmentDependency() - { - return fragmentDependency; - } - - @VisibleForTesting - Set getActiveStages() - { - return activeStages; - } - - private void removeCompletedStages() - { - Set completedStages = activeStages.stream() - .filter(this::isStageCompleted) - .collect(toImmutableSet()); - // remove completed stages outside of Java stream to prevent concurrent modification - completedStages.forEach(this::removeCompletedStage); - } - - private void removeCompletedStage(StageExecution stage) - { - // start all stages that depend on completed stage - PlanFragmentId fragmentId = stage.getFragment().getId(); - fragmentDependency.outgoingEdgesOf(fragmentId).stream() - .map(FragmentsEdge::getTarget) - // filter stages that depend on completed stage only - .filter(dependentFragmentId -> fragmentDependency.inDegreeOf(dependentFragmentId) == 1) - .map(stagesByFragmentId::get) - .forEach(this::selectForExecution); - fragmentDependency.removeVertex(fragmentId); - fragmentTopology.removeVertex(fragmentId); - activeStages.remove(stage); - } - - private void unblockStagesWithFullOutputBuffer() - { - // find stages that are blocked on full task output buffer - Set blockedFragments = activeStages.stream() - .filter(StageExecution::isAnyTaskBlocked) - .map(stage -> stage.getFragment().getId()) - .collect(toImmutableSet()); - // start immediate downstream stages so that data can be consumed - blockedFragments.stream() - .flatMap(fragmentId -> fragmentTopology.outgoingEdgesOf(fragmentId).stream()) - .map(FragmentsEdge::getTarget) - .map(stagesByFragmentId::get) - .forEach(this::selectForExecution); - } - - private void selectForExecution(StageExecution stage) - { - if (fragmentDependency.outDegreeOf(stage.getFragment().getId()) > 0) { - // if there are any dependent stages then reschedule when 
stage is completed - stage.addStateChangeListener(state -> { - if (isStageCompleted(stage)) { - notifyReschedule(); - } - }); - } - activeStages.add(stage); - } - - private void notifyReschedule() - { - SettableFuture rescheduleFuture; - synchronized (this) { - rescheduleFuture = this.rescheduleFuture; - this.rescheduleFuture = SettableFuture.create(); - } - // notify listeners outside of the critical section - rescheduleFuture.set(null); - } - - private boolean isStageCompleted(StageExecution stage) - { - State state = stage.getState(); - return state == SCHEDULED || state == RUNNING || state == FLUSHING || state.isDone(); - } - - private Set extractDependenciesAndReturnNonLazyFragments(Collection fragments) - { - // Build a graph where the plan fragments are vertexes and the edges represent - // a before -> after relationship. Destination fragment should be started only - // when source fragment is completed. For example, a join hash build has an edge - // to the join probe. - Visitor visitor = new Visitor(fragments); - visitor.processAllFragments(); - - // Make sure there are no strongly connected components as it would mean circular dependency between stages - List> components = new StrongConnectivityInspector<>(fragmentDependency).stronglyConnectedSets(); - verify(components.size() == fragmentDependency.vertexSet().size(), "circular dependency between stages"); - - return visitor.getNonLazyFragments(); - } - - private class Visitor - extends PlanVisitor - { - private final Map fragments; - private final ImmutableSet.Builder nonLazyFragments = ImmutableSet.builder(); - private final Map fragmentSubGraphs = new HashMap<>(); - - public Visitor(Collection fragments) - { - this.fragments = requireNonNull(fragments, "fragments is null").stream() - .collect(toImmutableMap(PlanFragment::getId, identity())); - } - - public Set getNonLazyFragments() - { - return nonLazyFragments.build(); - } - - public void processAllFragments() - { - fragments.forEach((fragmentId, 
fragment) -> { - fragmentDependency.addVertex(fragmentId); - fragmentTopology.addVertex(fragmentId); - }); - fragments.forEach((fragmentId, fragment) -> processFragment(fragmentId)); - } - - public FragmentSubGraph processFragment(PlanFragmentId planFragmentId) - { - if (fragmentSubGraphs.containsKey(planFragmentId)) { - return fragmentSubGraphs.get(planFragmentId); - } - - FragmentSubGraph subGraph = processFragment(fragments.get(planFragmentId)); - verify(fragmentSubGraphs.put(planFragmentId, subGraph) == null, "fragment %s was already processed", planFragmentId); - return subGraph; - } - - private FragmentSubGraph processFragment(PlanFragment fragment) - { - FragmentSubGraph subGraph = fragment.getRoot().accept(this, fragment.getId()); - // append current fragment to set of upstream fragments as it is no longer being visited - Set upstreamFragments = ImmutableSet.builder() - .addAll(subGraph.getUpstreamFragments()) - .add(fragment.getId()) - .build(); - Set lazyUpstreamFragments; - if (subGraph.isCurrentFragmentLazy()) { - // append current fragment as a lazy fragment as it is no longer being visited - lazyUpstreamFragments = ImmutableSet.builder() - .addAll(subGraph.getLazyUpstreamFragments()) - .add(fragment.getId()) - .build(); - } - else { - lazyUpstreamFragments = subGraph.getLazyUpstreamFragments(); - nonLazyFragments.add(fragment.getId()); - } - return new FragmentSubGraph( - upstreamFragments, - lazyUpstreamFragments, - // no longer relevant as we have finished visiting given fragment - false); - } - - @Override - public FragmentSubGraph visitJoin(JoinNode node, PlanFragmentId currentFragmentId) - { - return processJoin( - node.getDistributionType().orElseThrow() == JoinNode.DistributionType.REPLICATED, - node.getLeft(), - node.getRight(), - currentFragmentId); - } - - @Override - public FragmentSubGraph visitSpatialJoin(SpatialJoinNode node, PlanFragmentId currentFragmentId) - { - return processJoin( - node.getDistributionType() == 
SpatialJoinNode.DistributionType.REPLICATED, - node.getLeft(), - node.getRight(), - currentFragmentId); - } - - @Override - public FragmentSubGraph visitSemiJoin(SemiJoinNode node, PlanFragmentId currentFragmentId) - { - return processJoin( - node.getDistributionType().orElseThrow() == SemiJoinNode.DistributionType.REPLICATED, - node.getSource(), - node.getFilteringSource(), - currentFragmentId); - } - - @Override - public FragmentSubGraph visitIndexJoin(IndexJoinNode node, PlanFragmentId currentFragmentId) - { - return processJoin( - true, - node.getProbeSource(), - node.getIndexSource(), - currentFragmentId); - } - - private FragmentSubGraph processJoin(boolean replicated, PlanNode probe, PlanNode build, PlanFragmentId currentFragmentId) - { - FragmentSubGraph probeSubGraph = probe.accept(this, currentFragmentId); - FragmentSubGraph buildSubGraph = build.accept(this, currentFragmentId); - - // start probe source stages after all build source stages finish - addDependencyEdges(buildSubGraph.getUpstreamFragments(), probeSubGraph.getLazyUpstreamFragments()); - - boolean currentFragmentLazy = probeSubGraph.isCurrentFragmentLazy() && buildSubGraph.isCurrentFragmentLazy(); - if (replicated && currentFragmentLazy) { - // Do not start join stage (which can also be a source stage with table scans) - // for replicated join until build source stage enters FLUSHING state. - // Broadcast join limit for CBO is set in such a way that build source data should - // fit into task output buffer. - // In case build source stage is blocked on full task buffer then join stage - // will be started automatically regardless od dependency. This is handled by - // unblockStagesWithFullOutputBuffer method. - addDependencyEdges(buildSubGraph.getUpstreamFragments(), ImmutableSet.of(currentFragmentId)); - } - else { - // start current fragment immediately since for partitioned join - // build source data won't be able to fit into task output buffer. 
- currentFragmentLazy = false; - } - - return new FragmentSubGraph( - ImmutableSet.builder() - .addAll(probeSubGraph.getUpstreamFragments()) - .addAll(buildSubGraph.getUpstreamFragments()) - .build(), - // only probe source fragments can be considered lazy - // since build source stages should be started immediately - probeSubGraph.getLazyUpstreamFragments(), - currentFragmentLazy); - } - - @Override - public FragmentSubGraph visitAggregation(AggregationNode node, PlanFragmentId currentFragmentId) - { - FragmentSubGraph subGraph = node.getSource().accept(this, currentFragmentId); - if (node.getStep() != FINAL && node.getStep() != SINGLE) { - return subGraph; - } - - // start current fragment immediately since final/single aggregation will fully - // consume input before producing output data (aggregation shouldn't get blocked) - return new FragmentSubGraph( - subGraph.getUpstreamFragments(), - ImmutableSet.of(), - false); - } - - @Override - public FragmentSubGraph visitRemoteSource(RemoteSourceNode node, PlanFragmentId currentFragmentId) - { - List subGraphs = node.getSourceFragmentIds().stream() - .map(this::processFragment) - .collect(toImmutableList()); - node.getSourceFragmentIds() - .forEach(sourceFragmentId -> fragmentTopology.addEdge(sourceFragmentId, currentFragmentId)); - return new FragmentSubGraph( - subGraphs.stream() - .flatMap(source -> source.getUpstreamFragments().stream()) - .collect(toImmutableSet()), - subGraphs.stream() - .flatMap(source -> source.getLazyUpstreamFragments().stream()) - .collect(toImmutableSet()), - // initially current fragment is considered to be lazy unless there exist - // an operator that can fully consume input data without producing any output - // (e.g. 
final aggregation) - true); - } - - @Override - public FragmentSubGraph visitExchange(ExchangeNode node, PlanFragmentId currentFragmentId) - { - checkArgument(node.getScope() == LOCAL, "Only local exchanges are supported in the prioritize utilization scheduler"); - return visitPlan(node, currentFragmentId); - } - - @Override - protected FragmentSubGraph visitPlan(PlanNode node, PlanFragmentId currentFragmentId) - { - List sourceSubGraphs = node.getSources().stream() - .map(subPlanNode -> subPlanNode.accept(this, currentFragmentId)) - .collect(toImmutableList()); - - return new FragmentSubGraph( - sourceSubGraphs.stream() - .flatMap(source -> source.getUpstreamFragments().stream()) - .collect(toImmutableSet()), - sourceSubGraphs.stream() - .flatMap(source -> source.getLazyUpstreamFragments().stream()) - .collect(toImmutableSet()), - sourceSubGraphs.stream() - .allMatch(FragmentSubGraph::isCurrentFragmentLazy)); - } - - private void addDependencyEdges(Set sourceFragments, Set targetFragments) - { - for (PlanFragmentId targetFragment : targetFragments) { - for (PlanFragmentId sourceFragment : sourceFragments) { - fragmentDependency.addEdge(sourceFragment, targetFragment); - } - } - } - } - - private static class FragmentSubGraph - { - /** - * All upstream fragments (excluding currently visited fragment) - */ - private final Set upstreamFragments; - /** - * All upstream lazy fragments (excluding currently visited fragment). - * Lazy fragments don't have to be started immediately. - */ - private final Set lazyUpstreamFragments; - /** - * Is currently visited fragment lazy? 
- */ - private final boolean currentFragmentLazy; - - public FragmentSubGraph( - Set upstreamFragments, - Set lazyUpstreamFragments, - boolean currentFragmentLazy) - { - this.upstreamFragments = requireNonNull(upstreamFragments, "upstreamFragments is null"); - this.lazyUpstreamFragments = requireNonNull(lazyUpstreamFragments, "lazyUpstreamFragments is null"); - this.currentFragmentLazy = currentFragmentLazy; - } - - public Set getUpstreamFragments() - { - return upstreamFragments; - } - - public Set getLazyUpstreamFragments() - { - return lazyUpstreamFragments; - } - - public boolean isCurrentFragmentLazy() - { - return currentFragmentLazy; - } - } - - private static class FragmentsEdgeFactory - implements EdgeFactory - { - @Override - public FragmentsEdge createEdge(PlanFragmentId sourceVertex, PlanFragmentId targetVertex) - { - return new FragmentsEdge(sourceVertex, targetVertex); - } - } - - @VisibleForTesting - static class FragmentsEdge - { - private final PlanFragmentId source; - private final PlanFragmentId target; - - public FragmentsEdge(PlanFragmentId source, PlanFragmentId target) - { - this.source = requireNonNull(source, "source is null"); - this.target = requireNonNull(target, "target is null"); - } - - public PlanFragmentId getSource() - { - return source; - } - - public PlanFragmentId getTarget() - { - return target; - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("source", source) - .add("target", target) - .toString(); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - FragmentsEdge that = (FragmentsEdge) o; - return source.equals(that.source) && target.equals(that.target); - } - - @Override - public int hashCode() - { - return Objects.hash(source, target); - } - } -} diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java 
b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java index 44869f8a6c7d..19f6af4f5933 100644 --- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java +++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java @@ -63,8 +63,8 @@ import io.trino.execution.scheduler.SplitSchedulerStats; import io.trino.execution.scheduler.policy.AllAtOnceExecutionPolicy; import io.trino.execution.scheduler.policy.ExecutionPolicy; +import io.trino.execution.scheduler.policy.LegacyPhasedExecutionPolicy; import io.trino.execution.scheduler.policy.PhasedExecutionPolicy; -import io.trino.execution.scheduler.policy.PrioritizeUtilizationExecutionPolicy; import io.trino.failuredetector.FailureDetectorModule; import io.trino.memory.ClusterMemoryManager; import io.trino.memory.ForMemoryManager; @@ -285,8 +285,8 @@ protected void setup(Binder binder) MapBinder executionPolicyBinder = newMapBinder(binder, String.class, ExecutionPolicy.class); executionPolicyBinder.addBinding("all-at-once").to(AllAtOnceExecutionPolicy.class); + executionPolicyBinder.addBinding("legacy-phased").to(LegacyPhasedExecutionPolicy.class); executionPolicyBinder.addBinding("phased").to(PhasedExecutionPolicy.class); - executionPolicyBinder.addBinding("prioritize-utilization").to(PrioritizeUtilizationExecutionPolicy.class); install(new QueryExecutionFactoryModule()); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java index b79e59aaa218..6749cc7efe8e 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java @@ -51,7 +51,7 @@ public void testDefaults() .setRemoteTaskMinErrorDuration(new Duration(5, MINUTES)) .setRemoteTaskMaxErrorDuration(new Duration(5, MINUTES)) .setRemoteTaskMaxCallbackThreads(1000) - .setQueryExecutionPolicy("all-at-once") + 
.setQueryExecutionPolicy("phased") .setQueryMaxRunTime(new Duration(100, DAYS)) .setQueryMaxExecutionTime(new Duration(100, DAYS)) .setQueryMaxPlanningTime(new Duration(10, MINUTES)) @@ -84,7 +84,7 @@ public void testExplicitPropertyMappings() .put("query.remote-task.min-error-duration", "30s") .put("query.remote-task.max-error-duration", "60s") .put("query.remote-task.max-callback-threads", "10") - .put("query.execution-policy", "phased") + .put("query.execution-policy", "legacy-phased") .put("query.max-run-time", "2h") .put("query.max-execution-time", "3h") .put("query.max-planning-time", "1h") @@ -114,7 +114,7 @@ public void testExplicitPropertyMappings() .setRemoteTaskMinErrorDuration(new Duration(60, SECONDS)) .setRemoteTaskMaxErrorDuration(new Duration(60, SECONDS)) .setRemoteTaskMaxCallbackThreads(10) - .setQueryExecutionPolicy("phased") + .setQueryExecutionPolicy("legacy-phased") .setQueryMaxRunTime(new Duration(2, HOURS)) .setQueryMaxExecutionTime(new Duration(3, HOURS)) .setQueryMaxPlanningTime(new Duration(1, HOURS)) diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestLegacyPhasedExecutionSchedule.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestLegacyPhasedExecutionSchedule.java new file mode 100644 index 000000000000..66e7697211db --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestLegacyPhasedExecutionSchedule.java @@ -0,0 +1,129 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler.policy; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.trino.sql.planner.PlanFragment; +import io.trino.sql.planner.plan.PlanFragmentId; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Set; + +import static io.trino.execution.scheduler.policy.PlanUtils.createBroadcastJoinPlanFragment; +import static io.trino.execution.scheduler.policy.PlanUtils.createExchangePlanFragment; +import static io.trino.execution.scheduler.policy.PlanUtils.createJoinPlanFragment; +import static io.trino.execution.scheduler.policy.PlanUtils.createTableScanPlanFragment; +import static io.trino.execution.scheduler.policy.PlanUtils.createUnionPlanFragment; +import static io.trino.sql.planner.plan.JoinNode.Type.INNER; +import static io.trino.sql.planner.plan.JoinNode.Type.RIGHT; +import static org.testng.Assert.assertEquals; + +public class TestLegacyPhasedExecutionSchedule +{ + @Test + public void testExchange() + { + PlanFragment aFragment = createTableScanPlanFragment("a"); + PlanFragment bFragment = createTableScanPlanFragment("b"); + PlanFragment cFragment = createTableScanPlanFragment("c"); + PlanFragment exchangeFragment = createExchangePlanFragment("exchange", aFragment, bFragment, cFragment); + + List> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(aFragment, bFragment, cFragment, exchangeFragment)); + assertEquals(phases, ImmutableList.of( + ImmutableSet.of(exchangeFragment.getId()), + ImmutableSet.of(aFragment.getId()), + ImmutableSet.of(bFragment.getId()), + ImmutableSet.of(cFragment.getId()))); + } + + @Test + public void testUnion() + { + PlanFragment aFragment = createTableScanPlanFragment("a"); + PlanFragment bFragment = createTableScanPlanFragment("b"); + PlanFragment cFragment = createTableScanPlanFragment("c"); 
+ PlanFragment unionFragment = createUnionPlanFragment("union", aFragment, bFragment, cFragment); + + List> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(aFragment, bFragment, cFragment, unionFragment)); + assertEquals(phases, ImmutableList.of( + ImmutableSet.of(unionFragment.getId()), + ImmutableSet.of(aFragment.getId()), + ImmutableSet.of(bFragment.getId()), + ImmutableSet.of(cFragment.getId()))); + } + + @Test + public void testJoin() + { + PlanFragment buildFragment = createTableScanPlanFragment("build"); + PlanFragment probeFragment = createTableScanPlanFragment("probe"); + PlanFragment joinFragment = createJoinPlanFragment(INNER, "join", buildFragment, probeFragment); + + List> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment, probeFragment)); + assertEquals(phases, ImmutableList.of(ImmutableSet.of(joinFragment.getId()), ImmutableSet.of(buildFragment.getId()), ImmutableSet.of(probeFragment.getId()))); + } + + @Test + public void testRightJoin() + { + PlanFragment buildFragment = createTableScanPlanFragment("build"); + PlanFragment probeFragment = createTableScanPlanFragment("probe"); + PlanFragment joinFragment = createJoinPlanFragment(RIGHT, "join", buildFragment, probeFragment); + + List> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment, probeFragment)); + assertEquals(phases, ImmutableList.of(ImmutableSet.of(joinFragment.getId()), ImmutableSet.of(buildFragment.getId()), ImmutableSet.of(probeFragment.getId()))); + } + + @Test + public void testBroadcastJoin() + { + PlanFragment buildFragment = createTableScanPlanFragment("build"); + PlanFragment joinFragment = createBroadcastJoinPlanFragment("join", buildFragment); + + List> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment)); + assertEquals(phases, ImmutableList.of(ImmutableSet.of(joinFragment.getId(), buildFragment.getId()))); + } + + 
@Test + public void testJoinWithDeepSources() + { + PlanFragment buildSourceFragment = createTableScanPlanFragment("buildSource"); + PlanFragment buildMiddleFragment = createExchangePlanFragment("buildMiddle", buildSourceFragment); + PlanFragment buildTopFragment = createExchangePlanFragment("buildTop", buildMiddleFragment); + PlanFragment probeSourceFragment = createTableScanPlanFragment("probeSource"); + PlanFragment probeMiddleFragment = createExchangePlanFragment("probeMiddle", probeSourceFragment); + PlanFragment probeTopFragment = createExchangePlanFragment("probeTop", probeMiddleFragment); + PlanFragment joinFragment = createJoinPlanFragment(INNER, "join", buildTopFragment, probeTopFragment); + + List> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of( + joinFragment, + buildTopFragment, + buildMiddleFragment, + buildSourceFragment, + probeTopFragment, + probeMiddleFragment, + probeSourceFragment)); + + assertEquals(phases, ImmutableList.of( + ImmutableSet.of(joinFragment.getId()), + ImmutableSet.of(buildTopFragment.getId()), + ImmutableSet.of(buildMiddleFragment.getId()), + ImmutableSet.of(buildSourceFragment.getId()), + ImmutableSet.of(probeTopFragment.getId()), + ImmutableSet.of(probeMiddleFragment.getId()), + ImmutableSet.of(probeSourceFragment.getId()))); + } +} diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPhasedExecutionSchedule.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPhasedExecutionSchedule.java index b9466d0d9891..58c5a5b6090e 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPhasedExecutionSchedule.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPhasedExecutionSchedule.java @@ -13,117 +13,325 @@ */ package io.trino.execution.scheduler.policy; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; +import 
com.google.common.util.concurrent.ListenableFuture; +import io.trino.execution.ExecutionFailureInfo; +import io.trino.execution.Lifespan; +import io.trino.execution.RemoteTask; +import io.trino.execution.StageId; +import io.trino.execution.StateMachine.StateChangeListener; +import io.trino.execution.TaskId; +import io.trino.execution.TaskStatus; +import io.trino.execution.scheduler.StageExecution; +import io.trino.execution.scheduler.TaskLifecycleListener; +import io.trino.execution.scheduler.policy.PhasedExecutionSchedule.FragmentsEdge; +import io.trino.metadata.InternalNode; +import io.trino.metadata.Split; import io.trino.sql.planner.PlanFragment; import io.trino.sql.planner.plan.PlanFragmentId; +import io.trino.sql.planner.plan.PlanNodeId; +import org.jgrapht.DirectedGraph; import org.testng.annotations.Test; import java.util.List; +import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.execution.scheduler.StageExecution.State.FINISHED; +import static io.trino.execution.scheduler.StageExecution.State.FLUSHING; +import static io.trino.execution.scheduler.policy.PlanUtils.createAggregationFragment; +import static io.trino.execution.scheduler.policy.PlanUtils.createBroadcastAndPartitionedJoinPlanFragment; import static io.trino.execution.scheduler.policy.PlanUtils.createBroadcastJoinPlanFragment; -import static io.trino.execution.scheduler.policy.PlanUtils.createExchangePlanFragment; import static io.trino.execution.scheduler.policy.PlanUtils.createJoinPlanFragment; import static io.trino.execution.scheduler.policy.PlanUtils.createTableScanPlanFragment; -import static io.trino.execution.scheduler.policy.PlanUtils.createUnionPlanFragment; +import static io.trino.sql.planner.plan.JoinNode.DistributionType.PARTITIONED; +import static io.trino.sql.planner.plan.JoinNode.DistributionType.REPLICATED; import static 
io.trino.sql.planner.plan.JoinNode.Type.INNER; -import static io.trino.sql.planner.plan.JoinNode.Type.RIGHT; -import static org.testng.Assert.assertEquals; +import static java.util.Objects.requireNonNull; +import static org.assertj.core.api.Assertions.assertThat; public class TestPhasedExecutionSchedule { @Test - public void testExchange() + public void testPartitionedJoin() { - PlanFragment aFragment = createTableScanPlanFragment("a"); - PlanFragment bFragment = createTableScanPlanFragment("b"); - PlanFragment cFragment = createTableScanPlanFragment("c"); - PlanFragment exchangeFragment = createExchangePlanFragment("exchange", aFragment, bFragment, cFragment); - - List> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(aFragment, bFragment, cFragment, exchangeFragment)); - assertEquals(phases, ImmutableList.of( - ImmutableSet.of(exchangeFragment.getId()), - ImmutableSet.of(aFragment.getId()), - ImmutableSet.of(bFragment.getId()), - ImmutableSet.of(cFragment.getId()))); + PlanFragment buildFragment = createTableScanPlanFragment("build"); + PlanFragment probeFragment = createTableScanPlanFragment("probe"); + PlanFragment joinFragment = createJoinPlanFragment(INNER, PARTITIONED, "join", buildFragment, probeFragment); + + TestingStageExecution buildStage = new TestingStageExecution(buildFragment); + TestingStageExecution probeStage = new TestingStageExecution(probeFragment); + TestingStageExecution joinStage = new TestingStageExecution(joinFragment); + + PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(buildStage, probeStage, joinStage)); + DirectedGraph dependencies = schedule.getFragmentDependency(); + + // single dependency between build and probe stages + assertThat(dependencies.edgeSet()).containsExactlyInAnyOrder(new FragmentsEdge(buildFragment.getId(), probeFragment.getId())); + + // build and join stage should start immediately + 
assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder(joinFragment.getId(), buildFragment.getId()); + + // probe stage should start after build stage is completed + ListenableFuture rescheduleFuture = schedule.getRescheduleFuture().orElseThrow(); + assertThat(rescheduleFuture).isNotDone(); + buildStage.setState(FLUSHING); + assertThat(rescheduleFuture).isDone(); + schedule.schedule(); + assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder(joinFragment.getId(), probeFragment.getId()); + + // make sure scheduler finishes + rescheduleFuture = schedule.getRescheduleFuture().orElseThrow(); + assertThat(rescheduleFuture).isNotDone(); + probeStage.setState(FINISHED); + assertThat(rescheduleFuture).isNotDone(); + joinStage.setState(FINISHED); + schedule.schedule(); + assertThat(getActiveFragments(schedule)).isEmpty(); + assertThat(schedule.isFinished()).isTrue(); } @Test - public void testUnion() + public void testBroadcastSourceJoin() { - PlanFragment aFragment = createTableScanPlanFragment("a"); - PlanFragment bFragment = createTableScanPlanFragment("b"); - PlanFragment cFragment = createTableScanPlanFragment("c"); - PlanFragment unionFragment = createUnionPlanFragment("union", aFragment, bFragment, cFragment); - - List> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(aFragment, bFragment, cFragment, unionFragment)); - assertEquals(phases, ImmutableList.of( - ImmutableSet.of(unionFragment.getId()), - ImmutableSet.of(aFragment.getId()), - ImmutableSet.of(bFragment.getId()), - ImmutableSet.of(cFragment.getId()))); + PlanFragment buildFragment = createTableScanPlanFragment("build"); + PlanFragment joinSourceFragment = createBroadcastJoinPlanFragment("probe", buildFragment); + + TestingStageExecution buildStage = new TestingStageExecution(buildFragment); + TestingStageExecution joinSourceStage = new TestingStageExecution(joinSourceFragment); + + PhasedExecutionSchedule schedule = 
PhasedExecutionSchedule.forStages(ImmutableSet.of(buildStage, joinSourceStage)); + DirectedGraph dependencies = schedule.getFragmentDependency(); + + // single dependency between build and join stages + assertThat(dependencies.edgeSet()).containsExactlyInAnyOrder(new FragmentsEdge(buildFragment.getId(), joinSourceFragment.getId())); + + // build stage should start immediately + assertThat(getActiveFragments(schedule)).containsExactly(buildFragment.getId()); + + // join stage should start after build stage buffer is full + buildStage.setAnyTaskBlocked(true); + schedule.schedule(); + assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder(joinSourceFragment.getId(), buildFragment.getId()); } @Test - public void testJoin() + public void testAggregation() { + PlanFragment sourceFragment = createTableScanPlanFragment("probe"); + PlanFragment aggregationFragment = createAggregationFragment("aggregation", sourceFragment); PlanFragment buildFragment = createTableScanPlanFragment("build"); - PlanFragment probeFragment = createTableScanPlanFragment("probe"); - PlanFragment joinFragment = createJoinPlanFragment(INNER, "join", buildFragment, probeFragment); + PlanFragment joinFragment = createJoinPlanFragment(INNER, REPLICATED, "join", buildFragment, aggregationFragment); + + TestingStageExecution sourceStage = new TestingStageExecution(sourceFragment); + TestingStageExecution aggregationStage = new TestingStageExecution(aggregationFragment); + TestingStageExecution buildStage = new TestingStageExecution(buildFragment); + TestingStageExecution joinStage = new TestingStageExecution(joinFragment); - List> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment, probeFragment)); - assertEquals(phases, ImmutableList.of(ImmutableSet.of(joinFragment.getId()), ImmutableSet.of(buildFragment.getId()), ImmutableSet.of(probeFragment.getId()))); + PhasedExecutionSchedule schedule = 
PhasedExecutionSchedule.forStages(ImmutableSet.of(sourceStage, aggregationStage, buildStage, joinStage)); + DirectedGraph dependencies = schedule.getFragmentDependency(); + + // aggregation and source stage should start immediately, join stage should wait for build stage to complete + assertThat(dependencies.edgeSet()).containsExactly(new FragmentsEdge(buildFragment.getId(), joinFragment.getId())); + assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder(buildFragment.getId(), sourceFragment.getId(), aggregationFragment.getId()); } @Test - public void testRightJoin() + public void testStageWithBroadcastAndPartitionedJoin() { - PlanFragment buildFragment = createTableScanPlanFragment("build"); + PlanFragment broadcastBuildFragment = createTableScanPlanFragment("broadcast_build"); + PlanFragment partitionedBuildFragment = createTableScanPlanFragment("partitioned_build"); PlanFragment probeFragment = createTableScanPlanFragment("probe"); - PlanFragment joinFragment = createJoinPlanFragment(RIGHT, "join", buildFragment, probeFragment); + PlanFragment joinFragment = createBroadcastAndPartitionedJoinPlanFragment("join", broadcastBuildFragment, partitionedBuildFragment, probeFragment); + + TestingStageExecution broadcastBuildStage = new TestingStageExecution(broadcastBuildFragment); + TestingStageExecution partitionedBuildStage = new TestingStageExecution(partitionedBuildFragment); + TestingStageExecution probeStage = new TestingStageExecution(probeFragment); + TestingStageExecution joinStage = new TestingStageExecution(joinFragment); - List> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment, probeFragment)); - assertEquals(phases, ImmutableList.of(ImmutableSet.of(joinFragment.getId()), ImmutableSet.of(buildFragment.getId()), ImmutableSet.of(probeFragment.getId()))); + PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of( + broadcastBuildStage, partitionedBuildStage, probeStage, 
joinStage)); + DirectedGraph dependencies = schedule.getFragmentDependency(); + + // join stage should start immediately because partitioned join forces that + assertThat(dependencies.edgeSet()).containsExactlyInAnyOrder( + new FragmentsEdge(broadcastBuildFragment.getId(), probeFragment.getId()), + new FragmentsEdge(partitionedBuildFragment.getId(), probeFragment.getId()), + new FragmentsEdge(broadcastBuildFragment.getId(), joinFragment.getId())); + assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder( + broadcastBuildFragment.getId(), + partitionedBuildFragment.getId(), + joinFragment.getId()); + + // completing single build dependency shouldn't cause probe stage to start + broadcastBuildStage.setState(FLUSHING); + schedule.schedule(); + assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder( + partitionedBuildFragment.getId(), + joinFragment.getId()); + + // completing all build dependencies should cause probe stage to start + partitionedBuildStage.setState(FLUSHING); + schedule.schedule(); + assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder( + probeFragment.getId(), + joinFragment.getId()); } - @Test - public void testBroadcastJoin() + private Set getActiveFragments(PhasedExecutionSchedule schedule) { - PlanFragment buildFragment = createTableScanPlanFragment("build"); - PlanFragment joinFragment = createBroadcastJoinPlanFragment("join", buildFragment); - - List> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment)); - assertEquals(phases, ImmutableList.of(ImmutableSet.of(joinFragment.getId(), buildFragment.getId()))); + return schedule.getActiveStages().stream() + .map(stage -> stage.getFragment().getId()) + .collect(toImmutableSet()); } - @Test - public void testJoinWithDeepSources() + private static class TestingStageExecution + implements StageExecution { - PlanFragment buildSourceFragment = createTableScanPlanFragment("buildSource"); - PlanFragment buildMiddleFragment = 
createExchangePlanFragment("buildMiddle", buildSourceFragment); - PlanFragment buildTopFragment = createExchangePlanFragment("buildTop", buildMiddleFragment); - PlanFragment probeSourceFragment = createTableScanPlanFragment("probeSource"); - PlanFragment probeMiddleFragment = createExchangePlanFragment("probeMiddle", probeSourceFragment); - PlanFragment probeTopFragment = createExchangePlanFragment("probeTop", probeMiddleFragment); - PlanFragment joinFragment = createJoinPlanFragment(INNER, "join", buildTopFragment, probeTopFragment); - - List> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of( - joinFragment, - buildTopFragment, - buildMiddleFragment, - buildSourceFragment, - probeTopFragment, - probeMiddleFragment, - probeSourceFragment)); - - assertEquals(phases, ImmutableList.of( - ImmutableSet.of(joinFragment.getId()), - ImmutableSet.of(buildTopFragment.getId()), - ImmutableSet.of(buildMiddleFragment.getId()), - ImmutableSet.of(buildSourceFragment.getId()), - ImmutableSet.of(probeTopFragment.getId()), - ImmutableSet.of(probeMiddleFragment.getId()), - ImmutableSet.of(probeSourceFragment.getId()))); + private final PlanFragment fragment; + private StateChangeListener stateChangeListener; + private boolean anyTaskBlocked; + private State state = State.SCHEDULING; + + public TestingStageExecution(PlanFragment fragment) + { + this.fragment = requireNonNull(fragment, "fragment is null"); + } + + @Override + public PlanFragment getFragment() + { + return fragment; + } + + @Override + public boolean isAnyTaskBlocked() + { + return anyTaskBlocked; + } + + public void setAnyTaskBlocked(boolean anyTaskBlocked) + { + this.anyTaskBlocked = anyTaskBlocked; + } + + public void setState(State state) + { + this.state = state; + if (stateChangeListener != null) { + stateChangeListener.stateChanged(state); + } + } + + @Override + public State getState() + { + return state; + } + + @Override + public void addStateChangeListener(StateChangeListener 
stateChangeListener) + { + this.stateChangeListener = requireNonNull(stateChangeListener, "stateChangeListener is null"); + } + + @Override + public StageId getStageId() + { + throw new UnsupportedOperationException(); + } + + @Override + public int getAttemptId() + { + throw new UnsupportedOperationException(); + } + + @Override + public void beginScheduling() + { + throw new UnsupportedOperationException(); + } + + @Override + public void transitionToSchedulingSplits() + { + throw new UnsupportedOperationException(); + } + + @Override + public void addCompletedDriverGroupsChangedListener(Consumer> newlyCompletedDriverGroupConsumer) + { + throw new UnsupportedOperationException(); + } + + @Override + public TaskLifecycleListener getTaskLifecycleListener() + { + throw new UnsupportedOperationException(); + } + + @Override + public void schedulingComplete() + { + throw new UnsupportedOperationException(); + } + + @Override + public void schedulingComplete(PlanNodeId partitionedSource) + { + throw new UnsupportedOperationException(); + } + + @Override + public void cancel() + { + throw new UnsupportedOperationException(); + } + + @Override + public void abort() + { + throw new UnsupportedOperationException(); + } + + @Override + public void recordGetSplitTime(long start) + { + throw new UnsupportedOperationException(); + } + + @Override + public Optional scheduleTask(InternalNode node, int partition, Multimap initialSplits, Multimap noMoreSplitsForLifespan) + { + throw new UnsupportedOperationException(); + } + + @Override + public void failTask(TaskId taskId, Throwable failureCause) + { + throw new UnsupportedOperationException(); + } + + @Override + public List getAllTasks() + { + throw new UnsupportedOperationException(); + } + + @Override + public List getTaskStatuses() + { + throw new UnsupportedOperationException(); + } + + @Override + public Optional getFailureCause() + { + throw new UnsupportedOperationException(); + } } } diff --git 
a/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPrioritizeUtilizationExecutionSchedule.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPrioritizeUtilizationExecutionSchedule.java deleted file mode 100644 index bd3952793f6d..000000000000 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/policy/TestPrioritizeUtilizationExecutionSchedule.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.execution.scheduler.policy; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; -import com.google.common.util.concurrent.ListenableFuture; -import io.trino.execution.ExecutionFailureInfo; -import io.trino.execution.Lifespan; -import io.trino.execution.RemoteTask; -import io.trino.execution.StageId; -import io.trino.execution.StateMachine.StateChangeListener; -import io.trino.execution.TaskId; -import io.trino.execution.TaskStatus; -import io.trino.execution.scheduler.StageExecution; -import io.trino.execution.scheduler.TaskLifecycleListener; -import io.trino.execution.scheduler.policy.PrioritizeUtilizationExecutionSchedule.FragmentsEdge; -import io.trino.metadata.InternalNode; -import io.trino.metadata.Split; -import io.trino.sql.planner.PlanFragment; -import io.trino.sql.planner.plan.PlanFragmentId; -import io.trino.sql.planner.plan.PlanNodeId; -import org.jgrapht.DirectedGraph; -import org.testng.annotations.Test; - -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.function.Consumer; - -import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static io.trino.execution.scheduler.StageExecution.State.FINISHED; -import static io.trino.execution.scheduler.StageExecution.State.FLUSHING; -import static io.trino.execution.scheduler.policy.PlanUtils.createAggregationFragment; -import static io.trino.execution.scheduler.policy.PlanUtils.createBroadcastAndPartitionedJoinPlanFragment; -import static io.trino.execution.scheduler.policy.PlanUtils.createBroadcastJoinPlanFragment; -import static io.trino.execution.scheduler.policy.PlanUtils.createJoinPlanFragment; -import static io.trino.execution.scheduler.policy.PlanUtils.createTableScanPlanFragment; -import static io.trino.sql.planner.plan.JoinNode.DistributionType.PARTITIONED; -import static io.trino.sql.planner.plan.JoinNode.DistributionType.REPLICATED; -import static 
io.trino.sql.planner.plan.JoinNode.Type.INNER; -import static java.util.Objects.requireNonNull; -import static org.assertj.core.api.Assertions.assertThat; - -public class TestPrioritizeUtilizationExecutionSchedule -{ - @Test - public void testPartitionedJoin() - { - PlanFragment buildFragment = createTableScanPlanFragment("build"); - PlanFragment probeFragment = createTableScanPlanFragment("probe"); - PlanFragment joinFragment = createJoinPlanFragment(INNER, PARTITIONED, "join", buildFragment, probeFragment); - - TestingStageExecution buildStage = new TestingStageExecution(buildFragment); - TestingStageExecution probeStage = new TestingStageExecution(probeFragment); - TestingStageExecution joinStage = new TestingStageExecution(joinFragment); - - PrioritizeUtilizationExecutionSchedule schedule = PrioritizeUtilizationExecutionSchedule.forStages(ImmutableSet.of(buildStage, probeStage, joinStage)); - DirectedGraph dependencies = schedule.getFragmentDependency(); - - // single dependency between build and probe stages - assertThat(dependencies.edgeSet()).containsExactlyInAnyOrder(new FragmentsEdge(buildFragment.getId(), probeFragment.getId())); - - // build and join stage should start immediately - assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder(joinFragment.getId(), buildFragment.getId()); - - // probe stage should start after build stage is completed - ListenableFuture rescheduleFuture = schedule.getRescheduleFuture().orElseThrow(); - assertThat(rescheduleFuture).isNotDone(); - buildStage.setState(FLUSHING); - assertThat(rescheduleFuture).isDone(); - schedule.schedule(); - assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder(joinFragment.getId(), probeFragment.getId()); - - // make sure scheduler finishes - rescheduleFuture = schedule.getRescheduleFuture().orElseThrow(); - assertThat(rescheduleFuture).isNotDone(); - probeStage.setState(FINISHED); - assertThat(rescheduleFuture).isNotDone(); - joinStage.setState(FINISHED); - 
schedule.schedule(); - assertThat(getActiveFragments(schedule)).isEmpty(); - assertThat(schedule.isFinished()).isTrue(); - } - - @Test - public void testBroadcastSourceJoin() - { - PlanFragment buildFragment = createTableScanPlanFragment("build"); - PlanFragment joinSourceFragment = createBroadcastJoinPlanFragment("probe", buildFragment); - - TestingStageExecution buildStage = new TestingStageExecution(buildFragment); - TestingStageExecution joinSourceStage = new TestingStageExecution(joinSourceFragment); - - PrioritizeUtilizationExecutionSchedule schedule = PrioritizeUtilizationExecutionSchedule.forStages(ImmutableSet.of(buildStage, joinSourceStage)); - DirectedGraph dependencies = schedule.getFragmentDependency(); - - // single dependency between build and join stages - assertThat(dependencies.edgeSet()).containsExactlyInAnyOrder(new FragmentsEdge(buildFragment.getId(), joinSourceFragment.getId())); - - // build stage should start immediately - assertThat(getActiveFragments(schedule)).containsExactly(buildFragment.getId()); - - // join stage should start after build stage buffer is full - buildStage.setAnyTaskBlocked(true); - schedule.schedule(); - assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder(joinSourceFragment.getId(), buildFragment.getId()); - } - - @Test - public void testAggregation() - { - PlanFragment sourceFragment = createTableScanPlanFragment("probe"); - PlanFragment aggregationFragment = createAggregationFragment("aggregation", sourceFragment); - PlanFragment buildFragment = createTableScanPlanFragment("build"); - PlanFragment joinFragment = createJoinPlanFragment(INNER, REPLICATED, "join", buildFragment, aggregationFragment); - - TestingStageExecution sourceStage = new TestingStageExecution(sourceFragment); - TestingStageExecution aggregationStage = new TestingStageExecution(aggregationFragment); - TestingStageExecution buildStage = new TestingStageExecution(buildFragment); - TestingStageExecution joinStage = new 
TestingStageExecution(joinFragment); - - PrioritizeUtilizationExecutionSchedule schedule = PrioritizeUtilizationExecutionSchedule.forStages(ImmutableSet.of(sourceStage, aggregationStage, buildStage, joinStage)); - DirectedGraph dependencies = schedule.getFragmentDependency(); - - // aggregation and source stage should start immediately, join stage should wait for build stage to complete - assertThat(dependencies.edgeSet()).containsExactly(new FragmentsEdge(buildFragment.getId(), joinFragment.getId())); - assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder(buildFragment.getId(), sourceFragment.getId(), aggregationFragment.getId()); - } - - @Test - public void testStageWithBroadcastAndPartitionedJoin() - { - PlanFragment broadcastBuildFragment = createTableScanPlanFragment("broadcast_build"); - PlanFragment partitionedBuildFragment = createTableScanPlanFragment("partitioned_build"); - PlanFragment probeFragment = createTableScanPlanFragment("probe"); - PlanFragment joinFragment = createBroadcastAndPartitionedJoinPlanFragment("join", broadcastBuildFragment, partitionedBuildFragment, probeFragment); - - TestingStageExecution broadcastBuildStage = new TestingStageExecution(broadcastBuildFragment); - TestingStageExecution partitionedBuildStage = new TestingStageExecution(partitionedBuildFragment); - TestingStageExecution probeStage = new TestingStageExecution(probeFragment); - TestingStageExecution joinStage = new TestingStageExecution(joinFragment); - - PrioritizeUtilizationExecutionSchedule schedule = PrioritizeUtilizationExecutionSchedule.forStages(ImmutableSet.of( - broadcastBuildStage, partitionedBuildStage, probeStage, joinStage)); - DirectedGraph dependencies = schedule.getFragmentDependency(); - - // join stage should start immediately because partitioned join forces that - assertThat(dependencies.edgeSet()).containsExactlyInAnyOrder( - new FragmentsEdge(broadcastBuildFragment.getId(), probeFragment.getId()), - new 
FragmentsEdge(partitionedBuildFragment.getId(), probeFragment.getId()), - new FragmentsEdge(broadcastBuildFragment.getId(), joinFragment.getId())); - assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder( - broadcastBuildFragment.getId(), - partitionedBuildFragment.getId(), - joinFragment.getId()); - - // completing single build dependency shouldn't cause probe stage to start - broadcastBuildStage.setState(FLUSHING); - schedule.schedule(); - assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder( - partitionedBuildFragment.getId(), - joinFragment.getId()); - - // completing all build dependencies should cause probe stage to start - partitionedBuildStage.setState(FLUSHING); - schedule.schedule(); - assertThat(getActiveFragments(schedule)).containsExactlyInAnyOrder( - probeFragment.getId(), - joinFragment.getId()); - } - - private Set getActiveFragments(PrioritizeUtilizationExecutionSchedule schedule) - { - return schedule.getActiveStages().stream() - .map(stage -> stage.getFragment().getId()) - .collect(toImmutableSet()); - } - - private static class TestingStageExecution - implements StageExecution - { - private final PlanFragment fragment; - private StateChangeListener stateChangeListener; - private boolean anyTaskBlocked; - private State state = State.SCHEDULING; - - public TestingStageExecution(PlanFragment fragment) - { - this.fragment = requireNonNull(fragment, "fragment is null"); - } - - @Override - public PlanFragment getFragment() - { - return fragment; - } - - @Override - public boolean isAnyTaskBlocked() - { - return anyTaskBlocked; - } - - public void setAnyTaskBlocked(boolean anyTaskBlocked) - { - this.anyTaskBlocked = anyTaskBlocked; - } - - public void setState(State state) - { - this.state = state; - if (stateChangeListener != null) { - stateChangeListener.stateChanged(state); - } - } - - @Override - public State getState() - { - return state; - } - - @Override - public void addStateChangeListener(StateChangeListener 
stateChangeListener) - { - this.stateChangeListener = requireNonNull(stateChangeListener, "stateChangeListener is null"); - } - - @Override - public StageId getStageId() - { - throw new UnsupportedOperationException(); - } - - @Override - public int getAttemptId() - { - throw new UnsupportedOperationException(); - } - - @Override - public void beginScheduling() - { - throw new UnsupportedOperationException(); - } - - @Override - public void transitionToSchedulingSplits() - { - throw new UnsupportedOperationException(); - } - - @Override - public void addCompletedDriverGroupsChangedListener(Consumer> newlyCompletedDriverGroupConsumer) - { - throw new UnsupportedOperationException(); - } - - @Override - public TaskLifecycleListener getTaskLifecycleListener() - { - throw new UnsupportedOperationException(); - } - - @Override - public void schedulingComplete() - { - throw new UnsupportedOperationException(); - } - - @Override - public void schedulingComplete(PlanNodeId partitionedSource) - { - throw new UnsupportedOperationException(); - } - - @Override - public void cancel() - { - throw new UnsupportedOperationException(); - } - - @Override - public void abort() - { - throw new UnsupportedOperationException(); - } - - @Override - public void recordGetSplitTime(long start) - { - throw new UnsupportedOperationException(); - } - - @Override - public Optional scheduleTask(InternalNode node, int partition, Multimap initialSplits, Multimap noMoreSplitsForLifespan) - { - throw new UnsupportedOperationException(); - } - - @Override - public void failTask(TaskId taskId, Throwable failureCause) - { - throw new UnsupportedOperationException(); - } - - @Override - public List getAllTasks() - { - throw new UnsupportedOperationException(); - } - - @Override - public List getTaskStatuses() - { - throw new UnsupportedOperationException(); - } - - @Override - public Optional getFailureCause() - { - throw new UnsupportedOperationException(); - } - } -}