15
15
use std:: any:: Any ;
16
16
use std:: sync:: Arc ;
17
17
18
+ use common_arrow:: arrow:: bitmap:: Bitmap ;
19
+ use common_arrow:: arrow:: bitmap:: MutableBitmap ;
20
+ use common_arrow:: parquet:: indexes:: Interval ;
18
21
use common_base:: base:: Progress ;
19
22
use common_base:: base:: ProgressValues ;
20
23
use common_catalog:: plan:: PartInfoPtr ;
@@ -48,9 +51,14 @@ struct PrewhereData {
48
51
/// The states for [`ParquetSource`]. The states will recycle for each row group of a parquet file.
49
52
enum State {
50
53
ReadDataPrewhere ( Option < PartInfoPtr > ) ,
51
- ReadDataRemain ( PartInfoPtr , PrewhereData ) ,
52
- PrewhereFilter ( PartInfoPtr , Vec < IndexedChunk > ) ,
53
- Deserialize ( PartInfoPtr , Vec < IndexedChunk > , Option < PrewhereData > ) ,
54
+ ReadDataRemain ( PartInfoPtr , PrewhereData , Option < Bitmap > ) ,
55
+ PrewhereFilter ( PartInfoPtr , Vec < IndexedChunk > , Option < Bitmap > ) ,
56
+ Deserialize (
57
+ PartInfoPtr ,
58
+ Vec < IndexedChunk > ,
59
+ Option < PrewhereData > ,
60
+ Option < Bitmap > ,
61
+ ) ,
54
62
Generated ( Option < PartInfoPtr > , DataBlock ) ,
55
63
Finish ,
56
64
}
@@ -108,12 +116,18 @@ impl ParquetSource {
108
116
& mut self ,
109
117
part : PartInfoPtr ,
110
118
raw_chunks : Vec < IndexedChunk > ,
119
+ row_selection : Option < Bitmap > ,
111
120
) -> Result < ( ) > {
112
121
let rg_part = ParquetRowGroupPart :: from_part ( & part) ?;
113
122
// deserialize prewhere data block first
114
- let data_block = self
115
- . prewhere_reader
116
- . deserialize ( rg_part, raw_chunks, None ) ?;
123
+ let data_block = if let Some ( row_selection) = & row_selection {
124
+ self . prewhere_reader
125
+ . deserialize ( rg_part, raw_chunks, Some ( row_selection. clone ( ) ) ) ?
126
+ } else {
127
+ self . prewhere_reader
128
+ . deserialize ( rg_part, raw_chunks, None ) ?
129
+ } ;
130
+
117
131
if let Some ( filter) = self . prewhere_filter . as_ref ( ) {
118
132
// do filter
119
133
let func_ctx = self . ctx . try_get_function_context ( ) ?;
@@ -170,10 +184,14 @@ impl ParquetSource {
170
184
filtered_block. resort ( self . src_schema . as_ref ( ) , self . output_schema . as_ref ( ) ) ?;
171
185
self . state = Generated ( self . ctx . try_get_part ( ) , block) ;
172
186
} else {
173
- self . state = State :: ReadDataRemain ( part, PrewhereData {
174
- data_block : filtered_block,
175
- filter,
176
- } ) ;
187
+ self . state = State :: ReadDataRemain (
188
+ part,
189
+ PrewhereData {
190
+ data_block : filtered_block,
191
+ filter,
192
+ } ,
193
+ row_selection,
194
+ ) ;
177
195
}
178
196
Ok ( ( ) )
179
197
} else {
@@ -188,6 +206,7 @@ impl ParquetSource {
188
206
part : PartInfoPtr ,
189
207
raw_chunks : Vec < IndexedChunk > ,
190
208
prewhere_data : Option < PrewhereData > ,
209
+ row_selection : Option < Bitmap > ,
191
210
) -> Result < ( ) > {
192
211
let rg_part = ParquetRowGroupPart :: from_part ( & part) ?;
193
212
let output_block = if let Some ( PrewhereData {
@@ -207,8 +226,15 @@ impl ParquetSource {
207
226
}
208
227
Value :: Column ( bitmap) => {
209
228
if !self . read_options . push_down_bitmap ( ) || bitmap. unset_bits ( ) == 0 {
210
- // don't need filter
211
- let block = remain_reader. deserialize ( rg_part, raw_chunks, None ) ?;
229
+ let block = if let Some ( row_selection) = & row_selection {
230
+ remain_reader. deserialize (
231
+ rg_part,
232
+ raw_chunks,
233
+ Some ( row_selection. clone ( ) ) ,
234
+ ) ?
235
+ } else {
236
+ remain_reader. deserialize ( rg_part, raw_chunks, None ) ?
237
+ } ;
212
238
DataBlock :: filter_with_bitmap ( block, & bitmap) ?
213
239
} else {
214
240
remain_reader. deserialize ( rg_part, raw_chunks, Some ( bitmap) ) ?
@@ -297,9 +323,9 @@ impl Processor for ParquetSource {
297
323
match self . state {
298
324
State :: Finish => Ok ( Event :: Finished ) ,
299
325
State :: ReadDataPrewhere ( _)
300
- | State :: ReadDataRemain ( _, _)
301
- | State :: PrewhereFilter ( _, _)
302
- | State :: Deserialize ( _, _, _) => Ok ( Event :: Sync ) ,
326
+ | State :: ReadDataRemain ( _, _, _ )
327
+ | State :: PrewhereFilter ( _, _, _ )
328
+ | State :: Deserialize ( _, _, _, _ ) => Ok ( Event :: Sync ) ,
303
329
State :: Generated ( _, _) => Err ( ErrorCode :: Internal ( "It's a bug." ) ) ,
304
330
}
305
331
}
@@ -308,32 +334,59 @@ impl Processor for ParquetSource {
308
334
match std:: mem:: replace ( & mut self . state , State :: Finish ) {
309
335
State :: ReadDataPrewhere ( Some ( part) ) => {
310
336
let rg_part = ParquetRowGroupPart :: from_part ( & part) ?;
337
+ let row_selection = rg_part
338
+ . row_selection
339
+ . as_ref ( )
340
+ . map ( |sel| intervals_to_bitmap ( sel, rg_part. num_rows ) ) ;
311
341
let chunks = self . prewhere_reader . sync_read_columns ( rg_part) ?;
312
342
if self . prewhere_filter . is_some ( ) {
313
- self . state = State :: PrewhereFilter ( part, chunks) ;
343
+ self . state = State :: PrewhereFilter ( part, chunks, row_selection ) ;
314
344
} else {
315
345
// If there is no prewhere filter, it means there is only the prewhere reader.
316
346
assert ! ( self . remain_reader. is_none( ) ) ;
317
347
// So all the needed columns are read.
318
- self . state = State :: Deserialize ( part, chunks, None )
348
+ self . state = State :: Deserialize ( part, chunks, None , row_selection )
319
349
}
320
350
Ok ( ( ) )
321
351
}
322
- State :: ReadDataRemain ( part, prewhere_data) => {
352
+ State :: ReadDataRemain ( part, prewhere_data, row_selection ) => {
323
353
if let Some ( remain_reader) = self . remain_reader . as_ref ( ) {
324
354
let rg_part = ParquetRowGroupPart :: from_part ( & part) ?;
325
355
let chunks = remain_reader. sync_read_columns ( rg_part) ?;
326
- self . state = State :: Deserialize ( part, chunks, Some ( prewhere_data) ) ;
356
+ self . state =
357
+ State :: Deserialize ( part, chunks, Some ( prewhere_data) , row_selection) ;
327
358
Ok ( ( ) )
328
359
} else {
329
360
Err ( ErrorCode :: Internal ( "It's a bug. No remain reader" ) )
330
361
}
331
362
}
332
- State :: PrewhereFilter ( part, chunks) => self . do_prewhere_filter ( part, chunks) ,
333
- State :: Deserialize ( part, chunks, prewhere_data) => {
334
- self . do_deserialize ( part, chunks, prewhere_data)
363
+ State :: PrewhereFilter ( part, chunks, row_selection) => {
364
+ self . do_prewhere_filter ( part, chunks, row_selection)
365
+ }
366
+ State :: Deserialize ( part, chunks, prewhere_data, row_selection) => {
367
+ self . do_deserialize ( part, chunks, prewhere_data, row_selection)
335
368
}
336
369
_ => Err ( ErrorCode :: Internal ( "It's a bug." ) ) ,
337
370
}
338
371
}
339
372
}
373
+
374
+ /// Convert intervals to a bitmap. The `intervals` represents the row selection across `num_rows`.
375
+ fn intervals_to_bitmap ( interval : & [ Interval ] , num_rows : usize ) -> Bitmap {
376
+ debug_assert ! (
377
+ interval. is_empty( )
378
+ || interval. last( ) . unwrap( ) . start + interval. last( ) . unwrap( ) . length < num_rows
379
+ ) ;
380
+
381
+ let mut bitmap = MutableBitmap :: with_capacity ( num_rows) ;
382
+ let mut offset = 0 ;
383
+
384
+ for intv in interval {
385
+ bitmap. extend_constant ( intv. start - offset, false ) ;
386
+ bitmap. extend_constant ( intv. length , true ) ;
387
+ offset = intv. start + intv. length ;
388
+ }
389
+ bitmap. extend_constant ( num_rows - offset, false ) ;
390
+
391
+ bitmap. into ( )
392
+ }
0 commit comments