Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions superset/commands/sql_lab/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,40 +59,65 @@ def validate(self) -> None:
)
)

read_from_results_backend_start = now_as_float()
self._blob = results_backend.get(self._key)
app.config["STATS_LOGGER"].timing(
"sqllab.query.results_backend_read",
now_as_float() - read_from_results_backend_start,
)
stats_logger = app.config["STATS_LOGGER"]

if not self._blob:
# Check if query exists in database first (fast, avoids unnecessary S3 call)
self._query = (
db.session.query(Query).filter_by(results_key=self._key).one_or_none()
)
if self._query is None:
logger.warning(
"404 Error - Query not found in database for key: %s",
self._key,
)
stats_logger.incr("sqllab.results_backend.404_query_not_found")
raise SupersetErrorException(
SupersetError(
message=__(
"Data could not be retrieved from the results backend. You "
"need to re-run the original query."
"The query associated with these results could not be found. "
"You need to re-run the original query."
),
error_type=SupersetErrorType.RESULTS_BACKEND_ERROR,
level=ErrorLevel.ERROR,
),
status=410,
status=404,
)

self._query = (
db.session.query(Query).filter_by(results_key=self._key).one_or_none()
# Now fetch results from backend (query exists, so this is a valid request)
read_from_results_backend_start = now_as_float()
self._blob = results_backend.get(self._key)
stats_logger.timing(
"sqllab.query.results_backend_read",
now_as_float() - read_from_results_backend_start,
)
if self._query is None:

if not self._blob:
# Query exists in DB but results not in S3 - enhanced diagnostics
query_age_seconds = now_as_float() - (
self._query.end_time if self._query.end_time else now_as_float()
)
logger.warning(
"410 Error - Query exists in DB but results not in results backend"
" Query ID: %s, Status: %s, Age: %.2f seconds, "
"End time: %s, Results key: %s",
self._query.id,
self._query.status,
query_age_seconds,
self._query.end_time,
self._key,
)
stats_logger.incr("sqllab.results_backend.410_results_missing")

raise SupersetErrorException(
SupersetError(
message=__(
"The query associated with these results could not be found. "
"You need to re-run the original query."
"Data could not be retrieved from the results backend. You "
"need to re-run the original query."
),
error_type=SupersetErrorType.RESULTS_BACKEND_ERROR,
level=ErrorLevel.ERROR,
),
status=404,
status=410,
)

def run(
Expand Down
46 changes: 43 additions & 3 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,50 @@ def execute_sql_statements( # noqa: C901
"*** serialized payload size: %i", getsizeof(serialized_payload)
)
logger.debug("*** compressed payload size: %i", getsizeof(compressed))
results_backend.set(key, compressed, cache_timeout)
query.results_key = key

query.status = QueryStatus.SUCCESS
# Store results in backend and check if write succeeded
write_success = results_backend.set(key, compressed, cache_timeout)
if not write_success:
# Backend write failed - log error and don't set results_key
logger.error(
"Query %s: Failed to store results in backend, key: %s",
str(query_id),
key,
)
stats_logger.incr("sqllab.results_backend.write_failure")
# Don't set results_key to prevent 410 errors when fetching
query.results_key = None

# For async queries (not returning results inline), mark as FAILED
# because results are inaccessible to the user
if not return_results:
query.status = QueryStatus.FAILED
query.error_message = (
"Failed to store query results in the results backend. "
"Please try again or contact your administrator."
)
db.session.commit()
raise SupersetErrorException(
SupersetError(
message=__(
"Failed to store query results. Please try again."
),
error_type=SupersetErrorType.RESULTS_BACKEND_ERROR,
level=ErrorLevel.ERROR,
)
)
else:
# Write succeeded - set results_key in database
query.results_key = key
logger.info(
"Query %s: Successfully stored results in backend, key: %s",
str(query_id),
key,
)

# Only set SUCCESS if we didn't already set FAILED above
if query.status != QueryStatus.FAILED:
query.status = QueryStatus.SUCCESS
db.session.commit()

if return_results:
Expand Down
124 changes: 124 additions & 0 deletions tests/integration_tests/sql_lab/test_execute_sql_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest.mock import MagicMock, patch

from flask import current_app

from superset import db
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query
Expand Down Expand Up @@ -54,3 +57,124 @@ def test_non_async_execute(non_async_example_db: Database, example_query: Query)

if non_async_example_db.db_engine_spec.engine_name == "hive":
assert example_query.tracking_url_raw


@patch("superset.sql_lab.results_backend")
def test_results_backend_write_failure(
    mock_results_backend: MagicMock,
    async_example_db: Database,
    example_query: Query,
):
    """Async query ends up FAILED when ``results_backend.set()`` returns False.

    The results backend is replaced by a mock whose ``set`` reports failure.
    Because the query is executed with ``return_results=False`` the results
    cannot be handed back inline, so the call is expected to raise and the
    persisted query row is expected to carry the failure details.
    """
    import pytest

    from superset.exceptions import SupersetErrorException

    expected_fragment = "Failed to store query results"

    # A falsy return value from ``set`` stands in for a failed backend write.
    backend_set = mock_results_backend.set
    backend_set.return_value = False

    # store_results=True with return_results=False is the async-style call.
    run_kwargs = {
        "store_results": True,
        "return_results": False,
        "start_time": now_as_float(),
        "expand_data": False,
        "log_params": {},
    }
    with pytest.raises(SupersetErrorException) as raised:
        execute_sql_statements(
            example_query.id,
            "select 1 as foo;",
            **run_kwargs,
        )

    # The raised error carries the user-facing explanation.
    assert expected_fragment in str(raised.value.error.message)

    # Reload the row so the assertions below see what was committed.
    db.session.refresh(example_query)

    # Results are unreachable for this call, hence FAILED ...
    assert example_query.status == QueryStatus.FAILED
    # ... with no key pointing at a blob that was never stored ...
    assert example_query.results_key is None
    # ... and an explanatory message recorded on the query itself.
    assert expected_fragment in example_query.error_message

    # The write was actually attempted against the mocked backend.
    assert backend_set.called


@patch("superset.sql_lab.results_backend")
def test_results_backend_write_success(
    mock_results_backend: MagicMock,
    async_example_db: Database,
    example_query: Query,
):
    """``query.results_key`` is populated when ``results_backend.set()`` is True.

    The results backend is mocked so that ``set`` reports success; the query
    is then executed with ``store_results=True`` and ``return_results=False``
    and the persisted row is inspected.
    """
    # A truthy return value from ``set`` stands in for a successful write.
    backend_set = mock_results_backend.set
    backend_set.return_value = True

    run_kwargs = {
        "store_results": True,
        "return_results": False,
        "start_time": now_as_float(),
        "expand_data": False,
        "log_params": {},
    }
    execute_sql_statements(
        example_query.id,
        "select 1 as foo;",
        **run_kwargs,
    )

    # Reload the row so the assertions below see what was committed.
    db.session.refresh(example_query)

    assert example_query.status == QueryStatus.SUCCESS

    # The stored key must exist and be 36 characters long
    # (the length of a dashed UUID string).
    stored_key = example_query.results_key
    assert stored_key is not None
    assert len(stored_key) == 36

    # The write was actually attempted against the mocked backend.
    assert backend_set.called


@patch("superset.sql_lab.results_backend")
def test_results_backend_write_failure_sync_mode(
    mock_results_backend: MagicMock,
    non_async_example_db: Database,
    example_query: Query,
):
    """Sync query stays SUCCESS when the backend write fails.

    With ``return_results=True`` the rows are returned inline, so a failed
    write to the (mocked) results backend is expected to leave the query
    successful while no ``results_key`` is recorded.
    """
    # A falsy return value from ``set`` stands in for a failed backend write.
    backend_set = mock_results_backend.set
    backend_set.return_value = False

    # return_results=True: the payload comes straight back to the caller.
    run_kwargs = {
        "store_results": True,
        "return_results": True,
        "start_time": now_as_float(),
        "expand_data": True,
        "log_params": {},
    }
    payload = execute_sql_statements(
        example_query.id,
        "select 1 as foo;",
        **run_kwargs,
    )

    # The inline payload is complete despite the failed backend write.
    assert payload
    assert payload["query_id"] == example_query.id
    assert payload["status"] == QueryStatus.SUCCESS
    assert payload["data"] == [{"foo": 1}]

    # Reload the row so the assertions below see what was committed.
    db.session.refresh(example_query)

    # Results were delivered inline, so the query still counts as successful.
    assert example_query.status == QueryStatus.SUCCESS
    # Nothing was stored, so no key may point at the backend.
    assert example_query.results_key is None

    # The write was actually attempted against the mocked backend.
    assert backend_set.called
Loading