Skip to content

Commit

Permalink
Rewrite page filter in terms of ParquetAccessPlan
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 31, 2024
1 parent 66a5936 commit f301a73
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use parquet::arrow::arrow_reader::RowSelection;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Specifies a selection of rows and row groups within a ParquetFile to decode.
///
Expand Down Expand Up @@ -71,6 +71,60 @@ impl ParquetAccessPlan {
self.row_groups[idx].should_scan()
}

/// Restrict the row group at `idx` to the rows described by the given
/// [`RowSelection`].
///
/// The outcome depends on how the row group is currently planned:
/// * `Skip`: left as is; the row group remains skipped
/// * `Scan`: becomes a `Selection` holding `selection`
/// * `Selection`: becomes the intersection of the existing selection
///   and `selection`
pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
    let slot = &mut self.row_groups[idx];
    match &*slot {
        // nothing to narrow: the entire row group is already skipped
        RowGroupAccess::Skip => {}
        RowGroupAccess::Scan => *slot = RowGroupAccess::Selection(selection),
        RowGroupAccess::Selection(current) => {
            let narrowed = current.intersection(&selection);
            *slot = RowGroupAccess::Selection(narrowed);
        }
    }
}

/// Return a single [`RowSelection`] built from every row group in this
/// plan, or `None` if no row group is a `RowGroupAccess::Selection`.
///
/// The selectors of each `RowGroupAccess::Selection` are concatenated in
/// row group order. `RowGroupAccess::Skip` row groups contribute no
/// selectors to the result.
///
/// # Panics
///
/// Panics if the plan contains at least one `RowGroupAccess::Selection`
/// together with at least one `RowGroupAccess::Scan`. Selecting a whole
/// row group requires knowing its row count, which is not available to
/// this method, so that combination is not implemented yet.
pub fn overall_row_selection(&self) -> Option<RowSelection> {
    let has_selection = self
        .row_groups
        .iter()
        .any(|rg| matches!(rg, RowGroupAccess::Selection(_)));
    if !has_selection {
        return None;
    }

    let mut selectors: Vec<RowSelector> = Vec::new();
    for rg in &self.row_groups {
        match rg {
            RowGroupAccess::Skip => {}
            RowGroupAccess::Scan => {
                // A full scan would need a "select N rows" entry, where N is
                // the row group's row count (not known here).
                todo!(
                    "overall_row_selection cannot yet represent RowGroupAccess::Scan: \
                     selecting an entire row group requires its row count"
                );
            }
            RowGroupAccess::Selection(selection) => {
                // todo avoid these clones
                let cloned: Vec<RowSelector> = selection.clone().into();
                selectors.extend(cloned);
            }
        }
    }

    Some(selectors.into_iter().collect())
}

/// Return an iterator over the row group indexes that should be scanned
pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
Expand Down
15 changes: 8 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,26 +164,27 @@ impl FileOpener for ParquetOpener {
}
}

let access_plan = row_groups.build();
let mut access_plan = row_groups.build();

// page index pruning: if all data on individual pages can
// be ruled out using page metadata, rows from other columns
// within that range can be skipped as well
if enable_page_index && !access_plan.is_empty() {
if let Some(p) = page_pruning_predicate {
let pruned = p.prune(
access_plan = p.prune(
&file_schema,
builder.parquet_schema(),
&access_plan,
access_plan,
file_metadata.as_ref(),
&file_metrics,
)?;
if let Some(row_selection) = pruned {
builder = builder.with_row_selection(row_selection);
}
);
}
}

if let Some(row_selection) = access_plan.overall_row_selection() {
builder = builder.with_row_selection(row_selection);
}

if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
Expand Down
Loading

0 comments on commit f301a73

Please sign in to comment.