diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index b0563d0d693a..c896a7b817ed 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -44,6 +44,7 @@ use crate::file::metadata::{ PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader, ParquetStatisticsPolicy, RowGroupMetaData, }; +use crate::file::page_index::offset_index::PageLocation; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; @@ -1184,7 +1185,7 @@ impl ParquetRecordBatchReaderBuilder { .build_limited() .build(); - Ok(ParquetRecordBatchReader::new(array_reader, read_plan)) + Ok(ParquetRecordBatchReader::new(array_reader, read_plan, None)) } } @@ -1286,6 +1287,7 @@ pub struct ParquetRecordBatchReader { array_reader: Box, schema: SchemaRef, read_plan: ReadPlan, + page_offsets: Option>, } impl Debug for ParquetRecordBatchReader { @@ -1324,8 +1326,11 @@ impl ParquetRecordBatchReader { RowSelectionCursor::Mask(mask_cursor) => { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. + let page_locations = self.page_offsets.as_deref(); + while !mask_cursor.is_empty() { - let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else { + let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, page_locations) + else { return Ok(None); }; @@ -1492,13 +1497,18 @@ impl ParquetRecordBatchReader { array_reader, schema: Arc::new(Schema::new(levels.fields.clone())), read_plan, + page_offsets: None, }) } /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None` /// all rows will be returned - pub(crate) fn new(array_reader: Box, read_plan: ReadPlan) -> Self { + pub(crate) fn new( + array_reader: Box, + read_plan: ReadPlan, + page_offsets: Option>, + ) -> Self { let schema = match array_reader.get_data_type() { ArrowType::Struct(fields) => Schema::new(fields.clone()), _ => unreachable!("Struct array reader's data type is not struct!"), @@ -1508,6 +1518,7 @@ impl ParquetRecordBatchReader { array_reader, schema: Arc::new(schema), read_plan, + page_offsets, } } @@ -5542,14 +5553,18 @@ pub(crate) mod tests { } #[test] - fn test_row_filter_full_page_skip_is_handled() { + fn test_page_aware_mask_handles_page_skip() { + // Test that when using Auto policy with page-aware bitmask, the reader + // correctly handles pages that are skipped entirely due to row filtering. + // + // This creates a file with 12 rows across 6 pages (2 rows per page). + // After filtering, only the first and last rows remain, skipping 4 pages + // in the middle. The page-aware mask should handle this correctly by + // respecting page boundaries during chunk iteration. let first_value: i64 = 1111; let last_value: i64 = 9999; let num_rows: usize = 12; - // build data with row selection average length 4 - // The result would be (1111 XXXX) ... (4 page in the middle)... (XXXX 9999) - // The Row Selection would be [1111, (skip 10), 9999] let schema = Arc::new(Schema::new(vec![ Field::new("key", arrow_schema::DataType::Int64, false), Field::new("value", arrow_schema::DataType::Int64, false), @@ -5595,8 +5610,8 @@ pub(crate) mod tests { let options = ArrowReaderOptions::new().with_page_index(true); let predicate = make_predicate(filter_mask.clone()); - // The batch size is set to 12 to read all rows in one go after filtering - // If the Reader chooses mask to handle filter, it might cause panic because the mid 4 pages may not be decoded. + // Using Auto policy with page index enabled - page-aware mask should handle + // the skipped pages correctly without panicking. let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) .unwrap() .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) @@ -5605,12 +5620,235 @@ pub(crate) mod tests { .build() .unwrap(); - // Predicate pruning used to panic once mask-backed plans removed whole pages. - // Collecting into batches validates the plan now downgrades to selectors instead. let schema = reader.schema().clone(); let batches = reader.collect::, _>>().unwrap(); let result = concat_batches(&schema, &batches).unwrap(); assert_eq!(result.num_rows(), 2); + assert_eq!( + result.column(0).as_ref(), + &Int64Array::from(vec![first_value, last_value]) + ); + assert_eq!( + result.column(1).as_ref(), + &Int64Array::from(vec![first_value, last_value]) + ); + } + + /// Test that bitmask-based row selection correctly handles page boundaries. + /// This test creates a parquet file with multiple small pages and verifies that + /// when using Mask policy, pages that are skipped entirely are handled correctly. + #[test] + fn test_bitmask_page_aware_selection() { + let first_value: i64 = 1111; + let last_value: i64 = 9999; + let num_rows: usize = 20; + + // Create a file with 20 rows, ~2 rows per page = 10 pages + // Selection will be: first row, skip middle rows, last row + // This forces the reader to handle skipped pages correctly + let schema = Arc::new(Schema::new(vec![ + Field::new("key", arrow_schema::DataType::Int64, false), + Field::new("value", arrow_schema::DataType::Int64, false), + ])); + + let mut int_values: Vec = (0..num_rows as i64).collect(); + int_values[0] = first_value; + int_values[num_rows - 1] = last_value; + let keys = Int64Array::from(int_values.clone()); + let values = Int64Array::from(int_values.clone()); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], + ) + .unwrap(); + + // Configure small pages to create multiple page boundaries + let props = WriterProperties::builder() + .set_write_batch_size(2) + .set_data_page_row_count_limit(2) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + let options = ArrowReaderOptions::new().with_page_index(true); + let builder = + ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap(); + let schema = builder.parquet_schema().clone(); + let filter_mask = ProjectionMask::leaves(&schema, [0]); + + let make_predicate = |mask: ProjectionMask| { + ArrowPredicateFn::new(mask, move |batch: RecordBatch| { + let column = batch.column(0); + let match_first = eq(column, &Int64Array::new_scalar(first_value))?; + let match_second = eq(column, &Int64Array::new_scalar(last_value))?; + or(&match_first, &match_second) + }) + }; + + let options = ArrowReaderOptions::new().with_page_index(true); + let predicate = make_predicate(filter_mask.clone()); + + // Use Mask policy explicitly to test page-aware behavior + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) + .unwrap() + .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = reader.schema().clone(); + let batches = reader.collect::, _>>().unwrap(); + let result = concat_batches(&schema, &batches).unwrap(); + + // Should have exactly 2 rows: first and last + assert_eq!(result.num_rows(), 2); + + // Verify the values + let key_col = result.column(0).as_primitive::(); + assert_eq!(key_col.value(0), first_value); + assert_eq!(key_col.value(1), last_value); + } + + /// Test that page-aware bitmask handles edge cases at exact page boundaries. + /// When mask_start aligns exactly with a page boundary, verify correct behavior. + #[test] + fn test_bitmask_at_page_boundary() { + let num_rows: usize = 10; + let rows_per_page: usize = 2; + + // Create a file with 10 rows, 2 rows per page = 5 pages + let schema = Arc::new(Schema::new(vec![Field::new( + "id", + arrow_schema::DataType::Int64, + false, + )])); + + let ids: Vec = (0..num_rows as i64).collect(); + let id_array = Int64Array::from(ids); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(rows_per_page) + .set_data_page_row_count_limit(rows_per_page) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + // Create a selection that starts exactly at page boundaries + // Select rows 0-1 (page 0), skip 2-5 (pages 1-2), select 6-9 (pages 3-4) + let selection = RowSelection::from(vec![ + RowSelector::select(2), // Page 0: rows 0-1 + RowSelector::skip(4), // Pages 1-2: rows 2-5 + RowSelector::select(4), // Pages 3-4: rows 6-9 + ]); + + let options = ArrowReaderOptions::new().with_page_index(true); + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) + .unwrap() + .with_row_selection(selection.clone()) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = reader.schema().clone(); + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should have 2 + 4 = 6 rows + assert_eq!(total_rows, 6); + + // Verify values from the concatenated result + let result = concat_batches(&schema, &batches).unwrap(); + let id_col = result.column(0).as_primitive::(); + + // Expected values: 0, 1, 6, 7, 8, 9 + assert_eq!(id_col.value(0), 0); + assert_eq!(id_col.value(1), 1); + assert_eq!(id_col.value(2), 6); + assert_eq!(id_col.value(3), 7); + assert_eq!(id_col.value(4), 8); + assert_eq!(id_col.value(5), 9); + } + + /// Test that page-aware bitmask handles mid-page selections correctly. + /// The selection starts in the middle of a page and ends in the middle of another. + #[test] + fn test_bitmask_mid_page_selection() { + let num_rows: usize = 12; + let rows_per_page: usize = 3; + + // Create a file with 12 rows, 3 rows per page = 4 pages + // Page 0: rows 0-2 + // Page 1: rows 3-5 + // Page 2: rows 6-8 + // Page 3: rows 9-11 + let schema = Arc::new(Schema::new(vec![Field::new( + "id", + arrow_schema::DataType::Int64, + false, + )])); + + let ids: Vec = (0..num_rows as i64).collect(); + let id_array = Int64Array::from(ids); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(rows_per_page) + .set_data_page_row_count_limit(rows_per_page) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + // Select mid-page: skip first, select row 1-2, skip row 3-7 (cross pages), select row 8-10 + let selection = RowSelection::from(vec![ + RowSelector::skip(1), // Skip row 0 + RowSelector::select(2), // Select rows 1-2 (partial page 0) + RowSelector::skip(5), // Skip rows 3-7 (pages 1, part of 2) + RowSelector::select(3), // Select rows 8-10 (part of pages 2-3) + RowSelector::skip(1), // Skip row 11 + ]); + + let options = ArrowReaderOptions::new().with_page_index(true); + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options) + .unwrap() + .with_row_selection(selection.clone()) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = reader.schema().clone(); + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should have 2 + 3 = 5 rows + assert_eq!(total_rows, 5); + + let result = concat_batches(&schema, &batches).unwrap(); + let id_col = result.column(0).as_primitive::(); + + // Expected values: 1, 2, 8, 9, 10 + assert_eq!(id_col.value(0), 1); + assert_eq!(id_col.value(1), 2); + assert_eq!(id_col.value(2), 8); + assert_eq!(id_col.value(3), 9); + assert_eq!(id_col.value(4), 10); } #[test] diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 7c9eb36befe3..c8171f61b1a0 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -148,7 +148,7 @@ impl ReadPlanBuilder { array_reader: Box, predicate: &mut dyn ArrowPredicate, ) -> Result { - let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build()); + let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build(), None); let mut filters = vec![]; for maybe_batch in reader { let maybe_batch = maybe_batch?; diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 2ddf812f9c39..ecba96362646 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -271,8 +271,8 @@ impl RowSelection { }) } - /// Returns true if selectors should be forced, preventing mask materialisation - pub(crate) fn should_force_selectors( + /// Returns true if bitmasks should be page aware + pub(crate) fn requires_page_aware_mask( &self, projection: &ProjectionMask, offset_index: Option<&[OffsetIndexMetaData]>, @@ -778,8 +778,13 @@ impl MaskCursor { self.position >= self.mask.len() } - /// Advance through the mask representation, producing the next chunk summary - pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option { + /// Advance through the mask representation, producing the next chunk summary. + /// Optionally clips chunk boundaries to page boundaries. + pub fn next_mask_chunk( + &mut self, + batch_size: usize, + page_locations: Option<&[PageLocation]>, + ) -> Option { let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { let mask = &self.mask; @@ -791,6 +796,7 @@ impl MaskCursor { let mut cursor = start_position; let mut initial_skip = 0; + // Skip unselected rows while cursor < mask.len() && !mask.value(cursor) { initial_skip += 1; cursor += 1; @@ -800,10 +806,21 @@ impl MaskCursor { let mut chunk_rows = 0; let mut selected_rows = 0; - // Advance until enough rows have been selected to satisfy the batch size, - // or until the mask is exhausted. This mirrors the behaviour of the legacy - // `RowSelector` queue-based iteration. - while cursor < mask.len() && selected_rows < batch_size { + let max_chunk_rows = page_locations + .and_then(|pages| { + let next_idx = + pages.partition_point(|loc| loc.first_row_index as usize <= mask_start); + pages.get(next_idx).and_then(|loc| { + let page_start = loc.first_row_index as usize; + (page_start > mask_start).then_some(page_start - mask_start) + }) + }) + .unwrap_or(usize::MAX); + + // Advance until enough rows have been selected to satisfy batch_size, + // or until the mask is exhausted or until a page boundary. + while cursor < mask.len() && selected_rows < batch_size && chunk_rows < max_chunk_rows { + // Increment counters chunk_rows += 1; if mask.value(cursor) { selected_rows += 1; @@ -1090,6 +1107,49 @@ mod tests { ); } + #[test] + fn test_mask_cursor_page_aware_chunking() { + let selectors = vec![RowSelector::skip(2), RowSelector::select(10)]; + let mask = boolean_mask_from_selectors(&selectors); + let mut cursor = MaskCursor { mask, position: 0 }; + + let pages = vec![ + PageLocation { + offset: 0, + compressed_page_size: 1, + first_row_index: 0, + }, + PageLocation { + offset: 1, + compressed_page_size: 1, + first_row_index: 4, + }, + PageLocation { + offset: 2, + compressed_page_size: 1, + first_row_index: 8, + }, + PageLocation { + offset: 3, + compressed_page_size: 1, + first_row_index: 12, + }, + ]; + // First chunk is page 1 + let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); + assert_eq!(chunk.initial_skip, 2); + assert_eq!(chunk.mask_start, 2); + assert_eq!(chunk.chunk_rows, 2); + assert_eq!(chunk.selected_rows, 2); + + // Second chunk is page 2 + let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap(); + assert_eq!(chunk.initial_skip, 0); + assert_eq!(chunk.mask_start, 4); + assert_eq!(chunk.chunk_rows, 4); + assert_eq!(chunk.selected_rows, 4); + } + #[test] fn test_and() { let mut a = RowSelection::from(vec![ diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 60f2ca1615a3..3cea952d3e58 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -782,7 +782,7 @@ mod tests { use arrow::error::Result as ArrowResult; use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder}; use arrow_array::cast::AsArray; - use arrow_array::types::Int32Type; + use arrow_array::types::{Int32Type, Int64Type}; use arrow_array::{ Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray, StructArray, UInt64Array, @@ -1214,15 +1214,19 @@ mod tests { assert_eq!(actual_rows, expected_rows); } + /// Test that when using Auto policy with page-aware bitmask, the reader + /// correctly handles pages that are skipped entirely due to row filtering. + /// + /// This creates a file with 12 rows across 6 pages (2 rows per page). + /// After filtering, only the first and last rows remain, skipping 4 pages + /// in the middle. The page-aware mask should handle this correctly by + /// respecting page boundaries during chunk iteration. #[tokio::test] - async fn test_row_filter_full_page_skip_is_handled_async() { + async fn test_page_aware_mask_handles_page_skip_async() { let first_value: i64 = 1111; let last_value: i64 = 9999; let num_rows: usize = 12; - // build data with row selection average length 4 - // The result would be (1111 XXXX) ... (4 page in the middle)... (XXXX 9999) - // The Row Selection would be [1111, (skip 10), 9999] let schema = Arc::new(Schema::new(vec![ Field::new("key", DataType::Int64, false), Field::new("value", DataType::Int64, false), @@ -1270,8 +1274,8 @@ mod tests { let predicate = make_predicate(filter_mask.clone()); - // The batch size is set to 12 to read all rows in one go after filtering - // If the Reader chooses mask to handle filter, it might cause panic because the mid 4 pages may not be decoded. + // Using Auto policy with page index enabled - page-aware mask should handle + // the skipped pages correctly without panicking. let stream = ParquetRecordBatchStreamBuilder::new_with_options( TestReader::new(data.clone()), ArrowReaderOptions::new().with_page_index(true), @@ -1288,6 +1292,14 @@ mod tests { let batches: Vec<_> = stream.try_collect().await.unwrap(); let result = concat_batches(&schema, &batches).unwrap(); assert_eq!(result.num_rows(), 2); + assert_eq!( + result.column(0).as_ref(), + &Int64Array::from(vec![first_value, last_value]) + ); + assert_eq!( + result.column(1).as_ref(), + &Int64Array::from(vec![first_value, last_value]) + ); } #[tokio::test] @@ -2308,4 +2320,225 @@ mod tests { Ok(()) } + + /// Test that bitmask-based row selection correctly handles page boundaries. + /// This test creates a parquet file with multiple small pages and verifies that + /// when using Mask policy, pages that are skipped entirely are handled correctly. + #[tokio::test] + async fn test_bitmask_page_aware_selection_async() { + let first_value: i64 = 1111; + let last_value: i64 = 9999; + let num_rows: usize = 20; + + // Create a file with 20 rows, ~2 rows per page = 10 pages + // Selection will be: first row, skip middle rows, last row + // This forces the reader to handle skipped pages correctly + let schema = Arc::new(Schema::new(vec![ + Field::new("key", DataType::Int64, false), + Field::new("value", DataType::Int64, false), + ])); + + let mut int_values: Vec = (0..num_rows as i64).collect(); + int_values[0] = first_value; + int_values[num_rows - 1] = last_value; + let keys = Int64Array::from(int_values.clone()); + let values = Int64Array::from(int_values.clone()); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], + ) + .unwrap(); + + // Configure small pages to create multiple page boundaries + let props = WriterProperties::builder() + .set_write_batch_size(2) + .set_data_page_row_count_limit(2) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + let builder = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data.clone()), + ArrowReaderOptions::new().with_page_index(true), + ) + .await + .unwrap(); + let schema = builder.parquet_schema().clone(); + let filter_mask = ProjectionMask::leaves(&schema, [0]); + + let make_predicate = |mask: ProjectionMask| { + ArrowPredicateFn::new(mask, move |batch: RecordBatch| { + let column = batch.column(0); + let match_first = eq(column, &Int64Array::new_scalar(first_value))?; + let match_second = eq(column, &Int64Array::new_scalar(last_value))?; + or(&match_first, &match_second) + }) + }; + + let predicate = make_predicate(filter_mask.clone()); + + // Use Mask policy explicitly to test page-aware behavior + let stream = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data.clone()), + ArrowReaderOptions::new().with_page_index(true), + ) + .await + .unwrap() + .with_row_filter(RowFilter::new(vec![Box::new(predicate)])) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = stream.schema().clone(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + let result = concat_batches(&schema, &batches).unwrap(); + + // Should have exactly 2 rows: first and last + assert_eq!(result.num_rows(), 2); + + // Verify the values + let key_col = result.column(0).as_primitive::(); + assert_eq!(key_col.value(0), first_value); + assert_eq!(key_col.value(1), last_value); + } + + /// Test that page-aware bitmask handles edge cases at exact page boundaries. + /// When mask_start aligns exactly with a page boundary, verify correct behavior. + #[tokio::test] + async fn test_bitmask_at_page_boundary_async() { + let num_rows: usize = 10; + let rows_per_page: usize = 2; + + // Create a file with 10 rows, 2 rows per page = 5 pages + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + + let ids: Vec = (0..num_rows as i64).collect(); + let id_array = Int64Array::from(ids); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(rows_per_page) + .set_data_page_row_count_limit(rows_per_page) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + // Create a selection that starts exactly at page boundaries + // Select rows 0-1 (page 0), skip 2-5 (pages 1-2), select 6-9 (pages 3-4) + let selection = RowSelection::from(vec![ + RowSelector::select(2), // Page 0: rows 0-1 + RowSelector::skip(4), // Pages 1-2: rows 2-5 + RowSelector::select(4), // Pages 3-4: rows 6-9 + ]); + + let stream = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data.clone()), + ArrowReaderOptions::new().with_page_index(true), + ) + .await + .unwrap() + .with_row_selection(selection.clone()) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = stream.schema().clone(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should have 2 + 4 = 6 rows + assert_eq!(total_rows, 6); + + // Verify values from the concatenated result + let result = concat_batches(&schema, &batches).unwrap(); + let id_col = result.column(0).as_primitive::(); + + // Expected values: 0, 1, 6, 7, 8, 9 + assert_eq!(id_col.value(0), 0); + assert_eq!(id_col.value(1), 1); + assert_eq!(id_col.value(2), 6); + assert_eq!(id_col.value(3), 7); + assert_eq!(id_col.value(4), 8); + assert_eq!(id_col.value(5), 9); + } + + /// Test that page-aware bitmask handles mid-page selections correctly. + /// The selection starts in the middle of a page and ends in the middle of another. + #[tokio::test] + async fn test_bitmask_mid_page_selection_async() { + let num_rows: usize = 12; + let rows_per_page: usize = 3; + + // Create a file with 12 rows, 3 rows per page = 4 pages + // Page 0: rows 0-2 + // Page 1: rows 3-5 + // Page 2: rows 6-8 + // Page 3: rows 9-11 + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + + let ids: Vec = (0..num_rows as i64).collect(); + let id_array = Int64Array::from(ids); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id_array) as ArrayRef]) + .unwrap(); + + let props = WriterProperties::builder() + .set_write_batch_size(rows_per_page) + .set_data_page_row_count_limit(rows_per_page) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let data = Bytes::from(buffer); + + // Select mid-page: skip first, select row 1-2, skip row 3-7 (cross pages), select row 8-10 + let selection = RowSelection::from(vec![ + RowSelector::skip(1), // Skip row 0 + RowSelector::select(2), // Select rows 1-2 (partial page 0) + RowSelector::skip(5), // Skip rows 3-7 (pages 1, part of 2) + RowSelector::select(3), // Select rows 8-10 (part of pages 2-3) + RowSelector::skip(1), // Skip row 11 + ]); + + let stream = ParquetRecordBatchStreamBuilder::new_with_options( + TestReader::new(data.clone()), + ArrowReaderOptions::new().with_page_index(true), + ) + .await + .unwrap() + .with_row_selection(selection.clone()) + .with_batch_size(num_rows) + .with_row_selection_policy(RowSelectionPolicy::Mask) + .build() + .unwrap(); + + let schema = stream.schema().clone(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should have 2 + 3 = 5 rows + assert_eq!(total_rows, 5); + + let result = concat_batches(&schema, &batches).unwrap(); + let id_col = result.column(0).as_primitive::(); + + // Expected values: 1, 2, 8, 9, 10 + assert_eq!(id_col.value(0), 1); + assert_eq!(id_col.value(1), 2); + assert_eq!(id_col.value(2), 8); + assert_eq!(id_col.value(3), 9); + assert_eq!(id_col.value(4), 10); + } } diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 61a244589c6d..bf728139579e 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -22,9 +22,9 @@ use crate::DecodeResult; use crate::arrow::ProjectionMask; use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; -use crate::arrow::arrow_reader::selection::RowSelectionStrategy; use crate::arrow::arrow_reader::{ ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy, + selection::RowSelectionStrategy, }; use crate::arrow::in_memory_row_group::ColumnChunkData; use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder; @@ -536,12 +536,6 @@ impl RowGroupReaderBuilder { plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy); - plan_builder = override_selector_strategy_if_needed( - plan_builder, - &self.projection, - self.row_group_offset_index(row_group_idx), - ); - let row_group_info = RowGroupInfo { row_group_idx, row_count, @@ -588,6 +582,23 @@ impl RowGroupReaderBuilder { &mut self.buffers, )?; + // before plan is build below + // check if plan is bitmask and if it is, put it in a variable + let page_offsets = if plan_builder.resolve_selection_strategy() + == RowSelectionStrategy::Mask + && plan_builder.selection().is_some_and(|selection| { + selection.requires_page_aware_mask( + &self.projection, + self.row_group_offset_index(row_group_idx), + ) + }) { + self.row_group_offset_index(row_group_idx) + .and_then(|columns| columns.first()) + .map(|column| column.page_locations()) + } else { + None + }; + let plan = plan_builder.build(); // if we have any cached results, connect them up @@ -603,7 +614,8 @@ impl RowGroupReaderBuilder { .build_array_reader(self.fields.as_deref(), &self.projection) }?; - let reader = ParquetRecordBatchReader::new(array_reader, plan); + let reader = + ParquetRecordBatchReader::new(array_reader, plan, page_offsets.cloned()); NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader)) } RowGroupDecoderState::Finished => { @@ -654,57 +666,6 @@ impl RowGroupReaderBuilder { } } -/// Override the selection strategy if needed. -/// -/// Some pages can be skipped during row-group construction if they are not read -/// by the selections. This means that the data pages for those rows are never -/// loaded and definition/repetition levels are never read. When using -/// `RowSelections` selection works because `skip_records()` handles this -/// case and skips the page accordingly. -/// -/// However, with the current mask design, all values must be read and decoded -/// and then a mask filter is applied. Thus if any pages are skipped during -/// row-group construction, the data pages are missing and cannot be decoded. -/// -/// A simple example: -/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1) -/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01) -/// -/// Using the row selection to skip(4), page2 won't be read at all, so in this -/// case we can't decode all the rows and apply a mask. To correctly apply the -/// bit mask, we need all 6 values be read, but page2 is not in memory. -fn override_selector_strategy_if_needed( - plan_builder: ReadPlanBuilder, - projection_mask: &ProjectionMask, - offset_index: Option<&[OffsetIndexMetaData]>, -) -> ReadPlanBuilder { - // override only applies to Auto policy, If the policy is already Mask or Selectors, respect that - let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else { - return plan_builder; - }; - - let preferred_strategy = plan_builder.resolve_selection_strategy(); - - let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask) - && plan_builder.selection().is_some_and(|selection| { - selection.should_force_selectors(projection_mask, offset_index) - }); - - let resolved_strategy = if force_selectors { - RowSelectionStrategy::Selectors - } else { - preferred_strategy - }; - - // override the plan builder strategy with the resolved one - let new_policy = match resolved_strategy { - RowSelectionStrategy::Mask => RowSelectionPolicy::Mask, - RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors, - }; - - plan_builder.with_row_selection_policy(new_policy) -} - #[cfg(test)] mod tests { use super::*;