Add lambda support and array_transform udf #21679
Conversation
@comphead That's great. I'm almost sure none of these improvements are breaking, so we can address them in another PR. I added it to #21172. CI failure seems unrelated: https://github.com/apache/datafusion/actions/runs/25037217756/job/73331583409#step:7:919
@comphead Regarding the performance optimization, I'm adding some helpers in arrow-rs that will handle sliced arrays and cleanup of nulls to make it super fast, which we can then use here.
@comphead Looks like you have no other comments, and I approved, so I will wait for the last @LiaCastaneda comment to be resolved and then merge this.
Big thank you to @gstvg and everyone involved!
Thanks everyone! This is a huge one!
This is pretty amazing -- I put a note to include it in the 55 release notes:
## Which issue does this PR close?

- Closes #. Followup on #21679 (comment)

## Rationale for this change

## What changes are included in this PR?

## Are these changes tested?

## Are there any user-facing changes?

---------

Co-authored-by: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Many thanks to everyone involved, both reviewers and also those who showed interest in the feature. Reviewing such a big PR is not easy and I'm very grateful for it, thanks again ❤️
This is amazing. Thank you so much for all the hard work on this. This is definitely one of the things I am most excited about in the next release. This is going to be huge!
I just wanted to bring to attention that DuckDB deprecated this very syntax because of conflicts with JSON operators. I guess we may want to support both long term (Spark uses the arrow syntax), but I think there's a real risk that we are not even evaluating incompatibility with JSON operators, because they are not implemented by default in DataFusion (but we are in talks to do so, #21301). Happy to open an issue for discussion, but wanted to check first if this was discussed at all; as far as I can tell from going over the PR, it has not.
@adriangb I think this decision can be left to the user via the configurable dialect, as it is today. This PR merely consumes the `LambdaFunction` from the sqlparser-rs AST; which syntax it parses is defined by the configured dialect. It's up to sqlparser to avoid conflicts (see apache/datafusion-sqlparser-rs#2224). The sqllogictests here require setting the dialect to databricks, for example. I guess what we can do here is:
This is a clean version of #18921 to make it easier to review.
This is a breaking change due to adding a variant to the `Expr` enum, new methods on the traits `Session`, `FunctionRegistry` and `ContextProvider`, and a new arg on `TaskContext::new`.

This PR adds support for lambdas and the `array_transform` function, used to test the lambda implementation.

Example usage:
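The original example block was collapsed and did not survive extraction. As a stand-in, here is a toy Rust sketch of the semantics only; the function below is a plain Rust helper, not DataFusion's API, and the SQL in the comment (column `a`, table `t`, parameter `v`) is assumed from the example query discussed later in this description:

```rust
// Toy model of array_transform semantics, NOT the DataFusion API:
// apply a lambda to every element of a list, conceptually like
//   SELECT array_transform(a, v -> v + 1) FROM t
// (column/table names assumed from the discussion below)
fn array_transform<T, U>(arr: Vec<T>, lambda: impl Fn(T) -> U) -> Vec<U> {
    arr.into_iter().map(lambda).collect()
}

fn main() {
    let transformed = array_transform(vec![1, 2, 3], |v| v + 1);
    println!("{:?}", transformed); // prints [2, 3, 4]
}
```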
Note: column capture has been removed for now and will be added in a follow-up PR, see #21172.
Some comments on the code snippets in this doc show what value each struct, variant, or field would hold after planning the first example above. Some literals are simplified pseudocode.
3 new `Expr` variants are added: `HigherOrderFunction`, owning a new trait `HigherOrderUDF`, which is like a `ScalarFunction`/`ScalarUDFImpl` with support for lambdas; `Lambda`, for the lambda body and its parameter names; and `LambdaVariable`, which is like `Column` but for lambda parameters.

Their logical representations:
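The collapsed definitions are not reproduced here; the following is a hedged Rust sketch of how these variants might be shaped. All field names, and the simplified `BinaryExpr`/`Literal` variants, are assumptions for illustration, not the actual DataFusion definitions:

```rust
// Hedged sketch of the three new Expr variants described above.
#[derive(Debug)]
enum Expr {
    // existing variants, heavily simplified...
    Column(String),
    Literal(i64),
    BinaryExpr { left: Box<Expr>, op: String, right: Box<Expr> },
    // new: holds the UDF and its args; `fun` stands in for Arc<dyn HigherOrderUDF>
    HigherOrderFunction { fun: String, args: Vec<Expr> },
    // new: the lambda body and its parameter names
    Lambda { params: Vec<String>, body: Box<Expr> },
    // new: like Column, but for lambda parameters (would also carry a FieldRef)
    LambdaVariable { name: String },
}

fn main() {
    // Roughly what `array_transform(a, v -> v + 1)` might plan into:
    let planned = Expr::HigherOrderFunction {
        fun: "array_transform".to_string(),
        args: vec![
            Expr::Column("a".to_string()),
            Expr::Lambda {
                params: vec!["v".to_string()],
                body: Box::new(Expr::BinaryExpr {
                    left: Box::new(Expr::LambdaVariable { name: "v".to_string() }),
                    op: "+".to_string(),
                    right: Box::new(Expr::Literal(1)),
                }),
            },
        ],
    };
    println!("{:?}", planned);
}
```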
The example would be planned into a tree like this:
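The tree diagram itself was collapsed; assuming the example is `array_transform(a, v -> v + 1)`, the plan would look roughly like:

```
HigherOrderFunction(array_transform)
├── Column(a)
└── Lambda(params: [v])
    └── BinaryExpr(+)
        ├── LambdaVariable(v)
        └── Literal(1)
```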
The physical counterparts definition:
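Those definitions were also collapsed; a speculative sketch follows, in which every name is an assumption and a closure stands in for the real `Arc<dyn PhysicalExpr>` body:

```rust
// Speculative sketch of the physical counterpart of a lambda:
// parameter Fields are resolved at planning time, and the body
// is evaluable at execution time.
#[derive(Clone, Debug, PartialEq)]
struct Field { name: String, data_type: String } // toy stand-in for Arrow's Field

struct PhysicalLambda {
    params: Vec<Field>,
    body: Box<dyn Fn(i64) -> i64>, // stand-in for Arc<dyn PhysicalExpr>
}

impl PhysicalLambda {
    // Evaluate the body with a value bound to the single parameter.
    fn eval(&self, v: i64) -> i64 {
        (self.body)(v)
    }
}

fn main() {
    let lambda = PhysicalLambda {
        params: vec![Field { name: "v".into(), data_type: "Int64".into() }],
        body: Box::new(|v| v + 1),
    };
    println!("{}(1) = {}", lambda.params[0].name, lambda.eval(1)); // v(1) = 2
}
```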
Note: For those who primarily want to check whether this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of `HigherOrderUDF` and the `array_transform` implementation of `HigherOrderUDF`'s relevant methods, collapsed due to their size.

The added `HigherOrderUDF` trait is almost a clone of `ScalarUDFImpl`, with the exception of:

- `return_field_from_args` and `invoke_with_args`, where now `args.args` is a list of enums with two variants, `Value` or `Lambda`, instead of a list of values
- `lambda_parameters`, which returns a `Field` for each parameter supported for every lambda argument, based on the `Field`s of the non-lambda arguments
- `return_field` and the deprecated ones, `is_nullable` and `display_name`

HigherOrderUDF
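Since the collapsed trait definition is unavailable, here is a hedged, self-contained sketch of the trait shape described above, using toy stand-ins for Arrow/DataFusion types. All signatures are simplifications; for instance, a real `lambda_parameters` would unwrap the list's element type rather than copying the argument's type:

```rust
// Toy stand-ins for Arrow/DataFusion types; names and signatures here
// are assumptions based on the prose description, not the real API.
#[derive(Clone, Debug, PartialEq)]
struct Field { name: String, data_type: String }

// Each argument is either a plain value or a lambda, per the text.
enum HigherOrderArg {
    Value(Vec<i64>),                 // stand-in for ColumnarValue
    Lambda(Box<dyn Fn(i64) -> i64>), // stand-in for an evaluable lambda
}

trait HigherOrderUDF {
    fn name(&self) -> &str;
    // For each lambda argument, the Fields its parameters may bind to,
    // derived from the Fields of the non-lambda arguments.
    fn lambda_parameters(&self, value_fields: &[Field]) -> Vec<Vec<Field>>;
    // Like ScalarUDFImpl::invoke_with_args, but args mixes values and lambdas.
    fn invoke_with_args(&self, args: Vec<HigherOrderArg>) -> Vec<i64>;
}

struct ArrayTransform;

impl HigherOrderUDF for ArrayTransform {
    fn name(&self) -> &str { "array_transform" }

    fn lambda_parameters(&self, value_fields: &[Field]) -> Vec<Vec<Field>> {
        // One lambda, one parameter, typed from the first (array) argument.
        // A real implementation would use the list's *element* type here.
        vec![vec![Field {
            name: "element".into(),
            data_type: value_fields[0].data_type.clone(),
        }]]
    }

    fn invoke_with_args(&self, args: Vec<HigherOrderArg>) -> Vec<i64> {
        let mut it = args.into_iter();
        let (Some(HigherOrderArg::Value(values)), Some(HigherOrderArg::Lambda(f))) =
            (it.next(), it.next())
        else {
            panic!("array_transform expects (array, lambda)");
        };
        values.into_iter().map(|v| f(v)).collect()
    }
}

fn main() {
    let udf = ArrayTransform;
    let out = udf.invoke_with_args(vec![
        HigherOrderArg::Value(vec![1, 2, 3]),
        HigherOrderArg::Lambda(Box::new(|v| v + 1)),
    ]);
    println!("{} -> {:?}", udf.name(), out); // array_transform -> [2, 3, 4]
}
```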
array_transform lambda_parameters implementation
array_transform return_field_from_args implementation
array_transform invoke_with_args implementation
How relevant HigherOrderUDF methods would be called and what they would return during planning and evaluation of the example
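Reconstructing from the surrounding prose, the rough call sequence would be (a sketch, not the collapsed original):

```
Planning (SQL -> logical -> physical), for array_transform(a, v -> v + 1):
  1. lambda_parameters(fields of non-lambda args)  -> Field for "v"
  2. plan the lambda body "v + 1" against a schema extended with that Field
  3. return_field_from_args(args)                  -> output Field of array_transform
Execution:
  4. evaluate the non-lambda args to arrays
  5. invoke_with_args([Value(array "a"), Lambda(v + 1)]) -> transformed array
```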
A `HigherOrderUDF`/`HigherOrderUDFImpl` pair, like `ScalarFunction` has, was not used because those exist only to maintain backwards compatibility with the older API (#8045).
Why `LambdaVariable` and not `Column`:

Existing tree traversals that operate on columns would break if some column nodes referenced a lambda parameter and not a real column. In the example query, projection pushdown would try to push down the lambda parameter "v", which doesn't exist in table "t".
Example of another traversal that would break:
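Since the code block here was collapsed, the following is a toy Rust illustration (names assumed) of such a traversal: a column collector like the one projection pushdown relies on. If lambda parameters were `Column` nodes, it would wrongly collect them:

```rust
// Toy expression tree; names are illustrative, not DataFusion's.
#[derive(Debug)]
enum Expr {
    Column(String),
    LambdaVariable(String),
    Lambda { params: Vec<String>, body: Box<Expr> },
    HigherOrderFunction { args: Vec<Expr> },
}

// A traversal like projection pushdown: collect every referenced column.
fn collect_columns(expr: &Expr, out: &mut Vec<String>) {
    match expr {
        Expr::Column(name) => out.push(name.clone()),
        // Because lambda variables are a distinct variant, they are skipped
        // here and never pushed down to the table scan.
        Expr::LambdaVariable(_) => {}
        Expr::Lambda { body, .. } => collect_columns(body, out),
        Expr::HigherOrderFunction { args } => {
            for arg in args {
                collect_columns(arg, out);
            }
        }
    }
}

fn main() {
    // array_transform(a, v -> v + 1): only "a" is a real column of table t.
    let expr = Expr::HigherOrderFunction {
        args: vec![
            Expr::Column("a".into()),
            Expr::Lambda {
                params: vec!["v".into()],
                body: Box::new(Expr::LambdaVariable("v".into())),
            },
        ],
    };
    let mut cols = Vec::new();
    collect_columns(&expr, &mut cols);
    // If "v" were an Expr::Column, cols would wrongly contain it too.
    println!("{:?}", cols); // ["a"]
}
```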
Furthermore, the implementation of `ExprSchemable` and `PhysicalExpr::return_field` for `Column` expects that the schema it receives as an argument contains an entry for its name, which is not the case for lambda parameters.

By including a `FieldRef` on `LambdaVariable` that is resolved at construction time in the SQL planner, `ExprSchemable` and `PhysicalExpr::return_field` simply return its own `Field`:

LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
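The collapsed implementation isn't shown; a minimal Rust sketch of the idea, with toy `Field`/`FieldRef` types standing in for Arrow's:

```rust
use std::sync::Arc;

// Toy Field / FieldRef stand-ins; the real types are Arrow's.
#[derive(Debug, PartialEq)]
struct Field { name: String, data_type: String }
type FieldRef = Arc<Field>;

// LambdaVariable resolves its Field at construction (in the SQL planner),
// so return_field can ignore the input schema and return the stored Field.
struct LambdaVariable { name: String, field: FieldRef }

impl LambdaVariable {
    fn return_field(&self /*, _schema: &Schema */) -> FieldRef {
        Arc::clone(&self.field)
    }
}

fn main() {
    let v = LambdaVariable {
        name: "v".into(),
        field: Arc::new(Field { name: "v".into(), data_type: "Int64".into() }),
    };
    // Prints the Field stored at construction time.
    println!("{} -> {:?}", v.name, v.return_field());
}
```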
Possible alternatives, discarded due to complexity, required downstream changes, and implementation size:
How `minimize_join_filter` would look:
For any given `HigherOrderFunction` found during the traversal, a new schema is created for each lambda argument that contains its parameters, returned from `HigherOrderUDF::lambda_parameters`.
How it would look: