Skip to content

Commit 38a99bd

Browse files
committed
feat(query): move enable_experimental_aggregate_hashtable into AggregatorParams and merge AggregateHashTable payloads in the final aggregate/group-by transforms
1 parent 99e5b04 commit 38a99bd

File tree

6 files changed

+127
-50
lines changed

6 files changed

+127
-50
lines changed

src/query/service/src/pipelines/builders/builder_aggregate.rs

+13-6
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,16 @@ impl PipelineBuilder {
103103
self.build_pipeline(&aggregate.input)?;
104104

105105
let max_block_size = self.settings.get_max_block_size()?;
106+
let enable_experimental_aggregate_hashtable = self
107+
.settings
108+
.get_enable_experimental_aggregate_hashtable()?
109+
&& self.ctx.get_cluster().is_empty();
110+
106111
let params = Self::build_aggregator_params(
107112
aggregate.input.output_schema()?,
108113
&aggregate.group_by,
109114
&aggregate.agg_funcs,
115+
enable_experimental_aggregate_hashtable,
110116
max_block_size as usize,
111117
None,
112118
)?;
@@ -128,10 +134,6 @@ impl PipelineBuilder {
128134
}
129135

130136
let efficiently_memory = self.settings.get_efficiently_memory_group_by()?;
131-
let enable_experimental_aggregate_hashtable = self
132-
.settings
133-
.get_enable_experimental_aggregate_hashtable()?
134-
&& self.ctx.get_cluster().is_empty();
135137

136138
let group_cols = &params.group_columns;
137139
let schema_before_group_by = params.input_schema.clone();
@@ -151,7 +153,6 @@ impl PipelineBuilder {
151153
output,
152154
params.clone(),
153155
partial_agg_config.clone(),
154-
enable_experimental_aggregate_hashtable,
155156
),
156157
}),
157158
false => with_mappedhash_method!(|T| match method.clone() {
@@ -162,7 +163,6 @@ impl PipelineBuilder {
162163
output,
163164
params.clone(),
164165
partial_agg_config.clone(),
165-
enable_experimental_aggregate_hashtable,
166166
),
167167
}),
168168
}?;
@@ -235,11 +235,16 @@ impl PipelineBuilder {
235235

236236
pub(crate) fn build_aggregate_final(&mut self, aggregate: &AggregateFinal) -> Result<()> {
237237
let max_block_size = self.settings.get_max_block_size()?;
238+
let enable_experimental_aggregate_hashtable = self
239+
.settings
240+
.get_enable_experimental_aggregate_hashtable()?
241+
&& self.ctx.get_cluster().is_empty();
238242

239243
let params = Self::build_aggregator_params(
240244
aggregate.before_group_by_schema.clone(),
241245
&aggregate.group_by,
242246
&aggregate.agg_funcs,
247+
enable_experimental_aggregate_hashtable,
243248
max_block_size as usize,
244249
aggregate.limit,
245250
)?;
@@ -339,6 +344,7 @@ impl PipelineBuilder {
339344
input_schema: DataSchemaRef,
340345
group_by: &[IndexType],
341346
agg_funcs: &[AggregateFunctionDesc],
347+
enable_experimental_aggregate_hashtable: bool,
342348
max_block_size: usize,
343349
limit: Option<usize>,
344350
) -> Result<Arc<AggregatorParams>> {
@@ -379,6 +385,7 @@ impl PipelineBuilder {
379385
&group_by,
380386
&aggs,
381387
&agg_args,
388+
enable_experimental_aggregate_hashtable,
382389
max_block_size,
383390
limit,
384391
)?;

src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub struct AggregatorParams {
3838
pub layout: Option<Layout>,
3939
pub offsets_aggregate_states: Vec<usize>,
4040

41+
pub enable_experimental_aggregate_hashtable: bool,
4142
pub max_block_size: usize,
4243
// Limit is push down to AggregatorTransform
4344
pub limit: Option<usize>,
@@ -50,6 +51,7 @@ impl AggregatorParams {
5051
group_columns: &[usize],
5152
agg_funcs: &[AggregateFunctionRef],
5253
agg_args: &[Vec<usize>],
54+
enable_experimental_aggregate_hashtable: bool,
5355
max_block_size: usize,
5456
limit: Option<usize>,
5557
) -> Result<Arc<AggregatorParams>> {
@@ -68,6 +70,7 @@ impl AggregatorParams {
6870
aggregate_functions_arguments: agg_args.to_vec(),
6971
layout: states_layout,
7072
offsets_aggregate_states: states_offsets,
73+
enable_experimental_aggregate_hashtable,
7174
max_block_size,
7275
limit,
7376
}))

src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs

+56-40
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,57 @@ impl<Method: HashMethodBounds> TransformFinalAggregate<Method> {
6565
},
6666
)))
6767
}
68+
69+
fn transform_agg_hashtable(&mut self, meta: AggregateMeta<Method, usize>) -> Result<DataBlock> {
70+
let mut agg_hashtable: Option<AggregateHashTable> = None;
71+
if let AggregateMeta::Partitioned { bucket: _, data } = meta {
72+
for bucket_data in data {
73+
match bucket_data {
74+
AggregateMeta::AggregateHashTable(payload) => match agg_hashtable.as_mut() {
75+
Some(ht) => {
76+
ht.combine_payloads(&payload, &mut self.flush_state)?;
77+
}
78+
None => {
79+
let capacity =
80+
AggregateHashTable::get_capacity_for_count(payload.len());
81+
82+
let mut hashtable = AggregateHashTable::new_with_capacity(
83+
self.params.group_data_types.clone(),
84+
self.params.aggregate_functions.clone(),
85+
HashTableConfig::default().with_initial_radix_bits(0),
86+
capacity,
87+
);
88+
hashtable.combine_payloads(&payload, &mut self.flush_state)?;
89+
agg_hashtable = Some(hashtable);
90+
}
91+
},
92+
_ => unreachable!(),
93+
}
94+
}
95+
}
96+
97+
if let Some(mut ht) = agg_hashtable {
98+
let mut blocks = vec![];
99+
self.flush_state.clear();
100+
loop {
101+
if ht.merge_result(&mut self.flush_state)? {
102+
let mut cols = self.flush_state.take_aggregate_results();
103+
cols.extend_from_slice(&self.flush_state.group_columns);
104+
105+
blocks.push(DataBlock::new_from_columns(cols));
106+
} else {
107+
break;
108+
}
109+
}
110+
111+
if blocks.is_empty() {
112+
return Ok(DataBlock::empty());
113+
}
114+
return DataBlock::concat(&blocks);
115+
}
116+
117+
Ok(DataBlock::empty())
118+
}
68119
}
69120

