@@ -148,8 +148,7 @@ impl IPhysicalPlan for AggregateFinal {
             .get_enable_experimental_aggregate_hashtable()?;
         let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?;
         let max_restore_worker = builder.settings.get_max_aggregate_restore_worker()?;
-        let experiment_aggregate_final =
-            builder.settings.get_enable_experiment_aggregate_final()?;
+        let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?;
 
         let mut is_cluster_aggregate = false;
         if ExchangeSource::check_physical_plan(&self.input) {
@@ -164,6 +163,7 @@ impl IPhysicalPlan for AggregateFinal {
             is_cluster_aggregate,
             max_block_size as usize,
             max_spill_io_requests as usize,
+            enable_experiment_aggregate,
         )?;
 
         if params.group_columns.is_empty() {
@@ -201,7 +201,6 @@ impl IPhysicalPlan for AggregateFinal {
             params.clone(),
             max_restore_worker,
             after_group_parallel,
-            experiment_aggregate_final,
             builder.ctx.clone(),
         )
     }
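Taken together, the three hunks above stop threading a standalone flag through the final-aggregate build: the setting is read once and stored in the aggregator params. A minimal sketch of the resulting call sites, assuming surrounding code matching the context lines above (elided arguments marked as comments):

    // The setting is read once and baked into AggregatorParams...
    let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?;
    let params = PipelineBuilder::build_aggregator_params(
        // ...schema, group-by keys, aggregate functions (unchanged, elided)...
        is_cluster_aggregate,
        max_block_size as usize,
        max_spill_io_requests as usize,
        enable_experiment_aggregate,
    )?;

    // ...so build_partition_bucket no longer takes the flag as an argument;
    // it consults params.enable_experiment_aggregate instead (see the
    // dispatcher change further down).
    build_partition_bucket(
        &mut builder.main_pipeline, // assumed pipeline handle, not shown in this diff
        params.clone(),
        max_restore_worker,
        after_group_parallel,
        builder.ctx.clone(),
    )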
@@ -178,6 +178,8 @@ impl IPhysicalPlan for AggregatePartial {
             .settings
             .get_enable_experimental_aggregate_hashtable()?;
 
+        let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?;
+
         let params = PipelineBuilder::build_aggregator_params(
             self.input.output_schema()?,
             &self.group_by,
@@ -186,6 +188,7 @@ impl IPhysicalPlan for AggregatePartial {
             builder.is_exchange_parent(),
             max_block_size as usize,
             max_spill_io_requests as usize,
+            enable_experiment_aggregate,
         )?;
 
         if params.group_columns.is_empty() {
src/query/service/src/pipelines/builders/builder_aggregate.rs (2 additions, 0 deletions)
@@ -38,6 +38,7 @@ impl PipelineBuilder {
         cluster_aggregator: bool,
         max_block_size: usize,
         max_spill_io_requests: usize,
+        enable_experiment_aggregate: bool,
     ) -> Result<Arc<AggregatorParams>> {
         let mut agg_args = Vec::with_capacity(agg_funcs.len());
         let (group_by, group_data_types) = group_by
@@ -132,6 +133,7 @@ impl PipelineBuilder {
             cluster_aggregator,
             max_block_size,
             max_spill_io_requests,
+            enable_experiment_aggregate,
         )?;
 
         log::debug!("aggregate states layout: {:?}", params.states_layout);
@@ -59,6 +59,7 @@ impl ExchangeSorting for AggregateExchangeSorting {
                 ))),
                 Some(meta_info) => match meta_info {
                     AggregateMeta::Partitioned { .. } => unreachable!(),
+                    AggregateMeta::NewBucketSpilled(_) => unreachable!(),
                     AggregateMeta::Serialized(v) => {
                         compute_block_number(v.bucket, v.max_partition_count)
                     }
@@ -181,6 +182,7 @@ impl FlightScatter for HashTableHashScatter {
                     AggregateMeta::BucketSpilled(_) => unreachable!(),
                     AggregateMeta::Serialized(_) => unreachable!(),
                     AggregateMeta::Partitioned { .. } => unreachable!(),
+                    AggregateMeta::NewBucketSpilled(_) => unreachable!(),
                     AggregateMeta::AggregateSpilling(payload) => {
                         for p in scatter_partitioned_payload(payload, self.buckets)? {
                             blocks.push(DataBlock::empty_with_meta(
@@ -31,6 +31,7 @@ use databend_common_expression::PartitionedPayload;
 use databend_common_expression::Payload;
 use databend_common_expression::ProbeState;
 use databend_common_expression::ProjectedBlock;
+use parquet::file::metadata::RowGroupMetaData;
 
 pub struct SerializedPayload {
     pub bucket: isize,
@@ -116,6 +117,12 @@ pub struct BucketSpilledPayload {
     pub max_partition_count: usize,
 }
 
+pub struct NewSpilledPayload {
+    pub bucket: isize,
+    pub location: String,
+    pub row_group: RowGroupMetaData,
+}
+
 pub struct AggregatePayload {
     pub bucket: isize,
     pub payload: Payload,
@@ -131,6 +138,7 @@ pub enum AggregateMeta {
     Spilled(Vec<BucketSpilledPayload>),
 
     Partitioned { bucket: isize, data: Vec<Self> },
+    NewBucketSpilled(NewSpilledPayload),
 }
 
 impl AggregateMeta {
@@ -173,6 +181,10 @@ impl AggregateMeta {
     pub fn create_partitioned(bucket: isize, data: Vec<Self>) -> BlockMetaInfoPtr {
         Box::new(AggregateMeta::Partitioned { data, bucket })
     }
+
+    pub fn create_new_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr {
+        Box::new(AggregateMeta::NewBucketSpilled(payload))
+    }
 }
 
 impl serde::Serialize for AggregateMeta {
@@ -200,6 +212,9 @@ impl Debug for AggregateMeta {
             }
             AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilled").finish(),
             AggregateMeta::BucketSpilled(_) => f.debug_struct("Aggregate::BucketSpilled").finish(),
+            AggregateMeta::NewBucketSpilled(_) => {
+                f.debug_struct("Aggregate::NewBucketSpilled").finish()
+            }
             AggregateMeta::AggregatePayload(_) => {
                 f.debug_struct("AggregateMeta:AggregatePayload").finish()
             }
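For orientation (not part of the diff): NewSpilledPayload identifies spilled aggregate state by spill-file location plus a parquet row-group descriptor, so a restore step can fetch exactly the bytes belonging to one bucket. A minimal consumer sketch, where read_spilled_row_group and restore_legacy are hypothetical helpers standing in for the real restore paths:

    fn restore_block(meta: AggregateMeta) -> Result<DataBlock> {
        match meta {
            // `location` names the spill file; `row_group` describes the
            // parquet row group that holds this bucket's serialized state.
            AggregateMeta::NewBucketSpilled(p) => {
                read_spilled_row_group(&p.location, &p.row_group)
            }
            // The pre-existing variants keep their existing restore paths.
            other => restore_legacy(other),
        }
    }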
@@ -41,6 +41,8 @@ pub struct AggregatorParams {
     pub cluster_aggregator: bool,
     pub max_block_size: usize,
     pub max_spill_io_requests: usize,
+
+    pub enable_experiment_aggregate: bool,
 }
 
 impl AggregatorParams {
@@ -54,6 +56,7 @@ impl AggregatorParams {
         cluster_aggregator: bool,
         max_block_size: usize,
         max_spill_io_requests: usize,
+        enable_experiment_aggregate: bool,
     ) -> Result<Arc<AggregatorParams>> {
         let states_layout = if !agg_funcs.is_empty() {
             Some(get_states_layout(agg_funcs)?)
@@ -72,6 +75,7 @@ impl AggregatorParams {
             cluster_aggregator,
             max_block_size,
             max_spill_io_requests,
+            enable_experiment_aggregate,
         }))
     }
 
@@ -14,6 +14,7 @@
 
 use std::sync::Arc;
 
+use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
@@ -27,10 +28,10 @@ use parking_lot::Mutex;
 use tokio::sync::Barrier;
 use tokio::sync::Semaphore;
 
-use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSharedState;
-use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSpiller;
-use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::NewFinalAggregateTransform;
-use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::TransformPartitionBucketScatter;
+use crate::pipelines::processors::transforms::aggregator::new_aggregate::FinalAggregateSharedState;
+use crate::pipelines::processors::transforms::aggregator::new_aggregate::NewAggregateSpiller;
+use crate::pipelines::processors::transforms::aggregator::new_aggregate::NewFinalAggregateTransform;
+use crate::pipelines::processors::transforms::aggregator::new_aggregate::TransformPartitionBucketScatter;
 use crate::pipelines::processors::transforms::aggregator::transform_partition_bucket::TransformPartitionBucket;
 use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
 use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillReader;
@@ -43,8 +44,6 @@ fn build_partition_bucket_experimental(
     after_worker: usize,
     ctx: Arc<QueryContext>,
 ) -> Result<()> {
-    let operator = DataOperator::instance().spill_operator();
-
     // PartitionedPayload only accept power of two partitions
     let mut output_num = after_worker.next_power_of_two();
     const MAX_PARTITION_COUNT: usize = 128;
@@ -71,8 +70,13 @@ fn build_partition_bucket_experimental(
     let barrier = Arc::new(Barrier::new(output_num));
     let shared_state = Arc::new(Mutex::new(FinalAggregateSharedState::new(output_num)));
 
+    let settings = ctx.get_settings();
+    let rows = settings.get_max_block_size()? as usize;
+    let bytes = settings.get_max_block_bytes()? as usize;
+    let max_aggregate_spill_level = settings.get_max_aggregate_spill_level()? as usize;
+
     for id in 0..output_num {
-        let spiller = FinalAggregateSpiller::try_create(ctx.clone(), operator.clone())?;
+        let spiller = NewAggregateSpiller::try_create(ctx.clone(), output_num, rows, bytes)?;
         let input_port = InputPort::create();
         let output_port = OutputPort::create();
         let processor = NewFinalAggregateTransform::try_create(
@@ -84,7 +88,7 @@ fn build_partition_bucket_experimental(
             barrier.clone(),
             shared_state.clone(),
             spiller,
-            ctx.clone(),
+            max_aggregate_spill_level,
         )?;
         builder.add_transform(input_port, output_port, ProcessorPtr::create(processor));
     }
@@ -103,8 +107,8 @@ fn build_partition_bucket_legacy(
 ) -> Result<()> {
     let operator = DataOperator::instance().spill_operator();
 
-    let input_nums = pipeline.output_len();
-    let transform = TransformPartitionBucket::create(input_nums, params.clone())?;
+    let input_num = pipeline.output_len();
+    let transform = TransformPartitionBucket::create(input_num, params.clone())?;
 
     let output = transform.get_output();
     let inputs_port = transform.get_inputs();
@@ -115,7 +119,7 @@ fn build_partition_bucket_legacy(
         vec![output],
     )]));
 
-    pipeline.try_resize(std::cmp::min(input_nums, max_restore_worker as usize))?;
+    pipeline.try_resize(std::cmp::min(input_num, max_restore_worker as usize))?;
     let semaphore = Arc::new(Semaphore::new(params.max_spill_io_requests));
     pipeline.add_transform(|input, output| {
         let operator = operator.clone();
@@ -133,17 +137,16 @@ fn build_partition_bucket_legacy(
     Ok(())
 }
 
-/// Build partition bucket pipeline based on the experiment_aggregate_final flag.
+/// Build partition bucket pipeline based on the experiment_aggregate flag.
 /// Dispatches to either experimental or legacy implementation.
 pub fn build_partition_bucket(
     pipeline: &mut Pipeline,
     params: Arc<AggregatorParams>,
     max_restore_worker: u64,
     after_worker: usize,
-    experiment_aggregate_final: bool,
     ctx: Arc<QueryContext>,
 ) -> Result<()> {
-    if experiment_aggregate_final {
+    if params.enable_experiment_aggregate {
         build_partition_bucket_experimental(pipeline, params, after_worker, ctx)
     } else {
         build_partition_bucket_legacy(pipeline, params, max_restore_worker, after_worker)
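Two inferences worth noting about the experimental path (neither is spelled out in this diff): the per-output NewAggregateSpiller is now sized from block-shape settings (max_block_size rows, max_block_bytes bytes, output_num partitions) instead of receiving a raw spill operator, and the transform takes max_aggregate_spill_level in place of a QueryContext, which suggests re-spill depth is bounded explicitly. A hypothetical sketch of such a bound inside the transform:

    // Hypothetical: the real NewFinalAggregateTransform internals are not
    // part of this diff; field and method names here are illustrative only.
    if self.spill_level >= self.max_aggregate_spill_level {
        // Past the cap: finish this partition in memory, however large.
        self.aggregate_in_memory(block)?;
    } else {
        // Below the cap: re-partition and spill another level down.
        self.spill_level += 1;
        self.spiller.spill(partition_id, block)?;
    }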
@@ -16,7 +16,7 @@ mod aggregate_exchange_injector;
 mod aggregate_meta;
 mod aggregator_params;
 mod build_partition_bucket;
-mod new_final_aggregate;
+mod new_aggregate;
 mod serde;
 mod transform_aggregate_expand;
 mod transform_aggregate_final;
@@ -13,12 +13,13 @@
 // limitations under the License.
 
 mod datablock_splitter;
-mod final_aggregate_spiller;
+mod new_aggregate_spiller;
+mod new_final_aggregate_state;
 mod new_transform_final_aggregate;
 mod transform_partition_bucket_scatter;
 
 pub use datablock_splitter::split_partitioned_meta_into_datablocks;
-pub use final_aggregate_spiller::FinalAggregateSpiller;
-pub use new_transform_final_aggregate::FinalAggregateSharedState;
+pub use new_aggregate_spiller::NewAggregateSpiller;
+pub use new_final_aggregate_state::FinalAggregateSharedState;
 pub use new_transform_final_aggregate::NewFinalAggregateTransform;
 pub use transform_partition_bucket_scatter::TransformPartitionBucketScatter;