fix(query): record state_rows in aggregate payload (databendlabs#17194)
* chore(query): use CommonHashSet to store AggregateUniqStringState

* fix(query): check state is allocated or not when oom

* fix(query): zero init pages

* fix(query): zero init pages

* update
sundy-li authored Jan 8, 2025
1 parent a83eb8f commit 64c53b2
Showing 3 changed files with 31 additions and 22 deletions.
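
Why the new state_rows counter exists: a Page reserves row slots (rows) before the per-row aggregate states are allocated, and an OOM error can abort a batch between those two steps, leaving reserved rows with no state behind them. The Drop code removed near the bottom of payload.rs below probed for a zero state pointer as a sentinel for such rows; this commit instead has each Page remember how many states were actually initialized. A minimal sketch of the idea, using simplified stand-in types rather than the real Payload/Page internals:

// Stand-in types only; the real Page stores raw bytes and per-row state
// addresses, but the rows / state_rows bookkeeping is the same.
struct Page {
    rows: usize,                   // row slots reserved for group keys
    state_rows: usize,             // rows whose aggregate state was actually initialized
    states: Vec<Option<Box<u64>>>, // stand-in for the per-row state pointers
}

impl Page {
    fn new() -> Self {
        Page { rows: 0, state_rows: 0, states: Vec::new() }
    }

    // Phase 1 (cf. reserve_append_rows): a slot is reserved before its state exists.
    fn reserve_row(&mut self) -> usize {
        self.states.push(None);
        self.rows += 1;
        self.rows - 1
    }

    // Phase 2 (cf. append_rows): the state is allocated and only then counted.
    fn init_state(&mut self, row: usize) {
        self.states[row] = Some(Box::new(0));
        self.state_rows += 1;
    }
}

impl Drop for Page {
    fn drop(&mut self) {
        // Walk only the states that were really initialized; rows reserved but
        // never initialized (e.g. the batch hit an OOM error between the two
        // phases) are skipped instead of being probed for a zero pointer.
        for row in 0..self.state_rows {
            self.states[row] = None; // stand-in for aggr.drop_state(...)
        }
    }
}

fn main() {
    let mut page = Page::new();
    let r0 = page.reserve_row();
    let _r1 = page.reserve_row(); // reserved, but its init never runs (simulated OOM)
    page.init_state(r0);
    assert_eq!((page.rows, page.state_rows), (2, 1));
} // Drop touches exactly one state
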
2 changes: 2 additions & 0 deletions src/query/expression/src/aggregate/partitioned_payload.rs
@@ -108,6 +108,7 @@ impl PartitionedPayload {
&state.empty_vector,
&state.group_hashes,
&mut state.addresses,
&mut state.page_index,
new_group_rows,
group_columns,
);
@@ -134,6 +135,7 @@ impl PartitionedPayload {
sel,
&state.group_hashes,
&mut state.addresses,
&mut state.page_index,
count,
group_columns,
);
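
Both insert paths in PartitionedPayload now additionally thread state.page_index through to the payload. That is a per-batch scratch array on ProbeState (added in the last file below) recording, for each batch slot, which page the reserved row landed in, so the later state-initialization pass can bump state_rows on the right page. A rough sketch of this parallel-array bookkeeping, with hypothetical names standing in for the real structs:

// Hypothetical stand-in for the ProbeState scratch arrays; only the
// "two arrays indexed by the same batch slot" pattern is the point here.
const BATCH_SIZE: usize = 8; // the real constant comes from crate::BATCH_SIZE

struct Scratch {
    addresses: [*const u8; BATCH_SIZE], // where each row's group data was written
    page_index: [usize; BATCH_SIZE],    // which page owns that address
}

impl Scratch {
    fn new() -> Self {
        Scratch {
            addresses: [std::ptr::null(); BATCH_SIZE],
            page_index: [0; BATCH_SIZE],
        }
    }

    fn record(&mut self, slot: usize, addr: *const u8, page: usize) {
        // Both entries describe the same row, so a later pass can find the
        // owning page without re-deriving it from the raw address.
        self.addresses[slot] = addr;
        self.page_index[slot] = page;
    }
}
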
49 changes: 27 additions & 22 deletions src/query/expression/src/aggregate/payload.rs
@@ -78,6 +78,7 @@ unsafe impl Sync for Payload {}
pub struct Page {
pub(crate) data: Vec<MaybeUninit<u8>>,
pub(crate) rows: usize,
pub(crate) state_rows: usize,
pub(crate) capacity: usize,
}

@@ -167,21 +168,26 @@ impl Payload {
}

#[inline]
pub fn writable_page(&mut self) -> &mut Page {
pub fn writable_page(&mut self) -> (&mut Page, usize) {
if self.current_write_page == 0
|| self.pages[self.current_write_page - 1].rows
== self.pages[self.current_write_page - 1].capacity
{
self.current_write_page += 1;
if self.current_write_page > self.pages.len() {
let data = Vec::with_capacity(self.row_per_page * self.tuple_size);
self.pages.push(Page {
data: Vec::with_capacity(self.row_per_page * self.tuple_size),
data,
rows: 0,
state_rows: 0,
capacity: self.row_per_page,
});
}
}
&mut self.pages[self.current_write_page - 1]
(
&mut self.pages[self.current_write_page - 1],
self.current_write_page - 1,
)
}

#[inline]
@@ -194,31 +200,27 @@ impl Payload {
select_vector: &SelectVector,
group_hashes: &[u64],
address: &mut [*const u8],
page_index: &mut [usize],
new_group_rows: usize,
group_columns: InputColumns,
) {
let tuple_size = self.tuple_size;
let mut page = self.writable_page();
let (mut page, mut page_index_value) = self.writable_page();
for idx in select_vector.iter().take(new_group_rows).copied() {
address[idx] = unsafe { page.data.as_ptr().add(page.rows * tuple_size) as *const u8 };
page_index[idx] = page_index_value;
page.rows += 1;

if page.rows == page.capacity {
page = self.writable_page();
(page, page_index_value) = self.writable_page();
}
}

self.total_rows += new_group_rows;

debug_assert_eq!(
self.total_rows,
self.pages.iter().map(|x| x.rows).sum::<usize>()
);

self.append_rows(
select_vector,
group_hashes,
address,
page_index,
new_group_rows,
group_columns,
)
@@ -229,6 +231,7 @@
select_vector: &SelectVector,
group_hashes: &[u64],
address: &mut [*const u8],
page_index: &mut [usize],
new_group_rows: usize,
group_columns: InputColumns,
) {
@@ -300,8 +303,16 @@
for (aggr, offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
aggr.init_state(place.next(*offset));
}
self.pages[page_index[idx]].state_rows += 1;
}
}

self.total_rows += new_group_rows;

debug_assert_eq!(
self.total_rows,
self.pages.iter().map(|x| x.rows).sum::<usize>()
);
}

pub fn combine(&mut self, mut other: Payload) {
@@ -327,7 +338,7 @@
address: &[*const u8],
) {
let tuple_size = self.tuple_size;
let mut page = self.writable_page();
let (mut page, _) = self.writable_page();
for i in 0..row_count {
let index = select_vector[i];

@@ -341,7 +352,7 @@
page.rows += 1;

if page.rows == page.capacity {
page = self.writable_page();
(page, _) = self.writable_page();
}
}

@@ -412,18 +423,12 @@ impl Drop for Payload {
if !self.state_move_out {
for (aggr, addr_offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
if aggr.need_manual_drop_state() {
'PAGE_END: for page in self.pages.iter() {
for row in 0..page.rows {
for page in self.pages.iter() {
for row in 0..page.state_rows {
let ptr = self.data_ptr(page, row);
unsafe {
let state_addr =
read::<u64>(ptr.add(self.state_offset) as _) as usize;

// row is reserved, but not written (maybe throw by oom error)
if state_addr == 0 {
break 'PAGE_END;
}

let state_place = StateAddr::new(state_addr);
aggr.drop_state(state_place.next(*addr_offset));
}
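
Taken together, the payload.rs changes form one flow: writable_page now hands back the page's index alongside the &mut Page, reserve_append_rows stores that index per row (re-fetching both whenever a page fills up), and append_rows bumps state_rows on the owning page only after the row's aggregate states are initialized, with the total_rows accounting and its debug assert moving along with it. A compact sketch of that fill loop, with simplified stand-in types rather than the real Payload:

// Simplified stand-ins: Pool plays the role of Payload, and Page keeps only
// the counters that matter for this sketch.
struct Page {
    rows: usize,
    state_rows: usize,
    capacity: usize,
}

struct Pool {
    pages: Vec<Page>,
    current_write_page: usize, // 1-based; 0 means "no page yet", as in Payload
    row_per_page: usize,
}

impl Pool {
    // Mirrors the new signature: the writable page plus its index, so callers
    // can remember which page each reserved row belongs to.
    fn writable_page(&mut self) -> (&mut Page, usize) {
        if self.current_write_page == 0
            || self.pages[self.current_write_page - 1].rows
                == self.pages[self.current_write_page - 1].capacity
        {
            self.current_write_page += 1;
            if self.current_write_page > self.pages.len() {
                self.pages.push(Page { rows: 0, state_rows: 0, capacity: self.row_per_page });
            }
        }
        (&mut self.pages[self.current_write_page - 1], self.current_write_page - 1)
    }

    // Phase 1: reserve rows and record their page index (cf. reserve_append_rows).
    fn reserve_rows(&mut self, n: usize, page_index: &mut [usize]) {
        let (mut page, mut idx) = self.writable_page();
        for slot in 0..n {
            page_index[slot] = idx;
            page.rows += 1;
            if page.rows == page.capacity {
                (page, idx) = self.writable_page(); // page full: move to the next one
            }
        }
    }

    // Phase 2: once a row's aggregate states are initialized, count it on its
    // page (cf. append_rows incrementing state_rows via page_index[idx]).
    fn mark_state_initialized(&mut self, page_index: &[usize], slot: usize) {
        self.pages[page_index[slot]].state_rows += 1;
    }
}
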
2 changes: 2 additions & 0 deletions src/query/expression/src/aggregate/probe_state.rs
@@ -22,6 +22,7 @@ use crate::BATCH_SIZE;
pub struct ProbeState {
pub group_hashes: [u64; BATCH_SIZE],
pub addresses: [*const u8; BATCH_SIZE],
pub page_index: [usize; BATCH_SIZE],
pub state_places: [StateAddr; BATCH_SIZE],
pub group_compare_vector: SelectVector,
pub no_match_vector: SelectVector,
@@ -38,6 +39,7 @@ impl Default for ProbeState {
Self {
group_hashes: [0_u64; BATCH_SIZE],
addresses: [std::ptr::null::<u8>(); BATCH_SIZE],
page_index: [0; BATCH_SIZE],
state_places: [StateAddr::new(0); BATCH_SIZE],
group_compare_vector: new_sel(),
no_match_vector: new_sel(),
