Skip to content

Commit

Permalink
Add ParquetAccessPlan that describes which part of the parquet file…
Browse files Browse the repository at this point in the history
…s to read
  • Loading branch information
alamb committed May 31, 2024
1 parent a0773cd commit 47ed64a
Show file tree
Hide file tree
Showing 5 changed files with 496 additions and 223 deletions.
297 changes: 297 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;

/// Specifies a selection of rows and row groups within a ParquetFile to decode.
///
/// A `ParquetAccessPlan` is used to limits the row groups and data pages a `ParquetExec`
/// will read and decode and this improve performance.
///
/// Note that page level pruning based on ArrowPredicate is applied after all of
/// these selections
///
/// This looks like:
/// (TODO diagram)
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetAccessPlan {
/// How to access the i-th row group
row_groups: Vec<RowGroupAccess>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum RowGroupAccess {
/// The row group should not be read at all
Skip,
/// The row group should be scanned fully
Scan,
/// Only the specified rows within the row group should be scanned
Selection(RowSelection),
}

impl RowGroupAccess {
/// return true if this row group should be scanned
pub fn should_scan(&self) -> bool {
match self {
RowGroupAccess::Skip => false,
RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
}
}
}

impl ParquetAccessPlan {
/// Create a new `ParquetAccessPlan` that scans all row groups
pub fn new_all(row_group_count: usize) -> Self {
Self {
row_groups: vec![RowGroupAccess::Scan; row_group_count],
}
}

/// Create a new `ParquetAccessPlan` from the specified [`RowGroupAccess`]es
pub fn new(row_groups: Vec<RowGroupAccess>) -> Self {
Self { row_groups }
}

/// Set the i-th row group to false (should not be scanned)
pub fn do_not_scan(&mut self, idx: usize) {
self.row_groups[idx] = RowGroupAccess::Skip;
}

/// Return true if the i-th row group should be scanned
pub fn should_scan(&self, idx: usize) -> bool {
self.row_groups[idx].should_scan()
}

/// Set to scan only the [`RowSelection`] in the specified row group.
///
/// Based on the existing row groups plan:
/// * Skip: does nothing
/// * Scan: Updates to scan only the rows in the `RowSelection`
/// * Selection: Updates to scan only the specified in the exising selection and the new selection
pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
self.row_groups[idx] = match &self.row_groups[idx] {
// already skipping the entire row group
RowGroupAccess::Skip => RowGroupAccess::Skip,
RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
RowGroupAccess::Selection(existing_selection) => {
RowGroupAccess::Selection(existing_selection.intersection(&selection))
}
}
}

/// Return the overall RowSelection for all scanned row groups, if
/// there are any RowGroupAccess::Selection;
///
///
/// TODO better doc / explanation
pub fn overall_row_selection(
&self,
row_group_meta_data: &[RowGroupMetaData],
) -> Option<RowSelection> {
if !self
.row_groups
.iter()
.any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
{
return None;
}
assert_eq!(row_group_meta_data.len(), self.row_groups.len());

let total_selection: RowSelection = self
.row_groups
.iter()
.zip(row_group_meta_data.iter())
.flat_map(|(rg, rg_meta)| {
match rg {
RowGroupAccess::Skip => vec![],
RowGroupAccess::Scan => {
// need a row group access to scan the entire row group (need row group counts)
vec![RowSelector::select(rg_meta.num_rows() as usize)]
}
RowGroupAccess::Selection(selection) => {
// todo avoid these clones
let selection: Vec<RowSelector> = selection.clone().into();
selection
}
}
})
.collect();

Some(total_selection)
}

/// Return an iterator over the row group indexes that should be scanned
pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
if b.should_scan() {
Some(idx)
} else {
None
}
})
}

/// Return a vec of all row group indexes to scan
pub fn row_group_indexes(&self) -> Vec<usize> {
self.row_group_index_iter().collect()
}

/// Return the total number of row groups (not the total number to be scanned)
pub fn len(&self) -> usize {
self.row_groups.len()
}

/// Return true if there are no row groups
pub fn is_empty(&self) -> bool {
self.row_groups.is_empty()
}
}

#[cfg(test)]
mod test {
use super::*;
use parquet::basic::LogicalType;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
use std::sync::{Arc, OnceLock};

#[test]
fn test_overall_row_selection_only_scans() {
assert_eq!(
overall_row_selection(vec![
RowGroupAccess::Scan,
RowGroupAccess::Scan,
RowGroupAccess::Scan,
RowGroupAccess::Scan,
]),
None
);
}

#[test]
fn test_overall_row_selection_only_skips() {
assert_eq!(
overall_row_selection(vec![
RowGroupAccess::Skip,
RowGroupAccess::Skip,
RowGroupAccess::Skip,
RowGroupAccess::Skip,
]),
None
);
}
#[test]
fn test_overall_row_selection_mixed_1() {
assert_eq!(
overall_row_selection(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into()
),
RowGroupAccess::Skip,
RowGroupAccess::Skip,
]),
Some(
vec![
// select the entire first row group
RowSelector::select(10),
// selectors from the second row group
RowSelector::select(5),
RowSelector::skip(7)
]
.into()
)
);
}

