Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::file::metadata::{
PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
ParquetStatisticsPolicy, RowGroupMetaData,
};
use crate::file::page_index::offset_index::PageLocation;
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

Expand Down Expand Up @@ -1184,7 +1185,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
.build_limited()
.build();

Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
Ok(ParquetRecordBatchReader::new(array_reader, read_plan, None))
}
}

Expand Down Expand Up @@ -1286,6 +1287,7 @@ pub struct ParquetRecordBatchReader {
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
read_plan: ReadPlan,
page_offsets: Option<Vec<PageLocation>>,
}

impl Debug for ParquetRecordBatchReader {
Expand Down Expand Up @@ -1324,8 +1326,11 @@ impl ParquetRecordBatchReader {
RowSelectionCursor::Mask(mask_cursor) => {
// Stream the record batch reader using contiguous segments of the selection
// mask, avoiding the need to materialize intermediate `RowSelector` ranges.
let page_locations = self.page_offsets.as_deref();

while !mask_cursor.is_empty() {
let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size, page_locations)
else {
return Ok(None);
};

Expand Down Expand Up @@ -1492,13 +1497,18 @@ impl ParquetRecordBatchReader {
array_reader,
schema: Arc::new(Schema::new(levels.fields.clone())),
read_plan,
page_offsets: None,
})
}

/// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
/// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
/// all rows will be returned
pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
pub(crate) fn new(
array_reader: Box<dyn ArrayReader>,
read_plan: ReadPlan,
page_offsets: Option<Vec<PageLocation>>,
) -> Self {
let schema = match array_reader.get_data_type() {
ArrowType::Struct(fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not struct!"),
Expand All @@ -1508,6 +1518,7 @@ impl ParquetRecordBatchReader {
array_reader,
schema: Arc::new(schema),
read_plan,
page_offsets,
}
}

Expand Down Expand Up @@ -5611,6 +5622,14 @@ pub(crate) mod tests {
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
let result = concat_batches(&schema, &batches).unwrap();
assert_eq!(result.num_rows(), 2);
assert_eq!(
result.column(0).as_ref(),
&Int64Array::from(vec![first_value, last_value])
);
assert_eq!(
result.column(1).as_ref(),
&Int64Array::from(vec![first_value, last_value])
);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/read_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl ReadPlanBuilder {
array_reader: Box<dyn ArrayReader>,
predicate: &mut dyn ArrowPredicate,
) -> Result<Self> {
let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build(), None);
let mut filters = vec![];
for maybe_batch in reader {
let maybe_batch = maybe_batch?;
Expand Down
76 changes: 68 additions & 8 deletions parquet/src/arrow/arrow_reader/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ impl RowSelection {
})
}

/// Returns true if selectors should be forced, preventing mask materialisation
pub(crate) fn should_force_selectors(
/// Returns true if bitmasks should be page aware
pub(crate) fn requires_page_aware_mask(
&self,
projection: &ProjectionMask,
offset_index: Option<&[OffsetIndexMetaData]>,
Expand Down Expand Up @@ -778,8 +778,13 @@ impl MaskCursor {
self.position >= self.mask.len()
}

/// Advance through the mask representation, producing the next chunk summary
pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option<MaskChunk> {
/// Advance through the mask representation, producing the next chunk summary.
/// Optionally clips chunk boundaries to page boundaries.
pub fn next_mask_chunk(
&mut self,
batch_size: usize,
page_locations: Option<&[PageLocation]>,
) -> Option<MaskChunk> {
let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = {
let mask = &self.mask;

Expand All @@ -791,6 +796,7 @@ impl MaskCursor {
let mut cursor = start_position;
let mut initial_skip = 0;

// Skip unselected rows
while cursor < mask.len() && !mask.value(cursor) {
initial_skip += 1;
cursor += 1;
Expand All @@ -800,10 +806,21 @@ impl MaskCursor {
let mut chunk_rows = 0;
let mut selected_rows = 0;

// Advance until enough rows have been selected to satisfy the batch size,
// or until the mask is exhausted. This mirrors the behaviour of the legacy
// `RowSelector` queue-based iteration.
while cursor < mask.len() && selected_rows < batch_size {
let max_chunk_rows = page_locations
.and_then(|pages| {
let next_idx =
pages.partition_point(|loc| loc.first_row_index as usize <= mask_start);
pages.get(next_idx).and_then(|loc| {
let page_start = loc.first_row_index as usize;
(page_start > mask_start).then_some(page_start - mask_start)
})
})
.unwrap_or(usize::MAX);

// Advance until enough rows have been selected to satisfy batch_size,
// or until the mask is exhausted or a page boundary is reached.
while cursor < mask.len() && selected_rows < batch_size && chunk_rows < max_chunk_rows {
// Increment counters
chunk_rows += 1;
if mask.value(cursor) {
selected_rows += 1;
Expand Down Expand Up @@ -1090,6 +1107,49 @@ mod tests {
);
}

#[test]
fn test_mask_cursor_page_aware_chunking() {
    // Selection mask over 12 rows: rows 0-1 skipped, rows 2-11 selected.
    let mask = boolean_mask_from_selectors(&[RowSelector::skip(2), RowSelector::select(10)]);
    let mut cursor = MaskCursor { mask, position: 0 };

    // Four data pages, each 4 rows wide (first_row_index = 0, 4, 8, 12).
    let pages: Vec<PageLocation> = (0..4i64)
        .map(|i| PageLocation {
            offset: i,
            compressed_page_size: 1,
            first_row_index: i * 4,
        })
        .collect();

    // First chunk is clipped at the end of page 1: it skips rows 0-1 and
    // selects rows 2-3.
    let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap();
    assert_eq!(chunk.initial_skip, 2);
    assert_eq!(chunk.mask_start, 2);
    assert_eq!(chunk.chunk_rows, 2);
    assert_eq!(chunk.selected_rows, 2);

    // Second chunk covers exactly page 2 (rows 4-7), all selected.
    let chunk = cursor.next_mask_chunk(100, Some(&pages)).unwrap();
    assert_eq!(chunk.initial_skip, 0);
    assert_eq!(chunk.mask_start, 4);
    assert_eq!(chunk.chunk_rows, 4);
    assert_eq!(chunk.selected_rows, 4);
}

#[test]
fn test_and() {
let mut a = RowSelection::from(vec![
Expand Down
8 changes: 8 additions & 0 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,14 @@ mod tests {
let batches: Vec<_> = stream.try_collect().await.unwrap();
let result = concat_batches(&schema, &batches).unwrap();
assert_eq!(result.num_rows(), 2);
assert_eq!(
result.column(0).as_ref(),
&Int64Array::from(vec![first_value, last_value])
);
assert_eq!(
result.column(1).as_ref(),
&Int64Array::from(vec![first_value, last_value])
);
}

#[tokio::test]
Expand Down
79 changes: 20 additions & 59 deletions parquet/src/arrow/push_decoder/reader_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use crate::DecodeResult;
use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
use crate::arrow::arrow_reader::{
ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
selection::RowSelectionStrategy,
};
use crate::arrow::in_memory_row_group::ColumnChunkData;
use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
Expand Down Expand Up @@ -536,12 +536,6 @@ impl RowGroupReaderBuilder {

plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);

plan_builder = override_selector_strategy_if_needed(
plan_builder,
&self.projection,
self.row_group_offset_index(row_group_idx),
);

let row_group_info = RowGroupInfo {
row_group_idx,
row_count,
Expand Down Expand Up @@ -588,6 +582,23 @@ impl RowGroupReaderBuilder {
&mut self.buffers,
)?;

// Before the plan is built below, check whether it uses a bitmask
// selection; if so, capture the page locations in a variable.
let page_offsets = if plan_builder.resolve_selection_strategy()
== RowSelectionStrategy::Mask
&& plan_builder.selection().is_some_and(|selection| {
selection.requires_page_aware_mask(
&self.projection,
self.row_group_offset_index(row_group_idx),
)
}) {
self.row_group_offset_index(row_group_idx)
.and_then(|columns| columns.first())
.map(|column| column.page_locations())
} else {
None
};

let plan = plan_builder.build();

// if we have any cached results, connect them up
Expand All @@ -603,7 +614,8 @@ impl RowGroupReaderBuilder {
.build_array_reader(self.fields.as_deref(), &self.projection)
}?;

let reader = ParquetRecordBatchReader::new(array_reader, plan);
let reader =
ParquetRecordBatchReader::new(array_reader, plan, page_offsets.cloned());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cloned may cause extra expense here, can we use Arc<[PageLocation]> to avoid that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a big API change to make PageLocation or OffsetIndexMetaData an Arc inside ParquetMetaData.

If we'd want to make that change, I can open an issue and work up a PR.

NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader))
}
RowGroupDecoderState::Finished => {
Expand Down Expand Up @@ -654,57 +666,6 @@ impl RowGroupReaderBuilder {
}
}

/// Override the selection strategy if needed.
///
/// Some pages can be skipped during row-group construction if they are not read
/// by the selections. This means that the data pages for those rows are never
/// loaded and definition/repetition levels are never read. When using
/// `RowSelections` selection works because `skip_records()` handles this
/// case and skips the page accordingly.
///
/// However, with the current mask design, all values must be read and decoded
/// and then a mask filter is applied. Thus if any pages are skipped during
/// row-group construction, the data pages are missing and cannot be decoded.
///
/// A simple example:
/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1)
/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
///
/// Using the row selection to skip(4), page2 won't be read at all, so in this
/// case we can't decode all the rows and apply a mask. To correctly apply the
/// bit mask, we need all 6 values be read, but page2 is not in memory.
fn override_selector_strategy_if_needed(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice -- the idea is to avoid this function 👍

plan_builder: ReadPlanBuilder,
projection_mask: &ProjectionMask,
offset_index: Option<&[OffsetIndexMetaData]>,
) -> ReadPlanBuilder {
// override only applies to Auto policy, If the policy is already Mask or Selectors, respect that
let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
return plan_builder;
};

let preferred_strategy = plan_builder.resolve_selection_strategy();

let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
&& plan_builder.selection().is_some_and(|selection| {
selection.should_force_selectors(projection_mask, offset_index)
});

let resolved_strategy = if force_selectors {
RowSelectionStrategy::Selectors
} else {
preferred_strategy
};

// override the plan builder strategy with the resolved one
let new_policy = match resolved_strategy {
RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
};

plan_builder.with_row_selection_policy(new_policy)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading