Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static com.facebook.presto.execution.scheduler.ScheduleResult.BlockedReason.WRITER_SCALING;
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.util.Failures.checkCondition;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -73,6 +74,8 @@ public ScaledWriterScheduler(
this.writerMinSizeBytes = requireNonNull(writerMinSize, "minWriterSize is null").toBytes();
this.optimizedScaleWriterProducerBuffer = optimizedScaleWriterProducerBuffer;
this.initialTaskCount = requireNonNull(initialTaskCount, "initialTaskCount is null");

future.set(null);
}

public void finish()
Expand All @@ -84,13 +87,20 @@ public void finish()
@Override
public ScheduleResult schedule()
{
List<RemoteTask> writers = scheduleTasks(getNewTaskCount());
List<RemoteTask> writers = ImmutableList.of();

future.set(null);
future = SettableFuture.create();
executor.schedule(() -> future.set(null), 200, MILLISECONDS);
if (future.isDone()) {
writers = scheduleTasks(getNewTaskCount());
future = SettableFuture.create();
executor.schedule(() -> future.set(null), 200, MILLISECONDS);
}

return ScheduleResult.blocked(done.get(), writers, future, WRITER_SCALING, 0);
return ScheduleResult.blocked(
done.get(),
writers,
nonCancellationPropagating(future),
WRITER_SCALING,
0);
}

private int getNewTaskCount()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.RemoteTaskFactory;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageExecutionId;
import com.facebook.presto.execution.StageExecutionInfo;
import com.facebook.presto.execution.StageExecutionState;
import com.facebook.presto.execution.StageId;
Expand Down Expand Up @@ -424,7 +423,6 @@ private void schedule()
Set<StageId> completedStages = new HashSet<>();

List<ExecutionSchedule> sectionExecutionSchedules = new LinkedList<>();
Map<StageExecutionId, ListenableFuture<?>> blockedStages = new HashMap<>();

while (!Thread.currentThread().isInterrupted()) {
// remove finished section
Expand All @@ -447,6 +445,8 @@ private void schedule()
.forEach(sectionExecutionSchedules::add);

while (sectionExecutionSchedules.stream().noneMatch(ExecutionSchedule::isFinished)) {
List<ListenableFuture<?>> blockedStages = new ArrayList<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Switching from a Map to a List for blockedStages may affect duplicate handling.

The previous Map structure ensured each stage was only tracked once, preventing duplicates. With a List, the same stage could be added multiple times, potentially causing redundant actions. Please review if this could introduce bugs when a stage is scheduled more than once before completion.


List<StageExecutionAndScheduler> executionsToSchedule = sectionExecutionSchedules.stream()
.flatMap(schedule -> schedule.getStagesToSchedule().stream())
.collect(toImmutableList());
Expand All @@ -459,12 +459,6 @@ private void schedule()
SqlStageExecution stageExecution = stageExecutionAndScheduler.getStageExecution();
stageExecution.beginScheduling();

ListenableFuture<?> stillBlocked = blockedStages.get(stageExecution.getStageExecutionId());
if (stillBlocked != null && !stillBlocked.isDone()) {
continue;
}
blockedStages.remove(stageExecution.getStageExecutionId());

// perform some scheduling work
ScheduleResult result = stageExecutionAndScheduler.getStageScheduler()
.schedule();
Expand All @@ -482,7 +476,7 @@ private void schedule()
stageExecution.schedulingComplete();
}
else if (!result.getBlocked().isDone()) {
blockedStages.put(stageExecution.getStageExecutionId(), result.getBlocked());
blockedStages.add(result.getBlocked());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Adding blocked futures to a List may allow duplicates for the same stage.

Previously, the Map ensured only one future per stage; switching to a List allows multiple futures for the same stage if schedule() is called repeatedly before completion. This may lead to redundant cancellations and missed wake-ups if futures aren't managed correctly.

}
else {
allBlocked = false;
Expand Down Expand Up @@ -547,12 +541,12 @@ else if (!result.getBlocked().isDone()) {
// wait for a state change and then schedule again
if (allBlocked && !blockedStages.isEmpty()) {
try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
tryGetFutureValue(whenAnyComplete(blockedStages.values()), 1, SECONDS);
}
for (ListenableFuture<?> blockedStage : blockedStages.values()) {
blockedStage.cancel(true);
tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
}
}
for (ListenableFuture<?> blockedStage : blockedStages) {
blockedStage.cancel(true);
}
}
}

Expand Down
Loading