Introduce Sum UDAF #10651
Signed-off-by: jayzhan211 <[email protected]>
Looks great to me @jayzhan211 -- thank you. I left some small comments, but overall I think it looks really nice.
@@ -142,7 +142,7 @@ async fn test_udaf_as_window_with_frame_without_retract_batch() {
    let sql = "SELECT time_sum(time) OVER(ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as time_sum from t";
    // Note if this query ever does start working
    let err = execute(&ctx, sql).await.unwrap_err();
    assert_contains!(err.to_string(), "This feature is not implemented: Aggregate can not be used as a sliding accumulator because `retract_batch` is not implemented: AggregateUDF { inner: AggregateUDF { name: \"time_sum\", signature: Signature { type_signature: Exact([Timestamp(Nanosecond, None)]), volatility: Immutable }, fun: \"<FUNC>\" } }(t.time) ORDER BY [t.time ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING");
    assert_contains!(err.to_string(), "This feature is not implemented: Aggregate can not be used as a sliding accumulator because `retract_batch` is not implemented: time_sum(t.time) ORDER BY [t.time ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING");
that is certainly nicer
/// Coerce arguments of a function call to types that the function can evaluate.
///
/// This function is only called if [`AggregateUDFImpl::signature`] returns [`crate::TypeSignature::UserDefined`]. Most
👍
@@ -389,6 +410,13 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
        &[]
    }

    fn create_sliding_accumulator(
Could we please add some documentation here explaining what this is?
Basically, I think this is to allow returning a different Accumulator
instance that is optimized for sliding windows (e.g. incrementally computing output via retract_batch: https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.Accumulator.html#method.retract_batch).
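To make the sliding-window idea concrete, here is a minimal, self-contained sketch of an accumulator that adds rows entering the frame and retracts rows leaving it, so each output is computed incrementally. The `SlidingAccumulator` trait, `SlidingSum`, and `sliding_sums` are hypothetical simplifications for illustration; they are not DataFusion's `Accumulator` trait or the code in this PR.

```rust
/// Hypothetical, simplified stand-in for an accumulator that supports
/// retraction. This is NOT DataFusion's `Accumulator` trait.
trait SlidingAccumulator {
    /// Add values that entered the window frame.
    fn update_batch(&mut self, values: &[Option<i64>]);
    /// Remove values that left the window frame.
    fn retract_batch(&mut self, values: &[Option<i64>]);
    /// Current aggregate; `None` when the frame holds no non-null value.
    fn evaluate(&self) -> Option<i64>;
}

#[derive(Default)]
struct SlidingSum {
    sum: i64,
    /// Number of non-null values currently inside the frame.
    count: u64,
}

impl SlidingAccumulator for SlidingSum {
    fn update_batch(&mut self, values: &[Option<i64>]) {
        for v in values.iter().flatten() {
            self.sum += v;
            self.count += 1;
        }
    }

    fn retract_batch(&mut self, values: &[Option<i64>]) {
        for v in values.iter().flatten() {
            self.sum -= v;
            self.count -= 1;
        }
    }

    fn evaluate(&self) -> Option<i64> {
        (self.count > 0).then_some(self.sum)
    }
}

/// Sum over `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`, touching each input
/// value at most twice (once on entry, once on exit).
fn sliding_sums(values: &[Option<i64>]) -> Vec<Option<i64>> {
    let mut acc = SlidingSum::default();
    let (mut start, mut end) = (0usize, 0usize); // current frame is [start, end)
    let mut out = Vec::with_capacity(values.len());
    for i in 0..values.len() {
        let new_start = i.saturating_sub(1);
        let new_end = (i + 2).min(values.len());
        acc.update_batch(&values[end..new_end]);
        acc.retract_batch(&values[start..new_start]);
        start = new_start;
        end = new_end;
        out.push(acc.evaluate());
    }
    out
}
```

An accumulator without a retraction step would have to recompute every frame from scratch, which is presumably why a separate, sliding-optimized instance is useful.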
@@ -459,7 +515,7 @@ pub enum ReversedUDAF {
    /// The expression does not support reverse calculation, like ArrayAgg
    NotSupported,
    /// The expression is different from the original expression
    Reversed(Arc<dyn AggregateUDFImpl>),
    Reversed(Arc<AggregateUDF>),
👍
if fun.to_string() == "first_value" || fun.to_string() == "last_value" {
    assert_eq!(fun.to_string(), name);
} else {
    assert_eq!(fun.to_string(), name.to_uppercase());
I wonder whether we should treat UDF names in a case-insensitive way. What do you think?
#10695 tracks the issue of renaming name to lowercase.
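As a rough illustration of the case-insensitive option raised above, a registry could normalize names to lowercase on both registration and lookup. `FunctionRegistry` and its methods below are hypothetical names invented for this sketch; they do not reflect DataFusion's registry or whatever #10695 ends up doing.

```rust
use std::collections::HashMap;

/// Hypothetical registry mapping a function name to a short description.
/// Names are stored in lowercase so lookups are case-insensitive.
struct FunctionRegistry {
    udafs: HashMap<String, String>,
}

impl FunctionRegistry {
    fn new() -> Self {
        Self { udafs: HashMap::new() }
    }

    /// Register a function under the lowercase form of `name`.
    fn register(&mut self, name: &str, description: &str) {
        self.udafs.insert(name.to_lowercase(), description.to_string());
    }

    /// Look up a function, ignoring the case of `name`.
    fn lookup(&self, name: &str) -> Option<&str> {
        self.udafs.get(&name.to_lowercase()).map(String::as_str)
    }
}
```

With this normalization, a test like the one above would not need a special case for `first_value` and `last_value`, since every name compares equal in its lowercase form.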
datafusion/expr/src/expr_schema.rs
Outdated
let data_types = args
    .iter()
    .map(|e| e.get_type(schema))
    .collect::<Result<Vec<_>>>()?;
Do we need this section? It seems that data_types is already calculated in the outer scope.
WindowFunctionDefinition::AggregateUDF(udf) => {
    let new_types = data_types_with_aggregate_udf(&data_types, udf).map_err(|err| {
        plan_datafusion_err!(
            "{} and {}",
            err,
            utils::generate_signature_error_msg(
                fun.name(),
                fun.signature().clone(),
                &data_types
            )
        )
    })?;
    Ok(fun.return_type(&new_types)?)
}
Maybe we should bury this check and conversion inside the fun.return_type implementation for WindowFunctionDefinition::AggregateUDF. Not sure, though.
I prefer keeping coerce_types and return_types separate, given the difference between the two.
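The two-step flow under discussion can be sketched in isolation: first coerce the argument types, then compute the return type from the coerced types, wrapping any coercion failure in a combined error message. Everything below (`DataType`, the coercion rules, `signature_error_msg`, and the `String` error type) is a hypothetical simplification for illustration, not DataFusion's actual types or Sum's real coercion rules.

```rust
/// Hypothetical, cut-down set of data types for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Int64,
    Float32,
    Float64,
    Utf8,
}

/// Step 1: coerce the caller's argument types to types the function accepts.
/// The widening rules here are illustrative assumptions.
fn coerce_types(arg_types: &[DataType]) -> Result<Vec<DataType>, String> {
    if arg_types.len() != 1 {
        return Err(format!("sum expects exactly one argument, got {}", arg_types.len()));
    }
    match &arg_types[0] {
        DataType::Int32 | DataType::Int64 => Ok(vec![DataType::Int64]),
        DataType::Float32 | DataType::Float64 => Ok(vec![DataType::Float64]),
        other => Err(format!("sum does not support {:?}", other)),
    }
}

/// Step 2: derive the return type from the already-coerced types.
fn return_type(coerced: &[DataType]) -> Result<DataType, String> {
    coerced
        .first()
        .cloned()
        .ok_or_else(|| "missing argument type".to_string())
}

/// Hypothetical helper standing in for a signature error message generator.
fn signature_error_msg(name: &str, arg_types: &[DataType]) -> String {
    format!("no candidate signature for {}({:?})", name, arg_types)
}

/// Caller-side composition: coercion errors are combined with the signature
/// message using the same "{} and {}" shape as the snippet above.
fn window_return_type(arg_types: &[DataType]) -> Result<DataType, String> {
    let new_types = coerce_types(arg_types)
        .map_err(|err| format!("{} and {}", err, signature_error_msg("sum", arg_types)))?;
    return_type(&new_types)
}
```

Keeping the steps separate means `return_type` only ever sees types that already passed coercion, while the caller decides how to report a coercion failure.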
let new_types = data_types_with_aggregate_udf(&data_types, fun).map_err(|err| {
    plan_datafusion_err!(
        "{} and {}",
        err,
        utils::generate_signature_error_msg(
            fun.name(),
            fun.signature().clone(),
            &data_types
        )
    )
})?;
The similar comment above applies here as well.
}

fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
    (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
    if self.fun.has_ordering_requirements() && !self.ordering_req.is_empty() {
has_ordering_requirements was not introduced in this PR, but I think it is redundant now that we have the order_sensitivity API. I think it is better to remove it (in this PR or in a subsequent one).
I agree. We can return None for AggregateOrderSensitivity::Insensitive, and ordering_req otherwise.
Should we change the default order sensitivity to AggregateOrderSensitivity::Insensitive? For example, I think Sum would expect AggregateOrderSensitivity::Insensitive, and probably only first/last, nth value, and agg_order would expect other kinds of AggregateOrderSensitivity.
I agree that, for most aggregate functions, AggregateOrderSensitivity::Insensitive is the correct behavior. However, I think the safest default choice is AggregateOrderSensitivity::HardRequirement. Also, as long as there is no requirement, the default is not important. Hence, I think we can be strict in this choice.
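The behavior proposed in this thread can be sketched as follows: `order_bys` returns `None` for an order-insensitive aggregate and the ordering requirement otherwise. The enum variants mirror the names used in the discussion, but `SortExpr` and `AggregateExpr` are hypothetical simplified types, and this is only one possible reading of the proposal, not the merged implementation.

```rust
/// Variant names follow the discussion; the type itself is redefined here so
/// the sketch stands alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggregateOrderSensitivity {
    /// Result does not depend on input order (e.g. Sum).
    Insensitive,
    /// A valid result requires the requested order (the strict default).
    HardRequirement,
    /// Ordering helps but is not required for correctness.
    Beneficial,
}

/// Hypothetical stand-in for a physical sort expression.
struct SortExpr {
    column: String,
    descending: bool,
}

/// Hypothetical stand-in for an aggregate expression with an ORDER BY clause.
struct AggregateExpr {
    sensitivity: AggregateOrderSensitivity,
    ordering_req: Vec<SortExpr>,
}

impl AggregateExpr {
    /// Ordering to enforce on the input, if any.
    fn order_bys(&self) -> Option<&[SortExpr]> {
        if self.ordering_req.is_empty() {
            return None;
        }
        match self.sensitivity {
            AggregateOrderSensitivity::Insensitive => None,
            AggregateOrderSensitivity::HardRequirement
            | AggregateOrderSensitivity::Beneficial => Some(self.ordering_req.as_slice()),
        }
    }
}
```

Under this reading, the strict default costs nothing when no ORDER BY is given, because an empty requirement yields `None` regardless of sensitivity.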
LGTM! Thanks @jayzhan211 for this PR.
🚀
Are we tracking the follow-on work (to remove the built-in sum) anywhere?
(AggregateFunction::Sum, _) => {
    return internal_err!("Builtin Sum will be removed");
}
Hi @jayzhan211! I saw @alamb already asked the same question here last week. I'd like to follow up and check whether you have any plans to remove this.
@appletreeisyellow You can check out the main branch; it should have been removed in #10831.
@jayzhan211 Thank you so much! 💯
* move accumulate
* move prim_op
* move test to slt
* remove sum distinct
* move sum aggregate
* fix args
* add sum
* merge fix
* fix sum sig
* todo: wait ahash merge
* rebase
* disable ordering req by default
* check arg count
* rm old workflow
* fmt
* fix failed test
* doc and fmt
* check udaf first
* fmt
* fix ci
* fix ci
* fix ci
* fix err msg AGAIN
* rm sum in builtin test which covered in sql
* proto for window with udaf
* fix slt
* fmt
* fix err msg
* fix exprfn
* fix ciy
* fix ci
* rename first/last to lowercase
* skip sum
* fix firstvalue
* clippy
* add doc
* rm has_ordering_req
* default hard req
* insensitive for sum
* cleanup duplicate code
* Re-introduce check
---------
Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: Mustafa Akur <[email protected]>
apache/datafusion#10651 Don't know if the built-in sum used to work on intervals, but the UDAF does not.
Which issue does this PR close?
Closes #.
Rationale for this change
What changes are included in this PR?
AccumulatorArgs to create_groups_accumulator
This PR only introduces the Sum UDAF; removing the built-in Sum is not included, to keep the PR small.
Are these changes tested?
Are there any user-facing changes?