diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ScheduleResult.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ScheduleResult.java index 8bdaf00ba40a..c2cbe5987dc6 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ScheduleResult.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ScheduleResult.java @@ -29,10 +29,8 @@ public class ScheduleResult public enum BlockedReason { WRITER_SCALING, - NO_ACTIVE_DRIVER_GROUP, SPLIT_QUEUES_FULL, WAITING_FOR_SOURCE, - MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE, /**/; } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java index 14bd80029b10..17233b702dc5 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java @@ -18,7 +18,6 @@ import com.google.common.collect.Multimap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import io.trino.execution.RemoteTask; import io.trino.execution.TableExecuteContext; import io.trino.execution.TableExecuteContextManager; @@ -30,7 +29,6 @@ import io.trino.split.SplitSource.SplitBatch; import io.trino.sql.planner.plan.PlanNodeId; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -49,9 +47,6 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.concurrent.MoreFutures.addSuccessCallback; import static io.airlift.concurrent.MoreFutures.getFutureValue; -import static io.airlift.concurrent.MoreFutures.whenAnyComplete; -import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE; -import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.NO_ACTIVE_DRIVER_GROUP; import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL; import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.WAITING_FOR_SOURCE; import static java.util.Objects.requireNonNull; @@ -66,6 +61,11 @@ private enum State */ INITIALIZED, + /** + * At least one split has been added to pendingSplits set. + */ + SPLITS_ADDED, + /** * All splits from underlying SplitSource have been discovered. * No more splits will be added to the pendingSplits set. @@ -89,14 +89,12 @@ private enum State private final BooleanSupplier anySourceTaskBlocked; private final PartitionIdAllocator partitionIdAllocator; private final Map scheduledTasks; + private final Set pendingSplits = new HashSet<>(); - public ListenableFuture nextSplitBatchFuture; - public ListenableFuture placementFuture = immediateVoidFuture(); - public final Set pendingSplits = new HashSet<>(); + private ListenableFuture nextSplitBatchFuture; + private ListenableFuture placementFuture = immediateVoidFuture(); private State state = State.INITIALIZED; - private SettableFuture whenFinished = SettableFuture.create(); - private SourcePartitionedScheduler( StageExecution stageExecution, PlanNodeId partitionedNode, @@ -226,102 +224,85 @@ public synchronized void start() @Override public synchronized ScheduleResult schedule() { - dropListenersFromWhenFinished(); + if (state == State.FINISHED) { + return new ScheduleResult(true, ImmutableSet.of(), 0); + } int overallSplitAssignmentCount = 0; + Multimap splitAssignment = ImmutableMultimap.of(); ImmutableSet.Builder overallNewTasks = ImmutableSet.builder(); - List> overallBlockedFutures = new ArrayList<>(); - boolean anyBlockedOnPlacements = false; - boolean anyBlockedOnNextSplitBatch = false; - boolean anyNotBlocked = false; - - if (state != State.FINISHED) { - if (state == State.SPLITS_SCHEDULED) { - verify(nextSplitBatchFuture == null); - } - else if (pendingSplits.isEmpty()) { - // try to get the next batch - if (nextSplitBatchFuture == null) { - nextSplitBatchFuture = splitSource.getNextBatch(splitBatchSize); + Optional> blockedFuture = Optional.empty(); + boolean blockedOnPlacements = false; + boolean blockedOnNextSplitBatch = false; - long start = System.nanoTime(); - addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start)); - } + if (state == State.SPLITS_SCHEDULED) { + verify(nextSplitBatchFuture == null); + } + else if (pendingSplits.isEmpty()) { + // try to get the next batch + if (nextSplitBatchFuture == null) { + nextSplitBatchFuture = splitSource.getNextBatch(splitBatchSize); - if (nextSplitBatchFuture.isDone()) { - SplitBatch nextSplits = getFutureValue(nextSplitBatchFuture); - nextSplitBatchFuture = null; - pendingSplits.addAll(nextSplits.getSplits()); - if (nextSplits.isLastBatch()) { - if (state == State.INITIALIZED && pendingSplits.isEmpty()) { - // Add an empty split in case no splits have been produced for the source. - // For source operators, they never take input, but they may produce output. - // This is well handled by the execution engine. - // However, there are certain non-source operators that may produce output without any input, - // for example, 1) an AggregationOperator, 2) a HashAggregationOperator where one of the grouping sets is (). - // Scheduling an empty split kicks off necessary driver instantiation to make this work. - pendingSplits.add(new Split( - splitSource.getCatalogHandle(), - new EmptySplit(splitSource.getCatalogHandle()))); - } - state = State.SPLITS_SCHEDULED; - } - } - else { - overallBlockedFutures.add(nextSplitBatchFuture); - anyBlockedOnNextSplitBatch = true; - } + long start = System.nanoTime(); + addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start)); } - if (!anyBlockedOnNextSplitBatch) { - Multimap splitAssignment = ImmutableMultimap.of(); - boolean skip = false; - if (!pendingSplits.isEmpty()) { - if (!placementFuture.isDone()) { - anyBlockedOnPlacements = true; - skip = true; - } - else { // calculate placements for splits - SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits); - splitAssignment = splitPlacementResult.getAssignments(); // remove splits with successful placements - splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here. - overallSplitAssignmentCount += splitAssignment.size(); // if not completed placed, mark scheduleGroup as blocked on placement - if (!pendingSplits.isEmpty()) { - placementFuture = splitPlacementResult.getBlocked(); - overallBlockedFutures.add(placementFuture); - anyBlockedOnPlacements = true; - } + + if (nextSplitBatchFuture.isDone()) { + SplitBatch nextSplits = getFutureValue(nextSplitBatchFuture); + nextSplitBatchFuture = null; + pendingSplits.addAll(nextSplits.getSplits()); + if (nextSplits.isLastBatch()) { + if (state == State.INITIALIZED && pendingSplits.isEmpty()) { + // Add an empty split in case no splits have been produced for the source. + // For source operators, they never take input, but they may produce output. + // This is well handled by the execution engine. + // However, there are certain non-source operators that may produce output without any input, + // for example, 1) an AggregationOperator, 2) a HashAggregationOperator where one of the grouping sets is (). + // Scheduling an empty split kicks off necessary driver instantiation to make this work. + pendingSplits.add(new Split( + splitSource.getCatalogHandle(), + new EmptySplit(splitSource.getCatalogHandle()))); } + state = State.SPLITS_SCHEDULED; } - if (!skip) { // if no new splits will be assigned, update state and attach completion event - if (pendingSplits.isEmpty() && state == State.SPLITS_SCHEDULED) { - state = State.FINISHED; - } + } + else { + blockedFuture = Optional.of(asVoid(nextSplitBatchFuture)); + blockedOnNextSplitBatch = true; + } + } - // assign the splits with successful placements - overallNewTasks.addAll(assignSplits(splitAssignment)); - - // Assert that "placement future is not done" implies "pendingSplits is not empty". - // The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line. - // However, there are other reasons that could lead to this. - // Note that `computeAssignments` is quite broken: - // 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked. - // 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion. - // As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here. - if (nextSplitBatchFuture == null && pendingSplits.isEmpty() && state != State.FINISHED) { - anyNotBlocked = true; - } + if (!pendingSplits.isEmpty() && state == State.INITIALIZED) { + state = State.SPLITS_ADDED; + } + + if (blockedFuture.isEmpty() && !pendingSplits.isEmpty()) { + if (!placementFuture.isDone()) { + blockedFuture = Optional.of(placementFuture); + blockedOnPlacements = true; + } + else { + // calculate placements for splits + SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits); + splitAssignment = splitPlacementResult.getAssignments(); // remove splits with successful placements + splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here. + overallSplitAssignmentCount += splitAssignment.size(); // if not completed placed, mark scheduleGroup as blocked on placement + if (!pendingSplits.isEmpty()) { + placementFuture = splitPlacementResult.getBlocked(); + blockedFuture = Optional.of(placementFuture); + blockedOnPlacements = true; } } } - // * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked. - // If state is FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now. - // * Even if `splitSource.isFinished()` return true, it is not necessarily safe to tear down the split source. - // * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures, - // which may contain recently published splits. We must not ignore those. - if (state == State.FINISHED && splitSource.isFinished()) { - Optional> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo(); + // assign the splits with successful placements + overallNewTasks.addAll(assignSplits(splitAssignment)); + + // if no new splits will be assigned, update state and attach completion event + if (pendingSplits.isEmpty() && state == State.SPLITS_SCHEDULED) { + state = State.FINISHED; + Optional> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo(); // Here we assume that we can get non-empty tableExecuteSplitsInfo only for queries which facilitate single split source. tableExecuteSplitsInfo.ifPresent(info -> { TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(stageExecution.getStageId().getQueryId()); @@ -335,53 +316,35 @@ else if (pendingSplits.isEmpty()) { overallSplitAssignmentCount); } - if (anyNotBlocked) { + if (blockedFuture.isEmpty()) { return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount); } - boolean anySourceTaskBlocked = this.anySourceTaskBlocked.getAsBoolean(); - if (anySourceTaskBlocked) { + if (anySourceTaskBlocked.getAsBoolean()) { // Dynamic filters might not be collected due to build side source tasks being blocked on full buffer. // In such case probe split generation that is waiting for dynamic filters should be unblocked to prevent deadlock. dynamicFilterService.unblockStageDynamicFilters(stageExecution.getStageId().getQueryId(), stageExecution.getAttemptId(), stageExecution.getFragment()); - } - - if (anyBlockedOnPlacements && anySourceTaskBlocked) { - // In a broadcast join, output buffers of the tasks in build source stage have to - // hold onto all data produced before probe side task scheduling finishes, - // even if the data is acknowledged by all known consumers. This is because - // new consumers may be added until the probe side task scheduling finishes. - // - // As a result, the following line is necessary to prevent deadlock - // due to neither build nor probe can make any progress. - // The build side blocks due to a full output buffer. - // In the meantime the probe side split cannot be consumed since - // builder side hash table construction has not finished. - overallNewTasks.addAll(finalizeTaskCreationIfNecessary()); - } - - ScheduleResult.BlockedReason blockedReason; - if (anyBlockedOnNextSplitBatch) { - blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE; - } - else { - blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP; - } - - overallBlockedFutures.add(whenFinished); - if (state == State.FINISHED && splitSource.isFinished()) { - // Wake up blocked caller so that it will invoke schedule() right away. - // Once schedule is invoked, state will be transitioned to FINISHED. - whenFinished.set(null); - whenFinished = SettableFuture.create(); + if (blockedOnPlacements) { + // In a broadcast join, output buffers of the tasks in build source stage have to + // hold onto all data produced before probe side task scheduling finishes, + // even if the data is acknowledged by all known consumers. This is because + // new consumers may be added until the probe side task scheduling finishes. + // + // As a result, the following line is necessary to prevent deadlock + // due to neither build nor probe can make any progress. + // The build side blocks due to a full output buffer. + // In the meantime the probe side split cannot be consumed since + // builder side hash table construction has not finished. + overallNewTasks.addAll(finalizeTaskCreationIfNecessary()); + } } return new ScheduleResult( false, overallNewTasks.build(), - nonCancellationPropagating(asVoid(whenAnyComplete(overallBlockedFutures))), - blockedReason, + nonCancellationPropagating(blockedFuture.get()), + blockedOnNextSplitBatch ? WAITING_FOR_SOURCE : SPLIT_QUEUES_FULL, overallSplitAssignmentCount); } @@ -390,25 +353,6 @@ private static ListenableFuture asVoid(ListenableFuture future) return Futures.transform(future, v -> null, directExecutor()); } - private synchronized void dropListenersFromWhenFinished() - { - // whenFinished may remain in a not-done state for an extended period of time. - // As a result, over time, it can retain a huge number of listener objects. - - // Whenever schedule is called, holding onto the previous listener is not useful anymore. - // Therefore, we drop those listeners here by recreating the future. - - // Note: The following implementation is thread-safe because whenFinished can only be completed - // while holding the monitor of this. - - if (whenFinished.isDone()) { - return; - } - - whenFinished.cancel(true); - whenFinished = SettableFuture.create(); - } - @Override public void close() { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java index 7bdc8a093833..2c23fed4f441 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java @@ -1593,9 +1593,6 @@ else if (!result.getBlocked().isDone()) { case SPLIT_QUEUES_FULL: schedulerStats.getSplitQueuesFull().update(1); break; - case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE: - case NO_ACTIVE_DRIVER_GROUP: - break; default: throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get()); } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java index 2399a2ca78f8..ae9694be65ff 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java @@ -165,6 +165,29 @@ public void testScheduleNoSplits() stage.abort(); } + @Test + public void testDoesNotScheduleEmptySplit() + { + PlanFragment plan = createFragment(); + NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService); + StageExecution stage = createStageExecution(plan, nodeTaskMap); + + ConnectorSplitSource splitSource = createFixedSplitSource(2, TestingSplit::createRemoteSplit); + StageScheduler scheduler = getSourcePartitionedScheduler(splitSource, stage, nodeManager, nodeTaskMap, 1, STAGE); + + assertEquals(scheduler.schedule().getNewTasks().size(), 1); + + // ensure that next batch size fetched by scheduler will be empty and last + splitSource.getNextBatch(1); + + ScheduleResult scheduleResult = scheduler.schedule(); + assertEquals(scheduleResult.getNewTasks().size(), 0); + + assertEffectivelyFinished(scheduleResult, scheduler); + + stage.abort(); + } + @Test public void testScheduleSplitsOneAtATime() {