Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions superset/db_engine_specs/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import numpy as np
import pandas as pd
import pyarrow as pa
from flask import current_app, Flask, g
from flask import ctx, current_app, Flask, g
from sqlalchemy import text
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import URL
Expand Down Expand Up @@ -227,12 +227,22 @@ def execute_with_cursor(
execute_event = threading.Event()

def _execute(
results: dict[str, Any], event: threading.Event, app: Flask
results: dict[str, Any],
event: threading.Event,
app: Flask,
g_copy: ctx._AppCtxGlobals,
) -> None:
logger.debug("Query %d: Running query: %s", query_id, sql)

try:
# Flask contexts are local to the thread that handles the request.
# When you spawn a new thread, it does not inherit the contexts
# from the parent thread,
# meaning the g object and other context-bound variables are not
# accessible
with app.app_context():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dpgaspar I did a quick GPT search.

from flask import current_app
import threading

def my_thread():
    with current_app.app_context():
        # Access Flask context within the thread
        app = current_app._get_current_object()
        # Do something with the app

# Create and start the thread
thread = threading.Thread(target=my_thread)
thread.start()

Would something like this work for our use cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems basically similar, except I'm passing the current_app into the thread, also we need to recreate g inside the thread

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use copy_current_request_context?

@copy_current_request_context
def worker():
    # Access Flask's g object here
    # Your code goes here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only the request context is copied according to: pallets/flask#3306

Copy link
Contributor

@joaoferrao joaoferrao Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed not including @ copy_current_request_context makes the thread's downstream calls url_for function calls fail with the screenshot here #27631 (comment)

I found this out in the context adapting #27631 for Trino. My understanding is that, when using OAuth2 config, the class OAuth2ClientConfigSchema will be called in a separated thread and has this missing information.

for key, value in g_copy.__dict__.items():
setattr(g, key, value)
cls.execute(cursor, sql, query.database)
except Exception as ex: # pylint: disable=broad-except
results["error"] = ex
Expand All @@ -245,6 +255,7 @@ def _execute(
execute_result,
execute_event,
current_app._get_current_object(), # pylint: disable=protected-access
g._get_current_object(), # pylint: disable=protected-access
),
)
execute_thread.start()
Expand Down
28 changes: 28 additions & 0 deletions tests/unit_tests/db_engine_specs/test_trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import pandas as pd
import pytest
from flask import g, has_app_context
from pytest_mock import MockerFixture
from requests.exceptions import ConnectionError as RequestsConnectionError
from sqlalchemy import sql, text, types
Expand Down Expand Up @@ -435,6 +436,33 @@ def _mock_execute(*args, **kwargs):
)


def test_execute_with_cursor_app_context(app, mocker: MockerFixture):
"""Test that `execute_with_cursor` still contains the current app context"""
from superset.db_engine_specs.trino import TrinoEngineSpec

mock_cursor = mocker.MagicMock()
mock_cursor.query_id = None

mock_query = mocker.MagicMock()
g.some_value = "some_value"

def _mock_execute(*args, **kwargs):
assert has_app_context()
assert g.some_value == "some_value"

with patch.object(TrinoEngineSpec, "execute", side_effect=_mock_execute):
with patch.dict(
"superset.config.DISALLOWED_SQL_FUNCTIONS",
{},
clear=True,
):
TrinoEngineSpec.execute_with_cursor(
cursor=mock_cursor,
sql="SELECT 1 FROM foo",
query=mock_query,
)


def test_get_columns(mocker: MockerFixture):
"""Test that ROW columns are not expanded without expand_rows"""
from superset.db_engine_specs.trino import TrinoEngineSpec
Expand Down
Loading