Add ParquetExec::builder() API
alamb committed May 24, 2024
1 parent 70b65bd commit 6cefd61
Showing 1 changed file with 158 additions and 30 deletions.
188 changes: 158 additions & 30 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -103,6 +103,7 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
/// `───────────────────'
///
/// ```
///
/// # Features
///
/// Supports the following optimizations:
@@ -118,36 +119,50 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
///
/// * Limit pushdown: stop execution early after some number of rows are read.
///
/// * Custom readers: controls I/O for accessing pages, and reading
/// [`ParquetMetadata`]. This can be used to implement custom I/O scheduling and
/// to re-use parsed metadata. See [`ParquetFileReaderFactory`] for more details.
///
/// * Schema adapters: read parquet files with different schemas into a unified
/// table schema. This can be used to implement "schema evolution". See
/// [`SchemaAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
/// file in the initial I/O. See [`ParquetExecBuilder::with_metadata_size_hint`]
/// for details.
///
/// # Execution Overview
///
/// * Step 1: `ParquetExec::execute` is called, returning a [`FileStream`]
/// configured to open parquet files with a [`ParquetOpener`].
///
/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the file metadata by reading the footer,
/// and applies any predicates and projections to determine what pages must be
/// read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
///
/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
/// [`SchemaAdapter`] to match the table schema. By default missing columns are
/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// # Example: Create a `ParquetExec`
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
/// # use datafusion::datasource::listing::PartitionedFile;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # use datafusion_physical_expr::expressions::lit;
/// # let file_schema = Arc::new(Schema::empty());
/// # let object_store_url = ObjectStoreUrl::local_filesystem();
/// # let file_scan_config = FileScanConfig::new(object_store_url, file_schema);
/// # let predicate = lit(true);
/// let exec = ParquetExec::builder(file_scan_config)
/// // Provide a predicate for filtering row groups
/// .with_predicate(predicate)
/// .build();
/// ```
#[derive(Debug, Clone)]
pub struct ParquetExec {
/// Base configuration for this scan
@@ -173,14 +188,107 @@ pub struct ParquetExec {
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
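
To make the Execution Overview above concrete, here is a minimal sketch, not part of this commit, that builds a `ParquetExec` with the new builder and drains one partition. The helper name, the use of a `SessionContext` solely to obtain a `TaskContext`, and the crate paths are illustrative assumptions.

```rust
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::ExecutionPlan; // brings `execute` into scope
use datafusion::prelude::SessionContext;
use futures::StreamExt;

/// Execute partition 0 of a ParquetExec built from an existing FileScanConfig
/// and count the rows it produces.
async fn scan_one_partition(file_scan_config: FileScanConfig) -> datafusion::error::Result<usize> {
    // Step 1: `execute` returns a lazily evaluated stream of record batches.
    let exec = ParquetExec::builder(file_scan_config).build();
    let ctx = SessionContext::new();
    let mut stream = exec.execute(0, ctx.task_ctx())?;

    // Steps 2-5 happen as the stream is polled: the file is opened, the footer
    // is read, pages are fetched and decoded, and batches are schema-adapted.
    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```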

/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
///
/// See the example on [`ParquetExec`].
pub struct ParquetExecBuilder {
file_scan_config: FileScanConfig,
predicate: Option<Arc<dyn PhysicalExpr>>,
metadata_size_hint: Option<usize>,
table_parquet_options: TableParquetOptions,
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl ParquetExecBuilder {
/// Create a new builder to read the provided file scan configuration
pub fn new(file_scan_config: FileScanConfig) -> Self {
Self::new_with_options(file_scan_config, TableParquetOptions::default())
}

/// Create a new builder to read the provided file scan configuration
/// with the provided table parquet options
pub fn new_with_options(
file_scan_config: FileScanConfig,
table_parquet_options: TableParquetOptions,
) -> Self {
Self {
file_scan_config,
predicate: None,
metadata_size_hint: None,
table_parquet_options,
parquet_file_reader_factory: None,
schema_adapter_factory: None,
}
}

/// Set the predicate for the scan. The ParquetExec will use this predicate
/// to filter row groups and data pages using the Parquet statistics and
/// bloom filters.
pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
self.predicate = Some(predicate);
self
}
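
As a hedged illustration (not from this commit) of what such a predicate can look like, the sketch below builds `id = 42` against a file schema using `datafusion_physical_expr` helpers; the column name and literal are placeholders.

```rust
use std::sync::Arc;
use arrow::datatypes::Schema;
use datafusion_common::Result;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{binary, col, lit};
use datafusion_physical_expr::PhysicalExpr;

/// Build the physical predicate `id = 42` for use with `with_predicate`.
/// Statistics and bloom filters can then prune row groups where it cannot hold.
fn id_equals_42(file_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
    binary(col("id", file_schema)?, Operator::Eq, lit(42i64), file_schema)
}
```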

/// Set the metadata size hint
///
/// This value determines how many bytes at the end of the file the
/// ParquetExec will request in the initial IO. If this is too small, the
/// ParquetExec will need to make additional IO requests to read the footer.
pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
self.metadata_size_hint = Some(metadata_size_hint);
self
}
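
A short hedged sketch (not from this commit): requesting a larger suffix up front so the footer, and typically the page index, arrive in the first request. The 512 KiB figure is purely illustrative.

```rust
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};

/// Build a scan that fetches the last 512 KiB of each file in the initial I/O.
fn scan_with_footer_hint(file_scan_config: FileScanConfig) -> ParquetExec {
    ParquetExec::builder(file_scan_config)
        .with_metadata_size_hint(512 * 1024)
        .build()
}
```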

/// Set the table parquet options that control how the ParquetExec reads parquet files
pub fn with_table_parquet_options(
mut self,
table_parquet_options: TableParquetOptions,
) -> Self {
self.table_parquet_options = table_parquet_options;
self
}
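
As a hedged example of options that can be passed here (not from this commit), the sketch below enables filter pushdown through `TableParquetOptions`; the field names reflect my understanding of the options struct and may differ across DataFusion versions.

```rust
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};

/// Build a scan that also evaluates predicates while decoding pages.
fn scan_with_pushdown(file_scan_config: FileScanConfig) -> ParquetExec {
    let mut options = TableParquetOptions::default();
    // Apply the predicate during the scan itself, in addition to row group
    // and page pruning.
    options.global.pushdown_filters = true;
    // Evaluate cheaper predicate terms first.
    options.global.reorder_filters = true;

    ParquetExec::builder(file_scan_config)
        .with_table_parquet_options(options)
        .build()
}
```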

/// Optional user defined parquet file reader factory.
///
/// `ParquetFileReaderFactory` complements `TableProvider`. It enables users to provide a custom
/// implementation for data access operations.
///
/// If a custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
/// to this factory instead of `ObjectStore`.
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
) -> Self {
self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
self
}

/// Optional schema adapter factory.
///
/// `SchemaAdapterFactory` allows users to specify how fields from the parquet file are mapped to
/// those of the table schema. The default schema adapter uses arrow's cast library to map
/// the parquet fields to the table schema.
pub fn with_schema_adapter_factory(
mut self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
self.schema_adapter_factory = Some(schema_adapter_factory);
self
}

/// Build a [`ParquetExec`]
pub fn build(self) -> ParquetExec {
let Self {
file_scan_config,
predicate,
metadata_size_hint,
table_parquet_options,
parquet_file_reader_factory,
schema_adapter_factory,
} = self;

let base_config = file_scan_config;
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
base_config.file_groups, base_config.projection, predicate, base_config.limit);

@@ -219,25 +327,51 @@ impl ParquetExec {

let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();
let cache = ParquetExec::compute_properties(
projected_schema,
&projected_output_ordering,
&base_config,
);
ParquetExec {
base_config,
projected_statistics,
metrics,
predicate,
pruning_predicate,
page_pruning_predicate,
metadata_size_hint,
parquet_file_reader_factory,
cache,
table_parquet_options,
schema_adapter_factory,
}
}
}

impl ParquetExec {
/// Create a new Parquet reader execution plan from the provided file list and schema.
// TODO deprecate
pub fn new(
base_config: FileScanConfig,
predicate: Option<Arc<dyn PhysicalExpr>>,
metadata_size_hint: Option<usize>,
table_parquet_options: TableParquetOptions,
) -> Self {
let mut builder =
ParquetExecBuilder::new_with_options(base_config, table_parquet_options);
if let Some(predicate) = predicate {
builder = builder.with_predicate(predicate);
}
if let Some(metadata_size_hint) = metadata_size_hint {
builder = builder.with_metadata_size_hint(metadata_size_hint);
}
builder.build()
}
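
A hedged side-by-side (not part of this commit) of the legacy constructor and the builder producing the same plan; the predicate and options are placeholders.

```rust
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion_physical_expr::expressions::lit;

fn equivalent_plans(config: FileScanConfig) -> (ParquetExec, ParquetExec) {
    let predicate = lit(true);

    // Positional arguments, passing `None`/defaults for unused options.
    let legacy = ParquetExec::new(
        config.clone(),
        Some(predicate.clone()),
        None,
        TableParquetOptions::default(),
    );

    // The builder names each option and omits the rest.
    let with_builder = ParquetExec::builder(config)
        .with_predicate(predicate)
        .build();

    (legacy, with_builder)
}
```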

/// Return a builder for `ParquetExec`. See the example on [`ParquetExec`].
pub fn builder(file_scan_config: FileScanConfig) -> ParquetExecBuilder {
ParquetExecBuilder::new(file_scan_config)
}

/// [`FileScanConfig`] that controls this scan (such as which files to read)
pub fn base_config(&self) -> &FileScanConfig {
@@ -261,11 +395,7 @@ impl ParquetExec {

/// Optional user defined parquet file reader factory.
///
/// See documentation on [`ParquetExecBuilder::with_parquet_file_reader_factory`].
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -276,9 +406,7 @@ impl ParquetExec {

/// Optional schema adapter factory.
///
/// See documentation on [`ParquetExecBuilder::with_schema_adapter_factory`].
pub fn with_schema_adapter_factory(
mut self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
