Skip to content

Commit

Permalink
test new group index.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Aug 30, 2024
1 parent c2cb573 commit 5bb1805
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion_expr_common::accumulator::Accumulator;
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};

/// Block-size constant with value 8192.
///
/// NOTE(review): judging by the name, this is presumably an upper bound on how
/// many slots are preallocated per block; its use sites are not visible in this
/// excerpt, so confirm against the callers.
pub const MAX_PREALLOC_BLOCK_SIZE: usize = 8192;
/// Mask selecting the low 63 bits of a packed group index (every bit except
/// bit 63).
///
/// `BlockedGroupIndex::new` reads bit 63 of the raw index as the flag and ANDs
/// the raw index with this mask to obtain the remaining data bits, which it
/// then splits into `block_id` and `block_offset`.
const GROUP_INDEX_DATA_MASK: u64 = 0x7fffffffffffffff;

/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
///
Expand Down Expand Up @@ -452,37 +453,36 @@ impl EmitToExt for EmitTo {
pub struct BlockedGroupIndex {
pub block_id: u32,
pub block_offset: u64,
pub is_blocked: bool,
pub flag: u8,
}

impl BlockedGroupIndex {
#[inline]
pub fn new_from_parts(block_id: u32, block_offset: u64, is_blocked: bool) -> Self {
pub fn new_from_parts(flag: u8, block_id: u32, block_offset: u64) -> Self {
Self {
block_id,
block_offset,
is_blocked,
flag,
}
}

#[inline]
pub fn new_flat(raw_index: usize) -> Self {
Self {
block_id: 0,
block_offset: raw_index as u64,
is_blocked: false,
}
}

#[inline]
pub fn new_blocked(raw_index: usize) -> Self {
let block_id = ((raw_index as u64 >> 32) & 0x00000000ffffffff) as u32;
let block_offset = (raw_index as u64) & 0x00000000ffffffff;
pub fn new(raw_index: usize) -> Self {
let raw_index = raw_index as u64;
let flag = raw_index >> 63;
let data = raw_index & GROUP_INDEX_DATA_MASK;
let (highs, lows) = ((data >> 32) as u32, data as u32);

let block_id = highs * flag as u32;
let block_offset = {
let offset_high = highs as u64 * (1 - flag);
(offset_high << 32) | (lows as u64)
};

Self {
block_id,
block_offset,
is_blocked: true,
flag: flag as u8,
}
}

Expand All @@ -496,12 +496,10 @@ impl BlockedGroupIndex {
self.block_offset as usize
}

#[inline]
pub fn as_packed_index(&self) -> usize {
if self.is_blocked {
(((self.block_id as u64) << 32) | self.block_offset) as usize
} else {
self.block_offset as usize
}
(((self.flag as u64) << 63) | ((self.block_id as u64) << 32) | self.block_offset)
as usize
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,31 +304,16 @@ impl BlockedNullState {
);
let seen_values_blocks = &mut self.seen_values_blocks;

if self.block_size.is_some() {
do_blocked_accumulate(
group_indices,
values,
opt_filter,
BlockedGroupIndex::new_blocked,
value_fn,
|index: &BlockedGroupIndex| {
seen_values_blocks[index.block_id()]
.set_bit(index.block_offset(), true);
},
)
} else {
do_blocked_accumulate(
group_indices,
values,
opt_filter,
BlockedGroupIndex::new_flat,
value_fn,
|index: &BlockedGroupIndex| {
seen_values_blocks[index.block_id()]
.set_bit(index.block_offset(), true);
},
);
}
do_blocked_accumulate(
group_indices,
values,
opt_filter,
value_fn,
|group_index| {
seen_values_blocks[group_index.block_id()]
.set_bit(group_index.block_offset(), true);
},
)
}

/// Similar to [NullState::build], but supports the blocked version of the accumulator
Expand Down Expand Up @@ -598,16 +583,14 @@ pub fn accumulate_indices<F>(
}
}

fn do_blocked_accumulate<T, F1, F2, G>(
fn do_blocked_accumulate<T, F1, F2>(
group_indices: &[usize],
values: &PrimitiveArray<T>,
opt_filter: Option<&BooleanArray>,
group_index_parse_fn: G,
mut value_fn: F1,
mut set_valid_fn: F2,
) where
T: ArrowPrimitiveType + Send,
G: Fn(usize) -> BlockedGroupIndex,
F1: FnMut(&BlockedGroupIndex, T::Native) + Send,
F2: FnMut(&BlockedGroupIndex) + Send,
{
Expand All @@ -617,7 +600,7 @@ fn do_blocked_accumulate<T, F1, F2, G>(
(false, None) => {
let iter = group_indices.iter().zip(data.iter());
for (&group_index, &new_value) in iter {
let blocked_index = group_index_parse_fn(group_index);
let blocked_index = BlockedGroupIndex::new(group_index);
set_valid_fn(&blocked_index);
value_fn(&blocked_index, new_value);
}
Expand Down Expand Up @@ -645,7 +628,7 @@ fn do_blocked_accumulate<T, F1, F2, G>(
// valid bit was set, real value
let is_valid = (mask & index_mask) != 0;
if is_valid {
let blocked_index = group_index_parse_fn(group_index);
let blocked_index = BlockedGroupIndex::new(group_index);
set_valid_fn(&blocked_index);
value_fn(&blocked_index, new_value);
}
Expand All @@ -663,7 +646,7 @@ fn do_blocked_accumulate<T, F1, F2, G>(
.for_each(|(i, (&group_index, &new_value))| {
let is_valid = remainder_bits & (1 << i) != 0;
if is_valid {
let blocked_index = group_index_parse_fn(group_index);
let blocked_index = BlockedGroupIndex::new(group_index);
set_valid_fn(&blocked_index);
value_fn(&blocked_index, new_value);
}
Expand All @@ -681,7 +664,7 @@ fn do_blocked_accumulate<T, F1, F2, G>(
.zip(filter.iter())
.for_each(|((&group_index, &new_value), filter_value)| {
if let Some(true) = filter_value {
let blocked_index = group_index_parse_fn(group_index);
let blocked_index = BlockedGroupIndex::new(group_index);
set_valid_fn(&blocked_index);
value_fn(&blocked_index, new_value);
}
Expand All @@ -700,9 +683,9 @@ fn do_blocked_accumulate<T, F1, F2, G>(
.for_each(|((filter_value, &group_index), new_value)| {
if let Some(true) = filter_value {
if let Some(new_value) = new_value {
let blocked_index = group_index_parse_fn(group_index);
let blocked_index = BlockedGroupIndex::new(group_index);
set_valid_fn(&blocked_index);
value_fn(&blocked_index, new_value)
value_fn(&blocked_index, new_value);
}
}
})
Expand Down Expand Up @@ -931,9 +914,9 @@ mod test {
let block_id = *idx / self.block_size;
let block_offset = *idx % self.block_size;
BlockedGroupIndex::new_from_parts(
1,
block_id as u32,
block_offset as u64,
true,
)
.as_packed_index()
})
Expand Down
72 changes: 22 additions & 50 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,31 +399,17 @@ impl GroupsAccumulator for CountGroupsAccumulator {
0,
);

if self.block_size.is_some() {
accumulate_indices(
group_indices,
values.logical_nulls().as_ref(),
opt_filter,
|group_index| {
let blocked_index = BlockedGroupIndex::new_blocked(group_index);
let count = &mut self.counts[blocked_index.block_id()]
[blocked_index.block_offset()];
*count += 1;
},
);
} else {
accumulate_indices(
group_indices,
values.logical_nulls().as_ref(),
opt_filter,
|group_index| {
let blocked_index = BlockedGroupIndex::new_flat(group_index);
let count = &mut self.counts[blocked_index.block_id()]
[blocked_index.block_offset()];
*count += 1;
},
);
}
accumulate_indices(
group_indices,
values.logical_nulls().as_ref(),
opt_filter,
|group_index| {
let blocked_index = BlockedGroupIndex::new(group_index);
let count = &mut self.counts[blocked_index.block_id()]
[blocked_index.block_offset()];
*count += 1;
},
);

Ok(())
}
Expand All @@ -450,31 +436,17 @@ impl GroupsAccumulator for CountGroupsAccumulator {
0,
);

if self.block_size.is_some() {
do_count_merge_batch(
values,
group_indices,
opt_filter,
|group_index, partial_count| {
let blocked_index = BlockedGroupIndex::new_blocked(group_index);
let count = &mut self.counts[blocked_index.block_id()]
[blocked_index.block_offset()];
*count += partial_count;
},
);
} else {
do_count_merge_batch(
values,
group_indices,
opt_filter,
|group_index, partial_count| {
let blocked_index = BlockedGroupIndex::new_flat(group_index);
let count = &mut self.counts[blocked_index.block_id()]
[blocked_index.block_offset()];
*count += partial_count;
},
);
}
do_count_merge_batch(
values,
group_indices,
opt_filter,
|group_index, partial_count| {
let blocked_index = BlockedGroupIndex::new(group_index);
let count = &mut self.counts[blocked_index.block_id()]
[blocked_index.block_offset()];
*count += partial_count;
},
);

Ok(())
}
Expand Down
Loading

0 comments on commit 5bb1805

Please sign in to comment.