diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ScaledWriterScheduler.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ScaledWriterScheduler.java index d4791d04ea625..3d63bda706327 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ScaledWriterScheduler.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ScaledWriterScheduler.java @@ -34,6 +34,7 @@ import static com.facebook.presto.execution.scheduler.ScheduleResult.BlockedReason.WRITER_SCALING; import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static com.facebook.presto.util.Failures.checkCondition; +import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -73,6 +74,8 @@ public ScaledWriterScheduler( this.writerMinSizeBytes = requireNonNull(writerMinSize, "minWriterSize is null").toBytes(); this.optimizedScaleWriterProducerBuffer = optimizedScaleWriterProducerBuffer; this.initialTaskCount = requireNonNull(initialTaskCount, "initialTaskCount is null"); + + future.set(null); } public void finish() @@ -84,13 +87,20 @@ public void finish() @Override public ScheduleResult schedule() { - List writers = scheduleTasks(getNewTaskCount()); + List writers = ImmutableList.of(); - future.set(null); - future = SettableFuture.create(); - executor.schedule(() -> future.set(null), 200, MILLISECONDS); + if (future.isDone()) { + writers = scheduleTasks(getNewTaskCount()); + future = SettableFuture.create(); + executor.schedule(() -> future.set(null), 200, MILLISECONDS); + } - return ScheduleResult.blocked(done.get(), writers, future, WRITER_SCALING, 0); + return ScheduleResult.blocked( + done.get(), + writers, + nonCancellationPropagating(future), + WRITER_SCALING, + 0); } private int getNewTaskCount() diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java index d81e49a35c216..98a9bd80502f5 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java @@ -27,7 +27,6 @@ import com.facebook.presto.execution.RemoteTask; import com.facebook.presto.execution.RemoteTaskFactory; import com.facebook.presto.execution.SqlStageExecution; -import com.facebook.presto.execution.StageExecutionId; import com.facebook.presto.execution.StageExecutionInfo; import com.facebook.presto.execution.StageExecutionState; import com.facebook.presto.execution.StageId; @@ -424,7 +423,6 @@ private void schedule() Set completedStages = new HashSet<>(); List sectionExecutionSchedules = new LinkedList<>(); - Map> blockedStages = new HashMap<>(); while (!Thread.currentThread().isInterrupted()) { // remove finished section @@ -447,6 +445,8 @@ private void schedule() .forEach(sectionExecutionSchedules::add); while (sectionExecutionSchedules.stream().noneMatch(ExecutionSchedule::isFinished)) { + List> blockedStages = new ArrayList<>(); + List executionsToSchedule = sectionExecutionSchedules.stream() .flatMap(schedule -> schedule.getStagesToSchedule().stream()) .collect(toImmutableList()); @@ -459,12 +459,6 @@ private void schedule() SqlStageExecution stageExecution = stageExecutionAndScheduler.getStageExecution(); stageExecution.beginScheduling(); - ListenableFuture stillBlocked = blockedStages.get(stageExecution.getStageExecutionId()); - if (stillBlocked != null && !stillBlocked.isDone()) { - continue; - } - blockedStages.remove(stageExecution.getStageExecutionId()); - // perform some scheduling work ScheduleResult result = stageExecutionAndScheduler.getStageScheduler() .schedule(); @@ -482,7 +476,7 @@ private void schedule() stageExecution.schedulingComplete(); } else if (!result.getBlocked().isDone()) { - blockedStages.put(stageExecution.getStageExecutionId(), result.getBlocked()); + blockedStages.add(result.getBlocked()); } else { allBlocked = false; @@ -547,12 +541,12 @@ else if (!result.getBlocked().isDone()) { // wait for a state change and then schedule again if (allBlocked && !blockedStages.isEmpty()) { try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) { - tryGetFutureValue(whenAnyComplete(blockedStages.values()), 1, SECONDS); - } - for (ListenableFuture blockedStage : blockedStages.values()) { - blockedStage.cancel(true); + tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS); } } + for (ListenableFuture blockedStage : blockedStages) { + blockedStage.cancel(true); + } } }