Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support user defined ParquetAccessPlan in ParquetExec, validation to ParquetAccessPlan::select (#10813) #8

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ impl PartitionedFile {
self.range = Some(FileRange { start, end });
self
}

/// Update the user defined extensions for this file.
///
/// This can be used to pass reader specific information.
pub fn with_extensions(
mut self,
extensions: Arc<dyn std::any::Any + Send + Sync>,
) -> Self {
self.extensions = Some(extensions);
self
}
}

impl From<ObjectMeta> for PartitionedFile {
Expand Down
121 changes: 111 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_common::{internal_err, Result};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;

Expand Down Expand Up @@ -182,6 +183,11 @@ impl ParquetAccessPlan {
/// is returned for *all* the rows in the row groups that are not skipped.
/// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
///
/// # Errors
///
/// Returns an error if any specified row selection does not specify
/// the same number of rows as in its corresponding `row_group_metadata`.
///
/// # Example: No Selections
///
/// Given an access plan like this
Expand Down Expand Up @@ -228,7 +234,7 @@ impl ParquetAccessPlan {
pub fn into_overall_row_selection(
self,
row_group_meta_data: &[RowGroupMetaData],
) -> Option<RowSelection> {
) -> Result<Option<RowSelection>> {
assert_eq!(row_group_meta_data.len(), self.row_groups.len());
// Intuition: entire row groups are filtered out using
// `row_group_indexes` which come from Skip and Scan. An overall
Expand All @@ -239,7 +245,32 @@ impl ParquetAccessPlan {
.iter()
.any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
{
return None;
return Ok(None);
}

// validate all Selections
for (idx, (rg, rg_meta)) in self
.row_groups
.iter()
.zip(row_group_meta_data.iter())
.enumerate()
{
let RowGroupAccess::Selection(selection) = rg else {
continue;
};
let rows_in_selection = selection
.iter()
.map(|selection| selection.row_count)
.sum::<usize>();

let row_group_row_count = rg_meta.num_rows();
if rows_in_selection as i64 != row_group_row_count {
return internal_err!(
"Invalid ParquetAccessPlan Selection. Row group {idx} has {row_group_row_count} rows \
but selection only specifies {rows_in_selection} rows. \
Selection: {selection:?}"
);
}
}

let total_selection: RowSelection = self
Expand All @@ -261,7 +292,7 @@ impl ParquetAccessPlan {
})
.collect();

Some(total_selection)
Ok(Some(total_selection))
}

/// Return an iterator over the row group indexes that should be scanned
Expand Down Expand Up @@ -305,6 +336,7 @@ impl ParquetAccessPlan {
#[cfg(test)]
mod test {
use super::*;
use datafusion_common::assert_contains;
use parquet::basic::LogicalType;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
Expand All @@ -320,7 +352,9 @@ mod test {
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

// scan all row groups, no selection
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
Expand All @@ -337,7 +371,9 @@ mod test {
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

// skip all row groups, no selection
assert_eq!(row_group_indexes, vec![] as Vec<usize>);
Expand All @@ -348,14 +384,22 @@ mod test {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
// select / skip all 20 rows in row group 1
vec![
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(8),
]
.into(),
),
RowGroupAccess::Skip,
RowGroupAccess::Skip,
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

assert_eq!(row_group_indexes, vec![0, 1]);
assert_eq!(
Expand All @@ -366,7 +410,8 @@ mod test {
RowSelector::select(10),
// selectors from the second row group
RowSelector::select(5),
RowSelector::skip(7)
RowSelector::skip(7),
RowSelector::select(8)
]
.into()
)
Expand All @@ -379,13 +424,21 @@ mod test {
RowGroupAccess::Skip,
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
// specify all 30 rows in row group 2
vec![
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(18),
]
.into(),
),
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

assert_eq!(row_group_indexes, vec![1, 2, 3]);
assert_eq!(
Expand All @@ -397,6 +450,7 @@ mod test {
// selectors from the third row group
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(18),
// select the entire fourth row group
RowSelector::select(40),
]
Expand All @@ -405,6 +459,53 @@ mod test {
);
}

/// A selection that accounts for fewer rows than its row group is rejected.
#[test]
fn test_invalid_too_few() {
    // 5 + 7 = 12 rows, but row group 1 has 20
    let short_selection: RowSelection =
        vec![RowSelector::select(5), RowSelector::skip(7)].into();
    let plan = ParquetAccessPlan::new(vec![
        RowGroupAccess::Scan,
        RowGroupAccess::Selection(short_selection),
        RowGroupAccess::Scan,
        RowGroupAccess::Scan,
    ]);

    // capture the indexes first: building the overall selection consumes the plan
    let indexes = plan.row_group_indexes();
    let message = plan
        .into_overall_row_selection(row_group_metadata())
        .unwrap_err()
        .to_string();

    assert_eq!(indexes, vec![0, 1, 2, 3]);
    assert_contains!(message, "Internal error: Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 12 rows");
}

/// A selection that accounts for more rows than its row group is rejected.
#[test]
fn test_invalid_too_many() {
    // 10 + 2 + 10 = 22 rows, but row group 1 has only 20
    let oversized_selection: RowSelection = vec![
        RowSelector::select(10),
        RowSelector::skip(2),
        RowSelector::select(10),
    ]
    .into();
    let plan = ParquetAccessPlan::new(vec![
        RowGroupAccess::Scan,
        RowGroupAccess::Selection(oversized_selection),
        RowGroupAccess::Scan,
        RowGroupAccess::Scan,
    ]);

    // capture the indexes first: building the overall selection consumes the plan
    let indexes = plan.row_group_indexes();
    let message = plan
        .into_overall_row_selection(row_group_metadata())
        .unwrap_err()
        .to_string();

    assert_eq!(indexes, vec![0, 1, 2, 3]);
    assert_contains!(message, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
}

static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();

/// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
Expand Down
46 changes: 46 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,52 @@ pub use writer::plan_to_parquet;
/// custom reader is used, it supplies the metadata directly and this parameter
/// is ignored. [`ParquetExecBuilder::with_metadata_size_hint`] for more details.
///
/// * User provided [`ParquetAccessPlan`]s to skip row groups and/or pages
/// based on external information. See "Implementing External Indexes" below
///
/// # Implementing External Indexes
///
/// It is possible to restrict the row groups and selections within those row
/// groups that the ParquetExec will consider by providing an initial
/// [`ParquetAccessPlan`] as `extensions` on [`PartitionedFile`]. This can be
/// used to implement external indexes on top of parquet files and select only
/// portions of the files.
///
/// The `ParquetExec` will try to further reduce any provided
/// `ParquetAccessPlan` based on the contents of `ParquetMetadata` and
/// other settings.
///
/// ## Example of providing a ParquetAccessPlan
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Schema, SchemaRef};
/// # use datafusion::datasource::listing::PartitionedFile;
/// # use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # fn schema() -> SchemaRef {
/// # Arc::new(Schema::empty())
/// # }
/// // create an access plan to scan row group 0, 1 and 3 and skip row groups 2 and 4
/// let mut access_plan = ParquetAccessPlan::new_all(5);
/// access_plan.skip(2);
/// access_plan.skip(4);
/// // provide the plan as an extension on the PartitionedFile
/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
/// .with_extensions(Arc::new(access_plan));
/// // create a ParquetExec to scan this file
/// let file_scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema())
/// .with_file(partitioned_file);
/// // this parquet exec will not even try to read row groups 2 and 4. Additional
/// // pruning based on predicates may also happen
/// let exec = ParquetExec::builder(file_scan_config).build();
/// ```
///
/// For a complete example, see the [`parquet_index_advanced` example].
///
/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs
///
/// # Execution Overview
///
/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
Expand Down
46 changes: 39 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::datasource::physical_plan::{
use crate::datasource::schema_adapter::SchemaAdapterFactory;
use crate::physical_optimizer::pruning::PruningPredicate;
use arrow_schema::{ArrowError, SchemaRef};
use datafusion_common::{exec_err, Result};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -60,11 +61,10 @@ pub(super) struct ParquetOpener {
impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file_meta.location().as_ref(),
&self.metrics,
);
let extensions = file_meta.extensions.clone();
let file_name = file_meta.location().to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

let reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
Expand Down Expand Up @@ -139,7 +139,8 @@ impl FileOpener for ParquetOpener {
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
let access_plan =
create_initial_plan(&file_name, extensions, rg_metadata.len())?;
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
// if there is a range restricting what parts of the file to read
if let Some(range) = file_range.as_ref() {
Expand Down Expand Up @@ -186,7 +187,7 @@ impl FileOpener for ParquetOpener {

let row_group_indexes = access_plan.row_group_indexes();
if let Some(row_selection) =
access_plan.into_overall_row_selection(rg_metadata)
access_plan.into_overall_row_selection(rg_metadata)?
{
builder = builder.with_row_selection(row_selection);
}
Expand All @@ -212,3 +213,34 @@ impl FileOpener for ParquetOpener {
}))
}
}

/// Build the [`ParquetAccessPlan`] that pruning starts from.
///
/// * If `extensions` holds a `ParquetAccessPlan`, a clone of that plan is
///   returned, provided it describes exactly `row_group_count` row groups.
/// * If `extensions` is `None`, or holds a value of any other type, the
///   returned plan scans all `row_group_count` row groups.
///
/// # Errors
///
/// Returns an execution error if the user supplied plan does not have
/// exactly `row_group_count` row groups.
///
/// Note: `file_name` is only used for error messages
fn create_initial_plan(
    file_name: &str,
    extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    row_group_count: usize,
) -> Result<ParquetAccessPlan> {
    // extensions of any other type are not meant for this reader and are ignored
    let user_plan = extensions
        .as_deref()
        .and_then(|ext| ext.downcast_ref::<ParquetAccessPlan>());

    match user_plan {
        // default to scanning all row groups
        None => Ok(ParquetAccessPlan::new_all(row_group_count)),
        // the plan must describe every row group in the file
        Some(access_plan) if access_plan.len() == row_group_count => {
            Ok(access_plan.clone())
        }
        Some(access_plan) => {
            let plan_len = access_plan.len();
            exec_err!(
                "Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
            )
        }
    }
}
Loading