From 7ff0bbde5aa29d833e221332cae6712949385fb0 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 3 Jun 2024 15:30:57 -0400
Subject: [PATCH] Add `advanced_parquet_index.rs` example of indexing into
 parquet files

---
 datafusion-examples/README.md                 |   1 +
 .../examples/advanced_parquet_index.rs        | 606 ++++++++++++++++++
 datafusion/common/src/column.rs               |   7 +
 .../physical_plan/parquet/access_plan.rs      |   5 +
 .../datasource/physical_plan/parquet/mod.rs   |   4 +-
 .../physical_plan/parquet/row_groups.rs       |   2 +-
 .../core/src/physical_optimizer/pruning.rs    |  10 +-
 7 files changed, 630 insertions(+), 5 deletions(-)
 create mode 100644 datafusion-examples/examples/advanced_parquet_index.rs

diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index c34f706adb82..4e3ec058cc17 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -45,6 +45,7 @@ cargo run --example csv_sql
 - [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
 - [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
 - [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
+- [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
 - [`avro_sql.rs`](examples/avro_sql.rs): Build and run a query plan from a SQL statement against a local AVRO file
 - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
 - [`csv_sql.rs`](examples/csv_sql.rs): Build and run a query plan from a SQL statement against a local CSV file
diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
new file mode 100644
index 000000000000..1fad5fc13e00
--- /dev/null
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use bytes::Bytes;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
+use datafusion::datasource::physical_plan::{
+    parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig, ParquetExec,
+};
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::parquet::arrow::arrow_reader::{ParquetRecordBatchReaderBuilder, RowSelection, RowSelector};
+use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::metadata::ParquetMetaData;
+use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
+use datafusion::parquet::schema::types::ColumnPath;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::*;
+use datafusion_common::{
+    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
+};
+use datafusion_expr::utils::conjunction;
+use datafusion_expr::{TableProviderFilterPushDown, TableType};
+use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::collections::{HashMap, HashSet};
+use std::fmt::Display;
+use std::fs::File;
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use tempfile::TempDir;
+use url::Url;
+
+
+/// This example demonstrates using low level DataFusion APIs to minimize the
+/// amount of parquet data read when external information is available.
+///
+/// Using these APIs, you can instruct DataFusion's parquet reader to skip
+/// ("prune") portions of files that do not contain relevant data. These APIs
+/// can be useful for use cases such as low latency queries over a large number
+/// of Parquet files on remote storage (e.g. S3) where the cost of reading the
+/// metadata for each file is high (e.g. because it requires a network round
+/// trip to the storage service).
+///
+/// Depending on the information from the index, DataFusion can make a request
+/// to the storage service (e.g. S3) to read only the necessary data.
+///
+/// Note that this example uses a simple index implementation. For a more
+/// realistic example of creating an index to prune files, see the
+/// `parquet_index.rs` example.
+///
+/// Specifically, this example illustrates how to:
+/// 1. Use a `ParquetFileReaderFactory` to avoid re-reading parquet metadata on each query
+/// 2. Use a `PruningPredicate` to analyze the predicates
+/// 3. Pass a row group selection to the parquet reader
+/// 4. Pass a row selection (within a row group) to the parquet reader
+///
+/// Note this is a *VERY* low level example for people who want to build their
+/// own custom indexes (e.g. for low latency queries). Most users should use
+/// higher level APIs for reading parquet files:
+/// [`SessionContext::read_parquet`] or [`ListingTable`], which also do file
+/// pruning based on parquet statistics (using the same underlying APIs)
+///
+/// # Diagram
+///
+/// ```text
+/// TODO update diagram
+/// ```
+///
+/// [`ListingTable`]: datafusion::datasource::listing::ListingTable
+#[tokio::main]
+async fn main() -> Result<()> {
+    // the object store is used to read the parquet files (in this case, it is
+    // a local file system, but in a real system it could be S3, GCS, etc)
+    let object_store: Arc<dyn ObjectStore> =
+        Arc::new(object_store::local::LocalFileSystem::new());
+
+    // Create a table provider with our special index.
+    let provider = Arc::new(IndexTableProvider::try_new(Arc::clone(&object_store))?);
+    println!("** Table Provider:");
+    println!("{provider}\n");
+
+    // Create a SessionContext for running queries that has the table provider
+    // registered as "index_table"
+    let ctx = SessionContext::new();
+    ctx.register_table("index_table", Arc::clone(&provider) as _)?;
+
+    // register the object store so that URLs like `file://` work
+    let url = Url::try_from("file://").unwrap();
+    ctx.register_object_store(&url, object_store);
+
+    // Select data from the table without any predicates (and thus no pruning)
+    println!("** Select data, no predicates:");
+    ctx.sql("SELECT file_name, value FROM index_table LIMIT 10")
+        .await?
+        .show()
+        .await?;
+    // the underlying parquet reader reads byte ranges from the indexed file
+
+    // Run a query that uses the index to prune row groups
+    println!("** Select data, predicate `value IN (250, 750)`");
+    ctx.sql("SELECT file_name, value FROM index_table WHERE value IN (250, 750)")
+        .await?
+        .show()
+        .await?;
+    // TODO why are there two calls to read bytes from the file?
+
+    // Run a query that uses the index to prune down to row selections
+    provider.set_use_row_selection(true);
+    println!("** Select data, predicate `value = 950`");
+    ctx.sql("SELECT file_name, value FROM index_table WHERE value = 950")
+        .await?
+        .show()
+        .await?;
+    // TODO why are there two calls to read bytes from the file?
+
+    Ok(())
+}
+
+/// DataFusion `TableProvider` that uses a secondary index to decide which
+/// parts of a parquet file to read
+///
+/// The schema of the file is
+/// * file_name (string)
+/// * value (int32)
+///
+/// The indexed file contains values `0..1000`
+#[derive(Debug)]
+pub struct IndexTableProvider {
+    /// Where the file is stored (cleanup on drop)
+    #[allow(dead_code)]
+    tmpdir: TempDir,
+    /// The file that we indexed
+    indexed_file: IndexedFile,
+    /// handle to the underlying object store
+    object_store: Arc<dyn ObjectStore>,
+    /// if true, use row selections rather than row group selections
+    use_row_selections: AtomicBool,
+}
+
+impl Display for IndexTableProvider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        writeln!(f, "IndexTableProvider")?;
+        writeln!(f, "---- Index ----")?;
+        write!(f, "use_row_selections: {}", self.use_row_selections())
+    }
+}
+
+impl IndexTableProvider {
+    /// Create a new IndexTableProvider
+    /// * `object_store` - the object store implementation to use for reading the files
+    pub fn try_new(object_store: Arc<dyn ObjectStore>) -> Result<Self> {
+        let tmpdir = TempDir::new().expect("Can't make temporary directory");
+
+        let indexed_file =
+            IndexedFile::try_new(tmpdir.path().join("indexed_file.parquet"), 0..1000)?;
+
+        Ok(Self {
+            indexed_file,
+            tmpdir,
+            object_store,
+            use_row_selections: AtomicBool::new(false),
+        })
+    }
+
+    /// Specify whether to use row selections
+    pub fn set_use_row_selection(&self, use_row_selections: bool) {
+        self.use_row_selections.store(use_row_selections, Ordering::SeqCst);
+    }
+
+    /// Return whether row selections are used
+    pub fn use_row_selections(&self) -> bool {
+        self.use_row_selections.load(Ordering::SeqCst)
+    }
+
+    /// convert filters like `a = 1`, `b = 2`
+    /// to a single predicate like `a = 1 AND b = 2`
+    fn filters_to_predicate(
+        &self,
+        state: &SessionState,
+        filters: &[Expr],
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let df_schema = DFSchema::try_from(self.schema())?;
+
+        let predicate = conjunction(filters.to_vec());
+        let predicate = predicate
+            .map(|predicate| state.create_physical_expr(predicate, &df_schema))
+            .transpose()?
+            // if there are no filters, use a literal true to have a predicate
+            // that always evaluates to true that we can pass to the index
+            .unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));
+
+        Ok(predicate)
+    }
+
+    /// returns a `ParquetAccessPlan` that specifies how to scan the
+    /// parquet file.
+    fn create_plan(
+        &self,
+        predicate: &Arc<dyn PhysicalExpr>,
+    ) -> Result<ParquetAccessPlan> {
+        // In this example, we use the PruningPredicate API's literal
+        // guarantees to analyze the predicate. In a real system, using
+        // `PruningPredicate::prune` would likely be easier to do.
+        let pruning_predicate =
+            PruningPredicate::try_new(Arc::clone(predicate), self.schema().clone())?;
+
+        // The PruningPredicate's guarantees must all be satisfied or else the predicate
+        // cannot be true (and thus the data is not in the file)
+        let guarantees = pruning_predicate.literal_guarantees();
+        let Some(constants) = self.value_constants(guarantees) else {
+            return Ok(self.indexed_file.scan_all_plan());
+        };
+
+        let mut plan = self.indexed_file.scan_none_plan();
+
+        for value in constants {
+            let ScalarValue::Int32(Some(val)) = value else {
+                // if we have an unexpected type of constant, no pruning is possible
+                return Ok(self.indexed_file.scan_all_plan());
+            };
+
+            // Since we know the values are 0..1000 and evenly distributed across
+            // the row groups, we know which row group this value appears in.
+            let val = *val as usize;
+            let num_rows_in_row_group = 1000 / plan.len();
+            let row_group_index = val / num_rows_in_row_group;
+
+            plan.scan(row_group_index);
+
+            // If the index knows which rows in the row groups to scan, create a
+            // row selection which the parquet reader can use to skip data
+            // pages, if the parquet file has "page index" information
+            if self.use_row_selections() {
+                let offset_in_row_group = val - row_group_index * num_rows_in_row_group;
+                let selection = RowSelection::from(vec![
+                    // skip rows before the desired row
+                    RowSelector::skip(offset_in_row_group),
+                    // select the actual row
+                    RowSelector::select(1),
+                    // skip any remaining rows in the group
+                    RowSelector::skip(num_rows_in_row_group - offset_in_row_group - 1),
+                ]);
+
+                plan.scan_selection(row_group_index, selection);
+            }
+        }
+
+        Ok(plan)
+    }
+
+    /// returns the set of constants that the "value" column must take in order
+    /// for the predicate to be true.
+    ///
+    /// If None is returned, there is no way to prune the file based on the
+    /// value column.
+    fn value_constants<'a>(
+        &self,
+        guarantees: &'a [LiteralGuarantee],
+    ) -> Option<&'a HashSet<ScalarValue>> {
+        println!("Guarantees: {guarantees:#?}");
+
+        // only handle a single guarantee for the column
+        if guarantees.len() != 1 {
+            return None;
+        }
+        let guarantee = guarantees.first()?;
+
+        // Only handle IN guarantees for the "value" column
+        if guarantee.guarantee != Guarantee::In || guarantee.column.name() != "value" {
+            return None;
+        }
+        Some(&guarantee.literals)
+    }
+}
+
+/// Stores information about how to scan a file
+#[derive(Debug)]
+struct IndexedFile {
+    /// File name
+    file_name: String,
+    /// The path of the file
+    path: PathBuf,
+    /// The size of the file
+    file_size: u64,
+    /// The metadata for the file
+    metadata: Arc<ParquetMetaData>,
+    /// The arrow schema of the file
+    schema: SchemaRef,
+}
+
+impl IndexedFile {
+    fn try_new(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<Self> {
+        let path = path.as_ref();
+        // write the actual file
+        make_demo_file(path, value_range)?;
+
+        // Now, open the file and read its size and metadata
+        let file_name = path
+            .file_name()
+            .ok_or_else(|| internal_datafusion_err!("Invalid path"))?
+            .to_str()
+            .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?
+            .to_string();
+        let file_size = path.metadata()?.len();
+
+        let file = File::open(path).map_err(|e| {
+            DataFusionError::from(e).context(format!("Error opening file {path:?}"))
+        })?;
+
+        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
+
+        let metadata = reader.metadata().clone();
+        let schema = reader.schema().clone();
+
+        // canonicalize after writing the file
+        let path = std::fs::canonicalize(path)?;
+
+        Ok(Self {
+            file_name,
+            path,
+            file_size,
+            metadata,
+            schema,
+        })
+    }
+
+    /// Return a `PartitionedFile` suitable for scanning this file
+    ///
+    /// The returned value does not have any `ParquetAccessPlan` specified in
+    /// its extensions.
+    fn partitioned_file(&self) -> PartitionedFile {
+        PartitionedFile::new(self.path.display().to_string(), self.file_size)
+    }
+
+    /// Return a parquet access plan that scans all row groups in the file
+    fn scan_all_plan(&self) -> ParquetAccessPlan {
+        ParquetAccessPlan::new_all(self.metadata.num_row_groups())
+    }
+
+    /// Return a parquet access plan that scans no row groups in the file
+    fn scan_none_plan(&self) -> ParquetAccessPlan {
+        ParquetAccessPlan::new_none(self.metadata.num_row_groups())
+    }
+}
+
+#[async_trait]
+impl TableProvider for IndexTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.indexed_file.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let indexed_file = &self.indexed_file;
+        let predicate = self.filters_to_predicate(state, filters)?;
+
+        // figure out which row groups to scan based on the predicate...
+        let access_plan = self.create_plan(&predicate)?;
+        println!("Using access plan:\n{access_plan:?}");
+
+        let partitioned_file = indexed_file
+            .partitioned_file()
+            // provide the starting access plan to the ParquetExec
+            .with_extensions(Arc::new(access_plan) as _);
+
+        let schema = self.schema();
+        let object_store_url = ObjectStoreUrl::parse("file://")?;
+        let file_scan_config = FileScanConfig::new(object_store_url, schema)
+            .with_limit(limit)
+            .with_projection(projection.cloned())
+            .with_file(partitioned_file);
+
+        // setup a factory that can create parquet readers
+        // without re-reading the metadata for each file
+        let reader_factory =
+            CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store))
+                .with_file(indexed_file);
+
+        // Finally, put it all together
+        Ok(ParquetExec::builder(file_scan_config)
+            // tell the parquet exec about the predicate so it can prune
+            // row groups internally.
+            .with_predicate(predicate)
+            // provide a factory that can create parquet readers
+            // to avoid re-reading the metadata for each file
+            .with_parquet_file_reader_factory(Arc::new(reader_factory))
+            .build_arc())
+    }
+
+    /// Tell DataFusion to push filters down to the scan method
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        // Inexact because the pruning can't handle all expressions and pruning
+        // is not done at the row level -- there may be rows in returned files
+        // that do not pass the filter
+        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+    }
+}
+
+/// A custom [`ParquetFileReaderFactory`] that handles opening parquet files
+/// from object storage, and uses pre-loaded metadata rather than re-loading it
+#[derive(Debug)]
+struct CachedParquetFileReaderFactory {
+    /// The underlying object store implementation for reading file data
+    object_store: Arc<dyn ObjectStore>,
+    /// The parquet metadata for each file in the index, keyed by the file name
+    /// (e.g. `file1.parquet`) to avoid having to read the metadata for each
+    /// file multiple times.
+    metadata: HashMap<String, Arc<ParquetMetaData>>,
+}
+
+impl CachedParquetFileReaderFactory {
+    fn new(object_store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            object_store,
+            metadata: HashMap::new(),
+        }
+    }
+    /// Add the pre-parsed information about the file to the factory
+    fn with_file(mut self, indexed_file: &IndexedFile) -> Self {
+        self.metadata.insert(
+            indexed_file.file_name.clone(),
+            Arc::clone(&indexed_file.metadata),
+        );
+        self
+    }
+}
+
+impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        _partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        _metrics: &ExecutionPlanMetricsSet,
+    ) -> Result<Box<dyn AsyncFileReader + Send>> {
+        // for this example we ignore the partition index and metrics
+        // but in a real system you would likely use them to report details on
+        // the performance of the reader.
+        let filename = file_meta
+            .location()
+            .parts()
+            .last()
+            .expect("No path in location")
+            .as_ref()
+            .to_string();
+
+        let object_store = Arc::clone(&self.object_store);
+        let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        let metadata = self
+            .metadata
+            .get(&filename)
+            .unwrap_or_else(|| panic!("metadata for file not found: {filename}"));
+        Ok(Box::new(ParquetReaderWithCache {
+            filename,
+            metadata: Arc::clone(metadata),
+            inner,
+        }))
+    }
+}
+
+/// Wrapper around a ParquetObjectReader that caches the metadata
+struct ParquetReaderWithCache {
+    filename: String,
+    metadata: Arc<ParquetMetaData>,
+    inner: ParquetObjectReader,
+}
+
+impl AsyncFileReader for ParquetReaderWithCache {
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Bytes>> {
+        println!("{} Reading range {:?}", self.filename, range);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Vec<Bytes>>> {
+        println!("{} Reading ranges {:?}", self.filename, ranges);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
+        println!("{} returning cached metadata", self.filename);
+
+        // return the cached metadata (do not read it again)
+        let metadata = self.metadata.clone();
+        async move { Ok(metadata) }.boxed()
+    }
+}
+
+/// Creates a new parquet file at the specified path.
+///
+/// * file_name: Utf8
+/// * value: Int32
+/// * tag: Utf8 (values `tag_value_A` .. `tag_value_Z`, `tag_value_A`, ...)
+///
+/// The `value` column increases sequentially over the provided `value_range`.
+/// The `tag` column is a repeating sequence of `tag_value_A`, `tag_value_B`, .. `tag_value_Z`
+///
+/// Each row group has 100 rows, and each page of `tag` has 10 values.
+///
+/// Note that the `tag` column is designed so that it is not possible to prune
+/// entire row groups based on the tag column. However, we can use the page level
+/// statistics in the page index to do so.
+fn make_demo_file(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<()> {
+    let path = path.as_ref();
+    let file = File::create(path)?;
+    let filename = path
+        .file_name()
+        .ok_or_else(|| internal_datafusion_err!("No filename"))?
+        .to_str()
+        .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?;
+
+    let num_values = value_range.len();
+    let file_names =
+        StringArray::from_iter_values(std::iter::repeat(&filename).take(num_values));
+    let values = Int32Array::from_iter_values(value_range);
+    let tag_values = StringArray::from_iter_values(
+        ('A'..='Z')
+            .map(|c| format!("tag_value_{}", c))
+            .cycle()
+            .take(num_values),
+    );
+
+    let batch = RecordBatch::try_from_iter(vec![
+        ("file_name", Arc::new(file_names) as ArrayRef),
+        ("value", Arc::new(values) as ArrayRef),
+        ("tag", Arc::new(tag_values) as ArrayRef),
+    ])?;
+
+    let schema = batch.schema();
+
+    // enable page statistics for the tag column,
+    // and chunk statistics for everything else.
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(100)
+        // compute column chunk (per row group) statistics by default
+        .set_statistics_enabled(EnabledStatistics::Chunk)
+        // compute column page statistics for the tag column
+        .set_column_statistics_enabled(ColumnPath::from("tag"), EnabledStatistics::Page)
+        .build();
+
+    // write the actual values to the file
+    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
+    writer.write(&batch)?;
+    writer.close()?;
+
+    Ok(())
+}
diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs
index 911ff079def1..6c49d98c842d 100644
--- a/datafusion/common/src/column.rs
+++ b/datafusion/common/src/column.rs
@@ -127,6 +127,13 @@ impl Column {
         })
     }
 
+    /// return the column's name.
+    ///
+    /// Note: This ignores the relation and returns the column name only.
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
     /// Serialize column into a flat name string
     pub fn flat_name(&self) -> String {
         match &self.relation {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
index f51f2c49e896..3eb76f81b1dc 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -139,6 +139,11 @@ impl ParquetAccessPlan {
         self.set(idx, RowGroupAccess::Skip);
     }
 
+    /// scan the i-th row group
+    pub fn scan(&mut self, idx: usize) {
+        self.set(idx, RowGroupAccess::Scan);
+    }
+
     /// Return true if the i-th row group should be scanned
     pub fn should_scan(&self, idx: usize) -> bool {
         self.row_groups[idx].should_scan()
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 5e5cc93bc54f..bf8b717aecdb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -187,9 +187,9 @@ pub use writer::plan_to_parquet;
 /// let exec = ParquetExec::builder(file_scan_config).build();
 /// ```
 ///
-/// For a complete example, see the [`parquet_index_advanced` example]).
+/// For a complete example, see the [`advanced_parquet_index` example]).
 ///
-/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs
+/// [`advanced_parquet_index` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_parquet_index.rs
 ///
 /// # Execution Overview
 ///
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 06eb8f79dada..bb99c5b98497 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -54,7 +54,7 @@ impl RowGroupAccessPlanFilter {
         Self { access_plan }
     }
 
-    /// Return true if there are no row groups to scan
+    /// Return true if there are no row groups
     pub fn is_empty(&self) -> bool {
         self.access_plan.is_empty()
     }
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 4dd62a894518..98aff0d65898 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -471,8 +471,10 @@ pub struct PruningPredicate {
     /// Original physical predicate from which this predicate expr is derived
     /// (required for serialization)
     orig_expr: Arc<dyn PhysicalExpr>,
-    /// [`LiteralGuarantee`]s that are used to try and prove a predicate can not
-    /// possibly evaluate to `true`.
+    /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly
+    /// evaluate to `true`.
+    ///
+    /// See [`PruningPredicate::literal_guarantees`] for more details.
     literal_guarantees: Vec<LiteralGuarantee>,
 }
 
@@ -595,6 +597,10 @@ impl PruningPredicate {
     }
 
     /// Returns a reference to the literal guarantees
+    ///
+    /// Note that **all** `LiteralGuarantee`s must be satisfied for the
+    /// expression to possibly be `true`. If any is not satisfied, the
+    /// expression is guaranteed to be `null` or `false`.
     pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
         &self.literal_guarantees
     }
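As a supplement to the example above, here is a minimal, hedged sketch (not part of the patch itself) of how a secondary index can translate a known row offset into a `ParquetAccessPlan`. It assumes only the APIs the example already uses (`ParquetAccessPlan::new_none`/`scan`/`scan_selection`, `RowSelection`, `RowSelector`); the helper name `plan_for_row` and its parameters are illustrative, not part of the DataFusion API.

```rust
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Illustrative helper (not in the patch): build an access plan that reads
/// exactly one row, given the number of row groups, the rows per row group,
/// and the global offset of the target row in a sorted file.
fn plan_for_row(
    num_row_groups: usize,
    rows_per_row_group: usize,
    target_row: usize,
) -> ParquetAccessPlan {
    // start by skipping every row group
    let mut plan = ParquetAccessPlan::new_none(num_row_groups);

    // mark the row group that contains the target row for scanning
    let row_group_index = target_row / rows_per_row_group;
    plan.scan(row_group_index);

    // within that row group, select only the target row; the reader can use
    // this selection to skip data pages when the file has a page index
    let offset_in_row_group = target_row % rows_per_row_group;
    let selection = RowSelection::from(vec![
        RowSelector::skip(offset_in_row_group),
        RowSelector::select(1),
        RowSelector::skip(rows_per_row_group - offset_in_row_group - 1),
    ]);
    plan.scan_selection(row_group_index, selection);

    plan
}
```

With 10 row groups of 100 rows each, `plan_for_row(10, 100, 950)` mirrors the plan the example builds for the predicate `value = 950`: row group 9 is scanned, rows 0..50 of that group are skipped, one row is selected, and the remaining 49 rows are skipped.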