diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index a8688e8af83c..425865bb0b07 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -809,53 +809,43 @@ impl ParquetRecordBatchReader { /// simplify error handling with `?` fn next_inner(&mut self) -> Result> { let mut read_records = 0; - let batch_size = self.batch_size(); - match self.read_plan.selection_mut() { - Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); + + match &mut self.read_plan { + ReadPlan::All { batch_size } => { + self.array_reader.read_records(*batch_size)?; + } + ReadPlan::Subset { iterator } => { + let batch_size = iterator.batch_size(); + + while read_records < batch_size { + let Some(front) = self.read_plan.next() else { + break; + }; + if front.skip { let skipped = self.array_reader.skip_records(front.row_count)?; if skipped != front.row_count { return Err(general_err!( - "failed to skip rows, expected {}, got {}", + "Internal Error: failed to skip rows, expected {}, got {}", front.row_count, skipped )); } - continue; - } - - //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. - //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669 - if front.row_count == 0 { - continue; - } - - // try to read record - let need_read = batch_size - read_records; - let to_read = match front.row_count.checked_sub(need_read) { - Some(remaining) if remaining != 0 => { - // if page row count less than batch_size we must set batch size to page row count. - // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); - need_read + } else { + let read = self.array_reader.read_records(front.row_count)?; + if read == 0 { + break; } - _ => front.row_count, - }; - match self.array_reader.read_records(to_read)? 
{ - 0 => break, - rec => read_records += rec, + + read_records += read }; } } - None => { - self.array_reader.read_records(batch_size)?; - } - }; + } let array = self.array_reader.consume_batch()?; + let struct_array = array.as_struct_opt().ok_or_else(|| { ArrowError::ParquetError("Struct array reader should return struct array".to_string()) })?; @@ -928,9 +918,13 @@ impl ParquetRecordBatchReader { } } + #[cfg(test)] #[inline(always)] pub(crate) fn batch_size(&self) -> usize { - self.read_plan.batch_size() + match &self.read_plan { + ReadPlan::All { batch_size } => *batch_size, + ReadPlan::Subset { iterator } => iterator.batch_size(), + } } } @@ -3910,7 +3904,7 @@ mod tests { .build() .unwrap(); assert_ne!(1024, num_rows); - assert_eq!(reader.read_plan.batch_size(), num_rows as usize); + assert_eq!(reader.batch_size(), num_rows as usize); } #[test] diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index cf5d83385038..1014dfe0b330 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -55,10 +55,8 @@ impl ReadPlanBuilder { self.selection.as_ref() } - /// Specifies the number of rows in the row group, before filtering is applied. - /// - /// Returns a [`LimitedReadPlanBuilder`] that can apply - /// offset and limit. + /// Returns a [`LimitedReadPlanBuilder`] to apply offset and limit to the in + /// progress plan. /// /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this /// selection. 
@@ -131,13 +129,101 @@ impl ReadPlanBuilder { selection, } = self; - let selection = selection.map(|s| s.trim().into()); + // If the batch size is 0, read "all rows" + if batch_size == 0 { + return ReadPlan::All { batch_size: 0 }; + } + + // If no selection is provided, read all rows + let Some(selection) = selection else { + return ReadPlan::All { batch_size }; + }; + + let iterator = SelectionIterator::new(batch_size, selection.into()); + ReadPlan::Subset { iterator } + } +} + +/// Incrementally returns [`RowSelector`]s that describe reading from a Parquet file. +/// +/// The returned stream of [`RowSelector`]s is guaranteed to have: +/// 1. No empty selections (that select no rows) +/// 2. No selections that span batch_size boundaries +/// 3. No trailing skip selections +/// +/// For example, if the `batch_size` is 100 and we are selecting all 200 rows +/// from a Parquet file, the selectors will be: +/// - `RowSelector::select(100) <-- forced break at batch_size boundary` +/// - `RowSelector::select(100)` +#[derive(Debug, Clone)] +pub(crate) struct SelectionIterator { + /// how many rows to read in each batch + batch_size: usize, + /// how many records have been read by RowSelection in the "current" batch + read_records: usize, + /// Input selectors to read from + input_selectors: VecDeque, +} + +impl Iterator for SelectionIterator { + type Item = RowSelector; + + fn next(&mut self) -> Option { + while let Some(mut front) = self.input_selectors.pop_front() { + // RowSelectors with row_count = 0 terminate the read, so skip such + // entries. 
See https://github.com/apache/arrow-rs/issues/2669 + if front.row_count == 0 { + continue; + } + + if front.skip { + return Some(front); + } + + let need_read = self.batch_size - self.read_records; + + // if there are more rows in the current RowSelector than needed to + // finish the batch, split it up + if front.row_count > need_read { + // Part 1: return remaining rows to the front of the queue + let remaining = front.row_count - need_read; + self.input_selectors + .push_front(RowSelector::select(remaining)); + // Part 2: adjust the current selector to read the rows we need + front.row_count = need_read; + } + + self.read_records += front.row_count; + // batch is complete: reset the counter so the next select starts a new batch + if self.read_records == self.batch_size { + self.read_records = 0; + } + + return Some(front); + } + // no more selectors to read, end of stream + None + } +} + +impl SelectionIterator { + fn new(batch_size: usize, mut input_selectors: VecDeque) -> Self { + // trim any trailing skip selectors (no rows are selected after them) + while input_selectors.back().map(|x| x.skip).unwrap_or(false) { + input_selectors.pop_back(); + } - ReadPlan { + Self { batch_size, - selection, + read_records: 0, + input_selectors, } } + + /// Return the number of rows to read in each output batch + pub(crate) fn batch_size(&self) -> usize { + self.batch_size + } } /// Builder for [`ReadPlan`] that applies a limit and offset to the read plan @@ -225,25 +311,400 @@ impl LimitedReadPlanBuilder { } } -/// A plan reading specific rows from a Parquet Row Group. +/// A plan for reading specific rows from a Parquet Row Group. /// /// See [`ReadPlanBuilder`] to create `ReadPlan`s -pub(crate) struct ReadPlan { - /// The number of rows to read in each batch - batch_size: usize, - /// Row ranges to be selected from the data source - selection: Option>, +/// +/// Also, note the `ReadPlan` is an iterator over [`RowSelector`]s. 
+#[derive(Debug)] +pub(crate) enum ReadPlan { + /// Read all rows in `batch_size` sized chunks + All { + /// The number of rows to read in each batch + batch_size: usize, + }, + /// Read only a specific subset of rows + Subset { iterator: SelectionIterator }, } -impl ReadPlan { - /// Returns a mutable reference to the selection, if any - pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque> { - self.selection.as_mut() +impl Iterator for ReadPlan { + type Item = RowSelector; + + fn next(&mut self) -> Option { + match self { + // If we are reading all rows, return a selector that selects + // the next batch_size rows (this arm always returns `Some`, never `None`) + Self::All { batch_size } => Some(RowSelector::select(*batch_size)), + Self::Subset { iterator } => iterator.next(), + } } +} - /// Return the number of rows to read in each output batch - #[inline(always)] - pub fn batch_size(&self) -> usize { - self.batch_size +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_read_plan_select_all() { + TestCase::new() + .with_batch_size(100) + .with_empty_initial_selection() + .with_empty_expected_selection() + .run() + } + + #[test] + fn test_read_plan_empty_batch_size() { + TestCase::new() + .with_batch_size(0) + .with_row_count(0) + .with_empty_initial_selection() + .with_empty_expected_selection() + .run() + } + + #[test] + fn test_read_plan_select_only_empty() { + TestCase::new() + .with_batch_size(100) + .with_initial_selection(Some([RowSelector::skip(0)])) + .with_expected_selection(Some([])) + .run() + } + + #[test] + fn test_read_plan_select_subset() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + // filter out 50 rows in the middle + .with_initial_selection(Some([ + RowSelector::select(150), + RowSelector::skip(50), + RowSelector::select(100), + ])) + .with_expected_selection(Some([ + // broken up into batch_size chunks + RowSelector::select(100), + // second batch + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(50), + // third batch has 50 
as we filtered out 50 rows + RowSelector::select(50), + ])) + .run() + } + + #[test] + fn test_read_plan_select_batch_boundaries() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + // select all but 50 rows in the middle using 50 row batches + .with_initial_selection(Some([ + RowSelector::select(50), + RowSelector::select(25), + RowSelector::select(25), + RowSelector::select(50), + RowSelector::skip(10), + RowSelector::skip(30), + RowSelector::skip(10), + RowSelector::select(50), + RowSelector::select(50), + ])) + .with_expected_selection(Some([ + // broken up into batch_size chunks, combined + RowSelector::select(100), + // second batch + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(50), + // third batch + RowSelector::select(50), + ])) + .run() + } + + #[test] + fn test_read_plan_filters_zero_row_selects() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(0), + RowSelector::select(125), + RowSelector::select(0), + RowSelector::skip(0), + RowSelector::skip(50), + RowSelector::select(25), + ])) + // empty selectors have been filtered out + .with_expected_selection(Some([ + RowSelector::select(100), + RowSelector::select(25), + RowSelector::skip(50), + RowSelector::select(25), + ])) + .run() + } + + #[test] + fn test_read_plan_filters_zero_row_end_skips() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(125), + RowSelector::skip(0), + RowSelector::skip(0), + ])) + .with_expected_selection(Some([RowSelector::select(100), RowSelector::select(25)])) + .run() + } + + #[test] + fn test_read_plan_with_limit_no_skip() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(200), // limit in middle of this select + RowSelector::skip(50), + RowSelector::select(50), + ])) + .with_limit(100) + 
.with_expected_selection(Some([RowSelector::select(100)])) + .run() + } + + #[test] + fn test_read_plan_with_limit_after_skip() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(150), + RowSelector::skip(50), // limit is hit after this skip + RowSelector::select(10), + ])) + .with_limit(200) + .with_expected_selection(Some([ + RowSelector::select(100), + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(10), + ])) + .run() + } + + #[test] + fn test_read_plan_with_limit_after_skip_remain() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(150), + RowSelector::skip(50), + RowSelector::select(100), // limit includes part but not all of this + ])) + .with_limit(175) + .with_expected_selection(Some([ + RowSelector::select(100), + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(25), + ])) + .run() + } + + #[test] + fn test_read_plan_with_offset() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(100), + ])) + .with_offset(25) // skip 25 rows + .with_expected_selection(Some([ + RowSelector::skip(25), // offset + RowSelector::select(25), + RowSelector::skip(50), + RowSelector::select(75), + // start second batch + RowSelector::select(25), + ])) + .run() + } + + #[test] + fn test_read_plan_with_limit_and_offset() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(100), + ])) + .with_offset(25) // skip 25 rows + .with_limit(110) + .with_expected_selection(Some([ + RowSelector::skip(25), // offset + RowSelector::select(25), + RowSelector::skip(50), + RowSelector::select(75), + // start second batch + RowSelector::select(10), // limited to 110 + 
])) + .run() + } + + // test filtering + + /// Test harness for `ReadPlanBuilder` + #[derive(Debug, Default)] + struct TestCase { + batch_size: usize, + row_count: usize, + /// Optional limit to apply to plan + limit: Option, + /// Optional offset to apply to plan + offset: Option, + initial_selection: Option>, + /// if Some, expect ReadPlan::Subset + /// if None, expect ReadPlan::All + expected_selection: Option>, + } + + impl TestCase { + /// Create a new test case + fn new() -> Self { + Default::default() + } + + /// Set the batch size + fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + + /// Set the row count + fn with_row_count(mut self, row_count: usize) -> Self { + self.row_count = row_count; + self + } + + /// Specify a limit to apply to the read plan + fn with_limit(mut self, limit: usize) -> Self { + self.limit = Some(limit); + self + } + + /// Specify an offset to apply to the read plan + fn with_offset(mut self, offset: usize) -> Self { + self.offset = Some(offset); + self + } + + /// Set the initial selection to the given set of selectors + fn with_initial_selection>( + mut self, + initial: Option, + ) -> Self { + self.initial_selection = initial.map(|initial| initial.into_iter().collect()); + self + } + /// Set the initial selection to None (used to make the tests self documenting) + fn with_empty_initial_selection(mut self) -> Self { + self.initial_selection = None; + self + } + + /// Set the expected plan to be `ReadPlan::Subset` with the given set of selectors + fn with_expected_selection>( + mut self, + expected: Option, + ) -> Self { + self.expected_selection = expected.map(|expected| expected.into_iter().collect()); + self + } + /// Set the expected selection to None, i.e. expect `ReadPlan::All` (used to make the tests self documenting) + fn with_empty_expected_selection(mut self) -> Self { + self.expected_selection = None; + self + } + + fn run(self) { + let Self { + batch_size, + row_count, + limit, + offset, + initial_selection, + 
expected_selection, + } = self; + + let initial_selection = initial_selection.map(RowSelection::from); + let plan = ReadPlanBuilder::new(batch_size) + .with_selection(initial_selection) + .limited(row_count) + .with_limit(limit) + .with_offset(offset) + .build_limited() + .build(); + + match expected_selection { + None => { + let expected_batch_size = batch_size; + assert!( + matches!(plan, ReadPlan::All { batch_size } if batch_size == expected_batch_size), + "Expected ReadPlan::All {{ batch_size={batch_size} }}, got {plan:#?}" + ); + } + Some(expected) => { + // Gather the generated selectors to compare with the expected + let actual: Vec = plan.collect(); + Self::validate_selection(&actual, batch_size); + // use debug formatting with newlines to generate easier to grok diffs + // if the test fails + assert_eq!(format!("{actual:#?}"), format!("{expected:#?}")); + // also use assert_eq! to ensure equality + assert_eq!(actual, expected); + } + }; + } + + /// Validate that the output selections obey the rules + fn validate_selection(selectors: &[RowSelector], batch_size: usize) { + // 1. no empty selections + for selector in selectors.iter() { + assert!(selector.row_count > 0, "{selector:?} empty selection"); + } + + // 2. no selections that span batch_size boundaries + let mut current_count = 0; + for selector in selectors.iter() { + if selector.skip { + continue; + } + current_count += selector.row_count; + assert!( + current_count <= batch_size, + "current_count {current_count} > batch_size {batch_size}. Plan:\n{selectors:#?}" + ); + if current_count == batch_size { + current_count = 0; + } + } + + // 3. 
no trailing skip selections + if let Some(last) = selectors.last() { + assert!(!last.skip, "last selector {last:?} is a skip selector"); + } + } } } diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index c53d47be2e56..a4fa4473f3e9 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -21,6 +21,9 @@ use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; +/// Represents reading or skipping some number of contiguous +/// rows when decoding a parquet file +/// /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when /// scanning a parquet file #[derive(Debug, Clone, Copy, Eq, PartialEq)] @@ -358,14 +361,6 @@ impl RowSelection { self.selectors.iter().any(|x| !x.skip) } - /// Trims this [`RowSelection`] removing any trailing skips - pub(crate) fn trim(mut self) -> Self { - while self.selectors.last().map(|x| x.skip).unwrap_or(false) { - self.selectors.pop(); - } - self - } - /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows pub(crate) fn offset(mut self, offset: usize) -> Self { if offset == 0 {