Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce expr builder for aggregate function #10560

Merged
merged 12 commits into from
Jun 9, 2024

Conversation

jayzhan211
Copy link
Contributor

Which issue does this PR close?

Closes #10545.

Rationale for this change

After this PR, there are two kinds of expression API.
One is the normal one like count(expr), convenient for expression that has few arguments.
Another is builder mode, useful for expressions that expects multiple kind of arguments, like first_value_builder(expr).order_by().build()

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

create_func!($UDAF, $AGGREGATE_UDF_FN);
};
($UDAF:ty, $EXPR_FN:ident, $($arg:ident)*, $distinct:ident, $DOC:expr, $AGGREGATE_UDF_FN:ident) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to introduce another macro for distinct, so we have count_distinct, count_distinct_builder expr::fn.

I change to order_by because first_value needs it. Also, I change the expression for first_value to single expression, since it does not expect variadic args.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of count_distinct_builder what do you think about adding a distinct type method instead?

Like

let agg = count_builder()
  .args(col("a"))
  .distinct()
  .build()?

🤔

@jayzhan211
Copy link
Contributor Author

I plan to add distinct macro in the follow up PR for count.

@jayzhan211 jayzhan211 changed the title introduce expr builder for aggregate function Introduce expr builder for aggregate function May 17, 2024
@jayzhan211 jayzhan211 marked this pull request as ready for review May 17, 2024 14:29
@alamb
Copy link
Contributor

alamb commented May 17, 2024

This looks super cool @jayzhan211

