chore: group limit optimization in new hashtable (#14989)
* chore: group limit optimizer in new hashtable
sundy-li authored Mar 18, 2024
1 parent da77059 commit c485db6
Showing 8 changed files with 110 additions and 30 deletions.
25 changes: 25 additions & 0 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
@@ -91,6 +91,29 @@ impl AggregateHashTable {
}
}

pub fn new_directly(
group_types: Vec<DataType>,
aggrs: Vec<AggregateFunctionRef>,
config: HashTableConfig,
capacity: usize,
arena: Arc<Bump>,
) -> Self {
Self {
entries: vec![],
count: 0,
direct_append: true,
current_radix_bits: config.initial_radix_bits,
payload: PartitionedPayload::new(
group_types,
aggrs,
1 << config.initial_radix_bits,
vec![arena],
),
capacity,
config,
}
}

pub fn len(&self) -> usize {
self.payload.len()
}
@@ -272,6 +295,7 @@ impl AggregateHashTable {
// 2. append new_group_count to payload
if new_entry_count != 0 {
new_group_count += new_entry_count;

self.payload
.append_rows(state, new_entry_count, group_columns);

@@ -535,6 +559,7 @@ impl AggregateHashTable {
}

pub fn clear_ht(&mut self) {
self.payload.mark_min_cardinality();
self.entries.fill(0);
}

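The new `new_directly` constructor builds a hashtable that writes incoming rows straight into its payload (`direct_append: true`) with a caller-supplied capacity and arena, instead of constructing it with `new_with_capacity` and flipping `direct_append` afterwards (see the SerializedPayload change further down). A minimal usage sketch, not part of the commit; it assumes the crate's `DataType`, `AggregateFunctionRef`, `HashTableConfig`, and `AggregateHashTable` types plus `bumpalo::Bump` are in scope, and the helper name and `rows_num` parameter are illustrative:

use std::sync::Arc;
use bumpalo::Bump;

// Build a pre-sized, direct-append hashtable for deserializing a partial payload.
fn build_direct_hashtable(
    group_types: Vec<DataType>,
    aggrs: Vec<AggregateFunctionRef>,
    rows_num: usize,
) -> AggregateHashTable {
    // `new_directly` sets `direct_append = true`, so rows are appended to the
    // payload without probing the hash entries first.
    AggregateHashTable::new_directly(
        group_types,
        aggrs,
        HashTableConfig::default(),
        rows_num,
        Arc::new(Bump::new()),
    )
}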
7 changes: 7 additions & 0 deletions src/query/expression/src/aggregate/mod.rs
@@ -101,4 +101,11 @@ impl HashTableConfig {

self
}

pub fn cluster_with_partial(mut self, partial_agg: bool, node_nums: usize) -> Self {
self.partial_agg = partial_agg;
self.max_partial_capacity = 131072 * (2 << node_nums);

self
}
}
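`cluster_with_partial` sizes the partial-aggregation capacity from the number of cluster nodes: `131072 * (2 << node_nums)`. A standalone sketch, not part of the commit, that simply evaluates this formula for a few cluster sizes:

// Capacity formula used by `cluster_with_partial`.
fn max_partial_capacity(node_nums: usize) -> usize {
    131072 * (2 << node_nums)
}

fn main() {
    for nodes in [1, 2, 3, 4] {
        // 1 node -> 524_288, 2 -> 1_048_576, 3 -> 2_097_152, 4 -> 4_194_304
        println!("{nodes} nodes -> partial capacity {}", max_partial_capacity(nodes));
    }
}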
19 changes: 16 additions & 3 deletions src/query/expression/src/aggregate/partitioned_payload.rs
@@ -91,6 +91,12 @@ impl PartitionedPayload {
}
}

pub fn mark_min_cardinality(&mut self) {
for payload in self.payloads.iter_mut() {
payload.mark_min_cardinality();
}
}

pub fn append_rows(
&mut self,
state: &mut ProbeState,
@@ -161,12 +167,17 @@ impl PartitionedPayload {
state.clear();

for payload in other.payloads.into_iter() {
self.combine_single(payload, state)
self.combine_single(payload, state, None)
}
}
}

pub fn combine_single(&mut self, mut other: Payload, state: &mut PayloadFlushState) {
pub fn combine_single(
&mut self,
mut other: Payload,
state: &mut PayloadFlushState,
only_bucket: Option<usize>,
) {
if other.len() == 0 {
return;
}
@@ -179,7 +190,9 @@
// flush each page of `other` into the correct partition
while self.gather_flush(&other, state) {
// copy rows
for partition in 0..self.partition_count as usize {
for partition in (0..self.partition_count as usize)
.filter(|x| only_bucket.is_none() || only_bucket == Some(*x))
{
let payload = &mut self.payloads[partition];
let count = state.probe_state.partition_count[partition];

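`combine_single` now accepts an `only_bucket: Option<usize>` filter: with `None` every partition receives its rows as before, while `Some(b)` copies only the rows that hash into partition `b` and skips the rest, which is what lets the final aggregator ignore buckets once the group limit is reached. A standalone sketch, not from the commit, of the filter behaviour:

// Mirror of the `only_bucket` filter used inside `combine_single`.
fn partitions_to_copy(partition_count: usize, only_bucket: Option<usize>) -> Vec<usize> {
    (0..partition_count)
        .filter(|x| only_bucket.is_none() || only_bucket == Some(*x))
        .collect()
}

fn main() {
    assert_eq!(partitions_to_copy(4, None), vec![0, 1, 2, 3]); // copy into all partitions
    assert_eq!(partitions_to_copy(4, Some(2)), vec![2]);       // copy only into bucket 2
}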
11 changes: 11 additions & 0 deletions src/query/expression/src/aggregate/payload.rs
@@ -66,6 +66,9 @@ pub struct Payload {
pub state_offset: usize,
pub state_addr_offsets: Vec<usize>,
pub state_layout: Option<Layout>,

// if set, the payload holds at least `min_cardinality` distinct groups;
// rows appended after the hashtable was cleared may duplicate existing groups
pub min_cardinality: Option<usize>,
}

unsafe impl Send for Payload {}
@@ -135,6 +138,7 @@ impl Payload {
aggrs,
tuple_size,
row_per_page,
min_cardinality: None,
total_rows: 0,
group_offsets,
group_sizes,
@@ -304,10 +308,17 @@ impl Payload {
other.total_rows,
other.pages.iter().map(|x| x.rows).sum::<usize>()
);

self.total_rows += other.total_rows;
self.pages.append(other.pages.as_mut());
}

pub fn mark_min_cardinality(&mut self) {
if self.min_cardinality.is_none() {
self.min_cardinality = Some(self.total_rows);
}
}

pub fn copy_rows(
&mut self,
select_vector: &SelectVector,
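`mark_min_cardinality` is called the first time the hash entries are cleared (`clear_ht`), freezing the payload's `total_rows` at that moment. Because every row written before the first clear belongs to a distinct group, the stored value is a lower bound on the number of distinct groups the payload holds, even though rows appended later may duplicate earlier groups. A simplified stand-in, not the crate's Payload, showing that behaviour:

struct PayloadSketch {
    total_rows: usize,
    min_cardinality: Option<usize>,
}

impl PayloadSketch {
    fn mark_min_cardinality(&mut self) {
        // only the first call records a value; later calls are no-ops
        if self.min_cardinality.is_none() {
            self.min_cardinality = Some(self.total_rows);
        }
    }
}

fn main() {
    let mut p = PayloadSketch { total_rows: 10_000, min_cardinality: None };
    p.mark_min_cardinality();  // 10_000 rows, all distinct groups so far
    p.total_rows += 5_000;     // rows added after the clear may be duplicates
    p.mark_min_cardinality();  // no-op
    assert_eq!(p.min_cardinality, Some(10_000));
}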
8 changes: 6 additions & 2 deletions src/query/service/src/pipelines/builders/builder_aggregate.rs
@@ -131,8 +131,12 @@ impl PipelineBuilder {
let method = DataBlock::choose_hash_method(&sample_block, group_cols, efficiently_memory)?;

// Need a global atomic to read the max current radix bits hint
let partial_agg_config =
HashTableConfig::default().with_partial(true, max_threads as usize);
let partial_agg_config = if self.ctx.get_cluster().is_empty() {
HashTableConfig::default().with_partial(true, max_threads as usize)
} else {
HashTableConfig::default()
.cluster_with_partial(true, self.ctx.get_cluster().nodes.len())
};

self.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(
@@ -144,19 +144,16 @@ fn scatter<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>(
Ok(res)
}

fn scatter_paylaod(mut payload: Payload, buckets: usize) -> Result<Vec<Payload>> {
fn scatter_payload(mut payload: Payload, buckets: usize) -> Result<Vec<Payload>> {
let mut buckets = Vec::with_capacity(buckets);

let group_types = payload.group_types.clone();
let aggrs = payload.aggrs.clone();
let mut state = PayloadFlushState::default();

for _ in 0..buckets.capacity() {
buckets.push(Payload::new(
payload.arena.clone(),
group_types.clone(),
aggrs.clone(),
));
let p = Payload::new(payload.arena.clone(), group_types.clone(), aggrs.clone());
buckets.push(p);
}

// scatter each page of the payload.
@@ -224,7 +221,7 @@ fn scatter_partitioned_payload(
}

for (idx, payload) in payloads.into_iter().enumerate() {
buckets[idx].combine_single(payload, &mut state);
buckets[idx].combine_single(payload, &mut state, None);
}

Ok(buckets)
@@ -276,7 +273,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> FlightScatter
}
AggregateMeta::AggregateHashTable(_) => unreachable!(),
AggregateMeta::AggregatePayload(p) => {
for payload in scatter_paylaod(p.payload, self.buckets)? {
for payload in scatter_payload(p.payload, self.buckets)? {
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::<Method, V>::create_agg_payload(
p.bucket,
@@ -64,14 +64,13 @@ impl SerializedPayload {
let mut state = ProbeState::default();
let agg_len = aggrs.len();
let group_len = group_types.len();
let mut hashtable = AggregateHashTable::new_with_capacity(
let mut hashtable = AggregateHashTable::new_directly(
group_types,
aggrs,
config,
rows_num,
Arc::new(Bump::new()),
);
hashtable.direct_append = true;

let agg_states = (0..agg_len)
.map(|i| {
@@ -474,6 +474,8 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> Processor
vec![Arc::new(Bump::new())],
);

let mut reach_limit_bucket = None;

for agg_payload in self.agg_payloads.drain(0..) {
if !partitioned_payload.include_arena(&agg_payload.payload.arena) {
partitioned_payload
@@ -483,27 +485,49 @@

if agg_payload.max_partition_count != self.max_partition_count {
debug_assert!(agg_payload.max_partition_count < self.max_partition_count);
partitioned_payload.combine_single(agg_payload.payload, &mut self.flush_state);
} else {
partitioned_payload.combine_single(
agg_payload.payload,
&mut self.flush_state,
reach_limit_bucket,
);
} else if reach_limit_bucket.is_none()
|| reach_limit_bucket == Some(agg_payload.bucket as usize)
{
let add_duplicates = agg_payload.payload.min_cardinality.unwrap_or_default();

partitioned_payload.payloads[agg_payload.bucket as usize]
.combine(agg_payload.payload);

if reach_limit_bucket.is_none() {
if let Some(limit) = self.params.limit {
if add_duplicates >= limit {
log::info!(
"Group Limit optimizer, receive {add_duplicates} groups over {limit}"
);
reach_limit_bucket = Some(agg_payload.bucket as usize);
}
}
}
}
}

for (bucket, payload) in partitioned_payload.payloads.into_iter().enumerate() {
let mut part = PartitionedPayload::new(
group_types.clone(),
aggrs.clone(),
1,
partitioned_payload.arenas.clone(),
);
part.combine_single(payload, &mut self.flush_state);

if part.len() != 0 {
self.buckets_blocks
.insert(bucket as isize, vec![DataBlock::empty_with_meta(
AggregateMeta::<Method, V>::create_agg_hashtable(part),
)]);
if reach_limit_bucket.is_none() || reach_limit_bucket == Some(bucket) {
let mut part = PartitionedPayload::new(
group_types.clone(),
aggrs.clone(),
1,
partitioned_payload.arenas.clone(),
);
part.combine_single(payload, &mut self.flush_state, None);

if part.len() != 0 {
self.buckets_blocks.insert(bucket as isize, vec![
DataBlock::empty_with_meta(
AggregateMeta::<Method, V>::create_agg_hashtable(part),
),
]);
}
}
}

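This hunk is the heart of the optimization: while draining partial payloads, the first payload whose `min_cardinality` reaches the query's LIMIT pins `reach_limit_bucket`, and from then on payloads for other buckets are neither combined nor flushed, since that single bucket already contains enough groups to satisfy the limit. A standalone simulation, not the crate's processor; the function name and the `(bucket, min_cardinality)` tuples are illustrative:

// Simulate the bucket-pinning decision made while draining partial payloads.
fn pin_limit_bucket(payloads: &[(usize, Option<usize>)], limit: Option<usize>) -> Option<usize> {
    let mut reach_limit_bucket = None;
    for (bucket, min_cardinality) in payloads {
        if reach_limit_bucket.is_some() && reach_limit_bucket != Some(*bucket) {
            continue; // limit already reached: skip payloads for other buckets
        }
        // ... the real processor combines the payload into partition `bucket` here ...
        if reach_limit_bucket.is_none() {
            if let (Some(limit), Some(groups)) = (limit, *min_cardinality) {
                if groups >= limit {
                    reach_limit_bucket = Some(*bucket);
                }
            }
        }
    }
    reach_limit_bucket
}

fn main() {
    let payloads = [(0, Some(10)), (1, Some(600)), (2, Some(5))];
    assert_eq!(pin_limit_bucket(&payloads, Some(500)), Some(1)); // bucket 1 holds >= 500 groups
    assert_eq!(pin_limit_bucket(&payloads, None), None);         // no LIMIT: never pinned
}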
