Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,8 @@ public class PipelinedStageExecution
// current stage task tracking
@GuardedBy("this")
private final Set<TaskId> allTasks = new HashSet<>();
@GuardedBy("this")
private final Set<TaskId> finishedTasks = new HashSet<>();
@GuardedBy("this")
private final Set<TaskId> flushingTasks = new HashSet<>();
private final Set<TaskId> finishedTasks = ConcurrentHashMap.newKeySet();
Comment thread
sopel39 marked this conversation as resolved.
Outdated
private final Set<TaskId> flushingTasks = ConcurrentHashMap.newKeySet();

// source task tracking
@GuardedBy("this")
Expand Down Expand Up @@ -219,16 +217,16 @@ public synchronized void transitionToSchedulingSplits()
}

@Override
public synchronized void schedulingComplete()
public void schedulingComplete()
{
if (!stateMachine.transitionToScheduled()) {
return;
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
}

if (isFlushing()) {
if (isStageFlushing()) {
stateMachine.transitionToFlushing();
}
if (finishedTasks.containsAll(allTasks)) {
if (isStageFinished()) {
stateMachine.transitionToFinished();
}

Expand All @@ -237,13 +235,6 @@ public synchronized void schedulingComplete()
}
}

private synchronized boolean isFlushing()
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
{
// to transition to flushing, there must be at least one flushing task, and all others must be flushing or finished.
return !flushingTasks.isEmpty()
&& allTasks.stream().allMatch(taskId -> finishedTasks.contains(taskId) || flushingTasks.contains(taskId));
}

@Override
public synchronized void schedulingComplete(PlanNodeId partitionedSource)
{
Expand Down Expand Up @@ -340,13 +331,13 @@ public synchronized Optional<RemoteTask> scheduleTask(
return Optional.of(task);
}

private synchronized void updateTaskStatus(TaskStatus taskStatus)
private void updateTaskStatus(TaskStatus taskStatus)
{
State stageState = stateMachine.getState();
if (stageState.isDone()) {
return;
}

boolean newFlushingOrFinishedTaskObserved = false;
TaskState taskState = taskStatus.getState();

switch (taskState) {
Expand All @@ -367,11 +358,10 @@ private synchronized void updateTaskStatus(TaskStatus taskStatus)
fail(new TrinoException(GENERIC_INTERNAL_ERROR, "A task is in the ABORTED state but stage is " + stageState));
break;
case FLUSHING:
flushingTasks.add(taskStatus.getTaskId());
newFlushingOrFinishedTaskObserved = addFlushingTask(taskStatus.getTaskId());
break;
case FINISHED:
finishedTasks.add(taskStatus.getTaskId());
flushingTasks.remove(taskStatus.getTaskId());
newFlushingOrFinishedTaskObserved = addFinishedTask(taskStatus.getTaskId());
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
break;
default:
}
Expand All @@ -380,13 +370,54 @@ private synchronized void updateTaskStatus(TaskStatus taskStatus)
if (taskState == TaskState.RUNNING) {
stateMachine.transitionToRunning();
}
if (isFlushing()) {
stateMachine.transitionToFlushing();
// avoid extra synchronization if no new flushing or finished task was observed
if (newFlushingOrFinishedTaskObserved) {
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stageState is read before a task is added to the finishedTasks set. I think this breaks happens-before assumptions. It is possible that State stageState = stateMachine.getState(); is run when the state is not yet SCHEDULED so the stateMachine.transitionToFinished(); is not run while at the same time the schedulingComplete() transitions the stage to SCHEDULED but doesn't see all tasks being finished.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. You are correct.

if (isStageFlushing()) {
stateMachine.transitionToFlushing();
}
if (isStageFinished()) {
stateMachine.transitionToFinished();
}
}
if (finishedTasks.containsAll(allTasks)) {
stateMachine.transitionToFinished();
}
}

private synchronized boolean isStageFlushing()
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
{
// to transition to flushing, there must be at least one flushing task, and all others must be flushing or finished.
return !flushingTasks.isEmpty()
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
&& allTasks.stream().allMatch(taskId -> finishedTasks.contains(taskId) || flushingTasks.contains(taskId));
}

private synchronized boolean isStageFinished()
{
return finishedTasks.containsAll(allTasks);
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
}

private boolean addFlushingTask(TaskId taskId)
{
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
if (!flushingTasks.contains(taskId) && !finishedTasks.contains(taskId)) {
synchronized (this) {
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
// We need to check whether that task is not already finished. It could happen because of out of order of
// task status events
if (!finishedTasks.contains(taskId)) {
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
return flushingTasks.add(taskId);
}
}
}
return false;
}

private boolean addFinishedTask(TaskId taskId)
{
if (!finishedTasks.contains(taskId)) {
synchronized (this) {
boolean added = finishedTasks.add(taskId);
flushingTasks.remove(taskId);
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
return added;
}
}
return false;
}

private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo executionFailureInfo)
Expand Down