diff --git a/src/query/service/src/physical_plans/physical_aggregate_final.rs b/src/query/service/src/physical_plans/physical_aggregate_final.rs index d81bf1eefd837..e07221cd0686e 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_final.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_final.rs @@ -148,8 +148,7 @@ impl IPhysicalPlan for AggregateFinal { .get_enable_experimental_aggregate_hashtable()?; let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?; let max_restore_worker = builder.settings.get_max_aggregate_restore_worker()?; - let experiment_aggregate_final = - builder.settings.get_enable_experiment_aggregate_final()?; + let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?; let mut is_cluster_aggregate = false; if ExchangeSource::check_physical_plan(&self.input) { @@ -164,6 +163,7 @@ impl IPhysicalPlan for AggregateFinal { is_cluster_aggregate, max_block_size as usize, max_spill_io_requests as usize, + enable_experiment_aggregate, )?; if params.group_columns.is_empty() { @@ -201,7 +201,6 @@ impl IPhysicalPlan for AggregateFinal { params.clone(), max_restore_worker, after_group_parallel, - experiment_aggregate_final, builder.ctx.clone(), ) } diff --git a/src/query/service/src/physical_plans/physical_aggregate_partial.rs b/src/query/service/src/physical_plans/physical_aggregate_partial.rs index 5e1f3e61799b8..b8a30e456a271 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_partial.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_partial.rs @@ -178,6 +178,8 @@ impl IPhysicalPlan for AggregatePartial { .settings .get_enable_experimental_aggregate_hashtable()?; + let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?; + let params = PipelineBuilder::build_aggregator_params( self.input.output_schema()?, &self.group_by, @@ -186,6 +188,7 @@ impl IPhysicalPlan for AggregatePartial { builder.is_exchange_parent(), 
max_block_size as usize, max_spill_io_requests as usize, + enable_experiment_aggregate, )?; if params.group_columns.is_empty() { diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs index 45081cb977338..89ab750cd7583 100644 --- a/src/query/service/src/pipelines/builders/builder_aggregate.rs +++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs @@ -38,6 +38,7 @@ impl PipelineBuilder { cluster_aggregator: bool, max_block_size: usize, max_spill_io_requests: usize, + enable_experiment_aggregate: bool, ) -> Result> { let mut agg_args = Vec::with_capacity(agg_funcs.len()); let (group_by, group_data_types) = group_by @@ -132,6 +133,7 @@ impl PipelineBuilder { cluster_aggregator, max_block_size, max_spill_io_requests, + enable_experiment_aggregate, )?; log::debug!("aggregate states layout: {:?}", params.states_layout); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs index 262cca08398e7..a0e8dce3a9e10 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs @@ -59,6 +59,7 @@ impl ExchangeSorting for AggregateExchangeSorting { ))), Some(meta_info) => match meta_info { AggregateMeta::Partitioned { .. } => unreachable!(), + AggregateMeta::NewBucketSpilled(_) => unreachable!(), AggregateMeta::Serialized(v) => { compute_block_number(v.bucket, v.max_partition_count) } @@ -181,6 +182,7 @@ impl FlightScatter for HashTableHashScatter { AggregateMeta::BucketSpilled(_) => unreachable!(), AggregateMeta::Serialized(_) => unreachable!(), AggregateMeta::Partitioned { .. 
} => unreachable!(), + AggregateMeta::NewBucketSpilled(_) => unreachable!(), AggregateMeta::AggregateSpilling(payload) => { for p in scatter_partitioned_payload(payload, self.buckets)? { blocks.push(DataBlock::empty_with_meta( diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs index 241fab7f9f9c3..a2bd28cff603a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs @@ -31,6 +31,7 @@ use databend_common_expression::PartitionedPayload; use databend_common_expression::Payload; use databend_common_expression::ProbeState; use databend_common_expression::ProjectedBlock; +use parquet::file::metadata::RowGroupMetaData; pub struct SerializedPayload { pub bucket: isize, @@ -116,6 +117,12 @@ pub struct BucketSpilledPayload { pub max_partition_count: usize, } +pub struct NewSpilledPayload { + pub bucket: isize, + pub location: String, + pub row_group: RowGroupMetaData, +} + pub struct AggregatePayload { pub bucket: isize, pub payload: Payload, @@ -131,6 +138,7 @@ pub enum AggregateMeta { Spilled(Vec), Partitioned { bucket: isize, data: Vec }, + NewBucketSpilled(NewSpilledPayload), } impl AggregateMeta { @@ -173,6 +181,10 @@ impl AggregateMeta { pub fn create_partitioned(bucket: isize, data: Vec) -> BlockMetaInfoPtr { Box::new(AggregateMeta::Partitioned { data, bucket }) } + + pub fn create_new_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr { + Box::new(AggregateMeta::NewBucketSpilled(payload)) + } } impl serde::Serialize for AggregateMeta { @@ -200,6 +212,9 @@ impl Debug for AggregateMeta { } AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilled").finish(), AggregateMeta::BucketSpilled(_) => f.debug_struct("Aggregate::BucketSpilled").finish(), + AggregateMeta::NewBucketSpilled(_) => { + 
f.debug_struct("Aggregate::NewBucketSpilled").finish() + } AggregateMeta::AggregatePayload(_) => { f.debug_struct("AggregateMeta:AggregatePayload").finish() } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs index ce73b65e164da..25a5449e47609 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs @@ -41,6 +41,8 @@ pub struct AggregatorParams { pub cluster_aggregator: bool, pub max_block_size: usize, pub max_spill_io_requests: usize, + + pub enable_experiment_aggregate: bool, } impl AggregatorParams { @@ -54,6 +56,7 @@ impl AggregatorParams { cluster_aggregator: bool, max_block_size: usize, max_spill_io_requests: usize, + enable_experiment_aggregate: bool, ) -> Result> { let states_layout = if !agg_funcs.is_empty() { Some(get_states_layout(agg_funcs)?) 
@@ -72,6 +75,7 @@ impl AggregatorParams { cluster_aggregator, max_block_size, max_spill_io_requests, + enable_experiment_aggregate, })) } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs index 2bd9556aeda36..b1d2cb83d441e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -27,10 +28,10 @@ use parking_lot::Mutex; use tokio::sync::Barrier; use tokio::sync::Semaphore; -use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSharedState; -use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSpiller; -use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::NewFinalAggregateTransform; -use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::TransformPartitionBucketScatter; +use crate::pipelines::processors::transforms::aggregator::new_aggregate::FinalAggregateSharedState; +use crate::pipelines::processors::transforms::aggregator::new_aggregate::NewAggregateSpiller; +use crate::pipelines::processors::transforms::aggregator::new_aggregate::NewFinalAggregateTransform; +use crate::pipelines::processors::transforms::aggregator::new_aggregate::TransformPartitionBucketScatter; use crate::pipelines::processors::transforms::aggregator::transform_partition_bucket::TransformPartitionBucket; use crate::pipelines::processors::transforms::aggregator::AggregatorParams; use 
crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillReader; @@ -43,8 +44,6 @@ fn build_partition_bucket_experimental( after_worker: usize, ctx: Arc, ) -> Result<()> { - let operator = DataOperator::instance().spill_operator(); - // PartitionedPayload only accept power of two partitions let mut output_num = after_worker.next_power_of_two(); const MAX_PARTITION_COUNT: usize = 128; @@ -71,8 +70,13 @@ fn build_partition_bucket_experimental( let barrier = Arc::new(Barrier::new(output_num)); let shared_state = Arc::new(Mutex::new(FinalAggregateSharedState::new(output_num))); + let settings = ctx.get_settings(); + let rows = settings.get_max_block_size()? as usize; + let bytes = settings.get_max_block_bytes()? as usize; + let max_aggregate_spill_level = settings.get_max_aggregate_spill_level()? as usize; + for id in 0..output_num { - let spiller = FinalAggregateSpiller::try_create(ctx.clone(), operator.clone())?; + let spiller = NewAggregateSpiller::try_create(ctx.clone(), output_num, rows, bytes)?; let input_port = InputPort::create(); let output_port = OutputPort::create(); let processor = NewFinalAggregateTransform::try_create( @@ -84,7 +88,7 @@ fn build_partition_bucket_experimental( barrier.clone(), shared_state.clone(), spiller, - ctx.clone(), + max_aggregate_spill_level, )?; builder.add_transform(input_port, output_port, ProcessorPtr::create(processor)); } @@ -103,8 +107,8 @@ fn build_partition_bucket_legacy( ) -> Result<()> { let operator = DataOperator::instance().spill_operator(); - let input_nums = pipeline.output_len(); - let transform = TransformPartitionBucket::create(input_nums, params.clone())?; + let input_num = pipeline.output_len(); + let transform = TransformPartitionBucket::create(input_num, params.clone())?; let output = transform.get_output(); let inputs_port = transform.get_inputs(); @@ -115,7 +119,7 @@ fn build_partition_bucket_legacy( vec![output], )])); - pipeline.try_resize(std::cmp::min(input_nums, max_restore_worker 
as usize))?; + pipeline.try_resize(std::cmp::min(input_num, max_restore_worker as usize))?; let semaphore = Arc::new(Semaphore::new(params.max_spill_io_requests)); pipeline.add_transform(|input, output| { let operator = operator.clone(); @@ -133,17 +137,16 @@ fn build_partition_bucket_legacy( Ok(()) } -/// Build partition bucket pipeline based on the experiment_aggregate_final flag. +/// Build partition bucket pipeline based on the experiment_aggregate flag. /// Dispatches to either experimental or legacy implementation. pub fn build_partition_bucket( pipeline: &mut Pipeline, params: Arc, max_restore_worker: u64, after_worker: usize, - experiment_aggregate_final: bool, ctx: Arc, ) -> Result<()> { - if experiment_aggregate_final { + if params.enable_experiment_aggregate { build_partition_bucket_experimental(pipeline, params, after_worker, ctx) } else { build_partition_bucket_legacy(pipeline, params, max_restore_worker, after_worker) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs index 0e3bab15f9928..1f8806a82272d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs @@ -16,7 +16,7 @@ mod aggregate_exchange_injector; mod aggregate_meta; mod aggregator_params; mod build_partition_bucket; -mod new_final_aggregate; +mod new_aggregate; mod serde; mod transform_aggregate_expand; mod transform_aggregate_final; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/datablock_splitter.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/datablock_splitter.rs similarity index 100% rename from src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/datablock_splitter.rs rename to 
src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/datablock_splitter.rs diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs similarity index 84% rename from src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/mod.rs rename to src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs index d38eef8e5cc67..fb7c6f5af533b 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs @@ -13,12 +13,13 @@ // limitations under the License. mod datablock_splitter; -mod final_aggregate_spiller; +mod new_aggregate_spiller; +mod new_final_aggregate_state; mod new_transform_final_aggregate; mod transform_partition_bucket_scatter; pub use datablock_splitter::split_partitioned_meta_into_datablocks; -pub use final_aggregate_spiller::FinalAggregateSpiller; -pub use new_transform_final_aggregate::FinalAggregateSharedState; +pub use new_aggregate_spiller::NewAggregateSpiller; +pub use new_final_aggregate_state::FinalAggregateSharedState; pub use new_transform_final_aggregate::NewFinalAggregateTransform; pub use transform_partition_bucket_scatter::TransformPartitionBucketScatter; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs new file mode 100644 index 0000000000000..4e654405bb80e --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -0,0 +1,356 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file 
except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +use databend_common_base::base::GlobalUniqName; +use databend_common_base::base::ProgressValues; +use databend_common_base::runtime::profile::Profile; +use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::GlobalIORuntime; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::arrow::deserialize_column; +use databend_common_expression::BlockPartitionStream; +use databend_common_expression::DataBlock; +use databend_common_pipeline_transforms::MemorySettings; +use databend_common_storage::DataOperator; +use databend_common_storages_parquet::ReadSettings; +use parquet::file::metadata::RowGroupMetaData; + +use crate::pipelines::memory_settings::MemorySettingsExt; +use crate::pipelines::processors::transforms::aggregator::AggregateMeta; +use crate::pipelines::processors::transforms::aggregator::BucketSpilledPayload; +use crate::pipelines::processors::transforms::aggregator::NewSpilledPayload; +use crate::pipelines::processors::transforms::aggregator::SerializedPayload; +use crate::sessions::QueryContext; +use crate::spillers::SpillsBufferPool; +use crate::spillers::SpillsDataWriter; + +struct AggregatePayloadWriter { + path: String, + writer: SpillsDataWriter, +} + +impl AggregatePayloadWriter { + fn create(prefix: &str) -> Result { + let operator = 
DataOperator::instance().spill_operator(); + let buffer_pool = SpillsBufferPool::instance(); + let file_path = format!("{}/{}", prefix, GlobalUniqName::unique()); + let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?; + + Ok(AggregatePayloadWriter { + path: file_path, + writer: spills_data_writer, + }) + } + + fn write_block(&mut self, block: DataBlock) -> Result<()> { + if block.is_empty() { + return Ok(()); + } + + self.writer.write(block)?; + self.writer.flush() + } + + fn close(self) -> Result<(String, usize, Vec)> { + let (bytes_written, row_groups) = self.writer.close()?; + Ok((self.path, bytes_written, row_groups)) + } +} + +#[derive(Default)] +struct WriteStats { + rows: usize, + bytes: usize, + count: usize, + elapsed: Duration, +} + +impl WriteStats { + fn accumulate(&mut self, elapsed: Duration) { + self.count += 1; + self.elapsed += elapsed; + } + + fn add_rows(&mut self, rows: usize) { + self.rows += rows; + } + + fn add_bytes(&mut self, bytes: usize) { + self.bytes += bytes; + } + + fn take(&mut self) -> Self { + std::mem::take(self) + } +} + +pub struct NewAggregateSpiller { + pub memory_settings: MemorySettings, + ctx: Arc, + read_setting: ReadSettings, + spill_prefix: String, + partition_count: usize, + partition_stream: BlockPartitionStream, + payload_writers: Option>, + write_stats: WriteStats, +} + +impl NewAggregateSpiller { + pub fn try_create( + ctx: Arc, + partition_count: usize, + rows_threshold: usize, + bytes_threshold: usize, + ) -> Result { + let memory_settings = MemorySettings::from_aggregate_settings(&ctx)?; + let table_ctx: Arc = ctx.clone(); + let read_setting = ReadSettings::from_settings(&table_ctx.get_settings())?; + let spill_prefix = ctx.query_id_spill_prefix(); + let partition_stream = + BlockPartitionStream::create(rows_threshold, bytes_threshold, partition_count); + + Ok(Self { + memory_settings, + ctx, + read_setting, + spill_prefix, + partition_count, + partition_stream, + payload_writers: None, + 
write_stats: WriteStats::default(), + }) + } + + fn create_payload_writers( + prefix: &str, + partition_count: usize, + ) -> Result> { + (0..partition_count) + .map(|_| AggregatePayloadWriter::create(prefix)) + .collect() + } + + fn ensure_payload_writers(&mut self) -> Result<()> { + if self.payload_writers.is_none() { + self.payload_writers = Some(Self::create_payload_writers( + &self.spill_prefix, + self.partition_count, + )?); + } + Ok(()) + } + + fn write_ready_blocks(&mut self, ready_blocks: Vec<(usize, DataBlock)>) -> Result<()> { + if ready_blocks.is_empty() { + return Ok(()); + } + + self.ensure_payload_writers()?; + + for (bucket, block) in ready_blocks { + if block.is_empty() { + continue; + } + + if bucket >= self.partition_count { + return Err(ErrorCode::Internal( + "NewAggregateSpiller produced invalid partition id", + )); + } + + let start = Instant::now(); + { + let writers = self + .payload_writers + .as_mut() + .ok_or_else(|| ErrorCode::Internal("payload writers are not initialized"))?; + writers[bucket].write_block(block)?; + } + let elapsed = start.elapsed(); + self.write_stats.accumulate(elapsed); + } + + Ok(()) + } + + pub fn spill(&mut self, partition_id: usize, block: DataBlock) -> Result<()> { + self.ensure_payload_writers()?; + + if block.is_empty() { + return Ok(()); + } + + if partition_id >= self.partition_count { + return Err(ErrorCode::Internal( + "NewAggregateSpiller received invalid partition id", + )); + } + + let indices = vec![partition_id as u64; block.num_rows()]; + let ready_blocks = self.partition_stream.partition(indices, block, true); + self.write_ready_blocks(ready_blocks) + } + + pub fn spill_finish(&mut self) -> Result> { + self.ensure_payload_writers()?; + + let mut pending_blocks = Vec::new(); + for partition_id in 0..self.partition_count { + if let Some(block) = self.partition_stream.finalize_partition(partition_id) { + pending_blocks.push((partition_id, block)); + } + } + self.write_ready_blocks(pending_blocks)?; + + 
let mut spilled_payloads = Vec::new(); + + if let Some(writers) = self.payload_writers.take() { + for (partition_id, writer) in writers.into_iter().enumerate() { + let (path, written_size, row_groups) = writer.close()?; + if row_groups.is_empty() { + continue; + } + + if written_size > 0 { + self.write_stats.add_bytes(written_size); + } + for row_group in row_groups { + self.write_stats.add_rows(row_group.num_rows() as usize); + spilled_payloads.push(NewSpilledPayload { + bucket: partition_id as isize, + location: path.clone(), + row_group, + }); + } + } + + self.flush_write_profile(); + } + + Ok(spilled_payloads) + } + + pub fn restore(&self, payload: NewSpilledPayload) -> Result { + let NewSpilledPayload { + bucket, + location, + row_group, + } = payload; + + let operator = DataOperator::instance().spill_operator(); + let buffer_pool = SpillsBufferPool::instance(); + let mut reader = buffer_pool.reader(operator.clone(), location, vec![row_group.clone()])?; + + let read_bytes = row_group.total_byte_size() as usize; + let instant = Instant::now(); + let data_block = reader.read(self.read_setting)?; + self.flush_read_profile(&instant, read_bytes); + + if let Some(block) = data_block { + Ok(AggregateMeta::Serialized(SerializedPayload { + bucket, + data_block: block, + max_partition_count: self.partition_count, + })) + } else { + Err(ErrorCode::Internal("read empty block from final aggregate")) + } + } + + // legacy, will remove later + pub fn restore_legacy(&self, payload: BucketSpilledPayload) -> Result { + // read + let instant = Instant::now(); + let operator = DataOperator::instance().spill_operator(); + let BucketSpilledPayload { + bucket, + location, + data_range, + columns_layout, + max_partition_count, + } = payload; + let data = GlobalIORuntime::instance().block_on(async move { + let data = operator + .read_with(&location) + .range(data_range) + .await? 
+ .to_vec(); + Ok(data) + })?; + self.flush_read_profile(&instant, data.len()); + + // deserialize + let mut begin = 0; + let mut columns = Vec::with_capacity(columns_layout.len()); + for &column_layout in &columns_layout { + columns.push(deserialize_column( + &data[begin..begin + column_layout as usize], + )?); + begin += column_layout as usize; + } + + Ok(AggregateMeta::Serialized(SerializedPayload { + bucket, + data_block: DataBlock::new_from_columns(columns), + max_partition_count, + })) + } + + fn flush_read_profile(&self, instant: &Instant, read_bytes: usize) { + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillReadTime, + instant.elapsed().as_millis() as usize, + ); + } + + fn flush_write_profile(&mut self) { + let stats = self.write_stats.take(); + if stats.count == 0 && stats.bytes == 0 && stats.rows == 0 { + return; + } + + if stats.count > 0 { + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteCount, + stats.count, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, + stats.elapsed.as_millis() as usize, + ); + } + if stats.bytes > 0 { + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + stats.bytes, + ); + } + + if stats.rows > 0 || stats.bytes > 0 { + let progress_val = ProgressValues { + rows: stats.rows, + bytes: stats.bytes, + }; + self.ctx.get_aggregate_spill_progress().incr(&progress_val); + } + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs new file mode 100644 index 0000000000000..114af69d0245e --- /dev/null +++ 
b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_final_aggregate_state.rs @@ -0,0 +1,227 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::Event; + +use super::split_partitioned_meta_into_datablocks; +use crate::pipelines::processors::transforms::aggregator::AggregateMeta; + +pub struct RepartitionedQueues(pub Vec>); + +impl RepartitionedQueues { + pub fn create(partition_count: usize) -> Self { + let queues = (0..partition_count).map(|_| Vec::new()).collect(); + Self(queues) + } + + pub fn take_queues(&mut self) -> Self { + let partition_count = self.0.len(); + std::mem::replace(self, Self::create(partition_count)) + } + + pub fn take_queue(&mut self, partition_idx: usize) -> Vec { + std::mem::take(&mut self.0[partition_idx]) + } + + pub fn merge_queues(&mut self, other: Self) { + for (idx, mut queue) in other.0.into_iter().enumerate() { + self.0[idx].append(&mut queue); + } + } + + pub fn push_to_queue(&mut self, partition_idx: usize, meta: AggregateMeta) { + self.0[partition_idx].push(meta); + } +} + +pub enum RoundPhase { + Idle, + NewTask(AggregateMeta), + OutputReady(DataBlock), + Aggregate, + AsyncWait, +} + +impl Display for 
RoundPhase { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RoundPhase::Idle => write!(f, "Idle"), + RoundPhase::NewTask(_) => write!(f, "NewTask"), + RoundPhase::OutputReady(_) => write!(f, "OutputReady"), + RoundPhase::AsyncWait => write!(f, "AsyncWait"), + RoundPhase::Aggregate => write!(f, "Aggregate"), + } + } +} + +pub struct LocalRoundState { + pub phase: RoundPhase, + pub first_data_ready: bool, + pub is_spilled: bool, + pub is_reported: bool, + pub current_queue_spill_round: usize, + pub max_aggregate_spill_level: usize, + pub working_queue: Vec, +} + +impl LocalRoundState { + pub fn new(max_aggregate_spill_level: usize) -> Self { + Self { + phase: RoundPhase::Idle, + first_data_ready: false, + is_spilled: false, + is_reported: false, + current_queue_spill_round: 0, + max_aggregate_spill_level, + working_queue: Vec::new(), + } + } + + /// Resets round-specific flags when a new round begins. + pub fn reset_for_new_round(&mut self, spill_round: usize) { + self.is_spilled = false; + self.is_reported = false; + self.current_queue_spill_round = spill_round; + } + + pub fn take_phase(&mut self) -> RoundPhase { + std::mem::replace(&mut self.phase, RoundPhase::Idle) + } + + pub fn schedule_next_task(&mut self) -> Option { + self.working_queue.pop().map(|aggregate_meta| { + self.phase = RoundPhase::NewTask(aggregate_meta); + Event::Sync + }) + } + + pub fn schedule_async_wait(&mut self) -> Event { + self.is_reported = true; + self.phase = RoundPhase::AsyncWait; + Event::Async + } + + pub fn enqueue_partitioned_meta(&mut self, datablock: &mut DataBlock) -> Result<()> { + if let Some(block_meta) = datablock.take_meta().and_then(AggregateMeta::downcast_from) { + match block_meta { + AggregateMeta::Partitioned { data, .. 
} => { + self.working_queue.extend(data); + } + _ => { + return Err(ErrorCode::Internal( + "NewFinalAggregateTransform expects Partitioned AggregateMeta from upstream", + )); + } + } + } + Ok(()) + } +} + +struct PendingQueue { + data: Vec, + spill_round: usize, +} + +pub struct FinalAggregateSharedState { + pub is_spilled: bool, + pub last_round_is_spilled: bool, + finished_count: usize, + partition_count: usize, + + /// Collects the per-partition aggregate metadata reported by each processor during the current round. + pub repartitioned_queues: RepartitionedQueues, + + pub need_aggregate_queues: RepartitionedQueues, + + /// Partition queues waiting for block-level repartitioning (typically restored from spill). + pending_queues: Vec, + + /// DataBlocks prepared from the pending queues for processors to consume in the next round. + next_round: Vec, + + /// Spill counter of the queue currently scheduled in `next_round`. + current_queue_spill_round: Option, +} + +impl FinalAggregateSharedState { + pub fn new(partition_count: usize) -> Self { + Self { + is_spilled: false, + last_round_is_spilled: false, + finished_count: 0, + repartitioned_queues: RepartitionedQueues::create(partition_count), + need_aggregate_queues: RepartitionedQueues::create(partition_count), + pending_queues: vec![], + next_round: Vec::with_capacity(partition_count), + current_queue_spill_round: None, + partition_count, + } + } + + pub fn add_repartitioned_queue(&mut self, queues: RepartitionedQueues) { + self.repartitioned_queues.merge_queues(queues); + + self.finished_count += 1; + if self.finished_count == self.partition_count { + self.finished_count = 0; + + let previous_spill_round = self.current_queue_spill_round.take(); + self.last_round_is_spilled = self.is_spilled; + + let queues = self.repartitioned_queues.take_queues(); + + if !self.is_spilled { + self.need_aggregate_queues = queues; + } else { + self.is_spilled = false; + + // flush all repartitioned queues to pending queues + for 
queue in queues.0.into_iter() { + if queue.is_empty() { + continue; + } + self.pending_queues.push(PendingQueue { + data: queue, + spill_round: previous_spill_round.unwrap_or(0) + 1, + }); + } + } + + if self.next_round.is_empty() { + if let Some(queue) = self.pending_queues.pop() { + self.current_queue_spill_round = Some(queue.spill_round); + self.next_round = + split_partitioned_meta_into_datablocks(0, queue.data, self.partition_count); + } + } + } + } + + pub fn get_next_datablock(&mut self) -> Option<(DataBlock, usize)> { + match self.next_round.pop() { + Some(block) => Some((block, self.current_queue_spill_round.unwrap_or(0))), + None => { + self.current_queue_spill_round = None; + None + } + } + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs new file mode 100644 index 0000000000000..03f620bf80cee --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -0,0 +1,468 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::Any; +use std::borrow::BorrowMut; +use std::sync::Arc; + +use bumpalo::Bump; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::AggregateHashTable; +use databend_common_expression::DataBlock; +use databend_common_expression::HashTableConfig; +use databend_common_expression::PartitionedPayload; +use databend_common_expression::PayloadFlushState; +use databend_common_pipeline_core::processors::Event; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::Processor; +use log::debug; +use parking_lot::Mutex; +use tokio::sync::Barrier; + +use super::new_aggregate_spiller::NewAggregateSpiller; +use super::new_final_aggregate_state::FinalAggregateSharedState; +use super::new_final_aggregate_state::LocalRoundState; +use super::new_final_aggregate_state::RepartitionedQueues; +use super::new_final_aggregate_state::RoundPhase; +use crate::pipelines::processors::transforms::aggregator::AggregateMeta; +use crate::pipelines::processors::transforms::aggregator::AggregatePayload; +use crate::pipelines::processors::transforms::aggregator::AggregatorParams; + +pub struct NewFinalAggregateTransform { + input: Arc, + output: Arc, + id: usize, + partition_count: usize, + + /// final aggregate + params: Arc, + flush_state: PayloadFlushState, + + /// storing repartition result + repartitioned_queues: RepartitionedQueues, + + /// schedule + round_state: LocalRoundState, + barrier: Arc, + shared_state: Arc>, + + /// spill + spiller: NewAggregateSpiller, +} + +impl NewFinalAggregateTransform { + pub fn try_create( + input: Arc, + output: Arc, + id: usize, + params: Arc, + partition_count: usize, + barrier: Arc, + shared_state: Arc>, + spiller: NewAggregateSpiller, + max_aggregate_spill_level: usize, + ) -> Result> { + let round_state = LocalRoundState::new(max_aggregate_spill_level); + 
Ok(Box::new(NewFinalAggregateTransform { + input, + output, + id, + partition_count, + params, + flush_state: PayloadFlushState::default(), + round_state, + repartitioned_queues: RepartitionedQueues::create(partition_count), + barrier, + shared_state, + spiller, + })) + } + + /// Repartition the given AggregateMeta into `partition_count` partitions + /// in aggregate stage, `partition_count` processors will handle each partition respectively. + fn repartition(&mut self, meta: AggregateMeta) -> Result<()> { + // Step 1: normalize input into a single Payload to scatter. + let mut src_payload = match meta { + // Deserialize into a hashtable with radix_bits = 0. This yields a single payload. + AggregateMeta::Serialized(payload) => { + let p = payload.convert_to_partitioned_payload( + self.params.group_data_types.clone(), + self.params.aggregate_functions.clone(), + self.params.num_states(), + 0, + Arc::new(Bump::new()), + )?; + debug_assert_eq!(p.partition_count(), 1); + // Safe to unwrap due to partition_count == 1 + p.payloads.into_iter().next().unwrap() + } + // Already a single payload for one upstream bucket. + AggregateMeta::AggregatePayload(agg_payload) => agg_payload.payload, + _ => { + return Err(ErrorCode::Internal( + "Unexpected meta type for repartitioning", + )); + } + }; + + // Step 2: scatter this payload across all partitions using modulo hashing. 
+ let arena = src_payload.arena.clone(); + let mut repartitioned = PartitionedPayload::new( + self.params.group_data_types.clone(), + self.params.aggregate_functions.clone(), + self.partition_count as u64, + vec![arena], + ); + + let mut state = PayloadFlushState::default(); + while src_payload.scatter(&mut state, self.partition_count) { + for partition_id in 0..self.partition_count { + let count = state.probe_state.partition_count[partition_id]; + if count == 0 { + continue; + } + + let sel = &state.probe_state.partition_entries[partition_id]; + repartitioned.payloads[partition_id].copy_rows(sel, count, &state.addresses); + } + } + // Avoid double drop of states moved into new payloads. + src_payload.state_move_out = true; + + // Step 3: enqueue into per-partition queues. + let mut new_produced = RepartitionedQueues::create(self.partition_count); + for (partition_id, payload) in repartitioned.payloads.into_iter().enumerate() { + if payload.len() == 0 { + continue; + } + let meta = AggregateMeta::AggregatePayload(AggregatePayload { + bucket: partition_id as isize, + payload, + max_partition_count: self.partition_count, + }); + new_produced.push_to_queue(partition_id, meta); + } + + // if spill already triggered, local repartition queue is all spilled out + // we only need to spill the new produced repartitioned queues out + if self.round_state.is_spilled { + // when no more task, we need to finalize the partition stream + if self.round_state.working_queue.is_empty() { + self.spill(new_produced, true)?; + } else { + self.spill(new_produced, false)?; + } + return Ok(()); + } + + // merge new produced repartitioned queues into local repartitioned queues + self.repartitioned_queues.merge_queues(new_produced); + + // if the queue is triggered spill and repartition too many times, considering performance affect, we may not + // continue to trigger spill + let can_trigger_spill = + self.round_state.current_queue_spill_round < self.round_state.max_aggregate_spill_level; + 
let need_spill = self.spiller.memory_settings.check_spill(); + + if !can_trigger_spill { + if need_spill { + debug!( + "NewFinalAggregateTransform[{}] skip spill after {} rounds", + self.id, self.round_state.current_queue_spill_round + ); + } + return Ok(()); + } + + if need_spill { + debug!( + "NewFinalAggregateTransform[{}] trigger spill due to memory limit, spilled round {}", + self.id, self.round_state.current_queue_spill_round + ); + self.shared_state.lock().is_spilled = true; + } + + // if other processor or itself trigger spill, this processor will need spill its local repartitioned queue out + if self.shared_state.lock().is_spilled + && !self.round_state.is_spilled + && !self.round_state.working_queue.is_empty() + { + self.round_state.is_spilled = true; + let queues = self.repartitioned_queues.take_queues(); + self.spill(queues, false)?; + } + + Ok(()) + } + + fn push_output(&mut self) -> Result { + if let RoundPhase::OutputReady(data_block) = self.round_state.take_phase() { + self.output.push_data(Ok(data_block)); + Ok(Event::NeedConsume) + } else { + Err(ErrorCode::Internal( + "NewFinalAggregateTransform output called in invalid state", + )) + } + } + + fn final_aggregate(&mut self, mut queue: Vec) -> Result<()> { + let mut agg_hashtable: Option = None; + + while let Some(meta) = queue.pop() { + match meta { + AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() { + Some(ht) => { + let payload = payload.convert_to_partitioned_payload( + self.params.group_data_types.clone(), + self.params.aggregate_functions.clone(), + self.params.num_states(), + 0, + Arc::new(Bump::new()), + )?; + ht.combine_payloads(&payload, &mut self.flush_state)?; + } + None => { + agg_hashtable = Some(payload.convert_to_aggregate_table( + self.params.group_data_types.clone(), + self.params.aggregate_functions.clone(), + self.params.num_states(), + 0, + Arc::new(Bump::new()), + true, + )?); + } + }, + AggregateMeta::AggregatePayload(payload) => match 
agg_hashtable.as_mut() { + Some(ht) => { + ht.combine_payload(&payload.payload, &mut self.flush_state)?; + } + None => { + let capacity = + AggregateHashTable::get_capacity_for_count(payload.payload.len()); + let mut hashtable = AggregateHashTable::new_with_capacity( + self.params.group_data_types.clone(), + self.params.aggregate_functions.clone(), + HashTableConfig::default().with_initial_radix_bits(0), + capacity, + Arc::new(Bump::new()), + ); + hashtable.combine_payload(&payload.payload, &mut self.flush_state)?; + agg_hashtable = Some(hashtable); + } + }, + _ => unreachable!(), + } + } + + let output_block = if let Some(mut ht) = agg_hashtable { + let mut blocks = vec![]; + self.flush_state.clear(); + + loop { + if ht.merge_result(&mut self.flush_state)? { + let mut entries = self.flush_state.take_aggregate_results(); + let group_columns = self.flush_state.take_group_columns(); + entries.extend_from_slice(&group_columns); + let num_rows = entries[0].len(); + blocks.push(DataBlock::new(entries, num_rows)); + } else { + break; + } + } + + if blocks.is_empty() { + self.params.empty_result_block() + } else { + DataBlock::concat(&blocks)? + } + } else { + self.params.empty_result_block() + }; + + if output_block.is_empty() { + self.round_state.phase = RoundPhase::Idle; + } else { + self.round_state.phase = RoundPhase::OutputReady(output_block); + } + + Ok(()) + } + + pub fn spill(&mut self, mut queues: RepartitionedQueues, finalize: bool) -> Result<()> { + for (id, queue) in queues.0.iter_mut().enumerate() { + while let Some(meta) = queue.pop() { + match meta { + AggregateMeta::AggregatePayload(AggregatePayload { payload, .. 
}) => { + let data_block = payload.aggregate_flush_all()?.consume_convert_to_full(); + self.spiller.spill(id, data_block)?; + } + _ => { + return Err(ErrorCode::Internal( + "NewAggregateSpiller expects AggregatePayload in repartitioned queue", + )); + } + } + } + } + + if finalize { + let spilled_payloads = self.spiller.spill_finish()?; + for payload in spilled_payloads { + self.repartitioned_queues.push_to_queue( + payload.bucket as usize, + AggregateMeta::NewBucketSpilled(payload), + ); + } + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Processor for NewFinalAggregateTransform { + fn name(&self) -> String { + "NewFinalAggregateTransform".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + let round_state = &mut self.round_state; + + if matches!(round_state.phase, RoundPhase::OutputReady(_)) { + return self.push_output(); + } + + if matches!(round_state.phase, RoundPhase::Aggregate) { + return Ok(Event::Sync); + } + + // schedule a task from local working queue first + if let Some(event) = round_state.schedule_next_task() { + return Ok(event); + } + + // no more task in local working queue, means we need report repartitioned queues to shared state + if !round_state.is_reported && round_state.first_data_ready { + return Ok(round_state.schedule_async_wait()); + } + + // after reported, try get datablock from shared state + let next_datablock = self.shared_state.lock().borrow_mut().get_next_datablock(); + if let Some((mut datablock, spill_round)) = next_datablock { + // begin a new round, reset spilled flag and reported flag + round_state.reset_for_new_round(spill_round); + + round_state.enqueue_partitioned_meta(&mut datablock)?; + + // schedule next task from working queue, if empty, begin to wait other 
processors + if let Some(event) = round_state.schedule_next_task() { + return Ok(event); + } else { + return Ok(round_state.schedule_async_wait()); + } + } + + // no more work from shared state, try pull data from input + if self.input.has_data() { + // begin a new round, reset spilled flag and reported flag + round_state.reset_for_new_round(0); + round_state.first_data_ready = true; + + let mut data_block = self.input.pull_data().unwrap()?; + round_state.enqueue_partitioned_meta(&mut data_block)?; + + // schedule next task from working queue, if empty, begin to wait other processors + if let Some(event) = round_state.schedule_next_task() { + return Ok(event); + } else { + return Ok(round_state.schedule_async_wait()); + } + } + + if self.input.is_finished() { + self.output.finish(); + return Ok(Event::Finished); + } + + self.input.set_need_data(); + Ok(Event::NeedData) + } + + fn process(&mut self) -> Result<()> { + let phase = self.round_state.take_phase(); + match phase { + RoundPhase::NewTask(meta) => { + let meta = match meta { + AggregateMeta::NewBucketSpilled(p) => self.spiller.restore(p)?, + AggregateMeta::BucketSpilled(p) => self.spiller.restore_legacy(p)?, + other => other, + }; + self.repartition(meta)?; + + Ok(()) + } + RoundPhase::Aggregate => { + let queue = self + .shared_state + .lock() + .need_aggregate_queues + .take_queue(self.id); + self.final_aggregate(queue) + } + _ => Err(ErrorCode::Internal(format!( + "NewFinalAggregateTransform process called in {} state", + phase + ))), + } + } + + async fn async_process(&mut self) -> Result<()> { + let phase = self.round_state.take_phase(); + match phase { + RoundPhase::AsyncWait => { + // report local repartitioned queues to shared state + let queues = self.repartitioned_queues.take_queues(); + self.shared_state.lock().add_repartitioned_queue(queues); + + self.barrier.wait().await; + + // we can only begin aggregate when last round no processor spills + if !self.shared_state.lock().last_round_is_spilled { 
+ self.round_state.phase = RoundPhase::Aggregate; + } + Ok(()) + } + _ => Err(ErrorCode::Internal( + "NewFinalAggregateTransform async_process called in invalid state", + ))?, + } + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/transform_partition_bucket_scatter.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/transform_partition_bucket_scatter.rs similarity index 99% rename from src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/transform_partition_bucket_scatter.rs rename to src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/transform_partition_bucket_scatter.rs index ddae6d268ea71..28658c84352fa 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/transform_partition_bucket_scatter.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/transform_partition_bucket_scatter.rs @@ -294,6 +294,7 @@ impl TransformPartitionBucketScatter { (payload.bucket, payload.max_partition_count) } + AggregateMeta::NewBucketSpilled(_) => unreachable!(), }; } else { return Err(ErrorCode::Internal(format!( @@ -460,7 +461,7 @@ impl TransformPartitionBucketScatter { #[async_trait::async_trait] impl Processor for TransformPartitionBucketScatter { fn name(&self) -> String { - String::from("TransformPartitionBucket") + String::from("TransformPartitionBucketScatter") } fn as_any(&mut self) -> &mut dyn Any { @@ -569,6 +570,7 @@ impl Processor for TransformPartitionBucketScatter { AggregateMeta::Partitioned { .. 
} => unreachable!(), AggregateMeta::AggregateSpilling(_) => unreachable!(), AggregateMeta::BucketSpilled(_) => unreachable!(), + AggregateMeta::NewBucketSpilled(_) => unreachable!(), AggregateMeta::Serialized(payload) => self.partition_block(payload)?, AggregateMeta::AggregatePayload(payload) => self.partition_payload(payload)?, }; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/final_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/final_aggregate_spiller.rs deleted file mode 100644 index 7751de7c6e0ed..0000000000000 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/final_aggregate_spiller.rs +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::sync::Arc; -use std::time::Instant; - -use databend_common_base::base::ProgressValues; -use databend_common_base::runtime::profile::Profile; -use databend_common_base::runtime::profile::ProfileStatisticsName; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::arrow::deserialize_column; -use databend_common_expression::arrow::serialize_column; -use databend_common_expression::DataBlock; -use databend_common_pipeline_transforms::MemorySettings; -use log::info; -use opendal::layers::BlockingLayer; -use opendal::BlockingOperator; -use opendal::Operator; - -use crate::pipelines::memory_settings::MemorySettingsExt; -use crate::pipelines::processors::transforms::aggregator::AggregateMeta; -use crate::pipelines::processors::transforms::aggregator::BucketSpilledPayload; -use crate::pipelines::processors::transforms::aggregator::SerializedPayload; -use crate::sessions::QueryContext; -use crate::spillers::Spiller; -use crate::spillers::SpillerConfig; -use crate::spillers::SpillerType; -pub struct FinalAggregateSpiller { - pub spiller: Spiller, - pub blocking_operator: BlockingOperator, - #[allow(dead_code)] - pub memory_settings: MemorySettings, - pub ctx: Arc, -} - -impl FinalAggregateSpiller { - pub fn try_create(ctx: Arc, operator: Operator) -> Result { - let memory_settings = MemorySettings::from_aggregate_settings(&ctx)?; - - let location_prefix = ctx.query_id_spill_prefix(); - - let config = SpillerConfig { - spiller_type: SpillerType::Aggregation, - location_prefix, - disk_spill: None, - use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(), - }; - let spiller = Spiller::create(ctx.clone(), operator.clone(), config)?; - - let blocking_operator = operator.layer(BlockingLayer::create()?).blocking(); - - Ok(Self { - spiller, - blocking_operator, - memory_settings, - ctx, - }) - } - - pub fn restore(&self, payload: 
BucketSpilledPayload) -> Result { - // read - let instant = Instant::now(); - let data = self - .blocking_operator - .read_with(&payload.location) - .range(payload.data_range.clone()) - .call()? - .to_vec(); - - self.record_read_profile(&instant, data.len()); - - // deserialize - let mut begin = 0; - let mut columns = Vec::with_capacity(payload.columns_layout.len()); - for &column_layout in &payload.columns_layout { - columns.push(deserialize_column( - &data[begin..begin + column_layout as usize], - )?); - begin += column_layout as usize; - } - - Ok(AggregateMeta::Serialized(SerializedPayload { - bucket: payload.bucket, - data_block: DataBlock::new_from_columns(columns), - max_partition_count: payload.max_partition_count, - })) - } - - pub fn spill(&self, id: usize, data_block: DataBlock) -> Result { - let rows = data_block.num_rows(); - let mut columns_layout = Vec::with_capacity(data_block.num_columns()); - let mut columns_data = Vec::with_capacity(data_block.num_columns()); - - for entry in data_block.columns() { - let column = entry.as_column().ok_or_else(|| { - ErrorCode::Internal("Unexpected scalar when spilling aggregate data") - })?; - let column_data = serialize_column(column); - columns_layout.push(column_data.len() as u64); - columns_data.push(column_data); - } - - let location = self.spiller.create_unique_location(); - - let instant = Instant::now(); - - let mut writer = self - .blocking_operator - .writer_with(&location) - .chunk(8 * 1024 * 1024) - .call()?; - - let mut write_bytes = 0; - for data in columns_data.into_iter() { - write_bytes += data.len(); - writer.write(data)?; - } - - writer.close()?; - - self.spiller - .add_aggregate_spill_file(&location, write_bytes); - - self.record_write_profile(&instant, rows, write_bytes); - - info!( - "Write aggregate spill {} successfully, elapsed: {:?}", - location, - instant.elapsed() - ); - - let payload = BucketSpilledPayload { - bucket: id as isize, - location, - data_range: 0..write_bytes as u64, - 
columns_layout, - max_partition_count: 0, - }; - - Ok(payload) - } - - fn record_read_profile(&self, instant: &Instant, read_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillReadTime, - instant.elapsed().as_millis() as usize, - ); - } - - fn record_write_profile(&self, instant: &Instant, rows: usize, write_bytes: usize) { - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteBytes, write_bytes); - Profile::record_usize_profile( - ProfileStatisticsName::RemoteSpillWriteTime, - instant.elapsed().as_millis() as usize, - ); - - let progress_val = ProgressValues { - rows, - bytes: write_bytes, - }; - self.ctx.get_aggregate_spill_progress().incr(&progress_val); - } -} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/new_transform_final_aggregate.rs deleted file mode 100644 index de673678505bd..0000000000000 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_final_aggregate/new_transform_final_aggregate.rs +++ /dev/null @@ -1,602 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::borrow::BorrowMut; -use std::fmt::Display; -use std::sync::Arc; - -use bumpalo::Bump; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::AggregateHashTable; -use databend_common_expression::BlockMetaInfoDowncast; -use databend_common_expression::BlockPartitionStream; -use databend_common_expression::DataBlock; -use databend_common_expression::HashTableConfig; -use databend_common_expression::PartitionedPayload; -use databend_common_expression::PayloadFlushState; -use databend_common_pipeline_core::processors::Event; -use databend_common_pipeline_core::processors::InputPort; -use databend_common_pipeline_core::processors::OutputPort; -use databend_common_pipeline_core::processors::Processor; -use parking_lot::Mutex; -use tokio::sync::Barrier; - -use super::final_aggregate_spiller::FinalAggregateSpiller; -use super::split_partitioned_meta_into_datablocks; -use crate::pipelines::processors::transforms::aggregator::AggregateMeta; -use crate::pipelines::processors::transforms::aggregator::AggregatePayload; -use crate::pipelines::processors::transforms::aggregator::AggregatorParams; -use crate::sessions::QueryContext; - -pub struct RepartitionedQueues(Vec>); - -impl RepartitionedQueues { - pub fn create(partition_count: usize) -> Self { - let queues = (0..partition_count).map(|_| Vec::new()).collect(); - Self(queues) - } - - pub fn take_queues(&mut self) -> Self { - let partition_count = self.0.len(); - std::mem::replace(self, Self::create(partition_count)) - } - - pub fn take_queue(&mut self, partition_idx: usize) -> Vec { - std::mem::take(&mut self.0[partition_idx]) - } - - pub fn merge_queues(&mut self, other: Self) { - for (idx, mut queue) in other.0.into_iter().enumerate() { - self.0[idx].append(&mut queue); 
- } - } - - pub fn push_to_queue(&mut self, partition_idx: usize, meta: AggregateMeta) { - self.0[partition_idx].push(meta); - } -} - -pub enum LocalState { - Idle, - NewTask(AggregateMeta), - OutputReady(DataBlock), - Aggregate, - AsyncWait, -} - -impl Display for LocalState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - LocalState::Idle => write!(f, "Idle"), - LocalState::NewTask(_) => write!(f, "NewTask"), - LocalState::OutputReady(_) => write!(f, "OutputReady"), - LocalState::AsyncWait => write!(f, "AsyncWait"), - LocalState::Aggregate => write!(f, "Aggregate"), - } - } -} - -pub struct FinalAggregateSharedState { - is_spilled: bool, - last_round_is_spilled: bool, - finished_count: usize, - repartitioned_queues: RepartitionedQueues, - next_round: Vec, - pending_queues: Vec>, - partition_count: usize, -} - -impl FinalAggregateSharedState { - pub fn new(partition_count: usize) -> Self { - Self { - is_spilled: false, - last_round_is_spilled: false, - finished_count: 0, - repartitioned_queues: RepartitionedQueues::create(partition_count), - next_round: Vec::with_capacity(partition_count), - pending_queues: vec![], - partition_count, - } - } - - pub fn add_repartitioned_queue(&mut self, queues: RepartitionedQueues) { - self.repartitioned_queues.merge_queues(queues); - - self.finished_count += 1; - if self.finished_count == self.partition_count { - self.finished_count = 0; - self.last_round_is_spilled = self.is_spilled; - if self.is_spilled { - self.is_spilled = false; - // flush all repartitioned queues to pending queues - let queues = self.repartitioned_queues.take_queues(); - for queue in queues.0.into_iter() { - if queue.is_empty() { - continue; - } - self.pending_queues.push(queue); - } - } - - // pop a queue and repartition in datablock level - if let Some(queue) = self.pending_queues.pop() { - self.next_round = - split_partitioned_meta_into_datablocks(0, queue, self.partition_count); - } - } - } - - pub fn 
get_next_datablock(&mut self) -> Option { - self.next_round.pop() - } -} - -pub struct NewFinalAggregateTransform { - input: Arc, - output: Arc, - id: usize, - params: Arc, - flush_state: PayloadFlushState, - working_queue: Vec, - state: LocalState, - partition_count: usize, - repartitioned_queues: RepartitionedQueues, - barrier: Arc, - shared_state: Arc>, - first_data_ready: bool, - spiller: FinalAggregateSpiller, - block_partition_stream: BlockPartitionStream, - is_spilled: bool, - is_reported: bool, -} - -impl NewFinalAggregateTransform { - pub fn try_create( - input: Arc, - output: Arc, - id: usize, - params: Arc, - partition_count: usize, - barrier: Arc, - shared_state: Arc>, - spiller: FinalAggregateSpiller, - ctx: Arc, - ) -> Result> { - let block_bytes = ctx.get_settings().get_max_block_bytes()? as usize; - let block_partition_stream = BlockPartitionStream::create(0, block_bytes, partition_count); - - Ok(Box::new(NewFinalAggregateTransform { - input, - output, - id, - params, - flush_state: PayloadFlushState::default(), - working_queue: vec![], - state: LocalState::Idle, - partition_count, - repartitioned_queues: RepartitionedQueues::create(partition_count), - barrier, - shared_state, - first_data_ready: false, - spiller, - block_partition_stream, - is_spilled: false, - is_reported: false, - })) - } - - /// Repartition the given AggregateMeta into `partition_count` partitions - /// in aggregate stage, `partition_count` processors will handle each partition respectively. 
- fn repartition(&mut self, meta: AggregateMeta) -> Result<()> { - let mut flush_state = PayloadFlushState::default(); - - let partition_payload = match meta { - AggregateMeta::Serialized(payload) => payload.convert_to_partitioned_payload( - self.params.group_data_types.clone(), - self.params.aggregate_functions.clone(), - self.params.num_states(), - 0, - Arc::new(Bump::new()), - )?, - AggregateMeta::AggregatePayload(agg_payload) => { - let payload = agg_payload.payload; - let arena = payload.arena.clone(); - let mut partitioned = PartitionedPayload::new( - self.params.group_data_types.clone(), - self.params.aggregate_functions.clone(), - 1, - vec![arena], - ); - partitioned.combine_single(payload, &mut flush_state, None); - partitioned - } - _ => { - return Err(ErrorCode::Internal( - "Unexpected meta type for repartitioning", - )); - } - }; - - let repartitioned = partition_payload.repartition(self.partition_count, &mut flush_state); - let mut new_produced = RepartitionedQueues::create(self.partition_count); - for (partition_id, payload) in repartitioned.payloads.into_iter().enumerate() { - if payload.len() == 0 { - continue; - } - - let meta = AggregateMeta::AggregatePayload(AggregatePayload { - bucket: partition_id as isize, - payload, - max_partition_count: self.partition_count, - }); - - new_produced.push_to_queue(partition_id, meta); - } - - // if spill already triggered, local repartition queue is all spilled out - // we only need to spill the new produced repartitioned queues out - if self.is_spilled { - // when no more task, we need to finalize the partition stream - if self.working_queue.is_empty() { - self.spill(new_produced, true)?; - } else { - self.spill(new_produced, false)?; - } - return Ok(()); - } - - // merge new produced repartitioned queues into local repartitioned queues - self.repartitioned_queues.merge_queues(new_produced); - - // if self.spiller.memory_settings.check_spill() { - // info!( - // "NewFinalAggregateTransform[{}] trigger spill 
due to memory limit", - // self.id - // ); - // self.shared_state.lock().is_spilled = true; - // } - - // if other processor or itself trigger spill, this processor will need spill its local repartitioned queue out - if self.shared_state.lock().is_spilled && !self.is_spilled && !self.working_queue.is_empty() - { - self.is_spilled = true; - let queues = self.repartitioned_queues.take_queues(); - self.spill(queues, false)?; - } - - Ok(()) - } - - fn push_output(&mut self) -> Result { - if let LocalState::OutputReady(data_block) = - std::mem::replace(&mut self.state, LocalState::Idle) - { - self.output.push_data(Ok(data_block)); - Ok(Event::NeedConsume) - } else { - Err(ErrorCode::Internal( - "NewFinalAggregateTransform output called in invalid state", - )) - } - } - - fn final_aggregate(&mut self, mut queue: Vec) -> Result<()> { - let total_row_count = queue - .iter() - .map(|meta| match meta { - AggregateMeta::Serialized(payload) => payload.data_block.num_rows(), - AggregateMeta::AggregatePayload(payload) => payload.payload.len(), - _ => 0, - }) - .sum::(); - - let mut agg_hashtable = if total_row_count > 0 { - let capacity = AggregateHashTable::get_capacity_for_count(total_row_count); - Some(AggregateHashTable::new_with_capacity( - self.params.group_data_types.clone(), - self.params.aggregate_functions.clone(), - HashTableConfig::default().with_initial_radix_bits(0), - capacity, - Arc::new(Bump::new()), - )) - } else { - None - }; - - if let Some(ht) = agg_hashtable.as_mut() { - while let Some(meta) = queue.pop() { - match meta { - AggregateMeta::Serialized(payload) => { - let partitioned = payload.convert_to_partitioned_payload( - self.params.group_data_types.clone(), - self.params.aggregate_functions.clone(), - self.params.num_states(), - 0, - Arc::new(Bump::new()), - )?; - ht.combine_payloads(&partitioned, &mut self.flush_state)?; - } - AggregateMeta::AggregatePayload(payload) => { - ht.combine_payload(&payload.payload, &mut self.flush_state)?; - } - _ => { - 
return Err(ErrorCode::Internal( - "Unexpected meta type in aggregate queue when final aggregate", - )); - } - } - } - } - - let output_block = if let Some(mut ht) = agg_hashtable { - let mut blocks = vec![]; - self.flush_state.clear(); - - loop { - if ht.merge_result(&mut self.flush_state)? { - let mut entries = self.flush_state.take_aggregate_results(); - let group_columns = self.flush_state.take_group_columns(); - entries.extend_from_slice(&group_columns); - let num_rows = entries[0].len(); - blocks.push(DataBlock::new(entries, num_rows)); - } else { - break; - } - } - - if blocks.is_empty() { - self.params.empty_result_block() - } else { - DataBlock::concat(&blocks)? - } - } else { - self.params.empty_result_block() - }; - - if output_block.is_empty() { - self.state = LocalState::Idle; - } else { - self.state = LocalState::OutputReady(output_block); - } - - Ok(()) - } - - pub fn spill(&mut self, mut queues: RepartitionedQueues, finalize: bool) -> Result<()> { - let mut ready_blocks: Vec> = vec![vec![]; self.partition_count]; - - for (id, queue) in queues.0.iter_mut().enumerate() { - while let Some(meta) = queue.pop() { - match meta { - AggregateMeta::AggregatePayload(AggregatePayload { payload, .. 
}) => { - let data_block = payload.aggregate_flush_all()?; - let indices = vec![id as u64; data_block.num_rows()]; - let blocks = self - .block_partition_stream - .partition(indices, data_block, true); - for (part_id, block) in blocks.into_iter() { - ready_blocks[part_id].push(block); - } - } - _ => { - return Err(ErrorCode::Internal( - "FinalAggregateSpiller expects AggregatePayload in repartitioned queue", - )); - } - } - } - } - - if finalize { - for (id, ready_block) in ready_blocks - .iter_mut() - .enumerate() - .take(self.partition_count) - { - if let Some(block) = self.block_partition_stream.finalize_partition(id) { - ready_block.push(block); - } - } - } - - for (partition_id, blocks) in ready_blocks.into_iter().enumerate() { - if blocks.is_empty() { - continue; - } - for block in blocks { - let bucket_spilled_payload = self.spiller.spill(partition_id, block)?; - self.repartitioned_queues.push_to_queue( - partition_id, - AggregateMeta::BucketSpilled(bucket_spilled_payload), - ); - } - } - - Ok(()) - } - - fn enqueue_partitioned_meta(&mut self, datablock: &mut DataBlock) -> Result<()> { - if let Some(block_meta) = datablock.take_meta().and_then(AggregateMeta::downcast_from) { - match block_meta { - AggregateMeta::Partitioned { data, .. 
} => { - self.working_queue.extend(data); - } - _ => { - return Err(ErrorCode::Internal( - "NewFinalAggregateTransform expects Partitioned AggregateMeta from upstream", - )); - } - } - } - Ok(()) - } - - fn schedule_next_task(&mut self) -> Option { - self.working_queue.pop().map(|aggregate_meta| { - self.state = LocalState::NewTask(aggregate_meta); - Event::Sync - }) - } - - fn schedule_async_wait(&mut self) -> Event { - self.is_reported = true; - self.state = LocalState::AsyncWait; - Event::Async - } - - fn debug_event(&mut self) -> Result { - if self.output.is_finished() { - self.input.finish(); - return Ok(Event::Finished); - } - - if !self.output.can_push() { - self.input.set_not_need_data(); - return Ok(Event::NeedConsume); - } - - match self.state { - LocalState::OutputReady(_) => return self.push_output(), - LocalState::Aggregate => return Ok(Event::Sync), - _ => {} - } - - // schedule a task from local working queue first - if let Some(event) = self.schedule_next_task() { - return Ok(event); - } - - // no more task in local working queue, means we need report repartitioned queues to shared state - if !self.is_reported && self.first_data_ready { - return Ok(self.schedule_async_wait()); - } - - // after reported, try get datablock from shared state - let datablock_opt = self.shared_state.lock().borrow_mut().get_next_datablock(); - if let Some(mut datablock) = datablock_opt { - // begin a new round, reset spilled flag and reported flag - self.is_spilled = false; - self.is_reported = false; - - self.enqueue_partitioned_meta(&mut datablock)?; - - // schedule next task from working queue, if empty, begin to wait other processors - if let Some(event) = self.schedule_next_task() { - return Ok(event); - } else { - return Ok(self.schedule_async_wait()); - } - } - - // no more work from shared state, try pull data from input - if self.input.has_data() { - // begin a new round, reset spilled flag and reported flag - self.is_spilled = false; - self.is_reported = false; 
- - self.first_data_ready = true; - - let mut data_block = self.input.pull_data().unwrap()?; - self.enqueue_partitioned_meta(&mut data_block)?; - - // schedule next task from working queue, if empty, begin to wait other processors - if let Some(event) = self.schedule_next_task() { - return Ok(event); - } else { - return Ok(self.schedule_async_wait()); - } - } - - if self.input.is_finished() { - self.output.finish(); - return Ok(Event::Finished); - } - - self.input.set_need_data(); - Ok(Event::NeedData) - } -} - -#[async_trait::async_trait] -impl Processor for NewFinalAggregateTransform { - fn name(&self) -> String { - "NewFinalAggregateTransform".to_string() - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - fn event(&mut self) -> Result { - let _before_state = self.state.to_string(); - let event = self.debug_event()?; - let _after_state = self.state.to_string(); - // info!( - // "NewFinalAggregateTransform[{}] return event: {:?}, state: {} -> {}", - // self.id, event, before_state, after_state - // ); - Ok(event) - } - - fn process(&mut self) -> Result<()> { - let state = std::mem::replace(&mut self.state, LocalState::Idle); - match state { - LocalState::NewTask(meta) => { - let meta = match meta { - AggregateMeta::BucketSpilled(p) => self.spiller.restore(p)?, - other => other, - }; - self.repartition(meta)?; - - Ok(()) - } - LocalState::Aggregate => { - let queue = self - .shared_state - .lock() - .repartitioned_queues - .take_queue(self.id); - self.final_aggregate(queue) - } - _ => Err(ErrorCode::Internal(format!( - "NewFinalAggregateTransform process called in {} state", - state - ))), - } - } - - async fn async_process(&mut self) -> Result<()> { - let state = std::mem::replace(&mut self.state, LocalState::Idle); - match state { - LocalState::AsyncWait => { - // report local repartitioned queues to shared state - let queues = self.repartitioned_queues.take_queues(); - self.shared_state.lock().add_repartitioned_queue(queues); - - 
self.barrier.wait().await; - - // we can only begin aggregate when last round no processor spills - if !self.shared_state.lock().last_round_is_spilled { - self.state = LocalState::Aggregate; - } - Ok(()) - } - _ => Err(ErrorCode::Internal( - "NewFinalAggregateTransform async_process called in invalid state", - ))?, - } - } -} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs index fd03b09e2f3f7..b0208d4d7389c 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs @@ -162,6 +162,7 @@ impl Processor for TransformSpillReader { self.deserialized_meta = Some(AggregateMeta::create_partitioned(bucket, new_data)); } + AggregateMeta::NewBucketSpilled(_) => unreachable!(), } } @@ -176,6 +177,7 @@ impl Processor for TransformSpillReader { AggregateMeta::AggregatePayload(_) => unreachable!(), AggregateMeta::AggregateSpilling(_) => unreachable!(), AggregateMeta::Serialized(_) => unreachable!(), + AggregateMeta::NewBucketSpilled(_) => unreachable!(), AggregateMeta::BucketSpilled(payload) => { let _guard = self.semaphore.acquire().await; let instant = Instant::now(); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs index f856c17bbe135..9b3df2c2c14c0 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs @@ -71,25 +71,25 @@ impl TransformPartialAggregate { params: Arc, config: HashTableConfig, ) -> Result> { - let hash_table = { - let arena = 
Arc::new(Bump::new()); - match !params.has_distinct_combinator() { - true => HashTable::AggregateHashTable(AggregateHashTable::new( - params.group_data_types.clone(), - params.aggregate_functions.clone(), - config, - arena, - )), - false => { - let max_radix_bits = config.max_radix_bits; - HashTable::AggregateHashTable(AggregateHashTable::new( - params.group_data_types.clone(), - params.aggregate_functions.clone(), - config.with_initial_radix_bits(max_radix_bits), - arena, - )) - } - } + let arena = Arc::new(Bump::new()); + // when enable_experiment_aggregate, we will repartition again in the final stage + // it will be too small if we use max radix bits here + let hash_table = if params.has_distinct_combinator() && !params.enable_experiment_aggregate + { + let max_radix_bits = config.max_radix_bits; + HashTable::AggregateHashTable(AggregateHashTable::new( + params.group_data_types.clone(), + params.aggregate_functions.clone(), + config.with_initial_radix_bits(max_radix_bits), + arena, + )) + } else { + HashTable::AggregateHashTable(AggregateHashTable::new( + params.group_data_types.clone(), + params.aggregate_functions.clone(), + config, + arena, + )) }; Ok(AccumulatingTransformer::create( diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs index 3f0c7186eef83..287553b9decc7 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs @@ -237,6 +237,7 @@ impl TransformPartitionBucket { } unreachable!() } + AggregateMeta::NewBucketSpilled(_) => unreachable!(), AggregateMeta::Spilled(_) => { let meta = data_block.take_meta().unwrap(); @@ -545,6 +546,7 @@ impl Processor for TransformPartitionBucket { AggregateMeta::Partitioned { .. 
} => unreachable!(), AggregateMeta::AggregateSpilling(_) => unreachable!(), AggregateMeta::BucketSpilled(_) => unreachable!(), + AggregateMeta::NewBucketSpilled(_) => unreachable!(), AggregateMeta::Serialized(payload) => self.partition_block(payload)?, AggregateMeta::AggregatePayload(payload) => self.partition_payload(payload)?, }; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 4947e071f0242..ce0042a36505d 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1490,7 +1490,6 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), - ("s3_storage_class", DefaultSettingValue { value: { let storage_class = Self::extract_s3_storage_class_config(&global_conf).unwrap_or_default(); @@ -1501,13 +1500,20 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::String(vec![S3StorageClass::Standard.to_string(), S3StorageClass::IntelligentTiering.to_string()])), }), - ("enable_experiment_aggregate_final", DefaultSettingValue { + ("enable_experiment_aggregate", DefaultSettingValue { value: UserSettingValue::UInt64(0), - desc: "Enable experiment aggregate final, default is 0, 1 for enable", + desc: "Enable experiment aggregate, default is 0, 1 for enable", mode: SettingMode::Both, scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("max_aggregate_spill_level", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Maximum recursion depth for the aggregate spill. 
Each recursion level repartitions data into `num_cpu` smaller parts to ensure it fits in memory.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=16)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 67b4733edf283..0203cf7b19b5c 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -1107,7 +1107,11 @@ impl Settings { }) } - pub fn get_enable_experiment_aggregate_final(&self) -> Result { - Ok(self.try_get_u64("enable_experiment_aggregate_final")? != 0) + pub fn get_enable_experiment_aggregate(&self) -> Result { + Ok(self.try_get_u64("enable_experiment_aggregate")? != 0) + } + + pub fn get_max_aggregate_spill_level(&self) -> Result { + self.try_get_u64("max_aggregate_spill_level") + } }