Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: User Defined Window Functions #6617

Closed
wants to merge 4 commits into from

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Jun 9, 2023

Which issue does this PR close?

#5781

Rationale for this change

We would like to allow users to take full advantage of the power of DataFusion's window functions (largely contributed by @ozankabak and @mustafasrepo 👏 )

This PR contains a potential implementation of User Defined Window Functions: (the "Use existing APIs" approach described on #5781 (comment))

I don't intend to merge this specific PR. Instead, if the community likes this basic approach I will break this PR up into pieces and incrementally merge it

What changes are included in this PR?

The new example in this PR shows how this works. Run

cargo run --example simple_udwf

Which produces the following output (where my_average's implementation is defined in simple_udwf.rs as a user defined window function):

+-------+-------+--------------------------+------------------------+---------------------+
| car   | speed | LAG(cars.speed,Int64(1)) | my_average(cars.speed) | time                |
+-------+-------+--------------------------+------------------------+---------------------+
| red   | 20.0  |                          | 20.0                   | 1996-04-12T12:05:03 |
| red   | 20.3  | 20.0                     | 20.15                  | 1996-04-12T12:05:04 |
| red   | 21.4  | 20.3                     | 20.85                  | 1996-04-12T12:05:05 |
| red   | 21.5  | 21.4                     | 21.45                  | 1996-04-12T12:05:06 |
| red   | 19.0  | 21.5                     | 20.25                  | 1996-04-12T12:05:07 |
| red   | 18.0  | 19.0                     | 18.5                   | 1996-04-12T12:05:08 |
| red   | 17.0  | 18.0                     | 17.5                   | 1996-04-12T12:05:09 |
| red   | 7.0   | 17.0                     | 12.0                   | 1996-04-12T12:05:10 |
| red   | 7.1   | 7.0                      | 7.05                   | 1996-04-12T12:05:11 |
| red   | 7.2   | 7.1                      | 7.15                   | 1996-04-12T12:05:12 |
| red   | 3.0   | 7.2                      | 5.1                    | 1996-04-12T12:05:13 |
| red   | 1.0   | 3.0                      | 2.0                    | 1996-04-12T12:05:14 |
| red   | 0.0   | 1.0                      | 0.5                    | 1996-04-12T12:05:15 |
| green | 10.0  |                          | 10.0                   | 1996-04-12T12:05:03 |
| green | 10.3  | 10.0                     | 10.15                  | 1996-04-12T12:05:04 |
| green | 10.4  | 10.3                     | 10.350000000000001     | 1996-04-12T12:05:05 |
| green | 10.5  | 10.4                     | 10.45                  | 1996-04-12T12:05:06 |
| green | 11.0  | 10.5                     | 10.75                  | 1996-04-12T12:05:07 |
| green | 12.0  | 11.0                     | 11.5                   | 1996-04-12T12:05:08 |
| green | 14.0  | 12.0                     | 13.0                   | 1996-04-12T12:05:09 |
| green | 15.0  | 14.0                     | 14.5                   | 1996-04-12T12:05:10 |
| green | 15.1  | 15.0                     | 15.05                  | 1996-04-12T12:05:11 |
| green | 15.2  | 15.1                     | 15.149999999999999     | 1996-04-12T12:05:12 |
| green | 8.0   | 15.2                     | 11.6                   | 1996-04-12T12:05:13 |
| green | 2.0   | 8.0                      | 5.0                    | 1996-04-12T12:05:14 |
+-------+-------+--------------------------+------------------------+---------------------+

Here are the major changes in this PR

  1. Move PartitionEvaluator definition into datafusion_expr (much like the Accumulator trait for AggregateUDFs)
  2. Moved WindowAggState, WindwFrameContext and some related structures to datafusion_expr (so the UDWF did not depend on datafusion-physical-expr
  3. Traitify the built in state so WindowUDF did not depend on datafusion-physical-expr

Open questions:

I think it may be possible to simplify the PartitionEvaluator to remove the state management which would make the needed changes (the amount of code that needs to be moved to datafusion_expr) smaller. I will try to do this as a separate PR

Outstanding cleanups

I found a place where the optimizer special cases a particular window function which I think I can remove (and I will try to do so as separate PR (#6619)

https://github.com/apache/arrow-datafusion/blob/1af846bd8de387ce7a6e61a2008917a7610b9a7b/datafusion/core/src/physical_plan/windows/mod.rs#L254-L257

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions sql SQL Planner labels Jun 9, 2023
@alamb alamb force-pushed the alamb/user_defined_window_functions branch from c7119e6 to 1bc2d6e Compare June 9, 2023 18:03
"SELECT car, \
speed, \
lag(speed, 1) OVER (PARTITION BY car ORDER BY time),\
my_average(speed) OVER (PARTITION BY car ORDER BY time),\
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows calling the user defined window function via SQL

}

/// These different evaluation methods are called depending on the various settings of WindowUDF
impl PartitionEvaluator for MyPartitionEvaluator {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the proposal of how a user would specify specify the window calculation -- by impl PartitionEvaluator

// TODO make a helper funciton like `crate_udf` that helps to make these signatures

fn my_average() -> WindowUDF {
WindowUDF {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the structure that provides metadata about the window function

@@ -17,14 +17,24 @@

//! Partition evaluation module

use crate::window::window_expr::BuiltinWindowState;
use crate::window::WindowAggState;
use crate::window_frame_state::WindowAggState;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is moved into datafusion_expr and I had to make a few small changes related to state management -- but I think I may be able to avoid that.


/// Logical representation of a user-defined window function (UDWF)
/// A UDAF is different from a UDF in that it is stateful across batches.
#[derive(Clone)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is the proposed interface for defining a WindowUDF -- it is very similar to ScalarUDF and AggregateUDF, on purpose

use std::cmp::min;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

/// State for each unique partition determined according to PARTITION BY column(s)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this stuff is related to state management and was just moved (not modified)

@alamb alamb mentioned this pull request Jun 9, 2023
4 tasks
@ozankabak
Copy link
Contributor

Thanks for putting this together. @mustafasrepo will take a look and we will discuss in our next meeting, then we will circle back to you with our thoughts.

@alamb
Copy link
Contributor Author

alamb commented Jun 9, 2023

@stuartcarnie / @ozankabak / @mustafasrepo / @doki23 as you have expressed interest in the user defined window functionality I wonder if you have some time to review the proposed API for doing so

@mustafasrepo
Copy link
Contributor

Thanks @alamb for this proposal. I have examined this PR. Overall it is very well written, and showcases the usage of the new API. However, I have some concerns about flexibility of WindowUDF and AggregateUDF. Specifically, user can implement multiple different WindowUDFs. However, all implementations are initialized from WindowUDFExpr (There is a similar pattern in AggregateUDF. All AggregateUDFs are initialized from AggregateFunctionExpr).

BuiltInWindowFunctionExpr (WindowUDFExpr implements this trait) contains methods such as supports_bounded_execution, uses_window_frame, include_rank. These methods effect which methods will be used during evaluator. For instance if user wants to add a evaluator that uses window frame boundaries, user should be able to set flag uses_window_frame for the evaluator (Hence when WindowUDFExpr is implemented, these fields are fixed, and users have no control over them).

Hence I think, we should either support for user to create custom BuiltInWindowFunctionExpr for each new WindowUDF (I think this is hard and more cumbersome on the user side) or we should move all the parameters that effects the decision (such as supports_bounded_execution, uses_window_frame, include_rank) to the evaluator and/or aggregator side. By this way user will have full control on the custom implementation.

I am working on a new design to solve this problem. @metesynnada and I will discuss it tomorrow, and let you know about the final result.

@alamb
Copy link
Contributor Author

alamb commented Jun 12, 2023

Hence I think, we should either support for user to create custom BuiltInWindowFunctionExpr for each new WindowUDF (I think this is hard and more cumbersome on the user side) or we should move all the parameters that effects the decision (such as supports_bounded_execution, uses_window_frame, include_rank) to the evaluator and/or aggregator side. By this way user will have full control on the custom implementation.

thank you for the response @mustafasrepo -- this makes sense to me. I agree it would be better if the type system could help enforce functions that needed to be implemented

Are you thinking something like this?

pub trait Accumulator: Send + Sync + Debug {
    // Required methods
    fn state(&self) -> Result<Vec<ScalarValue, Global>, DataFusionError>;
    fn update_batch(
        &mut self,
        values: &[Arc<dyn Array + 'static>]
    ) -> Result<(), DataFusionError>;
    fn merge_batch(
        &mut self,
        states: &[Arc<dyn Array + 'static>]
    ) -> Result<(), DataFusionError>;
    fn evaluate(&self) -> Result<ScalarValue, DataFusionError>;
    fn size(&self) -> usize;
}

/// A trait for accumualtors that also provide the ability to remove rows from a window incrementally
pub trait RetractableAccumulator: Accumulator {.  // <---- add this trait
    // Provided method
    fn retract_batch(
        &mut self,
        _values: &[Arc<dyn Array + 'static>]
    ) -> Result<(), DataFusionError> { ... }
}

And then adding a separate function to AggregateUDF:

enum AccumulatorImpl {.     // <---- add this enum
  NonRetractable(Box<dyn Accumulator + 'static>
  /// returned if the accumulator supports retractable windows
  Retractable(Box<dyn RetractableAccumulator + 'static>),
}

pub struct AggregateUDF {
    pub name: ...,
    pub signature: ...,
    pub return_type: ...
    // change this signature ----+v
    pub accumulator: Arc<dyn Fn(&DataType) -> Result<AccumulatorImpl>, DataFusionError> + Sync + Send + 'static>,
    pub state_type: ...
}

?

Also related is #6611 we found (which is one of the features of the Accumulator that is not yet exposed by AggregateUDF)

Comment on lines +116 to +118
fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(MyPartitionEvaluator::new()))
}
Copy link
Contributor

@stuartcarnie stuartcarnie Jun 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible we could support passing scalar arguments when creating an instance of the function, similar to the built-in functions?

For example, the lag function takes an optional scalar value for the second argument, which is the shift offset:

https://github.com/apache/arrow-datafusion/blob/a42cc8d98b6e875c485e7e9b106d30803a32b00a/datafusion/core/src/physical_plan/windows/mod.rs#L148-L152

I would use this for functions such as moving_average, which requires a scalar for specifying the minimum number of rows to average.


Note

This would be a welcomed feature for UDAFs too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try and figure out how to do this

Copy link
Contributor Author

@alamb alamb Jun 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stuartcarnie I looked into adding the arguments. The primary issue I encountered is that a WindowUDF is specified in terms of structures in datafusion-expr ( aka it doesn't have access to PhysicalExprs as those are defined in a different crate.

Here are some possible signatures we could provide. Do you have any feedback on these possibilities?

Pass in the Exprs from the logical plan

This is non ideal in my mind as the PartitionEvaluator is created during execution (where the Exprs are normally not around anymore)

/// Factory that creates a PartitionEvaluator for the given window function.
///
/// This function is passed its input arguments so that cases such as
/// constants can be correctly handled.
pub type PartitionEvaluatorFunctionFactory =
    Arc<dyn Fn(&[Expr]) -> Result<Box<dyn PartitionEvaluator>> + Send + Sync>;

Pass in a ArgType enum

This is also non ideal in my mind as it seemingly artificially limits what the user defined window function can special case (why not Column's for example??)

enum ArgType {
  /// The argument was a single value
  Scalar(ScalarValue),
  /// the argument is something other than a single value
  Array
}

/// Factory that creates a PartitionEvaluator for the given window function.
///
/// This function is passed its input arguments so that cases such as
/// constants can be specially handled if desired.
pub type PartitionEvaluatorFunctionFactory =
    Arc<dyn Fn(args: Vec<ArgType>) -> Result<Box<dyn PartitionEvaluator>> + Send + Sync>;

Others?

Copy link
Contributor

@stuartcarnie stuartcarnie Jun 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of passing the PhysicalExpr trait objects? Example:

/// Factory that creates a PartitionEvaluator for the given window function.
///
/// This function is passed its input arguments and schema so that cases such as
/// constants can be correctly handled.
pub type PartitionEvaluatorFunctionFactory =
    Arc<dyn Fn(&[Arc<dyn PhysicalExpr>], &Schema) -> Result<Box<dyn PartitionEvaluator>> + Send + Sync>;

Note

I've also included the input_schema, as this would be necessary to evaluate types for the arguments.

This would be similar to the create_built_in_window_expr:

https://github.com/apache/arrow-datafusion/blob/a42cc8d98b6e875c485e7e9b106d30803a32b00a/datafusion/core/src/physical_plan/windows/mod.rs#L120-L125

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of passing the PhysicalExpr trait objects? Example:

I think this would be ideal, the problem is that PhysicalExpr is defined in datafusion-physical-expr which is not a dependency of datafusion-expr (the dependency goes the other way): https://github.com/apache/arrow-datafusion/blob/6194d588d5c3e9f202a31a0c524f63e6fb08d040/datafusion/physical-expr/Cargo.toml#L54

Thus, since WindowUDF is defined in datafusion-expr it can't depend on PhysicalExpr

datafusion_expr: https://github.com/apache/arrow-datafusion/blob/6194d588d5c3e9f202a31a0c524f63e6fb08d040/datafusion/expr/Cargo.toml#L37

Copy link
Contributor

@stuartcarnie stuartcarnie Jun 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, of course!

I'd suggest we don't hold up this work and move this problem to another PR to solve it for both user-defined aggregate and window functions.

It works today, just that the update_batch feels a bit awkward, as the scalar argument is passed as an ArrayRef. We might be able to engineer it so that it isn't a breaking change in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual runtime passes ColumnarValue: https://docs.rs/datafusion/latest/datafusion/physical_plan/enum.ColumnarValue.html

Which is either a scalar or an array

We could potentially update the signatures to accept that instead maybe (though we would have to move it to the datafusion_expr crate)

@alamb
Copy link
Contributor Author

alamb commented Jun 13, 2023

I pushed d531d50 that refines the API somewhat and makes it possible to call evaluate and evaluate_with_bounds

My plan for this PR is to

  1. complete the example (mostly done)
  2. write tests for the various window function apis.
  3. Port all documentation changes upstream incremntally

Once we have settled on the final API (pending the feedback from @mustafasrepo and @metesynnada, on #6617 (comment)) I'll then make one or more PRs to master with that API and the examples / tests.

@ozankabak
Copy link
Contributor

We had a meeting on this today. @mustafasrepo and @metesynnada will update you with their progress soon (tomorrow possibly).

@doki23
Copy link
Contributor

doki23 commented Jun 14, 2023

Hence I think, we should either support for user to create custom BuiltInWindowFunctionExpr for each new WindowUDF (I think this is hard and more cumbersome on the user side) or we should move all the parameters that effects the decision (such as supports_bounded_execution, uses_window_frame, include_rank) to the evaluator and/or aggregator side. By this way user will have full control on the custom implementation.

How about moving supports_bounded_execution and uses_window_frame into the evaluator?

@mustafasrepo
Copy link
Contributor

In the PR I have implemented my proposals. As a summary, I think on top of current approach, if we combine evaluate and evaluate_stateful fields. User can accomplish desired behavior by setting appropriate flags and implementing corresponding evaluation (either evaluate or evaluate_all) method.

I will open the PR that unify evaluate and evaluate_stateful fields on the main repo once it is ready.

@alamb
Copy link
Contributor Author

alamb commented Jun 14, 2023

How about moving supports_bounded_execution and uses_window_frame into the evaluator?

I think this is a good idea @doki23, and I think it is broadly speaking what @mustafasrepo has proposed in alamb#12

@alamb
Copy link
Contributor Author

alamb commented Jun 14, 2023

In the alamb#12 I have implemented my proposals.

TLDR I really like the proposal. I left my comments here: alamb#12 (review)

@alamb
Copy link
Contributor Author

alamb commented Jun 21, 2023

Update here is that the final PR (that has all the learnings from this PR and various cleanups we did along the way) is ready for review: #6703

@alamb alamb closed this in #6703 Jun 22, 2023
This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions sql SQL Planner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants