Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support User Defined Window Functions #6703

Merged
merged 9 commits into from
Jun 22, 2023
Merged

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Jun 16, 2023

Which issue does this PR close?

Closes #5781
Closes #6617

Rationale for this change

See #5781

This is the final PR that integrates all the work we have done this week with @mustafarepo and others to support User Defined Window Functions

It includes the parts of #6617 that have not been merged already (connecting the window functions to the various registries), and adds tests.

I tried to keep the size of this PR as small as possible -- 2/3 of the change is tests or examples -- but it is still large and I apologize to reviewers.

What changes are included in this PR?

  1. Introduce WindowUDF and connect it through datafusion_expr and contexts
  2. Add simple_udwf.rs example
  3. Add tests in user_defined_functions.rs integration test

Are these changes tested?

yes

Are there any user-facing changes?

yes, new feature

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions sql SQL Planner labels Jun 20, 2023
@alamb alamb added the api change Changes the API exposed to users of the crate label Jun 20, 2023
@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Jun 20, 2023
@alamb
Copy link
Contributor Author

alamb commented Jun 20, 2023

Ok, I think this PR is basically ready for review (once #6690 is merged).

cc @stuartcarnie

PartitionEvaluator, ReturnTypeFunction, Signature, Volatility, WindowUDF,
};

/// A query with a window function evaluated over the entire partition
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are the new tests


/// Basic stateful user defined window function
#[tokio::test]
async fn test_stateful_udwf() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't think of anything else to test with a stateful function other than update_state is called. I would be interested in anyone else's opinions in this matter

@github-actions github-actions bot removed the physical-expr Physical Expressions label Jun 21, 2023
// specific language governing permissions and limitations
// under the License.

/// Run all tests that are found in the `user_defined` directory
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the user defined tests now run as part of the same test binaries user_defined_integration

@@ -0,0 +1,213 @@
// Licensed to the Apache Software Foundation (ASF) under one
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a user facing example of how to use User Defined Window Functions

@@ -206,6 +212,71 @@ fn create_built_in_window_expr(
})
}

/// Creates a `BuiltInWindowFunctionExpr` suitable for a user defined window function
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mirrors the code and structure for AggregateUDF

@alamb alamb marked this pull request as ready for review June 21, 2023 13:33
values: &[ArrayRef],
range: &std::ops::Range<usize>,
) -> Result<ScalarValue> {
//println!("evaluate_inside_range(). range: {range:#?}, values: {values:#?}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is leftover

WindowUDF {
name: String::from("smooth_it"),
// it will take 1 arguments -- the column to smooth
signature: Signature::exact(vec![DataType::Int32], Volatility::Immutable),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem to be effecting result, but I don't understand why signature takes DataType::Int32?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it was the default type that the CSV parser picked, but apparently not:

❯ describe './datafusion/core/tests/data/cars.csv';
describe './datafusion/core/tests/data/cars.csv';
+-------------+-----------------------------+-------------+
| column_name | data_type                   | is_nullable |
+-------------+-----------------------------+-------------+
| car         | Utf8                        | YES         |
| speed       | Float64                     | YES         |
| time        | Timestamp(Nanosecond, None) | YES         |
+-------------+-----------------------------+-------------+
3 rows in set. Query took 0.031 seconds.

I'lll update this to take a Float instead.

@@ -218,6 +218,14 @@ impl DataFrame {
Ok(DataFrame::new(self.session_state, plan))
}

/// Apply one or more window functions ([`Expr::WindowFunction`]) to extend the schema
pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add an example usage to the docstring. such as in the pub fn aggregate. We can do so in following PRs also.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call -- I tried to make one, and it turns out to be non trivial. I will do it in a follow on PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is what I was thinking: #6746 if you like it I will polish it up

@@ -92,13 +100,15 @@ impl TaskContext {
config.set(&k, &v)?;
}
let session_config = SessionConfig::from(config);
let window_functions = HashMap::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess, since this API is deprecated anyway, you didn't update its arguments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. I also tried to lead people to SessionContext::task_ctx with another doc comment above.

Copy link
Contributor

@mustafasrepo mustafasrepo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alamb. I left some inline comments, related to comments, etc. Other than these, This PR is LGTM!. Thanks for this feature.

Copy link
Contributor Author

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the thorough review @mustafasrepo.

WindowUDF {
name: String::from("smooth_it"),
// it will take 1 arguments -- the column to smooth
signature: Signature::exact(vec![DataType::Int32], Volatility::Immutable),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it was the default type that the CSV parser picked, but apparently not:

❯ describe './datafusion/core/tests/data/cars.csv';
describe './datafusion/core/tests/data/cars.csv';
+-------------+-----------------------------+-------------+
| column_name | data_type                   | is_nullable |
+-------------+-----------------------------+-------------+
| car         | Utf8                        | YES         |
| speed       | Float64                     | YES         |
| time        | Timestamp(Nanosecond, None) | YES         |
+-------------+-----------------------------+-------------+
3 rows in set. Query took 0.031 seconds.

I'lll update this to take a Float instead.

@@ -92,13 +100,15 @@ impl TaskContext {
config.set(&k, &v)?;
}
let session_config = SessionConfig::from(config);
let window_functions = HashMap::new();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. I also tried to lead people to SessionContext::task_ctx with another doc comment above.

datafusion/expr/src/expr_fn.rs Outdated Show resolved Hide resolved
@@ -218,6 +218,14 @@ impl DataFrame {
Ok(DataFrame::new(self.session_state, plan))
}

/// Apply one or more window functions ([`Expr::WindowFunction`]) to extend the schema
pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call -- I tried to make one, and it turns out to be non trivial. I will do it in a follow on PR

@alamb alamb merged commit b1b8c9c into apache:main Jun 22, 2023
20 checks passed
Copy link
Contributor

@stuartcarnie stuartcarnie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff, @alamb – apologies, I didn't realise my comments were tied to a review, so they are late, but all just giving praise 👍🏻

@@ -57,6 +57,7 @@ cargo run --example csv_sql
- [`rewrite_expr.rs`](examples/rewrite_expr.rs): Define and invoke a custom Query Optimizer pass
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined (scalar) Function (UDF)
- [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These examples are 💯

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation in this example is excellent 👏🏻

Comment on lines +221 to +222
/// Apply one or more window functions ([`Expr::WindowFunction`]) to extend the schema
pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just looking for a similar API today – great to know it is coming

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stuartcarnie I wonder if you have any thoughts about the API here for making window functions: #6746 (it is somewhat complex at the moment)

@alamb alamb deleted the alamb/udfw_for_real branch June 23, 2023 14:57
This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

User defined window functions
3 participants