Implement general purpose async functions #1
Conversation
Force-pushed from 5cb6e4e to 90bd4f6
```diff
- /// A scalar UDF that will be bypassed when planning logical plan.
- /// This is used to register the remote function to the context. The function should not be
- /// invoked by DataFusion. It's only used to generate the logical plan and unparsed them to SQL.
+ /// A scalar UDF that can invoke using async methods
```
Here is the new API. At a high level it is meant to mimic `ScalarUDFImpl`, except that it has an async invoke function.
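To make the shape concrete, here is a compilable sketch of such a trait. Everything in it is a mock: the `Result` alias, the argument/return types (`Vec<String>` / `Vec<bool>` standing in for Arrow's `RecordBatch` / `ArrayRef`), the `AskLlm` struct, and the `poll_ready` driver are illustrations, not the PR's actual code.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Mock stand-ins so the sketch is self-contained; the real API uses
// Arrow's `RecordBatch` / `ArrayRef` and DataFusion's `Result`.
type Result<T> = std::result::Result<T, String>;

/// Sketch of the new trait: it mirrors `ScalarUDFImpl`, but `invoke_async`
/// is an `async fn` (stable in traits since Rust 1.75).
trait AsyncScalarUDFImpl {
    fn name(&self) -> &str;
    async fn invoke_async(&self, args: &[String]) -> Result<Vec<bool>>;
}

/// A toy "remote" function; a real one would `.await` an HTTP client here.
struct AskLlm;

impl AsyncScalarUDFImpl for AskLlm {
    fn name(&self) -> &str {
        "ask_llm"
    }

    async fn invoke_async(&self, args: &[String]) -> Result<Vec<bool>> {
        // Pretend each row's answer came back from a network call.
        Ok(args.iter().map(|s| s.contains("yes")).collect())
    }
}

// Minimal no-op waker so we can drive a future that never suspends
// (demo only; a real plan node would run on DataFusion's tokio runtime).
fn vt_clone(_: *const ()) -> RawWaker {
    raw_waker()
}
fn vt_noop(_: *const ()) {}
static VTABLE: RawWakerVTable = RawWakerVTable::new(vt_clone, vt_noop, vt_noop, vt_noop);
fn raw_waker() -> RawWaker {
    RawWaker::new(std::ptr::null(), &VTABLE)
}

fn poll_ready<F: Future>(fut: F) -> F::Output {
    let waker = unsafe { Waker::from_raw(raw_waker()) };
    match pin!(fut).poll(&mut Context::from_waker(&waker)) {
        Poll::Ready(v) => v,
        Poll::Pending => unreachable!("sketch future never suspends"),
    }
}

fn main() {
    let udf = AskLlm;
    let args = vec!["yes please".to_string(), "no".to_string()];
    let out = poll_ready(udf.invoke_async(&args)).unwrap();
    println!("{out:?}");
}
```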
```rust
pub struct AsyncFuncRule {}

impl PhysicalOptimizerRule for AsyncFuncRule {
    /// Insert an AsyncFunctionNode in front of this projection if there are any async functions in it
```
Here is the high level design: add a new node before a ProjectionExec that does the actual async calls
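Conceptually, the rewrite looks like the sketch below. All of the types are toy stand-ins (the real rule works on DataFusion `ExecutionPlan` nodes, and `AsyncFunc` stands in for the new async node):

```rust
// Mock expression and plan types to illustrate the rewrite.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    AsyncUdf(String),
}

#[derive(Debug, PartialEq)]
enum Plan {
    Projection { exprs: Vec<Expr>, input: Box<Plan> },
    AsyncFunc { funcs: Vec<Expr>, input: Box<Plan> },
    Scan,
}

/// If a projection references any async functions, insert a node below it
/// that performs the async calls first (conceptually what the rule does).
fn insert_async_node(plan: Plan) -> Plan {
    match plan {
        Plan::Projection { exprs, input } => {
            let funcs: Vec<Expr> = exprs
                .iter()
                .filter(|e| matches!(e, Expr::AsyncUdf(_)))
                .cloned()
                .collect();
            if funcs.is_empty() {
                // No async work: leave the projection alone.
                Plan::Projection { exprs, input }
            } else {
                // Async work: evaluate it in a dedicated node below.
                Plan::Projection {
                    exprs,
                    input: Box::new(Plan::AsyncFunc { funcs, input }),
                }
            }
        }
        other => other,
    }
}

fn main() {
    let plan = Plan::Projection {
        exprs: vec![Expr::Column("id".into()), Expr::AsyncUdf("ask_llm".into())],
        input: Box::new(Plan::Scan),
    };
    println!("{:?}", insert_async_node(plan));
}
```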
```rust
.with_optimizer_rules(vec![])
.with_query_planner(Arc::new(LLMQueryPlanner {}))
.with_physical_optimizer_rules(vec![])
.with_physical_optimizer_rule(Arc::new(AsyncFuncRule {}))
```
Here is how to use the new code: add the new optimizer rule:
```rust
    Ok(())
}

/// This is a simple example of a UDF that takes a string, invokes a (remote) LLM function
```
Now, define a struct that implements `AsyncScalarUDFImpl`:
```rust
    Ok(DataType::Boolean)
}

async fn invoke_async(&self, args: &RecordBatch) -> Result<ArrayRef> {
```
Here is the function that is invoked (it is async); it should be able to make network and any other calls.
More discussion here:
```rust
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>;

/// Invoke the function asynchronously with the async arguments
async fn invoke_async(&self, args: &RecordBatch) -> Result<ArrayRef>;
```
I wonder whether this should return a Stream of ArrayRef, so that internally you can batch the calls to an external system with the right batch size? In the LLM case there might also be a problem with the context, I suppose...
That is also an excellent question -- the current situation is that DataFusion handles the batching (aka target_size) -- so normally it will pass 8k rows or whatever to the function.
I think we could potentially make the API something like:

```rust
fn invoke_async_stream(&self, input: SendableRecordBatchStream) -> Result<SendableRecordBatchStream>;
```

but I think that might be trickier to code / get right.
In terms of LLM context, this particular PR only adds async scalar functions. I think we could likely do something similar with window and aggregate functions, which might more naturally map to context 🤔
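One point worth illustrating: even with the batch-at-a-time signature, an implementation can re-chunk the rows it is given to match an external system's preferred request size. The sketch below uses hypothetical stand-ins throughout (`call_remote`, `MAX_ROWS_PER_CALL`, string rows instead of a `RecordBatch`, and a toy `poll_ready` driver for futures that never actually suspend).

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type Result<T> = std::result::Result<T, String>;

/// Hypothetical per-request limit of the external system.
const MAX_ROWS_PER_CALL: usize = 3;

/// Pretend remote call (a real one would await an HTTP request).
async fn call_remote(rows: &[String]) -> Result<Vec<bool>> {
    if rows.len() > MAX_ROWS_PER_CALL {
        return Err("too many rows in one request".to_string());
    }
    Ok(rows.iter().map(|r| r.len() > 3).collect())
}

/// Even if DataFusion hands the UDF one large batch (target size, e.g. 8k
/// rows), the implementation can re-chunk it without any API change.
async fn invoke_async(rows: &[String]) -> Result<Vec<bool>> {
    let mut out = Vec::with_capacity(rows.len());
    for chunk in rows.chunks(MAX_ROWS_PER_CALL) {
        out.extend(call_remote(chunk).await?);
    }
    Ok(out)
}

// Minimal no-op waker + driver (demo only; all awaits resolve immediately).
fn vt_clone(_: *const ()) -> RawWaker {
    raw_waker()
}
fn vt_noop(_: *const ()) {}
static VTABLE: RawWakerVTable = RawWakerVTable::new(vt_clone, vt_noop, vt_noop, vt_noop);
fn raw_waker() -> RawWaker {
    RawWaker::new(std::ptr::null(), &VTABLE)
}

fn poll_ready<F: Future>(fut: F) -> F::Output {
    let waker = unsafe { Waker::from_raw(raw_waker()) };
    match pin!(fut).poll(&mut Context::from_waker(&waker)) {
        Poll::Ready(v) => v,
        Poll::Pending => unreachable!("sketch future never suspends"),
    }
}

fn main() {
    let rows: Vec<String> = (0..8).map(|i| format!("row-{i}")).collect();
    let out = poll_ready(invoke_async(&rows)).unwrap();
    println!("{out:?}");
}
```

A streaming `invoke_async_stream` variant would push this chunking decision into the trait itself rather than leaving it to each implementation.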
```rust
let schema_captured = schema_captured.clone();

async move {
    let batch = batch?;
```
minor: would moving this invocation of the `?` operator save spawning a task in case of an error?
yes, you are right -- that would be an improvement 👍
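The suggested improvement can be sketched as follows, with mock types throughout (`std::thread::spawn` standing in for the async task, `Vec<u64>` for the record batch): applying `?` before spawning means no task is created when the input is already an error.

```rust
use std::thread;

type Result<T> = std::result::Result<T, String>;

/// Stand-in for the per-batch work: in the PR the spawned work is an async
/// task; a std thread is used here so the sketch is self-contained.
fn process(batch: Result<Vec<u64>>) -> Result<u64> {
    // Propagate an input error *before* spawning, so no task/thread is
    // created just to immediately fail. (The alternative is running
    // `let batch = batch?;` inside the spawned task.)
    let batch = batch?;
    let handle = thread::spawn(move || batch.iter().sum::<u64>());
    handle.join().map_err(|_| "task panicked".to_string())
}

fn main() {
    assert_eq!(process(Ok(vec![1, 2, 3])), Ok(6));
    assert!(process(Err("upstream error".to_string())).is_err());
    println!("ok");
}
```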
Welcome back!
This PR implements what I think is a general purpose framework for implementing async user defined functions.
The high level design is to handle async functions with a special new execution plan
I will comment more inline about the design.
When run with `cargo run`, this program shows: