Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(query): Multiple states Aggregate function #17148

Merged
merged 39 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
8c1ad32
refine
forsaken628 Dec 27, 2024
d15e3b7
AggrState
forsaken628 Dec 30, 2024
6db6b60
ref AggrState
forsaken628 Dec 30, 2024
de5b9c1
AggrStateLoc
forsaken628 Dec 31, 2024
a9b01c6
fix
forsaken628 Dec 31, 2024
e6ab004
fix
forsaken628 Dec 31, 2024
33b3724
fix
forsaken628 Dec 31, 2024
aacda80
Merge remote-tracking branch 'up/main' into or_null
forsaken628 Jan 2, 2025
0a2d53b
num_states
forsaken628 Jan 3, 2025
28d4891
Merge branch 'main' into or_null
forsaken628 Jan 6, 2025
768b9d3
Merge remote-tracking branch 'origin/or_null' into or_null
forsaken628 Jan 6, 2025
a12e5ab
get_state_layout
forsaken628 Jan 6, 2025
b9b5984
fix
forsaken628 Jan 6, 2025
be92559
new ornull
forsaken628 Jan 7, 2025
3e9dd3b
Merge remote-tracking branch 'up/main' into or_null
forsaken628 Jan 7, 2025
020d6c9
fix
forsaken628 Jan 7, 2025
940272b
fix
forsaken628 Jan 7, 2025
9b680a7
AggregatePartial output_schema
forsaken628 Jan 7, 2025
0eca346
revert mutil serialize
forsaken628 Jan 8, 2025
cce286e
fix
forsaken628 Jan 8, 2025
be8df9d
refine
forsaken628 Jan 9, 2025
23b52e1
Merge remote-tracking branch 'up/main' into or_null
forsaken628 Jan 9, 2025
e677020
fix
forsaken628 Jan 9, 2025
f44dfdc
update
forsaken628 Jan 9, 2025
ab6f046
update
forsaken628 Jan 9, 2025
1738c52
fix
forsaken628 Jan 10, 2025
bf7e7cc
fix
forsaken628 Jan 10, 2025
78603dd
refine
forsaken628 Jan 10, 2025
333b590
fix
forsaken628 Jan 13, 2025
86b353e
fix
forsaken628 Jan 13, 2025
018122c
update
forsaken628 Jan 13, 2025
1e14ba5
array alloc
forsaken628 Jan 13, 2025
3397f5e
Merge remote-tracking branch 'up/main' into or_null
forsaken628 Jan 14, 2025
ac37dda
test
forsaken628 Jan 14, 2025
5ecfa0f
comment
forsaken628 Jan 14, 2025
e6fa24f
fix
forsaken628 Jan 14, 2025
ec38fec
Merge remote-tracking branch 'up/main' into or_null
forsaken628 Jan 17, 2025
1d264fe
fix
forsaken628 Jan 17, 2025
8f6eeed
Merge remote-tracking branch 'up/main' into or_null
forsaken628 Jan 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ arrow-ord = { workspace = true }
criterion = { workspace = true }
goldenfile = { workspace = true }
pretty_assertions = { workspace = true }
proptest = { workspace = true }
rand = { workspace = true }

[[bench]]
Expand Down
83 changes: 35 additions & 48 deletions src/query/expression/src/aggregate/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::Layout;
use std::fmt;
use std::sync::Arc;

use databend_common_column::bitmap::Bitmap;
use databend_common_exception::Result;

use super::AggrState;
use super::AggrStateLoc;
use super::AggrStateRegistry;
use super::StateAddr;
use crate::types::binary::BinaryColumnBuilder;
use crate::types::BinaryColumn;
use crate::types::DataType;
use crate::Column;
use crate::ColumnBuilder;
Expand All @@ -35,76 +37,61 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
fn name(&self) -> &str;
fn return_type(&self) -> Result<DataType>;

fn init_state(&self, place: StateAddr);
fn init_state(&self, place: AggrState);

fn is_state(&self) -> bool {
false
}

fn state_layout(&self) -> Layout;
fn register_state(&self, registry: &mut AggrStateRegistry);

