From 70b65bd22928dc574644e1859545f9a2f3bd1fa3 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 23 May 2024 07:59:52 -0400
Subject: [PATCH 1/4] Improve ParquetExec and related documentation

---
 .../datasource/physical_plan/parquet/mod.rs   | 106 ++++++++++++++++--
 .../physical_plan/parquet/schema_adapter.rs   |  24 ++--
 2 files changed, 112 insertions(+), 18 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 410413ebd71b..cdd512a07634 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -75,7 +75,79 @@ pub use metrics::ParquetFileMetrics;
 pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
 pub use statistics::{RequestedStatistics, StatisticsConverter};
 
-/// Execution plan for scanning one or more Parquet partitions
+/// Execution plan for reading one or more Parquet files.
+///
+/// ```text
+///             ▲
+///             │
+///             │  Produce a stream of
+///             │  RecordBatches
+///             │
+/// ┌───────────────────────┐
+/// │                       │
+/// │      ParquetExec      │
+/// │                       │
+/// └───────────────────────┘
+///             ▲
+///             │  Asynchronously read from one
+///             │  or more parquet files via
+///             │  ObjectStore interface
+///             │
+///             │
+///  .───────────────────.
+/// │                     )
+/// │`───────────────────'│
+/// │     ObjectStore     │
+/// │.───────────────────.│
+/// │                     )
+///  `───────────────────'
+///
+/// ```
+/// # Features
+///
+/// Supports the following optimizations:
+///
+/// * Multi-threaded (aka multi-partition): read from one or more files in
+/// parallel. Can read concurrently from multiple row groups from a single file.
+///
+/// * Predicate push down: skips row groups and pages based on
+/// min/max/null_counts in the row group metadata, the page index and bloom
+/// filters.
+///
+/// * Projection pushdown: reads and decodes only the columns required.
+///
+/// * Limit pushdown: stop execution early after some number of rows are read.
+///
+/// * Custom readers: controls I/O for accessing pages. See
+/// [`ParquetFileReaderFactory`] for more details.
+///
+/// * Schema adapters: read parquet files with different schemas into a unified
+/// table schema. This can be used to implement "schema evolution". See
+/// [`SchemaAdapterFactory`] for more details.
+///
+/// * metadata_size_hint: controls the number of bytes read from the end of the
+/// file in the initial I/O.
+///
+/// # Execution Overview
+///
+/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
+/// configured to open parquet files with a [`ParquetOpener`].
+///
+/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
+/// the file.
+///
+/// * Step 3: The `ParquetOpener` gets the file metadata by reading the footer,
+/// and applies any predicates and projections to determine what pages must be
+/// read.
+///
+/// * Step 4: The stream begins reading data, fetching the required pages
+/// and incrementally decoding them.
+///
+/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
+/// [`SchemaAdapter`] to match the table schema. By default missing columns are
+/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
+///
+/// [`RecordBatch`]: arrow::record_batch::RecordBatch
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan
@@ -85,9 +157,9 @@ pub struct ParquetExec {
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
     predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Optional predicate for pruning row groups
+    /// Optional predicate for pruning row groups (derived from `predicate`)
     pruning_predicate: Option<Arc<PruningPredicate>>,
-    /// Optional predicate for pruning pages
+    /// Optional predicate for pruning pages (derived from `predicate`)
     page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
@@ -642,11 +714,22 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
-/// Factory of parquet file readers.
+/// Interface for creating [`AsyncFileReader`]s to read parquet files.
+///
+/// This interface is used by [`ParquetOpener`] in order to create readers for
+/// parquet files. Implementations of this trait can be used to provide custom
+/// data access operations such as pre-cached data, I/O coalescing, etc.
 ///
-/// Provides means to implement custom data access interface.
+/// [`DefaultParquetFileReaderFactory`] by default returns a
+/// [`ParquetObjectReader`].
 pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
-    /// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
+    /// Provides an `AsyncFileReader` for reading data from a parquet file specified
+    ///
+    /// # Arguments
+    /// * partition_index - Index of the partition (for reporting metrics)
+    /// * file_meta - The file to be read
+    /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
+    /// * metrics - Execution metrics
     fn create_reader(
         &self,
         partition_index: usize,
@@ -663,13 +746,20 @@ pub struct DefaultParquetFileReaderFactory {
 }
 
 impl DefaultParquetFileReaderFactory {
-    /// Create a factory.
+    /// Create a new `DefaultParquetFileReaderFactory`.
     pub fn new(store: Arc<dyn ObjectStore>) -> Self {
         Self { store }
     }
 }
 
-/// Implements [`AsyncFileReader`] for a parquet file in object storage
+/// Implements [`AsyncFileReader`] for a parquet file in object storage.
+///
+/// This implementation uses the [`ParquetObjectReader`] to read data from the
+/// object store on demand, as required, tracking the number of bytes read.
+///
+/// This implementation does not coalesce I/O operations or cache bytes. Such
+/// optimizations can be done either at the object store level or by providing a
+/// custom implementation of [`ParquetFileReaderFactory`].
 pub(crate) struct ParquetFileReader {
     file_metrics: ParquetFileMetrics,
     inner: ParquetObjectReader,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
index 193e5161a398..ac053214fdfb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
@@ -20,27 +20,29 @@ use arrow_schema::{Schema, SchemaRef};
 use std::fmt::Debug;
 use std::sync::Arc;
 
-/// Factory of schema adapters.
+/// Factory for creating [`SchemaAdapter`]
 ///
-/// Provides means to implement custom schema adaptation.
+/// This interface provides a way to implement custom schema adaptation logic
+/// for ParquetExec (for example, to fill missing columns with default value
+/// other than null)
 pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
     /// Provides `SchemaAdapter` for the ParquetExec.
     fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
 }
 
-/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
 /// obtained from merging multiple file-level schemas.
 ///
 /// This is useful for enabling schema evolution in partitioned datasets.
 ///
 /// This has to be done in two stages.
 ///
-/// 1. Before reading the file, we have to map projected column indexes from the table schema to
-/// the file schema.
+/// 1. Before reading the file, we have to map projected column indexes from the
+/// table schema to the file schema.
 ///
-/// 2. After reading a record batch we need to map the read columns back to the expected columns
-/// indexes and insert null-valued columns wherever the file schema was missing a colum present
-/// in the table schema.
+/// 2. After reading a record batch map the read columns back to the expected
+/// columns indexes and insert null-valued columns wherever the file schema was
+/// missing a colum present in the table schema.
 pub trait SchemaAdapter: Send + Sync {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
@@ -48,7 +50,8 @@ pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
 
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns
+    /// from the file schema to the table schema.
     ///
     /// If the provided `file_schema` contains columns of a different type to the expected
     /// `table_schema`, the method will attempt to cast the array data from the file schema
@@ -62,7 +65,8 @@ pub trait SchemaAdapter: Send + Sync {
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
 }
 
-/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the table schema.
+/// Transforms a [`RecordBatch`] read from a Parquet file to a [`RecordBatch`]
+/// that has the table schema.
 pub trait SchemaMapper: Send + Sync {
     /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

From 91a27e892c1af89944e389c9c288598a3e86bfc9 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Sat, 25 May 2024 09:26:13 -0400
Subject: [PATCH 2/4] Improve documentation

---
 .../datasource/physical_plan/parquet/mod.rs | 48 +++++++++++--------
 1 file changed, 29 insertions(+), 19 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index f4d19ac92eba..4d964f72bb4b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -107,8 +107,9 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 ///
 /// Supports the following optimizations:
 ///
-/// * Multi-threaded (aka multi-partition): read from one or more files in
-/// parallel. Can read concurrently from multiple row groups from a single file.
+/// * Concurrent reads: Can read from one or more files in parallel as multiple
+/// partitions, including concurrently reading multiple row groups from a single
+/// file.
 ///
 /// * Predicate push down: skips row groups and pages based on
 /// min/max/null_counts in the row group metadata, the page index and bloom
@@ -118,15 +119,18 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 ///
 /// * Limit pushdown: stop execution early after some number of rows are read.
 ///
-/// * Custom readers: controls I/O for accessing pages. See
-/// [`ParquetFileReaderFactory`] for more details.
+/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
+/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
+/// details.
 ///
 /// * Schema adapters: read parquet files with different schemas into a unified
 /// table schema. This can be used to implement "schema evolution". See
 /// [`SchemaAdapterFactory`] for more details.
 ///
 /// * metadata_size_hint: controls the number of bytes read from the end of the
-/// file in the initial I/O.
+/// file in the initial I/O when the default [`ParquetFileReaderFactory`] is used. If a
+/// custom reader is used, it supplies the metadata directly and this parameter
+/// is ignored. See [`Self::with_parquet_file_reader_factory`] for more details.
 ///
 /// # Execution Overview
 ///
@@ -136,9 +140,9 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 /// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
 /// the file.
 ///
-/// * Step 3: The `ParquetOpener` gets the file metadata by reading the footer,
-/// and applies any predicates and projections to determine what pages must be
-/// read.
+/// * Step 3: The `ParquetOpener` gets the file metadata via
+/// [`ParquetFileReaderFactory`] and applies any predicates
+/// and projections to determine what pages must be read.
 ///
 /// * Step 4: The stream begins reading data, fetching the required pages
 /// and incrementally decoding them.
 ///
@@ -261,11 +265,13 @@ impl ParquetExec {
     /// Optional user defined parquet file reader factory.
     ///
-    /// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom
-    /// implementation for data access operations.
+    /// You can use [`ParquetFileReaderFactory`] to more precisely control how
+    /// data is read from parquet files (e.g. skip re-reading metadata, coalesce
+    /// I/O operations, etc).
     ///
-    /// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
-    /// to this factory instead of `ObjectStore`.
+    /// The default reader factory reads directly from an [`ObjectStore`]
+    /// instance using individual I/O operations for the footer and then for
+    /// each page.
     pub fn with_parquet_file_reader_factory(
         mut self,
         parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -714,14 +720,13 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
-/// Interface for creating [`AsyncFileReader`]s to read parquet files.
+/// Interface for reading parquet files.
 ///
-/// This interface is used by [`ParquetOpener`] in order to create readers for
-/// parquet files. Implementations of this trait can be used to provide custom
-/// data access operations such as pre-cached data, I/O coalescing, etc.
+/// The combined implementations of [`ParquetFileReaderFactory`] and
+/// [`AsyncFileReader`] can be used to provide custom data access operations
+/// such as pre-cached data, I/O coalescing, etc.
 ///
-/// [`DefaultParquetFileReaderFactory`] by default returns a
-/// [`ParquetObjectReader`].
+/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
 pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
     /// Provides an `AsyncFileReader` for reading data from a parquet file specified
     ///
     /// # Arguments
@@ -739,7 +744,12 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
     ) -> Result<Box<dyn AsyncFileReader + Send>>;
 }
 
-/// Default parquet reader factory.
+/// Default implementation of [`ParquetFileReaderFactory`]
+///
+/// This implementation:
+/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
+/// 2. Reads the footer and page metadata on demand.
+/// 3. Does not cache metadata or coalesce I/O operations.
 #[derive(Debug)]
 pub struct DefaultParquetFileReaderFactory {
     store: Arc<dyn ObjectStore>,

From 0290f3816f28003b4caf8502c5b669f82e103c82 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Sun, 26 May 2024 06:06:10 -0400
Subject: [PATCH 3/4] Update datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs

Co-authored-by: Oleks V
---
 .../core/src/datasource/physical_plan/parquet/schema_adapter.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
index ac053214fdfb..2b71ef005350 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
@@ -42,7 +42,7 @@ pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
 ///
 /// 2. After reading a record batch map the read columns back to the expected
 /// columns indexes and insert null-valued columns wherever the file schema was
-/// missing a colum present in the table schema.
+/// missing a column present in the table schema.
 pub trait SchemaAdapter: Send + Sync {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema

From bd1e987791b36e23295c689b0246a5745e3c80e3 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 27 May 2024 08:12:12 -0400
Subject: [PATCH 4/4] fix link

---
 datafusion/core/src/datasource/physical_plan/parquet/mod.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 70cb5a4e2942..6655125ea876 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -153,6 +153,7 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 /// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
 ///
 /// [`RecordBatch`]: arrow::record_batch::RecordBatch
+/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
#[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan
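
To make the "Custom readers" extension point documented above concrete, here is a minimal
sketch of a `ParquetFileReaderFactory` implementation. It simply mirrors what the default
factory does (minus the metrics tracking), which is where a real implementation would add
metadata caching or I/O coalescing. Import paths, the exact trait signature, and the
`ParquetObjectReader` API vary between DataFusion and parquet crate versions, and
`PassthroughReaderFactory` is a hypothetical name, so treat this as a sketch rather than a
drop-in implementation.

```rust
use std::sync::Arc;

use datafusion::datasource::physical_plan::parquet::ParquetFileReaderFactory;
use datafusion::datasource::physical_plan::FileMeta;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use object_store::ObjectStore;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};

/// Hypothetical factory that simply wraps an ObjectStore; a real
/// implementation would add caching, coalescing, etc. in `create_reader`.
#[derive(Debug)]
struct PassthroughReaderFactory {
    store: Arc<dyn ObjectStore>,
}

impl ParquetFileReaderFactory for PassthroughReaderFactory {
    fn create_reader(
        &self,
        _partition_index: usize,
        file_meta: FileMeta,
        metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Box<dyn AsyncFileReader + Send>> {
        // Read pages directly from the object store, like the default factory
        let mut reader =
            ParquetObjectReader::new(Arc::clone(&self.store), file_meta.object_meta);
        // Honor the metadata size hint so the footer can be fetched in one request
        if let Some(hint) = metadata_size_hint {
            reader = reader.with_footer_size_hint(hint);
        }
        Ok(Box::new(reader))
    }
}
```

A factory like this is registered on the plan with `ParquetExec::with_parquet_file_reader_factory`
(shown in patch 2 above), at which point the `metadata_size_hint` is passed through to
`create_reader` for each file that is opened.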
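Similarly, schema evolution is driven by the three traits documented in `schema_adapter.rs`.
The sketch below shows one possible implementation that matches file columns to table columns
by name, casts types where needed, and fills columns missing from a file with nulls (roughly
the default behavior). The type names (`NameMatchingAdapterFactory`, etc.) are hypothetical,
the import paths depend on the DataFusion version, and how such a factory gets wired into
`ParquetExec` is version dependent, so it is omitted here.

```rust
use std::sync::Arc;

use datafusion::arrow::array::new_null_array;
use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::datasource::physical_plan::parquet::{
    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};

/// Hypothetical factory: adapts files by matching column names.
#[derive(Debug)]
struct NameMatchingAdapterFactory;

impl SchemaAdapterFactory for NameMatchingAdapterFactory {
    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Box::new(NameMatchingAdapter { table_schema: schema })
    }
}

struct NameMatchingAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for NameMatchingAdapter {
    /// A table column maps to the file column with the same name, if any
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        file_schema
            .index_of(self.table_schema.field(index).name())
            .ok()
    }

    /// Project only the file columns that also appear in the table schema
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let projection = (0..self.table_schema.fields().len())
            .filter_map(|i| self.map_column_index(i, file_schema))
            .collect();
        let mapper = NameMatchingMapper {
            table_schema: Arc::clone(&self.table_schema),
        };
        Ok((Arc::new(mapper), projection))
    }
}

struct NameMatchingMapper {
    table_schema: SchemaRef,
}

impl SchemaMapper for NameMatchingMapper {
    /// Rebuild the batch in table-schema order: cast columns that exist in the
    /// file and fill the rest with nulls
    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let columns = self
            .table_schema
            .fields()
            .iter()
            .map(|field| match batch.schema().index_of(field.name()) {
                Ok(i) => Ok(cast(batch.column(i), field.data_type())?),
                Err(_) => Ok(new_null_array(field.data_type(), batch.num_rows())),
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(RecordBatch::try_new(
            Arc::clone(&self.table_schema),
            columns,
        )?)
    }
}
```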