70121
impl<Method> BlockMetaTransform<AggregateMeta<Method, usize>> for TransformFinalAggregate<Method>
@@ -73,15 +124,17 @@ where Method: HashMethodBounds
73124
const NAME: &'static str = "TransformFinalAggregate";
74125

75126
fn transform(&mut self, meta: AggregateMeta<Method, usize>) -> Result<DataBlock> {
127+
if self.params.enable_experimental_aggregate_hashtable {
128+
return self.transform_agg_hashtable(meta);
129+
}
130+
76131
if let AggregateMeta::Partitioned { bucket, data } = meta {
77132
let mut reach_limit = false;
78133
let arena = Arc::new(Bump::new());
79134
let hashtable = self.method.create_hash_table::<usize>(arena)?;
80135
let _dropper = AggregateHashTableDropper::create(self.params.clone());
81136
let mut hash_cell = HashTableCell::<Method, usize>::create(hashtable, _dropper);
82137

83-
let mut agg_hashtable: Option<AggregateHashTable> = None;
84-
85138
for bucket_data in data {
86139
match bucket_data {
87140
AggregateMeta::Spilled(_) => unreachable!(),
@@ -186,45 +239,8 @@ where Method: HashMethodBounds
186239
}
187240
}
188241
},
189-
AggregateMeta::AggregateHashTable(payload) => match agg_hashtable.as_mut() {
190-
Some(ht) => {
191-
ht.combine_payloads(&payload, &mut self.flush_state)?;
192-
}
193-
None => {
194-
let capacity =
195-
AggregateHashTable::get_capacity_for_count(payload.len());
196-
197-
let mut hashtable = AggregateHashTable::new_with_capacity(
198-
self.params.group_data_types.clone(),
199-
self.params.aggregate_functions.clone(),
200-
HashTableConfig::default().with_initial_radix_bits(0),
201-
capacity,
202-
);
203-
hashtable.combine_payloads(&payload, &mut self.flush_state)?;
204-
agg_hashtable = Some(hashtable);
205-
}
206-
},
207-
}
208-
}
209-
210-
if let Some(mut ht) = agg_hashtable {
211-
let mut blocks = vec![];
212-
self.flush_state.clear();
213-
loop {
214-
if ht.merge_result(&mut self.flush_state)? {
215-
let mut cols = self.flush_state.take_aggregate_results();
216-
cols.extend_from_slice(&self.flush_state.group_columns);
217-
218-
blocks.push(DataBlock::new_from_columns(cols));
219-
} else {
220-
break;
221-
}
222-
}
223-
224-
if blocks.is_empty() {
225-
return Ok(DataBlock::empty());
242+
AggregateMeta::AggregateHashTable(_) => unreachable!(),
226243
}
227-
return DataBlock::concat(&blocks);
228244
}
229245

