Skip to content

Commit 359c3c6

Browse files
committed
chore: support in cluster mode
1 parent f6dcfde commit 359c3c6

20 files changed

+520
-171
lines changed

src/query/service/src/physical_plans/physical_aggregate_final.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ impl IPhysicalPlan for AggregateFinal {
142142
}
143143

144144
fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
145-
let max_block_size = builder.settings.get_max_block_size()?;
145+
let max_block_rows = builder.settings.get_max_block_size()? as usize;
146+
let max_block_bytes = builder.settings.get_max_block_bytes()? as usize;
146147
let enable_experimental_aggregate_hashtable = builder
147148
.settings
148149
.get_enable_experimental_aggregate_hashtable()?;
@@ -161,9 +162,10 @@ impl IPhysicalPlan for AggregateFinal {
161162
&self.agg_funcs,
162163
enable_experimental_aggregate_hashtable,
163164
is_cluster_aggregate,
164-
max_block_size as usize,
165165
max_spill_io_requests as usize,
166166
enable_experiment_aggregate,
167+
max_block_rows,
168+
max_block_bytes,
167169
)?;
168170

169171
if params.group_columns.is_empty() {

src/query/service/src/physical_plans/physical_aggregate_partial.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,10 @@ impl IPhysicalPlan for AggregatePartial {
190190
&self.agg_funcs,
191191
enable_experimental_aggregate_hashtable,
192192
builder.is_exchange_parent(),
193-
max_block_rows,
194193
max_spill_io_requests as usize,
195194
enable_experiment_aggregate,
195+
max_block_rows,
196+
max_block_bytes,
196197
)?;
197198

198199
if params.group_columns.is_empty() {

src/query/service/src/pipelines/builders/builder_aggregate.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ impl PipelineBuilder {
3636
agg_funcs: &[AggregateFunctionDesc],
3737
enable_experimental_aggregate_hashtable: bool,
3838
cluster_aggregator: bool,
39-
max_block_size: usize,
4039
max_spill_io_requests: usize,
4140
enable_experiment_aggregate: bool,
41+
max_block_rows: usize,
42+
max_block_bytes: usize,
4243
) -> Result<Arc<AggregatorParams>> {
4344
let mut agg_args = Vec::with_capacity(agg_funcs.len());
4445
let (group_by, group_data_types) = group_by
@@ -131,9 +132,10 @@ impl PipelineBuilder {
131132
&agg_args,
132133
enable_experimental_aggregate_hashtable,
133134
cluster_aggregator,
134-
max_block_size,
135135
max_spill_io_requests,
136136
enable_experiment_aggregate,
137+
max_block_rows,
138+
max_block_bytes,
137139
)?;
138140

139141
log::debug!("aggregate states layout: {:?}", params.states_layout);

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use databend_common_expression::DataBlock;
2222
use databend_common_expression::PartitionedPayload;
2323
use databend_common_expression::Payload;
2424
use databend_common_expression::PayloadFlushState;
25+
use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM;
2526
use databend_common_pipeline::core::Pipeline;
2627
use databend_common_pipeline::core::ProcessorPtr;
2728
use databend_common_settings::FlightCompression;
@@ -31,6 +32,8 @@ use crate::pipelines::processors::transforms::aggregator::aggregate_meta::Aggreg
3132
use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAggregateSerializer;
3233
use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAsyncBarrier;
3334
use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
35+
use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter;
36+
use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream;
3437
use crate::pipelines::processors::transforms::aggregator::TransformAggregateDeserializer;
3538
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSerializer;
3639
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillWriter;
@@ -59,7 +62,6 @@ impl ExchangeSorting for AggregateExchangeSorting {
5962
))),
6063
Some(meta_info) => match meta_info {
6164
AggregateMeta::Partitioned { .. } => unreachable!(),
62-
AggregateMeta::NewBucketSpilled(_) => unreachable!(),
6365
AggregateMeta::Serialized(v) => {
6466
compute_block_number(v.bucket, v.max_partition_count)
6567
}
@@ -68,7 +70,9 @@ impl ExchangeSorting for AggregateExchangeSorting {
6870
}
6971
AggregateMeta::AggregateSpilling(_)
7072
| AggregateMeta::Spilled(_)
71-
| AggregateMeta::BucketSpilled(_) => Ok(-1),
73+
| AggregateMeta::BucketSpilled(_)
74+
| AggregateMeta::NewBucketSpilled(_)
75+
| AggregateMeta::NewSpilled(_) => Ok(-1),
7276
},
7377
},
7478
}
@@ -183,6 +187,7 @@ impl FlightScatter for HashTableHashScatter {
183187
AggregateMeta::Serialized(_) => unreachable!(),
184188
AggregateMeta::Partitioned { .. } => unreachable!(),
185189
AggregateMeta::NewBucketSpilled(_) => unreachable!(),
190+
AggregateMeta::NewSpilled(_) => unreachable!(),
186191
AggregateMeta::AggregateSpilling(payload) => {
187192
for p in scatter_partitioned_payload(payload, self.buckets)? {
188193
blocks.push(DataBlock::empty_with_meta(
@@ -259,21 +264,41 @@ impl ExchangeInjector for AggregateInjector {
259264
) -> Result<()> {
260265
let params = self.aggregator_params.clone();
261266

262-
let operator = DataOperator::instance().spill_operator();
263-
let location_prefix = self.ctx.query_id_spill_prefix();
264-
265-
pipeline.add_transform(|input, output| {
266-
Ok(ProcessorPtr::create(
267-
TransformAggregateSpillWriter::try_create(
268-
self.ctx.clone(),
269-
input,
270-
output,
271-
operator.clone(),
272-
params.clone(),
273-
location_prefix.clone(),
274-
)?,
275-
))
276-
})?;
267+
if self.aggregator_params.enable_experiment_aggregate {
268+
let shared_partition_stream = SharedPartitionStream::new(
269+
pipeline.output_len(),
270+
self.aggregator_params.max_block_rows,
271+
self.aggregator_params.max_block_bytes,
272+
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
273+
);
274+
275+
pipeline.add_transform(|input, output| {
276+
Ok(ProcessorPtr::create(
277+
NewTransformAggregateSpillWriter::try_create(
278+
input,
279+
output,
280+
self.ctx.clone(),
281+
shared_partition_stream.clone(),
282+
)?,
283+
))
284+
})?;
285+
} else {
286+
let operator = DataOperator::instance().spill_operator();
287+
let location_prefix = self.ctx.query_id_spill_prefix();
288+
289+
pipeline.add_transform(|input, output| {
290+
Ok(ProcessorPtr::create(
291+
TransformAggregateSpillWriter::try_create(
292+
self.ctx.clone(),
293+
input,
294+
output,
295+
operator.clone(),
296+
params.clone(),
297+
location_prefix.clone(),
298+
)?,
299+
))
300+
})?;
301+
}
277302

278303
pipeline.add_transform(|input, output| {
279304
TransformAggregateSerializer::try_create(input, output, params.clone())
@@ -290,14 +315,25 @@ impl ExchangeInjector for AggregateInjector {
290315
let operator = DataOperator::instance().spill_operator();
291316
let location_prefix = self.ctx.query_id_spill_prefix();
292317

293-
let schema = shuffle_params.schema.clone();
294318
let local_id = &shuffle_params.executor_id;
295319
let local_pos = shuffle_params
296320
.destination_ids
297321
.iter()
298322
.position(|x| x == local_id)
299323
.unwrap();
300324

325+
let mut partition_streams = vec![];
326+
if self.aggregator_params.enable_experiment_aggregate {
327+
for _i in 0..shuffle_params.destination_ids.len() {
328+
partition_streams.push(SharedPartitionStream::new(
329+
pipeline.output_len(),
330+
self.aggregator_params.max_block_rows,
331+
self.aggregator_params.max_block_bytes,
332+
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
333+
));
334+
}
335+
}
336+
301337
pipeline.add_transform(|input, output| {
302338
Ok(ProcessorPtr::create(
303339
TransformExchangeAggregateSerializer::try_create(
@@ -308,12 +344,11 @@ impl ExchangeInjector for AggregateInjector {
308344
location_prefix.clone(),
309345
params.clone(),
310346
compression,
311-
schema.clone(),
312347
local_pos,
348+
partition_streams.clone(),
313349
)?,
314350
))
315351
})?;
316-
317352
pipeline.add_transform(TransformExchangeAsyncBarrier::try_create)
318353
}
319354

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,9 @@ pub enum AggregateMeta {
138138
Spilled(Vec<BucketSpilledPayload>),
139139

140140
Partitioned { bucket: isize, data: Vec<Self> },
141+
141142
NewBucketSpilled(NewSpilledPayload),
143+
NewSpilled(Vec<NewSpilledPayload>),
142144
}
143145

144146
impl AggregateMeta {
@@ -182,9 +184,13 @@ impl AggregateMeta {
182184
Box::new(AggregateMeta::Partitioned { data, bucket })
183185
}
184186

185-
pub fn create_new_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr {
187+
pub fn create_new_bucket_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr {
186188
Box::new(AggregateMeta::NewBucketSpilled(payload))
187189
}
190+
191+
pub fn create_new_spilled(payloads: Vec<NewSpilledPayload>) -> BlockMetaInfoPtr {
192+
Box::new(AggregateMeta::NewSpilled(payloads))
193+
}
188194
}
189195

190196
impl serde::Serialize for AggregateMeta {
@@ -215,6 +221,7 @@ impl Debug for AggregateMeta {
215221
AggregateMeta::NewBucketSpilled(_) => {
216222
f.debug_struct("Aggregate::NewBucketSpilled").finish()
217223
}
224+
AggregateMeta::NewSpilled(_) => f.debug_struct("Aggregate::NewSpilled").finish(),
218225
AggregateMeta::AggregatePayload(_) => {
219226
f.debug_struct("AggregateMeta:AggregatePayload").finish()
220227
}

src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@ pub struct AggregatorParams {
3939

4040
pub enable_experimental_aggregate_hashtable: bool,
4141
pub cluster_aggregator: bool,
42-
pub max_block_size: usize,
4342
pub max_spill_io_requests: usize,
4443

4544
pub enable_experiment_aggregate: bool,
45+
46+
pub max_block_rows: usize,
47+
pub max_block_bytes: usize,
4648
}
4749

4850
impl AggregatorParams {
@@ -54,9 +56,10 @@ impl AggregatorParams {
5456
agg_args: &[Vec<usize>],
5557
enable_experimental_aggregate_hashtable: bool,
5658
cluster_aggregator: bool,
57-
max_block_size: usize,
5859
max_spill_io_requests: usize,
5960
enable_experiment_aggregate: bool,
61+
max_block_rows: usize,
62+
max_block_bytes: usize,
6063
) -> Result<Arc<AggregatorParams>> {
6164
let states_layout = if !agg_funcs.is_empty() {
6265
Some(get_states_layout(agg_funcs)?)
@@ -73,7 +76,8 @@ impl AggregatorParams {
7376
states_layout,
7477
enable_experimental_aggregate_hashtable,
7578
cluster_aggregator,
76-
max_block_size,
79+
max_block_rows,
80+
max_block_bytes,
7781
max_spill_io_requests,
7882
enable_experiment_aggregate,
7983
}))

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use databend_common_expression::DataBlock;
2929
use databend_common_pipeline_transforms::MemorySettings;
3030
use databend_common_storage::DataOperator;
3131
use databend_common_storages_parquet::ReadSettings;
32+
use log::info;
3233
use parking_lot::Mutex;
3334
use parquet::file::metadata::RowGroupMetaData;
3435

@@ -293,7 +294,10 @@ impl NewAggregateSpiller {
293294

294295
let (payloads, write_stats) = self.payload_writers.finalize()?;
295296
self.flush_write_profile(write_stats);
296-
297+
info!(
298+
"[NewAggregateSpiller] spill finish with {} payloads",
299+
payloads.len()
300+
);
297301
Ok(payloads)
298302
}
299303

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl AccumulatingTransform for NewTransformAggregateSpillWriter {
6363
if let AggregateMeta::AggregateSpilling(partition) = aggregate_meta {
6464
// we use fixed size partitioning, the different bucket number will caused spilled data can't be merged correctly
6565
debug_assert_eq!(
66-
partition.payloads.len(),
66+
partition.payloads.len() as u64,
6767
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM,
6868
"the number of payloads should be equal to MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM for spilling partition"
6969
);
@@ -90,7 +90,7 @@ impl AccumulatingTransform for NewTransformAggregateSpillWriter {
9090
let mut spilled_blocks = Vec::with_capacity(spilled_payloads.len());
9191
for payload in spilled_payloads {
9292
spilled_blocks.push(DataBlock::empty_with_meta(
93-
AggregateMeta::create_new_spilled(payload),
93+
AggregateMeta::create_new_bucket_spilled(payload),
9494
));
9595
}
9696

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ impl NewFinalAggregateTransform {
111111
}
112112
// Already a single payload for one upstream bucket.
113113
AggregateMeta::AggregatePayload(agg_payload) => agg_payload.payload,
114+
AggregateMeta::NewSpilled(_) => {
115+
return Err(ErrorCode::Internal(
116+
"New spilled payload must be restored before repartitioning",
117+
));
118+
}
114119
_ => {
115120
return Err(ErrorCode::Internal(
116121
"Unexpected meta type for repartitioning",
@@ -264,6 +269,7 @@ impl NewFinalAggregateTransform {
264269
agg_hashtable = Some(hashtable);
265270
}
266271
},
272+
AggregateMeta::NewSpilled(_) => unreachable!(),
267273
_ => unreachable!(),
268274
}
269275
}
@@ -310,6 +316,11 @@ impl NewFinalAggregateTransform {
310316
let data_block = payload.aggregate_flush_all()?.consume_convert_to_full();
311317
self.spiller.spill(id, data_block)?;
312318
}
319+
AggregateMeta::NewSpilled(_) => {
320+
return Err(ErrorCode::Internal(
321+
"New spilled payload should not exist in repartitioned queues",
322+
));
323+
}
313324
_ => {
314325
return Err(ErrorCode::Internal(
315326
"NewAggregateSpiller expects AggregatePayload in repartitioned queue",
@@ -423,6 +434,7 @@ impl Processor for NewFinalAggregateTransform {
423434
let meta = match meta {
424435
AggregateMeta::NewBucketSpilled(p) => self.spiller.restore(p)?,
425436
AggregateMeta::BucketSpilled(_) => unreachable!(),
437+
AggregateMeta::NewSpilled(_) => unreachable!(),
426438
other => other,
427439
};
428440
self.repartition(meta)?;

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/transform_partition_bucket_scatter.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,35 @@ impl TransformPartitionBucketScatter {
280280
}
281281
unreachable!()
282282
}
283+
AggregateMeta::NewSpilled(_) => {
284+
let meta = data_block.take_meta().unwrap();
285+
286+
if let Some(AggregateMeta::NewSpilled(payloads)) =
287+
AggregateMeta::downcast_from(meta)
288+
{
289+
let partition_count = MAX_PARTITION_COUNT;
290+
self.max_partition_count =
291+
self.max_partition_count.max(partition_count);
292+
293+
for payload in payloads {
294+
let bucket = payload.bucket;
295+
let data_block = DataBlock::empty_with_meta(
296+
AggregateMeta::create_new_bucket_spilled(payload),
297+
);
298+
match self.buckets_blocks.entry(bucket) {
299+
Entry::Vacant(v) => {
300+
v.insert(vec![data_block]);
301+
}
302+
Entry::Occupied(mut v) => {
303+
v.get_mut().push(data_block);
304+
}
305+
};
306+
}
307+
308+
return Ok((SINGLE_LEVEL_BUCKET_NUM, partition_count));
309+
}
310+
unreachable!()
311+
}
283312
AggregateMeta::Serialized(payload) => {
284313
is_empty_block = payload.data_block.is_empty();
285314
self.max_partition_count =
@@ -306,7 +335,7 @@ impl TransformPartitionBucketScatter {
306335
self.max_partition_count.max(partition_count);
307336

308337
let data_block = DataBlock::empty_with_meta(
309-
AggregateMeta::create_new_spilled(payload),
338+
AggregateMeta::create_new_bucket_spilled(payload),
310339
);
311340
match self.buckets_blocks.entry(bucket) {
312341
Entry::Vacant(v) => {
@@ -597,6 +626,7 @@ impl Processor for TransformPartitionBucketScatter {
597626
AggregateMeta::AggregateSpilling(_) => unreachable!(),
598627
AggregateMeta::BucketSpilled(_) => unreachable!(),
599628
AggregateMeta::NewBucketSpilled(_) => unreachable!(),
629+
AggregateMeta::NewSpilled(_) => unreachable!(),
600630
AggregateMeta::Serialized(payload) => self.partition_block(payload)?,
601631
AggregateMeta::AggregatePayload(payload) => self.partition_payload(payload)?,
602632
};

0 commit comments

Comments
 (0)