Replace supports_bounded_execution with supports_retract_batch #6695

Merged: 7 commits, Jun 16, 2023
7 changes: 3 additions & 4 deletions datafusion/physical-expr/src/aggregate/average.rs
@@ -134,10 +134,6 @@ impl AggregateExpr for Avg {
is_row_accumulator_support_dtype(&self.sum_data_type)
}

-fn supports_bounded_execution(&self) -> bool {
-true
-}
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -263,6 +259,9 @@ impl Accumulator for AvgAccumulator {
)),
}
}
+fn supports_retract_batch(&self) -> bool {
+true
+}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/aggregate/count.rs
@@ -133,10 +133,6 @@ impl AggregateExpr for Count {
true
}

-fn supports_bounded_execution(&self) -> bool {
-true
-}
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -214,6 +210,10 @@ impl Accumulator for CountAccumulator {
Ok(ScalarValue::Int64(Some(self.count)))
}

+fn supports_retract_batch(&self) -> bool {
+true
+}
+
fn size(&self) -> usize {
std::mem::size_of_val(self)
}
16 changes: 8 additions & 8 deletions datafusion/physical-expr/src/aggregate/min_max.rs
@@ -125,10 +125,6 @@ impl AggregateExpr for Max {
is_row_accumulator_support_dtype(&self.data_type)
}

-fn supports_bounded_execution(&self) -> bool {
-true
-}
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -699,6 +695,10 @@ impl Accumulator for SlidingMaxAccumulator {
Ok(self.max.clone())
}

+fn supports_retract_batch(&self) -> bool {
+true
+}
+
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size()
}
@@ -825,10 +825,6 @@ impl AggregateExpr for Min {
is_row_accumulator_support_dtype(&self.data_type)
}

-fn supports_bounded_execution(&self) -> bool {
-true
-}
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -958,6 +954,10 @@ impl Accumulator for SlidingMinAccumulator {
Ok(self.min.clone())
}

+fn supports_retract_batch(&self) -> bool {
+true
+}
+
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size()
}
6 changes: 0 additions & 6 deletions datafusion/physical-expr/src/aggregate/mod.rs
@@ -96,12 +96,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
false
}
-
-/// Specifies whether this aggregate function can run using bounded memory.

Contributor

It is nice to have moved this logic entirely to the accumulator 👍

-/// Any accumulator returning "true" needs to implement `retract_batch`.
-fn supports_bounded_execution(&self) -> bool {
-false
-}
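
To illustrate what moving this capability check onto the accumulator can look like, here is a minimal, self-contained sketch. The `Accumulator` trait below is a simplified stand-in: it works on `&[i64]` slices and `String` errors for brevity, which is an assumption and not DataFusion's actual signature. The `false` default for `supports_retract_batch` and the erroring default for `retract_batch` are also assumptions, and `PlainMaxAccumulator` is a hypothetical type.

```rust
/// Simplified stand-in for an accumulator trait (assumed shape, not the real API).
trait Accumulator {
    /// Add a batch of values to the running state.
    fn update_batch(&mut self, values: &[i64]);

    /// Remove values that have left the window frame.
    /// Assumed default: report that retraction is unsupported.
    fn retract_batch(&mut self, _values: &[i64]) -> Result<(), String> {
        Err("retract_batch is not implemented for this accumulator".to_string())
    }

    /// Whether `retract_batch` is implemented. Assumed default: `false`.
    fn supports_retract_batch(&self) -> bool {
        false
    }

    /// Current aggregate value, if any input has been seen.
    fn evaluate(&self) -> Option<i64>;
}

/// A sum can undo an update by subtracting, so it opts in to retraction.
struct SumAccumulator {
    sum: i64,
}

impl Accumulator for SumAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }

    fn retract_batch(&mut self, values: &[i64]) -> Result<(), String> {
        self.sum -= values.iter().sum::<i64>();
        Ok(())
    }

    fn supports_retract_batch(&self) -> bool {
        true
    }

    fn evaluate(&self) -> Option<i64> {
        Some(self.sum)
    }
}

/// Hypothetical max that keeps only the current maximum, so it cannot retract
/// and relies on the trait defaults.
struct PlainMaxAccumulator {
    max: Option<i64>,
}

impl Accumulator for PlainMaxAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        for &v in values {
            self.max = Some(self.max.map_or(v, |m| m.max(v)));
        }
    }

    fn evaluate(&self) -> Option<i64> {
        self.max
    }
}
```

With this shape, a caller only needs an accumulator instance to ask whether a sliding frame is possible; no separate flag on the aggregate expression has to be kept in sync.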

/// RowAccumulator to access/update row-based aggregation state in-place.
/// Currently, row accumulator only supports states of fixed-sized type.
///
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/aggregate/sum.rs
@@ -131,10 +131,6 @@ impl AggregateExpr for Sum {
is_row_accumulator_support_dtype(&self.data_type)
}

-fn supports_bounded_execution(&self) -> bool {
-true
-}
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -361,6 +357,10 @@ impl Accumulator for SumAccumulator {
}
}

+fn supports_retract_batch(&self) -> bool {
+true
+}
+
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
}
3 changes: 1 addition & 2 deletions datafusion/physical-expr/src/window/aggregate.rs
@@ -155,8 +155,7 @@ impl WindowExpr for PlainAggregateWindowExpr {
}

fn uses_bounded_memory(&self) -> bool {
-self.aggregate.supports_bounded_execution()
-&& !self.window_frame.end_bound.is_unbounded()
+!self.window_frame.end_bound.is_unbounded()

Contributor Author

After thinking through this logic: as long as end_bound is bounded (that is, not UNBOUNDED FOLLOWING, for example N FOLLOWING), we can produce results without waiting for all of the input to arrive. If the accumulator does not support the retract_batch method, we cannot run queries of the form M PRECEDING AND N FOLLOWING, but in that case we return an error anyway. Hence we do not need to check self.aggregate.supports_bounded_execution() here (acc.supports_retract_batch() in the new API).

Contributor

This makes sense to me

}
}
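
The reasoning in the review comment above can be sketched as follows. This is only an illustration under stated assumptions: `FrameBound`, `WindowFrame`, and `plan_frame` are hypothetical, simplified names invented for this example, and the error message is made up. Only the `!end_bound.is_unbounded()` check is taken from the diff.

```rust
/// Hypothetical, simplified frame bound. `None` stands for UNBOUNDED.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FrameBound {
    /// `N PRECEDING`, or `UNBOUNDED PRECEDING` when `None`.
    Preceding(Option<u64>),
    /// `CURRENT ROW`.
    CurrentRow,
    /// `N FOLLOWING`, or `UNBOUNDED FOLLOWING` when `None`.
    Following(Option<u64>),
}

impl FrameBound {
    fn is_unbounded(&self) -> bool {
        matches!(self, FrameBound::Preceding(None) | FrameBound::Following(None))
    }
}

/// Hypothetical, simplified window frame.
struct WindowFrame {
    start_bound: FrameBound,
    end_bound: FrameBound,
}

/// Mirrors the new check: memory is bounded whenever the frame end is bounded,
/// regardless of what the accumulator supports.
fn uses_bounded_memory(frame: &WindowFrame) -> bool {
    !frame.end_bound.is_unbounded()
}

/// Hypothetical planning step: a frame whose start moves (for example
/// `M PRECEDING AND N FOLLOWING`) needs retraction, so an accumulator without
/// it is rejected before the bounded-memory question is even asked.
fn plan_frame(frame: &WindowFrame, supports_retract_batch: bool) -> Result<bool, String> {
    if !frame.start_bound.is_unbounded() && !supports_retract_batch {
        return Err("sliding frame requires an accumulator with retract_batch".to_string());
    }
    Ok(uses_bounded_memory(frame))
}
```

Because the unsupported combination is already turned into an error earlier, the bounded-memory check itself can depend on the frame end alone.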

2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/built_in.rs
@@ -122,7 +122,7 @@ impl WindowExpr for BuiltInWindowExpr {
} else if evaluator.include_rank() {
let columns = self.sort_columns(batch)?;
let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
-evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points)
+evaluator.evaluate_all_with_rank(num_rows, &sort_partition_points)
} else {
let (values, _) = self.get_values_orderbys(batch)?;
evaluator.evaluate_all(&values, num_rows)
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/window/cume_dist.rs
@@ -70,7 +70,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
pub(crate) struct CumeDistEvaluator;

impl PartitionEvaluator for CumeDistEvaluator {
-fn evaluate_with_rank_all(
+fn evaluate_all_with_rank(
&self,
num_rows: usize,
ranks_in_partition: &[Range<usize>],
@@ -109,7 +109,7 @@ mod tests {
) -> Result<()> {
let result = expr
.create_evaluator()?
-.evaluate_with_rank_all(num_rows, &ranks)?;
+.evaluate_all_with_rank(num_rows, &ranks)?;
let result = as_float64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -69,7 +69,7 @@ use std::ops::Range;
///
/// # Stateless `PartitionEvaluator`
///
-/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_with_rank_all`] is called with values for the
+/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_all_with_rank`] is called with values for the
/// entire partition.
///
/// # Stateful `PartitionEvaluator`
@@ -221,7 +221,7 @@ pub trait PartitionEvaluator: Debug + Send {
))
}

-/// [`PartitionEvaluator::evaluate_with_rank_all`] is called for window
+/// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
/// functions that only need the rank of a row within its window
/// frame.
///
@@ -248,7 +248,7 @@ pub trait PartitionEvaluator: Debug + Send {
/// (3,4),
/// ]
/// ```
-fn evaluate_with_rank_all(
+fn evaluate_all_with_rank(

Contributor Author

I think evaluate_all_with_rank is a better name, so as part of this PR I renamed the method from evaluate_with_rank_all to evaluate_all_with_rank.

&self,
_num_rows: usize,
_ranks_in_partition: &[Range<usize>],
@@ -278,7 +278,7 @@ pub trait PartitionEvaluator: Debug + Send {

/// Can this function be evaluated with (only) rank
///
-/// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_with_rank_all`]
+/// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_all_with_rank`]
fn include_rank(&self) -> bool {
false
}
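
For readers unfamiliar with the `ranks_in_partition` argument, the sketch below shows one way a rank-only evaluator could consume it. It is an assumption-laden illustration, not the code in `rank.rs`: the free functions `rank` and `dense_rank` are hypothetical, use only the standard library, and return a plain `Vec<u64>` instead of an Arrow array. Each `Range<usize>` is taken to cover one group of rows that tie on the ORDER BY columns.

```rust
use std::ops::Range;

/// RANK()-style numbering: every row in a tie group gets the 1-based position
/// of the group's first row, so gaps appear after groups larger than one row.
fn rank(ranks_in_partition: &[Range<usize>]) -> Vec<u64> {
    let mut out = Vec::new();
    let mut next_rank = 1_u64;
    for range in ranks_in_partition {
        let len = range.end - range.start;
        out.extend(std::iter::repeat(next_rank).take(len));
        next_rank += len as u64;
    }
    out
}

/// DENSE_RANK()-style numbering: tie groups are numbered consecutively
/// with no gaps.
fn dense_rank(ranks_in_partition: &[Range<usize>]) -> Vec<u64> {
    let mut out = Vec::new();
    for (i, range) in ranks_in_partition.iter().enumerate() {
        let len = range.end - range.start;
        out.extend(std::iter::repeat(i as u64 + 1).take(len));
    }
    out
}
```

Neither function looks at column values; the tie groups alone are enough, which is why such evaluators can report `include_rank` as true.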
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/window/rank.rs
@@ -159,7 +159,7 @@ impl PartitionEvaluator for RankEvaluator {
}
}

-fn evaluate_with_rank_all(
+fn evaluate_all_with_rank(
&self,
num_rows: usize,
ranks_in_partition: &[Range<usize>],
@@ -236,7 +236,7 @@ mod tests {
) -> Result<()> {
let result = expr
.create_evaluator()?
-.evaluate_with_rank_all(num_rows, &ranks)?;
+.evaluate_all_with_rank(num_rows, &ranks)?;
let result = as_float64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
@@ -248,7 +248,7 @@ mod tests {
ranks: Vec<Range<usize>>,
expected: Vec<u64>,
) -> Result<()> {
-let result = expr.create_evaluator()?.evaluate_with_rank_all(8, &ranks)?;
+let result = expr.create_evaluator()?.evaluate_all_with_rank(8, &ranks)?;
let result = as_uint64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
3 changes: 1 addition & 2 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -139,8 +139,7 @@ impl WindowExpr for SlidingAggregateWindowExpr {
}

fn uses_bounded_memory(&self) -> bool {
-self.aggregate.supports_bounded_execution()
-&& !self.window_frame.end_bound.is_unbounded()
+!self.window_frame.end_bound.is_unbounded()

Contributor Author

Similar to the case above.

}
}
