diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java index 17233b702dc5..87bc32991387 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java @@ -18,6 +18,7 @@ import com.google.common.collect.Multimap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.log.Logger; import io.trino.execution.RemoteTask; import io.trino.execution.TableExecuteContext; import io.trino.execution.TableExecuteContextManager; @@ -54,6 +55,8 @@ public class SourcePartitionedScheduler implements SourceScheduler { + private static final Logger log = Logger.get(SourcePartitionedScheduler.class); + private enum State { /** @@ -263,16 +266,19 @@ else if (pendingSplits.isEmpty()) { splitSource.getCatalogHandle(), new EmptySplit(splitSource.getCatalogHandle()))); } + log.debug("stage id: %s, node: %s; transitioning to SPLITS_SCHEDULED", stageExecution.getStageId(), partitionedNode); state = State.SPLITS_SCHEDULED; } } else { blockedFuture = Optional.of(asVoid(nextSplitBatchFuture)); blockedOnNextSplitBatch = true; + log.debug("stage id: %s, node: %s; blocked on next split batch", stageExecution.getStageId(), partitionedNode); } } if (!pendingSplits.isEmpty() && state == State.INITIALIZED) { + log.debug("stage id: %s, node: %s; transitioning to SPLITS_ADDED", stageExecution.getStageId(), partitionedNode); state = State.SPLITS_ADDED; } @@ -295,11 +301,16 @@ else if (pendingSplits.isEmpty()) { } } + if (blockedOnPlacements) { + log.debug("stage id: %s, node: %s; blocked on placements", stageExecution.getStageId(), partitionedNode); + } + // assign the splits with successful placements overallNewTasks.addAll(assignSplits(splitAssignment)); // 
if no new splits will be assigned, update state and attach completion event if (pendingSplits.isEmpty() && state == State.SPLITS_SCHEDULED) { + log.debug("stage id: %s, node: %s; transitioning to FINISHED", stageExecution.getStageId(), partitionedNode); state = State.FINISHED; Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo(); @@ -317,12 +328,14 @@ else if (pendingSplits.isEmpty()) { } if (blockedFuture.isEmpty()) { + log.debug("stage id: %s, node: %s; assigned %s splits (not blocked)", stageExecution.getStageId(), partitionedNode, overallSplitAssignmentCount); return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount); } if (anySourceTaskBlocked.getAsBoolean()) { // Dynamic filters might not be collected due to build side source tasks being blocked on full buffer. // In such case probe split generation that is waiting for dynamic filters should be unblocked to prevent deadlock. + log.debug("stage id: %s, node: %s; unblocking dynamic filters", stageExecution.getStageId(), partitionedNode); dynamicFilterService.unblockStageDynamicFilters(stageExecution.getStageId().getQueryId(), stageExecution.getAttemptId(), stageExecution.getFragment()); if (blockedOnPlacements) { @@ -336,15 +349,18 @@ else if (pendingSplits.isEmpty()) { // The build side blocks due to a full output buffer. // In the meantime the probe side split cannot be consumed since // builder side hash table construction has not finished. + log.debug("stage id: %s, node: %s; finalize task creation if necessary", stageExecution.getStageId(), partitionedNode); overallNewTasks.addAll(finalizeTaskCreationIfNecessary()); } } + ScheduleResult.BlockedReason blockedReason = blockedOnNextSplitBatch ? 
WAITING_FOR_SOURCE : SPLIT_QUEUES_FULL; + log.debug("stage id: %s, node: %s; assigned %s splits (blocked reason %s)", stageExecution.getStageId(), partitionedNode, overallSplitAssignmentCount, blockedReason); return new ScheduleResult( false, overallNewTasks.build(), nonCancellationPropagating(blockedFuture.get()), - blockedOnNextSplitBatch ? WAITING_FOR_SOURCE : SPLIT_QUEUES_FULL, + blockedReason, overallSplitAssignmentCount); }