refactor(query): record state_offsets to reduce the memory leak of agg #17285

Merged · 8 commits · Jan 16, 2025
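
The substance of the change is in the aggregate Payload: each Page previously tracked state_rows, a whole-row count of rows whose aggregate states were assumed fully initialized. If an allocation failed partway through initializing a row's states, that row was never counted, so its already-initialized states were skipped on drop and leaked. The PR replaces the counter with state_offsets, which counts individual aggregate states (bounded by rows * agg_len), letting the drop path destroy exactly the states that were initialized. The remaining files switch the hashtable allocator default from MmapAllocator to DefaultAllocator. A minimal sketch of the new bookkeeping follows; the field and method names mirror the diff, while the struct itself, agg_len, and the numbers are simplified stand-ins:

```rust
// Minimal sketch of the per-page bookkeeping (illustrative only; the real
// struct lives in src/query/expression/src/aggregate/payload.rs).
struct Page {
    rows: usize,          // rows written to this page
    state_offsets: usize, // individual aggregate states initialized so far
}

impl Page {
    // A page is "partial" when the last row's states were only partly
    // initialized, e.g. because an allocation failed while setting them up.
    fn is_partial_state(&self, agg_len: usize) -> bool {
        self.rows * agg_len != self.state_offsets
    }
}

fn main() {
    let agg_len = 3; // number of aggregate functions per row (made-up value)

    // Row 0 fully initialized (3 states), row 1 stopped after 2 states.
    let page = Page { rows: 2, state_offsets: 5 };
    assert!(page.is_partial_state(agg_len));

    // Drop-time walk: visit only rows that own at least one initialized state,
    // and within a row skip the slots at or past state_offsets.
    for row in 0..page.state_offsets.div_ceil(agg_len) {
        for idx in 0..agg_len {
            if row * agg_len + idx >= page.state_offsets {
                continue; // never initialized, nothing to destroy
            }
            println!("destroy state of aggregate {idx} in row {row}");
        }
    }
}
```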
10 changes: 5 additions & 5 deletions src/common/hashtable/src/dictionary_string_hashtable.rs
@@ -20,7 +20,7 @@ use std::ptr::NonNull;
use std::sync::Arc;

use bumpalo::Bump;
use databend_common_base::mem_allocator::MmapAllocator;
use databend_common_base::mem_allocator::DefaultAllocator;

use crate::container::Container;
use crate::container::HeapContainer;
@@ -102,7 +102,7 @@ pub struct DictionaryStringHashTable<V> {
arena: Arc<Bump>,
dict_keys: usize,
entries_len: usize,
pub(crate) entries: HeapContainer<DictionaryEntry<V>, MmapAllocator>,
pub(crate) entries: HeapContainer<DictionaryEntry<V>, DefaultAllocator>,
pub dictionary_hashset: StringHashSet<[u8]>,
}

@@ -116,7 +116,7 @@ impl<V> DictionaryStringHashTable<V> {
arena: bump.clone(),
dict_keys,
entries_len: 0,
entries: unsafe { HeapContainer::new_zeroed(256, MmapAllocator::default()) },
entries: unsafe { HeapContainer::new_zeroed(256, DefaultAllocator {}) },
dictionary_hashset: StringHashSet::new(bump),
}
}
@@ -452,7 +452,7 @@ impl<V> HashtableLike for DictionaryStringHashTable<V> {
}
}

self.entries = HeapContainer::new_zeroed(0, MmapAllocator::default());
self.entries = HeapContainer::new_zeroed(0, DefaultAllocator {});
}

self.dictionary_hashset.clear();
@@ -615,7 +615,7 @@ impl<'a, V> Iterator for DictionaryTableMutIter<'a, V> {
}

pub struct DictionarySlotIter<'a> {
empty: Option<&'a TableEmpty<(), MmapAllocator>>,
empty: Option<&'a TableEmpty<(), DefaultAllocator>>,
entities_slice: &'a [Entry<FallbackKey, ()>],
i: usize,
}
4 changes: 2 additions & 2 deletions src/common/hashtable/src/hashjoin_hashtable.rs
@@ -17,7 +17,7 @@ use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use databend_common_base::mem_allocator::MmapAllocator;
use databend_common_base::mem_allocator::DefaultAllocator;
use databend_common_column::bitmap::Bitmap;

use super::traits::HashJoinHashtableLike;
@@ -101,7 +101,7 @@ pub fn hash_bits() -> u32 {
}
}

pub struct HashJoinHashTable<K: Keyable, A: Allocator + Clone = MmapAllocator> {
pub struct HashJoinHashTable<K: Keyable, A: Allocator + Clone = DefaultAllocator> {
pub(crate) pointers: Box<[u64], A>,
pub(crate) atomic_pointers: *mut AtomicU64,
pub(crate) hash_shift: usize,
4 changes: 2 additions & 2 deletions src/common/hashtable/src/hashjoin_string_hashtable.rs
@@ -16,7 +16,7 @@ use std::alloc::Allocator;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use databend_common_base::mem_allocator::MmapAllocator;
use databend_common_base::mem_allocator::DefaultAllocator;
use databend_common_column::bitmap::Bitmap;

use super::traits::HashJoinHashtableLike;
@@ -37,7 +37,7 @@ pub struct StringRawEntry {
pub next: u64,
}

pub struct HashJoinStringHashTable<A: Allocator + Clone = MmapAllocator> {
pub struct HashJoinStringHashTable<A: Allocator + Clone = DefaultAllocator> {
pub(crate) pointers: Box<[u64], A>,
pub(crate) atomic_pointers: *mut AtomicU64,
pub(crate) hash_shift: usize,
4 changes: 2 additions & 2 deletions src/common/hashtable/src/hashtable.rs
@@ -17,7 +17,7 @@ use std::intrinsics::unlikely;
use std::iter::TrustedLen;
use std::mem::MaybeUninit;

use databend_common_base::mem_allocator::MmapAllocator;
use databend_common_base::mem_allocator::DefaultAllocator;

use super::container::HeapContainer;
use super::table0::Entry;
@@ -29,7 +29,7 @@ use super::traits::Keyable;
use super::utils::ZeroEntry;
use crate::FastHash;

pub struct Hashtable<K, V, A = MmapAllocator>
pub struct Hashtable<K, V, A = DefaultAllocator>
where
K: Keyable,
A: Allocator + Clone,
10 changes: 7 additions & 3 deletions src/common/hashtable/src/lookup_hashtable.rs
@@ -17,13 +17,17 @@ use std::iter::TrustedLen;
use std::mem;
use std::mem::MaybeUninit;

use databend_common_base::mem_allocator::MmapAllocator;
use databend_common_base::mem_allocator::DefaultAllocator;

use crate::table0::Entry;
use crate::HashtableLike;

pub struct LookupHashtable<K: Sized, const CAPACITY: usize, V, A: Allocator + Clone = MmapAllocator>
{
pub struct LookupHashtable<
K: Sized,
const CAPACITY: usize,
V,
A: Allocator + Clone = DefaultAllocator,
> {
flags: Box<[bool; CAPACITY], A>,
data: Box<[Entry<K, V>; CAPACITY], A>,
len: usize,
4 changes: 2 additions & 2 deletions src/common/hashtable/src/short_string_hashtable.rs
@@ -21,7 +21,7 @@ use std::ptr::NonNull;
use std::sync::Arc;

use bumpalo::Bump;
use databend_common_base::mem_allocator::MmapAllocator;
use databend_common_base::mem_allocator::DefaultAllocator;

use super::container::HeapContainer;
use super::table0::Entry;
@@ -39,7 +39,7 @@ use crate::table_empty::TableEmpty;
use crate::table_empty::TableEmptyIter;
use crate::table_empty::TableEmptyIterMut;

pub struct ShortStringHashtable<K, V, A = MmapAllocator>
pub struct ShortStringHashtable<K, V, A = DefaultAllocator>
where
K: UnsizedKeyable + ?Sized,
A: Allocator + Clone,
4 changes: 2 additions & 2 deletions src/common/hashtable/src/stack_hashtable.rs
@@ -16,7 +16,7 @@ use std::alloc::Allocator;
use std::intrinsics::unlikely;
use std::mem::MaybeUninit;

use databend_common_base::mem_allocator::MmapAllocator;
use databend_common_base::mem_allocator::DefaultAllocator;

use super::container::StackContainer;
use super::table0::Entry;
@@ -25,7 +25,7 @@ use super::table0::Table0Iter;
use super::traits::Keyable;
use super::utils::ZeroEntry;

pub struct StackHashtable<K, V, const N: usize = 16, A = MmapAllocator>
pub struct StackHashtable<K, V, const N: usize = 16, A = DefaultAllocator>
where
K: Keyable,
A: Allocator + Clone,
4 changes: 2 additions & 2 deletions src/common/hashtable/src/string_hashtable.rs
@@ -19,7 +19,7 @@ use std::mem::MaybeUninit;
use std::sync::Arc;

use bumpalo::Bump;
use databend_common_base::mem_allocator::MmapAllocator;
use databend_common_base::mem_allocator::DefaultAllocator;

use super::container::HeapContainer;
use super::table0::Entry;
@@ -38,7 +38,7 @@ use crate::table_empty::TableEmptyIterMut;
/// A simple unsized hashtable used for storing unsized keys in an arena. It works with HashMethodSerializer.
/// Unlike `ShortStringHashTable`, it doesn't use adaptive sub-hashtables to store key values based on key size.
/// It can be considered a minimal hashtable implementation of ShortStringHashTable.
pub struct StringHashtable<K, V, A = MmapAllocator>
pub struct StringHashtable<K, V, A = DefaultAllocator>
where
K: UnsizedKeyable + ?Sized,
A: Allocator + Clone,
46 changes: 40 additions & 6 deletions src/query/expression/src/aggregate/payload.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
use bumpalo::Bump;
use databend_common_base::runtime::drop_guard;
use itertools::Itertools;
use log::info;
use strength_reduce::StrengthReducedU64;

use super::payload_row::rowformat_size;
@@ -78,10 +79,18 @@ unsafe impl Sync for Payload {}
pub struct Page {
pub(crate) data: Vec<MaybeUninit<u8>>,
pub(crate) rows: usize,
pub(crate) state_rows: usize,
// state_offsets = state_rows * agg_len,
// i.e. it marks the offset up to which the agg states must be cleaned
pub(crate) state_offsets: usize,
pub(crate) capacity: usize,
}

impl Page {
pub fn is_partial_state(&self, agg_len: usize) -> bool {
self.rows * agg_len != self.state_offsets
}
}

pub type Pages = Vec<Page>;

// TODO FIXME
@@ -179,7 +188,7 @@ impl Payload {
self.pages.push(Page {
data,
rows: 0,
state_rows: 0,
state_offsets: 0,
capacity: self.row_per_page,
});
}
@@ -300,10 +309,18 @@ impl Payload {
}

let place = StateAddr::from(place);
let page = &mut self.pages[page_index[idx]];
for (aggr, offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
aggr.init_state(place.next(*offset));
page.state_offsets += 1;
}
}

#[cfg(debug_assertions)]
{
for page in self.pages.iter() {
assert_eq!(page.rows * self.aggrs.len(), page.state_offsets);
}
self.pages[page_index[idx]].state_rows += 1;
}
}
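
Read together with the next hunk: the initialization path above advances state_offsets by one per successfully initialized aggregate state, so an OOM in the middle of a row leaves the counter at exactly the last state that was set up, while the row-copy path below appends rows whose states already exist and advances the counter by agg_len per row. A debug-only assertion then checks that rows * agg_len == state_offsets once initialization completes. A small sketch of the two paths, with simplified stand-in types and function names:

```rust
// Simplified stand-ins; the real fields live on the Page struct in payload.rs.
struct Page {
    rows: usize,
    state_offsets: usize,
}

// Init path: one increment per aggregate state actually initialized, mirroring
// the loop that calls aggr.init_state(place.next(*offset)) in the diff.
fn init_states_for_row(page: &mut Page, agg_len: usize) {
    for _ in 0..agg_len {
        page.state_offsets += 1;
    }
}

// Copy path: the row's states already exist, so the whole row counts at once.
fn copy_initialized_row(page: &mut Page, agg_len: usize) {
    page.rows += 1;
    page.state_offsets += agg_len;
}

fn main() {
    let agg_len = 2;
    let mut page = Page { rows: 0, state_offsets: 0 };

    copy_initialized_row(&mut page, agg_len);

    page.rows += 1; // the init path accounts for the row count elsewhere
    init_states_for_row(&mut page, agg_len);

    // Debug-build invariant from the diff, once every row is fully initialized.
    assert_eq!(page.rows * agg_len, page.state_offsets);
}
```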

@@ -338,6 +355,7 @@
address: &[*const u8],
) {
let tuple_size = self.tuple_size;
let agg_len = self.aggrs.len();
let (mut page, _) = self.writable_page();
for i in 0..row_count {
let index = select_vector[i];
Expand All @@ -350,7 +368,7 @@ impl Payload {
)
}
page.rows += 1;
page.state_rows += 1;
page.state_offsets += agg_len;

if page.rows == page.capacity {
(page, _) = self.writable_page();
@@ -421,10 +439,26 @@
drop_guard(move || {
// drop states
if !self.state_move_out {
for (aggr, addr_offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
'FOR: for (idx, (aggr, addr_offset)) in self
.aggrs
.iter()
.zip(self.state_addr_offsets.iter())
.enumerate()
{
if aggr.need_manual_drop_state() {
for page in self.pages.iter() {
for row in 0..page.state_rows {
let is_partial_state = page.is_partial_state(self.aggrs.len());

if is_partial_state && idx == 0 {
info!("Cleaning partial page, state_offsets: {}, row: {}, agg length: {}", page.state_offsets, page.rows, self.aggrs.len());
}
for row in 0..page.state_offsets.div_ceil(self.aggrs.len()) {
// When OOM occurs, some states are never initialized, so we don't need to destroy them
if is_partial_state
&& row * self.aggrs.len() + idx >= page.state_offsets
{
continue 'FOR;
}
let ptr = self.data_ptr(page, row);
unsafe {
let state_addr =
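
In the drop path above, only aggregates whose state requires a manual drop are walked. For each page, div_ceil(state_offsets, agg_len) rows are visited, and on a partial page the labeled 'FOR loop moves on to the next aggregate as soon as a slot index reaches state_offsets, since those states were never initialized. A quick check of that arithmetic with made-up numbers (agg_len = 4 and state_offsets = 10 give three visited rows and exactly ten destroyed states):

```rust
// Verifies the drop-walk arithmetic described above (values are illustrative).
fn main() {
    let agg_len: usize = 4;
    let state_offsets: usize = 10; // a partial page: row 2 has only 2 of 4 states

    let rows_visited = state_offsets.div_ceil(agg_len);
    let destroyed: usize = (0..rows_visited)
        .map(|row| {
            (0..agg_len)
                .filter(|&idx| row * agg_len + idx < state_offsets)
                .count()
        })
        .sum();

    assert_eq!(rows_visited, 3);
    assert_eq!(destroyed, 10); // exactly the initialized states, nothing more
}
```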