From 97406ae8b60b61673ae9a0d37e10303a784a9b99 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Wed, 29 Oct 2025 01:58:35 +0800 Subject: [PATCH 01/29] Version2 --- parquet/Cargo.toml | 5 + parquet/benches/row_selection_state.rs | 405 ++++++++++++++++++ parquet/src/arrow/arrow_reader/mod.rs | 168 +++++++- parquet/src/arrow/arrow_reader/read_plan.rs | 162 ++++++- .../arrow/record_reader/definition_levels.rs | 36 +- parquet/src/column/reader.rs | 266 +++++++++++- parquet/src/column/reader/decoder.rs | 4 +- 7 files changed, 1034 insertions(+), 12 deletions(-) create mode 100644 parquet/benches/row_selection_state.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index aa0071ca38e5..79569691b214 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -260,5 +260,10 @@ name = "row_selector" harness = false required-features = ["arrow"] +[[bench]] +name = "row_selection_state" +harness = false +required-features = ["arrow"] + [lib] bench = false diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs new file mode 100644 index 000000000000..5d8aab5db458 --- /dev/null +++ b/parquet/benches/row_selection_state.rs @@ -0,0 +1,405 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::VecDeque; +use std::hint; + +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const TOTAL_ROWS: usize = 1 << 20; +const BATCH_SIZE: usize = 1 << 10; +const BASE_SEED: u64 = 0xA55AA55A; + +fn criterion_benchmark(c: &mut Criterion) { + let avg_selector_lengths: &[usize] = &[2, 4, 8, 16, 32, 64, 128]; + + let scenarios = [ + Scenario { + name: "uniform50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Constant, + }, + Scenario { + name: "spread50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Uniform { spread: 0.9 }, + }, + Scenario { + name: "sparse20", + select_ratio: 0.2, + start_with_select: false, + distribution: RunDistribution::Bimodal { + long_factor: 6.0, + long_prob: 0.1, + }, + }, + Scenario { + name: "dense80", + select_ratio: 0.8, + start_with_select: true, + distribution: RunDistribution::Bimodal { + long_factor: 4.0, + long_prob: 0.05, + }, + }, + ]; + + for scenario in scenarios.iter() { + for (offset, &avg_len) in avg_selector_lengths.iter().enumerate() { + let selectors = + generate_selectors(avg_len, TOTAL_ROWS, scenario, BASE_SEED + offset as u64); + let stats = SelectorStats::new(&selectors); + let suffix = format!( + "{}-avg{:.1}-sel{:02}", + scenario.name, + stats.average_selector_len, + (stats.select_ratio * 100.0).round() as u32 + ); + + c.bench_with_input( + BenchmarkId::new("mask_build", &suffix), + &selectors, + |b, selectors| { + b.iter(|| { + let mask = MaskState::build_mask(selectors); + hint::black_box(mask); + }); + }, + ); + + c.bench_with_input( + BenchmarkId::new("selector_build", &suffix), + &selectors, + |b, selectors| { + b.iter(|| { + let queue = SelectorState::build_queue(selectors); + hint::black_box(queue); + }); + }, + ); + + let mask_state = MaskState::new(&selectors); + c.bench_with_input( + BenchmarkId::new("mask_scan", &suffix), + &mask_state, + |b, state| { + b.iter(|| { + let mut run = state.clone(); + let total = run.consume_all(BATCH_SIZE); + hint::black_box(total); + }); + }, + ); + + let selector_state = SelectorState::new(&selectors); + c.bench_with_input( + BenchmarkId::new("selector_scan", &suffix), + &selector_state, + |b, state| { + b.iter(|| { + let mut run = state.clone(); + let total = run.consume_all(BATCH_SIZE); + hint::black_box(total); + }); + }, + ); + } + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); + +#[derive(Clone)] +struct Scenario { + name: &'static str, + select_ratio: f64, + start_with_select: bool, + distribution: RunDistribution, +} + +#[derive(Clone)] +enum RunDistribution { + Constant, + Uniform { spread: f64 }, + Bimodal { long_factor: f64, long_prob: f64 }, +} + +fn generate_selectors( + avg_selector_len: usize, + total_rows: usize, + scenario: &Scenario, + seed: u64, +) -> Vec { + assert!( + (0.0..=1.0).contains(&scenario.select_ratio), + "select_ratio must be in [0, 1]" + ); + + let mut select_mean = scenario.select_ratio * 2.0 * avg_selector_len as f64; + let mut skip_mean = (1.0 - scenario.select_ratio) * 2.0 * avg_selector_len as f64; + + select_mean = select_mean.max(1.0); + skip_mean = skip_mean.max(1.0); + + let sum = select_mean + skip_mean; + // Rebalance the sampled select/skip run lengths so their sum matches the requested + // average selector length while respecting the configured selectivity ratio. + let scale = if sum == 0.0 { + 1.0 + } else { + (2.0 * avg_selector_len as f64) / sum + }; + select_mean *= scale; + skip_mean *= scale; + + let mut rng = StdRng::seed_from_u64(seed ^ (avg_selector_len as u64).wrapping_mul(0x9E3779B1)); + let mut selectors = Vec::with_capacity(total_rows / avg_selector_len.max(1)); + let mut remaining = total_rows; + let mut is_select = scenario.start_with_select; + + while remaining > 0 { + let mean = if is_select { select_mean } else { skip_mean }; + let len = sample_length(mean, &scenario.distribution, &mut rng).max(1); + let len = len.min(remaining); + selectors.push(if is_select { + RowSelector::select(len) + } else { + RowSelector::skip(len) + }); + remaining -= len; + if remaining == 0 { + break; + } + is_select = !is_select; + } + + let selection: RowSelection = selectors.into(); + selection.into() +} + +fn sample_length(mean: f64, distribution: &RunDistribution, rng: &mut StdRng) -> usize { + match distribution { + RunDistribution::Constant => mean.round().max(1.0) as usize, + RunDistribution::Uniform { spread } => { + let spread = spread.clamp(0.0, 0.99); + let lower = (mean * (1.0 - spread)).max(1.0); + let upper = (mean * (1.0 + spread)).max(lower + f64::EPSILON); + if (upper - lower) < 1.0 { + lower.round().max(1.0) as usize + } else { + let low = lower.floor() as usize; + let high = upper.ceil() as usize; + rng.random_range(low..=high).max(1) + } + } + RunDistribution::Bimodal { + long_factor, + long_prob, + } => { + let long_prob = long_prob.clamp(0.0, 0.5); + let short_prob = 1.0 - long_prob; + let short_factor = if short_prob == 0.0 { + 1.0 / long_factor.max(f64::EPSILON) + } else { + (1.0 - long_prob * long_factor).max(0.0) / short_prob + }; + let use_long = rng.random_bool(long_prob); + let factor = if use_long { + *long_factor + } else { + short_factor.max(0.1) + }; + (mean * factor).round().max(1.0) as usize + } + } +} + +#[derive(Clone)] +struct MaskState { + mask: BooleanBuffer, + position: usize, +} + +impl MaskState { + fn new(selectors: &[RowSelector]) -> Self { + Self { + mask: Self::build_mask(selectors), + position: 0, + } + } + + fn build_mask(selectors: &[RowSelector]) -> BooleanBuffer { + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let mut builder = BooleanBufferBuilder::new(total_rows); + for selector in selectors { + builder.append_n(selector.row_count, !selector.skip); + } + builder.finish() + } + + fn consume_all(&mut self, batch_size: usize) -> usize { + let mut selected = 0; + while let Some(batch) = self.next_mask_batch(batch_size) { + hint::black_box(batch.initial_skip); + hint::black_box(batch.chunk_rows); + hint::black_box(batch.mask_start); + selected += batch.selected_rows; + } + selected + } + + fn next_mask_batch(&mut self, batch_size: usize) -> Option { + let mask = &self.mask; + if self.position >= mask.len() { + return None; + } + + let start_position = self.position; + let mut cursor = start_position; + let mut initial_skip = 0usize; + + while cursor < mask.len() && !mask.value(cursor) { + initial_skip += 1; + cursor += 1; + } + + let mask_start = cursor; + let mut chunk_rows = 0usize; + let mut selected_rows = 0usize; + + while cursor < mask.len() && selected_rows < batch_size { + chunk_rows += 1; + if mask.value(cursor) { + selected_rows += 1; + } + cursor += 1; + } + + self.position = cursor; + + Some(MaskBatch { + initial_skip, + chunk_rows, + selected_rows, + mask_start, + }) + } +} + +#[derive(Clone)] +struct MaskBatch { + initial_skip: usize, + chunk_rows: usize, + selected_rows: usize, + mask_start: usize, +} + +#[derive(Clone)] +struct SelectorState { + selectors: VecDeque, +} + +impl SelectorState { + fn new(selectors: &[RowSelector]) -> Self { + Self { + selectors: Self::build_queue(selectors), + } + } + + fn build_queue(selectors: &[RowSelector]) -> VecDeque { + selectors.iter().copied().collect() + } + + fn consume_all(&mut self, batch_size: usize) -> usize { + // Drain the selector queue in `batch_size` chunks, splitting SELECT runs + // when they span across multiple batches. + let mut selected_total = 0usize; + + while !self.selectors.is_empty() { + let mut selected_rows = 0usize; + + while selected_rows < batch_size { + let front = match self.selectors.pop_front() { + Some(selector) => selector, + None => break, + }; + + hint::black_box(front.row_count); + + if front.skip { + continue; + } + + let need = (batch_size - selected_rows).min(front.row_count); + selected_rows += need; + if front.row_count > need { + self.selectors + .push_front(RowSelector::select(front.row_count - need)); + break; + } + } + + hint::black_box(selected_rows); + selected_total += selected_rows; + + if selected_rows == 0 && self.selectors.is_empty() { + break; + } + } + + selected_total + } +} + +struct SelectorStats { + average_selector_len: f64, + select_ratio: f64, +} + +impl SelectorStats { + fn new(selectors: &[RowSelector]) -> Self { + if selectors.is_empty() { + return Self { + average_selector_len: 0.0, + select_ratio: 0.0, + }; + } + + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let selected_rows: usize = selectors + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + Self { + average_selector_len: total_rows as f64 / selectors.len() as f64, + select_ratio: if total_rows == 0 { + 0.0 + } else { + selected_rows as f64 / total_rows as f64 + }, + } + } +} diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 4e91685519f3..d33359731c48 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -17,10 +17,10 @@ //! Contains reader which reads parquet data into arrow [`RecordBatch`] -use arrow_array::Array; use arrow_array::cast::AsArray; -use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_array::{Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; +use arrow_select::filter::filter_record_batch; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; use std::fmt::{Debug, Formatter}; @@ -1019,6 +1019,83 @@ impl ParquetRecordBatchReader { let batch_size = self.batch_size(); match self.read_plan.selection_mut() { Some(selection) => { + if selection.uses_mask() { + // Stream the record batch reader using contiguous segments of the selection + // mask, avoiding the need to materialize intermediate `RowSelector` ranges. + loop { + let mask_batch = match selection.next_mask_batch(batch_size) { + Some(batch) => batch, + None => return Ok(None), + }; + + if mask_batch.initial_skip > 0 { + let skipped = + self.array_reader.skip_records(mask_batch.initial_skip)?; + if skipped != mask_batch.initial_skip { + return Err(general_err!( + "failed to skip rows, expected {}, got {}", + mask_batch.initial_skip, + skipped + )); + } + } + + if mask_batch.chunk_rows == 0 { + if selection.is_empty() && mask_batch.selected_rows == 0 { + return Ok(None); + } + continue; + } + + let mask = selection + .mask_slice(mask_batch.mask_start, mask_batch.chunk_rows) + .ok_or_else(|| general_err!("row selection mask out of bounds"))?; + + let read = self.array_reader.read_records(mask_batch.chunk_rows)?; + if read == 0 { + return Err(general_err!( + "reached end of column while expecting {} rows", + mask_batch.chunk_rows + )); + } + if read != mask_batch.chunk_rows { + return Err(general_err!( + "insufficient rows read from array reader - expected {}, got {}", + mask_batch.chunk_rows, + read + )); + } + + let array = self.array_reader.consume_batch()?; + // The column reader exposes the projection as a struct array; convert this + // into a record batch before applying the boolean filter mask. + let struct_array = array.as_struct_opt().ok_or_else(|| { + ArrowError::ParquetError( + "Struct array reader should return struct array".to_string(), + ) + })?; + + let filtered_batch = + filter_record_batch(&RecordBatch::from(struct_array), &mask)?; + + if filtered_batch.num_rows() != mask_batch.selected_rows { + return Err(general_err!( + "filtered rows mismatch selection - expected {}, got {}", + mask_batch.selected_rows, + filtered_batch.num_rows() + )); + } + + if filtered_batch.num_rows() == 0 { + if selection.is_empty() { + return Ok(None); + } + continue; + } + + return Ok(Some(filtered_batch)); + } + } while read_records < batch_size && !selection.is_empty() { let front = selection.pop_front().unwrap(); if front.skip { @@ -4572,6 +4649,93 @@ mod tests { assert_eq!(out, batch.slice(2, 1)); } + #[test] + fn test_row_selection_interleaved_skip() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "v", + ArrowDataType::Int32, + false, + )])); + + let values = Int32Array::from(vec![0, 1, 2, 3, 4]); + let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap(); + + let mut buffer = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap(); + writer.write(&batch)?; + writer.close()?; + + let selection = RowSelection::from(vec![ + RowSelector::select(1), + RowSelector::skip(2), + RowSelector::select(2), + ]); + + let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))? + .with_batch_size(4) + .with_row_selection(selection) + .build()?; + + let out = reader.next().unwrap()?; + assert_eq!(out.num_rows(), 3); + let values = out + .column(0) + .as_primitive::() + .values(); + assert_eq!(values, &[0, 3, 4]); + assert!(reader.next().is_none()); + Ok(()) + } + + #[test] + fn test_row_selection_mask_sparse_rows() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "v", + ArrowDataType::Int32, + false, + )])); + + let values = Int32Array::from((0..30).collect::>()); + let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?; + + let mut buffer = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?; + writer.write(&batch)?; + writer.close()?; + + let total_rows = batch.num_rows(); + let ranges = (1..total_rows) + .step_by(2) + .map(|i| i..i + 1) + .collect::>(); + let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows); + + let selectors: Vec = selection.clone().into(); + assert!(total_rows < selectors.len() * 8); + + let bytes = Bytes::from(buffer); + + let mut reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())? + .with_batch_size(7) + .with_row_selection(selection) + .build()?; + + let mut collected = Vec::new(); + while let Some(batch) = reader.next() { + let batch = batch?; + collected.extend_from_slice( + batch + .column(0) + .as_primitive::() + .values(), + ); + } + + let expected: Vec = (1..total_rows).step_by(2).map(|i| i as i32).collect(); + assert_eq!(collected, expected); + Ok(()) + } + fn test_decimal32_roundtrip() { let d = |values: Vec, p: u8| { let iter = values.into_iter(); diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 754fcd339c5b..2eb3625631c2 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -23,7 +23,8 @@ use crate::arrow::arrow_reader::{ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector, }; use crate::errors::{ParquetError, Result}; -use arrow_array::Array; +use arrow_array::{Array, BooleanArray}; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; @@ -133,7 +134,11 @@ impl ReadPlanBuilder { selection, } = self; - let selection = selection.map(|s| s.trim().into()); + let selection = selection.map(|s| { + let trimmed = s.trim(); + let selectors: Vec = trimmed.into(); + RowSelectionState::new(selectors) + }); ReadPlan { batch_size, @@ -234,12 +239,12 @@ pub struct ReadPlan { /// The number of rows to read in each batch batch_size: usize, /// Row ranges to be selected from the data source - selection: Option>, + selection: Option, } impl ReadPlan { /// Returns a mutable reference to the selection, if any - pub fn selection_mut(&mut self) -> Option<&mut VecDeque> { + pub fn selection_mut(&mut self) -> Option<&mut RowSelectionState> { self.selection.as_mut() } @@ -249,3 +254,152 @@ impl ReadPlan { self.batch_size } } + +/// Mutable execution state for a [`RowSelection`] stored within a [`ReadPlan`] +pub struct RowSelectionState { + storage: RowSelectionStorage, + position: usize, +} + +enum RowSelectionStorage { + Mask(BooleanBuffer), + Selectors(VecDeque), +} + +/// Result of computing the next chunk to read when using a bitmap mask +pub struct MaskBatch { + pub initial_skip: usize, + pub chunk_rows: usize, + pub selected_rows: usize, + pub mask_start: usize, +} + +impl RowSelectionState { + fn new(selectors: Vec) -> Self { + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let selector_count = selectors.len(); + const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 8; + // Prefer a bitmap mask when the selectors are short on average, as the mask + // (re)construction cost is amortized by a simpler execution path during reads. + let use_mask = selector_count == 0 + || total_rows < selector_count.saturating_mul(AVG_SELECTOR_LEN_MASK_THRESHOLD); + + let storage = if use_mask { + RowSelectionStorage::Mask(boolean_mask_from_selectors(&selectors)) + } else { + RowSelectionStorage::Selectors(selectors.into()) + }; + + Self { + storage, + position: 0, + } + } + + pub fn is_empty(&self) -> bool { + match &self.storage { + RowSelectionStorage::Mask(mask) => self.position >= mask.len(), + RowSelectionStorage::Selectors(selectors) => selectors.is_empty(), + } + } + + pub fn position(&self) -> usize { + self.position + } + + pub fn pop_front(&mut self) -> Option { + match &mut self.storage { + RowSelectionStorage::Selectors(selectors) => { + let selector = selectors.pop_front()?; + self.position += selector.row_count; + Some(selector) + } + RowSelectionStorage::Mask(_) => None, + } + } + + pub fn push_front(&mut self, selector: RowSelector) { + match &mut self.storage { + RowSelectionStorage::Selectors(selectors) => { + self.position = self.position.saturating_sub(selector.row_count); + selectors.push_front(selector); + } + RowSelectionStorage::Mask(_) => { + unreachable!("push_front called for mask-based RowSelectionState") + } + } + } + + pub fn uses_mask(&self) -> bool { + matches!(self.storage, RowSelectionStorage::Mask(_)) + } + + pub fn next_mask_batch(&mut self, batch_size: usize) -> Option { + let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { + let mask = match &self.storage { + RowSelectionStorage::Mask(mask) => mask, + RowSelectionStorage::Selectors(_) => return None, + }; + + if self.position >= mask.len() { + return None; + } + + let start_position = self.position; + let mut cursor = start_position; + let mut initial_skip = 0; + + while cursor < mask.len() && !mask.value(cursor) { + initial_skip += 1; + cursor += 1; + } + + let mask_start = cursor; + let mut chunk_rows = 0; + let mut selected_rows = 0; + + // Advance until enough rows have been selected to satisfy the batch size, + // or until the mask is exhausted. This mirrors the behaviour of the legacy + // `RowSelector` queue-based iteration. + while cursor < mask.len() && selected_rows < batch_size { + chunk_rows += 1; + if mask.value(cursor) { + selected_rows += 1; + } + cursor += 1; + } + + (initial_skip, chunk_rows, selected_rows, mask_start, cursor) + }; + + self.position = end_position; + + Some(MaskBatch { + initial_skip, + chunk_rows, + selected_rows, + mask_start, + }) + } + + pub fn mask_slice(&self, start: usize, len: usize) -> Option { + match &self.storage { + RowSelectionStorage::Mask(mask) => { + if start.saturating_add(len) > mask.len() { + return None; + } + Some(BooleanArray::from(mask.slice(start, len))) + } + RowSelectionStorage::Selectors(_) => None, + } + } +} + +fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer { + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let mut builder = BooleanBufferBuilder::new(total_rows); + for selector in selectors { + builder.append_n(selector.row_count, !selector.skip); + } + builder.finish() +} diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 34b728d6fa1e..c7baef531587 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -16,8 +16,8 @@ // under the License. use arrow_array::builder::BooleanBufferBuilder; -use arrow_buffer::Buffer; use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; +use arrow_buffer::Buffer; use bytes::Bytes; use crate::arrow::buffer::bit_util::count_set_bits; @@ -25,6 +25,7 @@ use crate::basic::Encoding; use crate::column::reader::decoder::{ ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, }; +use crate::column::reader::SyntheticLevelBuffer; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; @@ -107,6 +108,37 @@ impl DefinitionLevelBuffer { } } +impl SyntheticLevelBuffer for DefinitionLevelBuffer { + fn append_repeated_level(&mut self, level: i16, count: usize, max_level: i16) -> Result { + // Mirror the layout the buffer was initialized with so downstream consumers + // do not need to special-case synthetic null batches. + match &mut self.inner { + BufferInner::Full { + levels, + nulls, + max_level: buffer_max_level, + } => { + assert_eq!(*buffer_max_level, max_level); + levels.resize(levels.len() + count, level); + nulls.append_n(count, level == max_level); + Ok(if level == max_level { count } else { 0 }) + } + BufferInner::Mask { nulls } => { + assert_eq!(max_level, 1); + nulls.append_n(count, level == max_level); + Ok(if level == max_level { count } else { 0 }) + } + } + } +} + +impl SyntheticLevelBuffer for Vec { + fn append_repeated_level(&mut self, level: i16, count: usize, max_level: i16) -> Result { + self.resize(self.len() + count, level); + Ok(if level == max_level { count } else { 0 }) + } +} + enum MaybePacked { Packed(PackedDecoder), Fallback(DefinitionLevelDecoderImpl), @@ -351,7 +383,7 @@ mod tests { use super::*; use crate::encodings::rle::RleEncoder; - use rand::{Rng, rng}; + use rand::{rng, Rng}; #[test] fn test_packed_decoder() { diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index ebde79e6a7f2..5676aef1fdae 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -19,7 +19,7 @@ use bytes::Bytes; -use super::page::{Page, PageReader}; +use super::page::{Page, PageMetadata, PageReader}; use crate::basic::*; use crate::column::reader::decoder::{ ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, @@ -126,6 +126,10 @@ pub struct GenericColumnReader { /// True if the end of the current data page denotes the end of a record has_record_delimiter: bool, + /// True if the currently buffered page is a synthetic all-null page created + /// to handle sparse column chunks with missing data pages. + synthetic_page: bool, + /// The decoder for the definition levels if any def_level_decoder: Option, @@ -160,10 +164,23 @@ where } } +/// Helper trait for appending synthetic definition levels. +/// +/// Column readers synthesising sparse pages rely on this to extend an existing +/// definition-level buffer with repeated values without reallocating or changing +/// the buffer's representation (structured levels vs. null mask only). +pub trait SyntheticLevelBuffer { + /// Append `count` copies of `level`, returning how many of those levels + /// correspond to non-null values given `max_level`. Implementations should + /// mutate in place without altering their underlying representation. + fn append_repeated_level(&mut self, level: i16, count: usize, max_level: i16) -> Result; +} + impl GenericColumnReader where R: RepetitionLevelDecoder, D: DefinitionLevelDecoder, + D::Buffer: SyntheticLevelBuffer, V: ColumnValueDecoder, { pub(crate) fn new_with_decoders( @@ -182,6 +199,7 @@ where num_decoded_values: 0, values_decoder, has_record_delimiter: false, + synthetic_page: false, } } @@ -214,6 +232,43 @@ where let remaining_records = max_records - total_records_read; let remaining_levels = self.num_buffered_values - self.num_decoded_values; + if self.synthetic_page { + // A previous sparse column chunk did not contain a physical data page; emit the + // implicit null records described by the page metadata before reading further. + debug_assert!(self.rep_level_decoder.is_none()); + let levels_to_emit = remaining_records.min(remaining_levels); + + if levels_to_emit == 0 { + self.synthetic_page = false; + continue; + } + + let mut values_from_levels = 0; + if self.descr.max_def_level() > 0 { + let out = def_levels + .as_mut() + .ok_or_else(|| general_err!("must specify definition levels"))?; + + let null_def_level = self.descr.max_def_level().saturating_sub(1); + values_from_levels = out.append_repeated_level( + null_def_level, + levels_to_emit, + self.descr.max_def_level(), + )?; + } + + self.num_decoded_values += levels_to_emit; + total_records_read += levels_to_emit; + total_levels_read += levels_to_emit; + total_values_read += values_from_levels; + + if self.num_decoded_values == self.num_buffered_values { + self.synthetic_page = false; + } + + continue; + } + let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() { Some(reader) => { let out = rep_levels @@ -327,6 +382,20 @@ where // The number of levels in the current data page let remaining_levels = self.num_buffered_values - self.num_decoded_values; + if self.synthetic_page { + // When synthesising null rows there is no physical data to skip; advance the + // virtual counters until the synthetic page is fully consumed. + debug_assert!(self.rep_level_decoder.is_none()); + let to_skip = remaining_records.min(remaining_levels); + self.num_decoded_values += to_skip; + remaining_records -= to_skip; + + if self.num_buffered_values == self.num_decoded_values { + self.synthetic_page = false; + } + continue; + } + let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() { Some(decoder) => { let (mut records_read, levels_read) = @@ -403,7 +472,27 @@ where /// Returns false if there's no page left. fn read_new_page(&mut self) -> Result { loop { - match self.page_reader.get_next_page()? { + let page_result = match self.page_reader.get_next_page() { + Ok(page) => page, + Err(err) => { + return match err { + ParquetError::General(message) + if message + .starts_with("Invalid offset in sparse column chunk data:") => + { + let metadata = self.page_reader.peek_next_page()?; + // Some writers omit data pages for sparse column chunks and encode the gap + // as a reader-visible error. Use the metadata peek to synthesise a page of + // null definition levels so downstream consumers see consistent row counts. + self.try_create_synthetic_page(metadata)?; + Ok(true) + } + _ => Err(err), + }; + } + }; + + match page_result { // No more page to read None => return Ok(false), Some(current_page) => { @@ -415,6 +504,7 @@ where encoding, is_sorted, } => { + self.synthetic_page = false; self.values_decoder .set_dict(buf, num_values, encoding, is_sorted)?; continue; @@ -428,6 +518,7 @@ where rep_level_encoding, statistics: _, } => { + self.synthetic_page = false; self.num_buffered_values = num_values as _; self.num_decoded_values = 0; @@ -497,6 +588,7 @@ where )); } + self.synthetic_page = false; self.num_buffered_values = num_values as _; self.num_decoded_values = 0; @@ -541,6 +633,44 @@ where } } + fn try_create_synthetic_page(&mut self, metadata: Option) -> Result<()> { + if self.descr.max_rep_level() != 0 { + return Err(general_err!( + "cannot synthesise sparse page for column with repetition levels ({message})" + )); + } + + if self.descr.max_def_level() == 0 { + return Err(general_err!( + "cannot synthesise sparse page for required column ({message})" + )); + } + + let Some(meta) = metadata else { + return Err(general_err!( + "missing page metadata for sparse column chunk ({message})" + )); + }; + + if meta.is_dict { + return Err(general_err!( + "unexpected dictionary page error while synthesising sparse page ({message})" + )); + } + + let num_levels = meta.num_levels.or(meta.num_rows).ok_or_else(|| { + general_err!("page metadata missing level counts for sparse column chunk ({message})") + })?; + + self.page_reader.skip_next_page()?; + + self.num_buffered_values = num_levels; + self.num_decoded_values = 0; + self.synthetic_page = true; + self.has_record_delimiter = true; + Ok(()) + } + /// Check whether there is more data to read from this column, /// If the current page is fully decoded, this will load the next page /// (if it exists) into the buffer @@ -598,6 +728,7 @@ mod tests { use std::{collections::VecDeque, sync::Arc}; use crate::basic::Type as PhysicalType; + use crate::column::page::{Page, PageMetadata}; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType}; use crate::util::test_common::page_util::InMemoryPageReader; use crate::util::test_common::rand_gen::make_pages; @@ -1062,6 +1193,137 @@ mod tests { ); } + #[test] + fn test_synthetic_sparse_page_fills_nulls() { + let primitive_type = SchemaType::primitive_type_builder("a", PhysicalType::INT32) + .with_repetition(Repetition::OPTIONAL) + .build() + .expect("build() should be OK"); + + let desc = Arc::new(ColumnDescriptor::new( + Arc::new(primitive_type), + 1, + 0, + ColumnPath::new(Vec::new()), + )); + + let levels = 8; + let page_reader: Box = Box::new(MissingPageReader::new(levels)); + + let mut reader = ColumnReaderImpl::::new(desc, page_reader); + + let mut values = Vec::new(); + let mut def_levels = Vec::new(); + + let (records, non_null_values, levels_read) = reader + .read_records(levels, Some(&mut def_levels), None, &mut values) + .expect("reading synthetic page succeeds"); + + assert_eq!(records, levels); + assert_eq!(levels_read, levels); + assert_eq!(non_null_values, 0); + assert!(values.is_empty()); + assert_eq!(def_levels, vec![0; levels]); + + // Subsequent read should indicate no additional records + def_levels.clear(); + let (records, non_null_values, levels_read) = reader + .read_records(levels, Some(&mut def_levels), None, &mut values) + .expect("no further pages"); + assert_eq!(records, 0); + assert_eq!(levels_read, 0); + assert_eq!(non_null_values, 0); + } + + #[test] + fn test_synthetic_sparse_page_preserves_parent_levels() { + let primitive_type = SchemaType::primitive_type_builder("leaf", PhysicalType::INT32) + .with_repetition(Repetition::OPTIONAL) + .build() + .expect("build() should be OK"); + + let desc = Arc::new(ColumnDescriptor::new( + Arc::new(primitive_type), + 2, + 0, + ColumnPath::new(vec!["struct".to_string(), "leaf".to_string()]), + )); + + let levels = 6; + let page_reader: Box = Box::new(MissingPageReader::new(levels)); + + let mut reader = ColumnReaderImpl::::new(desc, page_reader); + + let mut values = Vec::new(); + let mut def_levels = Vec::new(); + + let (records, non_null_values, levels_read) = reader + .read_records(levels, Some(&mut def_levels), None, &mut values) + .expect("reading synthetic page succeeds"); + + assert_eq!(records, levels); + assert_eq!(levels_read, levels); + assert_eq!(non_null_values, 0); + assert!(values.is_empty()); + assert_eq!(def_levels, vec![1; levels]); + } + + struct MissingPageReader { + metadata: Option, + skipped: bool, + } + + impl MissingPageReader { + fn new(levels: usize) -> Self { + Self { + metadata: Some(PageMetadata { + num_rows: Some(levels), + num_levels: Some(levels), + is_dict: false, + }), + skipped: false, + } + } + } + + impl Iterator for MissingPageReader { + type Item = Result; + + fn next(&mut self) -> Option { + match self.get_next_page() { + Ok(Some(page)) => Some(Ok(page)), + Ok(None) => None, + Err(err) => Some(Err(err)), + } + } + } + + impl PageReader for MissingPageReader { + fn get_next_page(&mut self) -> Result> { + if self.skipped { + Ok(None) + } else { + Err(general_err!( + "Invalid offset in sparse column chunk data: 0" + )) + } + } + + fn peek_next_page(&mut self) -> Result> { + Ok(self.metadata.clone()) + } + + fn skip_next_page(&mut self) -> Result<()> { + self.metadata = None; + self.skipped = true; + Ok(()) + } + + fn at_record_boundary(&mut self) -> Result { + Ok(true) + } + } + // ---------------------------------------------------------------------- // Helper methods to make pages and test // diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index e49906207577..4bec1d81c559 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -20,12 +20,12 @@ use bytes::Bytes; use crate::basic::{Encoding, EncodingMask}; use crate::data_type::DataType; use crate::encodings::{ - decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder}, + decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}, rle::RleDecoder, }; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::bit_util::{BitReader, num_required_bits}; +use crate::util::bit_util::{num_required_bits, BitReader}; /// Decodes level data pub trait ColumnLevelDecoder { From 07435476220efe7ca76f82fb8ae29ae6a1c140c8 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Wed, 29 Oct 2025 02:12:01 +0800 Subject: [PATCH 02/29] rustfmt --- parquet/src/arrow/record_reader/definition_levels.rs | 6 +++--- parquet/src/column/reader/decoder.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index c7baef531587..d833bb146e92 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -16,16 +16,16 @@ // under the License. use arrow_array::builder::BooleanBufferBuilder; -use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use arrow_buffer::Buffer; +use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use bytes::Bytes; use crate::arrow::buffer::bit_util::count_set_bits; use crate::basic::Encoding; +use crate::column::reader::SyntheticLevelBuffer; use crate::column::reader::decoder::{ ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, }; -use crate::column::reader::SyntheticLevelBuffer; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; @@ -383,7 +383,7 @@ mod tests { use super::*; use crate::encodings::rle::RleEncoder; - use rand::{rng, Rng}; + use rand::{Rng, rng}; #[test] fn test_packed_decoder() { diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 4bec1d81c559..e49906207577 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -20,12 +20,12 @@ use bytes::Bytes; use crate::basic::{Encoding, EncodingMask}; use crate::data_type::DataType; use crate::encodings::{ - decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}, + decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder}, rle::RleDecoder, }; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::bit_util::{num_required_bits, BitReader}; +use crate::util::bit_util::{BitReader, num_required_bits}; /// Decodes level data pub trait ColumnLevelDecoder { From 4e08441b5451330b239a3c7db499f91e8f07f963 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Wed, 29 Oct 2025 02:37:54 +0800 Subject: [PATCH 03/29] fix build error --- parquet/src/arrow/record_reader/definition_levels.rs | 7 ------- parquet/src/column/reader.rs | 7 +++++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index d833bb146e92..2121a6123e14 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -132,13 +132,6 @@ impl SyntheticLevelBuffer for DefinitionLevelBuffer { } } -impl SyntheticLevelBuffer for Vec { - fn append_repeated_level(&mut self, level: i16, count: usize, max_level: i16) -> Result { - self.resize(self.len() + count, level); - Ok(if level == max_level { count } else { 0 }) - } -} - enum MaybePacked { Packed(PackedDecoder), Fallback(DefinitionLevelDecoderImpl), diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 5676aef1fdae..982cb37ef64a 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -176,6 +176,13 @@ pub trait SyntheticLevelBuffer { fn append_repeated_level(&mut self, level: i16, count: usize, max_level: i16) -> Result; } +impl SyntheticLevelBuffer for Vec { + fn append_repeated_level(&mut self, level: i16, count: usize, max_level: i16) -> Result { + self.resize(self.len() + count, level); + Ok(if level == max_level { count } else { 0 }) + } +} + impl GenericColumnReader where R: RepetitionLevelDecoder, From 1ac405af3ec98b56deed7b9e06cd59a9387e179f Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Wed, 29 Oct 2025 02:42:36 +0800 Subject: [PATCH 04/29] fix build error --- parquet/src/arrow/arrow_reader/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d33359731c48..0124a21e5e01 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -4721,7 +4721,7 @@ mod tests { .build()?; let mut collected = Vec::new(); - while let Some(batch) = reader.next() { + for batch in reader { let batch = batch?; collected.extend_from_slice( batch From e2b7ca0b15cabe994fa8ec5fe3ade941d1efe043 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Wed, 29 Oct 2025 02:48:11 +0800 Subject: [PATCH 05/29] fix clippy build failure --- parquet/src/arrow/arrow_reader/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 0124a21e5e01..271058002a40 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -4715,7 +4715,7 @@ mod tests { let bytes = Bytes::from(buffer); - let mut reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())? + let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())? .with_batch_size(7) .with_row_selection(selection) .build()?; From e401e266ec324a1b86d0c292b6e1912aa128efc9 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Wed, 29 Oct 2025 16:34:13 +0800 Subject: [PATCH 06/29] Rename the selection for better understanding. --- parquet/src/arrow/arrow_reader/read_plan.rs | 62 +++++++++++++-------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 2eb3625631c2..8be055b47893 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -137,7 +137,7 @@ impl ReadPlanBuilder { let selection = selection.map(|s| { let trimmed = s.trim(); let selectors: Vec = trimmed.into(); - RowSelectionState::new(selectors) + RowSelectionCursor::new(selectors) }); ReadPlan { @@ -239,12 +239,12 @@ pub struct ReadPlan { /// The number of rows to read in each batch batch_size: usize, /// Row ranges to be selected from the data source - selection: Option, + selection: Option, } impl ReadPlan { /// Returns a mutable reference to the selection, if any - pub fn selection_mut(&mut self) -> Option<&mut RowSelectionState> { + pub fn selection_mut(&mut self) -> Option<&mut RowSelectionCursor> { self.selection.as_mut() } @@ -255,13 +255,23 @@ impl ReadPlan { } } -/// Mutable execution state for a [`RowSelection`] stored within a [`ReadPlan`] -pub struct RowSelectionState { - storage: RowSelectionStorage, +/// Cursor for iterating a [`RowSelection`] during execution within a [`ReadPlan`]. +/// +/// This keeps per-reader state such as the current position and delegates the +/// actual storage strategy to [`RowSelectionBacking`]. +pub struct RowSelectionCursor { + /// Backing storage describing how the selection is materialised + storage: RowSelectionBacking, + /// Current absolute offset into the selection position: usize, } -enum RowSelectionStorage { +/// Backing storage that powers [`RowSelectionCursor`]. +/// +/// The cursor either walks a boolean mask (dense representation) or a queue +/// of [`RowSelector`] ranges (sparse representation), choosing whichever is +/// cheaper to evaluate for a given plan. +enum RowSelectionBacking { Mask(BooleanBuffer), Selectors(VecDeque), } @@ -274,7 +284,8 @@ pub struct MaskBatch { pub mask_start: usize, } -impl RowSelectionState { +impl RowSelectionCursor { + /// Create a cursor, choosing an efficient backing representation fn new(selectors: Vec) -> Self { let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); let selector_count = selectors.len(); @@ -285,9 +296,9 @@ impl RowSelectionState { || total_rows < selector_count.saturating_mul(AVG_SELECTOR_LEN_MASK_THRESHOLD); let storage = if use_mask { - RowSelectionStorage::Mask(boolean_mask_from_selectors(&selectors)) + RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors)) } else { - RowSelectionStorage::Selectors(selectors.into()) + RowSelectionBacking::Selectors(selectors.into()) }; Self { @@ -296,49 +307,55 @@ impl RowSelectionState { } } + /// Returns `true` when no further rows remain pub fn is_empty(&self) -> bool { match &self.storage { - RowSelectionStorage::Mask(mask) => self.position >= mask.len(), - RowSelectionStorage::Selectors(selectors) => selectors.is_empty(), + RowSelectionBacking::Mask(mask) => self.position >= mask.len(), + RowSelectionBacking::Selectors(selectors) => selectors.is_empty(), } } + /// Current position within the overall selection pub fn position(&self) -> usize { self.position } + /// Pop the next [`RowSelector`] when using the sparse representation pub fn pop_front(&mut self) -> Option { match &mut self.storage { - RowSelectionStorage::Selectors(selectors) => { + RowSelectionBacking::Selectors(selectors) => { let selector = selectors.pop_front()?; self.position += selector.row_count; Some(selector) } - RowSelectionStorage::Mask(_) => None, + RowSelectionBacking::Mask(_) => None, } } + /// Undo a `pop_front`, rewinding the position (sparse-only) pub fn push_front(&mut self, selector: RowSelector) { match &mut self.storage { - RowSelectionStorage::Selectors(selectors) => { + RowSelectionBacking::Selectors(selectors) => { self.position = self.position.saturating_sub(selector.row_count); selectors.push_front(selector); } - RowSelectionStorage::Mask(_) => { - unreachable!("push_front called for mask-based RowSelectionState") + RowSelectionBacking::Mask(_) => { + unreachable!("push_front called for mask-based RowSelectionCursor") } } } + /// Returns `true` if the cursor is backed by a boolean mask pub fn uses_mask(&self) -> bool { - matches!(self.storage, RowSelectionStorage::Mask(_)) + matches!(self.storage, RowSelectionBacking::Mask(_)) } + /// Advance through the mask representation, producing the next chunk summary pub fn next_mask_batch(&mut self, batch_size: usize) -> Option { let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { let mask = match &self.storage { - RowSelectionStorage::Mask(mask) => mask, - RowSelectionStorage::Selectors(_) => return None, + RowSelectionBacking::Mask(mask) => mask, + RowSelectionBacking::Selectors(_) => return None, }; if self.position >= mask.len() { @@ -382,15 +399,16 @@ impl RowSelectionState { }) } + /// Materialise a slice of the mask when backed by a bitmap pub fn mask_slice(&self, start: usize, len: usize) -> Option { match &self.storage { - RowSelectionStorage::Mask(mask) => { + RowSelectionBacking::Mask(mask) => { if start.saturating_add(len) > mask.len() { return None; } Some(BooleanArray::from(mask.slice(start, len))) } - RowSelectionStorage::Selectors(_) => None, + RowSelectionBacking::Selectors(_) => None, } } } From 6fdc9cb2ea789d7b73be2a21799898691949ed70 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Wed, 29 Oct 2025 21:22:42 +0800 Subject: [PATCH 07/29] Name update --- parquet/benches/row_selection_state.rs | 8 ++--- parquet/src/arrow/arrow_reader/mod.rs | 34 ++++++++++---------- parquet/src/arrow/arrow_reader/read_plan.rs | 35 ++++++++++++--------- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs index 5d8aab5db458..2e9c20d43263 100644 --- a/parquet/benches/row_selection_state.rs +++ b/parquet/benches/row_selection_state.rs @@ -260,7 +260,7 @@ impl MaskState { fn consume_all(&mut self, batch_size: usize) -> usize { let mut selected = 0; - while let Some(batch) = self.next_mask_batch(batch_size) { + while let Some(batch) = self.next_mask_chunk(batch_size) { hint::black_box(batch.initial_skip); hint::black_box(batch.chunk_rows); hint::black_box(batch.mask_start); @@ -269,7 +269,7 @@ impl MaskState { selected } - fn next_mask_batch(&mut self, batch_size: usize) -> Option { + fn next_mask_chunk(&mut self, batch_size: usize) -> Option { let mask = &self.mask; if self.position >= mask.len() { return None; @@ -298,7 +298,7 @@ impl MaskState { self.position = cursor; - Some(MaskBatch { + Some(MaskChunk { initial_skip, chunk_rows, selected_rows, @@ -308,7 +308,7 @@ impl MaskState { } #[derive(Clone)] -struct MaskBatch { +struct MaskChunk { initial_skip: usize, chunk_rows: usize, selected_rows: usize, diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 271058002a40..1052c6c95099 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1019,49 +1019,49 @@ impl ParquetRecordBatchReader { let batch_size = self.batch_size(); match self.read_plan.selection_mut() { Some(selection) => { - if selection.uses_mask() { + if selection.is_mask_backed() { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. loop { - let mask_batch = match selection.next_mask_batch(batch_size) { + let mask_chunk = match selection.next_mask_chunk(batch_size) { Some(batch) => batch, None => return Ok(None), }; - if mask_batch.initial_skip > 0 { + if mask_chunk.initial_skip > 0 { let skipped = - self.array_reader.skip_records(mask_batch.initial_skip)?; - if skipped != mask_batch.initial_skip { + self.array_reader.skip_records(mask_chunk.initial_skip)?; + if skipped != mask_chunk.initial_skip { return Err(general_err!( "failed to skip rows, expected {}, got {}", - mask_batch.initial_skip, + mask_chunk.initial_skip, skipped )); } } - if mask_batch.chunk_rows == 0 { - if selection.is_empty() && mask_batch.selected_rows == 0 { + if mask_chunk.chunk_rows == 0 { + if selection.is_empty() && mask_chunk.selected_rows == 0 { return Ok(None); } continue; } let mask = selection - .mask_slice(mask_batch.mask_start, mask_batch.chunk_rows) + .mask_values_for(&mask_chunk) .ok_or_else(|| general_err!("row selection mask out of bounds"))?; - let read = self.array_reader.read_records(mask_batch.chunk_rows)?; + let read = self.array_reader.read_records(mask_chunk.chunk_rows)?; if read == 0 { return Err(general_err!( "reached end of column while expecting {} rows", - mask_batch.chunk_rows + mask_chunk.chunk_rows )); } - if read != mask_batch.chunk_rows { + if read != mask_chunk.chunk_rows { return Err(general_err!( "insufficient rows read from array reader - expected {}, got {}", - mask_batch.chunk_rows, + mask_chunk.chunk_rows, read )); } @@ -1078,10 +1078,10 @@ impl ParquetRecordBatchReader { let filtered_batch = filter_record_batch(&RecordBatch::from(struct_array), &mask)?; - if filtered_batch.num_rows() != mask_batch.selected_rows { + if filtered_batch.num_rows() != mask_chunk.selected_rows { return Err(general_err!( "filtered rows mismatch selection - expected {}, got {}", - mask_batch.selected_rows, + mask_chunk.selected_rows, filtered_batch.num_rows() )); } @@ -1097,7 +1097,7 @@ impl ParquetRecordBatchReader { } } while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); + let front = selection.next_selector().unwrap(); if front.skip { let skipped = self.array_reader.skip_records(front.row_count)?; @@ -1123,7 +1123,7 @@ impl ParquetRecordBatchReader { Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); + selection.return_selector(RowSelector::select(remaining)); need_read } _ => front.row_count, diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 8be055b47893..5b1e32f3f4e0 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -269,18 +269,21 @@ pub struct RowSelectionCursor { /// Backing storage that powers [`RowSelectionCursor`]. /// /// The cursor either walks a boolean mask (dense representation) or a queue -/// of [`RowSelector`] ranges (sparse representation), choosing whichever is -/// cheaper to evaluate for a given plan. +/// of [`RowSelector`] ranges (sparse representation). enum RowSelectionBacking { Mask(BooleanBuffer), Selectors(VecDeque), } /// Result of computing the next chunk to read when using a bitmap mask -pub struct MaskBatch { +pub struct MaskChunk { + /// Number of leading rows to skip before reaching selected rows pub initial_skip: usize, + /// Total rows covered by this chunk (selected + skipped) pub chunk_rows: usize, + /// Rows actually selected within the chunk pub selected_rows: usize, + /// Starting offset within the mask where the chunk begins pub mask_start: usize, } @@ -320,8 +323,8 @@ impl RowSelectionCursor { self.position } - /// Pop the next [`RowSelector`] when using the sparse representation - pub fn pop_front(&mut self) -> Option { + /// Return the next [`RowSelector`] when using the sparse representation + pub fn next_selector(&mut self) -> Option { match &mut self.storage { RowSelectionBacking::Selectors(selectors) => { let selector = selectors.pop_front()?; @@ -332,26 +335,26 @@ impl RowSelectionCursor { } } - /// Undo a `pop_front`, rewinding the position (sparse-only) - pub fn push_front(&mut self, selector: RowSelector) { + /// Return a selector to the front, rewinding the position (sparse-only) + pub fn return_selector(&mut self, selector: RowSelector) { match &mut self.storage { RowSelectionBacking::Selectors(selectors) => { self.position = self.position.saturating_sub(selector.row_count); selectors.push_front(selector); } RowSelectionBacking::Mask(_) => { - unreachable!("push_front called for mask-based RowSelectionCursor") + unreachable!("return_selector called for mask-based RowSelectionCursor") } } } /// Returns `true` if the cursor is backed by a boolean mask - pub fn uses_mask(&self) -> bool { + pub fn is_mask_backed(&self) -> bool { matches!(self.storage, RowSelectionBacking::Mask(_)) } /// Advance through the mask representation, producing the next chunk summary - pub fn next_mask_batch(&mut self, batch_size: usize) -> Option { + pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option { let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { let mask = match &self.storage { RowSelectionBacking::Mask(mask) => mask, @@ -391,7 +394,7 @@ impl RowSelectionCursor { self.position = end_position; - Some(MaskBatch { + Some(MaskChunk { initial_skip, chunk_rows, selected_rows, @@ -399,14 +402,16 @@ impl RowSelectionCursor { }) } - /// Materialise a slice of the mask when backed by a bitmap - pub fn mask_slice(&self, start: usize, len: usize) -> Option { + /// Materialise the boolean values for a mask-backed chunk + pub fn mask_values_for(&self, chunk: &MaskChunk) -> Option { match &self.storage { RowSelectionBacking::Mask(mask) => { - if start.saturating_add(len) > mask.len() { + if chunk.mask_start.saturating_add(chunk.chunk_rows) > mask.len() { return None; } - Some(BooleanArray::from(mask.slice(start, len))) + Some(BooleanArray::from( + mask.slice(chunk.mask_start, chunk.chunk_rows), + )) } RowSelectionBacking::Selectors(_) => None, } From d385bb84fd566f13cb342561eac5c4e36c2b2a80 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Wed, 29 Oct 2025 22:30:54 +0800 Subject: [PATCH 08/29] fix build --- parquet/src/arrow/arrow_reader/read_plan.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 5b1e32f3f4e0..55ec89e2d02a 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -259,6 +259,7 @@ impl ReadPlan { /// /// This keeps per-reader state such as the current position and delegates the /// actual storage strategy to [`RowSelectionBacking`]. +#[derive(Debug)] pub struct RowSelectionCursor { /// Backing storage describing how the selection is materialised storage: RowSelectionBacking, @@ -270,6 +271,7 @@ pub struct RowSelectionCursor { /// /// The cursor either walks a boolean mask (dense representation) or a queue /// of [`RowSelector`] ranges (sparse representation). +#[derive(Debug)] enum RowSelectionBacking { Mask(BooleanBuffer), Selectors(VecDeque), From df78cbf5e2d48c32462e585a2f67f626b51a83a9 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 01:09:05 +0800 Subject: [PATCH 09/29] Update mask batch size --- parquet/src/arrow/arrow_reader/read_plan.rs | 64 +++++++++++++++++++-- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 55ec89e2d02a..224deeee8c27 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -380,10 +380,15 @@ impl RowSelectionCursor { let mut chunk_rows = 0; let mut selected_rows = 0; - // Advance until enough rows have been selected to satisfy the batch size, - // or until the mask is exhausted. This mirrors the behaviour of the legacy - // `RowSelector` queue-based iteration. - while cursor < mask.len() && selected_rows < batch_size { + // Advance until this chunk would consume `batch_size` input rows or the mask + // is exhausted, tracking how many of those rows are selected. This mirrors + // the behaviour of the legacy `RowSelector` queue-based iteration while + // ensuring the downstream readers never request more than `batch_size` + // physical rows for a single chunk. + while cursor < mask.len() + && chunk_rows < batch_size + && selected_rows < batch_size + { chunk_rows += 1; if mask.value(cursor) { selected_rows += 1; @@ -428,3 +433,54 @@ fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer { } builder.finish() } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn mask_chunk_respects_batch_size() { + // Build a selection that alternates skipping three rows and selecting one. + let selectors = (0..32) + .flat_map(|_| [RowSelector::skip(3), RowSelector::select(1)]) + .collect::>(); + + let mut cursor = RowSelectionCursor::new(selectors); + assert!(cursor.is_mask_backed()); + + let batch_size = 8; + let mut total_selected = 0; + let mut total_rows = 0; + let mut saw_sparse_chunk = false; + + while let Some(chunk) = cursor.next_mask_chunk(batch_size) { + assert!( + chunk.chunk_rows <= batch_size, + "chunk_rows {} exceeds batch_size {}", + chunk.chunk_rows, + batch_size + ); + assert!( + chunk.selected_rows <= chunk.chunk_rows, + "selected_rows {} exceeds chunk_rows {}", + chunk.selected_rows, + chunk.chunk_rows + ); + + if total_rows == 0 { + assert_eq!(chunk.initial_skip, 3); + } + + if chunk.chunk_rows == batch_size && chunk.selected_rows < batch_size { + saw_sparse_chunk = true; + } + + total_selected += chunk.selected_rows; + total_rows += chunk.initial_skip + chunk.chunk_rows; + } + + assert!(saw_sparse_chunk, "expected at least one sparse chunk"); + assert_eq!(total_selected, 32); + assert_eq!(total_rows, 128); + } +} From 8dd46f71dcaa59500c2fe4d2a62f99970301e122 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 01:26:37 +0800 Subject: [PATCH 10/29] Update mask batch size --- parquet/src/arrow/arrow_reader/read_plan.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 224deeee8c27..eeab1fe970b2 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -387,7 +387,6 @@ impl RowSelectionCursor { // physical rows for a single chunk. while cursor < mask.len() && chunk_rows < batch_size - && selected_rows < batch_size { chunk_rows += 1; if mask.value(cursor) { From a63d13018b73601cdc03e890a7dae0480e63bd21 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 02:28:01 +0800 Subject: [PATCH 11/29] Revert "Update mask batch size" This reverts commit 8dd46f71dcaa59500c2fe4d2a62f99970301e122. --- parquet/src/arrow/arrow_reader/read_plan.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index eeab1fe970b2..224deeee8c27 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -387,6 +387,7 @@ impl RowSelectionCursor { // physical rows for a single chunk. while cursor < mask.len() && chunk_rows < batch_size + && selected_rows < batch_size { chunk_rows += 1; if mask.value(cursor) { From 14647e18080a1823ee74da79b35a1095cfcdc885 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 02:28:05 +0800 Subject: [PATCH 12/29] Revert "Update mask batch size" This reverts commit df78cbf5e2d48c32462e585a2f67f626b51a83a9. --- parquet/src/arrow/arrow_reader/read_plan.rs | 64 ++------------------- 1 file changed, 4 insertions(+), 60 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 224deeee8c27..55ec89e2d02a 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -380,15 +380,10 @@ impl RowSelectionCursor { let mut chunk_rows = 0; let mut selected_rows = 0; - // Advance until this chunk would consume `batch_size` input rows or the mask - // is exhausted, tracking how many of those rows are selected. This mirrors - // the behaviour of the legacy `RowSelector` queue-based iteration while - // ensuring the downstream readers never request more than `batch_size` - // physical rows for a single chunk. - while cursor < mask.len() - && chunk_rows < batch_size - && selected_rows < batch_size - { + // Advance until enough rows have been selected to satisfy the batch size, + // or until the mask is exhausted. This mirrors the behaviour of the legacy + // `RowSelector` queue-based iteration. + while cursor < mask.len() && selected_rows < batch_size { chunk_rows += 1; if mask.value(cursor) { selected_rows += 1; @@ -433,54 +428,3 @@ fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer { } builder.finish() } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn mask_chunk_respects_batch_size() { - // Build a selection that alternates skipping three rows and selecting one. - let selectors = (0..32) - .flat_map(|_| [RowSelector::skip(3), RowSelector::select(1)]) - .collect::>(); - - let mut cursor = RowSelectionCursor::new(selectors); - assert!(cursor.is_mask_backed()); - - let batch_size = 8; - let mut total_selected = 0; - let mut total_rows = 0; - let mut saw_sparse_chunk = false; - - while let Some(chunk) = cursor.next_mask_chunk(batch_size) { - assert!( - chunk.chunk_rows <= batch_size, - "chunk_rows {} exceeds batch_size {}", - chunk.chunk_rows, - batch_size - ); - assert!( - chunk.selected_rows <= chunk.chunk_rows, - "selected_rows {} exceeds chunk_rows {}", - chunk.selected_rows, - chunk.chunk_rows - ); - - if total_rows == 0 { - assert_eq!(chunk.initial_skip, 3); - } - - if chunk.chunk_rows == batch_size && chunk.selected_rows < batch_size { - saw_sparse_chunk = true; - } - - total_selected += chunk.selected_rows; - total_rows += chunk.initial_skip + chunk.chunk_rows; - } - - assert!(saw_sparse_chunk, "expected at least one sparse chunk"); - assert_eq!(total_selected, 32); - assert_eq!(total_rows, 128); - } -} From bad49b27f89f9bb67b69a0cd807313e1473a5daa Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 17:08:41 +0800 Subject: [PATCH 13/29] Update mask batch size --- .../arrow/record_reader/definition_levels.rs | 53 ++++++++++--------- parquet/src/column/reader.rs | 30 +++-------- parquet/src/column/reader/decoder.rs | 24 +++++++++ 3 files changed, 60 insertions(+), 47 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 2121a6123e14..eef179f4c9e3 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -22,7 +22,6 @@ use bytes::Bytes; use crate::arrow::buffer::bit_util::count_set_bits; use crate::basic::Encoding; -use crate::column::reader::SyntheticLevelBuffer; use crate::column::reader::decoder::{ ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, }; @@ -108,30 +107,6 @@ impl DefinitionLevelBuffer { } } -impl SyntheticLevelBuffer for DefinitionLevelBuffer { - fn append_repeated_level(&mut self, level: i16, count: usize, max_level: i16) -> Result { - // Mirror the layout the buffer was initialized with so downstream consumers - // do not need to special-case synthetic null batches. - match &mut self.inner { - BufferInner::Full { - levels, - nulls, - max_level: buffer_max_level, - } => { - assert_eq!(*buffer_max_level, max_level); - levels.resize(levels.len() + count, level); - nulls.append_n(count, level == max_level); - Ok(if level == max_level { count } else { 0 }) - } - BufferInner::Mask { nulls } => { - assert_eq!(max_level, 1); - nulls.append_n(count, level == max_level); - Ok(if level == max_level { count } else { 0 }) - } - } - } -} - enum MaybePacked { Packed(PackedDecoder), Fallback(DefinitionLevelDecoderImpl), @@ -210,6 +185,34 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { MaybePacked::Packed(decoder) => decoder.skip(num_levels), } } + + fn append_null_def_levels( + &mut self, + writer: &mut Self::Buffer, + num_levels: usize, + null_def_level: i16, + max_level: i16, + ) -> Result { + // Mirror the layout the buffer was initialized with so downstream consumers + // do not need to special-case synthetic null batches. + match &mut writer.inner { + BufferInner::Full { + levels, + nulls, + max_level: buffer_max_level, + } => { + assert_eq!(*buffer_max_level, max_level); + levels.resize(levels.len() + num_levels, null_def_level); + nulls.append_n(num_levels, null_def_level == max_level); + Ok(if null_def_level == max_level { num_levels } else { 0 }) + } + BufferInner::Mask { nulls } => { + assert_eq!(max_level, 1); + nulls.append_n(num_levels, null_def_level == max_level); + Ok(if null_def_level == max_level { num_levels } else { 0 }) + } + } + } } /// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit width of 1 diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 982cb37ef64a..4bad50a6a48d 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -164,30 +164,10 @@ where } } -/// Helper trait for appending synthetic definition levels. -/// -/// Column readers synthesising sparse pages rely on this to extend an existing -/// definition-level buffer with repeated values without reallocating or changing -/// the buffer's representation (structured levels vs. null mask only). -pub trait SyntheticLevelBuffer { - /// Append `count` copies of `level`, returning how many of those levels - /// correspond to non-null values given `max_level`. Implementations should - /// mutate in place without altering their underlying representation. - fn append_repeated_level(&mut self, level: i16, count: usize, max_level: i16) -> Result; -} - -impl SyntheticLevelBuffer for Vec { - fn append_repeated_level(&mut self, level: i16, count: usize, max_level: i16) -> Result { - self.resize(self.len() + count, level); - Ok(if level == max_level { count } else { 0 }) - } -} - impl GenericColumnReader where R: RepetitionLevelDecoder, D: DefinitionLevelDecoder, - D::Buffer: SyntheticLevelBuffer, V: ColumnValueDecoder, { pub(crate) fn new_with_decoders( @@ -257,9 +237,15 @@ where .ok_or_else(|| general_err!("must specify definition levels"))?; let null_def_level = self.descr.max_def_level().saturating_sub(1); - values_from_levels = out.append_repeated_level( - null_def_level, + let decoder = self + .def_level_decoder + .as_mut() + .expect("nullable column requires definition level decoder"); + + values_from_levels = decoder.append_null_def_levels( + out, levels_to_emit, + null_def_level, self.descr.max_def_level(), )?; } diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index e49906207577..971c81611ead 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -83,6 +83,18 @@ pub trait DefinitionLevelDecoder: ColumnLevelDecoder { /// /// Returns the number of values skipped, and the number of levels skipped. fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>; + + /// Append `num_levels` copies of `null_level` to `out`, returning how + /// many of those levels correspond to non-null values given `max_level`. + /// + /// This is used when synthesising all-null pages for sparse column chunks. + fn append_null_def_levels( + &mut self, + out: &mut Self::Buffer, + num_levels: usize, + null_def_level: i16, + max_level: i16, + ) -> Result; } /// Decodes value data @@ -353,6 +365,18 @@ impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl { Ok((value_skip, level_skip)) } + + fn append_null_def_levels( + &mut self, + out: &mut Self::Buffer, + num_levels: usize, + null_def_level: i16, + max_level: i16, + ) -> Result { + let start = out.len(); + out.resize(start + num_levels, null_def_level); + Ok(if null_def_level == max_level { num_levels } else { 0 }) + } } pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024; From a2b7ef047f6dd5be2b47c833addbb4be9881eee0 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 17:19:46 +0800 Subject: [PATCH 14/29] format --- parquet/src/arrow/record_reader/definition_levels.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index eef179f4c9e3..ba273898ba2b 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -204,12 +204,20 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { assert_eq!(*buffer_max_level, max_level); levels.resize(levels.len() + num_levels, null_def_level); nulls.append_n(num_levels, null_def_level == max_level); - Ok(if null_def_level == max_level { num_levels } else { 0 }) + Ok(if null_def_level == max_level { + num_levels + } else { + 0 + }) } BufferInner::Mask { nulls } => { assert_eq!(max_level, 1); nulls.append_n(num_levels, null_def_level == max_level); - Ok(if null_def_level == max_level { num_levels } else { 0 }) + Ok(if null_def_level == max_level { + num_levels + } else { + 0 + }) } } } From 4728c2c8c277792fdf89a9e2197b58a5f82971e5 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 17:41:33 +0800 Subject: [PATCH 15/29] format --- parquet/src/column/reader/decoder.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 971c81611ead..2ef8a57a382b 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -375,7 +375,11 @@ impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl { ) -> Result { let start = out.len(); out.resize(start + num_levels, null_def_level); - Ok(if null_def_level == max_level { num_levels } else { 0 }) + Ok(if null_def_level == max_level { + num_levels + } else { + 0 + }) } } From 517325e26481841cb21a9ab013b3c36e8b010fba Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 18:53:57 +0800 Subject: [PATCH 16/29] Add row selection strategy --- parquet/benches/row_selection_state.rs | 259 ++++++-------------- parquet/src/arrow/arrow_reader/mod.rs | 19 +- parquet/src/arrow/arrow_reader/read_plan.rs | 43 +++- parquet/src/arrow/async_reader/mod.rs | 10 +- 4 files changed, 136 insertions(+), 195 deletions(-) diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs index 2e9c20d43263..23b31766c462 100644 --- a/parquet/benches/row_selection_state.rs +++ b/parquet/benches/row_selection_state.rs @@ -15,21 +15,31 @@ // specific language governing permissions and limitations // under the License. -use std::collections::VecDeque; use std::hint; +use std::sync::Arc; -use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; +use arrow_array::{ArrayRef, Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; -use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::{ + ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector, +}; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; const TOTAL_ROWS: usize = 1 << 20; const BATCH_SIZE: usize = 1 << 10; const BASE_SEED: u64 = 0xA55AA55A; +const READ_STRATEGIES: &[(&str, RowSelectionStrategy)] = &[ + ("read_mask", RowSelectionStrategy::Mask), + ("read_selectors", RowSelectionStrategy::Selectors), +]; fn criterion_benchmark(c: &mut Criterion) { - let avg_selector_lengths: &[usize] = &[2, 4, 8, 16, 32, 64, 128]; + let avg_selector_lengths: &[usize] = &[4, 6, 8, 10]; + let parquet_data = build_parquet_data(TOTAL_ROWS); let scenarios = [ Scenario { @@ -76,53 +86,23 @@ fn criterion_benchmark(c: &mut Criterion) { (stats.select_ratio * 100.0).round() as u32 ); - c.bench_with_input( - BenchmarkId::new("mask_build", &suffix), - &selectors, - |b, selectors| { - b.iter(|| { - let mask = MaskState::build_mask(selectors); - hint::black_box(mask); - }); - }, - ); - - c.bench_with_input( - BenchmarkId::new("selector_build", &suffix), - &selectors, - |b, selectors| { - b.iter(|| { - let queue = SelectorState::build_queue(selectors); - hint::black_box(queue); - }); - }, - ); - - let mask_state = MaskState::new(&selectors); - c.bench_with_input( - BenchmarkId::new("mask_scan", &suffix), - &mask_state, - |b, state| { - b.iter(|| { - let mut run = state.clone(); - let total = run.consume_all(BATCH_SIZE); - hint::black_box(total); - }); - }, - ); + let bench_input = BenchInput { + parquet_data: parquet_data.clone(), + selection: RowSelection::from(selectors.clone()), + }; - let selector_state = SelectorState::new(&selectors); - c.bench_with_input( - BenchmarkId::new("selector_scan", &suffix), - &selector_state, - |b, state| { - b.iter(|| { - let mut run = state.clone(); - let total = run.consume_all(BATCH_SIZE); - hint::black_box(total); - }); - }, - ); + for (label, strategy) in READ_STRATEGIES.iter().copied() { + c.bench_with_input( + BenchmarkId::new(label, &suffix), + &bench_input, + |b, input| { + b.iter(|| { + let total = run_read(&input.parquet_data, &input.selection, strategy); + hint::black_box(total); + }); + }, + ); + } } } } @@ -130,6 +110,48 @@ fn criterion_benchmark(c: &mut Criterion) { criterion_group!(benches, criterion_benchmark); criterion_main!(benches); +struct BenchInput { + parquet_data: Bytes, + selection: RowSelection, +} + +fn run_read( + parquet_data: &Bytes, + selection: &RowSelection, + strategy: RowSelectionStrategy, +) -> usize { + let mut reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) + .unwrap() + .with_batch_size(BATCH_SIZE) + .with_row_selection(selection.clone()) + .with_row_selection_strategy(strategy) + .build() + .unwrap(); + + let mut total_rows = 0usize; + for batch in reader { + let batch = batch.unwrap(); + total_rows += batch.num_rows(); + } + total_rows +} + +fn build_parquet_data(total_rows: usize) -> Bytes { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + false, + )])); + let values = Int32Array::from_iter_values((0..total_rows).map(|v| v as i32)); + let columns: Vec = vec![Arc::new(values) as ArrayRef]; + let batch = RecordBatch::try_new(schema.clone(), columns).unwrap(); + + let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap(); + writer.write(&batch).unwrap(); + let buffer = writer.into_inner().unwrap(); + Bytes::from(buffer) +} + #[derive(Clone)] struct Scenario { name: &'static str, @@ -235,143 +257,6 @@ fn sample_length(mean: f64, distribution: &RunDistribution, rng: &mut StdRng) -> } } -#[derive(Clone)] -struct MaskState { - mask: BooleanBuffer, - position: usize, -} - -impl MaskState { - fn new(selectors: &[RowSelector]) -> Self { - Self { - mask: Self::build_mask(selectors), - position: 0, - } - } - - fn build_mask(selectors: &[RowSelector]) -> BooleanBuffer { - let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); - let mut builder = BooleanBufferBuilder::new(total_rows); - for selector in selectors { - builder.append_n(selector.row_count, !selector.skip); - } - builder.finish() - } - - fn consume_all(&mut self, batch_size: usize) -> usize { - let mut selected = 0; - while let Some(batch) = self.next_mask_chunk(batch_size) { - hint::black_box(batch.initial_skip); - hint::black_box(batch.chunk_rows); - hint::black_box(batch.mask_start); - selected += batch.selected_rows; - } - selected - } - - fn next_mask_chunk(&mut self, batch_size: usize) -> Option { - let mask = &self.mask; - if self.position >= mask.len() { - return None; - } - - let start_position = self.position; - let mut cursor = start_position; - let mut initial_skip = 0usize; - - while cursor < mask.len() && !mask.value(cursor) { - initial_skip += 1; - cursor += 1; - } - - let mask_start = cursor; - let mut chunk_rows = 0usize; - let mut selected_rows = 0usize; - - while cursor < mask.len() && selected_rows < batch_size { - chunk_rows += 1; - if mask.value(cursor) { - selected_rows += 1; - } - cursor += 1; - } - - self.position = cursor; - - Some(MaskChunk { - initial_skip, - chunk_rows, - selected_rows, - mask_start, - }) - } -} - -#[derive(Clone)] -struct MaskChunk { - initial_skip: usize, - chunk_rows: usize, - selected_rows: usize, - mask_start: usize, -} - -#[derive(Clone)] -struct SelectorState { - selectors: VecDeque, -} - -impl SelectorState { - fn new(selectors: &[RowSelector]) -> Self { - Self { - selectors: Self::build_queue(selectors), - } - } - - fn build_queue(selectors: &[RowSelector]) -> VecDeque { - selectors.iter().copied().collect() - } - - fn consume_all(&mut self, batch_size: usize) -> usize { - // Drain the selector queue in `batch_size` chunks, splitting SELECT runs - // when they span across multiple batches. - let mut selected_total = 0usize; - - while !self.selectors.is_empty() { - let mut selected_rows = 0usize; - - while selected_rows < batch_size { - let front = match self.selectors.pop_front() { - Some(selector) => selector, - None => break, - }; - - hint::black_box(front.row_count); - - if front.skip { - continue; - } - - let need = (batch_size - selected_rows).min(front.row_count); - selected_rows += need; - if front.row_count > need { - self.selectors - .push_front(RowSelector::select(front.row_count - need)); - break; - } - } - - hint::black_box(selected_rows); - selected_total += selected_rows; - - if selected_rows == 0 && self.selectors.is_empty() { - break; - } - } - - selected_total - } -} - struct SelectorStats { average_selector_len: f64, select_ratio: f64, diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1052c6c95099..71580eaff565 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -43,7 +43,7 @@ use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; -pub use read_plan::{ReadPlan, ReadPlanBuilder}; +pub use read_plan::{ReadPlan, ReadPlanBuilder, RowSelectionStrategy}; mod filter; pub mod metrics; @@ -115,6 +115,8 @@ pub struct ArrowReaderBuilder { pub(crate) selection: Option, + pub(crate) selection_strategy: RowSelectionStrategy, + pub(crate) limit: Option, pub(crate) offset: Option, @@ -136,6 +138,7 @@ impl Debug for ArrowReaderBuilder { .field("projection", &self.projection) .field("filter", &self.filter) .field("selection", &self.selection) + .field("selection_strategy", &self.selection_strategy) .field("limit", &self.limit) .field("offset", &self.offset) .field("metrics", &self.metrics) @@ -155,6 +158,7 @@ impl ArrowReaderBuilder { projection: ProjectionMask::all(), filter: None, selection: None, + selection_strategy: RowSelectionStrategy::Auto, limit: None, offset: None, metrics: ArrowReaderMetrics::Disabled, @@ -269,6 +273,14 @@ impl ArrowReaderBuilder { } } + /// Override the strategy used to execute the configured [`RowSelection`] + pub fn with_row_selection_strategy(self, strategy: RowSelectionStrategy) -> Self { + Self { + selection_strategy: strategy, + ..self + } + } + /// Provide a [`RowFilter`] to skip decoding rows /// /// Row filters are applied after row group selection and row selection @@ -864,6 +876,7 @@ impl ParquetRecordBatchReaderBuilder { projection, mut filter, selection, + selection_strategy, limit, offset, metrics, @@ -884,7 +897,9 @@ impl ParquetRecordBatchReaderBuilder { row_groups, }; - let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); + let mut plan_builder = ReadPlanBuilder::new(batch_size) + .with_selection(selection) + .with_selection_strategy(selection_strategy); // Update selection based on any filters if let Some(filter) = filter.as_mut() { diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 55ec89e2d02a..41ca077109ed 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -28,12 +28,26 @@ use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; +/// Strategy for materialising [`RowSelection`] during execution. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub enum RowSelectionStrategy { + /// Automatically choose between mask- and selector-backed execution + #[default] + Auto, + /// Always use a boolean mask to materialise the selection + Mask, + /// Always use a queue of [`RowSelector`] values + Selectors, +} + /// A builder for [`ReadPlan`] #[derive(Clone)] pub struct ReadPlanBuilder { batch_size: usize, /// Current to apply, includes all filters selection: Option, + /// Strategy to use when materialising the row selection + selection_strategy: RowSelectionStrategy, } impl ReadPlanBuilder { @@ -42,6 +56,7 @@ impl ReadPlanBuilder { Self { batch_size, selection: None, + selection_strategy: RowSelectionStrategy::Auto, } } @@ -51,6 +66,12 @@ impl ReadPlanBuilder { self } + /// Force a specific strategy when materialising the [`RowSelection`] + pub fn with_selection_strategy(mut self, strategy: RowSelectionStrategy) -> Self { + self.selection_strategy = strategy; + self + } + /// Returns the current selection, if any #[cfg(feature = "async")] pub fn selection(&self) -> Option<&RowSelection> { @@ -132,12 +153,13 @@ impl ReadPlanBuilder { let Self { batch_size, selection, + selection_strategy, } = self; let selection = selection.map(|s| { let trimmed = s.trim(); let selectors: Vec = trimmed.into(); - RowSelectionCursor::new(selectors) + RowSelectionCursor::new(selectors, selection_strategy) }); ReadPlan { @@ -291,14 +313,27 @@ pub struct MaskChunk { impl RowSelectionCursor { /// Create a cursor, choosing an efficient backing representation - fn new(selectors: Vec) -> Self { + fn new(selectors: Vec, strategy: RowSelectionStrategy) -> Self { + if matches!(strategy, RowSelectionStrategy::Selectors) { + return Self { + storage: RowSelectionBacking::Selectors(selectors.into()), + position: 0, + }; + } + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); let selector_count = selectors.len(); const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 8; // Prefer a bitmap mask when the selectors are short on average, as the mask // (re)construction cost is amortized by a simpler execution path during reads. - let use_mask = selector_count == 0 - || total_rows < selector_count.saturating_mul(AVG_SELECTOR_LEN_MASK_THRESHOLD); + let use_mask = match strategy { + RowSelectionStrategy::Mask => true, + RowSelectionStrategy::Auto => { + selector_count == 0 + || total_rows < selector_count.saturating_mul(AVG_SELECTOR_LEN_MASK_THRESHOLD) + } + RowSelectionStrategy::Selectors => unreachable!(), + }; let storage = if use_mask { RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors)) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 27f90e3d7bc6..5afaa3cfc35f 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -44,7 +44,7 @@ use crate::arrow::array_reader::{ }; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, - RowFilter, RowSelection, + RowFilter, RowSelection, RowSelectionStrategy, }; use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; @@ -512,6 +512,7 @@ impl ParquetRecordBatchStreamBuilder { filter: self.filter, metadata: self.metadata.clone(), fields: self.fields, + selection_strategy: self.selection_strategy, limit: self.limit, offset: self.offset, metrics: self.metrics, @@ -561,6 +562,9 @@ struct ReaderFactory { /// Optional filter filter: Option, + /// Strategy used to materialise row selections for this reader + selection_strategy: RowSelectionStrategy, + /// Limit to apply to remaining row groups. limit: Option, @@ -622,7 +626,9 @@ where let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, &row_group_cache); let filter = self.filter.as_mut(); - let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); + let mut plan_builder = ReadPlanBuilder::new(batch_size) + .with_selection(selection) + .with_selection_strategy(self.selection_strategy); // Update selection based on any filters if let Some(filter) = filter { From 1a43d29e842facc572fb1f17b76f8dfa03e7661e Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 19:14:05 +0800 Subject: [PATCH 17/29] fix build test --- parquet/benches/row_selection_state.rs | 2 +- parquet/src/arrow/async_reader/mod.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs index 23b31766c462..95a77ad90e88 100644 --- a/parquet/benches/row_selection_state.rs +++ b/parquet/benches/row_selection_state.rs @@ -38,7 +38,7 @@ const READ_STRATEGIES: &[(&str, RowSelectionStrategy)] = &[ ]; fn criterion_benchmark(c: &mut Criterion) { - let avg_selector_lengths: &[usize] = &[4, 6, 8, 10]; + let avg_selector_lengths: &[usize] = &[4, 6, 8, 10, 12, 14]; let parquet_data = build_parquet_data(TOTAL_ROWS); let scenarios = [ diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 5afaa3cfc35f..c8de789471a1 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -1992,6 +1992,7 @@ mod tests { fields: fields.map(Arc::new), input: async_reader, filter: None, + selection_strategy: RowSelectionStrategy::Auto, limit: None, offset: None, metrics: ArrowReaderMetrics::disabled(), @@ -2457,6 +2458,7 @@ mod tests { fields: None, input: TestReader::new(data), filter: Some(filter), + selection_strategy: RowSelectionStrategy::Auto, limit: None, offset: None, metrics: ArrowReaderMetrics::disabled(), From 1ed84e88479121166d7682b08787f33b5133f9df Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 19:17:10 +0800 Subject: [PATCH 18/29] fix build --- parquet/benches/row_selection_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs index 95a77ad90e88..0a4a8404ebcd 100644 --- a/parquet/benches/row_selection_state.rs +++ b/parquet/benches/row_selection_state.rs @@ -120,7 +120,7 @@ fn run_read( selection: &RowSelection, strategy: RowSelectionStrategy, ) -> usize { - let mut reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) + let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) .unwrap() .with_batch_size(BATCH_SIZE) .with_row_selection(selection.clone()) From a80053976a3824ae950e59f4a7c164527dc1fee0 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 19:27:28 +0800 Subject: [PATCH 19/29] Add more selector length --- parquet/benches/row_selection_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs index 0a4a8404ebcd..f6771d481094 100644 --- a/parquet/benches/row_selection_state.rs +++ b/parquet/benches/row_selection_state.rs @@ -38,7 +38,7 @@ const READ_STRATEGIES: &[(&str, RowSelectionStrategy)] = &[ ]; fn criterion_benchmark(c: &mut Criterion) { - let avg_selector_lengths: &[usize] = &[4, 6, 8, 10, 12, 14]; + let avg_selector_lengths: &[usize] = &[4, 6, 8, 10, 12, 14, 16, 18, 20]; let parquet_data = build_parquet_data(TOTAL_ROWS); let scenarios = [ From 674cb4782cd3109a1ed090fcd209842d18c01ba0 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 19:56:36 +0800 Subject: [PATCH 20/29] update the length in row_selection_state --- parquet/benches/row_selection_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs index f6771d481094..5877fdb4810f 100644 --- a/parquet/benches/row_selection_state.rs +++ b/parquet/benches/row_selection_state.rs @@ -38,7 +38,7 @@ const READ_STRATEGIES: &[(&str, RowSelectionStrategy)] = &[ ]; fn criterion_benchmark(c: &mut Criterion) { - let avg_selector_lengths: &[usize] = &[4, 6, 8, 10, 12, 14, 16, 18, 20]; + let avg_selector_lengths: &[usize] = &[16, 18, 20, 22, 24, 26]; let parquet_data = build_parquet_data(TOTAL_ROWS); let scenarios = [ From f504da3a026a9d7fcb5c72afefac7fe40ee77217 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 20:01:17 +0800 Subject: [PATCH 21/29] update the length in row_selection_state --- parquet/benches/row_selection_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs index 5877fdb4810f..16a93e6272c0 100644 --- a/parquet/benches/row_selection_state.rs +++ b/parquet/benches/row_selection_state.rs @@ -38,7 +38,7 @@ const READ_STRATEGIES: &[(&str, RowSelectionStrategy)] = &[ ]; fn criterion_benchmark(c: &mut Criterion) { - let avg_selector_lengths: &[usize] = &[16, 18, 20, 22, 24, 26]; + let avg_selector_lengths: &[usize] = &[16, 20, 24, 28, 32, 36, 40]; let parquet_data = build_parquet_data(TOTAL_ROWS); let scenarios = [ From 9086da3ff46998fc52f21e6a9a308bba200bff44 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 20:19:05 +0800 Subject: [PATCH 22/29] Update threshold to 30 --- parquet/src/arrow/arrow_reader/read_plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 41ca077109ed..90a8a788c5a6 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -323,7 +323,7 @@ impl RowSelectionCursor { let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); let selector_count = selectors.len(); - const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 8; + const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 30; // Prefer a bitmap mask when the selectors are short on average, as the mask // (re)construction cost is amortized by a simpler execution path during reads. let use_mask = match strategy { From 2d3bcd629d973b7c7608245a8002d12bb1161ec7 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 21:32:27 +0800 Subject: [PATCH 23/29] Update threshold to 16 --- parquet/src/arrow/arrow_reader/read_plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 90a8a788c5a6..6f205e8e23bd 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -323,7 +323,7 @@ impl RowSelectionCursor { let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); let selector_count = selectors.len(); - const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 30; + const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 16; // Prefer a bitmap mask when the selectors are short on average, as the mask // (re)construction cost is amortized by a simpler execution path during reads. let use_mask = match strategy { From 7cc9f2447c25874ccf9d45a85c1931dcdb0b2601 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Thu, 30 Oct 2025 21:57:19 +0800 Subject: [PATCH 24/29] Fix the build failure --- parquet/src/arrow/push_decoder/mod.rs | 1 + parquet/src/arrow/push_decoder/reader_builder/mod.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 4b932fb08034..3da7c4bd2fbc 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -168,6 +168,7 @@ impl ParquetPushDecoderBuilder { projection, filter, selection, + selection_strategy: _, limit, offset, metrics, diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index be9070ae8b49..69b32d181d3c 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -654,6 +654,6 @@ mod tests { #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::(), 184); + assert_eq!(std::mem::size_of::(), 192); } } From 079cb4d9c743eafe49590eeb1821002867b7a617 Mon Sep 17 00:00:00 2001 From: Huang Qiwei Date: Fri, 31 Oct 2025 16:15:07 +0800 Subject: [PATCH 25/29] Update parquet/src/arrow/arrow_reader/mod.rs Co-authored-by: Andrew Lamb --- parquet/src/arrow/arrow_reader/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index e54ed1082609..6c1966700579 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1054,9 +1054,8 @@ impl ParquetRecordBatchReader { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. loop { - let mask_chunk = match selection.next_mask_chunk(batch_size) { - Some(batch) => batch, - None => return Ok(None), + let Some(mask_chunk) = selection.next_mask_chunk(batch_size) else { + return Ok(None); }; if mask_chunk.initial_skip > 0 { From a57406b9f0004af7296b051997bdd845c8085ca3 Mon Sep 17 00:00:00 2001 From: Huang Qiwei Date: Fri, 31 Oct 2025 16:15:57 +0800 Subject: [PATCH 26/29] Update parquet/src/arrow/arrow_reader/read_plan.rs Co-authored-by: Andrew Lamb --- parquet/src/arrow/arrow_reader/read_plan.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 521cbbf246af..ee84e741d157 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -32,6 +32,7 @@ use std::collections::VecDeque; #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] pub enum RowSelectionStrategy { /// Automatically choose between mask- and selector-backed execution + /// based on heuristics #[default] Auto, /// Always use a boolean mask to materialise the selection From c442f3b40d445204adf98acf685f8b77747e7c1b Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Fri, 31 Oct 2025 16:27:28 +0800 Subject: [PATCH 27/29] improve next_inner, rowselector --- parquet/src/arrow/arrow_reader/mod.rs | 24 +++++++++------------ parquet/src/arrow/arrow_reader/read_plan.rs | 4 +++- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index e54ed1082609..191708d2e5ae 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1049,14 +1049,13 @@ impl ParquetRecordBatchReader { let mut read_records = 0; let batch_size = self.batch_size(); match self.read_plan.selection_mut() { - Some(selection) => { - if selection.is_mask_backed() { + Some(selection_cursor) => { + if selection_cursor.is_mask_backed() { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. - loop { - let mask_chunk = match selection.next_mask_chunk(batch_size) { - Some(batch) => batch, - None => return Ok(None), + while !selection_cursor.is_empty() { + let Some(mask_chunk) = selection_cursor.next_mask_chunk(batch_size) else { + return Ok(None); }; if mask_chunk.initial_skip > 0 { @@ -1072,13 +1071,13 @@ impl ParquetRecordBatchReader { } if mask_chunk.chunk_rows == 0 { - if selection.is_empty() && mask_chunk.selected_rows == 0 { + if selection_cursor.is_empty() && mask_chunk.selected_rows == 0 { return Ok(None); } continue; } - let mask = selection + let mask = selection_cursor .mask_values_for(&mask_chunk) .ok_or_else(|| general_err!("row selection mask out of bounds"))?; @@ -1118,17 +1117,14 @@ impl ParquetRecordBatchReader { } if filtered_batch.num_rows() == 0 { - if selection.is_empty() { - return Ok(None); - } continue; } return Ok(Some(filtered_batch)); } } - while read_records < batch_size && !selection.is_empty() { - let front = selection.next_selector().unwrap(); + while read_records < batch_size && !selection_cursor.is_empty() { + let front = selection_cursor.next_selector().unwrap(); if front.skip { let skipped = self.array_reader.skip_records(front.row_count)?; @@ -1154,7 +1150,7 @@ impl ParquetRecordBatchReader { Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - selection.return_selector(RowSelector::select(remaining)); + selection_cursor.return_selector(RowSelector::select(remaining)); need_read } _ => front.row_count, diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 521cbbf246af..2a56d2fc7ae5 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -367,7 +367,9 @@ impl RowSelectionCursor { self.position += selector.row_count; Some(selector) } - RowSelectionBacking::Mask(_) => None, + RowSelectionBacking::Mask(_) => { + unreachable!("next_selector called for mask-based RowSelectionCursor") + } } } From ad51d8750d4d6186d3a056890fa0fa267156a170 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Fri, 31 Oct 2025 16:37:54 +0800 Subject: [PATCH 28/29] use while loop --- parquet/src/arrow/arrow_reader/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 2b748c6a7daf..191708d2e5ae 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1053,7 +1053,7 @@ impl ParquetRecordBatchReader { if selection_cursor.is_mask_backed() { // Stream the record batch reader using contiguous segments of the selection // mask, avoiding the need to materialize intermediate `RowSelector` ranges. - loop { + while !selection_cursor.is_empty() { let Some(mask_chunk) = selection_cursor.next_mask_chunk(batch_size) else { return Ok(None); }; From a0880854a50152303f52bf492e05f49d03601079 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 1 Nov 2025 07:45:38 -0400 Subject: [PATCH 29/29] Remove synthetic page --- .../arrow/record_reader/definition_levels.rs | 36 --- parquet/src/column/reader.rs | 259 +----------------- parquet/src/column/reader/decoder.rs | 28 -- 3 files changed, 2 insertions(+), 321 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index ba273898ba2b..34b728d6fa1e 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -185,42 +185,6 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { MaybePacked::Packed(decoder) => decoder.skip(num_levels), } } - - fn append_null_def_levels( - &mut self, - writer: &mut Self::Buffer, - num_levels: usize, - null_def_level: i16, - max_level: i16, - ) -> Result { - // Mirror the layout the buffer was initialized with so downstream consumers - // do not need to special-case synthetic null batches. - match &mut writer.inner { - BufferInner::Full { - levels, - nulls, - max_level: buffer_max_level, - } => { - assert_eq!(*buffer_max_level, max_level); - levels.resize(levels.len() + num_levels, null_def_level); - nulls.append_n(num_levels, null_def_level == max_level); - Ok(if null_def_level == max_level { - num_levels - } else { - 0 - }) - } - BufferInner::Mask { nulls } => { - assert_eq!(max_level, 1); - nulls.append_n(num_levels, null_def_level == max_level); - Ok(if null_def_level == max_level { - num_levels - } else { - 0 - }) - } - } - } } /// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit width of 1 diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 4bad50a6a48d..ebde79e6a7f2 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -19,7 +19,7 @@ use bytes::Bytes; -use super::page::{Page, PageMetadata, PageReader}; +use super::page::{Page, PageReader}; use crate::basic::*; use crate::column::reader::decoder::{ ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, @@ -126,10 +126,6 @@ pub struct GenericColumnReader { /// True if the end of the current data page denotes the end of a record has_record_delimiter: bool, - /// True if the currently buffered page is a synthetic all-null page created - /// to handle sparse column chunks with missing data pages. - synthetic_page: bool, - /// The decoder for the definition levels if any def_level_decoder: Option, @@ -186,7 +182,6 @@ where num_decoded_values: 0, values_decoder, has_record_delimiter: false, - synthetic_page: false, } } @@ -219,49 +214,6 @@ where let remaining_records = max_records - total_records_read; let remaining_levels = self.num_buffered_values - self.num_decoded_values; - if self.synthetic_page { - // A previous sparse column chunk did not contain a physical data page; emit the - // implicit null records described by the page metadata before reading further. - debug_assert!(self.rep_level_decoder.is_none()); - let levels_to_emit = remaining_records.min(remaining_levels); - - if levels_to_emit == 0 { - self.synthetic_page = false; - continue; - } - - let mut values_from_levels = 0; - if self.descr.max_def_level() > 0 { - let out = def_levels - .as_mut() - .ok_or_else(|| general_err!("must specify definition levels"))?; - - let null_def_level = self.descr.max_def_level().saturating_sub(1); - let decoder = self - .def_level_decoder - .as_mut() - .expect("nullable column requires definition level decoder"); - - values_from_levels = decoder.append_null_def_levels( - out, - levels_to_emit, - null_def_level, - self.descr.max_def_level(), - )?; - } - - self.num_decoded_values += levels_to_emit; - total_records_read += levels_to_emit; - total_levels_read += levels_to_emit; - total_values_read += values_from_levels; - - if self.num_decoded_values == self.num_buffered_values { - self.synthetic_page = false; - } - - continue; - } - let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() { Some(reader) => { let out = rep_levels @@ -375,20 +327,6 @@ where // The number of levels in the current data page let remaining_levels = self.num_buffered_values - self.num_decoded_values; - if self.synthetic_page { - // When synthesising null rows there is no physical data to skip; advance the - // virtual counters until the synthetic page is fully consumed. - debug_assert!(self.rep_level_decoder.is_none()); - let to_skip = remaining_records.min(remaining_levels); - self.num_decoded_values += to_skip; - remaining_records -= to_skip; - - if self.num_buffered_values == self.num_decoded_values { - self.synthetic_page = false; - } - continue; - } - let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() { Some(decoder) => { let (mut records_read, levels_read) = @@ -465,27 +403,7 @@ where /// Returns false if there's no page left. fn read_new_page(&mut self) -> Result { loop { - let page_result = match self.page_reader.get_next_page() { - Ok(page) => page, - Err(err) => { - return match err { - ParquetError::General(message) - if message - .starts_with("Invalid offset in sparse column chunk data:") => - { - let metadata = self.page_reader.peek_next_page()?; - // Some writers omit data pages for sparse column chunks and encode the gap - // as a reader-visible error. Use the metadata peek to synthesise a page of - // null definition levels so downstream consumers see consistent row counts. - self.try_create_synthetic_page(metadata)?; - Ok(true) - } - _ => Err(err), - }; - } - }; - - match page_result { + match self.page_reader.get_next_page()? { // No more page to read None => return Ok(false), Some(current_page) => { @@ -497,7 +415,6 @@ where encoding, is_sorted, } => { - self.synthetic_page = false; self.values_decoder .set_dict(buf, num_values, encoding, is_sorted)?; continue; @@ -511,7 +428,6 @@ where rep_level_encoding, statistics: _, } => { - self.synthetic_page = false; self.num_buffered_values = num_values as _; self.num_decoded_values = 0; @@ -581,7 +497,6 @@ where )); } - self.synthetic_page = false; self.num_buffered_values = num_values as _; self.num_decoded_values = 0; @@ -626,44 +541,6 @@ where } } - fn try_create_synthetic_page(&mut self, metadata: Option) -> Result<()> { - if self.descr.max_rep_level() != 0 { - return Err(general_err!( - "cannot synthesise sparse page for column with repetition levels ({message})" - )); - } - - if self.descr.max_def_level() == 0 { - return Err(general_err!( - "cannot synthesise sparse page for required column ({message})" - )); - } - - let Some(meta) = metadata else { - return Err(general_err!( - "missing page metadata for sparse column chunk ({message})" - )); - }; - - if meta.is_dict { - return Err(general_err!( - "unexpected dictionary page error while synthesising sparse page ({message})" - )); - } - - let num_levels = meta.num_levels.or(meta.num_rows).ok_or_else(|| { - general_err!("page metadata missing level counts for sparse column chunk ({message})") - })?; - - self.page_reader.skip_next_page()?; - - self.num_buffered_values = num_levels; - self.num_decoded_values = 0; - self.synthetic_page = true; - self.has_record_delimiter = true; - Ok(()) - } - /// Check whether there is more data to read from this column, /// If the current page is fully decoded, this will load the next page /// (if it exists) into the buffer @@ -721,7 +598,6 @@ mod tests { use std::{collections::VecDeque, sync::Arc}; use crate::basic::Type as PhysicalType; - use crate::column::page::{Page, PageMetadata}; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType}; use crate::util::test_common::page_util::InMemoryPageReader; use crate::util::test_common::rand_gen::make_pages; @@ -1186,137 +1062,6 @@ mod tests { ); } - #[test] - fn test_synthetic_sparse_page_fills_nulls() { - let primitive_type = SchemaType::primitive_type_builder("a", PhysicalType::INT32) - .with_repetition(Repetition::OPTIONAL) - .build() - .expect("build() should be OK"); - - let desc = Arc::new(ColumnDescriptor::new( - Arc::new(primitive_type), - 1, - 0, - ColumnPath::new(Vec::new()), - )); - - let levels = 8; - let page_reader: Box = Box::new(MissingPageReader::new(levels)); - - let mut reader = ColumnReaderImpl::::new(desc, page_reader); - - let mut values = Vec::new(); - let mut def_levels = Vec::new(); - - let (records, non_null_values, levels_read) = reader - .read_records(levels, Some(&mut def_levels), None, &mut values) - .expect("reading synthetic page succeeds"); - - assert_eq!(records, levels); - assert_eq!(levels_read, levels); - assert_eq!(non_null_values, 0); - assert!(values.is_empty()); - assert_eq!(def_levels, vec![0; levels]); - - // Subsequent read should indicate no additional records - def_levels.clear(); - let (records, non_null_values, levels_read) = reader - .read_records(levels, Some(&mut def_levels), None, &mut values) - .expect("no further pages"); - assert_eq!(records, 0); - assert_eq!(levels_read, 0); - assert_eq!(non_null_values, 0); - } - - #[test] - fn test_synthetic_sparse_page_preserves_parent_levels() { - let primitive_type = SchemaType::primitive_type_builder("leaf", PhysicalType::INT32) - .with_repetition(Repetition::OPTIONAL) - .build() - .expect("build() should be OK"); - - let desc = Arc::new(ColumnDescriptor::new( - Arc::new(primitive_type), - 2, - 0, - ColumnPath::new(vec!["struct".to_string(), "leaf".to_string()]), - )); - - let levels = 6; - let page_reader: Box = Box::new(MissingPageReader::new(levels)); - - let mut reader = ColumnReaderImpl::::new(desc, page_reader); - - let mut values = Vec::new(); - let mut def_levels = Vec::new(); - - let (records, non_null_values, levels_read) = reader - .read_records(levels, Some(&mut def_levels), None, &mut values) - .expect("reading synthetic page succeeds"); - - assert_eq!(records, levels); - assert_eq!(levels_read, levels); - assert_eq!(non_null_values, 0); - assert!(values.is_empty()); - assert_eq!(def_levels, vec![1; levels]); - } - - struct MissingPageReader { - metadata: Option, - skipped: bool, - } - - impl MissingPageReader { - fn new(levels: usize) -> Self { - Self { - metadata: Some(PageMetadata { - num_rows: Some(levels), - num_levels: Some(levels), - is_dict: false, - }), - skipped: false, - } - } - } - - impl Iterator for MissingPageReader { - type Item = Result; - - fn next(&mut self) -> Option { - match self.get_next_page() { - Ok(Some(page)) => Some(Ok(page)), - Ok(None) => None, - Err(err) => Some(Err(err)), - } - } - } - - impl PageReader for MissingPageReader { - fn get_next_page(&mut self) -> Result> { - if self.skipped { - Ok(None) - } else { - Err(general_err!( - "Invalid offset in sparse column chunk data: 0" - )) - } - } - - fn peek_next_page(&mut self) -> Result> { - Ok(self.metadata.clone()) - } - - fn skip_next_page(&mut self) -> Result<()> { - self.metadata = None; - self.skipped = true; - Ok(()) - } - - fn at_record_boundary(&mut self) -> Result { - Ok(true) - } - } - // ---------------------------------------------------------------------- // Helper methods to make pages and test // diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 2ef8a57a382b..e49906207577 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -83,18 +83,6 @@ pub trait DefinitionLevelDecoder: ColumnLevelDecoder { /// /// Returns the number of values skipped, and the number of levels skipped. fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>; - - /// Append `num_levels` copies of `null_level` to `out`, returning how - /// many of those levels correspond to non-null values given `max_level`. - /// - /// This is used when synthesising all-null pages for sparse column chunks. - fn append_null_def_levels( - &mut self, - out: &mut Self::Buffer, - num_levels: usize, - null_def_level: i16, - max_level: i16, - ) -> Result; } /// Decodes value data @@ -365,22 +353,6 @@ impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl { Ok((value_skip, level_skip)) } - - fn append_null_def_levels( - &mut self, - out: &mut Self::Buffer, - num_levels: usize, - null_def_level: i16, - max_level: i16, - ) -> Result { - let start = out.len(); - out.resize(start + num_levels, null_def_level); - Ok(if null_def_level == max_level { - num_levels - } else { - 0 - }) - } } pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;