@@ -201,20 +201,6 @@ private SqlQueryExecution(
tableExecuteContextManager.unregisterTableExecuteContextForQuery(stateMachine.getQueryId());
});

// when the query finishes cache the final query info, and clear the reference to the output stage
AtomicReference<SqlQueryScheduler> queryScheduler = this.queryScheduler;
stateMachine.addStateChangeListener(state -> {
if (!state.isDone()) {
return;
}

// query is now done, so abort any work that is still running
SqlQueryScheduler scheduler = queryScheduler.get();
Reviewer (Member):

Do we have test for aborting? Can it be that this listener covers some flow that is not covered by SqlQueryScheduler?

@arhimondr (Author, Contributor), Aug 1, 2022:

Today, SqlQueryScheduler is responsible for attaching the listeners that terminate the scheduling process upon query completion (due to a failure, a cancellation, or an early finish): https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java#L303. SqlQueryScheduler attaches the listeners before the scheduling process starts and uses an internal lock to prevent race conditions.

These listeners do exactly what the abort method does. Having an external listener call the abort method is unnecessary and might confuse readers.
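
For anyone following the thread, here is a rough sketch of the pattern described above: the scheduler attaches its own completion listener before it starts any work, and guards its state with its own lock. All names here (`SchedulerListenerSketch`, `QueryState`, `Scheduler`, `addDoneListener`, `stop`) are hypothetical stand-ins, not the actual Trino classes or methods, and the sketch assumes a listener registered after completion is invoked right away.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration; not Trino's actual classes.
final class SchedulerListenerSketch
{
    /** Tiny query state holder that notifies listeners once the query is done. */
    static final class QueryState
    {
        private final List<Runnable> doneListeners = new ArrayList<>();
        private boolean done;

        /** Registers a listener; runs it right away if the query is already done (an assumption of this sketch). */
        void addDoneListener(Runnable listener)
        {
            synchronized (this) {
                if (!done) {
                    doneListeners.add(listener);
                    return;
                }
            }
            // Already done: notify outside the lock.
            listener.run();
        }

        /** Marks the query as done and notifies the registered listeners once. */
        void finish()
        {
            List<Runnable> toNotify;
            synchronized (this) {
                if (done) {
                    return;
                }
                done = true;
                toNotify = new ArrayList<>(doneListeners);
                doneListeners.clear();
            }
            toNotify.forEach(Runnable::run);
        }
    }

    /** Scheduler that owns its termination logic, so no external abort call is needed. */
    static final class Scheduler
    {
        private final QueryState state;
        private boolean running;
        private boolean stopped;
        private int stopCount;

        Scheduler(QueryState state)
        {
            this.state = state;
        }

        synchronized void start()
        {
            // Attach the listener before any scheduling work begins.
            state.addDoneListener(this::stop);
            // If the query was already done, stop() has run on this thread
            // (the intrinsic lock is reentrant), so no work is started.
            if (!stopped) {
                running = true;
            }
        }

        private synchronized void stop()
        {
            if (stopped) {
                return;
            }
            stopped = true;
            running = false;
            stopCount++;
        }

        synchronized boolean isRunning()
        {
            return running;
        }

        synchronized int stopCount()
        {
            return stopCount;
        }
    }

    public static void main(String[] args)
    {
        QueryState state = new QueryState();
        Scheduler scheduler = new Scheduler(state);
        scheduler.start();
        state.finish(); // e.g. a failure, a cancellation, or an early finish
        System.out.println(scheduler.isRunning()); // prints false

        QueryState alreadyDone = new QueryState();
        alreadyDone.finish();
        Scheduler late = new Scheduler(alreadyDone);
        late.start();
        System.out.println(late.isRunning()); // prints false
    }
}
```

In this sketch the second case (query done before `start()`) is handled by the scheduler itself, which is the reason an extra listener on the outside would be redundant.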

if (scheduler != null) {
scheduler.abort();
}
});

this.remoteTaskFactory = new MemoryTrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine);
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
@@ -530,13 +516,6 @@ private void planDistribution(PlanRoot plan)
taskDescriptorStorage);

queryScheduler.set(scheduler);

// if query was canceled during scheduler creation, abort the scheduler
// directly since the callback may have already fired
if (stateMachine.isDone()) {
scheduler.abort();
queryScheduler.set(null);
Reviewer (Member):

Shouldn't queryScheduler.set(null); be called anyway (e.g. to free resources)?

Author (Contributor):

The queryScheduler variable is nullified in buildQueryInfo after building a final query info. It is currently not nullified in case of a failure or completion anywhere else, thus it seemed to be weird to nullify it here.
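
To illustrate the lifecycle I am describing, here is a minimal sketch of a holder that drops its scheduler reference only once the final info has been built and cached. The names (`FinalInfoSketch`, `buildInfo`, `stageInfo`, `hasScheduler`) and the string-based "info" are hypothetical simplifications, not the real `SqlQueryExecution` code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-ins for illustration; not Trino's actual classes.
final class FinalInfoSketch
{
    /** Stand-in for a scheduler that can report stage information. */
    static final class Scheduler
    {
        String stageInfo()
        {
            return "stages";
        }
    }

    private final AtomicReference<Scheduler> scheduler = new AtomicReference<>();
    private final AtomicReference<String> finalInfo = new AtomicReference<>();

    void setScheduler(Scheduler value)
    {
        scheduler.set(value);
    }

    boolean hasScheduler()
    {
        return scheduler.get() != null;
    }

    /** Builds the info; once the query is done, caches it and releases the scheduler. */
    String buildInfo(boolean queryDone)
    {
        String cached = finalInfo.get();
        if (cached != null) {
            return cached;
        }

        Scheduler current = scheduler.get();
        String info = (current == null) ? "no stages" : current.stageInfo();
        if (!queryDone) {
            return info;
        }

        // Cache the final info first, then drop the reference to the scheduler.
        finalInfo.compareAndSet(null, info);
        scheduler.set(null);
        return finalInfo.get();
    }

    public static void main(String[] args)
    {
        FinalInfoSketch execution = new FinalInfoSketch();
        execution.setScheduler(new Scheduler());
        System.out.println(execution.buildInfo(false)); // prints stages
        System.out.println(execution.hasScheduler());   // prints true
        System.out.println(execution.buildInfo(true));  // prints stages
        System.out.println(execution.hasScheduler());   // prints false
    }
}
```

With a single place that clears the reference, as in this sketch, a second nullification elsewhere adds nothing.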

Reviewer (Member):

> It is currently not nullified in case of a failure or completion anywhere else, thus it seemed to be weird to nullify it here.

So was this a race between buildQueryInfo and nullification here?

Author (Contributor):

I doubt there is any. It just looked very weird. Later on there is another check where the scheduler is not nullified: https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java#L423.

}
}

@Override
@@ -487,17 +487,6 @@ public synchronized void cancelStage(StageId stageId)
}
}

public synchronized void abort()
{
try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
coordinatorStagesScheduler.abort();
DistributedStagesScheduler distributedStagesScheduler = this.distributedStagesScheduler.get();
if (distributedStagesScheduler != null) {
distributedStagesScheduler.abort();
}
}
}

public void failTask(TaskId taskId, Throwable failureCause)
{
try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {