Add field trait method to WindowUDFImpl, remove return_type/nullable #12374

Merged
merged 55 commits into from
Sep 21, 2024
Changes from 39 commits
Commits
ae69e1e
Adds new library `functions-window-common`
jcsherin Sep 3, 2024
f09383c
Adds `FieldArgs` struct for field of final result
jcsherin Sep 3, 2024
ca256fa
Adds `field` method to `WindowUDFImpl` trait
jcsherin Sep 3, 2024
9b21b7f
Minor: fixes formatting
jcsherin Sep 4, 2024
9324df6
Fixes: udwf doc test
jcsherin Sep 4, 2024
4cd1467
Fixes: implements missing trait items
jcsherin Sep 4, 2024
deff13c
Updates `datafusion-cli` dependencies
jcsherin Sep 4, 2024
ac5ef0a
Fixes: formatting of `Cargo.toml` files
jcsherin Sep 4, 2024
7c4293a
Fixes: implementation of `field` in udwf example
jcsherin Sep 4, 2024
83d833b
Pass `FieldArgs` argument to `field`
jcsherin Sep 4, 2024
927f1cc
Use `field` in place of `return_type` for udwf
jcsherin Sep 4, 2024
e29a493
Update `field` in udwf implementations
jcsherin Sep 4, 2024
79d8372
Merge branch 'main' into field-args-for-window-udf
jcsherin Sep 4, 2024
fc969e8
Fixes: implementation of `field` in udwf example
jcsherin Sep 4, 2024
df2b1c8
Revert unrelated change
jcsherin Sep 4, 2024
e26aba6
Mark `return_type` for udwf as unreachable
jcsherin Sep 4, 2024
258986d
Delete code
jcsherin Sep 4, 2024
ba9e39a
Uses schema name of udwf to construct `FieldArgs`
jcsherin Sep 5, 2024
b671b36
Adds deprecated notice to `return_type` trait method
jcsherin Sep 5, 2024
712263e
Add doc comments to `field` trait method
jcsherin Sep 5, 2024
9426978
Reify `input_types` when creating the udwf window expression
jcsherin Sep 5, 2024
327db32
Rename name field to `schema_name` in `FieldArgs`
jcsherin Sep 5, 2024
f5b62ef
Make `FieldArgs` opaque
jcsherin Sep 5, 2024
34565ff
Minor refactor
jcsherin Sep 5, 2024
ede7827
Removes `nullable` trait method from `WindowUDFImpl`
jcsherin Sep 5, 2024
25f3b50
Add doc comments
jcsherin Sep 5, 2024
f012bdd
Rename to `WindowUDFResultArgs`
jcsherin Sep 5, 2024
0e8795f
Minor: fixes formatting
jcsherin Sep 5, 2024
4bd77d5
Copy edits for doc comments
jcsherin Sep 6, 2024
a0f2ccb
Renames field to `function_name`
jcsherin Sep 6, 2024
d0d429c
Rename struct to `WindowUDFFieldArgs`
jcsherin Sep 6, 2024
7bceb9e
Add comments for unreachable code
jcsherin Sep 6, 2024
9e04fd8
Copy edit for `WindowUDFImpl::field` trait method
jcsherin Sep 6, 2024
1e8c300
Renames module
jcsherin Sep 6, 2024
efbad36
Fix warning: unused doc comment
jcsherin Sep 6, 2024
7c445e1
Minor: rename bindings
jcsherin Sep 6, 2024
7ea3ce8
Merge branch 'main' into field-args-for-window-udf
jcsherin Sep 6, 2024
17da204
Minor refactor
jcsherin Sep 6, 2024
32bdf36
Minor: copy edit
jcsherin Sep 7, 2024
acfa03e
Merge branch 'main' into field-args-for-window-udf
jcsherin Sep 7, 2024
a47b7a0
Fixes: use `Expr::qualified_name` for window function name
jcsherin Sep 7, 2024
8da1364
Fixes: apply previous fix to `Expr::nullable`
jcsherin Sep 7, 2024
9413a58
Merge branch 'main' into field-args-for-window-udf
jcsherin Sep 12, 2024
275daea
Refactor: reuse type coercion for window functions
jcsherin Sep 14, 2024
4c7157d
Merge branch 'main' into field-args-for-window-udf
jcsherin Sep 16, 2024
045d352
Fixes: clippy errors
jcsherin Sep 16, 2024
5cc7d06
Adds name parameter to `WindowFunctionDefinition::return_type`
jcsherin Sep 17, 2024
0102334
Removes `return_type` field from `SimpleWindowUDF`
jcsherin Sep 17, 2024
4bb799d
Add doc comment for helper method
jcsherin Sep 17, 2024
2530169
Rewrite doc comments
jcsherin Sep 17, 2024
25b299e
Minor: remove empty comment
jcsherin Sep 17, 2024
ce7df3a
Remove `WindowUDFImpl::return_type`
jcsherin Sep 17, 2024
bb57f8f
Fixes doc test
jcsherin Sep 17, 2024
e6a7113
Merge branch 'main' into field-args-for-window-udf
jcsherin Sep 18, 2024
c627403
Merge branch 'main' into field-args-for-window-udf
jcsherin Sep 18, 2024
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -31,6 +31,7 @@ members = [
"datafusion/functions-aggregate-common",
"datafusion/functions-nested",
"datafusion/functions-window",
"datafusion/functions-window-common",
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/physical-expr-common",
@@ -103,6 +104,7 @@ datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", vers
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "41.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "41.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "41.0.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "41.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "41.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "41.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "41.0.0", default-features = false }
10 changes: 10 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions datafusion-examples/examples/advanced_udwf.rs
@@ -22,9 +22,11 @@ use arrow::{
array::{ArrayRef, AsArray, Float64Array},
datatypes::Float64Type,
};
use arrow_schema::Field;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;
use datafusion_expr::function::WindowUDFFieldArgs;
use datafusion_expr::{
PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
};
@@ -80,6 +82,10 @@ impl WindowUDFImpl for SmoothItUdf {
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(MyPartitionEvaluator::new()))
}

fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
Ok(Field::new(field_args.name(), DataType::Float64, true))
}
}

/// This implements the lowest level evaluation for a window function
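
The `field` method added in this hunk can be sketched in isolation. This is a minimal illustration, not the DataFusion code: `DataType`, `Field`, `FieldArgs` and `WindowFn` below are simplified stand-ins (assumptions) for arrow's `DataType` and `Field`, for `WindowUDFFieldArgs`, and for `WindowUDFImpl`. It only shows the shape of the contract: the implementation receives the input types and a name, and returns one field describing the result column.

```rust
// Stand-ins for illustration only. The real `DataType` and `Field` come from
// arrow, and the real argument struct is `WindowUDFFieldArgs`.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Float64,
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

impl Field {
    fn new(name: &str, data_type: DataType, nullable: bool) -> Self {
        Field { name: name.to_string(), data_type, nullable }
    }
}

/// Hypothetical simplification of `WindowUDFFieldArgs`.
struct FieldArgs<'a> {
    input_types: &'a [DataType],
    name: &'a str,
}

impl<'a> FieldArgs<'a> {
    fn new(input_types: &'a [DataType], name: &'a str) -> Self {
        FieldArgs { input_types, name }
    }

    fn input_types(&self) -> &[DataType] {
        self.input_types
    }

    fn name(&self) -> &str {
        self.name
    }
}

/// Hypothetical one-method stand-in for `WindowUDFImpl`.
trait WindowFn {
    fn field(&self, field_args: FieldArgs<'_>) -> Result<Field, String>;
}

struct SmoothIt;

impl WindowFn for SmoothIt {
    // Mirrors the hunk above: always a nullable Float64 column, named by the caller.
    fn field(&self, field_args: FieldArgs<'_>) -> Result<Field, String> {
        Ok(Field::new(field_args.name(), DataType::Float64, true))
    }
}
```

In this sketch the name is supplied by the caller through `FieldArgs`, so the implementation only decides the data type and nullability.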
8 changes: 6 additions & 2 deletions datafusion-examples/examples/simplify_udwf_expression.rs
@@ -17,12 +17,12 @@

use std::any::Any;

-use arrow_schema::DataType;
+use arrow_schema::{DataType, Field};

use datafusion::execution::context::SessionContext;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::{error::Result, execution::options::CsvReadOptions};
-use datafusion_expr::function::WindowFunctionSimplification;
+use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs};
use datafusion_expr::{
expr::WindowFunction, simplify::SimplifyInfo, Expr, PartitionEvaluator, Signature,
Volatility, WindowUDF, WindowUDFImpl,
@@ -84,6 +84,10 @@ impl WindowUDFImpl for SimplifySmoothItUdf {

Some(Box::new(simplify))
}

fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
Ok(Field::new(field_args.name(), DataType::Float64, true))
}
}

// create local execution context with `cars.csv` registered as a table named `cars`
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -145,6 +145,7 @@ bigdecimal = { workspace = true }
criterion = { version = "0.5", features = ["async_tokio"] }
csv = "1.1.6"
ctor = { workspace = true }
datafusion-functions-window-common = { workspace = true }
doc-comment = { workspace = true }
env_logger = { workspace = true }
half = { workspace = true, default-features = true }
@@ -29,12 +29,13 @@ use std::{

use arrow::array::AsArray;
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
-use arrow_schema::DataType;
+use arrow_schema::{DataType, Field};
use datafusion::{assert_batches_eq, prelude::SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
PartitionEvaluator, Signature, Volatility, WindowUDF, WindowUDFImpl,
};
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;

/// A query with a window function evaluated over the entire partition
const UNBOUNDED_WINDOW_QUERY: &str = "SELECT x, y, val, \
@@ -565,6 +566,14 @@ impl OddCounter {
fn aliases(&self) -> &[String] {
&self.aliases
}

fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
Ok(Field::new(
field_args.name(),
self.return_type.clone(),
true,
))
}
}

ctx.register_udwf(WindowUDF::from(SimpleWindowUDF::new(test_state)))
1 change: 1 addition & 0 deletions datafusion/expr/Cargo.toml
@@ -46,6 +46,7 @@ chrono = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr-common = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
paste = "^1.0"
serde_json = { workspace = true }
7 changes: 6 additions & 1 deletion datafusion/expr/src/expr.rs
@@ -714,7 +714,12 @@ impl WindowFunctionDefinition {
WindowFunctionDefinition::AggregateUDF(fun) => {
fun.return_type(input_expr_types)
}
-WindowFunctionDefinition::WindowUDF(fun) => fun.return_type(input_expr_types),
+WindowFunctionDefinition::WindowUDF(_) => {
+// To get the return data type of the result from
+// evaluating the user-defined window function instead
+// use the `WindowUDF::field` trait method.
+unreachable!()
+}
jcsherin marked this conversation as resolved.
}
}

9 changes: 9 additions & 0 deletions datafusion/expr/src/expr_fn.rs
@@ -38,6 +38,7 @@ use arrow::compute::kernels::cast_utils::{
};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use sqlparser::ast::NullTreatment;
use std::any::Any;
use std::fmt::Debug;
@@ -665,6 +666,14 @@ impl WindowUDFImpl for SimpleWindowUDF {
fn partition_evaluator(&self) -> Result<Box<dyn crate::PartitionEvaluator>> {
(self.partition_evaluator_factory)()
}

fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
Ok(Field::new(
field_args.name(),
self.return_type.clone(),
true,
))
}
}

pub fn interval_year_month_lit(value: &str) -> Expr {
23 changes: 20 additions & 3 deletions datafusion/expr/src/expr_schema.rs
@@ -31,6 +31,7 @@ use datafusion_common::{
not_impl_err, plan_datafusion_err, plan_err, Column, ExprSchema, Result,
TableReference,
};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use std::collections::HashMap;
use std::sync::Arc;

@@ -204,7 +205,12 @@ impl ExprSchemable for Expr {
)
)
})?;
-Ok(fun.return_type(&new_types, &nullability)?)
+let function_name = self.schema_name().to_string();
+let field_args =
+WindowUDFFieldArgs::new(&new_types, &function_name);
+
+udwf.field(field_args)
+.map(|field| field.data_type().clone())
jcsherin marked this conversation as resolved.
}
_ => fun.return_type(&data_types, &nullability),
}
@@ -340,7 +346,7 @@ impl ExprSchemable for Expr {
Expr::AggregateFunction(AggregateFunction { func, .. }) => {
Ok(func.is_nullable())
}
-Expr::WindowFunction(WindowFunction { fun, .. }) => match fun {
+Expr::WindowFunction(WindowFunction { fun, args, .. }) => match fun {
WindowFunctionDefinition::BuiltInWindowFunction(func) => {
if func.name() == "RANK"
|| func.name() == "NTILE"
@@ -352,7 +358,18 @@ impl ExprSchemable for Expr {
}
}
WindowFunctionDefinition::AggregateUDF(func) => Ok(func.is_nullable()),
-WindowFunctionDefinition::WindowUDF(udwf) => Ok(udwf.nullable()),
+WindowFunctionDefinition::WindowUDF(udwf) => {
+let data_types = args
+.iter()
+.map(|e| e.get_type(input_schema))
+.collect::<Result<Vec<_>>>()?;
+let input_types = data_types_with_window_udf(&data_types, udwf)?;
Review comment by @jcsherin (Contributor Author), Sep 7, 2024:

Note: I copied the call to `data_types_with_window_udf`, which performs type coercion for window function arguments, from `Expr::get_type` (above).

Reply from a Contributor:

I think we should handle this computation in `to_field`. `data_types_with_window_udf` could reuse `to_field`. `data_types` and `nullable` could reuse `data_types_with_window_udf`.

Reply from the Contributor Author:

Sounds good. I'll update the PR with the suggested change.

+let function_name = self.schema_name().to_string();
Review comment from a Contributor:

Similar to `self.qualified_name()` in `to_field`, might we have a different name for `Column` and `Alias`?

+let field_args =
+WindowUDFFieldArgs::new(&input_types, &function_name);
+
+udwf.field(field_args).map(|field| field.is_nullable())
+}
jcsherin marked this conversation as resolved.
},
Expr::ScalarVariable(_, _)
| Expr::TryCast { .. }
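
The review suggestion in this hunk (compute the field in one place, then let the data type and the nullability be read from it) can be sketched as follows. This is only an illustration of the idea under simplifying assumptions: `window_field`, `window_data_type` and `window_nullable` are hypothetical helper names, the `WindowFn` trait takes the input types and name directly instead of a `WindowUDFFieldArgs`, and type coercion is left out.

```rust
// Stand-ins for illustration only, not the arrow/DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    UInt64,
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

/// Hypothetical, reduced form of the user-defined window function trait.
trait WindowFn {
    fn field(&self, input_types: &[DataType], name: &str) -> Result<Field, String>;
}

/// Hypothetical helper: the single place that asks the function for its field.
fn window_field(
    udwf: &dyn WindowFn,
    input_types: &[DataType],
    name: &str,
) -> Result<Field, String> {
    udwf.field(input_types, name)
}

/// The data type is derived from the field rather than computed separately.
fn window_data_type(
    udwf: &dyn WindowFn,
    input_types: &[DataType],
    name: &str,
) -> Result<DataType, String> {
    window_field(udwf, input_types, name).map(|field| field.data_type)
}

/// Nullability is derived from the same field.
fn window_nullable(
    udwf: &dyn WindowFn,
    input_types: &[DataType],
    name: &str,
) -> Result<bool, String> {
    window_field(udwf, input_types, name).map(|field| field.nullable)
}

/// Hypothetical function whose result is never null.
struct Counter;

impl WindowFn for Counter {
    fn field(&self, _input_types: &[DataType], name: &str) -> Result<Field, String> {
        Ok(Field {
            name: name.to_string(),
            data_type: DataType::UInt64,
            nullable: false,
        })
    }
}
```

With this arrangement the two schema queries cannot disagree with each other, because both read the same `Field` value.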
2 changes: 2 additions & 0 deletions datafusion/expr/src/function.rs
@@ -27,6 +27,8 @@ pub use datafusion_functions_aggregate_common::accumulator::{
AccumulatorArgs, AccumulatorFactoryFunction, StateFieldsArgs,
};

pub use datafusion_functions_window_common::field::WindowUDFFieldArgs;

#[derive(Debug, Clone, Copy)]
pub enum Hint {
/// Indicates the argument needs to be padded if it is scalar
45 changes: 29 additions & 16 deletions datafusion/expr/src/udwf.rs
@@ -25,9 +25,10 @@ use std::{
sync::Arc,
};

-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, Field};

use datafusion_common::{not_impl_err, Result};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;

use crate::expr::WindowFunction;
use crate::{
@@ -162,6 +163,7 @@ impl WindowUDF {
/// Return the type of the function given its input types
///
/// See [`WindowUDFImpl::return_type`] for more details.
#[allow(deprecated)]
pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
self.inner.return_type(args)
}
@@ -178,11 +180,11 @@ impl WindowUDF {
self.inner.partition_evaluator()
}

-/// Returns if column values are nullable for this window function.
+/// Returns the field of the final result of evaluating this window function.
 ///
-/// See [`WindowUDFImpl::nullable`] for more details.
-pub fn nullable(&self) -> bool {
-self.inner.nullable()
Review comment by the Contributor Author on lines -161 to -165:

Removed `nullable`.

+/// See [`WindowUDFImpl::field`] for more details.
+pub fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+self.inner.field(field_args)
}

/// Returns custom result ordering introduced by this window function
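
The forwarding shown in this hunk, where the public wrapper hands `field` straight to its inner implementation, is a plain delegation pattern. A minimal sketch, assuming a hypothetical `WindowImpl` trait and `WindowFnHandle` wrapper in place of `WindowUDFImpl` and `WindowUDF`, and a reduced `Field` stand-in:

```rust
use std::sync::Arc;

// Stand-in for illustration only, not arrow's `Field`.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    nullable: bool,
}

/// Hypothetical stand-in for the implementation trait.
trait WindowImpl {
    fn field(&self, name: &str) -> Result<Field, String>;
}

/// Hypothetical stand-in for the public wrapper that owns a trait object.
struct WindowFnHandle {
    inner: Arc<dyn WindowImpl>,
}

impl WindowFnHandle {
    fn new(inner: impl WindowImpl + 'static) -> Self {
        WindowFnHandle { inner: Arc::new(inner) }
    }

    /// The wrapper adds no logic of its own; it forwards to the inner value.
    fn field(&self, name: &str) -> Result<Field, String> {
        self.inner.field(name)
    }
}

/// Hypothetical implementation used to exercise the wrapper.
struct Constant;

impl WindowImpl for Constant {
    fn field(&self, name: &str) -> Result<Field, String> {
        Ok(Field { name: name.to_string(), nullable: true })
    }
}
```

Because the wrapper only forwards, replacing `nullable` with `field` on the trait needs a matching one-line change on the wrapper, which is what the hunk above does.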
@@ -221,10 +223,11 @@ where
/// # Basic Example
/// ```
/// # use std::any::Any;
-/// # use arrow::datatypes::DataType;
+/// # use arrow::datatypes::{DataType, Field};
/// # use datafusion_common::{DataFusionError, plan_err, Result};
/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt};
/// # use datafusion_expr::{WindowUDFImpl, WindowUDF};
+/// use datafusion_functions_window_common::field::WindowUDFFieldArgs;
/// #[derive(Debug, Clone)]
/// struct SmoothIt {
/// signature: Signature
@@ -251,6 +254,7 @@ where
/// }
/// // The actual implementation would add one to the argument
/// fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> { unimplemented!() }
/// fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> { unimplemented!() }
/// }
///
/// // Create a new WindowUDF from the implementation
@@ -281,6 +285,10 @@ pub trait WindowUDFImpl: Debug + Send + Sync {

/// What [`DataType`] will be returned by this function, given the types of
/// the arguments
#[deprecated(
since = "41.0.0",
note = "Use `field` instead to define the final result of evaluating this user-defined window function."
)]
jcsherin marked this conversation as resolved.
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;

/// Invoke the function, returning the [`PartitionEvaluator`] instance
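
The deprecation step in this hunk pairs with the `#[allow(deprecated)]` attributes added elsewhere in the diff. A small sketch of how the two attributes interact, using a hypothetical `Legacy` trait with string placeholders instead of real data types:

```rust
/// Hypothetical trait with an old method being phased out in favor of `field`.
trait Legacy {
    #[deprecated(since = "41.0.0", note = "Use `field` instead.")]
    fn return_type(&self) -> &'static str;

    /// Replacement: (type name, nullable) as a stand-in for a real field.
    fn field(&self) -> (&'static str, bool);
}

struct Inner;

impl Legacy for Inner {
    // Implementing a deprecated trait method does not itself warn.
    fn return_type(&self) -> &'static str {
        "Float64"
    }

    fn field(&self) -> (&'static str, bool) {
        ("Float64", true)
    }
}

/// Hypothetical wrapper that must keep forwarding the old method for now.
struct Alias {
    inner: Inner,
}

impl Legacy for Alias {
    // Calling the deprecated method would warn here without this attribute.
    #[allow(deprecated)]
    fn return_type(&self) -> &'static str {
        self.inner.return_type()
    }

    fn field(&self) -> (&'static str, bool) {
        self.inner.field()
    }
}

/// Helper that isolates the one remaining call to the deprecated method.
#[allow(deprecated)]
fn forwarded_return_type(alias: &Alias) -> &'static str {
    alias.return_type()
}
```

The attribute silences the warning only at the marked item, so any other caller of `return_type` still sees the deprecation note.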
@@ -344,14 +352,8 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
hasher.finish()
}

-/// Allows customizing nullable of column for this window UDF.
-///
-/// By default, the final result of evaluating the window UDF is
-/// allowed to have null values. But if that is not the case then
-/// it can be customized in the window UDF implementation.
-fn nullable(&self) -> bool {
-true
-}
+/// The [`Field`] of the final result of evaluating this window function.
Review comment from a Contributor:

Could be useful to document here how the "name" for the returned field is supposed to be set :)

Reply from the Contributor Author:

Agreed. It's a great suggestion. I'll implement it in a follow-up PR.

Thanks @Blizzara

+fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field>;

/// Allows the window UDF to define a custom result ordering.
///
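
With `nullable` removed from the trait in this hunk, nullability is expressed through the returned field. The sketch below shows one way an implementation might declare a non-nullable result. All types are simplified stand-ins (assumptions), `CountRows` is a hypothetical function, and the trait takes the name directly rather than a `WindowUDFFieldArgs`.

```rust
// Stand-ins for illustration only, not the arrow/DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    UInt64,
    Float64,
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

/// Hypothetical, reduced form of the trait after the change:
/// there is no separate `nullable` or `return_type` method.
trait WindowFn {
    fn field(&self, name: &str) -> Result<Field, String>;
}

/// Hypothetical function that produces a value for every row.
struct CountRows;

impl WindowFn for CountRows {
    fn field(&self, name: &str) -> Result<Field, String> {
        Ok(Field {
            name: name.to_string(),
            data_type: DataType::UInt64,
            // Previously this would have been an override of `nullable()`.
            nullable: false,
        })
    }
}

/// Hypothetical function that keeps the old default of a nullable result.
struct Smooth;

impl WindowFn for Smooth {
    fn field(&self, name: &str) -> Result<Field, String> {
        Ok(Field {
            name: name.to_string(),
            data_type: DataType::Float64,
            nullable: true,
        })
    }
}
```

In both cases the field name is taken from the argument passed in by the caller, which is the behavior the review comment above asks to have documented.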
@@ -419,6 +421,7 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
self.inner.signature()
}

#[allow(deprecated)]
jcsherin marked this conversation as resolved.
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
self.inner.return_type(arg_types)
}
@@ -450,8 +453,8 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
hasher.finish()
}

-fn nullable(&self) -> bool {
-self.inner.nullable()
+fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+self.inner.field(field_args)
}

fn sort_options(&self) -> Option<SortOptions> {
@@ -510,4 +513,14 @@ impl WindowUDFImpl for WindowUDFLegacyWrapper {
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
(self.partition_evaluator_factory)()
}

fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
let return_type = (self.return_type)(field_args.input_types())?;

Ok(Field::new(
field_args.name(),
return_type.as_ref().clone(),
true,
))
}
}
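
The legacy wrapper in this hunk adapts an older closure-based return-type function to the new `field` method. A rough standalone sketch of that adaptation, assuming simplified stand-in types, a `String` error type, and a hypothetical `same_as_first_arg` closure:

```rust
use std::sync::Arc;

// Stand-ins for illustration only, not the arrow/DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Float64,
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

/// Hypothetical alias for a closure that maps input types to a return type.
type ReturnTypeFn =
    Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>, String> + Send + Sync>;

/// Hypothetical, reduced form of the legacy wrapper.
struct LegacyWrapper {
    return_type: ReturnTypeFn,
}

impl LegacyWrapper {
    /// Build the result field from the closure, as the hunk above does:
    /// evaluate the return type, then wrap it in a nullable field.
    fn field(&self, input_types: &[DataType], name: &str) -> Result<Field, String> {
        let return_type = (self.return_type)(input_types)?;
        Ok(Field {
            name: name.to_string(),
            data_type: return_type.as_ref().clone(),
            nullable: true,
        })
    }
}

/// Hypothetical closure: the result has the type of the first argument.
fn same_as_first_arg() -> ReturnTypeFn {
    Arc::new(|input_types: &[DataType]| {
        input_types
            .first()
            .cloned()
            .map(Arc::new)
            .ok_or_else(|| "expected at least one argument".to_string())
    })
}
```

The field is always marked nullable here because the closure carries no nullability information, which matches the `true` passed to `Field::new` in the hunk.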