Skip to content

Commit

Permalink
Allow ParquetAccessPlan to be passed in to ParquetExec, add valid…
Browse files Browse the repository at this point in the history
…ation to ParquetAccessPlan::select
  • Loading branch information
alamb committed Jun 6, 2024
1 parent 089b232 commit 3e839ef
Show file tree
Hide file tree
Showing 7 changed files with 567 additions and 38 deletions.
10 changes: 10 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ impl PartitionedFile {
self.range = Some(FileRange { start, end });
self
}

/// Update the user defined extensions for this file. You can use this field
/// to pass reader specific information.
pub fn with_extensions(
mut self,
extensions: Arc<dyn std::any::Any + Send + Sync>,
) -> Self {
self.extensions = Some(extensions);
self
}
}

impl From<ObjectMeta> for PartitionedFile {
Expand Down
121 changes: 111 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_common::{internal_err, Result};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;

Expand Down Expand Up @@ -182,6 +183,11 @@ impl ParquetAccessPlan {
/// is returned for *all* the rows in the row groups that are not skipped.
/// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
///
/// # Errors
///
/// Returns an error if the specified row selection does not specify
/// the same number of rows as in `row_group_metadata`.
///
/// # Example: No Selections
///
/// Given an access plan like this
Expand Down Expand Up @@ -228,7 +234,7 @@ impl ParquetAccessPlan {
pub fn into_overall_row_selection(
self,
row_group_meta_data: &[RowGroupMetaData],
) -> Option<RowSelection> {
) -> Result<Option<RowSelection>> {
assert_eq!(row_group_meta_data.len(), self.row_groups.len());
// Intuition: entire row groups are filtered out using
// `row_group_indexes` which come from Skip and Scan. An overall
Expand All @@ -239,7 +245,32 @@ impl ParquetAccessPlan {
.iter()
.any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
{
return None;
return Ok(None);
}

// validate all Selections
for (idx, (rg, rg_meta)) in self
.row_groups
.iter()
.zip(row_group_meta_data.iter())
.enumerate()
{
let RowGroupAccess::Selection(selection) = rg else {
continue;
};
let rows_in_selection = selection
.iter()
.map(|selection| selection.row_count)
.sum::<usize>();

let row_group_row_count = rg_meta.num_rows();
if rows_in_selection as i64 != row_group_row_count {
return internal_err!(
"Invalid ParquetAccessPlan Selection. Row group {idx} has {row_group_row_count} rows \
but selection only specifies {rows_in_selection} rows. \
Selection: {selection:?}"
);
}
}

let total_selection: RowSelection = self
Expand All @@ -261,7 +292,7 @@ impl ParquetAccessPlan {
})
.collect();

Some(total_selection)
Ok(Some(total_selection))
}

/// Return an iterator over the row group indexes that should be scanned
Expand Down Expand Up @@ -305,6 +336,7 @@ impl ParquetAccessPlan {
#[cfg(test)]
mod test {
use super::*;
use datafusion_common::assert_contains;
use parquet::basic::LogicalType;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
Expand All @@ -320,7 +352,9 @@ mod test {
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

// scan all row groups, no selection
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
Expand All @@ -337,7 +371,9 @@ mod test {
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

// skip all row groups, no selection
assert_eq!(row_group_indexes, vec![] as Vec<usize>);
Expand All @@ -348,14 +384,22 @@ mod test {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
// select / skip all 20 rows in row group 1
vec![
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(8),
]
.into(),
),
RowGroupAccess::Skip,
RowGroupAccess::Skip,
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

assert_eq!(row_group_indexes, vec![0, 1]);
assert_eq!(
Expand All @@ -366,7 +410,8 @@ mod test {
RowSelector::select(10),
// selectors from the second row group
RowSelector::select(5),
RowSelector::skip(7)
RowSelector::skip(7),
RowSelector::select(8)
]
.into()
)
Expand All @@ -379,13 +424,21 @@ mod test {
RowGroupAccess::Skip,
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
// specify all 30 rows in row group 1
vec![
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(18),
]
.into(),
),
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan.into_overall_row_selection(row_group_metadata());
let row_selection = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap();

assert_eq!(row_group_indexes, vec![1, 2, 3]);
assert_eq!(
Expand All @@ -397,6 +450,7 @@ mod test {
// selectors from the third row group
RowSelector::select(5),
RowSelector::skip(7),
RowSelector::select(18),
// select the entire fourth row group
RowSelector::select(40),
]
Expand All @@ -405,6 +459,53 @@ mod test {
);
}

#[test]
fn test_invalid_too_few() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 12 rows, but row group 1 has 20
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
),
RowGroupAccess::Scan,
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let err = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap_err()
.to_string();
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
assert_contains!(err, "Internal error: Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 12 rows");
}

#[test]
fn test_invalid_too_many() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 22 rows, but row group 1 has only 20
RowGroupAccess::Selection(
vec![
RowSelector::select(10),
RowSelector::skip(2),
RowSelector::select(10),
]
.into(),
),
RowGroupAccess::Scan,
RowGroupAccess::Scan,
]);

let row_group_indexes = access_plan.row_group_indexes();
let err = access_plan
.into_overall_row_selection(row_group_metadata())
.unwrap_err()
.to_string();
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
}

static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();

/// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
Expand Down
46 changes: 39 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::datasource::physical_plan::{
use crate::datasource::schema_adapter::SchemaAdapterFactory;
use crate::physical_optimizer::pruning::PruningPredicate;
use arrow_schema::{ArrowError, SchemaRef};
use datafusion_common::{exec_err, Result};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -60,11 +61,10 @@ pub(super) struct ParquetOpener {
impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file_meta.location().as_ref(),
&self.metrics,
);
let extensions = file_meta.extensions.clone();
let file_name = file_meta.location().to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

let reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
Expand Down Expand Up @@ -139,7 +139,8 @@ impl FileOpener for ParquetOpener {
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
let access_plan =
create_initial_plan(&file_name, extensions, rg_metadata.len())?;
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
// if there is a range restricting what parts of the file to read
if let Some(range) = file_range.as_ref() {
Expand Down Expand Up @@ -186,7 +187,7 @@ impl FileOpener for ParquetOpener {

let row_group_indexes = access_plan.row_group_indexes();
if let Some(row_selection) =
access_plan.into_overall_row_selection(rg_metadata)
access_plan.into_overall_row_selection(rg_metadata)?
{
builder = builder.with_row_selection(row_selection);
}
Expand All @@ -212,3 +213,34 @@ impl FileOpener for ParquetOpener {
}))
}
}

/// Return the initial [`ParquetAccessPlan`]
///
/// If the user has supplied one as an extension, use that
/// otherwise return a plan that scans all row groups
///
/// Returns an error is an invalid `ParquetAccessPlan` is provided
///
/// Note: path is only used for error messages
fn create_initial_plan(
file_name: &str,
extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
row_group_count: usize,
) -> Result<ParquetAccessPlan> {
if let Some(extensions) = extensions {
if let Some(access_plan) = extensions.downcast_ref::<ParquetAccessPlan>() {
let plan_len = access_plan.len();
if plan_len != row_group_count {
return exec_err!(
"Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
);
}

// check row group count matches the plan
return Ok(access_plan.clone());
}
}

// default to scanning all row groups
Ok(ParquetAccessPlan::new_all(row_group_count))
}
Loading

0 comments on commit 3e839ef

Please sign in to comment.