Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ParquetAccessPlan, unify RowGroup selection and PagePruning selection #10738

Merged
merged 8 commits into from
Jun 6, 2024

Conversation

alamb
Copy link
Contributor

@alamb alamb commented May 31, 2024

Which issue does this PR close?

This is part of #9929, a way to provide row selections from an outside index to the parquet reader

Rationale for this change

My highlevel plan, which you can see in action in #10701 is that the ParquetAccessPlan is the structure that users pass to the parquet reader to select row groups and pages.

The idea is that a user can provide a starting ParquetAccessPlan which can be then further refined by the
parquet reader by reading file metadata, if needed.

What changes are included in this PR?

  1. Split out the representation of which RowGroups to scan from the code to prune them (RowGroupBuilder)
  2. Update the page pruning to update ParquetAccessPlan
  3. Copious documentation

Are these changes tested?

Covered by existing tests

Are there any user-facing changes?

Not yet -- this is all internal. I will make a follow on PR with a proposed API for passing this structure in

@alamb alamb marked this pull request as draft May 31, 2024 12:03
@github-actions github-actions bot added the core Core DataFusion crate label May 31, 2024
@alamb alamb force-pushed the alamb/unified_rg_and_page branch from f301a73 to 301f27d Compare May 31, 2024 13:19
@alamb alamb changed the title Add ParquetAccessPlan to describe which row groups and which rows should be read Add ParquetAccessPlan, unify RowGroup selection and PagePruning selection May 31, 2024
@alamb alamb force-pushed the alamb/unified_rg_and_page branch 2 times, most recently from 47ed64a to d638b8f Compare May 31, 2024 17:06
@alamb alamb force-pushed the alamb/unified_rg_and_page branch from 5b1b5eb to 3bd9b04 Compare June 1, 2024 10:58
use std::sync::{Arc, OnceLock};

#[test]
fn test_overall_row_selection_only_scans() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added these tests because I found the existing tests don't cover the case where there is a mix of scan some entire row groups and only scan the rows of another (happens when the statistics can't be extracted or there is some error evaluating the page pruning predicate only on some row groups)

@@ -152,8 +154,9 @@ pub use writer::plan_to_parquet;
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
/// to determine what pages must be read.
/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main change / reason for this PR is to encapsulate the decision of what row groups / what row selection to scan into a single struct (in this case ParquetAccessPlan) so that that the information can be passed in from the outside as well

Aka ParquetAccessPlan will be the struct passed in the API described in #9929

}
}

let row_group_indexes = access_plan.row_group_indexes();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while the rationale is to pass in scan restrictions from outside, I actually think encapsulating the logic for scan / skip into ParquetAccessPlan has made the existing logic easier to follow as well

}

let page_index_predicates = &self.predicates;
let groups = file_metadata.row_groups();

