
Commit 47dd6fc

chore: make standalone work
1 parent 4565f93 commit 47dd6fc

3 files changed (+35 −1)


src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs

Lines changed: 1 addition & 0 deletions
@@ -143,6 +143,7 @@ impl AggregatePayloadWriters {
         let start = Instant::now();
         self.writers[bucket].write_block(block)?;
 
+        // TODO: the recorded time may be inaccurate because it includes serialization
         let elapsed = start.elapsed();
         self.write_stats.accumulate(elapsed);
     }
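
The accumulated statistic times the whole write_block call, so serialization cost is folded into the write time, which is what the new TODO flags. A minimal standalone sketch of the timing pattern this hunk instruments; WriteStats and timed_write are illustrative placeholders, not Databend's types:

    use std::time::{Duration, Instant};

    #[derive(Default)]
    struct WriteStats {
        total: Duration,
        count: usize,
    }

    impl WriteStats {
        fn accumulate(&mut self, elapsed: Duration) {
            self.total += elapsed;
            self.count += 1;
        }
    }

    fn timed_write(stats: &mut WriteStats, write: impl FnOnce()) {
        let start = Instant::now();
        write(); // serialization and I/O both happen inside the timed section
        stats.accumulate(start.elapsed());
    }

    fn main() {
        let mut stats = WriteStats::default();
        timed_write(&mut stats, || std::thread::sleep(Duration::from_millis(1)));
        println!("writes: {}, total: {:?}", stats.count, stats.total);
    }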

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs

Lines changed: 7 additions & 0 deletions
@@ -61,6 +61,13 @@ impl AccumulatingTransform for NewTransformAggregateSpillWriter {
         let meta = data.take_meta().unwrap();
         let aggregate_meta = AggregateMeta::downcast_from(meta).unwrap();
         if let AggregateMeta::AggregateSpilling(partition) = aggregate_meta {
+            // we use fixed-size partitioning; a different bucket count would prevent spilled data from being merged correctly
+            debug_assert_eq!(
+                partition.payloads.len(),
+                MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM,
+                "the number of payloads should equal MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM for a spilling partition"
+            );
+
             for (bucket, payload) in partition.payloads.into_iter().enumerate() {
                 if payload.len() == 0 {
                     continue;
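
The new assertion guards the fixed-size partitioning invariant: every spill must emit exactly MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM payload slots so that slot i of one spill lines up with slot i of every other spill at merge time. A minimal sketch of why a fixed bucket count matters; BUCKETS is an illustrative constant, not the real value:

    // stand-in for MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM; value is illustrative
    const BUCKETS: usize = 128;

    fn partition_keys(keys: &[u64]) -> Vec<Vec<u64>> {
        let mut payloads = vec![Vec::new(); BUCKETS];
        for &k in keys {
            // fixed modulo keeps bucket assignment stable across spills
            payloads[(k as usize) % BUCKETS].push(k);
        }
        // the invariant the diff's debug_assert_eq! enforces
        debug_assert_eq!(payloads.len(), BUCKETS);
        payloads
    }

    fn main() {
        let first_spill = partition_keys(&[1, 129, 300]);
        let second_spill = partition_keys(&[1, 257]);
        // merging is slot-by-slot: slot 1 from both spills holds keys that bucket alike
        assert_eq!(first_spill.len(), second_spill.len());
        println!("slot 1 sizes: {} and {}", first_spill[1].len(), second_spill[1].len());
    }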

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/transform_partition_bucket_scatter.rs

Lines changed: 27 additions & 1 deletion
@@ -294,7 +294,33 @@ impl TransformPartitionBucketScatter {
 
                     (payload.bucket, payload.max_partition_count)
                 }
-                AggregateMeta::NewBucketSpilled(_) => unreachable!(),
+                AggregateMeta::NewBucketSpilled(_) => {
+                    let meta = data_block.take_meta().unwrap();
+
+                    if let Some(AggregateMeta::NewBucketSpilled(payload)) =
+                        AggregateMeta::downcast_from(meta)
+                    {
+                        let bucket = payload.bucket;
+                        let partition_count = MAX_PARTITION_COUNT;
+                        self.max_partition_count =
+                            self.max_partition_count.max(partition_count);
+
+                        let data_block = DataBlock::empty_with_meta(
+                            AggregateMeta::create_new_spilled(payload),
+                        );
+                        match self.buckets_blocks.entry(bucket) {
+                            Entry::Vacant(v) => {
+                                v.insert(vec![data_block]);
+                            }
+                            Entry::Occupied(mut v) => {
+                                v.get_mut().push(data_block);
+                            }
+                        };
+
+                        return Ok((SINGLE_LEVEL_BUCKET_NUM, MAX_PARTITION_COUNT));
+                    }
+                    unreachable!()
+                }
             };
         } else {
             return Err(ErrorCode::Internal(format!(
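
Instead of treating NewBucketSpilled as unreachable, the new arm books each spilled block under its bucket, creating the per-bucket Vec on first use via the map's entry API. A standalone sketch of that bookkeeping pattern, assuming a BTreeMap keyed by bucket and simplifying the block type to a String placeholder:

    use std::collections::btree_map::{BTreeMap, Entry};

    fn record(buckets_blocks: &mut BTreeMap<isize, Vec<String>>, bucket: isize, block: String) {
        match buckets_blocks.entry(bucket) {
            // first block for this bucket: create the Vec
            Entry::Vacant(v) => {
                v.insert(vec![block]);
            }
            // bucket already seen: append
            Entry::Occupied(mut o) => {
                o.get_mut().push(block);
            }
        }
    }

    fn main() {
        let mut buckets_blocks = BTreeMap::new();
        record(&mut buckets_blocks, 3, "spilled-a".into());
        record(&mut buckets_blocks, 3, "spilled-b".into());
        assert_eq!(buckets_blocks[&3].len(), 2);
        println!("bucket 3 holds {} blocks", buckets_blocks[&3].len());
    }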
