Skip to content

Commit

Permalink
refactor(query): Multiple states Aggregate function (#17148)
Browse files Browse the repository at this point in the history
* refine

* AggrState

* ref AggrState

* AggrStateLoc

* fix

* fix

* fix

* num_states

* get_state_layout

* fix

* new ornull

* fix

* fix

* AggregatePartial output_schema

* revert multi serialize

* fix

* refine

* fix

* update

* update

* fix

* fix

* refine

* fix

* fix

* update

* array alloc

* test

* comment

* fix

* fix
  • Loading branch information
forsaken628 authored Jan 20, 2025
1 parent 4db02ec commit 625e31a
Show file tree
Hide file tree
Showing 55 changed files with 1,685 additions and 1,400 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ arrow-ord = { workspace = true }
criterion = { workspace = true }
goldenfile = { workspace = true }
pretty_assertions = { workspace = true }
proptest = { workspace = true }
rand = { workspace = true }

[[bench]]
Expand Down
83 changes: 35 additions & 48 deletions src/query/expression/src/aggregate/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::Layout;
use std::fmt;
use std::sync::Arc;

use databend_common_column::bitmap::Bitmap;
use databend_common_exception::Result;

use super::AggrState;
use super::AggrStateLoc;
use super::AggrStateRegistry;
use super::StateAddr;
use crate::types::binary::BinaryColumnBuilder;
use crate::types::BinaryColumn;
use crate::types::DataType;
use crate::Column;
use crate::ColumnBuilder;
Expand All @@ -35,76 +37,61 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
fn name(&self) -> &str;
fn return_type(&self) -> Result<DataType>;

fn init_state(&self, place: StateAddr);
fn init_state(&self, place: AggrState);

fn is_state(&self) -> bool {
false
}

fn state_layout(&self) -> Layout;
fn register_state(&self, registry: &mut AggrStateRegistry);

// accumulate consumes the input columns in batch mode.
// Commonly used when the aggregate function has no GROUP BY.
fn accumulate(
&self,
_place: StateAddr,
_columns: InputColumns,
_validity: Option<&Bitmap>,
_input_rows: usize,
place: AggrState,
columns: InputColumns,
validity: Option<&Bitmap>,
input_rows: usize,
) -> Result<()>;

// used when we need to calculate with group keys
fn accumulate_keys(
&self,
places: &[StateAddr],
offset: usize,
addrs: &[StateAddr],
loc: &[AggrStateLoc],
columns: InputColumns,
_input_rows: usize,
) -> Result<()> {
for (row, place) in places.iter().enumerate() {
self.accumulate_row(place.next(offset), columns, row)?;
for (row, addr) in addrs.iter().enumerate() {
self.accumulate_row(AggrState::new(*addr, loc), columns, row)?;
}
Ok(())
}

// Used in aggregate_null_adaptor
fn accumulate_row(&self, _place: StateAddr, _columns: InputColumns, _row: usize) -> Result<()>;
fn accumulate_row(&self, place: AggrState, columns: InputColumns, row: usize) -> Result<()>;

// serialize the state into binary array
fn batch_serialize(
&self,
places: &[StateAddr],
offset: usize,
builder: &mut BinaryColumnBuilder,
) -> Result<()> {
for place in places {
self.serialize(place.next(offset), &mut builder.data)?;
builder.commit_row();
}
Ok(())
}

fn serialize(&self, _place: StateAddr, _writer: &mut Vec<u8>) -> Result<()>;
fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>;

fn serialize_size_per_row(&self) -> Option<usize> {
None
}

fn merge(&self, _place: StateAddr, _reader: &mut &[u8]) -> Result<()>;
fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()>;

/// Batch-deserialize the states from a binary array and merge each one into its place.
fn batch_merge(&self, places: &[StateAddr], offset: usize, column: &Column) -> Result<()> {
let c = column.as_binary().unwrap();
for (place, mut data) in places.iter().zip(c.iter()) {
self.merge(place.next(offset), &mut data)?;
fn batch_merge(
&self,
places: &[StateAddr],
loc: &[AggrStateLoc],
state: &BinaryColumn,
) -> Result<()> {
for (place, mut data) in places.iter().zip(state.iter()) {
self.merge(AggrState::new(*place, loc), &mut data)?;
}

Ok(())
}

fn batch_merge_single(&self, place: StateAddr, column: &Column) -> Result<()> {
let c = column.as_binary().unwrap();

fn batch_merge_single(&self, place: AggrState, state: &Column) -> Result<()> {
let c = state.as_binary().unwrap();
for mut data in c.iter() {
self.merge(place, &mut data)?;
}
Expand All @@ -115,29 +102,29 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
&self,
places: &[StateAddr],
rhses: &[StateAddr],
offset: usize,
loc: &[AggrStateLoc],
) -> Result<()> {
for (place, rhs) in places.iter().zip(rhses.iter()) {
self.merge_states(place.next(offset), rhs.next(offset))?;
self.merge_states(AggrState::new(*place, loc), AggrState::new(*rhs, loc))?;
}
Ok(())
}

fn merge_states(&self, _place: StateAddr, _rhs: StateAddr) -> Result<()>;
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()>;

fn batch_merge_result(
&self,
places: &[StateAddr],
offset: usize,
loc: Box<[AggrStateLoc]>,
builder: &mut ColumnBuilder,
) -> Result<()> {
for place in places {
self.merge_result(place.next(offset), builder)?;
self.merge_result(AggrState::new(*place, &loc), builder)?;
}
Ok(())
}
// TODO append the value into the column builder
fn merge_result(&self, _place: StateAddr, _builder: &mut ColumnBuilder) -> Result<()>;

fn merge_result(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()>;

// See std::mem::needs_drop::<State>().
// If this returns true, drop_state will be called.
Expand All @@ -147,7 +134,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {

/// # Safety
/// The caller must ensure that `_place` refers to defined memory.
unsafe fn drop_state(&self, _place: StateAddr) {}
unsafe fn drop_state(&self, _place: AggrState) {}

fn get_own_null_adaptor(
&self,
Expand Down
Loading

0 comments on commit 625e31a

Please sign in to comment.