230246
let keys_len = hash_cell.hashtable.len();

src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,8 @@ impl<Method: HashMethodBounds> TransformPartialAggregate<Method> {
122122
output: Arc<OutputPort>,
123123
params: Arc<AggregatorParams>,
124124
config: HashTableConfig,
125-
enable_experimental_aggregate_hashtable: bool,
126125
) -> Result<Box<dyn Processor>> {
127-
let hash_table = if !enable_experimental_aggregate_hashtable {
126+
let hash_table = if !params.enable_experimental_aggregate_hashtable {
128127
let arena = Arc::new(Bump::new());
129128
let hashtable = method.create_hash_table(arena)?;
130129
let _dropper = AggregateHashTableDropper::create(params.clone());

src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs

+53
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,55 @@ impl<Method: HashMethodBounds> TransformFinalGroupBy<Method> {
5959
},
6060
)))
6161
}
62+
63+
fn transform_agg_hashtable(&mut self, meta: AggregateMeta<Method, ()>) -> Result<DataBlock> {
64+
let mut agg_hashtable: Option<AggregateHashTable> = None;
65+
if let AggregateMeta::Partitioned { bucket: _, data } = meta {
66+
for bucket_data in data {
67+
match bucket_data {
68+
AggregateMeta::AggregateHashTable(payload) => match agg_hashtable.as_mut() {
69+
Some(ht) => {
70+
ht.combine_payloads(&payload, &mut self.flush_state)?;
71+
}
72+
None => {
73+
let capacity =
74+
AggregateHashTable::get_capacity_for_count(payload.len());
75+
let mut hashtable = AggregateHashTable::new_with_capacity(
76+
self.params.group_data_types.clone(),
77+
self.params.aggregate_functions.clone(),
78+
HashTableConfig::default().with_initial_radix_bits(0),
79+
capacity,
80+
);
81+
hashtable.combine_payloads(&payload, &mut self.flush_state)?;
82+
agg_hashtable = Some(hashtable);
83+
}
84+
},
85+
_ => unreachable!(),
86+
}
87+
}
88+
}
89+
90+
if let Some(mut ht) = agg_hashtable {
91+
let mut blocks = vec![];
92+
self.flush_state.clear();
93+
loop {
94+
if ht.merge_result(&mut self.flush_state)? {
95+
blocks.push(DataBlock::new_from_columns(
96+
self.flush_state.take_group_columns(),
97+
));
98+
} else {
99+
break;
100+
}
101+
}
102+
103+
if blocks.is_empty() {
104+
return Ok(DataBlock::empty());
105+
}
106+
107+
return DataBlock::concat(&blocks);
108+
}
109+
Ok(DataBlock::empty())
110+
}
62111
}
63112

64113
impl<Method> BlockMetaTransform<AggregateMeta<Method, ()>> for TransformFinalGroupBy<Method>
@@ -67,6 +116,10 @@ where Method: HashMethodBounds
67116
const NAME: &'static str = "TransformFinalGroupBy";
68117

69118
fn transform(&mut self, meta: AggregateMeta<Method, ()>) -> Result<DataBlock> {
119+
if self.params.enable_experimental_aggregate_hashtable {
120+
return self.transform_agg_hashtable(meta);
121+
}
122+
70123
if let AggregateMeta::Partitioned { bucket, data } = meta {
71124
let arena = Arc::new(Bump::new());
72125
let mut hashtable = self.method.create_hash_table::<()>(arena)?;

src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,8 @@ impl<Method: HashMethodBounds> TransformPartialGroupBy<Method> {
117117
output: Arc<OutputPort>,
118118
params: Arc<AggregatorParams>,
119119
config: HashTableConfig,
120-
enable_experimental_aggregate_hashtable: bool,
121120
) -> Result<Box<dyn Processor>> {
122-
let hash_table = if !enable_experimental_aggregate_hashtable {
121+
let hash_table = if !params.enable_experimental_aggregate_hashtable {
123122
let arena = Arc::new(Bump::new());
124123
let hashtable = method.create_hash_table(arena)?;
125124
let _dropper = GroupByHashTableDropper::<Method>::create();

0 commit comments

Comments
 (0)