diff --git a/datafusion/expr-common/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs index 81e44d01c1208..5adc02b8af13f 100644 --- a/datafusion/expr-common/src/groups_accumulator.rs +++ b/datafusion/expr-common/src/groups_accumulator.rs @@ -18,7 +18,10 @@ //! Vectorized [`GroupsAccumulator`] use std::{ - cmp::min, collections::VecDeque, iter, mem, ops::{Index, IndexMut} + cmp::min, + collections::VecDeque, + iter, mem, + ops::{Index, IndexMut}, }; use arrow::array::{ArrayRef, BooleanArray}; @@ -210,6 +213,14 @@ impl Blocks { } } + pub fn iter_mut(&mut self) -> Box + '_> { + match self { + Blocks::Single(None) => Box::new(iter::empty()), + Blocks::Single(Some(single)) => Box::new(iter::once(single)), + Blocks::Multiple(multiple) => Box::new(multiple.iter_mut()), + } + } + pub fn clear(&mut self) { *self = Self::new(); } @@ -236,7 +247,9 @@ impl Index for Blocks { single } Blocks::Multiple(multiple) => &multiple[index], - Blocks::Single(None) => unreachable!("can't use index to access empty blocks"), + Blocks::Single(None) => { + unreachable!("can't use index to access empty blocks") + } } } } @@ -249,7 +262,9 @@ impl IndexMut for Blocks { single } Blocks::Multiple(multiple) => &mut multiple[index], - Blocks::Single(None) => unreachable!("can't use index to access empty blocks"), + Blocks::Single(None) => { + unreachable!("can't use index to access empty blocks") + } } } } diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index 77f4e3e2ae028..6409da41ee64e 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -34,7 +34,9 @@ use datafusion_common::{ arrow_datafusion_err, utils::get_arrayref_at_indices, DataFusionError, Result, ScalarValue, }; -use datafusion_expr_common::groups_accumulator::{Blocks, EmitTo, GroupsAccumulator, VecBlocks}; +use datafusion_expr_common::groups_accumulator::{ + Blocks, EmitTo, GroupsAccumulator, VecBlocks, +}; use datafusion_expr_common::{ accumulator::Accumulator, groups_accumulator::GroupStatesMode, }; @@ -436,6 +438,10 @@ pub fn ensure_enough_room_for_values( match mode { // It flat mode, we just a single builder, and grow it constantly. GroupStatesMode::Flat => { + if values.num_blocks() == 0 { + values.push_block(Vec::new()); + } + values .current_mut() .unwrap() @@ -444,12 +450,14 @@ pub fn ensure_enough_room_for_values( // It blocked mode, we ensure the blks are enough first, // and then ensure slots in blks are enough. GroupStatesMode::Blocked(blk_size) => { - let (mut cur_blk_idx, exist_slots) = { + let (mut cur_blk_idx, exist_slots) = if values.num_blocks() == 0 { let cur_blk_idx = values.num_blocks() - 1; let exist_slots = (values.num_blocks() - 1) * blk_size + values.current().unwrap().len(); (cur_blk_idx, exist_slots) + } else { + (0, 0) }; // No new groups, don't need to expand, just return. @@ -457,8 +465,8 @@ pub fn ensure_enough_room_for_values( return; } - let exist_blks = values.num_blocks(); // Ensure blks are enough. + let exist_blks = values.num_blocks(); let new_blks = (total_num_groups + blk_size - 1) / blk_size - exist_blks; if new_blks > 0 { for _ in 0..new_blks { diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 42500cdab08ed..387e65849af71 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -19,14 +19,12 @@ //! //! [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator -use std::collections::VecDeque; - use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray}; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::datatypes::ArrowPrimitiveType; use datafusion_expr_common::groups_accumulator::{ - BlockedGroupIndex, EmitTo, GroupStatesMode, + BlockedGroupIndex, Blocks, EmitTo, GroupStatesMode, }; /// Track the accumulator null state per row: if any values for that /// group were null and if any values have been seen at all for that group. @@ -368,7 +366,7 @@ pub struct BlockedNullState { /// /// If `seen_values[i]` is false, have not seen any values that /// pass the filter yet for group `i` - seen_values_blocks: VecDeque, + seen_values_blocks: Blocks, mode: GroupStatesMode, } @@ -382,7 +380,7 @@ impl Default for BlockedNullState { impl BlockedNullState { pub fn new(mode: GroupStatesMode) -> Self { Self { - seen_values_blocks: VecDeque::new(), + seen_values_blocks: Blocks::new(), mode, } } @@ -428,7 +426,7 @@ impl BlockedNullState { for (&group_index, &new_value) in iter { match self.mode { GroupStatesMode::Flat => seen_values_blocks - .back_mut() + .current_mut() .unwrap() .set_bit(group_index, true), GroupStatesMode::Blocked(_) => { @@ -468,7 +466,7 @@ impl BlockedNullState { if is_valid { match self.mode { GroupStatesMode::Flat => seen_values_blocks - .back_mut() + .current_mut() .unwrap() .set_bit(group_index, true), GroupStatesMode::Blocked(_) => { @@ -499,7 +497,7 @@ impl BlockedNullState { if is_valid { match self.mode { GroupStatesMode::Flat => seen_values_blocks - .back_mut() + .current_mut() .unwrap() .set_bit(group_index, true), GroupStatesMode::Blocked(_) => { @@ -527,7 +525,7 @@ impl BlockedNullState { if let Some(true) = filter_value { match self.mode { GroupStatesMode::Flat => seen_values_blocks - .back_mut() + .current_mut() .unwrap() .set_bit(group_index, true), GroupStatesMode::Blocked(_) => { @@ -556,7 +554,7 @@ impl BlockedNullState { if let Some(new_value) = new_value { match self.mode { GroupStatesMode::Flat => seen_values_blocks - .back_mut() + .current_mut() .unwrap() .set_bit(group_index, true), GroupStatesMode::Blocked(_) => { @@ -576,18 +574,18 @@ impl BlockedNullState { /// Similar as [NullState::build] but support the blocked version accumulator pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer { - if self.seen_values_blocks.is_empty() { + if self.seen_values_blocks.num_blocks() == 0 { return NullBuffer::new(BooleanBufferBuilder::new(0).finish()); } let nulls = match emit_to { EmitTo::All => match self.mode { GroupStatesMode::Flat => { - self.seen_values_blocks.back_mut().unwrap().finish() + self.seen_values_blocks.current_mut().unwrap().finish() } GroupStatesMode::Blocked(blk_size) => { - let total_num = (self.seen_values_blocks.len() - 1) * blk_size - + self.seen_values_blocks.back().unwrap().len(); + let total_num = (self.seen_values_blocks.num_blocks() - 1) * blk_size + + self.seen_values_blocks.current().unwrap().len(); let mut total_buffer = BooleanBufferBuilder::new(total_num); for blk in self.seen_values_blocks.iter_mut() { @@ -603,7 +601,7 @@ impl BlockedNullState { EmitTo::First(n) => { assert!(matches!(self.mode, GroupStatesMode::Flat)); - let blk = self.seen_values_blocks.back_mut().unwrap(); + let blk = self.seen_values_blocks.current_mut().unwrap(); // split off the first N values in seen_values // // TODO make this more efficient rather than two @@ -617,7 +615,7 @@ impl BlockedNullState { first_n_null } EmitTo::CurrentBlock(_) => { - let mut cur_blk = self.seen_values_blocks.pop_front().unwrap(); + let mut cur_blk = self.seen_values_blocks.pop_first_block().unwrap(); cur_blk.finish() } }; @@ -735,7 +733,7 @@ fn initialize_builder( /// Similar as the [initialize_builder] but supported the blocked version accumulator fn ensure_enough_room_for_nulls( - builder_blocks: &mut VecDeque, + builder_blocks: &mut Blocks, mode: GroupStatesMode, total_num_groups: usize, default_value: bool, @@ -747,11 +745,11 @@ fn ensure_enough_room_for_nulls( match mode { // It flat mode, we just a single builder, and grow it constantly. GroupStatesMode::Flat => { - if builder_blocks.is_empty() { - builder_blocks.push_back(BooleanBufferBuilder::new(0)); + if builder_blocks.num_blocks() == 0 { + builder_blocks.push_block(BooleanBufferBuilder::new(0)); } - let builder = builder_blocks.back_mut().unwrap(); + let builder = builder_blocks.current_mut().unwrap(); if builder.len() < total_num_groups { let new_groups = total_num_groups - builder.len(); builder.append_n(new_groups, default_value); @@ -760,37 +758,45 @@ fn ensure_enough_room_for_nulls( // It blocked mode, we ensure the blks are enough first, // and then ensure slots in blks are enough. GroupStatesMode::Blocked(blk_size) => { - let (mut cur_blk_idx, exist_slots) = if !builder_blocks.is_empty() { - let cur_blk_idx = builder_blocks.len() - 1; - let exist_slots = (builder_blocks.len() - 1) * blk_size - + builder_blocks.back().unwrap().len(); + let (mut cur_blk_idx, exist_slots) = if builder_blocks.num_blocks() > 0 { + let cur_blk_idx = builder_blocks.num_blocks() - 1; + let exist_slots = (builder_blocks.num_blocks() - 1) * blk_size + + builder_blocks.current().unwrap().len(); (cur_blk_idx, exist_slots) } else { (0, 0) }; - let exist_blks = builder_blocks.len(); - // Ensure blks are enough. + // No new groups, don't need to expand, just return. + if exist_slots >= total_num_groups { + return; + } + + // Ensure blks are enough + let exist_blks = builder_blocks.num_blocks(); let new_blks = (total_num_groups + blk_size - 1) / blk_size - exist_blks; - builder_blocks.reserve(new_blks); - for _ in 0..new_blks { - builder_blocks.push_back(BooleanBufferBuilder::new(blk_size)); + if new_blks > 0 { + for _ in 0..new_blks { + builder_blocks.push_block(BooleanBufferBuilder::new(blk_size)); + } } // Ensure slots are enough. let mut new_slots = total_num_groups - exist_slots; + // Expand current blk. let cur_blk_rest_slots = blk_size - builder_blocks[cur_blk_idx].len(); if cur_blk_rest_slots >= new_slots { builder_blocks[cur_blk_idx].append_n(new_slots, default_value); return; - } else { - builder_blocks[cur_blk_idx].append_n(cur_blk_rest_slots, default_value); - new_slots -= cur_blk_rest_slots; - cur_blk_idx += 1; } + // Expand current blk to full, and expand next blks + builder_blocks[cur_blk_idx].append_n(cur_blk_rest_slots, default_value); + new_slots -= cur_blk_rest_slots; + cur_blk_idx += 1; + // Expand blks let expand_blks = new_slots / blk_size; for _ in 0..expand_blks { @@ -800,10 +806,12 @@ fn ensure_enough_room_for_nulls( // Expand the last blk. let last_expand_slots = new_slots % blk_size; - builder_blocks - .back_mut() - .unwrap() - .append_n(last_expand_slots, default_value); + if last_expand_slots > 0 { + builder_blocks + .current_mut() + .unwrap() + .append_n(last_expand_slots, default_value); + } } } } diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index afc836f56448e..c32dfbcdbbcda 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -29,7 +29,9 @@ use arrow::datatypes::{ }; use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; -use datafusion_expr::groups_accumulator::{BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks}; +use datafusion_expr::groups_accumulator::{ + BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks, +}; use datafusion_expr::type_coercion::aggregates::{avg_return_type, coerce_avg_type}; use datafusion_expr::utils::format_state_name; use datafusion_expr::Volatility::Immutable; diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index ab2c3790a2660..58145aab5aa20 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -16,7 +16,9 @@ // under the License. use ahash::RandomState; -use datafusion_expr::groups_accumulator::{BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks}; +use datafusion_expr::groups_accumulator::{ + BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks, +}; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::ensure_enough_room_for_values; use std::collections::HashSet;