diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs
index 93018f1baadf8..3d301bc08c9f7 100644
--- a/src/query/expression/src/aggregate/mod.rs
+++ b/src/query/expression/src/aggregate/mod.rs
@@ -91,6 +91,10 @@ impl HashTableConfig {
     pub fn with_partial(mut self, partial_agg: bool, active_threads: usize) -> Self {
         self.partial_agg = partial_agg;
 
+        if active_threads == 0 {
+            return self;
+        }
+
         // init max_partial_capacity
         let total_shared_cache_size = active_threads * L3_CACHE_SIZE;
         let cache_per_active_thread =
diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs
index f5017b951b87c..e42e3d4e64c9a 100644
--- a/src/query/service/src/pipelines/builders/builder_aggregate.rs
+++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs
@@ -106,6 +106,9 @@ impl PipelineBuilder {
             .settings
             .get_enable_experimental_aggregate_hashtable()?
             && self.ctx.get_cluster().is_empty();
+        // let enable_experimental_aggregate_hashtable = self
+        //     .settings
+        //     .get_enable_experimental_aggregate_hashtable()?;
 
         let params = Self::build_aggregator_params(
             aggregate.input.output_schema()?,
@@ -215,6 +218,9 @@ impl PipelineBuilder {
             .settings
             .get_enable_experimental_aggregate_hashtable()?
             && self.ctx.get_cluster().is_empty();
+        // let enable_experimental_aggregate_hashtable = self
+        //     .settings
+        //     .get_enable_experimental_aggregate_hashtable()?;
 
         let params = Self::build_aggregator_params(
             aggregate.before_group_by_schema.clone(),
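The guard added to `with_partial` returns early when `active_threads == 0`, which matters because the new (commented-out) scatter path below constructs hash tables with `with_partial(true, 0)`, and the cache-size heuristic that follows divides by the number of active threads. The standalone sketch below illustrates the shape of that heuristic; it is not the databend implementation, and the constants, the fallback capacity, and the per-entry size are assumptions for illustration only.

// Standalone sketch of the capacity heuristic; constants and the fallback
// value are assumptions, not the databend implementation.
const L3_CACHE_SIZE: usize = 40 * 1024 * 1024; // assumed shared L3 size
const ENTRY_SIZE: usize = 64; // assumed bytes per hash-table entry

fn partial_capacity(active_threads: usize) -> usize {
    // Mirrors the guard added in the diff: with zero active threads the
    // per-thread division below is meaningless, so keep a fixed default.
    if active_threads == 0 {
        return 8192;
    }
    let total_shared_cache_size = active_threads * L3_CACHE_SIZE;
    let cache_per_active_thread = total_shared_cache_size / active_threads;
    (cache_per_active_thread / ENTRY_SIZE).next_power_of_two()
}

fn main() {
    assert_eq!(partial_capacity(0), 8192);
    println!("capacity for 8 threads: {}", partial_capacity(8));
}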
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
index 3458cb66d36cb..200b1b43f227f 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
@@ -19,8 +19,12 @@ use bumpalo::Bump;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::AggregateHashTable;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
+use databend_common_expression::HashTableConfig;
+use databend_common_expression::PartitionedPayload;
+use databend_common_expression::PayloadFlushState;
 use databend_common_hashtable::FastHash;
 use databend_common_hashtable::HashtableEntryMutRefLike;
 use databend_common_hashtable::HashtableEntryRefLike;
@@ -77,7 +81,8 @@ impl ExchangeSorting
                 AggregateMeta::Partitioned { .. } => unreachable!(),
                 AggregateMeta::Serialized(v) => Ok(v.bucket),
                 AggregateMeta::HashTable(v) => Ok(v.bucket),
-                AggregateMeta::AggregateHashTable(_) => unreachable!(),
+                AggregateMeta::AggregateHashTable(_) => Ok(-1),
+                AggregateMeta::AggregatePayload(_) => todo!("AGG_HASHTABLE"),
                 AggregateMeta::Spilled(_)
                 | AggregateMeta::Spilling(_)
                 | AggregateMeta::BucketSpilled(_) => Ok(-1),
@@ -139,6 +144,34 @@ fn scatter(
     Ok(res)
 }
 
+// TODO: buckets and partitions have a relationship of integer division
+#[allow(dead_code)]
+fn agg_hashtable_scatter(
+    payload: PartitionedPayload,
+    buckets: usize,
+) -> Result<Vec<AggregateHashTable>> {
+    let mut buckets = Vec::with_capacity(buckets);
+
+    for _ in 0..buckets.capacity() {
+        let config = HashTableConfig::default().with_partial(true, 0);
+        buckets.push(AggregateHashTable::new(
+            payload.group_types.clone(),
+            payload.aggrs.clone(),
+            config,
+        ));
+    }
+
+    let mut state = PayloadFlushState::default();
+    let new_payload = payload.repartition(buckets.len(), &mut state);
+
+    for (idx, item) in new_payload.payloads.iter().enumerate() {
+        let mut flush_state = PayloadFlushState::default();
+        buckets[idx].combine_payload(item, &mut flush_state)?;
+    }
+
+    Ok(buckets)
+}
+
@@ -176,8 +209,16 @@ impl FlightScatter for HashTableHashScatter
                         });
                     }
                 }
-
+                // AggregateMeta::AggregateHashTable(payload) => {
+                //     for agg_hashtable in agg_hashtable_scatter(payload, self.buckets)? {
+                //         blocks.push(
+                //             DataBlock::empty_with_meta(
+                //                 AggregateMeta::<Method, V>::create_agg_hashtable(agg_hashtable.payload)
+                //         ))
+                //     }
+                // },
                 AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
+                AggregateMeta::AggregatePayload(_) => todo!("AGG_HASHTABLE"),
             };
 
             return Ok(blocks);
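The new, currently unused `agg_hashtable_scatter` helper repartitions a `PartitionedPayload` into one hash table per exchange target, so each node receives a disjoint set of groups. The sketch below is a standalone toy model of the same idea, scattering grouped state by key hash; the `Payload` type, the hashing, and the merge step are simplified stand-ins rather than the `PartitionedPayload`/`AggregateHashTable` API.

use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Toy stand-in for a payload of grouped rows: key -> partial aggregate state.
type Payload = HashMap<String, u64>;

/// Scatter one payload into `buckets` payloads by key hash, so each exchange
/// target receives a disjoint subset of groups (the same idea as repartitioning
/// a partitioned payload and combining each slice into its own hash table).
fn scatter(payload: Payload, buckets: usize) -> Vec<Payload> {
    let mut out = vec![Payload::new(); buckets];
    for (key, state) in payload {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let idx = (hasher.finish() as usize) % buckets;
        // Combine into the target bucket (summing stands in for merging state).
        *out[idx].entry(key).or_insert(0) += state;
    }
    out
}

fn main() {
    let mut p = Payload::new();
    p.insert("a".into(), 1);
    p.insert("b".into(), 2);
    let parts = scatter(p, 4);
    assert_eq!(parts.iter().map(|m| m.len()).sum::<usize>(), 2);
}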
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
index 6bd84eb63bbf0..3a98723ea714d 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
@@ -21,6 +21,7 @@ use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::Column;
 use databend_common_expression::DataBlock;
 use databend_common_expression::PartitionedPayload;
+use databend_common_expression::Payload;
 
 use crate::pipelines::processors::transforms::aggregator::HashTableCell;
 use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
@@ -31,6 +32,12 @@ pub struct HashTablePayload {
     pub cell: HashTableCell<T, V>,
 }
 
+pub struct AggregatePayload {
+    pub bucket: isize,
+    pub payload: Payload,
+    pub max_partition_count: usize,
+}
+
 pub struct SerializedPayload {
     pub bucket: isize,
     pub data_block: DataBlock,
@@ -54,6 +61,7 @@ pub enum AggregateMeta<Method: HashMethodBounds, V: Send + Sync + 'static> {
     Serialized(SerializedPayload),
     HashTable(HashTablePayload<Method, V>),
     AggregateHashTable(PartitionedPayload),
+    AggregatePayload(AggregatePayload),
     BucketSpilled(BucketSpilledPayload),
     Spilled(Vec<BucketSpilledPayload>),
     Spilling(HashTablePayload<PartitionedHashMethod<Method>, V>),
@@ -69,6 +77,20 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> AggregateMeta<Method, V>
+    pub fn create_agg_payload(
+        bucket: isize,
+        payload: Payload,
+        max_partition_count: usize,
+    ) -> BlockMetaInfoPtr {
+        Box::new(AggregateMeta::<Method, V>::AggregatePayload(
+            AggregatePayload {
+                bucket,
+                payload,
+                max_partition_count,
+            },
+        ))
+    }
+
     pub fn create_agg_hashtable(payload: PartitionedPayload) -> BlockMetaInfoPtr {
         Box::new(AggregateMeta::<Method, V>::AggregateHashTable(payload))
     }
@@ -136,6 +158,9 @@ impl Debug for AggregateMeta
             AggregateMeta::AggregateHashTable(_) => {
                 f.debug_struct("AggregateMeta:AggHashTable").finish()
             }
+            AggregateMeta::AggregatePayload(_) => {
+                f.debug_struct("AggregateMeta:AggregatePayload").finish()
+            }
         }
     }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
index 9d212ff15dabb..9a2710d6dd97f 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
@@ -146,6 +146,7 @@ impl TransformAggregateSerializer {
                     return Ok(Event::Sync);
                 }
                 AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
+                AggregateMeta::AggregatePayload(_) => todo!("AGG_HASHTABLE"),
             }
         }
     }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
index 1a378817fa220..3aa92f67d4a04 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
@@ -177,6 +177,7 @@ impl BlockMetaTransform
                 }
 
                 Some(AggregateMeta::AggregateHashTable(_)) => todo!("AGG_HASHTABLE"),
+                Some(AggregateMeta::AggregatePayload(_)) => todo!("AGG_HASHTABLE"),
             };
         }
 
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
index 23f92a995ad44..6735aea685c2f 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
@@ -54,6 +54,7 @@ use futures_util::future::BoxFuture;
 use log::info;
 use opendal::Operator;
 
+use super::SerializePayload;
 use crate::api::serialize_block;
 use crate::api::ExchangeShuffleMeta;
 use crate::pipelines::processors::transforms::aggregator::exchange_defines;
@@ -207,7 +208,27 @@ impl BlockMetaTransform
                         },
                     ));
                 }
-                Some(AggregateMeta::AggregateHashTable(_)) => todo!("AGG_HASHTABLE"),
+                Some(AggregateMeta::AggregateHashTable(payload)) => {
+                    if index == self.local_pos {
+                        serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
+                            Some(Box::new(AggregateMeta::<Method, ()>::AggregateHashTable(
+                                payload,
+                            ))),
+                        )?));
+                        continue;
+                    }
+                    let bucket = -1;
+                    let mut stream = SerializeGroupByStream::create(
+                        &self.method,
+                        SerializePayload::PartitionedPayload(payload),
+                    );
+                    serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
+                        None => DataBlock::empty(),
+                        Some(data_block) => {
+                            serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
+                        }
+                    }));
+                }
                 Some(AggregateMeta::HashTable(payload)) => {
                     if index == self.local_pos {
                         serialized_blocks.push(FlightSerialized::DataBlock(block.add_meta(
@@ -216,8 +237,11 @@ impl BlockMetaTransform
                         continue;
                     }
 
-                    let mut stream = SerializeGroupByStream::create(&self.method, payload);
-                    let bucket = stream.payload.bucket;
+                    let bucket = payload.bucket;
+                    let mut stream = SerializeGroupByStream::create(
+                        &self.method,
+                        SerializePayload::HashTablePayload(payload),
+                    );
                     serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
                         None => DataBlock::empty(),
                         Some(data_block) => {
@@ -225,6 +249,7 @@ impl BlockMetaTransform
                         }
                     }));
                 }
+                Some(AggregateMeta::AggregatePayload(_)) => todo!("AGG_HASHTABLE"),
             };
         }
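In the exchange serializers, data destined for the local processor keeps its in-memory meta, while data for remote nodes is turned into serialized flight blocks; the new `AggregateHashTable` arm follows the same pattern with a `-1` bucket id. The sketch below is a standalone model of that local-versus-remote branch; the `Outgoing` type and the byte encoding are placeholders, not databend's `FlightSerialized` or `DataBlock` types.

/// Simplified stand-ins for the real pipeline types.
#[derive(Debug)]
enum Outgoing {
    /// Kept in memory for the local processor.
    Local(Vec<u64>),
    /// Serialized bytes for a remote node, tagged with a bucket id.
    Remote { bucket: isize, bytes: Vec<u8> },
}

fn serialize(rows: &[u64]) -> Vec<u8> {
    rows.iter().flat_map(|v| v.to_le_bytes()).collect()
}

/// Keep the payload as-is when the destination is the local position,
/// otherwise serialize it (bucket -1 marks "not yet split by bucket").
fn route(payload: Vec<u64>, index: usize, local_pos: usize) -> Outgoing {
    if index == local_pos {
        Outgoing::Local(payload)
    } else {
        Outgoing::Remote {
            bucket: -1,
            bytes: serialize(&payload),
        }
    }
}

fn main() {
    println!("{:?}", route(vec![1, 2, 3], 0, 0));
    println!("{:?}", route(vec![1, 2, 3], 1, 0));
}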
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
index ecc137af40a64..fc5c637be44f0 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs
@@ -19,7 +19,10 @@ use std::sync::Arc;
 
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::ColumnBuilder;
 use databend_common_expression::DataBlock;
+use databend_common_expression::PartitionedPayload;
+use databend_common_expression::PayloadFlushState;
 use databend_common_hashtable::HashtableEntryRefLike;
 use databend_common_hashtable::HashtableLike;
 use databend_common_pipeline_core::processors::Event;
@@ -27,6 +30,7 @@ use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::Processor;
 use databend_common_pipeline_core::processors::ProcessorPtr;
+use itertools::Itertools;
 
 use crate::pipelines::processors::transforms::aggregator::estimated_key_size;
 use crate::pipelines::processors::transforms::aggregator::AggregateMeta;
@@ -126,12 +130,22 @@ impl TransformGroupBySerializer {
                     AggregateMeta::Serialized(_) => unreachable!(),
                     AggregateMeta::BucketSpilled(_) => unreachable!(),
                     AggregateMeta::Partitioned { .. } => unreachable!(),
-                    AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
+                    // AggregateMeta::AggregateHashTable(payload) => {
+                    //     self.input_data = Some(SerializeGroupByStream::create(
+                    //         &self.method,
+                    //         SerializePayload::PartitionedPayload(payload),
+                    //     ));
+                    //     return Ok(Event::Sync);
+                    // }
                     AggregateMeta::HashTable(payload) => {
-                        self.input_data =
-                            Some(SerializeGroupByStream::create(&self.method, payload));
+                        self.input_data = Some(SerializeGroupByStream::create(
+                            &self.method,
+                            SerializePayload::HashTablePayload(payload),
+                        ));
                         return Ok(Event::Sync);
                     }
+                    AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
+                    AggregateMeta::AggregatePayload(_) => todo!("AGG_HASHTABLE"),
                 }
             }
         }
@@ -157,10 +171,15 @@ pub fn serialize_group_by(
     ]))
 }
 
+pub enum SerializePayload<Method: HashMethodBounds> {
+    HashTablePayload(HashTablePayload<Method, ()>),
+    PartitionedPayload(PartitionedPayload),
+}
+
 pub struct SerializeGroupByStream<Method: HashMethodBounds> {
     method: Method,
-    pub payload: Pin<Box<HashTablePayload<Method, ()>>>,
-    iter: <Method::HashTable<()> as HashtableLike>::Iterator<'static>,
+    pub payload: Pin<Box<SerializePayload<Method>>>,
+    iter: Option<<Method::HashTable<()> as HashtableLike>::Iterator<'static>>,
     end_iter: bool,
 }
 
@@ -169,10 +188,15 @@ unsafe impl<Method: HashMethodBounds> Send for SerializeGroupByStream<Method> {}
 
 unsafe impl<Method: HashMethodBounds> Sync for SerializeGroupByStream<Method> {}
 
 impl<Method: HashMethodBounds> SerializeGroupByStream<Method> {
-    pub fn create(method: &Method, payload: HashTablePayload<Method, ()>) -> Self {
+    pub fn create(method: &Method, payload: SerializePayload<Method>) -> Self {
         unsafe {
             let payload = Box::pin(payload);
-            let iter = NonNull::from(&payload.cell.hashtable).as_ref().iter();
+
+            let iter = if let SerializePayload::HashTablePayload(p) = payload.as_ref().get_ref() {
+                Some(NonNull::from(&p.cell.hashtable).as_ref().iter())
+            } else {
+                None
+            };
 
             SerializeGroupByStream::<Method> {
                 iter,
@@ -188,38 +212,143 @@ impl Iterator for SerializeGroupByStream {
     type Item = Result<DataBlock>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        if self.end_iter {
-            return None;
-        }
+        match self.payload.as_ref().get_ref() {
+            SerializePayload::HashTablePayload(p) => {
+                if self.end_iter {
+                    return None;
+                }
 
-        let max_block_rows = std::cmp::min(8192, self.payload.cell.hashtable.len());
-        let max_block_bytes = std::cmp::min(
-            8 * 1024 * 1024 + 1024,
-            self.payload
-                .cell
-                .hashtable
-                .unsize_key_size()
-                .unwrap_or(usize::MAX),
-        );
-
-        let mut group_key_builder = self
-            .method
-            .keys_column_builder(max_block_rows, max_block_bytes);
-
-        #[allow(clippy::while_let_on_iterator)]
-        while let Some(group_entity) = self.iter.next() {
-            group_key_builder.append_value(group_entity.key());
-
-            if group_key_builder.bytes_size() >= 8 * 1024 * 1024 {
-                let bucket = self.payload.bucket;
+                let max_block_rows = std::cmp::min(8192, p.cell.hashtable.len());
+                let max_block_bytes = std::cmp::min(
+                    8 * 1024 * 1024 + 1024,
+                    p.cell.hashtable.unsize_key_size().unwrap_or(usize::MAX),
+                );
+
+                let mut group_key_builder = self
+                    .method
+                    .keys_column_builder(max_block_rows, max_block_bytes);
+
+                #[allow(clippy::while_let_on_iterator)]
+                while let Some(group_entity) = self.iter.as_mut()?.next() {
+                    group_key_builder.append_value(group_entity.key());
+
+                    if group_key_builder.bytes_size() >= 8 * 1024 * 1024 {
+                        let bucket = p.bucket;
+                        let data_block =
+                            DataBlock::new_from_columns(vec![group_key_builder.finish()]);
+                        return Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))));
+                    }
+                }
+
+                self.end_iter = true;
+                let bucket = p.bucket;
                 let data_block = DataBlock::new_from_columns(vec![group_key_builder.finish()]);
-                return Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))));
+                Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))))
             }
-        }
+            SerializePayload::PartitionedPayload(p) => {
+                if self.end_iter {
+                    return None;
+                }
+
+                let mut state = PayloadFlushState::default();
+                let mut blocks = vec![];
+
+                for item in p.payloads.iter() {
+                    state.clear();
+                    while item.flush(&mut state) {
+                        blocks.push(DataBlock::new_from_columns(state.take_group_columns()));
+                    }
+                }
 
-        self.end_iter = true;
-        let bucket = self.payload.bucket;
-        let data_block = DataBlock::new_from_columns(vec![group_key_builder.finish()]);
-        Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))))
+                self.end_iter = true;
+                let data_block = if blocks.is_empty() {
+                    empty_block(p)
+                } else {
+                    DataBlock::concat(&blocks).unwrap()
+                };
+                Some(data_block.add_meta(Some(AggregateSerdeMeta::create(-1))))
+            }
+        }
     }
 }
+
+pub fn empty_block(p: &PartitionedPayload) -> DataBlock {
+    let columns = p
+        .aggrs
+        .iter()
+        .map(|f| ColumnBuilder::with_capacity(&f.return_type().unwrap(), 0).build())
+        .chain(
+            p.group_types
+                .iter()
+                .map(|t| ColumnBuilder::with_capacity(t, 0).build()),
+        )
+        .collect_vec();
+    DataBlock::new_from_columns(columns)
+}
+
+// pub struct SerializeGroupByStream<Method: HashMethodBounds> {
+//     method: Method,
+//     pub payload: Pin<Box<HashTablePayload<Method, ()>>>,
+//     iter: <Method::HashTable<()> as HashtableLike>::Iterator<'static>,
+//     end_iter: bool,
+// }
+
+// unsafe impl<Method: HashMethodBounds> Send for SerializeGroupByStream<Method> {}
+
+// unsafe impl<Method: HashMethodBounds> Sync for SerializeGroupByStream<Method> {}
+
+// impl<Method: HashMethodBounds> SerializeGroupByStream<Method> {
+//     pub fn create(method: &Method, payload: HashTablePayload<Method, ()>) -> Self {
+//         unsafe {
+//             let payload = Box::pin(payload);
+//             let iter = NonNull::from(&payload.cell.hashtable).as_ref().iter();
+
+//             SerializeGroupByStream::<Method> {
+//                 iter,
+//                 payload,
+//                 method: method.clone(),
+//                 end_iter: false,
+//             }
+//         }
+//     }
+// }
+
+// impl<Method: HashMethodBounds> Iterator for SerializeGroupByStream<Method> {
+//     type Item = Result<DataBlock>;
+
+//     fn next(&mut self) -> Option<Self::Item> {
+//         if self.end_iter {
+//             return None;
+//         }
+
+//         let max_block_rows = std::cmp::min(8192, self.payload.cell.hashtable.len());
+//         let max_block_bytes = std::cmp::min(
+//             8 * 1024 * 1024 + 1024,
+//             self.payload
+//                 .cell
+//                 .hashtable
+//                 .unsize_key_size()
+//                 .unwrap_or(usize::MAX),
+//         );
+
+//         let mut group_key_builder = self
+//             .method
+//             .keys_column_builder(max_block_rows, max_block_bytes);
+
+//         #[allow(clippy::while_let_on_iterator)]
+//         while let Some(group_entity) = self.iter.next() {
+//             group_key_builder.append_value(group_entity.key());
+
+//             if group_key_builder.bytes_size() >= 8 * 1024 * 1024 {
+//                 let bucket = self.payload.bucket;
+//                 let data_block = DataBlock::new_from_columns(vec![group_key_builder.finish()]);
+//                 return Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))));
+//             }
+//         }
+
+//         self.end_iter = true;
+//         let bucket = self.payload.bucket;
+//         let data_block = DataBlock::new_from_columns(vec![group_key_builder.finish()]);
+//         Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))))
+//     }
+// }
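`SerializeGroupByStream` now wraps either the old hash-table payload, which is iterated incrementally and cut into blocks by size, or a `PartitionedPayload`, which is flushed partition by partition and concatenated, falling back to an empty but correctly-typed block. The sketch below is a standalone model of that two-variant stream; the types and the flush call are toy stand-ins, not `PayloadFlushState` or `DataBlock::concat`.

/// Toy version of the two payload kinds the stream can serialize.
enum SerializePayload {
    HashTable(Vec<String>),        // group keys still held in one hash table
    Partitioned(Vec<Vec<String>>), // group keys already split by partition
}

/// Drain the payload into one batch of serialized keys: the hash-table side
/// yields its keys directly, the partitioned side flushes every partition and
/// concatenates the results (or yields an empty batch if nothing was grouped).
fn serialize(payload: SerializePayload) -> Vec<String> {
    match payload {
        SerializePayload::HashTable(keys) => keys,
        SerializePayload::Partitioned(parts) => {
            let mut blocks: Vec<String> = vec![];
            for part in parts {
                // flush one partition at a time
                blocks.extend(part);
            }
            if blocks.is_empty() {
                vec![] // stands in for empty_block() with the right schema
            } else {
                blocks
            }
        }
    }
}

fn main() {
    let p = SerializePayload::Partitioned(vec![vec!["a".into()], vec![], vec!["b".into()]]);
    assert_eq!(serialize(p), vec!["a".to_string(), "b".to_string()]);
}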
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs
index a023572980464..e62489d28e563 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs
@@ -137,6 +137,7 @@ impl Processor
                     AggregateMeta::Spilled(_) => unreachable!(),
                     AggregateMeta::Spilling(_) => unreachable!(),
                     AggregateMeta::AggregateHashTable(_) => unreachable!(),
+                    AggregateMeta::AggregatePayload(_) => unreachable!(),
                     AggregateMeta::HashTable(_) => unreachable!(),
                     AggregateMeta::Serialized(_) => unreachable!(),
                     AggregateMeta::BucketSpilled(payload) => {
@@ -179,6 +180,7 @@ impl Processor
                     AggregateMeta::Spilling(_) => unreachable!(),
                     AggregateMeta::HashTable(_) => unreachable!(),
                     AggregateMeta::AggregateHashTable(_) => unreachable!(),
+                    AggregateMeta::AggregatePayload(_) => unreachable!(),
                     AggregateMeta::Serialized(_) => unreachable!(),
                     AggregateMeta::BucketSpilled(payload) => {
                         let instant = Instant::now();
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
index 953cc0301f485..556062bb53dc6 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
@@ -240,6 +240,7 @@ where Method: HashMethodBounds
                 }
             },
             AggregateMeta::AggregateHashTable(_) => unreachable!(),
+            AggregateMeta::AggregatePayload(_) => unreachable!(),
         }
     }
 
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
index 06b95674a0596..f3042dca63e05 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
@@ -419,9 +419,23 @@ impl AccumulatingTransform for TransformPartialAggrega
                 blocks
             }
-            HashTable::AggregateHashTable(hashtable) => vec![DataBlock::empty_with_meta(
-                AggregateMeta::<Method, usize>::create_agg_hashtable(hashtable.payload),
-            )],
+            HashTable::AggregateHashTable(hashtable) => {
+                let partition_count = hashtable.payload.partition_count();
+                let mut blocks = Vec::with_capacity(partition_count);
+                for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() {
+                    if payload.len() != 0 {
+                        blocks.push(DataBlock::empty_with_meta(
+                            AggregateMeta::<Method, usize>::create_agg_payload(
+                                bucket as isize,
+                                payload,
+                                partition_count,
+                            ),
+                        ));
+                    }
+                }
+
+                blocks
+            }
         })
     }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
index 9eb4a59fb3541..0be006d7a94c8 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
@@ -62,6 +62,7 @@ impl TransformFinalGroupBy {
 
     fn transform_agg_hashtable(&mut self, meta: AggregateMeta<Method, ()>) -> Result<DataBlock> {
         let mut agg_hashtable: Option<AggregateHashTable> = None;
+        let mut blocks = vec![];
         if let AggregateMeta::Partitioned { bucket: _, data } = meta {
             for bucket_data in data {
                 match bucket_data {
@@ -82,13 +83,15 @@ impl TransformFinalGroupBy {
                             agg_hashtable = Some(hashtable);
                         }
                     },
+                    AggregateMeta::Serialized(payload) => {
+                        blocks.push(payload.data_block);
+                    }
                     _ => unreachable!(),
                 }
             }
         }
 
         if let Some(mut ht) = agg_hashtable {
-            let mut blocks = vec![];
             self.flush_state.clear();
             loop {
                 if ht.merge_result(&mut self.flush_state)? {
@@ -106,8 +109,62 @@ impl TransformFinalGroupBy {
 
             return DataBlock::concat(&blocks);
         }
+
+        if !blocks.is_empty() {
+            return DataBlock::concat(&blocks);
+        }
+
         Ok(self.params.empty_result_block())
     }
+
+    // fn transform_agg_hashtable(&mut self, meta: AggregateMeta<Method, ()>) -> Result<DataBlock> {
+    //     let mut agg_hashtable: Option<AggregateHashTable> = None;
+    //     if let AggregateMeta::Partitioned { bucket: _, data } = meta {
+    //         for bucket_data in data {
+    //             match bucket_data {
+    //                 AggregateMeta::AggregateHashTable(payload) => match agg_hashtable.as_mut() {
+    //                     Some(ht) => {
+    //                         ht.combine_payloads(&payload, &mut self.flush_state)?;
+    //                     }
+    //                     None => {
+    //                         let capacity =
+    //                             AggregateHashTable::get_capacity_for_count(payload.len());
+    //                         let mut hashtable = AggregateHashTable::new_with_capacity(
+    //                             self.params.group_data_types.clone(),
+    //                             self.params.aggregate_functions.clone(),
+    //                             HashTableConfig::default().with_initial_radix_bits(0),
+    //                             capacity,
+    //                         );
+    //                         hashtable.combine_payloads(&payload, &mut self.flush_state)?;
+    //                         agg_hashtable = Some(hashtable);
+    //                     }
+    //                 },
+    //                 _ => unreachable!(),
+    //             }
+    //         }
+    //     }
+
+    //     if let Some(mut ht) = agg_hashtable {
+    //         let mut blocks = vec![];
+    //         self.flush_state.clear();
+    //         loop {
+    //             if ht.merge_result(&mut self.flush_state)? {
+    //                 blocks.push(DataBlock::new_from_columns(
+    //                     self.flush_state.take_group_columns(),
+    //                 ));
+    //             } else {
+    //                 break;
+    //             }
+    //         }
+
+    //         if blocks.is_empty() {
+    //             return Ok(self.params.empty_result_block());
+    //         }
+
+    //         return DataBlock::concat(&blocks);
+    //     }
+    //     Ok(self.params.empty_result_block())
+    // }
 }
 
 impl BlockMetaTransform> for TransformFinalGroupBy
@@ -161,6 +218,7 @@ where Method: HashMethodBounds
                 }
             },
             AggregateMeta::AggregateHashTable(_) => unreachable!(),
+            AggregateMeta::AggregatePayload(_) => unreachable!(),
         }
     }
 
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
index 139e349f57202..c03fa7661a146 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
@@ -264,9 +264,23 @@ impl AccumulatingTransform for TransformPartialGroupBy
                 blocks
             }
-            HashTable::AggregateHashTable(hashtable) => vec![DataBlock::empty_with_meta(
-                AggregateMeta::<Method, ()>::create_agg_hashtable(hashtable.payload),
-            )],
+            HashTable::AggregateHashTable(hashtable) => {
+                let partition_count = hashtable.payload.partition_count();
+                let mut blocks = Vec::with_capacity(partition_count);
+                for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() {
+                    if payload.len() != 0 {
+                        blocks.push(DataBlock::empty_with_meta(
+                            AggregateMeta::<Method, ()>::create_agg_payload(
+                                bucket as isize,
+                                payload,
+                                partition_count,
+                            ),
+                        ));
+                    }
+                }
+
+                blocks
+            }
         })
     }
 }
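Both partial transforms now emit one `AggregatePayload` block per non-empty partition, instead of a single block carrying the whole `PartitionedPayload`, so the downstream partition-bucket transform can route work by bucket. The sketch below is a standalone model of that split; `Payload` and the meta struct are toy types, not the databend ones.

/// Toy per-partition payload: just the rows that hashed into that partition.
type Payload = Vec<u64>;

/// Toy stand-in for AggregateMeta::AggregatePayload.
#[derive(Debug)]
struct AggregatePayloadMeta {
    bucket: isize,
    payload: Payload,
    max_partition_count: usize,
}

/// Turn a partitioned payload into one meta per non-empty bucket, recording
/// how many partitions the producer used so readers can re-partition later.
fn split_into_buckets(partitions: Vec<Payload>) -> Vec<AggregatePayloadMeta> {
    let max_partition_count = partitions.len();
    partitions
        .into_iter()
        .enumerate()
        .filter(|(_, p)| !p.is_empty())
        .map(|(bucket, payload)| AggregatePayloadMeta {
            bucket: bucket as isize,
            payload,
            max_partition_count,
        })
        .collect()
}

fn main() {
    let metas = split_into_buckets(vec![vec![1], vec![], vec![2, 3]]);
    assert_eq!(metas.len(), 2);
    println!("{:?}", metas);
}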
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
index 719239cb9e1ac..fe0abcdba2a5c 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs
@@ -36,8 +36,8 @@ use databend_common_pipeline_core::Pipe;
 use databend_common_pipeline_core::PipeItem;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_storage::DataOperator;
-use itertools::Itertools;
 
+use super::AggregatePayload;
 use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta;
 use crate::pipelines::processors::transforms::aggregator::aggregate_meta::HashTablePayload;
 use crate::pipelines::processors::transforms::aggregator::aggregate_meta::SerializedPayload;
@@ -68,7 +68,7 @@ pub struct TransformPartitionBucket
     buckets_blocks: BTreeMap<isize, Vec<DataBlock>>,
     flush_state: PayloadFlushState,
-    partition_payloads: Vec<PartitionedPayload>,
+    agg_payloads: Vec<AggregatePayload>,
     unsplitted_blocks: Vec<DataBlock>,
     max_partition_count: usize,
     _phantom: PhantomData,
 }
@@ -97,7 +97,7 @@ impl
             buckets_blocks: BTreeMap::new(),
             unsplitted_blocks: vec![],
             flush_state: PayloadFlushState::default(),
-            partition_payloads: vec![],
+            agg_payloads: vec![],
            initialized_all_inputs: false,
             max_partition_count: 0,
             _phantom: Default::default(),
@@ -158,9 +158,16 @@
                 AggregateMeta::BucketSpilled(payload) => {
                     (payload.bucket, SINGLE_LEVEL_BUCKET_NUM)
                 }
-                AggregateMeta::Serialized(payload) => (payload.bucket, payload.bucket),
-                AggregateMeta::HashTable(payload) => (payload.bucket, payload.bucket),
+                AggregateMeta::Serialized(payload) => {
+                    println!("ser");
+                    (payload.bucket, payload.bucket)
+                }
+                AggregateMeta::HashTable(payload) => {
+                    println!("hash");
+                    (payload.bucket, payload.bucket)
+                }
                 AggregateMeta::Spilled(_) => {
+                    println!("spilled");
                     let meta = data_block.take_meta().unwrap();
 
                     if let Some(AggregateMeta::Spilled(buckets_payload)) =
@@ -190,11 +197,13 @@
                     unreachable!()
                 }
-                AggregateMeta::AggregateHashTable(p) => {
+                AggregateMeta::AggregateHashTable(_) => unreachable!(),
+                AggregateMeta::AggregatePayload(payload) => {
+                    println!("agg_payload");
                     self.max_partition_count =
-                        self.max_partition_count.max(p.partition_count());
+                        self.max_partition_count.max(payload.max_partition_count);
 
-                    (0, 0)
+                    (payload.bucket, payload.bucket)
                 }
             };
 
@@ -215,12 +224,14 @@
 
         if self.max_partition_count > 0 {
             let meta = data_block.take_meta().unwrap();
-            if let Some(AggregateMeta::AggregateHashTable(p)) =
+
+            if let Some(AggregateMeta::AggregatePayload(agg_payload)) =
                 AggregateMeta::<Method, V>::downcast_from(meta)
             {
-                self.partition_payloads.push(p);
+                let res = agg_payload.bucket;
+                self.agg_payloads.push(agg_payload);
+                return res;
             }
-            return 0;
         }
 
         self.unsplitted_blocks.push(data_block);
@@ -349,7 +360,7 @@ impl Processor
                 return Ok(Event::NeedData);
             }
 
-            if self.partition_payloads.len() == self.inputs.len()
+            if (!self.agg_payloads.is_empty() && self.initialized_all_inputs)
                 || (!self.buckets_blocks.is_empty() && !self.unsplitted_blocks.is_empty())
             {
                 // Split data blocks if it's unsplitted.
@@ -423,50 +434,50 @@ impl Processor
     }
 
     fn process(&mut self) -> Result<()> {
-        if !self.partition_payloads.is_empty() {
-            let mut payloads = Vec::with_capacity(self.partition_payloads.len());
-
-            for p in self.partition_payloads.drain(0..) {
-                if p.partition_count() != self.max_partition_count {
-                    let p = p.repartition(self.max_partition_count, &mut self.flush_state);
-                    payloads.push(p);
+        if !self.agg_payloads.is_empty() {
+            let group_types = self.agg_payloads[0].payload.group_types.clone();
+            let aggrs = self.agg_payloads[0].payload.aggrs.clone();
+
+            let mut partitioned_payload = PartitionedPayload::new(
+                group_types.clone(),
+                aggrs.clone(),
+                self.max_partition_count as u64,
+            );
+
+            for agg_payload in self.agg_payloads.drain(0..) {
+                partitioned_payload
+                    .arenas
+                    .append(&mut vec![agg_payload.payload.arena.clone()]);
+                if agg_payload.max_partition_count != self.max_partition_count {
+                    debug_assert!(agg_payload.max_partition_count < self.max_partition_count);
+                    partitioned_payload.combine_single(agg_payload.payload, &mut self.flush_state);
                 } else {
-                    payloads.push(p);
-                };
-            }
-
-            let group_types = payloads[0].group_types.clone();
-            let aggrs = payloads[0].aggrs.clone();
-
-            let mut payload_map = (0..self.max_partition_count).map(|_| vec![]).collect_vec();
-
-            // All arenas should be kept in the bucket partition payload
-            let mut arenas = vec![];
-
-            for mut payload in payloads.into_iter() {
-                for (bucket, p) in payload.payloads.into_iter().enumerate() {
-                    payload_map[bucket].push(p);
+                    partitioned_payload.payloads[agg_payload.bucket as usize]
+                        .combine(agg_payload.payload);
                 }
-                arenas.append(&mut payload.arenas);
             }
 
-            for (bucket, mut payloads) in payload_map.into_iter().enumerate() {
-                let mut partition_payload =
-                    PartitionedPayload::new(group_types.clone(), aggrs.clone(), 1);
-
-                for payload in payloads.drain(0..) {
-                    partition_payload.combine_single(payload, &mut self.flush_state);
-                }
+            for (bucket, payload) in partitioned_payload.payloads.into_iter().enumerate() {
+                let mut part = PartitionedPayload::new(group_types.clone(), aggrs.clone(), 1);
+                part.combine_single(payload, &mut self.flush_state);
+                part.arenas.extend_from_slice(&partitioned_payload.arenas);
 
-                partition_payload.arenas.extend_from_slice(&arenas);
-
-                if partition_payload.len() != 0 {
-                    self.buckets_blocks
-                        .insert(bucket as isize, vec![DataBlock::empty_with_meta(
-                            AggregateMeta::<Method, V>::create_agg_hashtable(partition_payload),
-                        )]);
+                if part.len() != 0 {
+                    match self.buckets_blocks.entry(bucket as isize) {
+                        Entry::Vacant(v) => {
+                            v.insert(vec![DataBlock::empty_with_meta(
+                                AggregateMeta::<Method, V>::create_agg_hashtable(part),
+                            )]);
+                        }
+                        Entry::Occupied(mut v) => {
+                            v.get_mut().push(DataBlock::empty_with_meta(
+                                AggregateMeta::<Method, V>::create_agg_hashtable(part),
+                            ));
+                        }
+                    };
                 }
             }
+
             return Ok(());
         }
 
@@ -489,6 +500,7 @@ impl Processor
             AggregateMeta::Serialized(payload) => self.partition_block(payload)?,
             AggregateMeta::HashTable(payload) => self.partition_hashtable(payload)?,
             AggregateMeta::AggregateHashTable(_) => unreachable!(),
+            AggregateMeta::AggregatePayload(_) => unreachable!(),
         };
 
         for (bucket, block) in data_blocks.into_iter().enumerate() {
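`TransformPartitionBucket::process` now merges every received `AggregatePayload` into a single `PartitionedPayload` sized to the largest partition count seen: payloads produced with fewer partitions are re-scattered row by row (`combine_single`), while payloads that already match the target count are appended directly to their bucket, and the bump arenas are carried along so the borrowed state stays alive. The sketch below is a standalone model of that merge rule; the types, the toy "hash", and the omission of arena handling are assumptions for illustration.

type Payload = Vec<u64>;

struct AggPayload {
    bucket: usize,
    payload: Payload,
    max_partition_count: usize,
}

/// Merge per-bucket payloads into `max_partition_count` buckets. Payloads from
/// producers that used fewer partitions are re-scattered row by row; payloads
/// that already used the target partition count go straight into their bucket.
fn merge(payloads: Vec<AggPayload>, max_partition_count: usize) -> Vec<Payload> {
    let mut buckets = vec![Payload::new(); max_partition_count];
    for p in payloads {
        if p.max_partition_count != max_partition_count {
            debug_assert!(p.max_partition_count < max_partition_count);
            // re-scatter: the value itself stands in for the group hash
            for row in p.payload {
                buckets[(row as usize) % max_partition_count].push(row);
            }
        } else {
            buckets[p.bucket].extend(p.payload);
        }
    }
    buckets
}

fn main() {
    let out = merge(
        vec![
            AggPayload { bucket: 0, payload: vec![4], max_partition_count: 4 },
            AggPayload { bucket: 0, payload: vec![1, 2], max_partition_count: 2 },
        ],
        4,
    );
    assert_eq!(out[0], vec![4]);
    println!("{:?}", out);
}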
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index e227b3e76457a..8ee050715bc07 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -607,7 +607,7 @@ impl DefaultSettings {
                 range: Some(SettingRange::Numeric(0..=1)),
             }),
             ("enable_experimental_aggregate_hashtable", DefaultSettingValue {
-                value: UserSettingValue::UInt64(0),
+                value: UserSettingValue::UInt64(1),
                 desc: "Enables experimental aggregate hashtable",
                 mode: SettingMode::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
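The default for `enable_experimental_aggregate_hashtable` flips from 0 to 1 while keeping the 0..=1 numeric range, so the experimental path is on by default; the builder changes above additionally require an empty cluster before using it. The sketch below is a standalone illustration of validating such a 0/1 toggle against an inclusive numeric range; the function name and error format are illustrative, not databend's settings API.

use std::ops::RangeInclusive;

/// Validate a user-supplied value for a 0/1 toggle the way a numeric
/// setting range would: reject anything outside the inclusive range.
fn validate_toggle(value: u64, range: RangeInclusive<u64>) -> Result<u64, String> {
    if range.contains(&value) {
        Ok(value)
    } else {
        Err(format!("value {value} is out of range {range:?}"))
    }
}

fn main() {
    assert_eq!(validate_toggle(1, 0..=1), Ok(1)); // the new default
    assert!(validate_toggle(2, 0..=1).is_err());
}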