From 73b536f80eeb33c474e1199b89b589ce19ce3359 Mon Sep 17 00:00:00 2001
From: jw
Date: Tue, 20 Feb 2024 09:25:14 +0800
Subject: [PATCH 1/5] test

---
 .../aggregator/aggregate_exchange_injector.rs | 47 ++++++++++++++++++-
 1 file changed, 46 insertions(+), 1 deletion(-)

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
index 3458cb66d36cb..dc8bf9ae1fe77 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
@@ -21,6 +21,7 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
+use databend_common_expression::PartitionedPayload;
 use databend_common_hashtable::FastHash;
 use databend_common_hashtable::HashtableEntryMutRefLike;
 use databend_common_hashtable::HashtableEntryRefLike;
@@ -139,6 +140,48 @@ fn scatter(
     Ok(res)
 }
 
+fn scatter2(mut payload: PartitionedPayload, buckets: usize) -> Result> {
+    let mut buckets = Vec::with_capacity(buckets);
+
+    for _ in 0..buckets.capacity() {
+        buckets.push(PartitionedPayload::new(payload.group_types.clone(), payload.aggrs.clone(), payload.partition_count() as u64));
+    }
+
+    let mods = StrengthReducedU64::new(buckets.len() as u64);
+    payload.repartition(new_partition_count, state)
+    for item in payload.cell.hashtable.iter() {
+        let bucket_index = (item.key().fast_hash() % mods) as usize;
+
+        unsafe {
+            match buckets[bucket_index].insert_and_entry(item.key()) {
+                Ok(mut entry) => {
+                    *entry.get_mut() = *item.get();
+                }
+                Err(mut entry) => {
+                    *entry.get_mut() = *item.get();
+                }
+            }
+        }
+    }
+
+    let mut res = Vec::with_capacity(buckets.len());
+    let dropper = payload.cell._dropper.take();
+    let arena = std::mem::replace(&mut payload.cell.arena, Area::create());
+    payload
+        .cell
+        .arena_holders
+        .push(ArenaHolder::create(Some(arena)));
+
+    for bucket_table in buckets {
+        let mut cell = HashTableCell::::create(bucket_table, dropper.clone().unwrap());
+        cell.arena_holders
+            .extend(payload.cell.arena_holders.clone());
+        res.push(cell);
+    }
+
+    Ok(res)
+}
+
 impl FlightScatter
     for HashTableHashScatter
 {
@@ -177,7 +220,9 @@ impl FlightScatter
                 }
             }
-            AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
+            AggregateMeta::AggregateHashTable(payload) => {
+                scatter2()
+            },
         };
 
         return Ok(blocks);
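Patch 1 above drafts the scatter path for the new aggregate hashtable: each group entry is routed to one of N output buckets by its key hash modulo N. A minimal sketch of that routing rule, assuming a `FastHash`-style 64-bit key hash and the `strength_reduce` crate already used by the surrounding file (the `route` helper itself is hypothetical, for illustration only):

    use strength_reduce::StrengthReducedU64;

    // Precompute the divisor once; the per-row `%` then avoids a hardware divide.
    fn route(key_hash: u64, mods: StrengthReducedU64) -> usize {
        (key_hash % mods) as usize
    }

As committed, `scatter2` is still a draft: it fills `buckets` with `PartitionedPayload`s but then iterates `payload.cell.hashtable`, which belongs to the old hashtable payload, and the dangling `payload.repartition(new_partition_count, state)` call references names that do not exist yet. Patch 2 rewrites it around `repartition`.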
From 0814f8047958f1831b0efb6ca0e37b89a6700ba8 Mon Sep 17 00:00:00 2001
From: jw
Date: Thu, 22 Feb 2024 12:27:52 +0800
Subject: [PATCH 2/5] test

---
 src/query/expression/src/aggregate/mod.rs     |   4 +
 .../pipelines/builders/builder_aggregate.rs   |  14 +-
 .../aggregator/aggregate_exchange_injector.rs |  60 +++---
 .../transform_exchange_group_by_serializer.rs |  23 ++-
 .../serde/transform_group_by_serializer.rs    | 185 ++++++++++++++----
 5 files changed, 208 insertions(+), 78 deletions(-)

diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs
index 93018f1baadf8..358b60bbab210 100644
--- a/src/query/expression/src/aggregate/mod.rs
+++ b/src/query/expression/src/aggregate/mod.rs
@@ -91,6 +91,10 @@ impl HashTableConfig {
     pub fn with_partial(mut self, partial_agg: bool, active_threads: usize) -> Self {
         self.partial_agg = partial_agg;
 
+        if active_threads <= 0 {
+            return self;
+        }
+
         // init max_partial_capacity
         let total_shared_cache_size = active_threads * L3_CACHE_SIZE;
         let cache_per_active_thread =

diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs
index f5017b951b87c..20ebb83c6b516 100644
--- a/src/query/service/src/pipelines/builders/builder_aggregate.rs
+++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs
@@ -102,10 +102,13 @@ impl PipelineBuilder {
         let max_block_size = self.settings.get_max_block_size()?;
         let max_threads = self.settings.get_max_threads()?;
 
+        // let enable_experimental_aggregate_hashtable = self
+        //     .settings
+        //     .get_enable_experimental_aggregate_hashtable()?
+        //     && self.ctx.get_cluster().is_empty();
         let enable_experimental_aggregate_hashtable = self
             .settings
-            .get_enable_experimental_aggregate_hashtable()?
-            && self.ctx.get_cluster().is_empty();
+            .get_enable_experimental_aggregate_hashtable()?;
 
         let params = Self::build_aggregator_params(
             aggregate.input.output_schema()?,
@@ -211,10 +214,13 @@ impl PipelineBuilder {
     pub(crate) fn build_aggregate_final(&mut self, aggregate: &AggregateFinal) -> Result<()> {
         let max_block_size = self.settings.get_max_block_size()?;
 
+        // let enable_experimental_aggregate_hashtable = self
+        //     .settings
+        //     .get_enable_experimental_aggregate_hashtable()?
+        //     && self.ctx.get_cluster().is_empty();
         let enable_experimental_aggregate_hashtable = self
             .settings
-            .get_enable_experimental_aggregate_hashtable()?
-            && self.ctx.get_cluster().is_empty();
+            .get_enable_experimental_aggregate_hashtable()?;
 
         let params = Self::build_aggregator_params(
             aggregate.before_group_by_schema.clone(),
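The builder change keeps the old cluster-gated expression as a comment and enables the experimental aggregate hashtable regardless of `self.ctx.get_cluster().is_empty()`, so the new code path is exercised in distributed queries as well; the rest of this patch teaches the exchange layer to handle the resulting `PartitionedPayload` blocks.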
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
index dc8bf9ae1fe77..fbcf50d665000 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
@@ -19,9 +19,12 @@ use bumpalo::Bump;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::AggregateHashTable;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
+use databend_common_expression::HashTableConfig;
 use databend_common_expression::PartitionedPayload;
+use databend_common_expression::PayloadFlushState;
 use databend_common_hashtable::FastHash;
 use databend_common_hashtable::HashtableEntryMutRefLike;
 use databend_common_hashtable::HashtableEntryRefLike;
@@ -78,7 +81,7 @@ impl ExchangeSorting
             AggregateMeta::Partitioned { .. } => unreachable!(),
             AggregateMeta::Serialized(v) => Ok(v.bucket),
             AggregateMeta::HashTable(v) => Ok(v.bucket),
-            AggregateMeta::AggregateHashTable(_) => unreachable!(),
+            AggregateMeta::AggregateHashTable(_) => Ok(0),
             AggregateMeta::Spilled(_)
             | AggregateMeta::Spilling(_)
             | AggregateMeta::BucketSpilled(_) => Ok(-1),
@@ -140,46 +143,23 @@ fn scatter(
     Ok(res)
 }
 
-fn scatter2(mut payload: PartitionedPayload, buckets: usize) -> Result> {
+fn scatter2(payload: PartitionedPayload, buckets: usize) -> Result> {
     let mut buckets = Vec::with_capacity(buckets);
 
     for _ in 0..buckets.capacity() {
-        buckets.push(PartitionedPayload::new(payload.group_types.clone(), payload.aggrs.clone(), payload.partition_count() as u64));
+        let config = HashTableConfig::default().with_partial(true, 0);
+        buckets.push(AggregateHashTable::new(payload.group_types.clone(), payload.aggrs.clone(), config));
     }
 
-    let mods = StrengthReducedU64::new(buckets.len() as u64);
-    payload.repartition(new_partition_count, state)
-    for item in payload.cell.hashtable.iter() {
-        let bucket_index = (item.key().fast_hash() % mods) as usize;
-
-        unsafe {
-            match buckets[bucket_index].insert_and_entry(item.key()) {
-                Ok(mut entry) => {
-                    *entry.get_mut() = *item.get();
-                }
-                Err(mut entry) => {
-                    *entry.get_mut() = *item.get();
-                }
-            }
-        }
-    }
+    let mut state = PayloadFlushState::default();
+    let new_payload = payload.repartition(buckets.len(), &mut state);
 
-    let mut res = Vec::with_capacity(buckets.len());
-    let dropper = payload.cell._dropper.take();
-    let arena = std::mem::replace(&mut payload.cell.arena, Area::create());
-    payload
-        .cell
-        .arena_holders
-        .push(ArenaHolder::create(Some(arena)));
+    for (idx, item) in new_payload.payloads.iter().enumerate() {
+        let mut flush_state = PayloadFlushState::default();
+        buckets[idx].combine_payload(item, &mut flush_state)?;
+    }
 
-    for bucket_table in buckets {
-        let mut cell = HashTableCell::::create(bucket_table, dropper.clone().unwrap());
-        cell.arena_holders
-            .extend(payload.cell.arena_holders.clone());
-        res.push(cell);
-    }
-
-    Ok(res)
+    Ok(buckets)
 }
 
 impl FlightScatter
                 }
             }
-            AggregateMeta::AggregateHashTable(payload) => {
-                scatter2()
-            },
+            AggregateMeta::AggregateHashTable(payload) => {
+                for agg_hashtable in scatter2(payload, self.buckets)? {
+                    // blocks.push(match agg_hashtable.len() == 0 {
+                    //     true => DataBlock::empty(),
+                    //     false => DataBlock::empty_with_meta(
+                    //         AggregateMeta::::create_agg_hashtable(agg_hashtable.payload)
+                    //     ),
+                    // });
+                    blocks.push(
+                        DataBlock::empty_with_meta(
+                            AggregateMeta::::create_agg_hashtable(agg_hashtable.payload)
+                        ))
+                }
+            },
         };
 
         return Ok(blocks);
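The rewritten scatter no longer touches hashtable internals: it repartitions the `PartitionedPayload` into exactly `buckets.len()` partitions, then folds partition `i` into the `AggregateHashTable` bound for output `i`. Because repartitioning is hash-based, every group key lands in exactly one partition, so the scattered hashtables hold disjoint groups and can be merged independently downstream.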
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
index 23f92a995ad44..86674f39d84a2 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
@@ -67,6 +67,8 @@ use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
 use crate::pipelines::processors::transforms::group_by::PartitionedHashMethod;
 use crate::sessions::QueryContext;
 
+use super::SerializePayload;
+
 pub struct TransformExchangeGroupBySerializer {
     ctx: Arc,
     method: Method,
@@ -207,7 +209,22 @@ impl BlockMetaTransform
                 },
             ));
         }
-        Some(AggregateMeta::AggregateHashTable(_)) => todo!("AGG_HASHTABLE"),
+        Some(AggregateMeta::AggregateHashTable(payload)) => {
+            if index == self.local_pos {
+                serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
+                    Some(Box::new(AggregateMeta::::AggregateHashTable(payload))),
+                )?));
+                continue;
+            }
+            let bucket = -1;
+            let mut stream = SerializeGroupByStream::create(&self.method, SerializePayload::PartitionedPayload(payload));
+            serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
+                None => DataBlock::empty(),
+                Some(data_block) => {
+                    serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
+                }
+            }));
+        }
         Some(AggregateMeta::HashTable(payload)) => {
             if index == self.local_pos {
                 serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
@@ -216,8 +233,8 @@
                 continue;
             }
 
-            let mut stream = SerializeGroupByStream::create(&self.method, payload);
-            let bucket = stream.payload.bucket;
+            let bucket = payload.bucket.clone();
+            let mut stream = SerializeGroupByStream::create(&self.method, SerializePayload::HashTablePayload(payload));
             serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
                 None => DataBlock::empty(),
                 Some(data_block) => {
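In the exchange serializer, the destination equal to `self.local_pos` keeps its payload as in-memory block meta and skips serialization; remote destinations get a serialized stream. `bucket` is `-1` here because a `PartitionedPayload` is not tied to a single bucket; `-1` is the same sentinel the exchange sorting uses for spill-related blocks.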
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
index ecc137af40a64..12b74835d2c35 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
@@ -20,6 +20,8 @@ use std::sync::Arc;
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
+use databend_common_expression::PartitionedPayload;
+use databend_common_expression::PayloadFlushState;
 use databend_common_hashtable::HashtableEntryRefLike;
 use databend_common_hashtable::HashtableLike;
 use databend_common_pipeline_core::processors::Event;
@@ -126,10 +128,18 @@ impl TransformGroupBySerializer {
             AggregateMeta::Serialized(_) => unreachable!(),
             AggregateMeta::BucketSpilled(_) => unreachable!(),
             AggregateMeta::Partitioned { .. } => unreachable!(),
-            AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
+            AggregateMeta::AggregateHashTable(payload) => {
+                self.input_data = Some(SerializeGroupByStream::create(
+                    &self.method,
+                    SerializePayload::PartitionedPayload(payload),
+                ));
+                return Ok(Event::Sync);
+            }
             AggregateMeta::HashTable(payload) => {
-                self.input_data =
-                    Some(SerializeGroupByStream::create(&self.method, payload));
+                self.input_data = Some(SerializeGroupByStream::create(
+                    &self.method,
+                    SerializePayload::HashTablePayload(payload),
+                ));
                 return Ok(Event::Sync);
             }
         }
@@ -157,10 +167,15 @@
     ]))
 }
 
+pub enum SerializePayload {
+    HashTablePayload(HashTablePayload),
+    PartitionedPayload(PartitionedPayload),
+}
+
 pub struct SerializeGroupByStream {
     method: Method,
-    pub payload: Pin>>,
-    iter: as HashtableLike>::Iterator<'static>,
+    pub payload: Pin>>,
+    iter: Option< as HashtableLike>::Iterator<'static>>,
     end_iter: bool,
 }
 
 unsafe impl Send for SerializeGroupByStream {}
 
 unsafe impl Sync for SerializeGroupByStream {}
 
 impl SerializeGroupByStream {
-    pub fn create(method: &Method, payload: HashTablePayload) -> Self {
+    pub fn create(method: &Method, payload: SerializePayload) -> Self {
         unsafe {
             let payload = Box::pin(payload);
-            let iter = NonNull::from(&payload.cell.hashtable).as_ref().iter();
+
+            let iter = if let SerializePayload::HashTablePayload(p) = payload.as_ref().get_ref() {
+                Some(NonNull::from(&p.cell.hashtable).as_ref().iter())
+            } else {
+                None
+            };
 
             SerializeGroupByStream:: {
                 iter,
                 payload,
                 method: method.clone(),
                 end_iter: false,
             }
         }
     }
 }
 impl Iterator for SerializeGroupByStream {
     type Item = Result;
 
     fn next(&mut self) -> Option {
         match self.payload.as_ref().get_ref() {
             SerializePayload::HashTablePayload(p) => {
                 if self.end_iter {
                     return None;
                 }
 
                 let max_block_rows = std::cmp::min(8192, p.cell.hashtable.len());
                 let max_block_bytes = std::cmp::min(
                     8 * 1024 * 1024 + 1024,
                     p.cell.hashtable.unsize_key_size().unwrap_or(usize::MAX),
                 );
 
                 let mut group_key_builder = self
                     .method
                     .keys_column_builder(max_block_rows, max_block_bytes);
 
                 #[allow(clippy::while_let_on_iterator)]
                 while let Some(group_entity) = self.iter.as_mut()?.next() {
                     group_key_builder.append_value(group_entity.key());
 
                     if group_key_builder.bytes_size() >= 8 * 1024 * 1024 {
                         let bucket = p.bucket;
                         let data_block =
                             DataBlock::new_from_columns(vec![group_key_builder.finish()]);
                         return Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))));
                     }
                 }
 
                 self.end_iter = true;
                 let bucket = p.bucket;
                 let data_block = DataBlock::new_from_columns(vec![group_key_builder.finish()]);
                 Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))))
             }
             SerializePayload::PartitionedPayload(p) => {
                 if self.end_iter {
                     return None;
                 }
 
                 let mut state = PayloadFlushState::default();
                 let mut blocks = vec![];
 
                 for item in p.payloads.iter() {
                     state.clear();
                     if item.flush(&mut state) {
                         blocks.push(DataBlock::new_from_columns(
                             state.take_group_columns(),
                         ));
                     }
                 }
 
                 self.end_iter = true;
                 let data_block = if blocks.is_empty() {
                     DataBlock::empty()
                 } else {
                     DataBlock::concat(&blocks).unwrap()
                 };
                 Some(data_block.add_meta(Some(AggregateSerdeMeta::create(0))))
             }
         }
     }
 }
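`SerializeGroupByStream` is self-referential: the payload is pinned in a `Box`, and for the hashtable variant `iter` borrows the pinned hashtable through a `NonNull` with an unsafely extended lifetime. That is why `iter` becomes an `Option` (the partitioned-payload variant needs no entry iterator) and why `Send`/`Sync` are asserted manually above.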
 // pub struct SerializeGroupByStream {
 //     method: Method,
 //     pub payload: Pin>>,
 //     iter: as HashtableLike>::Iterator<'static>,
 //     end_iter: bool,
 // }
 
 // unsafe impl Send for SerializeGroupByStream {}
 
 // unsafe impl Sync for SerializeGroupByStream {}
 
 // impl SerializeGroupByStream {
 //     pub fn create(method: &Method, payload: HashTablePayload) -> Self {
 //         unsafe {
 //             let payload = Box::pin(payload);
 //             let iter = NonNull::from(&payload.cell.hashtable).as_ref().iter();
 
 //             SerializeGroupByStream:: {
 //                 iter,
 //                 payload,
 //                 method: method.clone(),
 //                 end_iter: false,
 //             }
 //         }
 //     }
 // }
 
 // impl Iterator for SerializeGroupByStream {
 //     type Item = Result;
 
 //     fn next(&mut self) -> Option {
 //         if self.end_iter {
 //             return None;
 //         }
 
 //         let max_block_rows = std::cmp::min(8192, self.payload.cell.hashtable.len());
 //         let max_block_bytes = std::cmp::min(
 //             8 * 1024 * 1024 + 1024,
 //             self.payload
 //                 .cell
 //                 .hashtable
 //                 .unsize_key_size()
 //                 .unwrap_or(usize::MAX),
 //         );
 
 //         let mut group_key_builder = self
 //             .method
 //             .keys_column_builder(max_block_rows, max_block_bytes);
 
 //         #[allow(clippy::while_let_on_iterator)]
 //         while let Some(group_entity) = self.iter.next() {
 //             group_key_builder.append_value(group_entity.key());
 
 //             if group_key_builder.bytes_size() >= 8 * 1024 * 1024 {
 //                 let bucket = self.payload.bucket;
 //                 let data_block = DataBlock::new_from_columns(vec![group_key_builder.finish()]);
 //                 return Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))));
 //             }
 //         }
 
 //         self.end_iter = true;
 //         let bucket = self.payload.bucket;
 //         let data_block = DataBlock::new_from_columns(vec![group_key_builder.finish()]);
 //         Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))))
 //     }
 // }
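The next patch renames `scatter2` to `agg_hashtable_scatter` and records, in a TODO, that bucket and partition counts are related by integer division. The idea: when the partition count is a multiple of the bucket count, each bucket can take whole partitions instead of re-hashing rows. A hypothetical helper (illustration only, not part of the patch):

    // Partitions assigned to bucket `b` when partition_count % buckets == 0.
    fn partitions_for_bucket(partition_count: usize, buckets: usize, b: usize) -> Vec<usize> {
        (0..partition_count).filter(|p| p % buckets == b).collect()
    }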
From 9dfbff56f40ffd7fa411f3e98e4f4703b1de4b49 Mon Sep 17 00:00:00 2001
From: jw
Date: Fri, 23 Feb 2024 13:36:29 +0800
Subject: [PATCH 3/5] test

---
 .../aggregator/aggregate_exchange_injector.rs | 11 +---
 .../serde/transform_group_by_serializer.rs    | 22 ++++++-
 .../aggregator/transform_group_by_final.rs    | 59 ++++++++++++++++++-
 .../aggregator/transform_partition_bucket.rs  |  7 +++
 4 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
index fbcf50d665000..0422820b45c0f 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
     Ok(res)
 }
 
-fn scatter2(payload: PartitionedPayload, buckets: usize) -> Result> {
+// TODO: buckets and partitions have a relationship of integer division
+fn agg_hashtable_scatter(payload: PartitionedPayload, buckets: usize) -> Result> {
     let mut buckets = Vec::with_capacity(buckets);
 
     for _ in 0..buckets.capacity() {
             AggregateMeta::AggregateHashTable(payload) => {
-                for agg_hashtable in scatter2(payload, self.buckets)? {
-                    // blocks.push(match agg_hashtable.len() == 0 {
-                    //     true => DataBlock::empty(),
-                    //     false => DataBlock::empty_with_meta(
-                    //         AggregateMeta::::create_agg_hashtable(agg_hashtable.payload)
-                    //     ),
-                    // });
+                for agg_hashtable in agg_hashtable_scatter(payload, self.buckets)? {
                     blocks.push(
                         DataBlock::empty_with_meta(
                             AggregateMeta::::create_agg_hashtable(agg_hashtable.payload)
                         ))
                 }
             },

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
index 12b74835d2c35..2e85045fc5766 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
@@ -22,6 +22,7 @@
 use databend_common_expression::DataBlock;
 use databend_common_expression::PartitionedPayload;
 use databend_common_expression::PayloadFlushState;
+use databend_common_expression::ColumnBuilder;
 use databend_common_hashtable::HashtableEntryRefLike;
 use databend_common_hashtable::HashtableLike;
 use databend_common_pipeline_core::processors::Event;
@@ -36,6 +37,7 @@
 use crate::pipelines::processors::transforms::aggregator::AggregateSerdeMeta;
 use crate::pipelines::processors::transforms::aggregator::HashTablePayload;
 use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
 use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder;
+use itertools::Itertools;
 
 pub struct TransformGroupBySerializer {
 
                 for item in p.payloads.iter() {
                     state.clear();
-                    if item.flush(&mut state) {
+                    while item.flush(&mut state) {
                         blocks.push(DataBlock::new_from_columns(
                             state.take_group_columns(),
                         ));
                     }
                 }
 
                 self.end_iter = true;
                 let data_block = if blocks.is_empty() {
-                    DataBlock::empty()
+                    empty_block(p)
                 } else {
                     DataBlock::concat(&blocks).unwrap()
                 };
-                Some(data_block.add_meta(Some(AggregateSerdeMeta::create(0))))
+                Some(data_block.add_meta(Some(AggregateSerdeMeta::create(-1))))
             }
         }
     }
 }
 
+pub fn empty_block(p: &PartitionedPayload) -> DataBlock {
+    let columns = p
+        .aggrs
+        .iter()
+        .map(|f| ColumnBuilder::with_capacity(&f.return_type().unwrap(), 0).build())
+        .chain(
+            p.group_types
+                .iter()
+                .map(|t| ColumnBuilder::with_capacity(t, 0).build()),
+        )
+        .collect_vec();
+    DataBlock::new_from_columns(columns)
+}
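Two fixes land in the serializer above. First, `if item.flush(&mut state)` becomes `while item.flush(&mut state)`: one `flush` call emits at most one batch of rows and returns `true` while data remains, so the `if` silently dropped everything after the first batch of a large partition. Second, an empty result now serializes via `empty_block(p)`, a zero-row block with the payload's column layout, instead of a schemaless `DataBlock::empty()`, and the attached meta uses the `-1` bucket sentinel.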
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
index 9eb4a59fb3541..33357c1e300ec 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
@@ -62,6 +62,7 @@ impl TransformFinalGroupBy {
     fn transform_agg_hashtable(&mut self, meta: AggregateMeta) -> Result {
         let mut agg_hashtable: Option = None;
+        let mut blocks = vec![];
         if let AggregateMeta::Partitioned { bucket: _, data } = meta {
             for bucket_data in data {
                 match bucket_data {
@@ -82,13 +83,15 @@
                             agg_hashtable = Some(hashtable);
                         }
                     },
+                    AggregateMeta::Serialized(payload) => {
+                        blocks.push(payload.data_block);
+                    }
                     _ => unreachable!(),
                 }
             }
         }
 
         if let Some(mut ht) = agg_hashtable {
-            let mut blocks = vec![];
             self.flush_state.clear();
             loop {
                 if ht.merge_result(&mut self.flush_state)? {
                     blocks.push(DataBlock::new_from_columns(
                         self.flush_state.take_group_columns(),
                     ));
                 } else {
                     break;
                 }
             }
 
             return DataBlock::concat(&blocks);
         }
+
+        if !blocks.is_empty() {
+            return DataBlock::concat(&blocks);
+        }
+
         Ok(self.params.empty_result_block())
     }
+
+    // fn transform_agg_hashtable(&mut self, meta: AggregateMeta) -> Result {
+    //     let mut agg_hashtable: Option = None;
+    //     if let AggregateMeta::Partitioned { bucket: _, data } = meta {
+    //         for bucket_data in data {
+    //             match bucket_data {
+    //                 AggregateMeta::AggregateHashTable(payload) => match agg_hashtable.as_mut() {
+    //                     Some(ht) => {
+    //                         ht.combine_payloads(&payload, &mut self.flush_state)?;
+    //                     }
+    //                     None => {
+    //                         let capacity =
+    //                             AggregateHashTable::get_capacity_for_count(payload.len());
+    //                         let mut hashtable = AggregateHashTable::new_with_capacity(
+    //                             self.params.group_data_types.clone(),
+    //                             self.params.aggregate_functions.clone(),
+    //                             HashTableConfig::default().with_initial_radix_bits(0),
+    //                             capacity,
+    //                         );
+    //                         hashtable.combine_payloads(&payload, &mut self.flush_state)?;
+    //                         agg_hashtable = Some(hashtable);
+    //                     }
+    //                 },
+    //                 _ => unreachable!(),
+    //             }
+    //         }
+    //     }
+
+    //     if let Some(mut ht) = agg_hashtable {
+    //         let mut blocks = vec![];
+    //         self.flush_state.clear();
+    //         loop {
+    //             if ht.merge_result(&mut self.flush_state)? {
+    //                 blocks.push(DataBlock::new_from_columns(
+    //                     self.flush_state.take_group_columns(),
+    //                 ));
+    //             } else {
+    //                 break;
+    //             }
+    //         }
+
+    //         if blocks.is_empty() {
+    //             return Ok(self.params.empty_result_block());
+    //         }
+
+    //         return DataBlock::concat(&blocks);
+    //     }
+    //     Ok(self.params.empty_result_block())
+    // }
 }
 
 impl BlockMetaTransform> for TransformFinalGroupBy

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
index 719239cb9e1ac..c1fdcd01ab2cf 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
@@ -418,6 +418,13 @@ impl Processor
             return Ok(Event::NeedConsume);
         }
 
+        if let Some(p) = self.partition_payloads.pop() {
+            let data = AggregateMeta::::AggregateHashTable(p);
+            let data_block = DataBlock::empty_with_meta(AggregateMeta::::create_partitioned(0, vec![data]));
+            self.output.push_data(Ok(data_block));
+            return Ok(Event::NeedConsume);
+        }
+
         self.output.finish();
         Ok(Event::Finished)
     }
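Patch 3 also lets the final group-by consume `Serialized` blocks directly and makes the partition-bucket processor flush a leftover `PartitionedPayload` wrapped in a `Partitioned` meta. Patch 4 below changes the unit of exchange: instead of one block carrying a whole `PartitionedPayload`, partial aggregation emits one block per non-empty partition, tagged `AggregatePayload`.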
From a94ea1f91bf5ca6b5c09626b34189f969fea7e81 Mon Sep 17 00:00:00 2001
From: jw
Date: Mon, 26 Feb 2024 19:37:35 +0800
Subject: [PATCH 4/5] test

---
 .../pipelines/builders/builder_aggregate.rs   |  20 ++--
 .../aggregator/aggregate_exchange_injector.rs |  21 ++--
 .../transforms/aggregator/aggregate_meta.rs   |  19 ++++
 .../serde/transform_aggregate_serializer.rs   |   1 +
 ...transform_exchange_aggregate_serializer.rs |   1 +
 .../transform_exchange_group_by_serializer.rs |  34 +++---
 .../serde/transform_group_by_serializer.rs    |  16 +--
 .../serde/transform_spill_reader.rs           |   2 +
 .../aggregator/transform_aggregate_final.rs   |  19 ++++
 .../aggregator/transform_aggregate_partial.rs |  17 ++-
 .../aggregator/transform_group_by_final.rs    |  16 +++
 .../aggregator/transform_group_by_partial.rs  |  17 ++-
 .../aggregator/transform_partition_bucket.rs  | 102 +++++++++++++++---
 src/query/settings/src/settings_default.rs   |   2 +-
 14 files changed, 223 insertions(+), 64 deletions(-)

diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs
index 20ebb83c6b516..e42e3d4e64c9a 100644
--- a/src/query/service/src/pipelines/builders/builder_aggregate.rs
+++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs
@@ -102,13 +102,13 @@ impl PipelineBuilder {
         let max_block_size = self.settings.get_max_block_size()?;
         let max_threads = self.settings.get_max_threads()?;
 
-        // let enable_experimental_aggregate_hashtable = self
-        //     .settings
-        //     .get_enable_experimental_aggregate_hashtable()?
-        //     && self.ctx.get_cluster().is_empty();
         let enable_experimental_aggregate_hashtable = self
             .settings
-            .get_enable_experimental_aggregate_hashtable()?;
+            .get_enable_experimental_aggregate_hashtable()?
+            && self.ctx.get_cluster().is_empty();
+        // let enable_experimental_aggregate_hashtable = self
+        //     .settings
+        //     .get_enable_experimental_aggregate_hashtable()?;
 
         let params = Self::build_aggregator_params(
             aggregate.input.output_schema()?,
@@ -214,13 +214,13 @@ impl PipelineBuilder {
     pub(crate) fn build_aggregate_final(&mut self, aggregate: &AggregateFinal) -> Result<()> {
         let max_block_size = self.settings.get_max_block_size()?;
 
-        // let enable_experimental_aggregate_hashtable = self
-        //     .settings
-        //     .get_enable_experimental_aggregate_hashtable()?
-        //     && self.ctx.get_cluster().is_empty();
         let enable_experimental_aggregate_hashtable = self
             .settings
-            .get_enable_experimental_aggregate_hashtable()?;
+            .get_enable_experimental_aggregate_hashtable()?
+            && self.ctx.get_cluster().is_empty();
+        // let enable_experimental_aggregate_hashtable = self
+        //     .settings
+        //     .get_enable_experimental_aggregate_hashtable()?;
 
         let params = Self::build_aggregator_params(
             aggregate.before_group_by_schema.clone(),
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
index 0422820b45c0f..4531d30291f23 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
@@ -81,7 +81,8 @@ impl ExchangeSorting
             AggregateMeta::Partitioned { .. } => unreachable!(),
             AggregateMeta::Serialized(v) => Ok(v.bucket),
             AggregateMeta::HashTable(v) => Ok(v.bucket),
-            AggregateMeta::AggregateHashTable(_) => Ok(0),
+            AggregateMeta::AggregateHashTable(_) => Ok(-1),
+            AggregateMeta::AggregatePayload(_) => todo!("AGG_HASHTABLE"),
             AggregateMeta::Spilled(_)
             | AggregateMeta::Spilling(_)
             | AggregateMeta::BucketSpilled(_) => Ok(-1),
                 }
             }
-            AggregateMeta::AggregateHashTable(payload) => {
-                for agg_hashtable in agg_hashtable_scatter(payload, self.buckets)? {
-                    blocks.push(
-                        DataBlock::empty_with_meta(
-                            AggregateMeta::::create_agg_hashtable(agg_hashtable.payload)
-                        ))
-                }
-            },
+            // AggregateMeta::AggregateHashTable(payload) => {
+            //     for agg_hashtable in agg_hashtable_scatter(payload, self.buckets)? {
+            //         blocks.push(
+            //             DataBlock::empty_with_meta(
+            //                 AggregateMeta::::create_agg_hashtable(agg_hashtable.payload)
+            //             ))
+            //     }
+            // },
+            AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
+            AggregateMeta::AggregatePayload(_) => todo!("AGG_HASHTABLE"),
         };
 
         return Ok(blocks);

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
index 6bd84eb63bbf0..0cf55007a262e 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
@@ -21,6 +21,7 @@
 use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::Column;
 use databend_common_expression::DataBlock;
 use databend_common_expression::PartitionedPayload;
+use databend_common_expression::Payload;
 
 use crate::pipelines::processors::transforms::aggregator::HashTableCell;
 use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
@@ -31,6 +32,12 @@
 pub struct HashTablePayload {
     pub cell: HashTableCell,
 }
 
+pub struct AggregatePayload {
+    pub bucket: isize,
+    pub payload: Payload,
+    pub max_partition_count: usize,
+}
+
 pub struct SerializedPayload {
     pub bucket: isize,
     pub data_block: DataBlock,
@@ -54,6 +61,7 @@
 pub enum AggregateMeta {
     Serialized(SerializedPayload),
     HashTable(HashTablePayload),
     AggregateHashTable(PartitionedPayload),
+    AggregatePayload(AggregatePayload),
     BucketSpilled(BucketSpilledPayload),
     Spilled(Vec),
     Spilling(HashTablePayload, V>),
@@ -69,6 +77,14 @@
 impl AggregateMeta
+    pub fn create_agg_payload(bucket: isize, payload: Payload, max_partition_count: usize) -> BlockMetaInfoPtr {
+        Box::new(AggregateMeta::::AggregatePayload(AggregatePayload {
+            bucket,
+            payload,
+            max_partition_count,
+        }))
+    }
+
     pub fn create_agg_hashtable(payload: PartitionedPayload) -> BlockMetaInfoPtr {
         Box::new(AggregateMeta::::AggregateHashTable(payload))
     }
@@ -136,6 +152,9 @@
 impl Debug for AggregateMeta
             AggregateMeta::AggregateHashTable(_) => {
                 f.debug_struct("AggregateMeta:AggHashTable").finish()
             }
+            AggregateMeta::AggregatePayload(_) => {
+                f.debug_struct("AggregateMeta:AggregatePayload").finish()
+            }
         }
     }
 }
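`AggregatePayload` carries one partition (`payload`), the bucket it belongs to, and `max_partition_count`, the partition count of the hashtable it came from. The consumer needs that last field to tell whether two payloads with the same bucket number line up one-to-one or come from tables with different fan-out and must be re-hashed before merging.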
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
index 9d212ff15dabb..9a2710d6dd97f 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
@@ -146,6 +146,7 @@ impl TransformAggregateSerializer {
                 return Ok(Event::Sync);
             }
             AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
+            AggregateMeta::AggregatePayload(_) => todo!("AGG_HASHTABLE"),
         }
     }
 }

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
index 1a378817fa220..3aa92f67d4a04 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
@@ -177,6 +177,7 @@ impl BlockMetaTransform
             }
 
             Some(AggregateMeta::AggregateHashTable(_)) => todo!("AGG_HASHTABLE"),
+            Some(AggregateMeta::AggregatePayload(_)) => todo!("AGG_HASHTABLE"),
         };
     }

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
index 86674f39d84a2..95c52e00f8f1e 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
@@ -209,22 +209,22 @@ impl BlockMetaTransform
                 },
             ));
         }
-        Some(AggregateMeta::AggregateHashTable(payload)) => {
-            if index == self.local_pos {
-                serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
-                    Some(Box::new(AggregateMeta::::AggregateHashTable(payload))),
-                )?));
-                continue;
-            }
-            let bucket = -1;
-            let mut stream = SerializeGroupByStream::create(&self.method, SerializePayload::PartitionedPayload(payload));
-            serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
-                None => DataBlock::empty(),
-                Some(data_block) => {
-                    serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
-                }
-            }));
-        }
+        // Some(AggregateMeta::AggregateHashTable(payload)) => {
+        //     if index == self.local_pos {
+        //         serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
+        //             Some(Box::new(AggregateMeta::::AggregateHashTable(payload))),
+        //         )?));
+        //         continue;
+        //     }
+        //     let bucket = -1;
+        //     let mut stream = SerializeGroupByStream::create(&self.method, SerializePayload::PartitionedPayload(payload));
+        //     serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
+        //         None => DataBlock::empty(),
+        //         Some(data_block) => {
+        //             serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
+        //         }
+        //     }));
+        // }
         Some(AggregateMeta::HashTable(payload)) => {
             if index == self.local_pos {
                 serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
@@ -242,6 +242,8 @@
             }
         }));
     }
+        Some(AggregateMeta::AggregateHashTable(_)) => todo!("AGG_HASHTABLE"),
+        Some(AggregateMeta::AggregatePayload(_)) => todo!("AGG_HASHTABLE"),
     };
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
index 2e85045fc5766..ba5ebca5bffad 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
@@ -130,13 +130,13 @@ impl TransformGroupBySerializer {
             AggregateMeta::Serialized(_) => unreachable!(),
             AggregateMeta::BucketSpilled(_) => unreachable!(),
             AggregateMeta::Partitioned { .. } => unreachable!(),
-            AggregateMeta::AggregateHashTable(payload) => {
-                self.input_data = Some(SerializeGroupByStream::create(
-                    &self.method,
-                    SerializePayload::PartitionedPayload(payload),
-                ));
-                return Ok(Event::Sync);
-            }
+            // AggregateMeta::AggregateHashTable(payload) => {
+            //     self.input_data = Some(SerializeGroupByStream::create(
+            //         &self.method,
+            //         SerializePayload::PartitionedPayload(payload),
+            //     ));
+            //     return Ok(Event::Sync);
+            // }
             AggregateMeta::HashTable(payload) => {
                 self.input_data = Some(SerializeGroupByStream::create(
                     &self.method,
@@ -144,6 +144,8 @@
                 ));
                 return Ok(Event::Sync);
             }
+            AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
+            AggregateMeta::AggregatePayload(_) => todo!("AGG_HASHTABLE"),
         }
     }
 }

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs
index a023572980464..e62489d28e563 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs
@@ -137,6 +137,7 @@ impl Processor
             AggregateMeta::Spilled(_) => unreachable!(),
             AggregateMeta::Spilling(_) => unreachable!(),
             AggregateMeta::AggregateHashTable(_) => unreachable!(),
+            AggregateMeta::AggregatePayload(_) => unreachable!(),
             AggregateMeta::HashTable(_) => unreachable!(),
             AggregateMeta::Serialized(_) => unreachable!(),
             AggregateMeta::BucketSpilled(payload) => {
@@ -179,6 +180,7 @@
             AggregateMeta::Spilling(_) => unreachable!(),
             AggregateMeta::HashTable(_) => unreachable!(),
             AggregateMeta::AggregateHashTable(_) => unreachable!(),
+            AggregateMeta::AggregatePayload(_) => unreachable!(),
             AggregateMeta::Serialized(_) => unreachable!(),
             AggregateMeta::BucketSpilled(payload) => {
                 let instant = Instant::now();

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
index 953cc0301f485..86db32b154fa0 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
@@ -89,6 +89,24 @@ impl TransformFinalAggregate {
                         agg_hashtable = Some(hashtable);
                     }
                 },
+                // AggregateMeta::Serialized(payload) => {
+                //     blocks.push(payload.data_block);
+                // }
+                // AggregateMeta::AggregatePayload(payload) => match agg_hashtable.as_mut() {
+                //     Some(ht) => ht.combine_payload(&payload.payload, &mut self.flush_state)?,
+                //     None => {
+                //         let capacity =
+                //             AggregateHashTable::get_capacity_for_count(payload.payload.len());
+                //         let mut hashtable = AggregateHashTable::new_with_capacity(
+                //             self.params.group_data_types.clone(),
+                //             self.params.aggregate_functions.clone(),
+                //             HashTableConfig::default().with_initial_radix_bits(0),
+                //             capacity,
+                //         );
+                //         hashtable.combine_payload(&payload.payload, &mut self.flush_state)?;
+                //         agg_hashtable = Some(hashtable);
+                //     }
+                // }
                 _ => unreachable!(),
             }
         }
@@ -240,6 +258,7 @@ where Method: HashMethodBounds
             }
         },
         AggregateMeta::AggregateHashTable(_) => unreachable!(),
+        AggregateMeta::AggregatePayload(_) => unreachable!(),
     }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
index 06b95674a0596..d929bc927f98a 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
@@ -419,9 +419,20 @@ impl AccumulatingTransform for TransformPartialAggrega
                 blocks
             }
-            HashTable::AggregateHashTable(hashtable) => vec![DataBlock::empty_with_meta(
-                AggregateMeta::::create_agg_hashtable(hashtable.payload),
-            )],
+            // HashTable::AggregateHashTable(hashtable) => vec![DataBlock::empty_with_meta(
+            //     AggregateMeta::::create_agg_hashtable(hashtable.payload),
+            // )],
+            HashTable::AggregateHashTable(hashtable) => {
+                let partition_count = hashtable.payload.partition_count();
+                let mut blocks = Vec::with_capacity(partition_count.clone());
+                for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() {
+                    if payload.len() != 0 {
+                        blocks.push(DataBlock::empty_with_meta(AggregateMeta::::create_agg_payload(bucket as isize, payload, partition_count)));
+                    }
+                }
+
+                blocks
+            }
         })
     }
 }

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
index 33357c1e300ec..9b5c2ad08819f 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
@@ -86,6 +86,21 @@ impl TransformFinalGroupBy {
                 AggregateMeta::Serialized(payload) => {
                     blocks.push(payload.data_block);
                 }
+                // AggregateMeta::AggregatePayload(payload) => match agg_hashtable.as_mut() {
+                //     Some(ht) => ht.combine_payload(&payload.payload, &mut self.flush_state)?,
+                //     None => {
+                //         let capacity =
+                //             AggregateHashTable::get_capacity_for_count(payload.payload.len());
+                //         let mut hashtable = AggregateHashTable::new_with_capacity(
+                //             self.params.group_data_types.clone(),
+                //             self.params.aggregate_functions.clone(),
+                //             HashTableConfig::default().with_initial_radix_bits(0),
+                //             capacity,
+                //         );
+                //         hashtable.combine_payload(&payload.payload, &mut self.flush_state)?;
+                //         agg_hashtable = Some(hashtable);
+                //     }
+                // }
                 _ => unreachable!(),
             }
         }
@@ -218,6 +233,7 @@ where Method: HashMethodBounds
             }
         },
         AggregateMeta::AggregateHashTable(_) => unreachable!(),
+        AggregateMeta::AggregatePayload(_) => unreachable!(),
     }
 }
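Both partial transforms now finish identically, as the group-by hunk below repeats: each non-empty partition of the final `PartitionedPayload` becomes an empty data block whose meta is an `AggregatePayload` with `bucket = partition index`, letting the downstream partition-bucket processor route payloads without copying row data.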
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
index 139e349f57202..8491b179624ba 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
@@ -264,9 +264,20 @@ impl AccumulatingTransform for TransformPartialGroupBy
                 blocks
             }
-            HashTable::AggregateHashTable(hashtable) => vec![DataBlock::empty_with_meta(
-                AggregateMeta::::create_agg_hashtable(hashtable.payload),
-            )],
+            // HashTable::AggregateHashTable(hashtable) => vec![DataBlock::empty_with_meta(
+            //     AggregateMeta::::create_agg_hashtable(hashtable.payload),
+            // )],
+            HashTable::AggregateHashTable(hashtable) => {
+                let partition_count = hashtable.payload.partition_count();
+                let mut blocks = Vec::with_capacity(partition_count.clone());
+                for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() {
+                    if payload.len() != 0 {
+                        blocks.push(DataBlock::empty_with_meta(AggregateMeta::::create_agg_payload(bucket as isize, payload, partition_count)));
+                    }
+                }
+
+                blocks
+            }
         })
     }
 }
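On the receiving side, `transform_partition_bucket` collects these payloads and, once every input has reported, merges them into a single `PartitionedPayload` with `max_partition_count` partitions. A payload whose source table already used `max_partition_count` partitions can be appended to its slot directly; one from a smaller table must be re-hashed. Condensed from the `process()` body below:

    if p.max_partition_count != self.max_partition_count {
        // Different fan-out: rows must be re-bucketed by hash.
        partitioned_payload.combine_single(p.payload, &mut self.flush_state);
    } else {
        // Same fan-out: partition indices line up one-to-one.
        partitioned_payload.payloads[p.bucket as usize].combine(p.payload);
    }

The arenas of all source payloads are carried along so the merged payload keeps the backing memory of the rows it references.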
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
index c1fdcd01ab2cf..fb9a1f1ae422b 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
@@ -51,6 +51,8 @@
 use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
 use crate::pipelines::processors::transforms::group_by::KeysColumnIter;
 use crate::pipelines::processors::transforms::group_by::PartitionedHashMethod;
 
+use super::AggregatePayload;
+
 static SINGLE_LEVEL_BUCKET_NUM: isize = -1;
 
 struct InputPortState {
@@ -69,6 +71,7 @@
 pub struct TransformPartitionBucket>,
     flush_state: PayloadFlushState,
     partition_payloads: Vec,
+    payloads: Vec,
     unsplitted_blocks: Vec,
     max_partition_count: usize,
     _phantom: PhantomData,
@@ -98,6 +101,7 @@
             unsplitted_blocks: vec![],
             flush_state: PayloadFlushState::default(),
             partition_payloads: vec![],
+            payloads: vec![],
             initialized_all_inputs: false,
             max_partition_count: 0,
             _phantom: Default::default(),
@@ -158,9 +162,14 @@
             AggregateMeta::BucketSpilled(payload) => {
                 (payload.bucket, SINGLE_LEVEL_BUCKET_NUM)
             }
-            AggregateMeta::Serialized(payload) => (payload.bucket, payload.bucket),
-            AggregateMeta::HashTable(payload) => (payload.bucket, payload.bucket),
+            AggregateMeta::Serialized(payload) => {
+                println!("ser");
+                (payload.bucket, payload.bucket)},
+            AggregateMeta::HashTable(payload) => {
+                println!("hash");
+                (payload.bucket, payload.bucket)},
             AggregateMeta::Spilled(_) => {
+                println!("spilled");
                 let meta = data_block.take_meta().unwrap();
 
                 if let Some(AggregateMeta::Spilled(buckets_payload)) =
                 unreachable!()
             }
             AggregateMeta::AggregateHashTable(p) => {
+                println!("agg");
                 self.max_partition_count =
                     self.max_partition_count.max(p.partition_count());
 
                 (0, 0)
             }
+            AggregateMeta::AggregatePayload(p) => {
+                println!("agg_payload");
+                self.max_partition_count = self.max_partition_count.max(p.max_partition_count);
+
+                (p.bucket, p.bucket)
+            }
         };
 
         if bucket > SINGLE_LEVEL_BUCKET_NUM && self.max_partition_count == 0 {
@@ -214,13 +230,20 @@
         }
 
         if self.max_partition_count > 0 {
+            // self.unsplitted_blocks.push(data_block);
             let meta = data_block.take_meta().unwrap();
 
-            if let Some(AggregateMeta::AggregateHashTable(p)) =
-                AggregateMeta::::downcast_from(meta)
-            {
-                self.partition_payloads.push(p);
-                return 0;
-            }
+            // if let Some(AggregateMeta::AggregateHashTable(p)) =
+            //     AggregateMeta::::downcast_from(meta)
+            // {
+            //     self.partition_payloads.push(p);
+            //     return 0;
+            // }
+
+            if let Some(AggregateMeta::AggregatePayload(p)) = AggregateMeta::::downcast_from(meta) {
+                let res = p.bucket.clone();
+                self.payloads.push(p);
+                return res;
+            }
         }
 
         self.unsplitted_blocks.push(data_block);
@@ -349,7 +372,14 @@
             return Ok(Event::NeedData);
         }
 
-        if self.partition_payloads.len() == self.inputs.len()
-            || (!self.buckets_blocks.is_empty() && !self.unsplitted_blocks.is_empty())
-        {
-            // Split data blocks if it's unsplitted.
-            return Ok(Event::Sync);
-        }
+        // if self.partition_payloads.len() == self.inputs.len()
+        //     || (!self.buckets_blocks.is_empty() && !self.unsplitted_blocks.is_empty())
+        // {
+        //     // Split data blocks if it's unsplitted.
+        //     return Ok(Event::Sync);
+        // }
+
+        if (self.payloads.len()>0 && self.initialized_all_inputs)
+            || (!self.buckets_blocks.is_empty() && !self.unsplitted_blocks.is_empty())
+        {
+            // Split data blocks if it's unsplitted.
+            return Ok(Event::Sync);
+        }
@@ -418,18 +448,59 @@
             return Ok(Event::NeedConsume);
         }
 
-        if let Some(p) = self.partition_payloads.pop() {
-            let data = AggregateMeta::::AggregateHashTable(p);
-            let data_block = DataBlock::empty_with_meta(AggregateMeta::::create_partitioned(0, vec![data]));
-            self.output.push_data(Ok(data_block));
-            return Ok(Event::NeedConsume);
-        }
-
         self.output.finish();
         Ok(Event::Finished)
     }
 
     fn process(&mut self) -> Result<()> {
+        if !self.payloads.is_empty() {
+            let group_types = self.payloads[0].payload.group_types.clone();
+            let aggrs = self.payloads[0].payload.aggrs.clone();
+            let mut partitioned_payload = PartitionedPayload::new(group_types.clone(), aggrs.clone(), self.max_partition_count as u64);
+
+            let mut arenas = vec![];
+            for p in self.payloads.drain(0..) {
+                arenas.append(&mut vec![p.payload.arena.clone()]);
+                if p.max_partition_count != self.max_partition_count {
+                    partitioned_payload.combine_single(p.payload, &mut self.flush_state);
+                } else {
+                    partitioned_payload.payloads[p.bucket as usize].combine(p.payload);
+                }
+                // partitioned_payload.combine_single(p.payload, &mut self.flush_state);
+            }
+            partitioned_payload.arenas.extend_from_slice(&arenas);
+
+            for (bucket, payload) in partitioned_payload.payloads.into_iter().enumerate() {
+                let mut part = PartitionedPayload::new(group_types.clone(), aggrs.clone(), 1);
+                part.combine_single(payload, &mut self.flush_state);
+                part.arenas.extend_from_slice(&partitioned_payload.arenas);
+
+                if part.len() != 0 {
+                    match self.buckets_blocks.entry(bucket as isize) {
+                        Entry::Vacant(v) => {
+                            v.insert(vec![DataBlock::empty_with_meta(
+                                AggregateMeta::::create_agg_hashtable(part),
+                            )]);
+                        }
+                        Entry::Occupied(mut v) => {
+                            v.get_mut().push(DataBlock::empty_with_meta(
+                                AggregateMeta::::create_agg_hashtable(part),
+                            ));
+                        }
+                    };
+
+                    // self.buckets_blocks
+                    //     .insert(bucket as isize, vec![DataBlock::empty_with_meta(
+                    //         AggregateMeta::::create_agg_hashtable(part),
+                    //     )]);
+                }
+            }
+
+            return Ok(());
+        }
 
         if !self.partition_payloads.is_empty() {
             let mut payloads = Vec::with_capacity(self.partition_payloads.len());
@@ -496,6 +567,7 @@
             AggregateMeta::Serialized(payload) => self.partition_block(payload)?,
             AggregateMeta::HashTable(payload) => self.partition_hashtable(payload)?,
             AggregateMeta::AggregateHashTable(_) => unreachable!(),
+            AggregateMeta::AggregatePayload(_) => unreachable!(),
         };
 
         for (bucket, block) in data_blocks.into_iter().enumerate() {

diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index e227b3e76457a..8ee050715bc07 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -607,7 +607,7 @@ impl DefaultSettings {
             range: Some(SettingRange::Numeric(0..=1)),
         }),
         ("enable_experimental_aggregate_hashtable", DefaultSettingValue {
-            value: UserSettingValue::UInt64(0),
+            value: UserSettingValue::UInt64(1),
             desc: "Enables experimental aggregate hashtable",
             mode: SettingMode::Both,
             range: Some(SettingRange::Numeric(0..=1)),
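Patch 4 ends by flipping the `enable_experimental_aggregate_hashtable` default to `1`, so the new path runs unless explicitly disabled. Patch 5 below is cleanup on top of it: the `active_threads <= 0` guard becomes `== 0` (the parameter is unsigned), the currently unused scatter helper is kept under `#[allow(dead_code)]`, the commented-out exchange arm for `AggregateHashTable` is restored in formatted form, and leftover commented code and the `partition_payloads` field are removed.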
From f652a972b7ff12dda09312850db96a6e00722351 Mon Sep 17 00:00:00 2001
From: jw
Date: Tue, 27 Feb 2024 11:02:35 +0800
Subject: [PATCH 5/5] test

---
 src/query/expression/src/aggregate/mod.rs     |   2 +-
 .../aggregator/aggregate_exchange_injector.rs |  12 +-
 .../transforms/aggregator/aggregate_meta.rs   |  18 ++-
 .../transform_exchange_group_by_serializer.rs |  48 +++---
 .../serde/transform_group_by_serializer.rs    |   8 +-
 .../aggregator/transform_aggregate_final.rs   |  18 ---
 .../aggregator/transform_aggregate_partial.rs |  13 +-
 .../aggregator/transform_group_by_final.rs    |  15 --
 .../aggregator/transform_group_by_partial.rs  |  13 +-
 .../aggregator/transform_partition_bucket.rs  | 143 +++++------------
 10 files changed, 107 insertions(+), 183 deletions(-)

diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs
index 358b60bbab210..3d301bc08c9f7 100644
--- a/src/query/expression/src/aggregate/mod.rs
+++ b/src/query/expression/src/aggregate/mod.rs
@@ -91,7 +91,7 @@ impl HashTableConfig {
     pub fn with_partial(mut self, partial_agg: bool, active_threads: usize) -> Self {
         self.partial_agg = partial_agg;
 
-        if active_threads <= 0 {
+        if active_threads == 0 {
             return self;
         }

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
index 4531d30291f23..200b1b43f227f 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
@@ -145,12 +145,20 @@
 }
 
 // TODO: buckets and partitions have a relationship of integer division
-fn agg_hashtable_scatter(payload: PartitionedPayload, buckets: usize) -> Result> {
+#[allow(dead_code)]
+fn agg_hashtable_scatter(
+    payload: PartitionedPayload,
+    buckets: usize,
+) -> Result> {
     let mut buckets = Vec::with_capacity(buckets);
 
     for _ in 0..buckets.capacity() {
         let config = HashTableConfig::default().with_partial(true, 0);
-        buckets.push(AggregateHashTable::new(payload.group_types.clone(), payload.aggrs.clone(), config));
+        buckets.push(AggregateHashTable::new(
+            payload.group_types.clone(),
+            payload.aggrs.clone(),
+            config,
+        ));
     }
 
     let mut state = PayloadFlushState::default();

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
index 0cf55007a262e..3a98723ea714d 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
@@ -77,12 +77,18 @@
 impl AggregateMeta
-    pub fn create_agg_payload(bucket: isize, payload: Payload, max_partition_count: usize) -> BlockMetaInfoPtr {
-        Box::new(AggregateMeta::::AggregatePayload(AggregatePayload {
-            bucket,
-            payload,
-            max_partition_count,
-        }))
+    pub fn create_agg_payload(
+        bucket: isize,
+        payload: Payload,
+        max_partition_count: usize,
+    ) -> BlockMetaInfoPtr {
+        Box::new(AggregateMeta::::AggregatePayload(
+            AggregatePayload {
+                bucket,
+                payload,
+                max_partition_count,
+            },
+        ))
     }
 
     pub fn create_agg_hashtable(payload: PartitionedPayload) -> BlockMetaInfoPtr {
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
index 95c52e00f8f1e..6735aea685c2f 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
@@ -54,6 +54,7 @@
 use futures_util::future::BoxFuture;
 use log::info;
 use opendal::Operator;
 
+use super::SerializePayload;
 use crate::api::serialize_block;
 use crate::api::ExchangeShuffleMeta;
 use crate::pipelines::processors::transforms::aggregator::exchange_defines;
@@ -67,8 +68,6 @@
 use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
 use crate::pipelines::processors::transforms::group_by::PartitionedHashMethod;
 use crate::sessions::QueryContext;
 
-use super::SerializePayload;
-
 pub struct TransformExchangeGroupBySerializer {
     ctx: Arc,
     method: Method,
@@ -209,22 +208,27 @@ impl BlockMetaTransform
                 },
             ));
         }
-        // Some(AggregateMeta::AggregateHashTable(payload)) => {
-        //     if index == self.local_pos {
-        //         serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
-        //             Some(Box::new(AggregateMeta::::AggregateHashTable(payload))),
-        //         )?));
-        //         continue;
-        //     }
-        //     let bucket = -1;
-        //     let mut stream = SerializeGroupByStream::create(&self.method, SerializePayload::PartitionedPayload(payload));
-        //     serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
-        //         None => DataBlock::empty(),
-        //         Some(data_block) => {
-        //             serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
-        //         }
-        //     }));
-        // }
+        Some(AggregateMeta::AggregateHashTable(payload)) => {
+            if index == self.local_pos {
+                serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
+                    Some(Box::new(AggregateMeta::::AggregateHashTable(
+                        payload,
+                    ))),
+                )?));
+                continue;
+            }
+            let bucket = -1;
+            let mut stream = SerializeGroupByStream::create(
+                &self.method,
+                SerializePayload::PartitionedPayload(payload),
+            );
+            serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
+                None => DataBlock::empty(),
+                Some(data_block) => {
+                    serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
+                }
+            }));
+        }
         Some(AggregateMeta::HashTable(payload)) => {
             if index == self.local_pos {
                 serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
@@ -233,8 +237,11 @@
                 continue;
             }
 
-            let bucket = payload.bucket.clone();
-            let mut stream = SerializeGroupByStream::create(&self.method, SerializePayload::HashTablePayload(payload));
+            let bucket = payload.bucket;
+            let mut stream = SerializeGroupByStream::create(
+                &self.method,
+                SerializePayload::HashTablePayload(payload),
+            );
             serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
                 None => DataBlock::empty(),
                 Some(data_block) => {
@@ -242,7 +249,6 @@
                 }
             }));
         }
-        Some(AggregateMeta::AggregateHashTable(_)) => todo!("AGG_HASHTABLE"),
         Some(AggregateMeta::AggregatePayload(_)) => todo!("AGG_HASHTABLE"),
     };
 }
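With this arm restored, a whole `PartitionedPayload` can travel through the exchange again: the local destination receives it as block meta, remote destinations as a serialized stream with `bucket = -1`, matching `ExchangeSorting`, which now returns `Ok(-1)` for `AggregateHashTable` blocks.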
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
index ba5ebca5bffad..fc5c637be44f0 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
@@ -19,10 +19,10 @@
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::ColumnBuilder;
 use databend_common_expression::DataBlock;
 use databend_common_expression::PartitionedPayload;
 use databend_common_expression::PayloadFlushState;
-use databend_common_expression::ColumnBuilder;
 use databend_common_hashtable::HashtableEntryRefLike;
 use databend_common_hashtable::HashtableLike;
 use databend_common_pipeline_core::processors::Event;
@@ -30,6 +30,7 @@
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::Processor;
 use databend_common_pipeline_core::processors::ProcessorPtr;
+use itertools::Itertools;
 
 use crate::pipelines::processors::transforms::aggregator::estimated_key_size;
 use crate::pipelines::processors::transforms::aggregator::AggregateMeta;
@@ -37,7 +38,6 @@
 use crate::pipelines::processors::transforms::aggregator::AggregateSerdeMeta;
 use crate::pipelines::processors::transforms::aggregator::HashTablePayload;
 use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
 use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder;
-use itertools::Itertools;
 
 pub struct TransformGroupBySerializer {
     method: Method,
@@ -256,9 +256,7 @@
             for item in p.payloads.iter() {
                 state.clear();
                 while item.flush(&mut state) {
-                    blocks.push(DataBlock::new_from_columns(
-                        state.take_group_columns(),
-                    ));
+                    blocks.push(DataBlock::new_from_columns(state.take_group_columns()));
                 }
             }

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
index 86db32b154fa0..556062bb53dc6 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
@@ -89,24 +89,6 @@ impl TransformFinalAggregate {
                         agg_hashtable = Some(hashtable);
                     }
                 },
-                // AggregateMeta::Serialized(payload) => {
-                //     blocks.push(payload.data_block);
-                // }
-                // AggregateMeta::AggregatePayload(payload) => match agg_hashtable.as_mut() {
-                //     Some(ht) => ht.combine_payload(&payload.payload, &mut self.flush_state)?,
-                //     None => {
-                //         let capacity =
-                //             AggregateHashTable::get_capacity_for_count(payload.payload.len());
-                //         let mut hashtable = AggregateHashTable::new_with_capacity(
-                //             self.params.group_data_types.clone(),
-                //             self.params.aggregate_functions.clone(),
-                //             HashTableConfig::default().with_initial_radix_bits(0),
-                //             capacity,
-                //         );
-                //         hashtable.combine_payload(&payload.payload, &mut self.flush_state)?;
-                //         agg_hashtable = Some(hashtable);
-                //     }
-                // }
                 _ => unreachable!(),
             }
         }
-            //     blocks.push(payload.data_block);
-            // }
-            // AggregateMeta::AggregatePayload(payload) => match agg_hashtable.as_mut() {
-            //     Some(ht) => ht.combine_payload(&payload.payload, &mut self.flush_state)?,
-            //     None => {
-            //         let capacity =
-            //             AggregateHashTable::get_capacity_for_count(payload.payload.len());
-            //         let mut hashtable = AggregateHashTable::new_with_capacity(
-            //             self.params.group_data_types.clone(),
-            //             self.params.aggregate_functions.clone(),
-            //             HashTableConfig::default().with_initial_radix_bits(0),
-            //             capacity,
-            //         );
-            //         hashtable.combine_payload(&payload.payload, &mut self.flush_state)?;
-            //         agg_hashtable = Some(hashtable);
-            //     }
-            // }
             _ => unreachable!(),
         }
     }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
index d929bc927f98a..f3042dca63e05 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
@@ -419,15 +419,18 @@ impl<Method: HashMethodBounds> AccumulatingTransform for TransformPartialAggregate<Method> {
                 blocks
             }
-            // HashTable::AggregateHashTable(hashtable) => vec![DataBlock::empty_with_meta(
-            //     AggregateMeta::<Method, usize>::create_agg_hashtable(hashtable.payload),
-            // )],
             HashTable::AggregateHashTable(hashtable) => {
                 let partition_count = hashtable.payload.partition_count();
-                let mut blocks = Vec::with_capacity(partition_count.clone());
+                let mut blocks = Vec::with_capacity(partition_count);
                 for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() {
                     if payload.len() != 0 {
-                        blocks.push(DataBlock::empty_with_meta(AggregateMeta::<Method, usize>::create_agg_payload(bucket as isize, payload, partition_count)));
+                        blocks.push(DataBlock::empty_with_meta(
+                            AggregateMeta::<Method, usize>::create_agg_payload(
+                                bucket as isize,
+                                payload,
+                                partition_count,
+                            ),
+                        ));
                     }
                 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
index 9b5c2ad08819f..0be006d7a94c8 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
@@ -86,21 +86,6 @@ impl<Method: HashMethodBounds> TransformFinalGroupBy<Method> {
                 AggregateMeta::Serialized(payload) => {
                     blocks.push(payload.data_block);
                 }
-                // AggregateMeta::AggregatePayload(payload) => match agg_hashtable.as_mut() {
-                //     Some(ht) => ht.combine_payload(&payload.payload, &mut self.flush_state)?,
-                //     None => {
-                //         let capacity =
-                //             AggregateHashTable::get_capacity_for_count(payload.payload.len());
-                //         let mut hashtable = AggregateHashTable::new_with_capacity(
-                //             self.params.group_data_types.clone(),
-                //             self.params.aggregate_functions.clone(),
-                //             HashTableConfig::default().with_initial_radix_bits(0),
-                //             capacity,
-                //         );
-                //         hashtable.combine_payload(&payload.payload, &mut self.flush_state)?;
-                //         agg_hashtable = Some(hashtable);
-                //     }
-                // }
                 _ => unreachable!(),
             }
         }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
index 8491b179624ba..c03fa7661a146 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
@@ -264,15 +264,18 @@ impl<Method: HashMethodBounds> AccumulatingTransform for TransformPartialGroupBy<Method> {
                 blocks
             }
-            // HashTable::AggregateHashTable(hashtable) => vec![DataBlock::empty_with_meta(
-            //     AggregateMeta::<Method, ()>::create_agg_hashtable(hashtable.payload),
-            // )],
             HashTable::AggregateHashTable(hashtable) => {
                 let partition_count = hashtable.payload.partition_count();
-                let mut blocks = Vec::with_capacity(partition_count.clone());
+                let mut blocks = Vec::with_capacity(partition_count);
                 for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() {
                     if payload.len() != 0 {
-                        blocks.push(DataBlock::empty_with_meta(AggregateMeta::<Method, ()>::create_agg_payload(bucket as isize, payload, partition_count)));
+                        blocks.push(DataBlock::empty_with_meta(
+                            AggregateMeta::<Method, ()>::create_agg_payload(
+                                bucket as isize,
+                                payload,
+                                partition_count,
+                            ),
+                        ));
                     }
                 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
index fb9a1f1ae422b..fe0abcdba2a5c 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
@@ -36,8 +36,8 @@ use databend_common_pipeline_core::Pipe;
 use databend_common_pipeline_core::PipeItem;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_storage::DataOperator;
-use itertools::Itertools;
 
+use super::AggregatePayload;
 use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta;
 use crate::pipelines::processors::transforms::aggregator::aggregate_meta::HashTablePayload;
 use crate::pipelines::processors::transforms::aggregator::aggregate_meta::SerializedPayload;
@@ -51,8 +51,6 @@ use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
 use crate::pipelines::processors::transforms::group_by::KeysColumnIter;
 use crate::pipelines::processors::transforms::group_by::PartitionedHashMethod;
 
-use super::AggregatePayload;
-
 static SINGLE_LEVEL_BUCKET_NUM: isize = -1;
 
 struct InputPortState {
@@ -70,8 +68,7 @@ pub struct TransformPartitionBucket<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
     buckets_blocks: BTreeMap<isize, Vec<DataBlock>>,
     flush_state: PayloadFlushState,
-    partition_payloads: Vec<PartitionedPayload>,
-    payloads: Vec<AggregatePayload>,
+    agg_payloads: Vec<AggregatePayload>,
     unsplitted_blocks: Vec<DataBlock>,
     max_partition_count: usize,
     _phantom: PhantomData<V>,
@@ -100,8 +97,7 @@
             buckets_blocks: BTreeMap::new(),
             unsplitted_blocks: vec![],
             flush_state: PayloadFlushState::default(),
-            partition_payloads: vec![],
-            payloads: vec![],
+            agg_payloads: vec![],
            initialized_all_inputs: false,
             max_partition_count: 0,
             _phantom: Default::default(),
@@ -164,10 +160,12 @@
                 }
                 AggregateMeta::Serialized(payload) => {
                     println!("ser");
-                    (payload.bucket, payload.bucket)},
+                    (payload.bucket, payload.bucket)
+                }
                 AggregateMeta::HashTable(payload) => {
                     println!("hash");
-                    (payload.bucket, payload.bucket)},
+                    (payload.bucket, payload.bucket)
+                }
                 AggregateMeta::Spilled(_) => {
                     println!("spilled");
                     let meta = data_block.take_meta().unwrap();
@@ -199,18 +197,13 @@
                     unreachable!()
                 }
-                AggregateMeta::AggregateHashTable(p) => {
-                    println!("agg");
-                    self.max_partition_count =
-                        self.max_partition_count.max(p.partition_count());
-
-                    (0, 0)
-                }
-                AggregateMeta::AggregatePayload(p) => {
+                AggregateMeta::AggregateHashTable(_) => unreachable!(),
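+                // Partial transforms now split every AggregateHashTable into
+                // per-bucket AggregatePayload blocks, so none can arrive here.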
+                AggregateMeta::AggregatePayload(payload) => {
                     println!("agg_payload");
-                    self.max_partition_count = self.max_partition_count.max(p.max_partition_count);
+                    self.max_partition_count =
+                        self.max_partition_count.max(payload.max_partition_count);
 
-                    (p.bucket, p.bucket)
+                    (payload.bucket, payload.bucket)
                 }
             };
@@ -230,18 +223,13 @@
         }
 
         if self.max_partition_count > 0 {
-            // self.unsplitted_blocks.push(data_block);
             let meta = data_block.take_meta().unwrap();
-            // if let Some(AggregateMeta::AggregateHashTable(p)) =
-            //     AggregateMeta::<Method, V>::downcast_from(meta)
-            // {
-            //     self.partition_payloads.push(p);
-            //     return 0;
-            // }
-
-            if let Some(AggregateMeta::AggregatePayload(p)) = AggregateMeta::<Method, V>::downcast_from(meta) {
-                let res = p.bucket.clone();
-                self.payloads.push(p);
+
+            if let Some(AggregateMeta::AggregatePayload(agg_payload)) =
+                AggregateMeta::<Method, V>::downcast_from(meta)
+            {
+                let res = agg_payload.bucket;
+                self.agg_payloads.push(agg_payload);
                 return res;
             }
         }
@@ -372,14 +360,7 @@
             return Ok(Event::NeedData);
         }
 
-        // if self.partition_payloads.len() == self.inputs.len()
-        //     || (!self.buckets_blocks.is_empty() && !self.unsplitted_blocks.is_empty())
-        // {
-        //     // Split data blocks if it's unsplitted.
-        //     return Ok(Event::Sync);
-        // }
-
-        if (self.payloads.len()>0 && self.initialized_all_inputs)
+        if (!self.agg_payloads.is_empty() && self.initialized_all_inputs)
             || (!self.buckets_blocks.is_empty() && !self.unsplitted_blocks.is_empty())
         {
             // Split data blocks if it's unsplitted.
            return Ok(Event::Sync);
         }
@@ -453,22 +434,28 @@
     }
 
     fn process(&mut self) -> Result<()> {
-        if !self.payloads.is_empty() {
-            let group_types = self.payloads[0].payload.group_types.clone();
-            let aggrs = self.payloads[0].payload.aggrs.clone();
-            let mut partitioned_payload = PartitionedPayload::new(group_types.clone(), aggrs.clone(), self.max_partition_count as u64);
-
-            let mut arenas = vec![];
-            for p in self.payloads.drain(0..) {
-                arenas.append(&mut vec![p.payload.arena.clone()]);
-                if p.max_partition_count != self.max_partition_count {
-                    partitioned_payload.combine_single(p.payload, &mut self.flush_state);
+        if !self.agg_payloads.is_empty() {
+            let group_types = self.agg_payloads[0].payload.group_types.clone();
+            let aggrs = self.agg_payloads[0].payload.aggrs.clone();
+
+            let mut partitioned_payload = PartitionedPayload::new(
+                group_types.clone(),
+                aggrs.clone(),
+                self.max_partition_count as u64,
+            );
+
+            for agg_payload in self.agg_payloads.drain(0..) {
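+                // Keep each payload's arena alive: the merged PartitionedPayload
+                // still references rows allocated in it.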
+                partitioned_payload
+                    .arenas
+                    .push(agg_payload.payload.arena.clone());
+                if agg_payload.max_partition_count != self.max_partition_count {
+                    debug_assert!(agg_payload.max_partition_count < self.max_partition_count);
+                    partitioned_payload.combine_single(agg_payload.payload, &mut self.flush_state);
                 } else {
-                    partitioned_payload.payloads[p.bucket as usize].combine(p.payload);
+                    partitioned_payload.payloads[agg_payload.bucket as usize]
+                        .combine(agg_payload.payload);
                 }
-                // partitioned_payload.combine_single(p.payload, &mut self.flush_state);
             }
-            partitioned_payload.arenas.extend_from_slice(&arenas);
 
             for (bucket, payload) in partitioned_payload.payloads.into_iter().enumerate() {
                 let mut part = PartitionedPayload::new(group_types.clone(), aggrs.clone(), 1);
@@ -488,63 +475,9 @@
                     ));
                 }
             };
-
-            // self.buckets_blocks
-            //     .insert(bucket as isize, vec![DataBlock::empty_with_meta(
-            //         AggregateMeta::<Method, V>::create_agg_hashtable(part),
-            //     )]);
             }
         }
-
-        return Ok(());
-
-        }
-
-        if !self.partition_payloads.is_empty() {
-            let mut payloads = Vec::with_capacity(self.partition_payloads.len());
-
-            for p in self.partition_payloads.drain(0..) {
-                if p.partition_count() != self.max_partition_count {
-                    let p = p.repartition(self.max_partition_count, &mut self.flush_state);
-                    payloads.push(p);
-                } else {
-                    payloads.push(p);
-                };
-            }
-
-            let group_types = payloads[0].group_types.clone();
-            let aggrs = payloads[0].aggrs.clone();
-
-            let mut payload_map = (0..self.max_partition_count).map(|_| vec![]).collect_vec();
-
-            // All arenas should be kept in the bucket partition payload
-            let mut arenas = vec![];
-
-            for mut payload in payloads.into_iter() {
-                for (bucket, p) in payload.payloads.into_iter().enumerate() {
-                    payload_map[bucket].push(p);
-                }
-                arenas.append(&mut payload.arenas);
-            }
-
-            for (bucket, mut payloads) in payload_map.into_iter().enumerate() {
-                let mut partition_payload =
-                    PartitionedPayload::new(group_types.clone(), aggrs.clone(), 1);
-
-                for payload in payloads.drain(0..) {
-                    partition_payload.combine_single(payload, &mut self.flush_state);
-                }
-
-                partition_payload.arenas.extend_from_slice(&arenas);
-
-                if partition_payload.len() != 0 {
-                    self.buckets_blocks
-                        .insert(bucket as isize, vec![DataBlock::empty_with_meta(
-                            AggregateMeta::<Method, V>::create_agg_hashtable(partition_payload),
-                        )]);
-                }
-            }
 
             return Ok(());
         }