@@ -26,6 +26,7 @@ use databend_common_exception::ErrorCode;
2626use databend_common_exception:: Result ;
2727use databend_common_expression:: BlockPartitionStream ;
2828use databend_common_expression:: DataBlock ;
29+ use databend_common_pipeline_transforms:: traits:: Location ;
2930use databend_common_pipeline_transforms:: MemorySettings ;
3031use databend_common_storage:: DataOperator ;
3132use databend_common_storages_parquet:: ReadSettings ;
@@ -38,6 +39,8 @@ use crate::pipelines::processors::transforms::aggregator::AggregateMeta;
3839use crate :: pipelines:: processors:: transforms:: aggregator:: NewSpilledPayload ;
3940use crate :: pipelines:: processors:: transforms:: aggregator:: SerializedPayload ;
4041use crate :: sessions:: QueryContext ;
42+ use crate :: spillers:: Layout ;
43+ use crate :: spillers:: SpillAdapter ;
4144use crate :: spillers:: SpillsBufferPool ;
4245use crate :: spillers:: SpillsDataWriter ;
4346
@@ -106,15 +109,17 @@ struct AggregatePayloadWriters {
106109 partition_count : usize ,
107110 writers : Vec < PayloadWriter > ,
108111 write_stats : WriteStats ,
112+ ctx : Arc < QueryContext > ,
109113}
110114
111115impl AggregatePayloadWriters {
112- pub fn create ( prefix : & str , partition_count : usize ) -> Self {
116+ pub fn create ( prefix : & str , partition_count : usize , ctx : Arc < QueryContext > ) -> Self {
113117 AggregatePayloadWriters {
114118 spill_prefix : prefix. to_string ( ) ,
115119 partition_count,
116120 writers : vec ! [ ] ,
117121 write_stats : WriteStats :: default ( ) ,
122+ ctx,
118123 }
119124 }
120125
@@ -144,23 +149,29 @@ impl AggregatePayloadWriters {
144149 let start = Instant :: now ( ) ;
145150 self . writers [ bucket] . write_block ( block) ?;
146151
147- // TODO: the time recorded may not be accurate due to the serialization is included
148152 let elapsed = start. elapsed ( ) ;
149153 self . write_stats . accumulate ( elapsed) ;
150154 }
151155
152156 Ok ( ( ) )
153157 }
154158
155- pub fn finalize ( & mut self ) -> Result < ( Vec < NewSpilledPayload > , WriteStats ) > {
159+ pub fn finalize ( & mut self ) -> Result < Vec < NewSpilledPayload > > {
156160 let writers = mem:: take ( & mut self . writers ) ;
157161 if writers. is_empty ( ) {
158- return Ok ( ( Vec :: new ( ) , self . write_stats . take ( ) ) ) ;
162+ return Ok ( Vec :: new ( ) ) ;
159163 }
160164
161165 let mut spilled_payloads = Vec :: new ( ) ;
162166 for ( partition_id, writer) in writers. into_iter ( ) . enumerate ( ) {
163167 let ( path, written_size, row_groups) = writer. close ( ) ?;
168+
169+ self . ctx . add_spill_file (
170+ Location :: Remote ( path. clone ( ) ) ,
171+ Layout :: Aggregate ,
172+ written_size,
173+ ) ;
174+
164175 if row_groups. is_empty ( ) {
165176 continue ;
166177 }
@@ -180,7 +191,9 @@ impl AggregatePayloadWriters {
180191 }
181192
182193 let stats = self . write_stats . take ( ) ;
183- Ok ( ( spilled_payloads, stats) )
194+ flush_write_profile ( & self . ctx , stats) ;
195+
196+ Ok ( spilled_payloads)
184197 }
185198}
186199
@@ -252,7 +265,6 @@ impl SharedPartitionStream {
252265
253266pub struct NewAggregateSpiller {
254267 pub memory_settings : MemorySettings ,
255- ctx : Arc < QueryContext > ,
256268 read_setting : ReadSettings ,
257269 partition_count : usize ,
258270 partition_stream : SharedPartitionStream ,
@@ -270,11 +282,10 @@ impl NewAggregateSpiller {
270282 let read_setting = ReadSettings :: from_settings ( & table_ctx. get_settings ( ) ) ?;
271283 let spill_prefix = ctx. query_id_spill_prefix ( ) ;
272284
273- let payload_writers = AggregatePayloadWriters :: create ( & spill_prefix, partition_count) ;
285+ let payload_writers = AggregatePayloadWriters :: create ( & spill_prefix, partition_count, ctx ) ;
274286
275287 Ok ( Self {
276288 memory_settings,
277- ctx,
278289 read_setting,
279290 partition_count,
280291 partition_stream,
@@ -292,8 +303,7 @@ impl NewAggregateSpiller {
292303 let pending_blocks = self . partition_stream . finish ( ) ;
293304 self . payload_writers . write_ready_blocks ( pending_blocks) ?;
294305
295- let ( payloads, write_stats) = self . payload_writers . finalize ( ) ?;
296- self . flush_write_profile ( write_stats) ;
306+ let payloads = self . payload_writers . finalize ( ) ?;
297307 info ! (
298308 "[NewAggregateSpiller] spill finish with {} payloads" ,
299309 payloads. len( )
@@ -315,7 +325,7 @@ impl NewAggregateSpiller {
315325 let read_bytes = row_group. total_byte_size ( ) as usize ;
316326 let instant = Instant :: now ( ) ;
317327 let data_block = reader. read ( self . read_setting ) ?;
318- self . flush_read_profile ( & instant, read_bytes) ;
328+ flush_read_profile ( & instant, read_bytes) ;
319329
320330 if let Some ( block) = data_block {
321331 Ok ( AggregateMeta :: Serialized ( SerializedPayload {
@@ -327,44 +337,38 @@ impl NewAggregateSpiller {
327337 Err ( ErrorCode :: Internal ( "read empty block from final aggregate" ) )
328338 }
329339 }
340+ }
330341
331- fn flush_read_profile ( & self , instant : & Instant , read_bytes : usize ) {
332- Profile :: record_usize_profile ( ProfileStatisticsName :: RemoteSpillReadCount , 1 ) ;
333- Profile :: record_usize_profile ( ProfileStatisticsName :: RemoteSpillReadBytes , read_bytes) ;
342+ fn flush_read_profile ( instant : & Instant , read_bytes : usize ) {
343+ Profile :: record_usize_profile ( ProfileStatisticsName :: RemoteSpillReadCount , 1 ) ;
344+ Profile :: record_usize_profile ( ProfileStatisticsName :: RemoteSpillReadBytes , read_bytes) ;
345+ Profile :: record_usize_profile (
346+ ProfileStatisticsName :: RemoteSpillReadTime ,
347+ instant. elapsed ( ) . as_millis ( ) as usize ,
348+ ) ;
349+ }
350+
351+ fn flush_write_profile ( ctx : & Arc < QueryContext > , stats : WriteStats ) {
352+ if stats. count == 0 && stats. bytes == 0 && stats. rows == 0 {
353+ return ;
354+ }
355+
356+ if stats. count > 0 {
357+ Profile :: record_usize_profile ( ProfileStatisticsName :: RemoteSpillWriteCount , stats. count ) ;
334358 Profile :: record_usize_profile (
335- ProfileStatisticsName :: RemoteSpillReadTime ,
336- instant . elapsed ( ) . as_millis ( ) as usize ,
359+ ProfileStatisticsName :: RemoteSpillWriteTime ,
360+ stats . elapsed . as_millis ( ) as usize ,
337361 ) ;
338362 }
363+ if stats. bytes > 0 {
364+ Profile :: record_usize_profile ( ProfileStatisticsName :: RemoteSpillWriteBytes , stats. bytes ) ;
365+ }
339366
340- fn flush_write_profile ( & self , stats : WriteStats ) {
341- if stats. count == 0 && stats. bytes == 0 && stats. rows == 0 {
342- return ;
343- }
344-
345- if stats. count > 0 {
346- Profile :: record_usize_profile (
347- ProfileStatisticsName :: RemoteSpillWriteCount ,
348- stats. count ,
349- ) ;
350- Profile :: record_usize_profile (
351- ProfileStatisticsName :: RemoteSpillWriteTime ,
352- stats. elapsed . as_millis ( ) as usize ,
353- ) ;
354- }
355- if stats. bytes > 0 {
356- Profile :: record_usize_profile (
357- ProfileStatisticsName :: RemoteSpillWriteBytes ,
358- stats. bytes ,
359- ) ;
360- }
361-
362- if stats. rows > 0 || stats. bytes > 0 {
363- let progress_val = ProgressValues {
364- rows : stats. rows ,
365- bytes : stats. bytes ,
366- } ;
367- self . ctx . get_aggregate_spill_progress ( ) . incr ( & progress_val) ;
368- }
367+ if stats. rows > 0 || stats. bytes > 0 {
368+ let progress_val = ProgressValues {
369+ rows : stats. rows ,
370+ bytes : stats. bytes ,
371+ } ;
372+ ctx. get_aggregate_spill_progress ( ) . incr ( & progress_val) ;
369373 }
370374}
0 commit comments