From 96ee03c748cd049bfbd7ceff4666aa0d4615d315 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 3 Jun 2024 15:30:57 -0400 Subject: [PATCH 1/9] Add `advanced_parquet_index.rs` example of indexing into parquet files --- datafusion-examples/README.md | 1 + .../examples/advanced_parquet_index.rs | 602 ++++++++++++++++++ datafusion/common/src/column.rs | 7 + datafusion/common/src/config.rs | 7 + .../physical_plan/parquet/access_plan.rs | 5 + .../datasource/physical_plan/parquet/mod.rs | 4 +- .../physical_plan/parquet/row_groups.rs | 2 +- .../core/src/physical_optimizer/pruning.rs | 10 +- 8 files changed, 633 insertions(+), 5 deletions(-) create mode 100644 datafusion-examples/examples/advanced_parquet_index.rs diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index c34f706adb82..4e3ec058cc17 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -45,6 +45,7 @@ cargo run --example csv_sql - [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF) - [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF) - [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF) +- ['advanced_parquet_index.rs'](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files - [`avro_sql.rs`](examples/avro_sql.rs): Build and run a query plan from a SQL statement against a local AVRO file - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog - [`csv_sql.rs`](examples/csv_sql.rs): Build and run a query plan from a SQL statement against a local CSV file diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs new file mode 100644 index 000000000000..4a64b88de7f5 --- /dev/null +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -0,0 +1,602 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use bytes::Bytes; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::parquet::{ + ParquetAccessPlan, ParquetExecBuilder, +}; +use datafusion::datasource::physical_plan::{ + parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig, +}; +use datafusion::datasource::TableProvider; +use datafusion::execution::context::SessionState; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::parquet::arrow::arrow_reader::{ + ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, +}; +use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; +use datafusion::parquet::arrow::ArrowWriter; +use datafusion::parquet::file::metadata::ParquetMetaData; +use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}; +use datafusion::parquet::schema::types::ColumnPath; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_optimizer::pruning::PruningPredicate; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::*; +use datafusion_common::config::TableParquetOptions; +use datafusion_common::{ + internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, +}; +use datafusion_expr::utils::conjunction; +use datafusion_expr::{TableProviderFilterPushDown, TableType}; +use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee}; +use futures::future::BoxFuture; +use futures::FutureExt; +use object_store::ObjectStore; +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::fs::File; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tempfile::TempDir; +use url::Url; + +/// This example demonstrates using low level DataFusion APIs to read ony +/// certain row groups and ranges from parquet files, based on external +/// information. +/// +/// Using these APIs, you can instruct DataFusion's parquet reader to skip +/// ("prune") portions of files that do not contain relevant data. These APIs +/// can be useful for doing low latency queries over a large number of Parquet +/// files on remote storage (e.g. S3) where the cost of reading the metadata for +/// each file is high (e.g. because it requires a network round trip to the +/// storage service). +/// +/// Depending on the information from the index, DataFusion can make a request +/// to the storage service (e.g. S3) to read only the necessary data. +/// +/// Note that this example uses a hard coded index implementation. For a more +/// realistic example of creating an index to prune files, see the +/// `parquet_index.rs` example. +/// +/// Specifically, this example illustrates how to: +/// 1. Use [`ParquetFileReaderFactory`] to avoid re-reading parquet metadata on each query +/// 2. Use [`PruningPredicate`] for predicate anaysis +/// 3. Pass a row group selection to [`ParuetExec`] +/// 4. Pass a row selection (within a row group) to [`ParuetExec`] +/// +/// Note this is a *VERY* low level example for people who want to build their +/// own custom indexes (e.g. for low latency queries). 
Most users should use +/// higher level APIs for reading parquet files: +/// [`SessionContext::read_parquet`] or [`ListingTable`], which also do file +/// pruning based on parquet statistics (using the same underlying APIs) +/// +/// [`ListingTable`]: datafusion::datasource::listing::ListingTable +#[tokio::main] +async fn main() -> Result<()> { + // the object store is used to read the parquet files (in this case, it is + // a local file system, but in a real system it could be S3, GCS, etc) + let object_store: Arc = + Arc::new(object_store::local::LocalFileSystem::new()); + + // Create a a custom table provider with our special index. + let provider = Arc::new(IndexTableProvider::try_new(Arc::clone(&object_store))?); + + // SessionContext for running queries that has the table provider + // registered as "index_table" + let ctx = SessionContext::new(); + ctx.register_table("index_table", Arc::clone(&provider) as _)?; + + // register object store provider for urls like `file://` work + let url = Url::try_from("file://").unwrap(); + ctx.register_object_store(&url, object_store); + + // Select data from the table without any predicates (and thus no pruning) + println!("** Select data, no predicates:"); + ctx.sql("SELECT avg(id), max(text) FROM index_table") + .await? + .show() + .await?; + // the underlying parquet reader makes 10 IO requests, one for each row group + + // Now, run a query that has a predicate that our index can handle + println!("** Select data, predicate `id IN (250, 750)`"); + ctx.sql("SELECT text FROM index_table WHERE id IN (250, 750)") + .await? + .show() + .await?; + // in this case, the access plan specifies skipping 8 row groups + // and scanning 2 of them. The skipped row groups are not read at all + // + // [Skip, Skip, Scan, Skip, Skip, Skip, Skip, Scan, Skip, Skip] + // + // Note that the parquet reader only does 2 IOs - one for the data from each + // row group. + + // Finally, demonstrate scanning sub ranges within the row groups. + // Parquet's minimum decode unit is a page, so specifying ranges + // within a row group can be used to skip pages within a row group. + provider.set_use_row_selection(true); + println!("** Select data, predicate `id = 950`"); + ctx.sql("SELECT text FROM index_table WHERE id = 950") + .await? + .show() + .await?; + // In this case, the access plan specifies skipping all but the last row group + // and within the last row group, reading only the row with id 950 + // + // [Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Selection(skip 49, select 1, skip 50)] + // + // In order to prune pages, the Page Index must be loaded. This PageIndex is + // loaded in a separate IO request, so the parquet reader makes 2 IO + // requests for this query. + + Ok(()) +} + +/// DataFusion `TableProvider` that uses knowledge of how data is distributed in +/// a file to prune row groups and rows from the file. +/// +/// `file1.parquet` contains values `0..1000` +#[derive(Debug)] +pub struct IndexTableProvider { + /// Where the file is stored (cleanup on drop) + #[allow(dead_code)] + tmpdir: TempDir, + /// The file that is being read. 
+ indexed_file: IndexedFile, + /// The underlying object store + object_store: Arc, + /// if true, use row selections in addition to row group selections + use_row_selections: AtomicBool, +} +impl IndexTableProvider { + /// Create a new IndexTableProvider + /// * `dir` - the directory containing the parquet files + /// * `object_store` - the object store implementation to use for reading the files + pub fn try_new(object_store: Arc) -> Result { + let tmpdir = TempDir::new().expect("Can't make temporary directory"); + + let indexed_file = + IndexedFile::try_new(tmpdir.path().join("indexed_file.parquet"), 0..1000)?; + + Ok(Self { + indexed_file, + tmpdir, + object_store, + use_row_selections: AtomicBool::new(false), + }) + } + + /// set the value of use row selections + pub fn set_use_row_selection(&self, use_row_selections: bool) { + self.use_row_selections + .store(use_row_selections, Ordering::SeqCst); + } + + /// return the value of use row selections + pub fn use_row_selections(&self) -> bool { + self.use_row_selections.load(Ordering::SeqCst) + } + + /// convert filters like `a = 1`, `b = 2` + /// to a single predicate like `a = 1 AND b = 2` suitable for execution + fn filters_to_predicate( + &self, + state: &SessionState, + filters: &[Expr], + ) -> Result> { + let df_schema = DFSchema::try_from(self.schema())?; + + let predicate = conjunction(filters.to_vec()); + let predicate = predicate + .map(|predicate| state.create_physical_expr(predicate, &df_schema)) + .transpose()? + // if there are no filters, use a literal true to have a predicate + // that always evaluates to true we can pass to the index + .unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true)); + + Ok(predicate) + } + + /// Returns a [`ParquetAccessPlan`] that specifies how to scan the + /// parquet file. + /// + /// A `ParquetAccessPlan` specifies which row groups and which rows within + /// those row groups to scan. + fn create_plan( + &self, + predicate: &Arc, + ) -> Result { + // In this example, we use the PruningPredicate's literal guarantees to + // analyze the predicate. In a real system, using + // `PruningPredicate::prune` would likely be easier to do. + let pruning_predicate = + PruningPredicate::try_new(Arc::clone(predicate), self.schema().clone())?; + + // The PruningPredicate's guarantees must all be satisfied in order for + // the predicate to possibly evaluate to true. + let guarantees = pruning_predicate.literal_guarantees(); + let Some(constants) = self.value_constants(guarantees) else { + return Ok(self.indexed_file.scan_all_plan()); + }; + + // Begin with a plan that skips all row groups. 
+ let mut plan = self.indexed_file.scan_none_plan(); + + // determine which row groups have the values in the guarantees + for value in constants { + let ScalarValue::Int32(Some(val)) = value else { + // if we have unexpected type of constant, no pruning is possible + return Ok(self.indexed_file.scan_all_plan()); + }; + + // Since we know the values in the files are between 0..1000 and + // evenly distributed between in row groups, calculate in what row + // group this value appears and tell the parquet reader to read it + let val = *val as usize; + let num_rows_in_row_group = 1000 / plan.len(); + let row_group_index = val / num_rows_in_row_group; + plan.scan(row_group_index); + + // If we want to use row selections, which the parquet reader can + // use to skip data pages when the parquet file has a "page index" + // and the reader is configured to read it, add a row seelction + if self.use_row_selections() { + let offset_in_row_group = val - row_group_index * num_rows_in_row_group; + let selection = RowSelection::from(vec![ + // skip rows before the desired row + RowSelector::skip(offset_in_row_group.saturating_sub(1)), + // select the actual row + RowSelector::select(1), + // skip any remaining rows in the group + RowSelector::skip(num_rows_in_row_group - offset_in_row_group), + ]); + + plan.scan_selection(row_group_index, selection); + } + } + + Ok(plan) + } + + /// Returns the set of constants that the `"id"` column must take in order + /// for the predicate to be true. + /// + /// If `None` is returned, we can't extract the necessary information from + /// the guarantees. + fn value_constants<'a>( + &self, + guarantees: &'a [LiteralGuarantee], + ) -> Option<&'a HashSet> { + // only handle a single guarantee for column in this example + if guarantees.len() != 1 { + return None; + } + let guarantee = guarantees.first()?; + + // Only handle IN guarantees for the "in" column + if guarantee.guarantee != Guarantee::In || guarantee.column.name() != "id" { + return None; + } + Some(&guarantee.literals) + } +} + +/// Stores information needed to scan a file +#[derive(Debug)] +struct IndexedFile { + /// File name + file_name: String, + /// The path of the file + path: PathBuf, + /// The size of the file + file_size: u64, + /// The pre-parsed parquet metadata for the file + metadata: Arc, + /// The arrow schema of the file + schema: SchemaRef, +} + +impl IndexedFile { + fn try_new(path: impl AsRef, value_range: Range) -> Result { + let path = path.as_ref(); + // write the actual file + make_demo_file(path, value_range)?; + + // Now, open the file and read its size and metadata + let file_name = path + .file_name() + .ok_or_else(|| internal_datafusion_err!("Invalid path"))? + .to_str() + .ok_or_else(|| internal_datafusion_err!("Invalid filename"))? + .to_string(); + let file_size = path.metadata()?.len(); + + let file = File::open(path).map_err(|e| { + DataFusionError::from(e).context(format!("Error opening file {path:?}")) + })?; + + let reader = ParquetRecordBatchReaderBuilder::try_new(file)?; + let metadata = reader.metadata().clone(); + let schema = reader.schema().clone(); + + // canonicalize after writing the file + let path = std::fs::canonicalize(path)?; + + Ok(Self { + file_name, + path, + file_size, + metadata, + schema, + }) + } + + /// Return a `PartitionedFile` to scan the underlying file + /// + /// The returned value does not have any `ParquetAccessPlan` specified in + /// its extensions. 
+ fn partitioned_file(&self) -> PartitionedFile { + PartitionedFile::new(self.path.display().to_string(), self.file_size) + } + + /// Return a `ParquetAccessPlan` that scans all row groups in the file + fn scan_all_plan(&self) -> ParquetAccessPlan { + ParquetAccessPlan::new_all(self.metadata.num_row_groups()) + } + + /// Return a `ParquetAccessPlan` that scans no row groups in the file + fn scan_none_plan(&self) -> ParquetAccessPlan { + ParquetAccessPlan::new_none(self.metadata.num_row_groups()) + } +} + +/// Implement the TableProvider trait for IndexTableProvider +/// so that we can query it as a table. +#[async_trait] +impl TableProvider for IndexTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.indexed_file.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let indexed_file = &self.indexed_file; + let predicate = self.filters_to_predicate(state, filters)?; + + // Figure out which row groups to scan based on the predicate + let access_plan = self.create_plan(&predicate)?; + println!("{access_plan:?}"); + + let partitioned_file = indexed_file + .partitioned_file() + // provide the starting access plan to the ParquetExec by + // storing it as "extensions" on PartitionedFile + .with_extensions(Arc::new(access_plan) as _); + + // In this example, we are trying to minimize IO, but by default the + // ParquetExec will load the page index with a separate IO. Disable + // doing so unless we are using row selections (which needs the page index) + let mut table_parquet_options = TableParquetOptions::new(); + if !self.use_row_selections() { + table_parquet_options.global.enable_page_index = false; + } + + // Prepare for scanning + let schema = self.schema(); + let object_store_url = ObjectStoreUrl::parse("file://")?; + let file_scan_config = FileScanConfig::new(object_store_url, schema) + .with_limit(limit) + .with_projection(projection.cloned()) + .with_file(partitioned_file); + + // Configure a factory interface to avoid re-reading the metadata for each file + let reader_factory = + CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store)) + .with_file(indexed_file); + + // Finally, put it all together into a ParquetExec + Ok( + ParquetExecBuilder::new_with_options(file_scan_config, table_parquet_options) + // provide the predicate so the ParquetExec can try and prune + // row groups internally + .with_predicate(predicate) + // provide the factory to create parquet reader without re-reading metadata + .with_parquet_file_reader_factory(Arc::new(reader_factory)) + .build_arc(), + ) + } + + /// Tell DataFusion to push filters down to the scan method + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + // Inexact because the pruning can't handle all expressions and pruning + // is not done at the row level -- there may be rows in returned files + // that do not pass the filter + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } +} + +/// A custom [`ParquetFileReaderFactory`] that handles opening parquet files +/// from object storage, and uses pre-loaded metadata. + +#[derive(Debug)] +struct CachedParquetFileReaderFactory { + /// The underlying object store implementation for reading file data + object_store: Arc, + /// The parquet metadata for each file in the index, keyed by the file name + /// (e.g. 
`file1.parquet`) + metadata: HashMap>, +} + +impl CachedParquetFileReaderFactory { + fn new(object_store: Arc) -> Self { + Self { + object_store, + metadata: HashMap::new(), + } + } + /// Add the pre-parsed information about the file to the factor + fn with_file(mut self, indexed_file: &IndexedFile) -> Self { + self.metadata.insert( + indexed_file.file_name.clone(), + Arc::clone(&indexed_file.metadata), + ); + self + } +} + +impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { + fn create_reader( + &self, + _partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option, + _metrics: &ExecutionPlanMetricsSet, + ) -> Result> { + // for this example we ignore the partition index and metrics + // but in a real system you would likely use them to report details on + // the performance of the reader. + let filename = file_meta + .location() + .parts() + .last() + .expect("No path in location") + .as_ref() + .to_string(); + + let object_store = Arc::clone(&self.object_store); + let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta); + + if let Some(hint) = metadata_size_hint { + inner = inner.with_footer_size_hint(hint) + }; + + let metadata = self + .metadata + .get(&filename) + .expect("metadata for file not found: {filename}"); + Ok(Box::new(ParquetReaderWithCache { + filename, + metadata: Arc::clone(metadata), + inner, + })) + } +} + +/// wrapper around a ParquetObjectReader that caches metadata +struct ParquetReaderWithCache { + filename: String, + metadata: Arc, + inner: ParquetObjectReader, +} + +impl AsyncFileReader for ParquetReaderWithCache { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, datafusion::parquet::errors::Result> { + println!("get_bytes: {} Reading range {:?}", self.filename, range); + self.inner.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, datafusion::parquet::errors::Result>> { + println!( + "get_byte_ranges: {} Reading ranges {:?}", + self.filename, ranges + ); + self.inner.get_byte_ranges(ranges) + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, datafusion::parquet::errors::Result>> { + println!("get_metadata: {} returning cached metadata", self.filename); + + // return the cached metadata so the parquet reader does not read it + let metadata = self.metadata.clone(); + async move { Ok(metadata) }.boxed() + } +} + +/// Creates a new parquet file at the specified path. +/// +/// * id: Int32 +/// * text: Utf8 +/// +/// The `id` column increases sequentially from `min_value` to `max_value` +/// The `text` column is a repeating sequence of `TheTextValue{i}` +/// +/// Each row group has 100 rows +fn make_demo_file(path: impl AsRef, value_range: Range) -> Result<()> { + let path = path.as_ref(); + let file = File::create(path)?; + + let id = Int32Array::from_iter_values(value_range.clone()); + let text = + StringArray::from_iter_values(value_range.map(|i| format!("TheTextValue{i}"))); + + let batch = RecordBatch::try_from_iter(vec![ + ("id", Arc::new(id) as ArrayRef), + ("text", Arc::new(text) as ArrayRef), + ])?; + + let schema = batch.schema(); + + // enable page statistics for the tag column, + // for everything else. 
+ let props = WriterProperties::builder() + .set_max_row_group_size(100) + // compute column chunk (per row group) statistics by default + .set_statistics_enabled(EnabledStatistics::Chunk) + // compute column page statistics for the tag column + .set_column_statistics_enabled(ColumnPath::from("tag"), EnabledStatistics::Page) + .build(); + + // write the actual values to the file + let mut writer = ArrowWriter::try_new(file, schema, Some(props))?; + writer.write(&batch)?; + writer.close()?; + + Ok(()) +} diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index 911ff079def1..6c49d98c842d 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -127,6 +127,13 @@ impl Column { }) } + /// return the column's name. + /// + /// Note: This ignores the relation and returns the column name only. + pub fn name(&self) -> &str { + &self.name + } + /// Serialize column into a flat name string pub fn flat_name(&self) -> String { match &self.relation { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1c431d04cd35..c59cdba7c829 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1393,6 +1393,13 @@ pub struct TableParquetOptions { pub key_value_metadata: HashMap>, } +impl TableParquetOptions { + /// Return new default TableParquetOptions + pub fn new() -> Self { + Self::default() + } +} + impl ConfigField for TableParquetOptions { fn visit(&self, v: &mut V, key_prefix: &str, description: &'static str) { self.global.visit(v, key_prefix, description); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs index f51f2c49e896..3eb76f81b1dc 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs @@ -139,6 +139,11 @@ impl ParquetAccessPlan { self.set(idx, RowGroupAccess::Skip); } + /// scan the i-th row group + pub fn scan(&mut self, idx: usize) { + self.set(idx, RowGroupAccess::Scan); + } + /// Return true if the i-th row group should be scanned pub fn should_scan(&self, idx: usize) -> bool { self.row_groups[idx].should_scan() diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 5e5cc93bc54f..bf8b717aecdb 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -187,9 +187,9 @@ pub use writer::plan_to_parquet; /// let exec = ParquetExec::builder(file_scan_config).build(); /// ``` /// -/// For a complete example, see the [`parquet_index_advanced` example]). +/// For a complete example, see the [`advanced_parquet_index` example]). 
/// -/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs +/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_parquet_index.rs /// /// # Execution Overview /// diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 06eb8f79dada..bb99c5b98497 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -54,7 +54,7 @@ impl RowGroupAccessPlanFilter { Self { access_plan } } - /// Return true if there are no row groups to scan + /// Return true if there are no row groups pub fn is_empty(&self) -> bool { self.access_plan.is_empty() } diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 4dd62a894518..98aff0d65898 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -471,8 +471,10 @@ pub struct PruningPredicate { /// Original physical predicate from which this predicate expr is derived /// (required for serialization) orig_expr: Arc, - /// [`LiteralGuarantee`]s that are used to try and prove a predicate can not - /// possibly evaluate to `true`. + /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly + /// evaluate to `true`. + /// + /// See [`PruningPredicate::literal_guarantees`] for more details. literal_guarantees: Vec, } @@ -595,6 +597,10 @@ impl PruningPredicate { } /// Returns a reference to the literal guarantees + /// + /// Note that **All** `LiteralGuarantee`s must be satisfied for the + /// expression to possibly be `true`. If any is not satisfied, the + /// expression is guaranteed to be `null` or `false`. 
pub fn literal_guarantees(&self) -> &[LiteralGuarantee] { &self.literal_guarantees } From 031878fb070bec64c4e8cf3d26de2be4bbaabf6a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 13 Jun 2024 20:14:59 -0400 Subject: [PATCH 2/9] pre-load page index --- .../examples/advanced_parquet_index.rs | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 4a64b88de7f5..1a44f83fdb45 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -30,7 +30,7 @@ use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::arrow_reader::{ - ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, + ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, }; use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use datafusion::parquet::arrow::ArrowWriter; @@ -42,7 +42,6 @@ use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; -use datafusion_common::config::TableParquetOptions; use datafusion_common::{ internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, }; @@ -338,7 +337,12 @@ impl IndexedFile { DataFusionError::from(e).context(format!("Error opening file {path:?}")) })?; - let reader = ParquetRecordBatchReaderBuilder::try_new(file)?; + let options = ArrowReaderOptions::new() + // Load the page index when reading metadata to cache + // so it is available to interpret row selections + .with_page_index(true); + let reader = + ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?; let metadata = reader.metadata().clone(); let schema = reader.schema().clone(); @@ -409,14 +413,6 @@ impl TableProvider for IndexTableProvider { // storing it as "extensions" on PartitionedFile .with_extensions(Arc::new(access_plan) as _); - // In this example, we are trying to minimize IO, but by default the - // ParquetExec will load the page index with a separate IO. 
Disable - // doing so unless we are using row selections (which needs the page index) - let mut table_parquet_options = TableParquetOptions::new(); - if !self.use_row_selections() { - table_parquet_options.global.enable_page_index = false; - } - // Prepare for scanning let schema = self.schema(); let object_store_url = ObjectStoreUrl::parse("file://")?; @@ -431,15 +427,13 @@ impl TableProvider for IndexTableProvider { .with_file(indexed_file); // Finally, put it all together into a ParquetExec - Ok( - ParquetExecBuilder::new_with_options(file_scan_config, table_parquet_options) - // provide the predicate so the ParquetExec can try and prune - // row groups internally - .with_predicate(predicate) - // provide the factory to create parquet reader without re-reading metadata - .with_parquet_file_reader_factory(Arc::new(reader_factory)) - .build_arc(), - ) + Ok(ParquetExecBuilder::new(file_scan_config) + // provide the predicate so the ParquetExec can try and prune + // row groups internally + .with_predicate(predicate) + // provide the factory to create parquet reader without re-reading metadata + .with_parquet_file_reader_factory(Arc::new(reader_factory)) + .build_arc()) } /// Tell DataFusion to push filters down to the scan method From 79e1476a2b1afcb901aefe2fde5b5c415cc4e4c2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 13 Jun 2024 20:15:45 -0400 Subject: [PATCH 3/9] fix comment --- datafusion-examples/examples/advanced_parquet_index.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 1a44f83fdb45..7662fa65dc42 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -172,8 +172,7 @@ pub struct IndexTableProvider { } impl IndexTableProvider { /// Create a new IndexTableProvider - /// * `dir` - the directory containing the parquet files - /// * `object_store` - the object store implementation to use for reading the files + /// * `object_store` - the object store implementation to use for reading files pub fn try_new(object_store: Arc) -> Result { let tmpdir = TempDir::new().expect("Can't make temporary directory"); From 9f3a5f96e7aa4de5f79693bf8a3d726aeee08361 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 14 Jun 2024 06:29:40 -0400 Subject: [PATCH 4/9] Apply suggestions from code review Thank you @Weijun-H Co-authored-by: Alex Huang --- datafusion-examples/examples/advanced_parquet_index.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 7662fa65dc42..8fcdeefa35e0 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -61,7 +61,7 @@ use std::sync::Arc; use tempfile::TempDir; use url::Url; -/// This example demonstrates using low level DataFusion APIs to read ony +/// This example demonstrates using low level DataFusion APIs to read only /// certain row groups and ranges from parquet files, based on external /// information. /// @@ -81,7 +81,7 @@ use url::Url; /// /// Specifically, this example illustrates how to: /// 1. Use [`ParquetFileReaderFactory`] to avoid re-reading parquet metadata on each query -/// 2. Use [`PruningPredicate`] for predicate anaysis +/// 2. Use [`PruningPredicate`] for predicate analysis /// 3. 
Pass a row group selection to [`ParuetExec`] /// 4. Pass a row selection (within a row group) to [`ParuetExec`] /// @@ -99,7 +99,7 @@ async fn main() -> Result<()> { let object_store: Arc = Arc::new(object_store::local::LocalFileSystem::new()); - // Create a a custom table provider with our special index. + // Create a custom table provider with our special index. let provider = Arc::new(IndexTableProvider::try_new(Arc::clone(&object_store))?); // SessionContext for running queries that has the table provider From 0a2bcfaa771f3adab79c7faf926f5e96ebcf39fb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 14 Jun 2024 06:50:15 -0400 Subject: [PATCH 5/9] Add ASCII ART --- .../examples/advanced_parquet_index.rs | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 8fcdeefa35e0..1390914e2a77 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -91,6 +91,67 @@ use url::Url; /// [`SessionContext::read_parquet`] or [`ListingTable`], which also do file /// pruning based on parquet statistics (using the same underlying APIs) /// +/// # Diagram +/// +/// This diagram shows how the `ParquetExec` is configured to do only a single +/// (range) read from a parquet file, for the data that is needed. It does +/// not read the file footer or any of the row groups that are not needed. +/// +/// ```text +/// ┌───────────────────────┐ The TableProvider configures the +/// │ ┌───────────────────┐ │ ParquetExec: +/// │ │ │ │ +/// │ └───────────────────┘ │ +/// │ ┌───────────────────┐ │ +/// Row │ │ │ │ 1. To read only specific Row +/// Groups │ └───────────────────┘ │ Groups (the ParquetExec tries +/// │ ┌───────────────────┐ │ to reduce this further based +/// │ │ │ │ on metadata) +/// │ └───────────────────┘ │ ┌────────────────────┐ +/// │ ┌───────────────────┐ │ │ │ +/// │ │ │◀┼ ─ ─ ┐ │ ParquetExec │ +/// │ └───────────────────┘ │ │ (Parquet Reader) │ +/// │ ... │ └ ─ ─ ─ ─│ │ +/// │ ┌───────────────────┐ │ │ ╔═══════════════╗ │ +/// │ │ │ │ │ ║ParquetMetadata║ │ +/// │ └───────────────────┘ │ │ ╚═══════════════╝ │ +/// │ ╔═══════════════════╗ │ └────────────────────┘ +/// │ ║ Thrift metadata ║ │ +/// │ ╚═══════════════════╝ │ 1. With cached ParquetMetadata, so +/// └───────────────────────┘ the ParquetExec does not re-read / +/// Parquet File decode the thrift footer +/// +/// ``` +/// +/// Within a Row Group, Column Chunks store data in DataPages. This example also +/// shows how to configure the ParquetExec to read a `RowSelection` (row ranges) +/// which will skip unneeded data pages: +/// +/// ```text +/// ┌───────────────────────┐ If the RowSelection does not include any +/// │ ... │ rows from a particular Data Page, that +/// │ │ Data Page is not fetched or decoded +/// │ ┌───────────────────┐ │ +/// │ │ ┌──────────┐ │ │ +/// Row │ │ │DataPage 0│ │ │ ┌────────────────────┐ +/// Groups │ │ └──────────┘ │ │ │ │ +/// │ │ ┌──────────┐ │ │ │ ParquetExec │ +/// │ │ ... │DataPage 1│ ◀┼ ┼ ─ ─ ─ │ (Parquet Reader) │ +/// │ │ └──────────┘ │ │ └ ─ ─ ─ ─ ─│ │ +/// │ │ ┌──────────┐ │ │ │ ╔═══════════════╗ │ +/// │ │ │DataPage 2│ │ │ If only rows │ ║ParquetMetadata║ │ +/// │ │ └──────────┘ │ │ from DataPage 1 │ ╚═══════════════╝ │ +/// │ └───────────────────┘ │ are selected, └────────────────────┘ +/// │ │ only DataPage 1 +/// │ ... 
│ is fetched and +/// │ │ decoded +/// │ ╔═══════════════════╗ │ +/// │ ║ Thrift metadata ║ │ +/// │ ╚═══════════════════╝ │ +/// └───────────────────────┘ +/// Parquet File +/// ``` +/// /// [`ListingTable`]: datafusion::datasource::listing::ListingTable #[tokio::main] async fn main() -> Result<()> { From 05febf8b0350fa5eb8608ee486011cef60f9d66d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 14 Jun 2024 07:44:43 -0400 Subject: [PATCH 6/9] Update datafusion-examples/README.md Co-authored-by: Alex Huang --- datafusion-examples/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 4e3ec058cc17..995c127120ab 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -45,7 +45,7 @@ cargo run --example csv_sql - [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF) - [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF) - [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF) -- ['advanced_parquet_index.rs'](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files +- [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files - [`avro_sql.rs`](examples/avro_sql.rs): Build and run a query plan from a SQL statement against a local AVRO file - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog - [`csv_sql.rs`](examples/csv_sql.rs): Build and run a query plan from a SQL statement against a local CSV file From 9030e7b19ad3051b4b813023d48104668d345443 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 14 Jun 2024 07:44:47 -0400 Subject: [PATCH 7/9] Update datafusion-examples/examples/advanced_parquet_index.rs Co-authored-by: Alex Huang --- datafusion-examples/examples/advanced_parquet_index.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 1390914e2a77..923670d2819f 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -321,7 +321,7 @@ impl IndexTableProvider { // If we want to use row selections, which the parquet reader can // use to skip data pages when the parquet file has a "page index" - // and the reader is configured to read it, add a row seelction + // and the reader is configured to read it, add a row selection if self.use_row_selections() { let offset_in_row_group = val - row_group_index * num_rows_in_row_group; let selection = RowSelection::from(vec![ From 58b2ba66b211a9c586fd2e7d7f81acbfaf47f97a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 19 Jun 2024 17:26:45 -0400 Subject: [PATCH 8/9] Improve / clarify comments based on review --- .../examples/advanced_parquet_index.rs | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 923670d2819f..7d3e3b1475f2 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -181,36 +181,42 @@ async fn main() -> 
Result<()> { // the underlying parquet reader makes 10 IO requests, one for each row group // Now, run a query that has a predicate that our index can handle + // + // For this query, the access plan specifies skipping 8 row groups + // and scanning 2 of them. The skipped row groups are not read at all: + // + // [Skip, Skip, Scan, Skip, Skip, Skip, Skip, Scan, Skip, Skip] + // + // Note that the parquet reader makes 2 IO requests - one for the data from + // each row group. println!("** Select data, predicate `id IN (250, 750)`"); ctx.sql("SELECT text FROM index_table WHERE id IN (250, 750)") .await? .show() .await?; - // in this case, the access plan specifies skipping 8 row groups - // and scanning 2 of them. The skipped row groups are not read at all - // - // [Skip, Skip, Scan, Skip, Skip, Skip, Skip, Scan, Skip, Skip] - // - // Note that the parquet reader only does 2 IOs - one for the data from each - // row group. // Finally, demonstrate scanning sub ranges within the row groups. // Parquet's minimum decode unit is a page, so specifying ranges // within a row group can be used to skip pages within a row group. + // + // For this query, the access plan specifies skipping all but the last row + // group and within the last row group, reading only the row with id 950 + // + // [Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Selection(skip 49, select 1, skip 50)] + // + // Note that the parquet reader makes a single IO request - for the data + // pages that must be decoded + // + // Note: in order to prune pages, the Page Index must be loaded and the + // ParquetExec will load it on demand if not present. To avoid a second IO + // during query, this example loaded the Page Index pre-emptively by setting + // `ArrowReader::with_page_index` in `IndexedFile::try_new` provider.set_use_row_selection(true); println!("** Select data, predicate `id = 950`"); ctx.sql("SELECT text FROM index_table WHERE id = 950") .await? .show() .await?; - // In this case, the access plan specifies skipping all but the last row group - // and within the last row group, reading only the row with id 950 - // - // [Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Selection(skip 49, select 1, skip 50)] - // - // In order to prune pages, the Page Index must be loaded. This PageIndex is - // loaded in a separate IO request, so the parquet reader makes 2 IO - // requests for this query. Ok(()) } From d2f64777a7fd48b2e81e7d1eb5358ae38ac6b494 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 21 Jun 2024 17:06:26 -0400 Subject: [PATCH 9/9] Add page index caveat --- datafusion-examples/examples/advanced_parquet_index.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 7d3e3b1475f2..9bf71e52c3de 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -83,7 +83,7 @@ use url::Url; /// 1. Use [`ParquetFileReaderFactory`] to avoid re-reading parquet metadata on each query /// 2. Use [`PruningPredicate`] for predicate analysis /// 3. Pass a row group selection to [`ParuetExec`] -/// 4. Pass a row selection (within a row group) to [`ParuetExec`] +/// 4. Pass a row selection (within a row group) to [`ParquetExec`] /// /// Note this is a *VERY* low level example for people who want to build their /// own custom indexes (e.g. for low latency queries). 
Most users should use @@ -125,13 +125,14 @@ use url::Url; /// /// Within a Row Group, Column Chunks store data in DataPages. This example also /// shows how to configure the ParquetExec to read a `RowSelection` (row ranges) -/// which will skip unneeded data pages: +/// which will skip unneeded data pages. This requires that the Parquet file has +/// a [Page Index]. /// /// ```text /// ┌───────────────────────┐ If the RowSelection does not include any /// │ ... │ rows from a particular Data Page, that -/// │ │ Data Page is not fetched or decoded -/// │ ┌───────────────────┐ │ +/// │ │ Data Page is not fetched or decoded. +/// │ ┌───────────────────┐ │ Note this requires a PageIndex /// │ │ ┌──────────┐ │ │ /// Row │ │ │DataPage 0│ │ │ ┌────────────────────┐ /// Groups │ │ └──────────┘ │ │ │ │ @@ -153,6 +154,7 @@ use url::Url; /// ``` /// /// [`ListingTable`]: datafusion::datasource::listing::ListingTable +/// [Page Index](https://github.com/apache/parquet-format/blob/master/PageIndex.md) #[tokio::main] async fn main() -> Result<()> { // the object store is used to read the parquet files (in this case, it is