Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ public class ScheduleResult
/**
 * Reason attached to a {@code ScheduleResult} describing why the scheduler that produced it is blocked.
 */
public enum BlockedReason
{
WRITER_SCALING,
// Used by SourcePartitionedScheduler when it is blocked neither on split placement nor on the next split batch.
NO_ACTIVE_DRIVER_GROUP,
// Used by SourcePartitionedScheduler when pending splits could not all be placed (blocked on the placement future).
SPLIT_QUEUES_FULL,
// Used by SourcePartitionedScheduler when the next split batch from the split source is not available yet.
WAITING_FOR_SOURCE,
// Used by SourcePartitionedScheduler when it is blocked both on split placement and on the next split batch.
MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE,
// NOTE(review): the empty comment before ';' presumably exists so every constant keeps a trailing comma
// and adding or removing one touches a single line — confirm against project style.
/**/;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.trino.execution.RemoteTask;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
Expand All @@ -30,7 +29,6 @@
import io.trino.split.SplitSource.SplitBatch;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -49,9 +47,6 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.addSuccessCallback;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE;
import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.NO_ACTIVE_DRIVER_GROUP;
import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL;
import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.WAITING_FOR_SOURCE;
import static java.util.Objects.requireNonNull;
Expand All @@ -66,6 +61,11 @@ private enum State
*/
INITIALIZED,

/**
* At least one split has been added to pendingSplits set.
*/
SPLITS_ADDED,

/**
* All splits from underlying SplitSource have been discovered.
* No more splits will be added to the pendingSplits set.
Expand All @@ -89,14 +89,12 @@ private enum State
private final BooleanSupplier anySourceTaskBlocked;
private final PartitionIdAllocator partitionIdAllocator;
private final Map<InternalNode, RemoteTask> scheduledTasks;
private final Set<Split> pendingSplits = new HashSet<>();

public ListenableFuture<SplitBatch> nextSplitBatchFuture;
public ListenableFuture<Void> placementFuture = immediateVoidFuture();
public final Set<Split> pendingSplits = new HashSet<>();
private ListenableFuture<SplitBatch> nextSplitBatchFuture;
private ListenableFuture<Void> placementFuture = immediateVoidFuture();
private State state = State.INITIALIZED;

private SettableFuture<Void> whenFinished = SettableFuture.create();

private SourcePartitionedScheduler(
StageExecution stageExecution,
PlanNodeId partitionedNode,
Expand Down Expand Up @@ -226,102 +224,85 @@ public synchronized void start()
@Override
public synchronized ScheduleResult schedule()
{
dropListenersFromWhenFinished();
if (state == State.FINISHED) {
return new ScheduleResult(true, ImmutableSet.of(), 0);
}

int overallSplitAssignmentCount = 0;
Multimap<InternalNode, Split> splitAssignment = ImmutableMultimap.of();
ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();
boolean anyBlockedOnPlacements = false;
boolean anyBlockedOnNextSplitBatch = false;
boolean anyNotBlocked = false;

if (state != State.FINISHED) {
if (state == State.SPLITS_SCHEDULED) {
verify(nextSplitBatchFuture == null);
}
else if (pendingSplits.isEmpty()) {
// try to get the next batch
if (nextSplitBatchFuture == null) {
nextSplitBatchFuture = splitSource.getNextBatch(splitBatchSize);
Optional<ListenableFuture<Void>> blockedFuture = Optional.empty();
boolean blockedOnPlacements = false;
boolean blockedOnNextSplitBatch = false;

long start = System.nanoTime();
addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start));
}
if (state == State.SPLITS_SCHEDULED) {
verify(nextSplitBatchFuture == null);
}
else if (pendingSplits.isEmpty()) {
// try to get the next batch
if (nextSplitBatchFuture == null) {
nextSplitBatchFuture = splitSource.getNextBatch(splitBatchSize);

if (nextSplitBatchFuture.isDone()) {
SplitBatch nextSplits = getFutureValue(nextSplitBatchFuture);
nextSplitBatchFuture = null;
pendingSplits.addAll(nextSplits.getSplits());
if (nextSplits.isLastBatch()) {
if (state == State.INITIALIZED && pendingSplits.isEmpty()) {
// Add an empty split in case no splits have been produced for the source.
// For source operators, they never take input, but they may produce output.
// This is well handled by the execution engine.
// However, there are certain non-source operators that may produce output without any input,
// for example, 1) an AggregationOperator, 2) a HashAggregationOperator where one of the grouping sets is ().
// Scheduling an empty split kicks off necessary driver instantiation to make this work.
pendingSplits.add(new Split(
splitSource.getCatalogHandle(),
new EmptySplit(splitSource.getCatalogHandle())));
}
state = State.SPLITS_SCHEDULED;
}
}
else {
overallBlockedFutures.add(nextSplitBatchFuture);
anyBlockedOnNextSplitBatch = true;
}
long start = System.nanoTime();
addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start));
}
if (!anyBlockedOnNextSplitBatch) {
Multimap<InternalNode, Split> splitAssignment = ImmutableMultimap.of();
boolean skip = false;
if (!pendingSplits.isEmpty()) {
if (!placementFuture.isDone()) {
anyBlockedOnPlacements = true;
skip = true;
}
else { // calculate placements for splits
SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
splitAssignment = splitPlacementResult.getAssignments(); // remove splits with successful placements
splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
overallSplitAssignmentCount += splitAssignment.size(); // if not completed placed, mark scheduleGroup as blocked on placement
if (!pendingSplits.isEmpty()) {
placementFuture = splitPlacementResult.getBlocked();
overallBlockedFutures.add(placementFuture);
anyBlockedOnPlacements = true;
}

if (nextSplitBatchFuture.isDone()) {
SplitBatch nextSplits = getFutureValue(nextSplitBatchFuture);
nextSplitBatchFuture = null;
pendingSplits.addAll(nextSplits.getSplits());
if (nextSplits.isLastBatch()) {
if (state == State.INITIALIZED && pendingSplits.isEmpty()) {
// Add an empty split in case no splits have been produced for the source.
// For source operators, they never take input, but they may produce output.
// This is well handled by the execution engine.
// However, there are certain non-source operators that may produce output without any input,
// for example, 1) an AggregationOperator, 2) a HashAggregationOperator where one of the grouping sets is ().
// Scheduling an empty split kicks off necessary driver instantiation to make this work.
pendingSplits.add(new Split(
splitSource.getCatalogHandle(),
new EmptySplit(splitSource.getCatalogHandle())));
}
state = State.SPLITS_SCHEDULED;
}
if (!skip) { // if no new splits will be assigned, update state and attach completion event
if (pendingSplits.isEmpty() && state == State.SPLITS_SCHEDULED) {
state = State.FINISHED;
}
}
else {
blockedFuture = Optional.of(asVoid(nextSplitBatchFuture));
blockedOnNextSplitBatch = true;
}
}

// assign the splits with successful placements
overallNewTasks.addAll(assignSplits(splitAssignment));

// Assert that "placement future is not done" implies "pendingSplits is not empty".
// The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
// However, there are other reasons that could lead to this.
// Note that `computeAssignments` is quite broken:
// 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
// 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
// As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
if (nextSplitBatchFuture == null && pendingSplits.isEmpty() && state != State.FINISHED) {
anyNotBlocked = true;
}
if (!pendingSplits.isEmpty() && state == State.INITIALIZED) {
state = State.SPLITS_ADDED;
}

if (blockedFuture.isEmpty() && !pendingSplits.isEmpty()) {
if (!placementFuture.isDone()) {
blockedFuture = Optional.of(placementFuture);
blockedOnPlacements = true;
}
else {
// calculate placements for splits
SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
splitAssignment = splitPlacementResult.getAssignments(); // remove splits with successful placements
splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
overallSplitAssignmentCount += splitAssignment.size(); // if not completed placed, mark scheduleGroup as blocked on placement
if (!pendingSplits.isEmpty()) {
placementFuture = splitPlacementResult.getBlocked();
blockedFuture = Optional.of(placementFuture);
blockedOnPlacements = true;
}
}
}

// * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
// If state is FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
// * Even if `splitSource.isFinished()` returns true, it is not necessarily safe to tear down the split source.
// * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
// which may contain recently published splits. We must not ignore those.
if (state == State.FINISHED && splitSource.isFinished()) {
Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
// assign the splits with successful placements
overallNewTasks.addAll(assignSplits(splitAssignment));

// if no new splits will be assigned, update state and attach completion event
if (pendingSplits.isEmpty() && state == State.SPLITS_SCHEDULED) {
state = State.FINISHED;

Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
// Here we assume that we can get non-empty tableExecuteSplitsInfo only for queries which facilitate single split source.
tableExecuteSplitsInfo.ifPresent(info -> {
TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(stageExecution.getStageId().getQueryId());
Expand All @@ -335,53 +316,35 @@ else if (pendingSplits.isEmpty()) {
overallSplitAssignmentCount);
}

if (anyNotBlocked) {
if (blockedFuture.isEmpty()) {
return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
}

boolean anySourceTaskBlocked = this.anySourceTaskBlocked.getAsBoolean();
if (anySourceTaskBlocked) {
if (anySourceTaskBlocked.getAsBoolean()) {
// Dynamic filters might not be collected due to build side source tasks being blocked on full buffer.
// In such case probe split generation that is waiting for dynamic filters should be unblocked to prevent deadlock.
dynamicFilterService.unblockStageDynamicFilters(stageExecution.getStageId().getQueryId(), stageExecution.getAttemptId(), stageExecution.getFragment());
}

if (anyBlockedOnPlacements && anySourceTaskBlocked) {
// In a broadcast join, output buffers of the tasks in build source stage have to
// hold onto all data produced before probe side task scheduling finishes,
// even if the data is acknowledged by all known consumers. This is because
// new consumers may be added until the probe side task scheduling finishes.
//
// As a result, the following line is necessary to prevent deadlock
// due to neither build nor probe can make any progress.
// The build side blocks due to a full output buffer.
// In the meantime the probe side split cannot be consumed since
// builder side hash table construction has not finished.
overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
}

ScheduleResult.BlockedReason blockedReason;
if (anyBlockedOnNextSplitBatch) {
blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
}
else {
blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
}

overallBlockedFutures.add(whenFinished);

if (state == State.FINISHED && splitSource.isFinished()) {
// Wake up blocked caller so that it will invoke schedule() right away.
// Once schedule is invoked, state will be transitioned to FINISHED.
whenFinished.set(null);
Comment thread
sopel39 marked this conversation as resolved.
Outdated
whenFinished = SettableFuture.create();
if (blockedOnPlacements) {
// In a broadcast join, output buffers of the tasks in build source stage have to
// hold onto all data produced before probe side task scheduling finishes,
// even if the data is acknowledged by all known consumers. This is because
// new consumers may be added until the probe side task scheduling finishes.
//
// As a result, the following line is necessary to prevent deadlock
// due to neither build nor probe can make any progress.
// The build side blocks due to a full output buffer.
// In the meantime the probe side split cannot be consumed since
// builder side hash table construction has not finished.
overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
}
}

return new ScheduleResult(
false,
overallNewTasks.build(),
nonCancellationPropagating(asVoid(whenAnyComplete(overallBlockedFutures))),
blockedReason,
nonCancellationPropagating(blockedFuture.get()),
blockedOnNextSplitBatch ? WAITING_FOR_SOURCE : SPLIT_QUEUES_FULL,
overallSplitAssignmentCount);
}

Expand All @@ -390,25 +353,6 @@ private static <T> ListenableFuture<Void> asVoid(ListenableFuture<T> future)
return Futures.transform(future, v -> null, directExecutor());
}

/**
 * Discards listeners accumulated on {@code whenFinished} by swapping in a fresh future.
 *
 * <p>{@code whenFinished} can stay incomplete for a long time and would otherwise keep
 * every listener registered on it. Once {@code schedule} is invoked again, listeners
 * from the previous invocation are no longer useful, so the pending future is cancelled
 * and replaced. A future that is already done is left untouched.
 *
 * <p>This is thread-safe because {@code whenFinished} can only be completed while
 * holding the monitor of {@code this}.
 */
private synchronized void dropListenersFromWhenFinished()
{
    if (!whenFinished.isDone()) {
        // Cancelling releases the listeners held by the old future; the new one starts empty.
        whenFinished.cancel(true);
        whenFinished = SettableFuture.create();
    }
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1593,9 +1593,6 @@ else if (!result.getBlocked().isDone()) {
case SPLIT_QUEUES_FULL:
schedulerStats.getSplitQueuesFull().update(1);
break;
case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
case NO_ACTIVE_DRIVER_GROUP:
break;
default:
throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,29 @@ public void testScheduleNoSplits()
stage.abort();
}

/**
 * Verifies that once a real split has been scheduled, a subsequent empty last batch
 * does not cause any additional task to be created, and that the scheduler then
 * reports itself as effectively finished.
 */
@Test
public void testDoesNotScheduleEmptySplit()
{
PlanFragment plan = createFragment();
NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
StageExecution stage = createStageExecution(plan, nodeTaskMap);

// Split source holding exactly two splits.
// NOTE(review): the literal 1 passed to getSourcePartitionedScheduler is presumably the split batch size — confirm.
ConnectorSplitSource splitSource = createFixedSplitSource(2, TestingSplit::createRemoteSplit);
StageScheduler scheduler = getSourcePartitionedScheduler(splitSource, stage, nodeManager, nodeTaskMap, 1, STAGE);

// First scheduling pass is expected to create exactly one new task.
assertEquals(scheduler.schedule().getNewTasks().size(), 1);

// Consume one split directly from the source, bypassing the scheduler, to
// ensure that next batch size fetched by scheduler will be empty and last
splitSource.getNextBatch(1);

// The second pass sees only the empty last batch and must not create another task.
ScheduleResult scheduleResult = scheduler.schedule();
assertEquals(scheduleResult.getNewTasks().size(), 0);

assertEffectivelyFinished(scheduleResult, scheduler);

stage.abort();
}

@Test
public void testScheduleSplitsOneAtATime()
{
Expand Down