Skip to content

Commit d265bd2

Browse files
betodealmeidamichael-s-molina
authored andcommitted
fix: trino cursor (#25897)
(cherry picked from commit cdb18e0)
1 parent 8c099a3 commit d265bd2

File tree

1 file changed

+19
-13
lines changed

1 file changed

+19
-13
lines changed

superset/db_engine_specs/trino.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
187187

188188
@classmethod
189189
def execute_with_cursor(
190-
cls, cursor: Any, sql: str, query: Query, session: Session
190+
cls, cursor: Cursor, sql: str, query: Query, session: Session
191191
) -> None:
192192
"""
193193
Trigger execution of a query and handle the resulting cursor.
@@ -196,34 +196,40 @@ def execute_with_cursor(
196196
in another thread and invoke `handle_cursor` to poll for the query ID
197197
to appear on the cursor in parallel.
198198
"""
199+
# Fetch the query ID beforehand, since it might fail inside the thread due to
200+
# how the SQLAlchemy session is handled.
201+
query_id = query.id
202+
199203
execute_result: dict[str, Any] = {}
204+
execute_event = threading.Event()
200205

201-
def _execute(results: dict[str, Any]) -> None:
202-
logger.debug("Query %d: Running query: %s", query.id, sql)
206+
def _execute(results: dict[str, Any], event: threading.Event) -> None:
207+
logger.debug("Query %d: Running query: %s", query_id, sql)
203208

204-
# Pass result / exception information back to the parent thread
205209
try:
206210
cls.execute(cursor, sql)
207-
results["complete"] = True
208211
except Exception as ex: # pylint: disable=broad-except
209-
results["complete"] = True
210212
results["error"] = ex
213+
finally:
214+
event.set()
211215

212-
execute_thread = threading.Thread(target=_execute, args=(execute_result,))
216+
execute_thread = threading.Thread(
217+
target=_execute,
218+
args=(execute_result, execute_event),
219+
)
213220
execute_thread.start()
214221

215222
# Wait for a query ID to be available before handling the cursor, as
216223
# it's required by that method; it may never become available on error.
217-
while not cursor.query_id and not execute_result.get("complete"):
224+
while not cursor.query_id and not execute_event.is_set():
218225
time.sleep(0.1)
219226

220-
logger.debug("Query %d: Handling cursor", query.id)
227+
logger.debug("Query %d: Handling cursor", query_id)
221228
cls.handle_cursor(cursor, query, session)
222229

223230
# Block until the query completes; same behaviour as the client itself
224-
logger.debug("Query %d: Waiting for query to complete", query.id)
225-
while not execute_result.get("complete"):
226-
time.sleep(0.5)
231+
logger.debug("Query %d: Waiting for query to complete", query_id)
232+
execute_event.wait()
227233

228234
# Unfortunately we'll mangle the stack trace due to the thread, but
229235
# throwing the original exception allows mapping database errors as normal
@@ -237,7 +243,7 @@ def prepare_cancel_query(cls, query: Query, session: Session) -> None:
237243
session.commit()
238244

239245
@classmethod
240-
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
246+
def cancel_query(cls, cursor: Cursor, query: Query, cancel_query_id: str) -> bool:
241247
"""
242248
Cancel query in the underlying database.
243249

0 commit comments

Comments
 (0)