5 changes: 4 additions & 1 deletion src/query/expression/src/aggregate/mod.rs
@@ -60,6 +60,9 @@ pub(crate) const L2_CACHE_SIZE: usize = 1048576 / 2;
// Assume (1 << 20) + (1 << 19) = 1.5MB L3 cache per core (shared), divided by two because hyperthreading
pub(crate) const L3_CACHE_SIZE: usize = 1572864 / 2;

pub(crate) const MAX_RADIX_BITS: u64 = 7;
pub const MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM: u64 = 1 << MAX_RADIX_BITS;

#[derive(Clone, Debug)]
pub struct HashTableConfig {
// Max radix bits across all threads, this is a hint to repartition
@@ -77,7 +80,7 @@ impl Default for HashTableConfig {
Self {
current_max_radix_bits: Arc::new(AtomicU64::new(3)),
initial_radix_bits: 3,
max_radix_bits: 7,
max_radix_bits: MAX_RADIX_BITS,
repartition_radix_bits_incr: 2,
block_fill_factor: 1.8,
partial_agg: false,
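
For reference, `MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM` is `1 << MAX_RADIX_BITS`, i.e. 128 partitions at the maximum radix width. The sketch below illustrates the general radix-partitioning idea (picking a partition from the top bits of a 64-bit hash); the `radix_partition` helper is hypothetical and is not the hash table's actual bucket-selection code.

```rust
const MAX_RADIX_BITS: u64 = 7;
const MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM: u64 = 1 << MAX_RADIX_BITS; // 128

/// Hypothetical helper: pick a partition from the top `radix_bits` of a 64-bit hash.
fn radix_partition(hash: u64, radix_bits: u64) -> u64 {
    hash >> (64 - radix_bits)
}

fn main() {
    // Every hash lands in one of 1 << 7 = 128 partitions.
    assert_eq!(MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM, 128);
    let bucket = radix_partition(0xDEAD_BEEF_DEAD_BEEF, MAX_RADIX_BITS);
    assert!(bucket < MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM);
    println!("bucket = {bucket}");
}
```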
src/query/service/src/physical_plans/physical_aggregate_final.rs
@@ -142,7 +142,8 @@ impl IPhysicalPlan for AggregateFinal {
}

fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
let max_block_size = builder.settings.get_max_block_size()?;
let max_block_rows = builder.settings.get_max_block_size()? as usize;
let max_block_bytes = builder.settings.get_max_block_bytes()? as usize;
let enable_experimental_aggregate_hashtable = builder
.settings
.get_enable_experimental_aggregate_hashtable()?;
@@ -161,9 +162,10 @@ impl IPhysicalPlan for AggregateFinal {
&self.agg_funcs,
enable_experimental_aggregate_hashtable,
is_cluster_aggregate,
max_block_size as usize,
max_spill_io_requests as usize,
enable_experiment_aggregate,
max_block_rows,
max_block_bytes,
)?;

if params.group_columns.is_empty() {
53 changes: 38 additions & 15 deletions src/query/service/src/physical_plans/physical_aggregate_partial.rs
@@ -27,6 +27,7 @@ use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::HashTableConfig;
use databend_common_expression::LimitType;
use databend_common_expression::SortColumnDescription;
use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM;
use databend_common_functions::aggregates::AggregateFunctionFactory;
use databend_common_pipeline::core::ProcessorPtr;
use databend_common_pipeline_transforms::sorts::TransformSortPartial;
@@ -44,7 +45,9 @@ use crate::physical_plans::physical_plan::IPhysicalPlan;
use crate::physical_plans::physical_plan::PhysicalPlan;
use crate::physical_plans::physical_plan::PhysicalPlanMeta;
use crate::pipelines::processors::transforms::aggregator::AggregateInjector;
use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter;
use crate::pipelines::processors::transforms::aggregator::PartialSingleStateAggregator;
use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream;
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillWriter;
use crate::pipelines::processors::transforms::aggregator::TransformPartialAggregate;
use crate::pipelines::PipelineBuilder;
@@ -169,7 +172,8 @@ impl IPhysicalPlan for AggregatePartial {
fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
self.input.build_pipeline(builder)?;

let max_block_size = builder.settings.get_max_block_size()?;
let max_block_rows = builder.settings.get_max_block_size()? as usize;
let max_block_bytes = builder.settings.get_max_block_bytes()? as usize;
let max_threads = builder.settings.get_max_threads()?;
let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?;

Expand All @@ -185,9 +189,10 @@ impl IPhysicalPlan for AggregatePartial {
&self.agg_funcs,
enable_experimental_aggregate_hashtable,
builder.is_exchange_parent(),
max_block_size as usize,
max_spill_io_requests as usize,
enable_experiment_aggregate,
max_block_rows,
max_block_bytes,
)?;

if params.group_columns.is_empty() {
@@ -241,19 +246,37 @@ impl IPhysicalPlan for AggregatePartial {
if !builder.is_exchange_parent() {
let operator = DataOperator::instance().spill_operator();
let location_prefix = builder.ctx.query_id_spill_prefix();

builder.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(
TransformAggregateSpillWriter::try_create(
builder.ctx.clone(),
input,
output,
operator.clone(),
params.clone(),
location_prefix.clone(),
)?,
))
})?;
if params.enable_experiment_aggregate {
let shared_partition_stream = SharedPartitionStream::new(
builder.main_pipeline.output_len(),
max_block_rows,
max_block_bytes,
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
);
builder.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(
NewTransformAggregateSpillWriter::try_create(
input,
output,
builder.ctx.clone(),
shared_partition_stream.clone(),
)?,
))
})?;
} else {
builder.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(
TransformAggregateSpillWriter::try_create(
builder.ctx.clone(),
input,
output,
operator.clone(),
params.clone(),
location_prefix.clone(),
)?,
))
})?;
}
}

builder.exchange_injector = AggregateInjector::create(builder.ctx.clone(), params.clone());
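
When `enable_experiment_aggregate` is set, a single `SharedPartitionStream` is created before the transforms and a clone is handed to each `NewTransformAggregateSpillWriter`, one per pipeline output. A minimal sketch of that clone-and-share pattern, assuming an Arc-backed handle — the `SharedStream` type below is a hypothetical stand-in, not the real `SharedPartitionStream`:

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a stream shared by all spill writers in a pipeline.
#[derive(Clone)]
struct SharedStream {
    inner: Arc<Mutex<Vec<String>>>, // clones share the same underlying buffer
}

impl SharedStream {
    fn new() -> Self {
        Self { inner: Arc::new(Mutex::new(Vec::new())) }
    }
    fn push(&self, row: String) {
        self.inner.lock().unwrap().push(row);
    }
}

fn main() {
    let stream = SharedStream::new();
    // One clone per pipeline output, as in the `add_transform` closure above.
    let writers: Vec<SharedStream> = (0..4).map(|_| stream.clone()).collect();
    for (i, w) in writers.iter().enumerate() {
        w.push(format!("block from output {i}"));
    }
    // All four writers wrote into the single shared stream.
    assert_eq!(stream.inner.lock().unwrap().len(), 4);
}
```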
6 changes: 4 additions & 2 deletions src/query/service/src/pipelines/builders/builder_aggregate.rs
@@ -36,9 +36,10 @@ impl PipelineBuilder {
agg_funcs: &[AggregateFunctionDesc],
enable_experimental_aggregate_hashtable: bool,
cluster_aggregator: bool,
max_block_size: usize,
max_spill_io_requests: usize,
enable_experiment_aggregate: bool,
max_block_rows: usize,
max_block_bytes: usize,
) -> Result<Arc<AggregatorParams>> {
let mut agg_args = Vec::with_capacity(agg_funcs.len());
let (group_by, group_data_types) = group_by
@@ -131,9 +132,10 @@ impl PipelineBuilder {
&agg_args,
enable_experimental_aggregate_hashtable,
cluster_aggregator,
max_block_size,
max_spill_io_requests,
enable_experiment_aggregate,
max_block_rows,
max_block_bytes,
)?;

log::debug!("aggregate states layout: {:?}", params.states_layout);
src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs
@@ -22,6 +22,7 @@ use databend_common_expression::DataBlock;
use databend_common_expression::PartitionedPayload;
use databend_common_expression::Payload;
use databend_common_expression::PayloadFlushState;
use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM;
use databend_common_pipeline::core::Pipeline;
use databend_common_pipeline::core::ProcessorPtr;
use databend_common_settings::FlightCompression;
@@ -31,6 +32,8 @@ use crate::pipelines::processors::transforms::aggregator::aggregate_meta::Aggreg
use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAggregateSerializer;
use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAsyncBarrier;
use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter;
use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream;
use crate::pipelines::processors::transforms::aggregator::TransformAggregateDeserializer;
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSerializer;
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillWriter;
@@ -59,7 +62,6 @@ impl ExchangeSorting for AggregateExchangeSorting {
))),
Some(meta_info) => match meta_info {
AggregateMeta::Partitioned { .. } => unreachable!(),
AggregateMeta::NewBucketSpilled(_) => unreachable!(),
AggregateMeta::Serialized(v) => {
compute_block_number(v.bucket, v.max_partition_count)
}
Expand All @@ -68,7 +70,9 @@ impl ExchangeSorting for AggregateExchangeSorting {
}
AggregateMeta::AggregateSpilling(_)
| AggregateMeta::Spilled(_)
| AggregateMeta::BucketSpilled(_) => Ok(-1),
| AggregateMeta::BucketSpilled(_)
| AggregateMeta::NewBucketSpilled(_)
| AggregateMeta::NewSpilled(_) => Ok(-1),
},
},
}
@@ -183,6 +187,7 @@ impl FlightScatter for HashTableHashScatter {
AggregateMeta::Serialized(_) => unreachable!(),
AggregateMeta::Partitioned { .. } => unreachable!(),
AggregateMeta::NewBucketSpilled(_) => unreachable!(),
AggregateMeta::NewSpilled(_) => unreachable!(),
AggregateMeta::AggregateSpilling(payload) => {
for p in scatter_partitioned_payload(payload, self.buckets)? {
blocks.push(DataBlock::empty_with_meta(
@@ -259,21 +264,41 @@ impl ExchangeInjector for AggregateInjector {
) -> Result<()> {
let params = self.aggregator_params.clone();

let operator = DataOperator::instance().spill_operator();
let location_prefix = self.ctx.query_id_spill_prefix();

pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(
TransformAggregateSpillWriter::try_create(
self.ctx.clone(),
input,
output,
operator.clone(),
params.clone(),
location_prefix.clone(),
)?,
))
})?;
if self.aggregator_params.enable_experiment_aggregate {
let shared_partition_stream = SharedPartitionStream::new(
pipeline.output_len(),
self.aggregator_params.max_block_rows,
self.aggregator_params.max_block_bytes,
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
);

pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(
NewTransformAggregateSpillWriter::try_create(
input,
output,
self.ctx.clone(),
shared_partition_stream.clone(),
)?,
))
})?;
} else {
let operator = DataOperator::instance().spill_operator();
let location_prefix = self.ctx.query_id_spill_prefix();

pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(
TransformAggregateSpillWriter::try_create(
self.ctx.clone(),
input,
output,
operator.clone(),
params.clone(),
location_prefix.clone(),
)?,
))
})?;
}

pipeline.add_transform(|input, output| {
TransformAggregateSerializer::try_create(input, output, params.clone())
Expand All @@ -290,14 +315,25 @@ impl ExchangeInjector for AggregateInjector {
let operator = DataOperator::instance().spill_operator();
let location_prefix = self.ctx.query_id_spill_prefix();

let schema = shuffle_params.schema.clone();
let local_id = &shuffle_params.executor_id;
let local_pos = shuffle_params
.destination_ids
.iter()
.position(|x| x == local_id)
.unwrap();

let mut partition_streams = vec![];
if self.aggregator_params.enable_experiment_aggregate {
for _i in 0..shuffle_params.destination_ids.len() {
partition_streams.push(SharedPartitionStream::new(
pipeline.output_len(),
self.aggregator_params.max_block_rows,
self.aggregator_params.max_block_bytes,
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
));
}
}

pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(
TransformExchangeAggregateSerializer::try_create(
Expand All @@ -308,12 +344,11 @@ impl ExchangeInjector for AggregateInjector {
location_prefix.clone(),
params.clone(),
compression,
schema.clone(),
local_pos,
partition_streams.clone(),
)?,
))
})?;

pipeline.add_transform(TransformExchangeAsyncBarrier::try_create)
}
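
In the shuffle path above, `local_pos` is simply the index of the local executor inside `destination_ids`, and with `enable_experiment_aggregate` one `SharedPartitionStream` is built per destination. A small worked example of the position lookup — the executor ids are made up for illustration:

```rust
fn main() {
    // Hypothetical executor ids; the real values come from the shuffle parameters.
    let destination_ids = vec!["node-a", "node-b", "node-c"];
    let local_id = "node-b";

    // Same lookup as in the injector code above: position of the local executor.
    let local_pos = destination_ids
        .iter()
        .position(|x| *x == local_id)
        .unwrap();
    assert_eq!(local_pos, 1);

    // With the experimental path enabled, one partition stream is created per
    // destination, i.e. destination_ids.len() streams in total.
    println!("local_pos = {local_pos}, streams = {}", destination_ids.len());
}
```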

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
@@ -138,7 +138,9 @@ pub enum AggregateMeta {
Spilled(Vec<BucketSpilledPayload>),

Partitioned { bucket: isize, data: Vec<Self> },

NewBucketSpilled(NewSpilledPayload),
NewSpilled(Vec<NewSpilledPayload>),
}

impl AggregateMeta {
@@ -182,9 +184,13 @@ impl AggregateMeta {
Box::new(AggregateMeta::Partitioned { data, bucket })
}

pub fn create_new_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr {
pub fn create_new_bucket_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr {
Box::new(AggregateMeta::NewBucketSpilled(payload))
}

pub fn create_new_spilled(payloads: Vec<NewSpilledPayload>) -> BlockMetaInfoPtr {
Box::new(AggregateMeta::NewSpilled(payloads))
}
}

impl serde::Serialize for AggregateMeta {
@@ -215,6 +221,7 @@ impl Debug for AggregateMeta {
AggregateMeta::NewBucketSpilled(_) => {
f.debug_struct("Aggregate::NewBucketSpilled").finish()
}
AggregateMeta::NewSpilled(_) => f.debug_struct("Aggregate::NewSpilled").finish(),
AggregateMeta::AggregatePayload(_) => {
f.debug_struct("AggregateMeta:AggregatePayload").finish()
}
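
`NewBucketSpilled` carries a single spilled payload while the new `NewSpilled` variant batches several into one meta. A toy sketch of that shape difference — the `SpilledPayload` fields below are invented for illustration, since `NewSpilledPayload`'s definition is not part of this diff:

```rust
/// Toy mirror of the two variants: one spilled bucket vs. a batch of them.
#[derive(Debug)]
struct SpilledPayload {
    bucket: isize,
    location: String, // hypothetical field, for illustration only
}

#[derive(Debug)]
enum Meta {
    NewBucketSpilled(SpilledPayload),
    NewSpilled(Vec<SpilledPayload>),
}

fn describe(meta: &Meta) -> String {
    match meta {
        Meta::NewBucketSpilled(p) => format!("one spilled bucket: {}", p.bucket),
        Meta::NewSpilled(ps) => format!("{} spilled buckets in one meta", ps.len()),
    }
}

fn main() {
    let single = Meta::NewBucketSpilled(SpilledPayload { bucket: 3, location: "spill/xyz".into() });
    let batch = Meta::NewSpilled(vec![
        SpilledPayload { bucket: 0, location: "spill/a".into() },
        SpilledPayload { bucket: 1, location: "spill/b".into() },
    ]);
    println!("{}", describe(&single));
    println!("{}", describe(&batch));
}
```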
src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs
@@ -39,13 +39,16 @@ pub struct AggregatorParams {

pub enable_experimental_aggregate_hashtable: bool,
pub cluster_aggregator: bool,
pub max_block_size: usize,
pub max_spill_io_requests: usize,

pub enable_experiment_aggregate: bool,

pub max_block_rows: usize,
pub max_block_bytes: usize,
}

impl AggregatorParams {
#[allow(clippy::too_many_arguments)]
pub fn try_create(
input_schema: DataSchemaRef,
group_data_types: Vec<DataType>,
@@ -54,9 +57,10 @@
agg_args: &[Vec<usize>],
enable_experimental_aggregate_hashtable: bool,
cluster_aggregator: bool,
max_block_size: usize,
max_spill_io_requests: usize,
enable_experiment_aggregate: bool,
max_block_rows: usize,
max_block_bytes: usize,
) -> Result<Arc<AggregatorParams>> {
let states_layout = if !agg_funcs.is_empty() {
Some(get_states_layout(agg_funcs)?)
@@ -73,7 +77,8 @@ impl AggregatorParams {
states_layout,
enable_experimental_aggregate_hashtable,
cluster_aggregator,
max_block_size,
max_block_rows,
max_block_bytes,
max_spill_io_requests,
enable_experiment_aggregate,
}))
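
The former single `max_block_size` knob is split here into `max_block_rows` and `max_block_bytes`. A minimal sketch of how a dual row/byte cap is commonly applied when deciding to cut a block — illustrative only, not the aggregator's actual flush logic, and the limit values are made up:

```rust
/// Hypothetical check: a block is "full" once either limit is hit.
fn block_is_full(rows: usize, bytes: usize, max_block_rows: usize, max_block_bytes: usize) -> bool {
    rows >= max_block_rows || bytes >= max_block_bytes
}

fn main() {
    let (max_block_rows, max_block_bytes) = (65_536, 32 * 1024 * 1024);

    // Wide rows hit the byte cap long before the row cap.
    assert!(block_is_full(10_000, 40 * 1024 * 1024, max_block_rows, max_block_bytes));
    // Narrow rows hit the row cap first.
    assert!(block_is_full(70_000, 1024 * 1024, max_block_rows, max_block_bytes));
    // Small blocks trip neither limit.
    assert!(!block_is_full(1_000, 1024, max_block_rows, max_block_bytes));
}
```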