Skip to content

Commit

Permalink
fix empty accumulators mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Aug 12, 2024
1 parent 1960084 commit fdb1789
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,14 +512,18 @@ impl GroupedHashAggregateStream {
};

// We don't support blocked emission in steaming aggregation mode
let emit_tos_builder = if matches!(group_ordering, GroupOrdering::None) {
let group_values_support_blocked_emission =
group_values.supports_blocked_emission();
let accumulators_support_blocked_emission =
// TODO: I am not sure, if we should disable blocked mode if `accumulator`s are empty.
let emit_tos_builder = if matches!(group_ordering, GroupOrdering::None)
&& !accumulators.is_empty()
{
let is_blocked_group_values = group_values.supports_blocked_emission();
let is_blocked_accumulators =
accumulators.iter().all(|a| a.supports_blocked_emission());

// TODO: if the batch size is too small, maybe we should fallback to single block mode.
GroupStatesContext::new(
group_values_support_blocked_emission,
accumulators_support_blocked_emission,
is_blocked_group_values,
is_blocked_accumulators,
batch_size,
)
} else {
Expand Down Expand Up @@ -1117,13 +1121,13 @@ pub struct GroupStatesContext {

impl GroupStatesContext {
pub fn new(
group_values_support_blocked_emission: bool,
accumulators_support_blocked_emission: bool,
is_blocked_group_values: bool,
is_blocked_accumulators: bool,
block_size: usize,
) -> Self {
Self {
is_blocked_group_values: group_values_support_blocked_emission,
is_blocked_accumulators: accumulators_support_blocked_emission,
is_blocked_group_values,
is_blocked_accumulators,
block_size,
}
}
Expand Down

0 comments on commit fdb1789

Please sign in to comment.