#[test]
fn test_overall_row_selection_mixed_2() {
assert_eq!(
overall_row_selection(vec![
RowGroupAccess::Skip,
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into()
),
RowGroupAccess::Scan,
]),
Some(
vec![
// select the entire second row group
RowSelector::select(20),
// selectors from the third row group
RowSelector::select(5),
RowSelector::skip(7),
// select the entire fourth row group
RowSelector::select(40),
]
.into()
)
);
}

/// Computes the overall row selection for the given row group access list
fn overall_row_selection(
row_group_access: Vec<RowGroupAccess>,
) -> Option<RowSelection> {
let access_plan = ParquetAccessPlan::new(row_group_access);
access_plan.overall_row_selection(row_group_metadata())
}

static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();

/// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
/// respectively
fn row_group_metadata() -> &'static [RowGroupMetaData] {
ROW_GROUP_METADATA.get_or_init(|| {
let schema_descr = get_test_schema_descr();
let row_counts = [10, 20, 30, 40];

row_counts
.into_iter()
.map(|num_rows| {
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
.set_num_values(num_rows)
.build()
.unwrap();

RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(num_rows)
.set_column_metadata(vec![column])
.build()
.unwrap()
})
.collect()
})
}

/// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
fn get_test_schema_descr() -> SchemaDescPtr {
use parquet::basic::Type as PhysicalType;
use parquet::schema::types::Type as SchemaType;
let field = SchemaType::primitive_type_builder("a", PhysicalType::BYTE_ARRAY)
.with_logical_type(Some(LogicalType::String))
.build()
.unwrap();
let schema = SchemaType::group_type_builder("schema")
.with_fields(vec![Arc::new(field)])
.build()
.unwrap();
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}
}
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use log::debug;
use parquet::basic::{ConvertedType, LogicalType};
use parquet::schema::types::ColumnDescriptor;

mod access_plan;
mod metrics;
mod opener;
mod page_filter;
Expand All @@ -59,6 +60,7 @@ mod writer;
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
pub use access_plan::ParquetAccessPlan;
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
Expand Down Expand Up @@ -152,8 +154,9 @@ pub use writer::plan_to_parquet;
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
/// to determine what pages must be read.
/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by
/// applying predicates. The plan and projections are used to determine what
/// pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
Expand Down
28 changes: 17 additions & 11 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
//! [`ParquetOpener`] for opening Parquet files

use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
use crate::datasource::physical_plan::parquet::{row_filter, should_enable_page_index};
use crate::datasource::physical_plan::parquet::row_groups::RowGroupPlanBuilder;
use crate::datasource::physical_plan::parquet::{
row_filter, should_enable_page_index, ParquetAccessPlan,
};
use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
};
Expand Down Expand Up @@ -137,7 +139,8 @@ impl FileOpener for ParquetOpener {
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let mut row_groups = RowGroupSet::new(rg_metadata.len());
let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
let mut row_groups = RowGroupPlanBuilder::new(access_plan);
// if there is a range restricting what parts of the file to read
if let Some(range) = file_range.as_ref() {
row_groups.prune_by_range(rg_metadata, range);
Expand All @@ -164,32 +167,35 @@ impl FileOpener for ParquetOpener {
}
}

let mut access_plan = row_groups.build();

// page index pruning: if all data on individual pages can
// be ruled using page metadata, rows from other columns
// with that range can be skipped as well
if enable_page_index && !row_groups.is_empty() {
if enable_page_index && !access_plan.is_empty() {
if let Some(p) = page_pruning_predicate {
let pruned = p.prune(
access_plan = p.prune_plan_with_page_index(
access_plan,
&file_schema,
builder.parquet_schema(),
&row_groups,
file_metadata.as_ref(),
&file_metrics,
)?;
if let Some(row_selection) = pruned {
builder = builder.with_row_selection(row_selection);
}
);
}
}

if let Some(row_selection) = access_plan.overall_row_selection(rg_metadata) {
builder = builder.with_row_selection(row_selection);
}

if let Some(limit) = limit {
builder = builder.with_limit(limit)
}

let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
.with_row_groups(row_groups.indexes())
.with_row_groups(access_plan.row_group_indexes())
.build()?;

let adapted = stream
Expand Down
Loading

0 comments on commit 47ed64a

Please sign in to comment.