@@ -31,8 +31,10 @@ use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::covariance::{covar_pop, covar_samp};
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::expr_fn::{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add some documentation with examples somewhere? That would both serve to help document and test this feature.

But thank you @jayzhan211 -- this is looking very cool

covar_pop(lit(1.5), lit(2.2)),
covar_pop_builder(lit(1.5), lit(2.3)).build(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think roundtrip tests also cover the testing for these expression APIs.

I expect we add all the functions here for test, and only little functions in datafusion-example.


| Syntax | Equivalent to |
| ------------------------------------------------------ | ----------------------------------- |
| first_value_builder(expr).order_by(vec![expr]).build() | first_value(expr, Some(vec![expr])) |
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can add non-trivial examples in doc, but not all of them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think adding examples in doc comments is likely the best for this particular API as people will be looking at the code already if they want to programatically create Exprs

@jayzhan211 jayzhan211 requested a review from alamb May 22, 2024 00:01
@alamb
Copy link
Contributor

alamb commented May 22, 2024

THanks @jayzhan211 -- I will plan to review this tomorrow.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jayzhan211 -- this is looking really cool. I have some feeback on the API design

One major thing I think that might be worth considering is now to get one of these builders. Right now they are implemented as count_builder free functions, which means both adding many new functions as well as that they won't work for user defined aggregate functions

I wonder if it would be possible to add something to the AggregateUDF instead maybe make an extension trait so people could make a builder for any arbitrary aggregate expression,

So today to call a UDF you do

  // Given an aggregate UDF:
  let agg_udf: AggregateUdf = todo!();
  // agg(a)
  let expr = agg_udf.call(col("a"))

Could we implement a builder that works like

  // agg(a ORDER BY b)
  let expr = agg_udf.builder()
    .args(col("a"))
    .order_by("b")
    .build()

Or I potentailly keep the "call" syntax:

  // agg(a ORDER BY b)
  let expr = agg_udf.builder()
    .order_by(col("b"))
    .call(col("a"))
    .build()

What do you think?

datafusion-examples/examples/udaf_expr.rs Outdated Show resolved Hide resolved
use datafusion_expr::{expr::AggregateFunction, Expr};
use sqlparser::ast::NullTreatment;

/// Builder for creating an aggregate function expression
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest:

  1. Add a docstring example here
  2. Consider putting this in datafusion/expr (not datafusion-functions-aggregate) -- if we put it in datafusion-functions-aggregate people can't use it unless they use the built in aggregates of DataFusion 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we put it in datafusion-functions-aggregate people can't use it unless they use the built in aggregates of DataFusion

Nice! I forgot this

create_func!($UDAF, $AGGREGATE_UDF_FN);
};
($UDAF:ty, $EXPR_FN:ident, $($arg:ident)*, $distinct:ident, $DOC:expr, $AGGREGATE_UDF_FN:ident) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of count_distinct_builder what do you think about adding a distinct type method instead?

Like

let agg = count_builder()
  .args(col("a"))
  .distinct()
  .build()?

🤔

@@ -39,6 +39,7 @@ path = "src/lib.rs"

[dependencies]
arrow = { workspace = true }
concat-idents = "1.1.5"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to use https://docs.rs/paste/latest/paste/ (which is already a dependency) instead? I think that is what binary expression evaluation uses 🤔


| Syntax | Equivalent to |
| ------------------------------------------------------ | ----------------------------------- |
| first_value_builder(expr).order_by(vec![expr]).build() | first_value(expr, Some(vec![expr])) |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think adding examples in doc comments is likely the best for this particular API as people will be looking at the code already if they want to programatically create Exprs

/// Builder for creating an aggregate function expression
///
/// Has the same arguments from [AggregateFunction]
pub struct ExprBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should call it AggBuilder 🤔 or AggregeExprBuilder 🤔

@jayzhan211
Copy link
Contributor Author

jayzhan211 commented May 23, 2024

which means both adding many new functions as well as that they won't work for user defined aggregate functions

I think they can build the function with macro in function-aggregate, so they can have their function with ExprBuilder extension. But then I need to move the macro to datafusion-expr 🤔

maybe make an extension trait

It could be a good idea

@jayzhan211 jayzhan211 marked this pull request as draft May 23, 2024 05:15
@jayzhan211
Copy link
Contributor Author

Maybe we can just extend function for AggregateFunction 🤔

pub fn call(&self, args: Vec<Expr>) -> AggregateFunction

impl AggregateFunction {
  fn build() -> Expr
}

@github-actions github-actions bot added logical-expr Logical plan and expressions sqllogictest SQL Logic Tests (.slt) labels May 23, 2024
@jayzhan211 jayzhan211 marked this pull request as ready for review May 23, 2024 12:55
@github-actions github-actions bot removed the sqllogictest SQL Logic Tests (.slt) label May 26, 2024
@jayzhan211 jayzhan211 marked this pull request as draft May 27, 2024 11:58
@github-actions github-actions bot added the optimizer Optimizer rules label May 27, 2024
@jayzhan211 jayzhan211 marked this pull request as ready for review May 27, 2024 13:11
@jayzhan211 jayzhan211 requested a review from alamb May 30, 2024 12:35
@@ -412,7 +412,7 @@ async fn main() -> Result<()> {
let df = ctx.table("t").await?;

// perform the aggregation
let df = df.aggregate(vec![], vec![geometric_mean.call(vec![col("a")])])?;
let df = df.aggregate(vec![], vec![geometric_mean.call(vec![col("a")]).build()])?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay in reviewing this @jayzhan211 -- I have been thinking about this PR and I am struggling

On one hand, having a builder allows for exposing the ability to provide other types of arguments to aggregate functions (e.g. ORDER BY or FILTER)

On the other hand, I feel like this API change (requiring a call to build() after geometric_mean.call makes DataFusion harder to learn / use as most calls to aggregate functions are made with just the arguments.

I was thinking about what an ideal user experience would be. What do you think about something that permits using the existing expr_fn API, but also allows easier construction of more advanced aggregate calls

What about something like this (as today, no need to call build)

    // geometric_mean(a)
    let agg = geometric_mean.call(vec![col("a")]);
    let df = df.aggregate(vec![], vec![agg])?;

And then then to create an ORDER BY the user could create and use a builder like this:

    // geometric_mean(a FILTER b > c)
    let agg = geometric_mean.call(vec![col("a")])
      .agg_builder() // creates a builder that wraps an Expr, no error return
      .filter(col("b").gt(col("c"((
      .build()?; // check here if everything was ok

    let df = df.aggregate(vec![], vec![agg])?;

We could probably implement a trait for Expr to add agg_builder (or maybe just add it directly)

Copy link
Contributor Author

@jayzhan211 jayzhan211 Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks good to me to have both style for function call.

We can also have builder for expr_fn API
expr_fn::first_value(args).agg_builder().filter().builder()

Copy link
Contributor Author

@jayzhan211 jayzhan211 Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that we don't even need builder() with trait impl

Copy link
Contributor Author

@jayzhan211 jayzhan211 Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// geometric_mean(a FILTER b > c)
let agg = geometric_mean.call(vec![col("a")])
      .filter(col("b").gt(col("c")))

Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
@alamb
Copy link
Contributor

alamb commented Jun 7, 2024

I'm not sure if we really need the error check since we don't have any now

I guess what I was thinking is that the reason there is no check now is because the rust compiler will ensure the types are correct (specifically that they are AggregateUDF.

With the builder API as it is written now, you can call order_by on any arbitrary Expr (not just the appropriate Expr::AggregateFunction variant)

So I was imagining if any of the methods had been called on an invalid Expr, build() would return an error

Maybe something like

/// Traits to help build aggregate functions
trait AggregateExt {
    // return a builder
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder;
    fn filter(self, filter: Box<Expr>) -> AggregateBuilder;
    fn null_treatment(self, null_treatment: NullTreatment) -> AggregateBuilder;
    fn distinct(self) -> AggregateBuilder;
}

pub struct AggregateBuilder {
  // if set to none, builder errors
  udf: Option<Arc<AggregateUdf>>,
  order_by: Vec<Expr>,
....
}

impl AggregateBuilder {
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder {
      self.order_by = order_by;
      self
    }
    fn filter(self, filter: Box<Expr>) -> AggregateBuilder {..}
    fn null_treatment(self, null_treatment: NullTreatment) -> AggregateBuilder {..}
    fn distinct(self) -> AggregateBuilder {..}
   // builds up any in progress aggregate
    fn build(self) -> Result<Expr> {
      let Some(udf) = self.udf else {
        return plan_err!("Expr of type XXX is not an aggregate")
      } 
      udf.order_by = self.order_by;
      ...
      Ok(Expr::AggregateFunction(udf))
  }
}

impl AggregateExt for Expr {
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder {
        match self {
            Expr::AggregateFunction(mut udaf) => {
                AggregateBuilder { udf: Some(udaf) }
            }
            // wrong type passed -- error when build is called
            _ => {
                AggregateBuilder { udf: None }
            }
        }
    }

@jayzhan211
Copy link
Contributor Author

jayzhan211 commented Jun 7, 2024

I'm not sure if we really need the error check since we don't have any now

I guess what I was thinking is that the reason there is no check now is because the rust compiler will ensure the types are correct (specifically that they are AggregateUDF.

With the builder API as it is written now, you can call order_by on any arbitrary Expr (not just the appropriate Expr::AggregateFunction variant)

So I was imagining if any of the methods had been called on an invalid Expr, build() would return an error

Maybe something like

/// Traits to help build aggregate functions
trait AggregateExt {
    // return a builder
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder;
    fn filter(self, filter: Box<Expr>) -> AggregateBuilder;
    fn null_treatment(self, null_treatment: NullTreatment) -> AggregateBuilder;
    fn distinct(self) -> AggregateBuilder;
}

pub struct AggregateBuilder {
  // if set to none, builder errors
  udf: Option<Arc<AggregateUdf>>,
  order_by: Vec<Expr>,
....
}

impl AggregateBuilder {
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder {
      self.order_by = order_by;
      self
    }
    fn filter(self, filter: Box<Expr>) -> AggregateBuilder {..}
    fn null_treatment(self, null_treatment: NullTreatment) -> AggregateBuilder {..}
    fn distinct(self) -> AggregateBuilder {..}
   // builds up any in progress aggregate
    fn build(self) -> Result<Expr> {
      let Some(udf) = self.udf else {
        return plan_err!("Expr of type XXX is not an aggregate")
      } 
      udf.order_by = self.order_by;
      ...
      Ok(Expr::AggregateFunction(udf))
  }
}

impl AggregateExt for Expr {
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder {
        match self {
            Expr::AggregateFunction(mut udaf) => {
                AggregateBuilder { udf: Some(udaf) }
            }
            // wrong type passed -- error when build is called
            _ => {
                AggregateBuilder { udf: None }
            }
        }
    }

I'm not sure is it worth to introduce build() just for handling non Expr::AggregateFunction cases 🤔

Signed-off-by: jayzhan211 <[email protected]>
.call(vec![expression])
.order_by(order_by)
.build()
.unwrap()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fine to unwrap since udaf.call() is guaranteed to be Expr::AggregateFunction

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% agree

Suggested change
.unwrap()
// guaranteed to be `Expr::AggregateFunction`
.unwrap()

Signed-off-by: jayzhan211 <[email protected]>
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks great @jayzhan211 🙏

I have some ideas about additional comments / documentation that I would be happy to help add -- but I think we could do it as a follow on PR as well

.call(vec![expression])
.order_by(order_by)
.build()
.unwrap()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% agree

Suggested change
.unwrap()
// guaranteed to be `Expr::AggregateFunction`
.unwrap()

Signed-off-by: jayzhan211 <[email protected]>
@jayzhan211
Copy link
Contributor Author

I have some ideas about additional comments / documentation that I would be happy to help add

Sure!

@github-actions github-actions bot added the core Core DataFusion crate label Jun 8, 2024
@alamb
Copy link
Contributor

alamb commented Jun 8, 2024

I have some ideas about additional comments / documentation that I would be happy to help add

Sure!

Thank you for your patience @jayzhan211 -- I just pushed a bunch of docs and tests (and a small API refinement):

  1. Consolidated the example into the expr_api.rs examples
  2. Simplified the api for filter from filter(Box<Expr>) to just filter(Expr)
  3. Added documentation and examples to the trait
  4. Added tests
  5. Checked for SortExprs in order_by

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am quite biased, but I think this API now looks really nice and hopefully is both easy to use as well as serve as a model for WindowFunctions

alamb and others added 2 commits June 8, 2024 14:51
Signed-off-by: jayzhan211 <[email protected]>
@jayzhan211
Copy link
Contributor Author

It looks pretty nice now! Thanks @alamb

@jayzhan211 jayzhan211 merged commit 24a0846 into apache:main Jun 9, 2024
24 checks passed
@jayzhan211 jayzhan211 deleted the expr-builder branch June 9, 2024 05:49
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
* expr builder

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* build

Signed-off-by: jayzhan211 <[email protected]>

* upd user-guide

Signed-off-by: jayzhan211 <[email protected]>

* fix builder

Signed-off-by: jayzhan211 <[email protected]>

* Consolidate example in udaf_expr.rs, simplify filter API

* Add doc strings and examples

* Add tests and checks

* Improve documentation more

* fixup

* rm spce

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Michael-J-Ward added a commit to Michael-J-Ward/datafusion-python that referenced this pull request Jul 24, 2024
Upstream signatures were changed for the new new `AggregateBuilder` api [0].

This simply gets the code to work. We should better incorporate that API into `datafusion-python`.

[0] apache/datafusion#10560
Michael-J-Ward added a commit to Michael-J-Ward/datafusion-python that referenced this pull request Jul 24, 2024
Upstream signatures were changed for the new new `AggregateBuilder` api [0].

This simply gets the code to work. We should better incorporate that API into `datafusion-python`.

[0] apache/datafusion#10560
Michael-J-Ward added a commit to Michael-J-Ward/datafusion-python that referenced this pull request Jul 25, 2024
Upstream signatures were changed for the new new `AggregateBuilder` api [0].

This simply gets the code to work. We should better incorporate that API into `datafusion-python`.

[0] apache/datafusion#10560
andygrove pushed a commit to apache/datafusion-python that referenced this pull request Jul 31, 2024
* chore: update datafusion deps

* feat: impl ExecutionPlan::static_name() for DatasetExec

This required trait method was added upstream [0] and recommends to simply forward to `static_name`.

[0]: apache/datafusion#10266

* feat: update first_value and last_value wrappers.

Upstream signatures were changed for the new new `AggregateBuilder` api [0].

This simply gets the code to work. We should better incorporate that API into `datafusion-python`.

[0] apache/datafusion#10560

* migrate count to UDAF

Builtin Count was removed upstream.

TBD whether we want to re-implement `count_star` with new API.

Ref: apache/datafusion#10893

* migrate approx_percentile_cont, approx_distinct, and approx_median to UDAF

Ref: approx_distinct apache/datafusion#10851
Ref: approx_median apache/datafusion#10840
Ref: approx_percentile_cont and _with_weight apache/datafusion#10917

* migrate avg to UDAF

Ref: apache/datafusion#10964

* migrage corr to UDAF

Ref: apache/datafusion#10884

* migrate grouping to UDAF

Ref: apache/datafusion#10906

* add alias `mean` for UDAF `avg`

* migrate stddev to UDAF

Ref: apache/datafusion#10827

* remove rust alias for stddev

The python wrapper now provides stddev_samp alias.

* migrage var_pop to UDAF

Ref: apache/datafusion#10836

* migrate regr_* functions to UDAF

Ref: apache/datafusion#10898

* migrate bitwise functions to UDAF

The functions now take a single expression instead of a Vec<_>.

Ref: apache/datafusion#10930

* add missing variants for ScalarValue with todo

* fix typo in approx_percentile_cont

* add distinct arg to count

* comment out failing test

`approx_percentile_cont` is now returning a DoubleArray instead of an IntArray.

This may be a bug upstream; it requires further investigation.

* update tests to expect lowercase `sum` in query plans

This was changed upstream.

Ref: apache/datafusion#10831

* update ScalarType data_type map

* add docs dependency pickleshare

* re-implement count_star

* lint: ruff python lint

* lint: rust cargo fmt

* include name of window function in error for find_window_fn

* refactor `find_window_fn` for debug clarity

* search default aggregate functions by both name and aliases

The alias list no longer includes the name of the function.

Ref: apache/datafusion#10658

* fix markdown in find_window_fn docs

* parameterize test_window_functions

`first_value` and `last_value` are currently failing and marked as xfail.

* add test ids to test_simple_select tests marked xfail

* update find_window_fn to search built-ins first

The behavior of `first_value` and `last_value` UDAFs currently does not match the built-in behavior.
This allowed me to remove `marks=pytest.xfail` from the window tests.

* improve first_call and last_call use of the builder API

* remove trailing todos

* fix examples/substrait.py

* chore: remove explicit aliases from functions.rs

Ref: #779

* remove `array_fn!` aliases

* remove alias rules for `expr_fn_vec!`

* remove alias rules from `expr_fn!` macro

* remove unnecessary pyo3 var-arg signatures in functions.rs

* remove pyo3 signatures that provided defaults for first_value and last_value

* parametrize test_string_functions

* test regr_ function wrappers

Closes #778
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AggregateUDF expression API design
2 participants