Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Add more context when UDFs fail #2325

Merged
merged 6 commits on
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion daft/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ def __call__(self, evaluated_expressions: list[Series]) -> PySeries:
# This is not ideal and we should cache initializations across calls for the same process.
func = self.udf.get_initialized_func()

result = func(*args, **kwargs)
try:
result = func(*args, **kwargs)
except Exception as user_function_exception:
raise RuntimeError(
f"User-defined function `{func.__name__}` failed when executing on inputs with lengths: {tuple(len(series) for series in evaluated_expressions)}"
) from user_function_exception

# HACK: Series have names and the logic for naming fields/series in a UDF is to take the first
# Expression's name. Note that this logic is tied to the `to_field` implementation of the Rust PythonUDF
Expand Down
16 changes: 14 additions & 2 deletions tests/dataframe/test_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,15 @@ def echo_or_trigger(s):
assert next(it) == {"a": 0, "b": 0}

# Ensure the exception does trigger if execution continues.
with pytest.raises(MockException):
with pytest.raises(RuntimeError) as exc_info:
list(it)

# Ray's wrapping of the exception drops the `__cause__` chain, but the original exception's name is preserved in the error message string
if daft.context.get_context().runner_config.name == "ray":
assert "MockException" in str(exc_info.value)
else:
assert isinstance(exc_info.value.__cause__, MockException)


def test_iter_partitions_exception(make_df):
# Test that df.iter_partitions actually returns results before completing execution.
Expand All @@ -95,7 +101,13 @@ def echo_or_trigger(s):
assert part == {"a": [0, 1], "b": [0, 1]}

# Ensure the exception does trigger if execution continues.
with pytest.raises(MockException):
with pytest.raises(RuntimeError) as exc_info:
res = list(it)
if daft.context.get_context().runner_config.name == "ray":
ray.get(res)

# Ray's wrapping of the exception drops the `__cause__` chain, but the original exception's name is preserved in the error message string
if daft.context.get_context().runner_config.name == "ray":
assert "MockException" in str(exc_info.value)
else:
assert isinstance(exc_info.value.__cause__, MockException)
4 changes: 3 additions & 1 deletion tests/expressions/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ def throw_value_err(x):

expr = throw_value_err(col("a"))

with pytest.raises(ValueError, match="AN ERROR OCCURRED!"):
with pytest.raises(RuntimeError) as exc_info:
table.eval_expression_list([expr])
assert isinstance(exc_info.value.__cause__, ValueError)
assert str(exc_info.value.__cause__) == "AN ERROR OCCURRED!"


def test_no_args_udf_call():
Expand Down
22 changes: 4 additions & 18 deletions tests/integration/io/test_url_download_http.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import pytest
from aiohttp.client_exceptions import ClientResponseError

import daft

Expand All @@ -17,31 +16,18 @@ def test_url_download_http(mock_http_image_urls, image_data, use_native_download

@pytest.mark.integration()
@pytest.mark.parametrize("status_code", [400, 401, 403, 404, 429, 500, 503])
@pytest.mark.parametrize("use_native_downloader", [True, False])
def test_url_download_http_error_codes(nginx_config, use_native_downloader, status_code):
def test_url_download_http_error_codes(nginx_config, status_code):
server_url, _ = nginx_config
data = {"urls": [f"{server_url}/{status_code}.html"]}
df = daft.from_pydict(data)
df = df.with_column("data", df["urls"].url.download(on_error="raise", use_native_downloader=use_native_downloader))

skip_fsspec_downloader = daft.context.get_context().runner_config.name == "ray"
df = df.with_column("data", df["urls"].url.download(on_error="raise", use_native_downloader=True))

# 404 should always be corner-cased to return FileNotFoundError regardless of I/O implementation
# 404 should always be corner-cased to return FileNotFoundError
if status_code == 404:
with pytest.raises(FileNotFoundError):
df.collect()
# When using fsspec, other error codes are bubbled up to the user as aiohttp.client_exceptions.ClientResponseError
elif not use_native_downloader:
# Ray runner has a pretty catastrophic failure when raising non-pickleable exceptions (ClientResponseError is not pickleable)
# See Ray issue: https://github.com/ray-project/ray/issues/36893
if skip_fsspec_downloader:
pytest.skip()
with pytest.raises(ClientResponseError) as e:
df.collect()
assert e.value.code == status_code
# When using native downloader, we throw a ValueError
else:
# NOTE: We may want to add better errors in the future to provide a better
# user-facing I/O error with the error code
with pytest.raises(ValueError, match=f"{status_code}") as e:
with pytest.raises(ValueError, match=f"{status_code}"):
df.collect()
15 changes: 13 additions & 2 deletions tests/udf_library/test_url_udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,19 @@ def test_download_with_missing_urls_reraise_errors(files, use_native_downloader)
"bytes", col("filenames").url.download(on_error="raise", use_native_downloader=use_native_downloader)
)
# TODO: Change to a FileNotFound Error
with pytest.raises(FileNotFoundError):
df.collect()

if not use_native_downloader:
with pytest.raises(RuntimeError) as exc_info:
df.collect()

# Ray's wrapping of the exception drops the `__cause__` chain, but the original exception's name is preserved in the error message string
if daft.context.get_context().runner_config.name == "ray":
assert "FileNotFoundError" in str(exc_info.value)
else:
assert isinstance(exc_info.value.__cause__, FileNotFoundError)
else:
with pytest.raises(FileNotFoundError):
df.collect()


@pytest.mark.parametrize("use_native_downloader", [False, True])
Expand Down
Loading