diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index aa0071ca38e5..79569691b214 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -260,5 +260,10 @@ name = "row_selector" harness = false required-features = ["arrow"] +[[bench]] +name = "row_selection_state" +harness = false +required-features = ["arrow"] + [lib] bench = false diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs new file mode 100644 index 000000000000..16a93e6272c0 --- /dev/null +++ b/parquet/benches/row_selection_state.rs @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint; +use std::sync::Arc; + +use arrow_array::{ArrayRef, Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::{ + ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector, +}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const TOTAL_ROWS: usize = 1 << 20; +const BATCH_SIZE: usize = 1 << 10; +const BASE_SEED: u64 = 0xA55AA55A; +const READ_STRATEGIES: &[(&str, RowSelectionStrategy)] = &[ + ("read_mask", RowSelectionStrategy::Mask), + ("read_selectors", RowSelectionStrategy::Selectors), +]; + +fn criterion_benchmark(c: &mut Criterion) { + let avg_selector_lengths: &[usize] = &[16, 20, 24, 28, 32, 36, 40]; + let parquet_data = build_parquet_data(TOTAL_ROWS); + + let scenarios = [ + Scenario { + name: "uniform50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Constant, + }, + Scenario { + name: "spread50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Uniform { spread: 0.9 }, + }, + Scenario { + name: "sparse20", + select_ratio: 0.2, + start_with_select: false, + distribution: RunDistribution::Bimodal { + long_factor: 6.0, + long_prob: 0.1, + }, + }, + Scenario { + name: "dense80", + select_ratio: 0.8, + start_with_select: true, + distribution: RunDistribution::Bimodal { + long_factor: 4.0, + long_prob: 0.05, + }, + }, + ]; + + for scenario in scenarios.iter() { + for (offset, &avg_len) in avg_selector_lengths.iter().enumerate() { + let selectors = + generate_selectors(avg_len, TOTAL_ROWS, scenario, BASE_SEED + offset as u64); + let stats = SelectorStats::new(&selectors); + let suffix = format!( + "{}-avg{:.1}-sel{:02}", + scenario.name, + stats.average_selector_len, + (stats.select_ratio * 100.0).round() as u32 + ); + + let bench_input = BenchInput { + parquet_data: parquet_data.clone(), + selection: RowSelection::from(selectors.clone()), + }; + + for (label, strategy) in READ_STRATEGIES.iter().copied() { + c.bench_with_input( + BenchmarkId::new(label, &suffix), + &bench_input, + |b, input| { + b.iter(|| { + let total = run_read(&input.parquet_data, &input.selection, strategy); + hint::black_box(total); + }); + }, + ); + } + } + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); + +struct BenchInput { + parquet_data: Bytes, + selection: RowSelection, +} + +fn run_read( + parquet_data: &Bytes, + selection: &RowSelection, + strategy: RowSelectionStrategy, +) -> usize { + let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) + .unwrap() + .with_batch_size(BATCH_SIZE) + .with_row_selection(selection.clone()) + .with_row_selection_strategy(strategy) + .build() + .unwrap(); + + let mut total_rows = 0usize; + for batch in reader { + let batch = batch.unwrap(); + total_rows += batch.num_rows(); + } + total_rows +} + +fn build_parquet_data(total_rows: usize) -> Bytes { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + false, + )])); + let values = Int32Array::from_iter_values((0..total_rows).map(|v| v as i32)); + let columns: Vec = vec![Arc::new(values) as ArrayRef]; + let batch = RecordBatch::try_new(schema.clone(), columns).unwrap(); + + let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap(); + writer.write(&batch).unwrap(); + let buffer = writer.into_inner().unwrap(); + Bytes::from(buffer) +} + +#[derive(Clone)] +struct Scenario { + name: &'static str, + select_ratio: f64, + start_with_select: bool, + distribution: RunDistribution, +} + +#[derive(Clone)] +enum RunDistribution { + Constant, + Uniform { spread: f64 }, + Bimodal { long_factor: f64, long_prob: f64 }, +} + +fn generate_selectors( + avg_selector_len: usize, + total_rows: usize, + scenario: &Scenario, + seed: u64, +) -> Vec { + assert!( + (0.0..=1.0).contains(&scenario.select_ratio), + "select_ratio must be in [0, 1]" + ); + + let mut select_mean = scenario.select_ratio * 2.0 * avg_selector_len as f64; + let mut skip_mean = (1.0 - scenario.select_ratio) * 2.0 * avg_selector_len as f64; + + select_mean = select_mean.max(1.0); + skip_mean = skip_mean.max(1.0); + + let sum = select_mean + skip_mean; + // Rebalance the sampled select/skip run lengths so their sum matches the requested + // average selector length while respecting the configured selectivity ratio. + let scale = if sum == 0.0 { + 1.0 + } else { + (2.0 * avg_selector_len as f64) / sum + }; + select_mean *= scale; + skip_mean *= scale; + + let mut rng = StdRng::seed_from_u64(seed ^ (avg_selector_len as u64).wrapping_mul(0x9E3779B1)); + let mut selectors = Vec::with_capacity(total_rows / avg_selector_len.max(1)); + let mut remaining = total_rows; + let mut is_select = scenario.start_with_select; + + while remaining > 0 { + let mean = if is_select { select_mean } else { skip_mean }; + let len = sample_length(mean, &scenario.distribution, &mut rng).max(1); + let len = len.min(remaining); + selectors.push(if is_select { + RowSelector::select(len) + } else { + RowSelector::skip(len) + }); + remaining -= len; + if remaining == 0 { + break; + } + is_select = !is_select; + } + + let selection: RowSelection = selectors.into(); + selection.into() +} + +fn sample_length(mean: f64, distribution: &RunDistribution, rng: &mut StdRng) -> usize { + match distribution { + RunDistribution::Constant => mean.round().max(1.0) as usize, + RunDistribution::Uniform { spread } => { + let spread = spread.clamp(0.0, 0.99); + let lower = (mean * (1.0 - spread)).max(1.0); + let upper = (mean * (1.0 + spread)).max(lower + f64::EPSILON); + if (upper - lower) < 1.0 { + lower.round().max(1.0) as usize + } else { + let low = lower.floor() as usize; + let high = upper.ceil() as usize; + rng.random_range(low..=high).max(1) + } + } + RunDistribution::Bimodal { + long_factor, + long_prob, + } => { + let long_prob = long_prob.clamp(0.0, 0.5); + let short_prob = 1.0 - long_prob; + let short_factor = if short_prob == 0.0 { + 1.0 / long_factor.max(f64::EPSILON) + } else { + (1.0 - long_prob * long_factor).max(0.0) / short_prob + }; + let use_long = rng.random_bool(long_prob); + let factor = if use_long { + *long_factor + } else { + short_factor.max(0.1) + }; + (mean * factor).round().max(1.0) as usize + } + } +} + +struct SelectorStats { + average_selector_len: f64, + select_ratio: f64, +} + +impl SelectorStats { + fn new(selectors: &[RowSelector]) -> Self { + if selectors.is_empty() { + return Self { + average_selector_len: 0.0, + select_ratio: 0.0, + }; + } + + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let selected_rows: usize = selectors + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + Self { + average_selector_len: total_rows as f64 / selectors.len() as f64, + select_ratio: if total_rows == 0 { + 0.0 + } else { + selected_rows as f64 / total_rows as f64 + }, + } + } +} diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1cc7673a5776..191708d2e5ae 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -17,10 +17,10 @@ //! Contains reader which reads parquet data into arrow [`RecordBatch`] -use arrow_array::Array; use arrow_array::cast::AsArray; -use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_array::{Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; +use arrow_select::filter::filter_record_batch; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; use std::fmt::{Debug, Formatter}; @@ -43,7 +43,7 @@ use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; -pub use read_plan::{ReadPlan, ReadPlanBuilder}; +pub use read_plan::{ReadPlan, ReadPlanBuilder, RowSelectionStrategy}; mod filter; pub mod metrics; @@ -117,6 +117,8 @@ pub struct ArrowReaderBuilder { pub(crate) selection: Option, + pub(crate) selection_strategy: RowSelectionStrategy, + pub(crate) limit: Option, pub(crate) offset: Option, @@ -138,6 +140,7 @@ impl Debug for ArrowReaderBuilder { .field("projection", &self.projection) .field("filter", &self.filter) .field("selection", &self.selection) + .field("selection_strategy", &self.selection_strategy) .field("limit", &self.limit) .field("offset", &self.offset) .field("metrics", &self.metrics) @@ -157,6 +160,7 @@ impl ArrowReaderBuilder { projection: ProjectionMask::all(), filter: None, selection: None, + selection_strategy: RowSelectionStrategy::Auto, limit: None, offset: None, metrics: ArrowReaderMetrics::Disabled, @@ -271,6 +275,14 @@ impl ArrowReaderBuilder { } } + /// Override the strategy used to execute the configured [`RowSelection`] + pub fn with_row_selection_strategy(self, strategy: RowSelectionStrategy) -> Self { + Self { + selection_strategy: strategy, + ..self + } + } + /// Provide a [`RowFilter`] to skip decoding rows /// /// Row filters are applied after row group selection and row selection @@ -866,6 +878,7 @@ impl ParquetRecordBatchReaderBuilder { projection, mut filter, selection, + selection_strategy, limit, offset, metrics, @@ -886,7 +899,9 @@ impl ParquetRecordBatchReaderBuilder { row_groups, }; - let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); + let mut plan_builder = ReadPlanBuilder::new(batch_size) + .with_selection(selection) + .with_selection_strategy(selection_strategy); // Update selection based on any filters if let Some(filter) = filter.as_mut() { @@ -1034,9 +1049,82 @@ impl ParquetRecordBatchReader { let mut read_records = 0; let batch_size = self.batch_size(); match self.read_plan.selection_mut() { - Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); + Some(selection_cursor) => { + if selection_cursor.is_mask_backed() { + // Stream the record batch reader using contiguous segments of the selection + // mask, avoiding the need to materialize intermediate `RowSelector` ranges. + while !selection_cursor.is_empty() { + let Some(mask_chunk) = selection_cursor.next_mask_chunk(batch_size) else { + return Ok(None); + }; + + if mask_chunk.initial_skip > 0 { + let skipped = + self.array_reader.skip_records(mask_chunk.initial_skip)?; + if skipped != mask_chunk.initial_skip { + return Err(general_err!( + "failed to skip rows, expected {}, got {}", + mask_chunk.initial_skip, + skipped + )); + } + } + + if mask_chunk.chunk_rows == 0 { + if selection_cursor.is_empty() && mask_chunk.selected_rows == 0 { + return Ok(None); + } + continue; + } + + let mask = selection_cursor + .mask_values_for(&mask_chunk) + .ok_or_else(|| general_err!("row selection mask out of bounds"))?; + + let read = self.array_reader.read_records(mask_chunk.chunk_rows)?; + if read == 0 { + return Err(general_err!( + "reached end of column while expecting {} rows", + mask_chunk.chunk_rows + )); + } + if read != mask_chunk.chunk_rows { + return Err(general_err!( + "insufficient rows read from array reader - expected {}, got {}", + mask_chunk.chunk_rows, + read + )); + } + + let array = self.array_reader.consume_batch()?; + // The column reader exposes the projection as a struct array; convert this + // into a record batch before applying the boolean filter mask. + let struct_array = array.as_struct_opt().ok_or_else(|| { + ArrowError::ParquetError( + "Struct array reader should return struct array".to_string(), + ) + })?; + + let filtered_batch = + filter_record_batch(&RecordBatch::from(struct_array), &mask)?; + + if filtered_batch.num_rows() != mask_chunk.selected_rows { + return Err(general_err!( + "filtered rows mismatch selection - expected {}, got {}", + mask_chunk.selected_rows, + filtered_batch.num_rows() + )); + } + + if filtered_batch.num_rows() == 0 { + continue; + } + + return Ok(Some(filtered_batch)); + } + } + while read_records < batch_size && !selection_cursor.is_empty() { + let front = selection_cursor.next_selector().unwrap(); if front.skip { let skipped = self.array_reader.skip_records(front.row_count)?; @@ -1062,7 +1150,7 @@ impl ParquetRecordBatchReader { Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); + selection_cursor.return_selector(RowSelector::select(remaining)); need_read } _ => front.row_count, @@ -4588,6 +4676,93 @@ mod tests { assert_eq!(out, batch.slice(2, 1)); } + #[test] + fn test_row_selection_interleaved_skip() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "v", + ArrowDataType::Int32, + false, + )])); + + let values = Int32Array::from(vec![0, 1, 2, 3, 4]); + let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap(); + + let mut buffer = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap(); + writer.write(&batch)?; + writer.close()?; + + let selection = RowSelection::from(vec![ + RowSelector::select(1), + RowSelector::skip(2), + RowSelector::select(2), + ]); + + let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))? + .with_batch_size(4) + .with_row_selection(selection) + .build()?; + + let out = reader.next().unwrap()?; + assert_eq!(out.num_rows(), 3); + let values = out + .column(0) + .as_primitive::() + .values(); + assert_eq!(values, &[0, 3, 4]); + assert!(reader.next().is_none()); + Ok(()) + } + + #[test] + fn test_row_selection_mask_sparse_rows() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "v", + ArrowDataType::Int32, + false, + )])); + + let values = Int32Array::from((0..30).collect::>()); + let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?; + + let mut buffer = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?; + writer.write(&batch)?; + writer.close()?; + + let total_rows = batch.num_rows(); + let ranges = (1..total_rows) + .step_by(2) + .map(|i| i..i + 1) + .collect::>(); + let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows); + + let selectors: Vec = selection.clone().into(); + assert!(total_rows < selectors.len() * 8); + + let bytes = Bytes::from(buffer); + + let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())? + .with_batch_size(7) + .with_row_selection(selection) + .build()?; + + let mut collected = Vec::new(); + for batch in reader { + let batch = batch?; + collected.extend_from_slice( + batch + .column(0) + .as_primitive::() + .values(), + ); + } + + let expected: Vec = (1..total_rows).step_by(2).map(|i| i as i32).collect(); + assert_eq!(collected, expected); + Ok(()) + } + fn test_decimal32_roundtrip() { let d = |values: Vec, p: u8| { let iter = values.into_iter(); diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 2210f47df2c1..ec143bae799f 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -23,16 +23,32 @@ use crate::arrow::arrow_reader::{ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector, }; use crate::errors::{ParquetError, Result}; -use arrow_array::Array; +use arrow_array::{Array, BooleanArray}; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; +/// Strategy for materialising [`RowSelection`] during execution. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub enum RowSelectionStrategy { + /// Automatically choose between mask- and selector-backed execution + /// based on heuristics + #[default] + Auto, + /// Always use a boolean mask to materialise the selection + Mask, + /// Always use a queue of [`RowSelector`] values + Selectors, +} + /// A builder for [`ReadPlan`] #[derive(Clone, Debug)] pub struct ReadPlanBuilder { batch_size: usize, /// Current to apply, includes all filters selection: Option, + /// Strategy to use when materialising the row selection + selection_strategy: RowSelectionStrategy, } impl ReadPlanBuilder { @@ -41,6 +57,7 @@ impl ReadPlanBuilder { Self { batch_size, selection: None, + selection_strategy: RowSelectionStrategy::Auto, } } @@ -50,6 +67,12 @@ impl ReadPlanBuilder { self } + /// Force a specific strategy when materialising the [`RowSelection`] + pub fn with_selection_strategy(mut self, strategy: RowSelectionStrategy) -> Self { + self.selection_strategy = strategy; + self + } + /// Returns the current selection, if any pub fn selection(&self) -> Option<&RowSelection> { self.selection.as_ref() @@ -129,9 +152,14 @@ impl ReadPlanBuilder { let Self { batch_size, selection, + selection_strategy, } = self; - let selection = selection.map(|s| s.trim().into()); + let selection = selection.map(|s| { + let trimmed = s.trim(); + let selectors: Vec = trimmed.into(); + RowSelectionCursor::new(selectors, selection_strategy) + }); ReadPlan { batch_size, @@ -233,12 +261,12 @@ pub struct ReadPlan { /// The number of rows to read in each batch batch_size: usize, /// Row ranges to be selected from the data source - selection: Option>, + selection: Option, } impl ReadPlan { /// Returns a mutable reference to the selection, if any - pub fn selection_mut(&mut self) -> Option<&mut VecDeque> { + pub fn selection_mut(&mut self) -> Option<&mut RowSelectionCursor> { self.selection.as_mut() } @@ -248,3 +276,192 @@ impl ReadPlan { self.batch_size } } + +/// Cursor for iterating a [`RowSelection`] during execution within a [`ReadPlan`]. +/// +/// This keeps per-reader state such as the current position and delegates the +/// actual storage strategy to [`RowSelectionBacking`]. +#[derive(Debug)] +pub struct RowSelectionCursor { + /// Backing storage describing how the selection is materialised + storage: RowSelectionBacking, + /// Current absolute offset into the selection + position: usize, +} + +/// Backing storage that powers [`RowSelectionCursor`]. +/// +/// The cursor either walks a boolean mask (dense representation) or a queue +/// of [`RowSelector`] ranges (sparse representation). +#[derive(Debug)] +enum RowSelectionBacking { + Mask(BooleanBuffer), + Selectors(VecDeque), +} + +/// Result of computing the next chunk to read when using a bitmap mask +pub struct MaskChunk { + /// Number of leading rows to skip before reaching selected rows + pub initial_skip: usize, + /// Total rows covered by this chunk (selected + skipped) + pub chunk_rows: usize, + /// Rows actually selected within the chunk + pub selected_rows: usize, + /// Starting offset within the mask where the chunk begins + pub mask_start: usize, +} + +impl RowSelectionCursor { + /// Create a cursor, choosing an efficient backing representation + fn new(selectors: Vec, strategy: RowSelectionStrategy) -> Self { + if matches!(strategy, RowSelectionStrategy::Selectors) { + return Self { + storage: RowSelectionBacking::Selectors(selectors.into()), + position: 0, + }; + } + + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let selector_count = selectors.len(); + const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 16; + // Prefer a bitmap mask when the selectors are short on average, as the mask + // (re)construction cost is amortized by a simpler execution path during reads. + let use_mask = match strategy { + RowSelectionStrategy::Mask => true, + RowSelectionStrategy::Auto => { + selector_count == 0 + || total_rows < selector_count.saturating_mul(AVG_SELECTOR_LEN_MASK_THRESHOLD) + } + RowSelectionStrategy::Selectors => unreachable!(), + }; + + let storage = if use_mask { + RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors)) + } else { + RowSelectionBacking::Selectors(selectors.into()) + }; + + Self { + storage, + position: 0, + } + } + + /// Returns `true` when no further rows remain + pub fn is_empty(&self) -> bool { + match &self.storage { + RowSelectionBacking::Mask(mask) => self.position >= mask.len(), + RowSelectionBacking::Selectors(selectors) => selectors.is_empty(), + } + } + + /// Current position within the overall selection + pub fn position(&self) -> usize { + self.position + } + + /// Return the next [`RowSelector`] when using the sparse representation + pub fn next_selector(&mut self) -> Option { + match &mut self.storage { + RowSelectionBacking::Selectors(selectors) => { + let selector = selectors.pop_front()?; + self.position += selector.row_count; + Some(selector) + } + RowSelectionBacking::Mask(_) => { + unreachable!("next_selector called for mask-based RowSelectionCursor") + } + } + } + + /// Return a selector to the front, rewinding the position (sparse-only) + pub fn return_selector(&mut self, selector: RowSelector) { + match &mut self.storage { + RowSelectionBacking::Selectors(selectors) => { + self.position = self.position.saturating_sub(selector.row_count); + selectors.push_front(selector); + } + RowSelectionBacking::Mask(_) => { + unreachable!("return_selector called for mask-based RowSelectionCursor") + } + } + } + + /// Returns `true` if the cursor is backed by a boolean mask + pub fn is_mask_backed(&self) -> bool { + matches!(self.storage, RowSelectionBacking::Mask(_)) + } + + /// Advance through the mask representation, producing the next chunk summary + pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option { + let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { + let mask = match &self.storage { + RowSelectionBacking::Mask(mask) => mask, + RowSelectionBacking::Selectors(_) => return None, + }; + + if self.position >= mask.len() { + return None; + } + + let start_position = self.position; + let mut cursor = start_position; + let mut initial_skip = 0; + + while cursor < mask.len() && !mask.value(cursor) { + initial_skip += 1; + cursor += 1; + } + + let mask_start = cursor; + let mut chunk_rows = 0; + let mut selected_rows = 0; + + // Advance until enough rows have been selected to satisfy the batch size, + // or until the mask is exhausted. This mirrors the behaviour of the legacy + // `RowSelector` queue-based iteration. + while cursor < mask.len() && selected_rows < batch_size { + chunk_rows += 1; + if mask.value(cursor) { + selected_rows += 1; + } + cursor += 1; + } + + (initial_skip, chunk_rows, selected_rows, mask_start, cursor) + }; + + self.position = end_position; + + Some(MaskChunk { + initial_skip, + chunk_rows, + selected_rows, + mask_start, + }) + } + + /// Materialise the boolean values for a mask-backed chunk + pub fn mask_values_for(&self, chunk: &MaskChunk) -> Option { + match &self.storage { + RowSelectionBacking::Mask(mask) => { + if chunk.mask_start.saturating_add(chunk.chunk_rows) > mask.len() { + return None; + } + Some(BooleanArray::from( + mask.slice(chunk.mask_start, chunk.chunk_rows), + )) + } + RowSelectionBacking::Selectors(_) => None, + } + } +} + +fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer { + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let mut builder = BooleanBufferBuilder::new(total_rows); + for selector in selectors { + builder.append_n(selector.row_count, !selector.skip); + } + builder.finish() +} diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 9b81e8e56905..19ff884a5f9f 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -40,7 +40,7 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, - RowFilter, RowSelection, + RowFilter, RowSelection, RowSelectionStrategy, }; use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; @@ -508,6 +508,7 @@ impl ParquetRecordBatchStreamBuilder { filter: self.filter, metadata: self.metadata.clone(), fields: self.fields, + selection_strategy: self.selection_strategy, limit: self.limit, offset: self.offset, metrics: self.metrics, @@ -557,6 +558,9 @@ struct ReaderFactory { /// Optional filter filter: Option, + /// Strategy used to materialise row selections for this reader + selection_strategy: RowSelectionStrategy, + /// Limit to apply to remaining row groups. limit: Option, @@ -620,7 +624,9 @@ where let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, &row_group_cache); let filter = self.filter.as_mut(); - let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); + let mut plan_builder = ReadPlanBuilder::new(batch_size) + .with_selection(selection) + .with_selection_strategy(self.selection_strategy); // Update selection based on any filters if let Some(filter) = filter { @@ -1774,6 +1780,7 @@ mod tests { fields: fields.map(Arc::new), input: async_reader, filter: None, + selection_strategy: RowSelectionStrategy::Auto, limit: None, offset: None, metrics: ArrowReaderMetrics::disabled(), @@ -2239,6 +2246,7 @@ mod tests { fields: None, input: TestReader::new(data), filter: Some(filter), + selection_strategy: RowSelectionStrategy::Auto, limit: None, offset: None, metrics: ArrowReaderMetrics::disabled(), diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 4b932fb08034..3da7c4bd2fbc 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -168,6 +168,7 @@ impl ParquetPushDecoderBuilder { projection, filter, selection, + selection_strategy: _, limit, offset, metrics, diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index be9070ae8b49..69b32d181d3c 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -654,6 +654,6 @@ mod tests { #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::(), 184); + assert_eq!(std::mem::size_of::(), 192); } }