Skip to content

Commit

Permalink
use Blocks in BlockedNullState.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Aug 18, 2024
1 parent b2d348a commit db55586
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 45 deletions.
21 changes: 18 additions & 3 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
//! Vectorized [`GroupsAccumulator`]

use std::{
cmp::min, collections::VecDeque, iter, mem, ops::{Index, IndexMut}
cmp::min,
collections::VecDeque,
iter, mem,
ops::{Index, IndexMut},
};

use arrow::array::{ArrayRef, BooleanArray};
Expand Down Expand Up @@ -210,6 +213,14 @@ impl<T> Blocks<T> {
}
}

/// Returns a boxed iterator yielding a mutable reference to every stored block.
///
/// An empty `Single` yields nothing, a populated `Single` yields exactly one
/// item, and `Multiple` yields whatever its inner collection's `iter_mut`
/// produces.
pub fn iter_mut(&mut self) -> Box<dyn Iterator<Item = &'_ mut T> + '_> {
    match self {
        // `Option::iter_mut` produces zero items for `None` and one for `Some`.
        Blocks::Single(slot) => Box::new(slot.iter_mut()),
        Blocks::Multiple(blocks) => Box::new(blocks.iter_mut()),
    }
}

/// Resets `self` to a freshly constructed value from `Self::new()`,
/// dropping whatever it previously held.
pub fn clear(&mut self) {
    let fresh = Self::new();
    *self = fresh;
}
Expand All @@ -236,7 +247,9 @@ impl<T> Index<usize> for Blocks<T> {
single
}
Blocks::Multiple(multiple) => &multiple[index],
Blocks::Single(None) => unreachable!("can't use index to access empty blocks"),
Blocks::Single(None) => {
unreachable!("can't use index to access empty blocks")
}
}
}
}
Expand All @@ -249,7 +262,9 @@ impl<T> IndexMut<usize> for Blocks<T> {
single
}
Blocks::Multiple(multiple) => &mut multiple[index],
Blocks::Single(None) => unreachable!("can't use index to access empty blocks"),
Blocks::Single(None) => {
unreachable!("can't use index to access empty blocks")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use datafusion_common::{
arrow_datafusion_err, utils::get_arrayref_at_indices, DataFusionError, Result,
ScalarValue,
};
use datafusion_expr_common::groups_accumulator::{Blocks, EmitTo, GroupsAccumulator, VecBlocks};
use datafusion_expr_common::groups_accumulator::{
Blocks, EmitTo, GroupsAccumulator, VecBlocks,
};
use datafusion_expr_common::{
accumulator::Accumulator, groups_accumulator::GroupStatesMode,
};
Expand Down Expand Up @@ -436,6 +438,10 @@ pub fn ensure_enough_room_for_values<T: Clone>(
match mode {
// In flat mode, we just use a single builder, and grow it constantly.
GroupStatesMode::Flat => {
if values.num_blocks() == 0 {
values.push_block(Vec::new());
}

values
.current_mut()
.unwrap()
Expand All @@ -444,21 +450,23 @@ pub fn ensure_enough_room_for_values<T: Clone>(
// In blocked mode, we first ensure there are enough blks,
// and then ensure there are enough slots in the blks.
GroupStatesMode::Blocked(blk_size) => {
let (mut cur_blk_idx, exist_slots) = {
// Derive the current block index and the number of already-existing slots
// from the last block only when at least one block exists. With no blocks,
// `num_blocks() - 1` would underflow and `current()` has nothing to return,
// so start from `(0, 0)` instead.
let (mut cur_blk_idx, exist_slots) = if values.num_blocks() > 0 {
    let cur_blk_idx = values.num_blocks() - 1;
    // Every block before the last is counted as holding `blk_size` slots;
    // the last one contributes its actual length.
    let exist_slots = (values.num_blocks() - 1) * blk_size
        + values.current().unwrap().len();

    (cur_blk_idx, exist_slots)
} else {
    (0, 0)
};

// No new groups, don't need to expand, just return.
if exist_slots >= total_num_groups {
return;
}

let exist_blks = values.num_blocks();
// Ensure blks are enough.
let exist_blks = values.num_blocks();
let new_blks = (total_num_groups + blk_size - 1) / blk_size - exist_blks;
if new_blks > 0 {
for _ in 0..new_blks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
//!
//! [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator

use std::collections::VecDeque;

use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::datatypes::ArrowPrimitiveType;

use datafusion_expr_common::groups_accumulator::{
BlockedGroupIndex, EmitTo, GroupStatesMode,
BlockedGroupIndex, Blocks, EmitTo, GroupStatesMode,
};
/// Track the accumulator null state per row: if any values for that
/// group were null and if any values have been seen at all for that group.
Expand Down Expand Up @@ -368,7 +366,7 @@ pub struct BlockedNullState {
///
/// If `seen_values[i]` is false, have not seen any values that
/// pass the filter yet for group `i`
seen_values_blocks: VecDeque<BooleanBufferBuilder>,
seen_values_blocks: Blocks<BooleanBufferBuilder>,

mode: GroupStatesMode,
}
Expand All @@ -382,7 +380,7 @@ impl Default for BlockedNullState {
impl BlockedNullState {
pub fn new(mode: GroupStatesMode) -> Self {
Self {
seen_values_blocks: VecDeque::new(),
seen_values_blocks: Blocks::new(),
mode,
}
}
Expand Down Expand Up @@ -428,7 +426,7 @@ impl BlockedNullState {
for (&group_index, &new_value) in iter {
match self.mode {
GroupStatesMode::Flat => seen_values_blocks
.back_mut()
.current_mut()
.unwrap()
.set_bit(group_index, true),
GroupStatesMode::Blocked(_) => {
Expand Down Expand Up @@ -468,7 +466,7 @@ impl BlockedNullState {
if is_valid {
match self.mode {
GroupStatesMode::Flat => seen_values_blocks
.back_mut()
.current_mut()
.unwrap()
.set_bit(group_index, true),
GroupStatesMode::Blocked(_) => {
Expand Down Expand Up @@ -499,7 +497,7 @@ impl BlockedNullState {
if is_valid {
match self.mode {
GroupStatesMode::Flat => seen_values_blocks
.back_mut()
.current_mut()
.unwrap()
.set_bit(group_index, true),
GroupStatesMode::Blocked(_) => {
Expand Down Expand Up @@ -527,7 +525,7 @@ impl BlockedNullState {
if let Some(true) = filter_value {
match self.mode {
GroupStatesMode::Flat => seen_values_blocks
.back_mut()
.current_mut()
.unwrap()
.set_bit(group_index, true),
GroupStatesMode::Blocked(_) => {
Expand Down Expand Up @@ -556,7 +554,7 @@ impl BlockedNullState {
if let Some(new_value) = new_value {
match self.mode {
GroupStatesMode::Flat => seen_values_blocks
.back_mut()
.current_mut()
.unwrap()
.set_bit(group_index, true),
GroupStatesMode::Blocked(_) => {
Expand All @@ -576,18 +574,18 @@ impl BlockedNullState {

/// Similar to [NullState::build], but supports the blocked version of the accumulator
pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
if self.seen_values_blocks.is_empty() {
if self.seen_values_blocks.num_blocks() == 0 {
return NullBuffer::new(BooleanBufferBuilder::new(0).finish());
}

let nulls = match emit_to {
EmitTo::All => match self.mode {
GroupStatesMode::Flat => {
self.seen_values_blocks.back_mut().unwrap().finish()
self.seen_values_blocks.current_mut().unwrap().finish()
}
GroupStatesMode::Blocked(blk_size) => {
let total_num = (self.seen_values_blocks.len() - 1) * blk_size
+ self.seen_values_blocks.back().unwrap().len();
let total_num = (self.seen_values_blocks.num_blocks() - 1) * blk_size
+ self.seen_values_blocks.current().unwrap().len();
let mut total_buffer = BooleanBufferBuilder::new(total_num);

for blk in self.seen_values_blocks.iter_mut() {
Expand All @@ -603,7 +601,7 @@ impl BlockedNullState {
EmitTo::First(n) => {
assert!(matches!(self.mode, GroupStatesMode::Flat));

let blk = self.seen_values_blocks.back_mut().unwrap();
let blk = self.seen_values_blocks.current_mut().unwrap();
// split off the first N values in seen_values
//
// TODO make this more efficient rather than two
Expand All @@ -617,7 +615,7 @@ impl BlockedNullState {
first_n_null
}
EmitTo::CurrentBlock(_) => {
let mut cur_blk = self.seen_values_blocks.pop_front().unwrap();
let mut cur_blk = self.seen_values_blocks.pop_first_block().unwrap();
cur_blk.finish()
}
};
Expand Down Expand Up @@ -735,7 +733,7 @@ fn initialize_builder(

/// Similar to [initialize_builder], but supports the blocked version of the accumulator
fn ensure_enough_room_for_nulls(
builder_blocks: &mut VecDeque<BooleanBufferBuilder>,
builder_blocks: &mut Blocks<BooleanBufferBuilder>,
mode: GroupStatesMode,
total_num_groups: usize,
default_value: bool,
Expand All @@ -747,11 +745,11 @@ fn ensure_enough_room_for_nulls(
match mode {
// In flat mode, we just use a single builder, and grow it constantly.
GroupStatesMode::Flat => {
if builder_blocks.is_empty() {
builder_blocks.push_back(BooleanBufferBuilder::new(0));
if builder_blocks.num_blocks() == 0 {
builder_blocks.push_block(BooleanBufferBuilder::new(0));
}

let builder = builder_blocks.back_mut().unwrap();
let builder = builder_blocks.current_mut().unwrap();
if builder.len() < total_num_groups {
let new_groups = total_num_groups - builder.len();
builder.append_n(new_groups, default_value);
Expand All @@ -760,37 +758,45 @@ fn ensure_enough_room_for_nulls(
// In blocked mode, we first ensure there are enough blks,
// and then ensure there are enough slots in the blks.
GroupStatesMode::Blocked(blk_size) => {
let (mut cur_blk_idx, exist_slots) = if !builder_blocks.is_empty() {
let cur_blk_idx = builder_blocks.len() - 1;
let exist_slots = (builder_blocks.len() - 1) * blk_size
+ builder_blocks.back().unwrap().len();
let (mut cur_blk_idx, exist_slots) = if builder_blocks.num_blocks() > 0 {
let cur_blk_idx = builder_blocks.num_blocks() - 1;
let exist_slots = (builder_blocks.num_blocks() - 1) * blk_size
+ builder_blocks.current().unwrap().len();

(cur_blk_idx, exist_slots)
} else {
(0, 0)
};
let exist_blks = builder_blocks.len();

// Ensure blks are enough.
// No new groups, don't need to expand, just return.
if exist_slots >= total_num_groups {
return;
}

// Ensure blks are enough
let exist_blks = builder_blocks.num_blocks();
let new_blks = (total_num_groups + blk_size - 1) / blk_size - exist_blks;
builder_blocks.reserve(new_blks);
for _ in 0..new_blks {
builder_blocks.push_back(BooleanBufferBuilder::new(blk_size));
if new_blks > 0 {
for _ in 0..new_blks {
builder_blocks.push_block(BooleanBufferBuilder::new(blk_size));
}
}

// Ensure slots are enough.
let mut new_slots = total_num_groups - exist_slots;

// Expand current blk.
let cur_blk_rest_slots = blk_size - builder_blocks[cur_blk_idx].len();
if cur_blk_rest_slots >= new_slots {
builder_blocks[cur_blk_idx].append_n(new_slots, default_value);
return;
} else {
builder_blocks[cur_blk_idx].append_n(cur_blk_rest_slots, default_value);
new_slots -= cur_blk_rest_slots;
cur_blk_idx += 1;
}

// Expand current blk to full, and expand next blks
builder_blocks[cur_blk_idx].append_n(cur_blk_rest_slots, default_value);
new_slots -= cur_blk_rest_slots;
cur_blk_idx += 1;

// Expand blks
let expand_blks = new_slots / blk_size;
for _ in 0..expand_blks {
Expand All @@ -800,10 +806,12 @@ fn ensure_enough_room_for_nulls(

// Expand the last blk.
let last_expand_slots = new_slots % blk_size;
builder_blocks
.back_mut()
.unwrap()
.append_n(last_expand_slots, default_value);
if last_expand_slots > 0 {
builder_blocks
.current_mut()
.unwrap()
.append_n(last_expand_slots, default_value);
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion/functions-aggregate/src/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use arrow::datatypes::{
};
use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::groups_accumulator::{BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks};
use datafusion_expr::groups_accumulator::{
BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks,
};
use datafusion_expr::type_coercion::aggregates::{avg_return_type, coerce_avg_type};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::Volatility::Immutable;
Expand Down
4 changes: 3 additions & 1 deletion datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
// under the License.

use ahash::RandomState;
use datafusion_expr::groups_accumulator::{BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks};
use datafusion_expr::groups_accumulator::{
BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks,
};
use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator;
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::ensure_enough_room_for_values;
use std::collections::HashSet;
Expand Down

0 comments on commit db55586

Please sign in to comment.