Support user defined ParquetAccessPlan in ParquetExec, validation to ParquetAccessPlan::select #10813

Merged
merged 6 commits into from
Jun 9, 2024
Changes from 4 commits
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Expand Up @@ -134,6 +134,17 @@ impl PartitionedFile {
self.range = Some(FileRange { start, end });
self
}

/// Update the user defined extensions for this file.
///
/// This can be used to pass reader specific information.
pub fn with_extensions(
mut self,
extensions: Arc<dyn std::any::Any + Send + Sync>,
) -> Self {
self.extensions = Some(extensions);
self
}
}

impl From<ObjectMeta> for PartitionedFile {
Expand Down
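
The `with_extensions` hook stores an opaque `Arc<dyn Any + Send + Sync>`, so a reader has to downcast it back to the concrete type it expects. Below is a minimal, standard-library-only sketch of that round trip. `FileDescriptor`, `MyReaderHint`, `extension`, and `demo` are hypothetical names used for illustration; they are not DataFusion APIs.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical reader-specific information (not a DataFusion type).
#[derive(Debug, Clone, PartialEq)]
struct MyReaderHint {
    row_groups_to_scan: Vec<usize>,
}

/// Hypothetical, simplified stand-in for `PartitionedFile`.
#[derive(Default)]
struct FileDescriptor {
    extensions: Option<Arc<dyn Any + Send + Sync>>,
}

impl FileDescriptor {
    /// Same builder shape as `with_extensions` in the diff above.
    fn with_extensions(mut self, extensions: Arc<dyn Any + Send + Sync>) -> Self {
        self.extensions = Some(extensions);
        self
    }

    /// Recover the concrete type, or `None` if nothing was attached
    /// or the attached value has a different type.
    fn extension<T: Any>(&self) -> Option<&T> {
        self.extensions.as_ref()?.downcast_ref::<T>()
    }
}

/// Attach a hint, then try to read it back as two different types.
fn demo() -> (Option<MyReaderHint>, Option<String>) {
    let file = FileDescriptor::default().with_extensions(Arc::new(MyReaderHint {
        row_groups_to_scan: vec![0, 3],
    }));
    (
        file.extension::<MyReaderHint>().cloned(),
        file.extension::<String>().cloned(),
    )
}
```

Asking for the wrong type yields `None` rather than an error, so a reader that cares about mistyped extensions has to check for that case itself.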
121 changes: 111 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_common::{internal_err, Result};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;

Expand Down Expand Up @@ -182,6 +183,11 @@ impl ParquetAccessPlan {
/// is returned for *all* the rows in the row groups that are not skipped.
/// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
///
/// # Errors
Contributor Author

Since users can now provide a ParquetAccessPlan it is important to do validation on the contents.

While technically we could skip this validation when the selections come from page pruning, I chose to always validate: the check should catch future bugs that would otherwise surface as subtle wrong results.

///
/// Returns an error if the specified row selection does not specify
/// the same number of rows as in `row_group_metadata`.
alamb marked this conversation as resolved.
///
/// # Example: No Selections
///
/// Given an access plan like this
Expand Down Expand Up @@ -228,7 +234,7 @@ impl ParquetAccessPlan {
pub fn into_overall_row_selection(
self,
row_group_meta_data: &[RowGroupMetaData],
) -> Option<RowSelection> {
) -> Result<Option<RowSelection>> {
assert_eq!(row_group_meta_data.len(), self.row_groups.len());
// Intuition: entire row groups are filtered out using
// `row_group_indexes` which come from Skip and Scan. An overall
Expand All @@ -239,7 +245,32 @@ impl ParquetAccessPlan {
.iter()
.any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
{
return None;
return Ok(None);
}

// validate all Selections
for (idx, (rg, rg_meta)) in self
Contributor Author

This check is new: users can now pass in a ParquetAccessPlan, and the semantics are quite subtle.

.row_groups
.iter()
.zip(row_group_meta_data.iter())
.enumerate()
{
let RowGroupAccess::Selection(selection) = rg else {
continue;
};
let rows_in_selection = selection
.iter()
.map(|selection| selection.row_count)
.sum::<usize>();

let row_group_row_count = rg_meta.num_rows();
if rows_in_selection as i64 != row_group_row_count {
return internal_err!(
"Invalid ParquetAccessPlan Selection. Row group {idx} has {row_group_row_count} rows \
but selection only specifies {rows_in_selection} rows. \
Selection: {selection:?}"
);
}
}

let total_selection: RowSelection = self
Expand All @@ -261,7 +292,7 @@ impl ParquetAccessPlan {
})
.collect();

Some(total_selection)
Ok(Some(total_selection))
}

/// Return an iterator over the row group indexes that should be scanned
Expand Down Expand Up @@ -305,6 +336,7 @@ impl ParquetAccessPlan {
#[cfg(test)]
mod test {
use super::*;
use datafusion_common::assert_contains;
use parquet::basic::LogicalType;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
Expand All @@ -320,7 +352,9 @@ mod test {
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

// scan all row groups, no selection
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
Expand All @@ -337,7 +371,9 @@ mod test {
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

// skip all row groups, no selection
assert_eq!(row_group_indexes, vec![] as Vec<usize>);
Expand All @@ -348,14 +384,22 @@ mod test {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
// select / skip all 20 rows in row group 1
Contributor

Nit: to be consistent with L427 in this file, it would be better to phrase this as
"specifies all 20 rows in row group".

vec![
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(8),
]
.into(),
),
RowGroupAccess::Skip,
RowGroupAccess::Skip,
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

assert_eq!(row_group_indexes, vec![0, 1]);
assert_eq!(
Expand All @@ -366,7 +410,8 @@ mod test {
RowSelector::select(10),
// selectors from the second row group
RowSelector::select(5),
RowSelector::skip(7)
RowSelector::skip(7),
Contributor Author

@alamb Jun 6, 2024

It turns out that some of the existing unit tests were actually invalid. However, I think these are test problems, not problems in the code itself. All the actual parquet reader tests passed.

RowSelector::select(8)
]
.into()
)
Expand All @@ -379,13 +424,21 @@ mod test {
RowGroupAccess::Skip,
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
// specify all 30 rows in row group 1
vec![
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(18),
]
.into(),
),
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

assert_eq!(row_group_indexes, vec![1, 2, 3]);
assert_eq!(
Expand All @@ -397,6 +450,7 @@ mod test {
// selectors from the third row group
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(18),
// select the entire fourth row group
RowSelector::select(40),
]
Expand All @@ -405,6 +459,53 @@ mod test {
);
}

#[test]
fn test_invalid_too_few() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 12 rows, but row group 1 has 20
Contributor

Nit: -> specifies 12 rows?

I think "select" here refers to the whole selection, and the following code also includes a skip.

RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
),
RowGroupAccess::Scan,
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let err = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap_err()
.to_string();
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
assert_contains!(err, "Internal error: Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 12 rows");
}

#[test]
fn test_invalid_too_many() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 22 rows, but row group 1 has only 20
Contributor

ditto.

RowGroupAccess::Selection(
vec![
RowSelector::select(10),
RowSelector::skip(2),
RowSelector::select(10),
]
.into(),
),
RowGroupAccess::Scan,
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let err = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap_err()
.to_string();
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
}

static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();

/// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
Expand Down
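
The validation and flattening rules exercised by the tests above can be sketched without the parquet crate. This is an illustration only: `Selector`, `Access`, and `overall_selection` are simplified stand-ins for `RowSelector`, `RowGroupAccess`, and `into_overall_row_selection`, and the error is a plain `String` rather than a DataFusion error.

```rust
/// Simplified stand-in for parquet's `RowSelector`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Selector {
    row_count: usize,
    skip: bool,
}

impl Selector {
    fn select(row_count: usize) -> Self {
        Self { row_count, skip: false }
    }
    fn skip(row_count: usize) -> Self {
        Self { row_count, skip: true }
    }
}

/// Simplified stand-in for `RowGroupAccess`.
#[derive(Debug, Clone, PartialEq)]
enum Access {
    Skip,
    Scan,
    Selection(Vec<Selector>),
}

/// Returns `Ok(None)` when no row group carries a `Selection`; otherwise
/// returns one flattened selection covering every row group that is not skipped.
fn overall_selection(
    plan: &[Access],
    rows_per_group: &[usize],
) -> Result<Option<Vec<Selector>>, String> {
    assert_eq!(plan.len(), rows_per_group.len());
    if !plan.iter().any(|a| matches!(a, Access::Selection(_))) {
        return Ok(None);
    }
    let mut out = Vec::new();
    for (idx, (access, &num_rows)) in plan.iter().zip(rows_per_group).enumerate() {
        match access {
            // skipped row groups contribute nothing to the selection
            Access::Skip => {}
            // a full scan becomes a single select over the whole row group
            Access::Scan => out.push(Selector::select(num_rows)),
            Access::Selection(selectors) => {
                // select + skip counts must account for every row in the group
                let specified: usize = selectors.iter().map(|s| s.row_count).sum();
                if specified != num_rows {
                    return Err(format!(
                        "row group {idx} has {num_rows} rows but selection specifies {specified} rows"
                    ));
                }
                out.extend(selectors.iter().copied());
            }
        }
    }
    Ok(Some(out))
}
```

The key point is that a `Selection` must account for every row in its row group, counting skipped rows as well as selected ones, which is why the corrected tests add a trailing `select` to reach 20 or 30 rows.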
46 changes: 46 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Expand Up @@ -145,6 +145,52 @@ pub use writer::plan_to_parquet;
/// custom reader is used, it supplies the metadata directly and this parameter
/// is ignored. [`ParquetExecBuilder::with_metadata_size_hint`] for more details.
///
/// * User provided [`ParquetAccessPlan`]s to skip row groups and/or pages
/// based on external information. See "Implementing External Indexes" below
///
/// # Implementing External Indexes
///
/// It is possible to restrict the row groups and selections within those row
/// groups that the ParquetExec will consider by providing an initial
/// [`ParquetAccessPlan`] as `extensions` on [`PartitionedFile`]. This can be
/// used to implement external indexes on top of parquet files and select only
/// portions of the files.
///
/// The `ParquetExec` will try and further reduce any provided
/// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and
Comment on lines +159 to +160
Contributor

Nit: the word "further" appears twice in this sentence. How about:

/// The `ParquetExec` will try and reduce any provided
/// `ParquetAccessPlan` further based on the contents ...

/// other settings.
///
/// ## Example of providing a ParquetAccessPlan
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Schema, SchemaRef};
/// # use datafusion::datasource::listing::PartitionedFile;
/// # use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # fn schema() -> SchemaRef {
/// # Arc::new(Schema::empty())
/// # }
/// // create an access plan to scan row group 0, 1 and 3 and skip row groups 2 and 4
/// let mut access_plan = ParquetAccessPlan::new_all(5);
/// access_plan.skip(2);
/// access_plan.skip(4);
/// // provide the plan as extensions on the PartitionedFile
/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
/// .with_extensions(Arc::new(access_plan));
/// // create a ParquetExec to scan this file
/// let file_scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema())
/// .with_file(partitioned_file);
/// // this parquet exec will not even try to read row groups 2 and 4. Additional
/// // pruning based on predicates may also happen
/// let exec = ParquetExec::builder(file_scan_config).build();
/// ```
///
/// For a complete example, see the [`parquet_index_advanced` example].
///
/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs
Contributor Author

This will be added in #10701

Contributor Author

BTW #10701 (example of how to use this API) is ready for review

///
/// # Execution Overview
///
/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
Expand Down
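
To make the "external index" idea concrete, here is a rough, standard-library-only sketch of how per-row-group statistics kept outside the file might be turned into scan/skip decisions. Everything in it is an assumption made for illustration: `RowGroupRange`, `Access`, and `plan_for_value` are hypothetical, and a real implementation would build a `ParquetAccessPlan` with `new_all` and `skip` as in the doc example above.

```rust
/// Simplified stand-in for a per-row-group decision.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Access {
    Scan,
    Skip,
}

/// Hypothetical external index entry: the min and max value of one
/// column within one row group, stored outside the parquet file.
struct RowGroupRange {
    min: i64,
    max: i64,
}

/// Build a plan for the predicate `column = value`: start by scanning
/// everything, then skip row groups whose range cannot contain `value`.
fn plan_for_value(index: &[RowGroupRange], value: i64) -> Vec<Access> {
    let mut plan = vec![Access::Scan; index.len()];
    for (i, range) in index.iter().enumerate() {
        if value < range.min || value > range.max {
            plan[i] = Access::Skip;
        }
    }
    plan
}
```

The resulting plan has one entry per row group, which matches the requirement that a user-supplied plan cover exactly as many row groups as the file contains.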
46 changes: 39 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Expand Up @@ -28,6 +28,7 @@ use crate::datasource::physical_plan::{
use crate::datasource::schema_adapter::SchemaAdapterFactory;
use crate::physical_optimizer::pruning::PruningPredicate;
use arrow_schema::{ArrowError, SchemaRef};
use datafusion_common::{exec_err, Result};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -60,11 +61,10 @@ pub(super) struct ParquetOpener {
impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file_meta.location().as_ref(),
&self.metrics,
);
let extensions = file_meta.extensions.clone();
let file_name = file_meta.location().to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

let reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
Expand Down Expand Up @@ -139,7 +139,8 @@ impl FileOpener for ParquetOpener {
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
let access_plan =
Contributor Author

@alamb Jun 6, 2024

This is the required plumbing, which I am quite pleased with -- it is quite straightforward now

create_initial_plan(&file_name, extensions, rg_metadata.len())?;
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
// if there is a range restricting what parts of the file to read
if let Some(range) = file_range.as_ref() {
Expand Down Expand Up @@ -186,7 +187,7 @@ impl FileOpener for ParquetOpener {

let row_group_indexes = access_plan.row_group_indexes();
if let Some(row_selection) =
access_plan.into_overall_row_selection(rg_metadata)
access_plan.into_overall_row_selection(rg_metadata)?
{
builder = builder.with_row_selection(row_selection);
}
Expand All @@ -212,3 +213,34 @@ impl FileOpener for ParquetOpener {
}))
}
}

/// Return the initial [`ParquetAccessPlan`]
///
/// If the user has supplied one as an extension, use that
/// otherwise return a plan that scans all row groups
///
/// Returns an error if an invalid `ParquetAccessPlan` is provided
///
/// Note: `file_name` is only used for error messages
alamb marked this conversation as resolved.
fn create_initial_plan(
file_name: &str,
extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
row_group_count: usize,
) -> Result<ParquetAccessPlan> {
if let Some(extensions) = extensions {
if let Some(access_plan) = extensions.downcast_ref::<ParquetAccessPlan>() {
let plan_len = access_plan.len();
// check row group count matches the plan
if plan_len != row_group_count {
return exec_err!(
"Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
);
}

return Ok(access_plan.clone());
}
Contributor

Nit: would it be better to add logging in the else branch?

}

// default to scanning all row groups
Ok(ParquetAccessPlan::new_all(row_group_count))
}
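
One possible shape for the reviewer's logging suggestion is sketched below using only the standard library. This is not the code that was merged: `AccessPlan`, `initial_plan`, and `as_extension` are simplified, hypothetical stand-ins, the error is a plain `String`, and `eprintln!` stands in for whatever logging facility the project would actually use.

```rust
use std::any::Any;
use std::sync::Arc;

/// Simplified stand-in for `ParquetAccessPlan`: one flag per row group
/// (`true` means scan, `false` means skip).
#[derive(Debug, Clone, PartialEq)]
struct AccessPlan {
    scan: Vec<bool>,
}

/// Hypothetical helper that wraps any value as an opaque extension.
fn as_extension<T: Any + Send + Sync>(value: T) -> Option<Arc<dyn Any + Send + Sync>> {
    let ext: Arc<dyn Any + Send + Sync> = Arc::new(value);
    Some(ext)
}

fn initial_plan(
    file_name: &str,
    extensions: Option<Arc<dyn Any + Send + Sync>>,
    row_group_count: usize,
) -> Result<AccessPlan, String> {
    if let Some(extensions) = extensions {
        match extensions.downcast_ref::<AccessPlan>() {
            // a plan of the right type but the wrong length is an error
            Some(plan) if plan.scan.len() != row_group_count => {
                return Err(format!(
                    "Invalid plan for {file_name}. Specified {} row groups, but file has {row_group_count}",
                    plan.scan.len()
                ));
            }
            Some(plan) => return Ok(plan.clone()),
            // the "else branch": the extension holds some other type, so
            // note it and fall through to the default plan
            None => eprintln!("{file_name}: ignoring extension that is not an access plan"),
        }
    }
    // default to scanning all row groups
    Ok(AccessPlan { scan: vec![true; row_group_count] })
}
```

Falling back rather than failing keeps `extensions` usable for other reader-specific data, at the cost that a mistyped plan is ignored and every row group is scanned.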