if groups.is_empty() {
return Ok(None);
return access_plan;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reworked this logic in two ways:

  1. The loop is now for each row group (rather than for each predicate)
  2. The row selection is incrementally built up for each row group rather than being created all at the end.

I think this logic is clearer and also is no less performant (the same work is still done)

///
/// The final selection is the intersection of these `RowSelector`s:
/// * `final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]`
fn combine_multi_col_selection(row_selections: Vec<Vec<RowSelector>>) -> RowSelection {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this intersection is now done in ParquetAccessPlan::scan_selection

pub struct RowGroupSet {
/// `row_groups[i]` is true if the i-th row group should be scanned
row_groups: Vec<bool>,
#[derive(Debug, Clone, PartialEq)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this extracts the plan of what to scan from the logic that prunes it based on row group statistic metadat

@alamb
Copy link
Contributor Author

alamb commented Jun 3, 2024

@Ted-Jiang I wonder if you have time to review this PR

cc @hengfeiyang, @thinkharderdev @crepererum @waynexia and @xinlifoobar for any comments you may have

Copy link
Contributor

@hengfeiyang hengfeiyang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, the ParquetAccessPlan API is very clear.

@alamb
Copy link
Contributor Author

alamb commented Jun 5, 2024

Also FYI @advancedxy

Copy link
Contributor

@crepererum crepererum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems sensible to have a well-designed IR (intermediate representation) for that stuff so that both the internal code and the future API users benefit from that. 👍

Copy link
Contributor

@advancedxy advancedxy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for ping me. I left some comments in the code. The code looks much clearer over all.

if !self
.row_groups
.iter()
.any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm. I don't think this is correct. Unless all the RowGroupAccess is Skip we can simply return none here. Otherwise, we should still build RowSelection for the Scan Access.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good question.

I think the reasoning here is that actually correct because any RowGroupAccess that is Skip is filtered by skipping the row groups itself (and thus there are no rows to select).

I will improve the documentation and add some tests that show how this works

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about all the RowGroupAccess is Scan then(which means all the row group should be selected)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same reasoning applies there (that the entire rowgroup is scanned)

The intuition is that entire row groups are filtered out using Skip and Scan -- an overall RowSelection is only useful if there is any parts within a row group which can be filtered out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to clarify with comments and some test updates in 8d44ed2 -- let me know what you think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I think the example and the following code in L214 - L218 is quite misleading.

BTW, I did follow the code in /path/to/arrow-rs/src/index.crates.io-6f17d22bba15001f/parquet-51.0.0/src/arrow/async_reader/mod.rs:601's poll_next method, namely the following code:

                    let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

                    let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

                    let fut = reader
                        .read_row_group(
                            row_group_idx,
                            selection,
                            self.projection.clone(),
                            self.batch_size,
                        )
                        .boxed();

I'm not sure I understand the code correctly, but it looks to me that the row_selection only consider one row group per parquet file? Otherwise, I believe the row_count(or the split_off point) should be the row count has been accumulated with all the previous row groups?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am somewhat confused too -- I will look into this more carefully

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the code correctly, but it looks to me that the row_selection only consider one row group per parquet file? Otherwise, I believe the row_count(or the split_off point) should be the row count has been accumulated with all the previous row groups?

Yes, I think that is correct

I have updated the example in a76f95a to more accurately reflect what is going on, which I think makes the behavior clearer

I will also make a PR to the arrow repo trying to clarify with an example as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apache/arrow-rs#5850 to further improve the parquet crate docs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW this so subtle it turns out my unit tests don't get the edge cases correct. I added error checking and more tests as part of #10813

Copy link
Contributor

@advancedxy advancedxy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm now, except one minor comment.

///
/// If there are no [`RowGroupAccess::Selection`]s, the overall row
/// selection is `None` because each row group is either entirely skipped or
/// scanned, as specified by [`Self::row_group_indexes`].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: How about add some comment about the mixed RowGroupAccess.

    /// If there are no [`RowGroupAccess::Selection`]s, the overall row
    /// selection is `None` because each row group is either entirely skipped or
    /// scanned, as specified by [`Self::row_group_indexes`]. If the RowGroupAccess::Scan
    /// is also included in the list, the entire row group selection should be populated as well.

I think that matches the comment and the following code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent idea - I added more comments and examples in a76f95a. Thank you

@Ted-Jiang
Copy link
Member

Thanks for ping me, i will review this carefully later.

Copy link
Member

@Ted-Jiang Ted-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LTGM 👍 this looks really well-designed and tidy, Now the logic seems more clearer ❤️

) -> Option<RowSelection> {
match current_selection {
None => Some(row_selection),
Some(current_selection) => Some(current_selection.intersection(&row_selection)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb maybe we can check after the intersection the result row_selection is select 0 row ? 🤔 then we can cast it to skip. IMO select-0 seems need do more effort in next round check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Thank you @Ted-Jiang I made this change in e2dec61

@alamb
Copy link
Contributor Author

alamb commented Jun 6, 2024

Thank you everyone for the feedback and reviews. I will now merge this PR and will address any additional feedback as follow on PRs

@adriangb
Copy link
Contributor

adriangb commented Jun 12, 2024

@alamb is there any documentation on what it means for DataFusion to "scan" specific rows within a row group? Does it actually read only those rows? I'd imagine that because of some mix of compression and limitations of byte range fetches to contiguous bytes for object stores you end up streaming entire row groups anyway.

Edit: looks like there's info in https://github.com/apache/parquet-format/blob/master/PageIndex.md, and guess who's committed there 😆. I still don't fully understand the implications but this is a good start and reading homework for me.

@alamb
Copy link
Contributor Author

alamb commented Jun 12, 2024

@alamb is there any documentation on what it means for DataFusion to "scan" specific rows within a row group? Does it actually read only those rows? I'd imagine that because of some mix of compression and limitations of byte range fetches to contiguous bytes for object stores you end up streaming entire row groups anyway.

Specifically, DataFusion uses this API: https://github.com/apache/arrow-rs/blob/0cc14168000e1e41fc5f63929d34d13dda6e5873/parquet/src/arrow/arrow_reader/mod.rs#L137-L194

Which if you have the PageIndex (which is written by default in the parquet rs writer) the reader may be able to skip certain pages

@alamb alamb deleted the alamb/unified_rg_and_page branch June 12, 2024 22:24
@thinkharderdev
Copy link
Contributor

@alamb is there any documentation on what it means for DataFusion to "scan" specific rows within a row group? Does it actually read only those rows? I'd imagine that because of some mix of compression and limitations of byte range fetches to contiguous bytes for object stores you end up streaming entire row groups anyway.

Specifically, DataFusion uses this API: https://github.com/apache/arrow-rs/blob/0cc14168000e1e41fc5f63929d34d13dda6e5873/parquet/src/arrow/arrow_reader/mod.rs#L137-L194

Which if you have the PageIndex (which is written by default in the parquet rs writer) the reader may be able to skip certain pages

Yeah so conceptually how it works is that once we have a RowSelection we can

  1. If there is a PageIndex, we can compare the RowSelection to the PageIndex and fetch only the data pages which contain selected rows (and hence prune IO)
  2. While decoding the data pages that were fetched we can skip decoding of rows that were not selected. Depending on the exact datatype this can be more or less useful. For something that is delta encoded, you can't really skip decoding within mini-blocks so it probably doesn't make a huge difference, but with a fixed-size datatype you can skip over an arbitrary number of rows by just jumping directly to the next selected row and potentially save a bunch of CPU cycles.

findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
…ection (apache#10738)

* Add `ParquetAccessPlan` that describes which part of the parquet files to read

* Rename to RowGroupAccessPlanFilter

* Clarify when overall selection is needed

* Update documentation to exlain the relationship between scan/skip/selection

* Break early of the row selection is empty
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants