Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@ public AsGroupedSourceScheduler(SourceScheduler sourceScheduler)
pendingCompleted = new ArrayList<>();
}

@Override
public void start()
{
// Lifecycle hook: simply forward the start signal to the wrapped (delegate) scheduler.
sourceScheduler.start();
}

@Override
public ScheduleResult schedule()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(

return new StageScheduler()
{
@Override
public void start()
{
// Delegate the start signal to the underlying source-partitioned scheduler.
sourcePartitionedScheduler.start();
}

@Override
public ScheduleResult schedule()
{
Expand Down Expand Up @@ -251,6 +257,18 @@ public synchronized void noMoreLifespans()
whenFinishedOrNewLifespanAdded = SettableFuture.create();
}

@Override
public synchronized void start()
{
// Avoid deadlocks by immediately scheduling a task for collecting dynamic filters because:
// * there can be tasks in other stages blocked waiting for the dynamic filters, or
// * the connector split source for this stage might be blocked waiting for the dynamic filters.
if (dynamicFilterService.isCollectingTaskNeeded(stageExecution.getStageId().getQueryId(), stageExecution.getFragment())) {
stageExecution.beginScheduling();
createTaskOnRandomNode();
}
}

@Override
public synchronized ScheduleResult schedule()
{
Expand Down Expand Up @@ -406,13 +424,6 @@ else if (pendingSplits.isEmpty()) {
return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
}

if (anyBlockedOnNextSplitBatch
&& scheduledTasks.isEmpty()
&& dynamicFilterService.isCollectingTaskNeeded(stageExecution.getStageId().getQueryId(), stageExecution.getFragment())) {
// schedule a task for collecting dynamic filters in case probe split generator is waiting for them
createTaskOnRandomNode().ifPresent(overallNewTasks::add);
}

boolean anySourceTaskBlocked = this.anySourceTaskBlocked.getAsBoolean();
if (anySourceTaskBlocked) {
// Dynamic filters might not be collected due to build side source tasks being blocked on full buffer.
Expand Down Expand Up @@ -541,13 +552,13 @@ private Set<RemoteTask> assignSplits(Multimap<InternalNode, Split> splitAssignme
return newTasks.build();
}

private Optional<RemoteTask> createTaskOnRandomNode()
private void createTaskOnRandomNode()
{
checkState(scheduledTasks.isEmpty(), "Stage task is already scheduled on node");
List<InternalNode> allNodes = splitPlacementPolicy.allNodes();
checkState(allNodes.size() > 0, "No nodes available");
InternalNode node = allNodes.get(ThreadLocalRandom.current().nextInt(0, allNodes.size()));
return scheduleTask(node, ImmutableMultimap.of(), ImmutableMultimap.of());
scheduleTask(node, ImmutableMultimap.of(), ImmutableMultimap.of());
Comment thread
sopel39 marked this conversation as resolved.
Outdated
}

private Set<RemoteTask> finalizeTaskCreationIfNecessary()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

public interface SourceScheduler
{
void start();

ScheduleResult schedule();

void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,7 @@ public void schedule()
checkState(started.compareAndSet(false, true), "already started");

try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
stageSchedulers.values().forEach(StageScheduler::start);
while (!executionSchedule.isFinished()) {
List<ListenableFuture<Void>> blockedStages = new ArrayList<>();
StagesScheduleResult stagesScheduleResult = executionSchedule.getStagesToSchedule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@
public interface StageScheduler
extends Closeable
{
/**
* Called by the query scheduler when the scheduling process begins.
* This method is called before the ExecutionSchedule decides whether
* to schedule a stage, but after the query scheduling has been fully initialized.
* Within this method the scheduler may decide to schedule tasks that
* are necessary for query execution to make progress.
* For example the scheduler may decide to schedule a task without
* assigning any splits to unblock dynamic filter collection.
*/
default void start() {}
Comment thread
sopel39 marked this conversation as resolved.
Outdated

/**
* Schedules as much work as possible without blocking.
* The schedule results is a hint to the query scheduler if and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,28 @@
package io.trino.execution.scheduler.policy;

import io.trino.execution.scheduler.StageExecution;
import io.trino.server.DynamicFilterService;

import javax.inject.Inject;

import java.util.Collection;

import static java.util.Objects.requireNonNull;

/**
 * Execution policy that schedules stages in dependency phases
 * (e.g. build side of a join before the probe side).
 */
public class PhasedExecutionPolicy
        implements ExecutionPolicy
{
    private final DynamicFilterService dynamicFilterService;

    @Inject
    public PhasedExecutionPolicy(DynamicFilterService dynamicFilterService)
    {
        this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
    }

    @Override
    public ExecutionSchedule createExecutionSchedule(Collection<StageExecution> stages)
    {
        // the schedule consults the dynamic filter service to decide whether a stage
        // must be started early to collect dynamic filters
        return PhasedExecutionSchedule.forStages(stages, dynamicFilterService);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.util.concurrent.SettableFuture;
import io.trino.execution.scheduler.StageExecution;
import io.trino.execution.scheduler.StageExecution.State;
import io.trino.server.DynamicFilterService;
import io.trino.spi.QueryId;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.ExchangeNode;
Expand Down Expand Up @@ -88,6 +90,7 @@ public class PhasedExecutionSchedule
private final List<PlanFragmentId> sortedFragments = new ArrayList<>();
private final Map<PlanFragmentId, StageExecution> stagesByFragmentId;
private final Set<StageExecution> activeStages = new LinkedHashSet<>();
private final DynamicFilterService dynamicFilterService;

/**
* Set by {@link PhasedExecutionSchedule#init(Collection)} method.
Expand All @@ -97,28 +100,26 @@ public class PhasedExecutionSchedule
@GuardedBy("this")
private SettableFuture<Void> rescheduleFuture = SettableFuture.create();

public static PhasedExecutionSchedule forStages(Collection<StageExecution> stages)
public static PhasedExecutionSchedule forStages(Collection<StageExecution> stages, DynamicFilterService dynamicFilterService)
{
PhasedExecutionSchedule schedule = new PhasedExecutionSchedule(stages);
PhasedExecutionSchedule schedule = new PhasedExecutionSchedule(stages, dynamicFilterService);
schedule.init(stages);
return schedule;
}

private PhasedExecutionSchedule(Collection<StageExecution> stages)
private PhasedExecutionSchedule(Collection<StageExecution> stages, DynamicFilterService dynamicFilterService)
{
fragmentDependency = new DefaultDirectedGraph<>(new FragmentsEdgeFactory());
fragmentTopology = new DefaultDirectedGraph<>(new FragmentsEdgeFactory());
stagesByFragmentId = stages.stream()
.collect(toImmutableMap(stage -> stage.getFragment().getId(), identity()));
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
}

private void init(Collection<StageExecution> stages)
{
ImmutableSet.Builder<PlanFragmentId> fragmentsToExecute = ImmutableSet.builder();
fragmentsToExecute.addAll(extractDependenciesAndReturnNonLazyFragments(
stages.stream()
.map(StageExecution::getFragment)
.collect(toImmutableList())));
fragmentsToExecute.addAll(extractDependenciesAndReturnNonLazyFragments(stages));
// start stages without any dependencies
fragmentDependency.vertexSet().stream()
.filter(fragmentId -> fragmentDependency.inDegreeOf(fragmentId) == 0)
Expand Down Expand Up @@ -267,13 +268,24 @@ private boolean isStageCompleted(StageExecution stage)
return state == SCHEDULED || state == RUNNING || state == FLUSHING || state.isDone();
}

private Set<PlanFragmentId> extractDependenciesAndReturnNonLazyFragments(Collection<PlanFragment> fragments)
private Set<PlanFragmentId> extractDependenciesAndReturnNonLazyFragments(Collection<StageExecution> stages)
{
if (stages.isEmpty()) {
return ImmutableSet.of();
}

QueryId queryId = stages.stream()
.map(stage -> stage.getStageId().getQueryId())
.findAny().orElseThrow();
List<PlanFragment> fragments = stages.stream()
.map(StageExecution::getFragment)
.collect(toImmutableList());

// Build a graph where the plan fragments are vertexes and the edges represent
// a before -> after relationship. Destination fragment should be started only
// when source fragment is completed. For example, a join hash build has an edge
// to the join probe.
Visitor visitor = new Visitor(fragments);
Visitor visitor = new Visitor(queryId, fragments);
visitor.processAllFragments();

// Make sure there are no strongly connected components as it would mean circular dependency between stages
Expand All @@ -286,12 +298,14 @@ private Set<PlanFragmentId> extractDependenciesAndReturnNonLazyFragments(Collect
private class Visitor
extends PlanVisitor<FragmentSubGraph, PlanFragmentId>
{
private final QueryId queryId;
private final Map<PlanFragmentId, PlanFragment> fragments;
private final ImmutableSet.Builder<PlanFragmentId> nonLazyFragments = ImmutableSet.builder();
private final Map<PlanFragmentId, FragmentSubGraph> fragmentSubGraphs = new HashMap<>();

public Visitor(Collection<PlanFragment> fragments)
public Visitor(QueryId queryId, Collection<PlanFragment> fragments)
{
this.queryId = queryId;
this.fragments = requireNonNull(fragments, "fragments is null").stream()
.collect(toImmutableMap(PlanFragment::getId, identity()));
}
Expand Down Expand Up @@ -410,7 +424,7 @@ private FragmentSubGraph processJoin(boolean replicated, PlanNode probe, PlanNod
addDependencyEdges(buildSubGraph.getUpstreamFragments(), probeSubGraph.getLazyUpstreamFragments());

boolean currentFragmentLazy = probeSubGraph.isCurrentFragmentLazy() && buildSubGraph.isCurrentFragmentLazy();
if (replicated && currentFragmentLazy) {
if (replicated && currentFragmentLazy && !dynamicFilterService.isStageSchedulingNeededToCollectDynamicFilters(queryId, fragments.get(currentFragmentId))) {
// Do not start join stage (which can also be a source stage with table scans)
// for replicated join until build source stage enters FLUSHING state.
// Broadcast join limit for CBO is set in such a way that build source data should
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,21 @@ public boolean isCollectingTaskNeeded(QueryId queryId, PlanFragment plan)
return false;
}

return !getSourceStageInnerLazyDynamicFilters(plan).isEmpty();
// dynamic filters are collected by an additional task only for a non-fixed source stage
return plan.getPartitioning().equals(SOURCE_DISTRIBUTION) && !getLazyDynamicFilters(plan).isEmpty();
}

// Returns true if the given fragment must be scheduled (rather than served by a
// standalone collecting task) in order for its lazy dynamic filters to be collected.
public boolean isStageSchedulingNeededToCollectDynamicFilters(QueryId queryId, PlanFragment plan)
{
DynamicFilterContext context = dynamicFilterContexts.get(queryId);
if (context == null) {
// query has been removed or was never registered (e.g. dynamic filtering is disabled)
return false;
}

// stage scheduling is not needed to collect dynamic filters for a non-fixed source stage,
// because for such a stage a dedicated collecting task is created instead
return !plan.getPartitioning().equals(SOURCE_DISTRIBUTION) && !getLazyDynamicFilters(plan).isEmpty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
import static io.trino.execution.scheduler.PipelinedStageExecution.createPipelinedStageExecution;
import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL;
import static io.trino.execution.scheduler.SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler;
import static io.trino.execution.scheduler.StageExecution.State.PLANNED;
import static io.trino.execution.scheduler.StageExecution.State.SCHEDULING;
import static io.trino.metadata.MetadataManager.createTestMetadataManager;
import static io.trino.operator.StageExecutionDescriptor.ungroupedExecution;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
Expand Down Expand Up @@ -584,15 +586,20 @@ public void testDynamicFiltersUnblockedOnBlockedBuildSource()
ImmutableMap.of(symbol, new TestingColumnHandle("probeColumnA")),
symbolAllocator.getTypes());

// make sure dynamic filtering collecting task was created immediately
assertEquals(stage.getState(), PLANNED);
scheduler.start();
assertEquals(stage.getAllTasks().size(), 1);
assertEquals(stage.getState(), SCHEDULING);

// make sure dynamic filter is initially blocked
assertFalse(dynamicFilter.isBlocked().isDone());

// make sure dynamic filter is unblocked due to build side source tasks being blocked
ScheduleResult scheduleResult = scheduler.schedule();
assertTrue(dynamicFilter.isBlocked().isDone());

// make sure that an extra task for collecting dynamic filters was created
assertEquals(scheduleResult.getNewTasks().size(), 1);
// no new probe splits should be scheduled
assertEquals(scheduleResult.getSplitsScheduled(), 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import io.trino.execution.scheduler.policy.PhasedExecutionSchedule.FragmentsEdge;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.server.DynamicFilterService;
import io.trino.spi.QueryId;
import io.trino.spi.type.TypeOperators;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
Expand All @@ -40,6 +43,7 @@
import java.util.function.Consumer;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.trino.execution.scheduler.StageExecution.State.ABORTED;
import static io.trino.execution.scheduler.StageExecution.State.FINISHED;
import static io.trino.execution.scheduler.StageExecution.State.FLUSHING;
Expand All @@ -48,6 +52,7 @@
import static io.trino.execution.scheduler.policy.PlanUtils.createBroadcastJoinPlanFragment;
import static io.trino.execution.scheduler.policy.PlanUtils.createJoinPlanFragment;
import static io.trino.execution.scheduler.policy.PlanUtils.createTableScanPlanFragment;
import static io.trino.metadata.MetadataManager.createTestMetadataManager;
import static io.trino.sql.planner.plan.JoinNode.DistributionType.PARTITIONED;
import static io.trino.sql.planner.plan.JoinNode.DistributionType.REPLICATED;
import static io.trino.sql.planner.plan.JoinNode.Type.INNER;
Expand All @@ -56,6 +61,8 @@

public class TestPhasedExecutionSchedule
{
private final DynamicFilterService dynamicFilterService = new DynamicFilterService(createTestMetadataManager(), new TypeOperators(), newDirectExecutorService());

@Test
public void testPartitionedJoin()
{
Expand All @@ -67,7 +74,7 @@ public void testPartitionedJoin()
TestingStageExecution probeStage = new TestingStageExecution(probeFragment);
TestingStageExecution joinStage = new TestingStageExecution(joinFragment);

PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(buildStage, probeStage, joinStage));
PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(buildStage, probeStage, joinStage), dynamicFilterService);
assertThat(schedule.getSortedFragments()).containsExactly(buildFragment.getId(), probeFragment.getId(), joinFragment.getId());

// single dependency between build and probe stages
Expand Down Expand Up @@ -105,7 +112,7 @@ public void testBroadcastSourceJoin()
TestingStageExecution buildStage = new TestingStageExecution(buildFragment);
TestingStageExecution joinSourceStage = new TestingStageExecution(joinSourceFragment);

PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(joinSourceStage, buildStage));
PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(joinSourceStage, buildStage), dynamicFilterService);
assertThat(schedule.getSortedFragments()).containsExactly(buildFragment.getId(), joinSourceFragment.getId());

// single dependency between build and join stages
Expand Down Expand Up @@ -134,7 +141,7 @@ public void testAggregation()
TestingStageExecution buildStage = new TestingStageExecution(buildFragment);
TestingStageExecution joinStage = new TestingStageExecution(joinFragment);

PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(sourceStage, aggregationStage, buildStage, joinStage));
PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(sourceStage, aggregationStage, buildStage, joinStage), dynamicFilterService);
assertThat(schedule.getSortedFragments()).containsExactly(buildFragment.getId(), sourceFragment.getId(), aggregationFragment.getId(), joinFragment.getId());

// aggregation and source stage should start immediately, join stage should wait for build stage to complete
Expand All @@ -156,7 +163,7 @@ public void testDependentStageAbortedBeforeStarted()
TestingStageExecution buildStage = new TestingStageExecution(buildFragment);
TestingStageExecution joinStage = new TestingStageExecution(joinFragment);

PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(sourceStage, aggregationStage, buildStage, joinStage));
PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(sourceStage, aggregationStage, buildStage, joinStage), dynamicFilterService);
assertThat(schedule.getSortedFragments()).containsExactly(buildFragment.getId(), sourceFragment.getId(), aggregationFragment.getId(), joinFragment.getId());

// aggregation and source stage should start immediately, join stage should wait for build stage to complete
Expand Down Expand Up @@ -191,7 +198,7 @@ public void testStageWithBroadcastAndPartitionedJoin()
TestingStageExecution joinStage = new TestingStageExecution(joinFragment);

PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(
broadcastBuildStage, partitionedBuildStage, probeStage, joinStage));
broadcastBuildStage, partitionedBuildStage, probeStage, joinStage), dynamicFilterService);

// join stage should start immediately because partitioned join forces that
DirectedGraph<PlanFragmentId, FragmentsEdge> dependencies = schedule.getFragmentDependency();
Expand Down Expand Up @@ -272,7 +279,7 @@ public void addStateChangeListener(StateChangeListener<State> stateChangeListene
@Override
public StageId getStageId()
{
    // return a fixed synthetic stage id so tests exercising query-level lookups
    // (e.g. dynamic filter service calls keyed by QueryId) can run
    return new StageId(new QueryId("id"), 0);
}

@Override
Expand Down
Loading