Skip to content

Commit ff0e23a

Browse files
JunhyungSongdain
authored andcommitted
Use explicit query executor to start and fail execution
1 parent f43703b commit ff0e23a

File tree

1 file changed

+17
-19
lines changed

1 file changed

+17
-19
lines changed

core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQuery.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ private void waitForMinimumWorkers()
126126
}
127127
ListenableFuture<Void> minimumWorkerFuture = clusterSizeMonitor.waitForMinimumWorkers(executionMinCount, getRequiredWorkersMaxWait(session));
128128
// when worker requirement is met, start the execution
129-
addSuccessCallback(minimumWorkerFuture, () -> startExecution(queryExecution));
130-
addExceptionCallback(minimumWorkerFuture, throwable -> queryExecutor.execute(() -> stateMachine.transitionToFailed(throwable)));
129+
addSuccessCallback(minimumWorkerFuture, () -> startExecution(queryExecution), queryExecutor);
130+
addExceptionCallback(minimumWorkerFuture, throwable -> stateMachine.transitionToFailed(throwable), queryExecutor);
131131

132132
// cancel minimumWorkerFuture if query fails for some reason or is cancelled by user
133133
stateMachine.addStateChangeListener(state -> {
@@ -140,25 +140,23 @@ private void waitForMinimumWorkers()
140140

141141
private void startExecution(QueryExecution queryExecution)
142142
{
143-
queryExecutor.execute(() -> {
144-
if (stateMachine.transitionToDispatching()) {
145-
try {
146-
querySubmitter.accept(queryExecution);
147-
if (notificationSentOrGuaranteed.compareAndSet(false, true)) {
148-
queryExecution.addFinalQueryInfoListener(queryMonitor::queryCompletedEvent);
149-
}
150-
}
151-
catch (Throwable t) {
152-
// this should never happen but be safe
153-
stateMachine.transitionToFailed(t);
154-
log.error(t, "query submitter threw exception");
155-
throw t;
156-
}
157-
finally {
158-
submitted.set(null);
143+
if (stateMachine.transitionToDispatching()) {
144+
try {
145+
querySubmitter.accept(queryExecution);
146+
if (notificationSentOrGuaranteed.compareAndSet(false, true)) {
147+
queryExecution.addFinalQueryInfoListener(queryMonitor::queryCompletedEvent);
159148
}
160149
}
161-
});
150+
catch (Throwable t) {
151+
// this should never happen but be safe
152+
stateMachine.transitionToFailed(t);
153+
log.error(t, "query submitter threw exception");
154+
throw t;
155+
}
156+
finally {
157+
submitted.set(null);
158+
}
159+
}
162160
}
163161

164162
@Override

0 commit comments

Comments
 (0)