Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.trino.execution.RemoteTask;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
Expand Down Expand Up @@ -54,6 +55,8 @@
public class SourcePartitionedScheduler
implements SourceScheduler
{
private static final Logger log = Logger.get(SourcePartitionedScheduler.class);

private enum State
{
/**
Expand Down Expand Up @@ -263,16 +266,19 @@ else if (pendingSplits.isEmpty()) {
splitSource.getCatalogHandle(),
new EmptySplit(splitSource.getCatalogHandle())));
}
log.debug("stage id: %s, node: %s; transitioning to SPLITS_SCHEDULED", stageExecution.getStageId(), partitionedNode);
state = State.SPLITS_SCHEDULED;
}
}
else {
blockedFuture = Optional.of(asVoid(nextSplitBatchFuture));
blockedOnNextSplitBatch = true;
log.debug("stage id: %s, node: %s; blocked on next split batch", stageExecution.getStageId(), partitionedNode);
}
}

if (!pendingSplits.isEmpty() && state == State.INITIALIZED) {
log.debug("stage id: %s, node: %s; transitioning to SPLITS_ADDED", stageExecution.getStageId(), partitionedNode);
state = State.SPLITS_ADDED;
}

Expand All @@ -295,11 +301,16 @@ else if (pendingSplits.isEmpty()) {
}
}

if (blockedOnPlacements) {
log.debug("stage id: %s, node: %s; blocked on placements", stageExecution.getStageId(), partitionedNode);
}

// assign the splits with successful placements
overallNewTasks.addAll(assignSplits(splitAssignment));

// if no new splits will be assigned, update state and attach completion event
if (pendingSplits.isEmpty() && state == State.SPLITS_SCHEDULED) {
log.debug("stage id: %s, node: %s; transitioning to FINISHED", stageExecution.getStageId(), partitionedNode);
state = State.FINISHED;

Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
Expand All @@ -317,12 +328,14 @@ else if (pendingSplits.isEmpty()) {
}

if (blockedFuture.isEmpty()) {
log.debug("stage id: %s, node: %s; assigned %s splits (not blocked)", stageExecution.getStageId(), partitionedNode, overallSplitAssignmentCount);
return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
}

if (anySourceTaskBlocked.getAsBoolean()) {
// Dynamic filters might not be collected due to build side source tasks being blocked on full buffer.
// In such case probe split generation that is waiting for dynamic filters should be unblocked to prevent deadlock.
log.debug("stage id: %s, node: %s; unblocking dynamic filters", stageExecution.getStageId(), partitionedNode);
dynamicFilterService.unblockStageDynamicFilters(stageExecution.getStageId().getQueryId(), stageExecution.getAttemptId(), stageExecution.getFragment());

if (blockedOnPlacements) {
Expand All @@ -336,15 +349,18 @@ else if (pendingSplits.isEmpty()) {
// The build side blocks due to a full output buffer.
// In the meantime the probe side split cannot be consumed since
// builder side hash table construction has not finished.
log.debug("stage id: %s, node: %s; finalize task creation if necessary", stageExecution.getStageId(), partitionedNode);
overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
}
}

ScheduleResult.BlockedReason blockedReason = blockedOnNextSplitBatch ? WAITING_FOR_SOURCE : SPLIT_QUEUES_FULL;
log.debug("stage id: %s, node: %s; assigned %s splits (blocked reason %s)", stageExecution.getStageId(), partitionedNode, overallSplitAssignmentCount, blockedReason);
return new ScheduleResult(
false,
overallNewTasks.build(),
nonCancellationPropagating(blockedFuture.get()),
blockedOnNextSplitBatch ? WAITING_FOR_SOURCE : SPLIT_QUEUES_FULL,
blockedReason,
overallSplitAssignmentCount);
}

Expand Down