Skip to content

Commit

Permalink
make Blocks more general.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Aug 18, 2024
1 parent 4f4f89f commit b2d348a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 59 deletions.
108 changes: 58 additions & 50 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
//! Vectorized [`GroupsAccumulator`]

use std::{
cmp::min,
collections::VecDeque,
mem,
ops::{Index, IndexMut},
cmp::min, collections::VecDeque, iter, mem, ops::{Index, IndexMut}
};

use arrow::array::{ArrayRef, BooleanArray};
Expand Down Expand Up @@ -82,13 +79,9 @@ impl EmitTo {
///
pub fn take_needed_from_blocks<T>(
&self,
blocks: &mut Blocks<T>,
blocks: &mut VecBlocks<T>,
mode: GroupStatesMode,
) -> Vec<T> {
if blocks.is_empty() {
return Vec::new();
}

match self {
Self::All => match mode {
GroupStatesMode::Flat => blocks.pop_first_block().unwrap(),
Expand Down Expand Up @@ -146,37 +139,42 @@ impl BlockedGroupIndex {
}

pub enum Blocks<T> {
Single(Vec<T>),
Multiple(VecDeque<Vec<T>>),
Single(Option<T>),
Multiple(VecDeque<T>),
}

impl<T> Blocks<T> {
pub fn new() -> Self {
Self::Single(Vec::new())
Self::Single(None)
}

pub fn current(&self) -> Option<&Vec<T>> {
pub fn current(&self) -> Option<&T> {
match self {
Blocks::Single(blk) => Some(blk),
Blocks::Single(blk) => blk.as_ref(),
Blocks::Multiple(blks) => blks.back(),
}
}

pub fn current_mut(&mut self) -> Option<&mut Vec<T>> {
pub fn current_mut(&mut self) -> Option<&mut T> {
match self {
Blocks::Single(blk) => Some(blk),
Blocks::Single(blk) => blk.as_mut(),
Blocks::Multiple(blks) => blks.back_mut(),
}
}

pub fn push_block(&mut self, block: Vec<T>) {
pub fn push_block(&mut self, block: T) {
loop {
match self {
// If found it is Single, convert to Multiple first
Blocks::Single(single) => {
Blocks::Single(single_opt) => {
let single_opt = mem::take(single_opt);
if single_opt.is_none() {
*self = Self::Single(Some(block));
break;
}

let mut new_multiple = VecDeque::with_capacity(2);
let first_block = mem::take(single);
new_multiple.push_back(first_block);
new_multiple.push_back(single_opt.unwrap());

*self = Self::Multiple(new_multiple);
}
Expand All @@ -189,42 +187,26 @@ impl<T> Blocks<T> {
}
}

pub fn pop_first_block(&mut self) -> Option<Vec<T>> {
pub fn pop_first_block(&mut self) -> Option<T> {
match self {
Blocks::Single(single) => Some(mem::take(single)),
Blocks::Single(single) => mem::take(single),
Blocks::Multiple(multiple) => multiple.pop_front(),
}
}

fn is_empty(&self) -> bool {
match self {
Blocks::Single(single) => single.is_empty(),
Blocks::Multiple(multiple) => multiple.is_empty(),
}
}

pub fn num_blocks(&self) -> usize {
match self {
Blocks::Single(_) => 1,
Blocks::Single(None) => 0,
Blocks::Single(Some(_)) => 1,
Blocks::Multiple(multiple) => multiple.len(),
}
}

pub fn into_to_vec(self) -> Vec<T> {
pub fn iter(&self) -> Box<dyn Iterator<Item = &'_ T> + '_> {
match self {
Blocks::Single(single) => single,
Blocks::Multiple(multiple) => {
multiple.into_iter().flat_map(|v| v.into_iter()).collect()
}
}
}

pub fn capacity(&self) -> usize {
match self {
Blocks::Single(single) => single.capacity(),
Blocks::Multiple(multiple) => {
multiple.iter().map(|blk| blk.capacity()).sum::<usize>()
}
Blocks::Single(None) => Box::new(iter::empty()),
Blocks::Single(Some(single)) => Box::new(iter::once(single)),
Blocks::Multiple(multiple) => Box::new(multiple.iter()),
}
}

Expand All @@ -245,37 +227,63 @@ impl<T: fmt::Debug> fmt::Debug for Blocks<T> {
}

impl<T> Index<usize> for Blocks<T> {
type Output = Vec<T>;
type Output = T;

fn index(&self, index: usize) -> &Vec<T> {
fn index(&self, index: usize) -> &T {
match self {
Blocks::Single(single) => {
Blocks::Single(Some(single)) => {
assert!(index == 0);
single
}
Blocks::Multiple(multiple) => &multiple[index],
Blocks::Single(None) => unreachable!("can't use index to access empty blocks"),
}
}
}

impl<T> IndexMut<usize> for Blocks<T> {
fn index_mut(&mut self, index: usize) -> &mut Vec<T> {
fn index_mut(&mut self, index: usize) -> &mut T {
match self {
Blocks::Single(single) => {
Blocks::Single(Some(single)) => {
assert!(index == 0);
single
}
Blocks::Multiple(multiple) => &mut multiple[index],
Blocks::Single(None) => unreachable!("can't use index to access empty blocks"),
}
}
}

impl<T> Default for Blocks<T> {
impl<T> Default for VecBlocks<T> {
fn default() -> Self {
Self::new()
}
}

pub type VecBlocks<T> = Blocks<Vec<T>>;

impl<T> VecBlocks<T> {
pub fn into_to_vec(self) -> Vec<T> {
match self {
Blocks::Single(None) => Vec::new(),
Blocks::Single(Some(single)) => single,
Blocks::Multiple(multiple) => {
multiple.into_iter().flat_map(|v| v.into_iter()).collect()
}
}
}

pub fn capacity(&self) -> usize {
match self {
Blocks::Single(None) => 0,
Blocks::Single(Some(single)) => single.capacity(),
Blocks::Multiple(multiple) => {
multiple.iter().map(|blk| blk.capacity()).sum::<usize>()
}
}
}
}

/// `GroupAccumulator` implements a single aggregate (e.g. AVG) and
/// stores the state for *all* groups internally.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use datafusion_common::{
arrow_datafusion_err, utils::get_arrayref_at_indices, DataFusionError, Result,
ScalarValue,
};
use datafusion_expr_common::groups_accumulator::{Blocks, EmitTo, GroupsAccumulator};
use datafusion_expr_common::groups_accumulator::{Blocks, EmitTo, GroupsAccumulator, VecBlocks};
use datafusion_expr_common::{
accumulator::Accumulator, groups_accumulator::GroupStatesMode,
};
Expand Down Expand Up @@ -424,7 +424,7 @@ pub(crate) fn slice_and_maybe_filter(
/// values: [x, x, x], [x, x, x], [default, default, default]
///
pub fn ensure_enough_room_for_values<T: Clone>(
values: &mut Blocks<T>,
values: &mut VecBlocks<T>,
mode: GroupStatesMode,
total_num_groups: usize,
default_value: T,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow::datatypes::ArrowPrimitiveType;
use arrow::datatypes::DataType;
use datafusion_common::{internal_datafusion_err, DataFusionError, Result};
use datafusion_expr_common::groups_accumulator::{
BlockedGroupIndex, EmitTo, GroupStatesMode, GroupsAccumulator,
BlockedGroupIndex, EmitTo, GroupStatesMode, GroupsAccumulator, VecBlocks,
};

use crate::aggregate::groups_accumulator::accumulate::BlockedNullState;
Expand All @@ -46,7 +46,7 @@ where
F: Fn(&mut T::Native, T::Native) + Send + Sync,
{
/// values per group, stored as the native type
values_blocks: Blocks<T::Native>,
values_blocks: VecBlocks<T::Native>,

/// The output type (needed for Decimal precision and scale)
data_type: DataType,
Expand Down
6 changes: 3 additions & 3 deletions datafusion/functions-aggregate/src/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use arrow::datatypes::{
};
use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::groups_accumulator::{BlockedGroupIndex, Blocks, GroupStatesMode};
use datafusion_expr::groups_accumulator::{BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks};
use datafusion_expr::type_coercion::aggregates::{avg_return_type, coerce_avg_type};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::Volatility::Immutable;
Expand Down Expand Up @@ -398,10 +398,10 @@ where
return_data_type: DataType,

/// Count per group (use u64 to make UInt64Array)
counts: Blocks<u64>,
counts: VecBlocks<u64>,

/// Sums per group, stored as the native type
sums: Blocks<T::Native>,
sums: VecBlocks<T::Native>,

/// Track nulls in the input / filters
null_state: BlockedNullState,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use ahash::RandomState;
use datafusion_expr::groups_accumulator::{BlockedGroupIndex, Blocks, GroupStatesMode};
use datafusion_expr::groups_accumulator::{BlockedGroupIndex, Blocks, GroupStatesMode, VecBlocks};
use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator;
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::ensure_enough_room_for_values;
use std::collections::HashSet;
Expand Down Expand Up @@ -352,7 +352,7 @@ struct CountGroupsAccumulator {
/// output type of count is `DataType::Int64`. Thus by using `i64`
/// for the counts, the output [`Int64Array`] can be created
/// without copy.
counts: Blocks<i64>,
counts: VecBlocks<i64>,

mode: GroupStatesMode,
}
Expand Down

0 comments on commit b2d348a

Please sign in to comment.