-
Notifications
You must be signed in to change notification settings - Fork 295
feat: async @daft.func #4908
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: async @daft.func #4908
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Summary
This PR introduces async function support for Daft's @daft.func
decorator, allowing users to write asynchronous user-defined functions (UDFs) that can perform I/O-bound operations efficiently. The implementation adds three main components:
-
Detection of async functions: The Rust layer now uses
asyncio.iscoroutinefunction()
to identify when a decorated function is async and routes it to a specialized handler. -
Concurrent async execution: A new Python function
call_async_batch_with_evaluated_exprs()
executes all async UDF calls concurrently usingasyncio.gather()
, providing significant performance benefits for I/O-bound operations compared to sequential execution. -
Modified execution path: Async functions follow a different code path than synchronous functions, collecting all evaluated arguments upfront and processing them together rather than using the chunked parallel processing approach used for sync functions.
The feature integrates cleanly with Daft's existing UDF system - users can simply add async
to their function definition and await
async operations inside, while the framework handles the complexity of concurrent execution. This is particularly valuable for UDFs that need to make HTTP requests, query databases, or perform other async I/O operations. A comprehensive test validates that async UDFs produce identical results to their synchronous counterparts while benefiting from concurrent execution.
Confidence score: 3/5
- This PR introduces complex async functionality that could have subtle runtime issues
- Score reflects concerns about event loop management, error handling, and argument processing edge cases
- Pay close attention to
daft/udf/row_wise.py
andsrc/daft-dsl/src/python_udf.rs
for potential async-related issues
3 files reviewed, 2 comments
try: | ||
# try to use existing event loop | ||
event_loop = asyncio.get_running_loop() | ||
outputs = asyncio.run_coroutine_threadsafe(run_tasks(), event_loop).result() | ||
except RuntimeError: | ||
outputs = asyncio.run(run_tasks()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seemed like a pretty reasonable initial approach to me. I think as an optimization, we could create a global event loop that we reuse instead of potentially creating a new loop every time an async udf is called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me, I was also thinking of using the Tokio event loop and making eval_expression_list
async to avoid blocking entirely.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4908 +/- ##
==========================================
+ Coverage 77.84% 79.26% +1.41%
==========================================
Files 906 906
Lines 127107 126048 -1059
==========================================
+ Hits 98952 99913 +961
+ Misses 28155 26135 -2020
🚀 New features to boost your workflow:
|
mkdocs.yml
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
precommit autoformatter updated this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @universalmind303!
@@ -125,14 +125,14 @@ impl FunctionEvaluator for LegacyPythonUDF { | |||
} | |||
} | |||
|
|||
fn evaluate(&self, inputs: &[Series], _: &FunctionExpr) -> DaftResult<Series> { | |||
fn evaluate(&self, _inputs: &[Series], _: &FunctionExpr) -> DaftResult<Series> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Change _inputs
back to inputs
since its used
try: | ||
# try to use existing event loop | ||
event_loop = asyncio.get_running_loop() | ||
outputs = asyncio.run_coroutine_threadsafe(run_tasks(), event_loop).result() | ||
except RuntimeError: | ||
outputs = asyncio.run(run_tasks()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me, I was also thinking of using the Tokio event loop and making eval_expression_list
async to avoid blocking entirely.
CodSpeed Performance ReportMerging #4908 will degrade performances by 99.29%Comparing Summary
Benchmarks breakdown
|
Plz add this into docs! This seems super cool and we should 100% be shouting it out where possible. @kevinzwang to advise where best to advertise in the new docs |
## Changes Made adds basic support & tests for async `@daft.func` ```py @daft.func async def my_udf(text)->str: return text.upper() df = daft.from_pydict({ "text":["hello", "world"] }) print(df.select(my_udf(df["text"])).collect()) ``` ## Related Issues <!-- Link to related GitHub issues, e.g., "Closes #123" --> ## Checklist - [ ] Documented in API Docs (if applicable) - [ ] Documented in User Guide (if applicable) - [ ] If adding a new documentation page, doc is added to `docs/mkdocs.yml` navigation - [ ] Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)
Changes Made
adds basic support & tests for async
@daft.func
Related Issues
Checklist
docs/mkdocs.yml
navigation