// Accumulates the input arrays in batch mode.
// Commonly used when the aggregate function has no group by.
fn accumulate(
&self,
_place: StateAddr,
_columns: InputColumns,
_validity: Option<&Bitmap>,
_input_rows: usize,
place: AggrState,
columns: InputColumns,
validity: Option<&Bitmap>,
input_rows: usize,
) -> Result<()>;

// used when we need to calculate with group keys
fn accumulate_keys(
&self,
places: &[StateAddr],
offset: usize,
addrs: &[StateAddr],
loc: &[AggrStateLoc],
columns: InputColumns,
_input_rows: usize,
) -> Result<()> {
for (row, place) in places.iter().enumerate() {
self.accumulate_row(place.next(offset), columns, row)?;
for (row, addr) in addrs.iter().enumerate() {
self.accumulate_row(AggrState::new(*addr, loc), columns, row)?;
}
Ok(())
}

// Used in aggregate_null_adaptor
fn accumulate_row(&self, _place: StateAddr, _columns: InputColumns, _row: usize) -> Result<()>;
fn accumulate_row(&self, place: AggrState, columns: InputColumns, row: usize) -> Result<()>;

// serialize the state into binary array
fn batch_serialize(
&self,
places: &[StateAddr],
offset: usize,
builder: &mut BinaryColumnBuilder,
) -> Result<()> {
for place in places {
self.serialize(place.next(offset), &mut builder.data)?;
builder.commit_row();
}
Ok(())
}

fn serialize(&self, _place: StateAddr, _writer: &mut Vec<u8>) -> Result<()>;
fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>;

fn serialize_size_per_row(&self) -> Option<usize> {
None
}

fn merge(&self, _place: StateAddr, _reader: &mut &[u8]) -> Result<()>;
fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()>;

/// Batch merge and deserialize the state from binary array
fn batch_merge(&self, places: &[StateAddr], offset: usize, column: &Column) -> Result<()> {
let c = column.as_binary().unwrap();
for (place, mut data) in places.iter().zip(c.iter()) {
self.merge(place.next(offset), &mut data)?;
fn batch_merge(
&self,
places: &[StateAddr],
loc: &[AggrStateLoc],
state: &BinaryColumn,
) -> Result<()> {
for (place, mut data) in places.iter().zip(state.iter()) {
self.merge(AggrState::new(*place, loc), &mut data)?;
}

Ok(())
}

fn batch_merge_single(&self, place: StateAddr, column: &Column) -> Result<()> {
let c = column.as_binary().unwrap();

fn batch_merge_single(&self, place: AggrState, state: &Column) -> Result<()> {
let c = state.as_binary().unwrap();
for mut data in c.iter() {
self.merge(place, &mut data)?;
}
Expand All @@ -115,29 +102,29 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
&self,
places: &[StateAddr],
rhses: &[StateAddr],
offset: usize,
loc: &[AggrStateLoc],
) -> Result<()> {
for (place, rhs) in places.iter().zip(rhses.iter()) {
self.merge_states(place.next(offset), rhs.next(offset))?;
self.merge_states(AggrState::new(*place, loc), AggrState::new(*rhs, loc))?;
}
Ok(())
}

fn merge_states(&self, _place: StateAddr, _rhs: StateAddr) -> Result<()>;
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()>;

fn batch_merge_result(
&self,
places: &[StateAddr],
offset: usize,
loc: Box<[AggrStateLoc]>,
builder: &mut ColumnBuilder,
) -> Result<()> {
for place in places {
self.merge_result(place.next(offset), builder)?;
self.merge_result(AggrState::new(*place, &loc), builder)?;
}
Ok(())
}
// TODO append the value into the column builder
fn merge_result(&self, _place: StateAddr, _builder: &mut ColumnBuilder) -> Result<()>;

fn merge_result(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()>;

// std::mem::needs_drop::<State>
// if true will call drop_state
Expand All @@ -147,7 +134,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {

/// # Safety
/// The caller must ensure that the [`_place`] has defined memory.
unsafe fn drop_state(&self, _place: StateAddr) {}
unsafe fn drop_state(&self, _place: AggrState) {}

fn get_own_null_adaptor(
&self,
Expand Down
Loading
Loading