diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java index d34fd273d550..6a767e7ff354 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java @@ -128,6 +128,7 @@ private AssignmentResult assignReplicatedSplits(PlanNodeId planNodeId, List scheduledTask = stageExecution.addPartition(partition.partitionId(), partition.nodeRequirements()); - scheduledTask.ifPresent(schedulingQueue::addOrUpdate); + stageExecution.addPartition(partition.partitionId(), partition.nodeRequirements()); } for (PartitionUpdate partitionUpdate : assignment.partitionUpdates()) { - stageExecution.updatePartition( + Optional scheduledTask = stageExecution.updatePartition( partitionUpdate.partitionId(), partitionUpdate.planNodeId(), + partitionUpdate.readyForScheduling(), partitionUpdate.splits(), partitionUpdate.noMoreSplits()); + scheduledTask.ifPresent(schedulingQueue::addOrUpdate); } assignment.sealedPartitions().forEach(partitionId -> { Optional scheduledTask = stageExecution.sealPartition(partitionId); @@ -1300,10 +1301,10 @@ public boolean isExchangeClosed() return exchangeClosed; } - public Optional addPartition(int partitionId, NodeRequirements nodeRequirements) + public void addPartition(int partitionId, NodeRequirements nodeRequirements) { if (getState().isDone()) { - return Optional.empty(); + return; } ExchangeSinkHandle exchangeSinkHandle = exchange.addSink(partitionId); @@ -1323,18 +1324,28 @@ public Optional addPartition(int partitionId, NodeRequ checkState(partitions.putIfAbsent(partitionId, partition) == null, "partition with id %s already exist in stage %s", partitionId, stage.getStageId()); getSourceOutputSelectors().forEach((partition::updateExchangeSourceOutputSelector)); remainingPartitions.add(partitionId); - - return Optional.of(PrioritizedScheduledTask.createSpeculative(stage.getStageId(), partitionId, schedulingPriority)); } - public void updatePartition(int partitionId, PlanNodeId planNodeId, List splits, boolean noMoreSplits) + public Optional updatePartition( + int partitionId, + PlanNodeId planNodeId, + boolean readyForScheduling, + List splits, + boolean noMoreSplits) { if (getState().isDone()) { - return; + return Optional.empty(); } StagePartition partition = getStagePartition(partitionId); partition.addSplits(planNodeId, splits, noMoreSplits); + if (readyForScheduling && !partition.isTaskScheduled()) { + partition.setTaskScheduled(true); + return Optional.of(PrioritizedScheduledTask.createSpeculative(stage.getStageId(), partitionId, schedulingPriority)); + } + else { + return Optional.empty(); + } } public Optional sealPartition(int partitionId) @@ -1760,6 +1771,7 @@ private static class StagePartition private final Set runningTasks = new HashSet<>(); private final Set finalSelectors = new HashSet<>(); private final Set noMoreSplits = new HashSet<>(); + private boolean taskScheduled; private boolean finished; public StagePartition( @@ -1951,6 +1963,17 @@ public boolean isRunning() return !runningTasks.isEmpty(); } + public boolean isTaskScheduled() + { + return taskScheduled; + } + + public void setTaskScheduled(boolean taskScheduled) + { + checkArgument(taskScheduled, "taskScheduled must be true"); + this.taskScheduled = taskScheduled; + } + public boolean isFinished() { return finished; diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java index f7775abe553d..81235c8a46a3 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java @@ -147,7 +147,7 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap splits, boolean noMoreSplits) + record PartitionUpdate( + int partitionId, + PlanNodeId planNodeId, + boolean readyForScheduling, + List splits, + boolean noMoreSplits) { public PartitionUpdate { requireNonNull(planNodeId, "planNodeId is null"); + checkArgument(!(readyForScheduling && splits.isEmpty()), "partition update with empty splits marked as ready for scheduling"); splits = ImmutableList.copyOf(requireNonNull(splits, "splits is null")); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java index abb16729ead9..a5320299276d 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestEventDrivenTaskSource.java @@ -665,15 +665,15 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap