diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index a5395ea7aab3..6fcbe0429bf5 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -61,6 +61,7 @@ cargo run --example csv_sql - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function - [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es - ['parquet_index.rs'](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries +- ['parquet_index_advanced.rs'](examples/parquet_index.rs): Create a detailed secondary index that covers the contents of several parquet files - [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file - [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files - ['parquet_exec_visitor.rs'](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution diff --git a/datafusion-examples/examples/parquet_index_advanced.rs b/datafusion-examples/examples/parquet_index_advanced.rs new file mode 100644 index 000000000000..9b063027e329 --- /dev/null +++ b/datafusion-examples/examples/parquet_index_advanced.rs @@ -0,0 +1,1004 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + Array, ArrayRef, AsArray, BooleanArray, Int32Array, Int32Builder, RecordBatch, + StringArray, StringBuilder, UInt64Array, UInt64Builder, +}; +use arrow::datatypes::Int32Type; +use arrow::util::pretty::pretty_format_batches; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use bytes::Bytes; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::parquet::{ + ParquetAccessPlan, RequestedStatistics, RowGroupAccess, StatisticsConverter, +}; +use datafusion::datasource::physical_plan::{ + parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig, ParquetExec, +}; +use datafusion::datasource::TableProvider; +use datafusion::execution::context::SessionState; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; +use datafusion::parquet::arrow::ArrowWriter; +use datafusion::parquet::file::metadata::ParquetMetaData; +use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}; +use datafusion::parquet::schema::types::ColumnPath; +use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::*; +use datafusion_common::{ + internal_datafusion_err, internal_err, DFSchema, DataFusionError, Result, ScalarValue, +}; +use datafusion_expr::utils::conjunction; +use datafusion_expr::{TableProviderFilterPushDown, TableType}; +use datafusion_physical_expr::PhysicalExpr; +use futures::future::BoxFuture; +use futures::FutureExt; +use object_store::ObjectStore; +use std::any::Any; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::fmt::Display; +use std::fs; +use std::fs::{DirEntry, File}; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tempfile::TempDir; +use url::Url; + +/// This example demonstrates building a detailed secondary index over multiple +/// Parquet files and using that index during query to skip ("prune") portions +/// of files that do not contain relevant data. For some queries, this results +/// in a single request to the storage service to read only the necessary data. +/// +/// This type of index can be useful for low latency queries over a large number +/// of Parquet files on remote storage (e.g. S3) where the cost of reading the +/// metadata for each file is high (e.g. because it requires a network round +/// trip to the storage service). +/// +/// Specifically, this example illustrates how to: +/// +/// 1. Create an in memory index for row groups and pages within a file using +/// the existing Parquet metadata +/// +/// 2. Use the index and cached metadata to read exactly the pages that are +/// needed during query time. +/// +/// Note this is a *VERY* low level example for people who want to build their +/// own custom indexes for low latency queries. There are several higher level +/// APIs that may be more suitable: +/// +/// * Simpler, file level index: `parquet_index.rs` example +/// +/// * Higher level API for reading parquet files: +/// [`SessionContext::read_parquet`] or [`ListingTable`], which also do file +/// pruning based on parquet statistics (using the same underlying APIs) +/// +/// # Diagram +/// +/// ```text +/// TODO update diagram +/// ``` +/// +/// [`ListingTable`]: datafusion::datasource::listing::ListingTable +#[tokio::main] +async fn main() -> Result<()> { + // Demo data has three files, each with schema + // * file_name (string) + // * value (int32) + // * tag (string) + // See [make_demo_file] for more details + let data = DemoData::try_new()?; + + // the object store is used to read the parquet files (in this case, it is + // a local file system, but in a real system it could be S3, GCS, etc) + let object_store: Arc = + Arc::new(object_store::local::LocalFileSystem::new()); + + // Create a table provider with and our special index. + let provider = Arc::new(IndexTableProvider::try_new( + data.path(), + Arc::clone(&object_store), + )?); + println!("** Table Provider:"); + println!("{provider}\n"); + + // Create a SessionContext for running queries that has the table provider + // registered as "index_table" + let ctx = SessionContext::new(); + ctx.register_table("index_table", Arc::clone(&provider) as _)?; + + // register object store provider for urls like `file://` work + let url = Url::try_from("file://").unwrap(); + ctx.register_object_store(&url, object_store); + + // Select data from the table without any predicates (and thus no pruning) + println!("** Select data, no predicates:"); + ctx.sql("SELECT file_name, value, tag FROM index_table LIMIT 10") + .await? + .show() + .await?; + println!("Files pruned: {}\n", provider.index().last_num_pruned()); + // the underlying parquet reader reads ranges from all three files + + // Run a query that uses the index to prune files. + // + // Using the predicate "value = 150", the IndexTable can skip reading file 1 + // (max value 1000) and file 3 (min value of 2000) + println!("** Select data, predicate `value = 1500`"); + ctx.sql("SELECT file_name, value FROM index_table WHERE value = 1500") + .await? + .show() + .await?; + println!("Files pruned: {}\n", provider.index().last_num_pruned()); + // TODO why are there two calls to read bytes from the file? + + // Now, use a predicate on both the value and tag columns. The tag column + // is stored in multiple pages and thus we can prune entire pages based on + // the tag column statistics. + + Ok(()) +} + +/// DataFusion `TableProvider` that uses [`IndexTableProvider`], a secondary +/// index to decide which Parquet files to read. +#[derive(Debug)] +pub struct IndexTableProvider { + /// The index of the parquet files in the directory + index: DetailedParquetMetadataIndex, + /// the directory in which the files are stored + dir: PathBuf, +} + +impl Display for IndexTableProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "IndexTableProvider")?; + writeln!(f, "---- Index ----")?; + write!(f, "{}", self.index) + } +} + +impl IndexTableProvider { + /// Create a new IndexTableProvider + /// * `dir` - the directory containing the parquet files + /// * `object_store` - the object store implementation to use for reading the files + pub fn try_new( + dir: impl Into, + object_store: Arc, + ) -> Result { + let dir = dir.into(); + + // Create an index of the parquet files in the directory as we see them. + let mut index_builder = ParquetRowGroupMetadataIndexBuilder::new(object_store); + + let files = read_dir(&dir)?; + for file in &files { + // open the file and read the metadata, add to the correspoding index + let file_name = file.file_name(); + let file_name = file_name + .to_str() + .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?; + let file_size = file.metadata()?.len(); + + let file = File::open(file.path()).map_err(|e| { + DataFusionError::from(e).context(format!("Error opening file {file:?}")) + })?; + + let reader = ParquetRecordBatchReaderBuilder::try_new(file)?; + + index_builder.add_file( + file_name, + file_size, + reader.schema(), + reader.metadata(), + )?; + } + + let index = index_builder.build()?; + + Ok(Self { index, dir }) + } + + /// return a reference to the underlying index + fn index(&self) -> &DetailedParquetMetadataIndex { + &self.index + } +} + +#[async_trait] +impl TableProvider for IndexTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.index.schema().clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let df_schema = DFSchema::try_from(self.schema())?; + // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2` + let predicate = conjunction(filters.to_vec()); + let predicate = predicate + .map(|predicate| state.create_physical_expr(predicate, &df_schema)) + .transpose()? + // if there are no filters, use a literal true to have a predicate + // that always evaluates to true we can pass to the index + .unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true)); + + // Use the index to find the row groups that might have data that matches the + // predicate. Any file that can not have data that matches the predicate + // will not be returned. + self.index + .get_row_groups(predicate.clone())? + .with_limit(limit) + .with_projection(projection.cloned()) + .with_predicate(predicate) + .with_parquet_file_reader_factory(Arc::clone(&self.index.parquet_factory)) + .build(self.schema(), &self.dir) + } + + /// Tell DataFusion to push filters down to the scan method + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + // Inexact because the pruning can't handle all expressions and pruning + // is not done at the row level -- there may be rows in returned files + // that do not pass the filter + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } +} + +/// Builds the scan for a set of parquet files and filters +#[derive(Debug, Clone, Default)] +struct ParquetScanBuilder { + /// Files to scan. Use btree map for deterministic order + files: BTreeMap, + /// Columns on which to project the data. Indexes that are higher than the + /// number of columns of `file_schema` refer to `table_partition_cols`. + projection: Option>, + /// The maximum number of records to read from this plan. If `None`, + /// all records after filtering are returned. + limit: Option, + /// Optional predicate for row filtering during parquet scan + predicate: Option>, + /// user defined parquet file reader factory + parquet_file_reader_factory: Option>, +} + +impl ParquetScanBuilder { + fn new() -> Self { + Self::default() + } + + /// Specify that a certain row group in a file should be scanned + fn add_row_group( + &mut self, + file_name: &str, + file_size: u64, + row_group_count: u64, + row_group_index: u64, + ) { + if let Some(scanned_file) = self.files.get_mut(file_name) { + scanned_file + .parquet_access_plan + .set(row_group_index as usize, RowGroupAccess::Scan); + } else { + self.files.insert( + file_name.to_string(), + ScannedFile::new(file_size, row_group_count as usize, row_group_index), + ); + } + } + + /// Set the projection of the scan being built + pub fn with_projection(mut self, projection: Option>) -> Self { + self.projection = projection; + self + } + /// Set the limit of the scan being built + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + + /// Set the predicate of the scan being built + pub fn with_predicate(mut self, predicate: Arc) -> Self { + self.predicate = Some(predicate); + self + } + + pub fn with_parquet_file_reader_factory( + mut self, + parquet_file_reader_factory: Arc, + ) -> Self { + self.parquet_file_reader_factory = Some(parquet_file_reader_factory); + self + } + + /// Creates a ParquetExec that only scans the files and row groups specified in this builder + /// + /// # Parameters + /// * `file_scan_config` - the base configuration for the scan (e.g. projection, limit) + /// * `dir` - the directory containing the files + /// + /// # Returns + /// * a ParquetExec that scans only the files and row groups specified in this builder + fn build(self, schema: SchemaRef, dir: &Path) -> Result> { + let Self { + files, + projection, + limit, + predicate, + parquet_file_reader_factory, + } = self; + + let object_store_url = ObjectStoreUrl::parse("file://")?; + let mut file_scan_config = FileScanConfig::new(object_store_url, schema) + .with_limit(limit) + .with_projection(projection); + + // Transform to the format needed to pass to ParquetExec + // Create one file group per file (default to scanning them all in parallel) + for (file_name, scanned_file) in files { + let ScannedFile { + file_size, + parquet_access_plan: row_group_set, + } = scanned_file; + + let path = dir.join(file_name); + let canonical_path = fs::canonicalize(path)?; + let partitioned_file = + PartitionedFile::new(canonical_path.display().to_string(), file_size) + // add the row group set as an extension + .with_extensions(Arc::new(row_group_set) as _); + + file_scan_config = file_scan_config.with_file(partitioned_file); + } + + let Some(parquet_file_reader_factory) = parquet_file_reader_factory else { + return internal_err!("Parquet file reader factory not set"); + }; + + // build the actual parquet exec + let mut builder = ParquetExec::builder(file_scan_config) + .with_parquet_file_reader_factory(parquet_file_reader_factory); + + if let Some(predicate) = predicate { + builder = builder.with_predicate(predicate); + } + Ok(builder.build_arc()) + } +} + +#[derive(Debug, Clone)] +struct ScannedFile { + /// the size of the file + file_size: u64, + /// which row groups to scan + parquet_access_plan: ParquetAccessPlan, +} + +impl ScannedFile { + fn new(file_size: u64, row_group_count: usize, row_group_index: u64) -> Self { + // default to scanning only the specified row group + let mut parquet_access_plan = ParquetAccessPlan::new_none(row_group_count); + parquet_access_plan.set(row_group_index as usize, RowGroupAccess::Scan); + + Self { + file_size, + parquet_access_plan, + } + } +} + +/// Simple in memory secondary index for a set of parquet files +/// +/// The index is represented as an arrow [`RecordBatch`] that can be passed +/// directly by the DataFusion [`PruningPredicate`] API +/// +/// The `RecordBatch` looks as follows: +/// +/// ```text +/// +---------------+-----------+-----------+-----------------+-----------------+-----------+-----------+-------------+-------------+ +/// | file_name | file_size | row_count | row_group_count | row_group_index | value_min | value_max | tag_min | tag_max | +// +---------------+-----------+-----------+-----------------+-----------------+-----------+-----------+-------------+-------------+ +/// | file1.parquet | 16598 | 100 | 10 | 0 | 0 | 99 | tag_value_A | tag_value_Z | +/// | file1.parquet | 16598 | 100 | 10 | 1 | 100 | 199 | tag_value_A | tag_value_Z | +/// | file1.parquet | 16598 | 100 | 10 | 2 | 200 | 299 | tag_value_A | tag_value_Z | +/// | file1.parquet | 16598 | 100 | 10 | 3 | 300 | 399 | tag_value_A | tag_value_Z | +/// | file1.parquet | 16598 | 100 | 10 | 4 | 400 | 499 | tag_value_A | tag_value_Z | +/// ... +/// | file3.parquet | 16598 | 100 | 10 | 4 | 2400 | 2499 | tag_value_A | tag_value_Z | +/// | file3.parquet | 16598 | 100 | 10 | 5 | 2500 | 2599 | tag_value_A | tag_value_Z | +/// | file3.parquet | 16598 | 100 | 10 | 6 | 2600 | 2699 | tag_value_A | tag_value_Z | +/// | file3.parquet | 16598 | 100 | 10 | 7 | 2700 | 2799 | tag_value_A | tag_value_Z | +/// | file3.parquet | 16598 | 100 | 10 | 8 | 2800 | 2899 | tag_value_A | tag_value_Z | +/// | file3.parquet | 16598 | 100 | 10 | 9 | 2900 | 2999 | tag_value_A | tag_value_Z | +/// +---------------+-----------+-----------+-----------------+-----------------+-----------+-----------+-------------+-------------+ +/// ``` +/// +/// Notes: It must store file_name and file_size, row_count and row_group_count to construct `PartitionedFile`. +#[derive(Debug)] +struct DetailedParquetMetadataIndex { + /// Specialized [`CachedParquetFileReaderFactory`] for accessing the files + parquet_factory: Arc, + /// The schema for the files themselves (not the index) + file_schema: SchemaRef, + /// The index of row group level metadata for each parquet file. See the + /// struct level documentation for the schema of this index. + row_group_index: RecordBatch, + /// The number of files that were pruned in the last query + last_num_pruned: AtomicUsize, +} + +impl Display for DetailedParquetMetadataIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "ParquetMetadataIndex(last_num_pruned: {})", + self.last_num_pruned() + )?; + let batches = pretty_format_batches(&[self.row_group_index.clone()]).unwrap(); + write!(f, "{batches}",) + } +} + +impl DetailedParquetMetadataIndex { + /// the schema of the *files* in the index (not the index's schema) + fn schema(&self) -> &SchemaRef { + &self.file_schema + } + + /// number of files in the index + fn len(&self) -> usize { + self.row_group_index.num_rows() + } + + /// Return the number of files that were pruned in the last query + pub fn last_num_pruned(&self) -> usize { + self.last_num_pruned.load(Ordering::SeqCst) + } + + /// Set the number of files that were pruned in the last query + fn set_last_num_pruned(&self, num_pruned: usize) { + self.last_num_pruned.store(num_pruned, Ordering::SeqCst); + } + + /// Return the files and row groups that match the predicate + pub fn get_row_groups( + &self, + predicate: Arc, + ) -> Result { + // Use the PruningPredicate API to determine which files can not + // possibly have any relevant data. + let pruning_predicate = + PruningPredicate::try_new(predicate, self.schema().clone())?; + + // Now evaluate the pruning predicate into a boolean mask, one element + // per row group in the index. If the mask is true, the file may have + // rows that match the predicate. If the mask is false, we know the row + // group can not have *any* rows that match the predicate and thus can + // be skipped. + let row_group_mask = pruning_predicate.prune(self)?; + + let num_left = row_group_mask.iter().filter(|x| **x).count(); + self.set_last_num_pruned(self.len() - num_left); + + // Return only files that match the predicate from the index (use + // BTreeMap for deterministic filename order) + let mut scan_builder = ParquetScanBuilder::new(); + row_group_mask + .into_iter() + .enumerate() + .for_each(|(file_offset, keep)| { + if !keep { + return; + } + + // Filenames and sizes are always non null, so we don't have to check is_valid + let file_name = self.file_names().value(file_offset); + let file_size = self.file_sizes().value(file_offset); + let row_group_index = self.row_group_indexes().value(file_offset); + let row_group_count = self.row_group_counts().value(file_offset); + + // TODO make the actual number of row groups + scan_builder.add_row_group( + file_name, + file_size, + row_group_count, + row_group_index, + ); + }); + + Ok(scan_builder) + } + + /// return the column with name `col_name` from the + /// index. Panic's if not found + fn column_by_name(&self, col_name: &str) -> &ArrayRef { + let index = self + .row_group_index + .schema() + .index_of(col_name) + .expect("Column not found"); + self.row_group_index.column(index) + } + + /// Return the `file_name` column of this index + fn file_names(&self) -> &StringArray { + self.column_by_name("file_name") + .as_any() + .downcast_ref::() + .unwrap() + } + + /// Return the file_size column of this index + fn file_sizes(&self) -> &UInt64Array { + self.column_by_name("file_size") + .as_any() + .downcast_ref::() + .unwrap() + } + + /// Return the row group index column of this index + fn row_group_indexes(&self) -> &UInt64Array { + self.column_by_name("row_group_index") + .as_any() + .downcast_ref::() + .unwrap() + } + + /// Return the row group count column of this index + fn row_group_counts(&self) -> &UInt64Array { + self.column_by_name("row_group_count") + .as_any() + .downcast_ref::() + .unwrap() + } + /// Reference to the row count column + fn row_counts_ref(&self) -> &ArrayRef { + self.column_by_name("row_count") + } +} + +/// In order to use the PruningPredicate API, we need to provide DataFusion +/// the required statistics via the [`PruningStatistics`] trait +impl PruningStatistics for DetailedParquetMetadataIndex { + /// return the minimum values for the value column + fn min_values(&self, column: &Column) -> Option { + // the index's column name uses the convention `(column_name)_min` + let index_column_name = format!("{}_min", &column.name); + self.row_group_index + .schema() + .index_of(&index_column_name) + .ok() + .map(|index| self.row_group_index.column(index).clone()) + } + + /// return the maximum values for the value column + fn max_values(&self, column: &Column) -> Option { + // the index's column name uses the convention `(column_name)_max` + let index_column_name = format!("{}_max", &column.name); + self.row_group_index + .schema() + .index_of(&index_column_name) + .ok() + .map(|index| self.row_group_index.column(index).clone()) + } + + /// return the number of "containers". In this example, each "container" is + /// a file (aka a row in the index) + fn num_containers(&self) -> usize { + self.len() + } + + /// Return `None` to signal we don't have any information about null + /// counts in the index, + fn null_counts(&self, _column: &Column) -> Option { + None + } + + /// return the row counts for each file + fn row_counts(&self, _column: &Column) -> Option { + Some(self.row_counts_ref().clone()) + } + + /// The `contained` API can be used with structures such as Bloom filters, + /// but is not used in this example, so return `None` + fn contained( + &self, + _column: &Column, + _values: &HashSet, + ) -> Option { + None + } +} + +/// Builds a [`ParquetRowGroupMetadataIndexBuilder`] from a set of parquet files +/// +/// Has one row for each row group +#[derive(Debug)] +struct ParquetRowGroupMetadataIndexBuilder { + object_store: Arc, + file_schema: Option, + /// Keep the parquet metadata for each file in memory + metadata: HashMap>, + filenames: StringBuilder, + file_sizes: UInt64Builder, + row_counts: UInt64Builder, + /// how many row groups are there in this file + row_group_counts: UInt64Builder, + /// which row group is the index for? + row_group_indexes: UInt64Builder, + value_mins: Int32Builder, + value_maxes: Int32Builder, + /// Holds the min/max value of the tag column + tag_mins: StringBuilder, + tag_maxes: StringBuilder, +} + +impl ParquetRowGroupMetadataIndexBuilder { + // Create with schema + fn new(object_store: Arc) -> Self { + Self { + object_store, + file_schema: None, + metadata: HashMap::new(), + filenames: StringBuilder::new(), + file_sizes: UInt64Builder::new(), + row_counts: UInt64Builder::new(), + row_group_counts: UInt64Builder::new(), + row_group_indexes: UInt64Builder::new(), + value_mins: Int32Builder::new(), + value_maxes: Int32Builder::new(), + tag_mins: StringBuilder::new(), + tag_maxes: StringBuilder::new(), + } + } + + /// Add a new file to the index (TODO pass in reader) + fn add_file( + &mut self, + file_name: &str, + file_size: u64, + schema: &SchemaRef, + metadata: &Arc, + ) -> Result<()> { + // Get the schema of the file. A real system might have to handle the + // case where the schema of the file is not the same as the schema of + // the other files e.g. using SchemaAdapter. + if let Some(file_schema) = self.file_schema.as_ref() { + assert_eq!(file_schema, schema, "Schema mismatch"); + } else { + self.file_schema = Some(schema.clone()); + } + + let previously_inserted = self + .metadata + .insert(file_name.to_string(), Arc::clone(metadata)); + assert!(previously_inserted.is_none()); + + let num_row_groups = metadata.num_row_groups(); + + // Extract the min/max values for each row group from the statistics + // TODO make an API that permits appending a row group at a time + let row_counts = StatisticsConverter::row_counts(metadata)?; + let value_column_mins = + StatisticsConverter::try_new("value", RequestedStatistics::Min, schema)? + .extract(metadata)?; + let value_column_maxes = + StatisticsConverter::try_new("value", RequestedStatistics::Max, schema)? + .extract(metadata)?; + let tag_column_mins = + StatisticsConverter::try_new("tag", RequestedStatistics::Min, schema)? + .extract(metadata)?; + let tag_column_maxes = + StatisticsConverter::try_new("tag", RequestedStatistics::Max, schema)? + .extract(metadata)?; + + // sanity check the statistics + assert_eq!(row_counts.len(), num_row_groups); + assert_eq!(row_counts.len(), value_column_mins.len()); + assert_eq!(row_counts.len(), value_column_maxes.len()); + + let value_column_mins = value_column_mins.as_primitive::(); + let value_column_maxes = value_column_maxes.as_primitive::(); + let tag_column_mins = tag_column_mins.as_string::(); + let tag_column_maxes = tag_column_maxes.as_string::(); + + // todo: update statistic converter to build / append arrays directly + for i in 0..num_row_groups { + self.filenames.append_value(file_name); + self.file_sizes.append_value(file_size); + self.row_group_counts.append_value(num_row_groups as u64); + self.row_group_indexes.append_value(i as u64); + self.value_mins.append_value(value_column_mins.value(i)); + self.value_maxes.append_value(value_column_maxes.value(i)); + self.tag_mins.append_value(tag_column_mins.value(i)); + self.tag_maxes.append_value(tag_column_maxes.value(i)); + } + self.row_counts.append_slice(row_counts.values()); + Ok(()) + } + + /// Build the index from the files added + fn build(self) -> Result { + let Self { + object_store, + metadata, + file_schema, + + mut filenames, + mut file_sizes, + mut row_counts, + mut row_group_counts, + mut row_group_indexes, + mut value_mins, + mut value_maxes, + mut tag_mins, + mut tag_maxes, + } = self; + + let Some(file_schema) = file_schema else { + return Err(internal_datafusion_err!("No files added to index")); + }; + + let row_group_index = RecordBatch::try_from_iter(vec![ + ("file_name", Arc::new(filenames.finish()) as ArrayRef), + ("file_size", Arc::new(file_sizes.finish()) as _), + ("row_count", Arc::new(row_counts.finish()) as _), + ("row_group_count", Arc::new(row_group_counts.finish()) as _), + ("row_group_index", Arc::new(row_group_indexes.finish()) as _), + ("value_min", Arc::new(value_mins.finish()) as _), + ("value_max", Arc::new(value_maxes.finish()) as _), + ("tag_min", Arc::new(tag_mins.finish()) as _), + ("tag_max", Arc::new(tag_maxes.finish()) as _), + ])?; + + let parquet_factory = CachedParquetFileReaderFactory::new(object_store, metadata); + Ok(DetailedParquetMetadataIndex { + parquet_factory: Arc::new(parquet_factory), + file_schema, + row_group_index, + last_num_pruned: AtomicUsize::new(0), + }) + } +} + +/// A custom [`ParquetFileReaderFactory`] that handles opening parquet files +/// from object storage, and uses pre-loaded metadata rather than re-loading it + +#[derive(Debug)] +struct CachedParquetFileReaderFactory { + /// The underlying object store implementation for reading file data + object_store: Arc, + /// The parquet metadata for each file in the index, keyed by the file name + /// (e.g. `file1.parquet`) to avoid having to read the metadata for each + /// file multiple times. + metadata: HashMap>, +} + +impl CachedParquetFileReaderFactory { + fn new( + object_store: Arc, + metadata: HashMap>, + ) -> Self { + Self { + object_store, + metadata, + } + } +} + +impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { + fn create_reader( + &self, + _partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option, + _metrics: &ExecutionPlanMetricsSet, + ) -> Result> { + // for this example we ignore the partition index and metrics + // but in a real system you would likely use them to report details on + // the performance of the reader. + let filename = file_meta + .location() + .parts() + .last() + .expect("No path in location") + .as_ref() + .to_string(); + + let object_store = Arc::clone(&self.object_store); + let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta); + + if let Some(hint) = metadata_size_hint { + inner = inner.with_footer_size_hint(hint) + }; + + let metadata = self + .metadata + .get(&filename) + .expect("metadata for file not found: {filename}"); + Ok(Box::new(ParquetReaderWithCache { + filename, + metadata: Arc::clone(metadata), + inner, + })) + } +} + +/// wrapper around a ParquetObjectReader that caches the metadata +struct ParquetReaderWithCache { + filename: String, + metadata: Arc, + inner: ParquetObjectReader, +} + +impl AsyncFileReader for ParquetReaderWithCache { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, datafusion::parquet::errors::Result> { + println!("{} Reading range {:?}", self.filename, range); + self.inner.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, datafusion::parquet::errors::Result>> { + println!("{} Reading ranges {:?}", self.filename, ranges); + self.inner.get_byte_ranges(ranges) + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, datafusion::parquet::errors::Result>> { + println!("{} returning cached metadata", self.filename); + + // return the cached metadata (do not read it again) + let metadata = self.metadata.clone(); + async move { Ok(metadata) }.boxed() + } +} + +/// Return a list of the directory entries in the given directory, sorted by name +fn read_dir(dir: &Path) -> Result> { + let mut files = dir + .read_dir() + .map_err(|e| { + DataFusionError::from(e).context(format!("Error reading directory {dir:?}")) + })? + .map(|entry| { + entry.map_err(|e| { + DataFusionError::from(e) + .context(format!("Error reading directory entry in {dir:?}")) + }) + }) + .collect::>>()?; + files.sort_by_key(|entry| entry.file_name()); + Ok(files) +} + +/// Demonstration Data +/// +/// Makes a directory with three parquet files +/// +/// The schema of the files is +/// * file_name (string) +/// * value (int32) +/// +/// The files are as follows: +/// * file1.parquet (values 0..1000) +/// * file2.parquet (values 1000..2000) +/// * file3.parquet (values 2000..3000) +struct DemoData { + tmpdir: TempDir, +} + +impl DemoData { + fn try_new() -> Result { + let tmpdir = TempDir::new()?; + make_demo_file(tmpdir.path().join("file1.parquet"), 0..1000)?; + make_demo_file(tmpdir.path().join("file2.parquet"), 1000..2000)?; + make_demo_file(tmpdir.path().join("file3.parquet"), 2000..3000)?; + + Ok(Self { tmpdir }) + } + + fn path(&self) -> PathBuf { + self.tmpdir.path().into() + } +} + +/// Creates a new parquet file at the specified path. +/// +/// * file_name: Utf8 +/// * value: Int32 +/// * tag: Utf8 (values `tag_value_A` .. `tag_value_Z`, `tag_value_A`, ...) +/// +/// The `value` column increases sequentially from `min_value` to `max_value` +/// The `tag` column is a repeating sequence of `tag_value_A`, `tag_value_B`, .. `tag_value_Z` +/// +/// Each row group has 100 rows, and each page of `tag` has 10 values. +/// +/// Note that the `tag` column is designed so that it is not possible to prune +/// entire groups based on the tag column. However, we can use the page level +/// statistics in the page index to do so. +fn make_demo_file(path: impl AsRef, value_range: Range) -> Result<()> { + let path = path.as_ref(); + let file = File::create(path)?; + let filename = path + .file_name() + .ok_or_else(|| internal_datafusion_err!("No filename"))? + .to_str() + .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?; + + let num_values = value_range.len(); + let file_names = + StringArray::from_iter_values(std::iter::repeat(&filename).take(num_values)); + let values = Int32Array::from_iter_values(value_range); + let tag_values = StringArray::from_iter_values( + ('A'..='Z') + .map(|c| format!("tag_value_{}", c)) + .cycle() + .take(num_values), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("file_name", Arc::new(file_names) as ArrayRef), + ("value", Arc::new(values) as ArrayRef), + ("tag", Arc::new(tag_values) as ArrayRef), + ])?; + + let schema = batch.schema(); + + // enable page statistics for the tag column, + // for everything else. + let props = WriterProperties::builder() + .set_max_row_group_size(100) + // compute column chunk (per row group) statistics by default + .set_statistics_enabled(EnabledStatistics::Chunk) + // compute column page statistics for the tag column + .set_column_statistics_enabled(ColumnPath::from("tag"), EnabledStatistics::Page) + .build(); + + // write the actual values to the file + let mut writer = ArrowWriter::try_new(file, schema, Some(props))?; + writer.write(&batch)?; + writer.finish()?; + + Ok(()) +} diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 04aec9d77d58..c8b456fd784e 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -134,6 +134,18 @@ impl PartitionedFile { self.range = Some(FileRange { start, end }); self } + + /// Update the file with optional user metadata + /// + /// This field can be used to pass user defined metadata per object + /// down to the reader (e.g. a `ParquetAccessPlan`). + pub fn with_extensions( + mut self, + extensions: Arc, + ) -> Self { + self.extensions = Some(extensions); + self + } } impl From for PartitionedFile { diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 720e29e35582..db63b27565b7 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -31,7 +31,7 @@ mod statistics; pub(crate) use self::csv::plan_to_csv; pub(crate) use self::json::plan_to_json; #[cfg(feature = "parquet")] -pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory}; +pub use self::parquet::{ParquetExec, ParquetFileMetrics}; pub use arrow_file::ArrowExec; pub use avro::AvroExec; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs index a6ecf8188192..3092103020bc 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs @@ -121,6 +121,11 @@ impl ParquetAccessPlan { } } + /// Provide a reference to this struct as an `Any` reference + pub fn as_any(&self) -> &dyn std::any::Any { + self + } + /// Create a new `ParquetAccessPlan` from the specified [`RowGroupAccess`]es pub fn new(row_groups: Vec) -> Self { Self { row_groups } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 04b25069e923..e92f787f410e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -57,12 +57,12 @@ mod row_groups; mod statistics; mod writer; +use crate::datasource::physical_plan::parquet::opener::ParquetOpener; use crate::datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; pub use metrics::ParquetFileMetrics; -use opener::ParquetOpener; pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; pub use statistics::{RequestedStatistics, StatisticsConverter}; pub use writer::plan_to_parquet; @@ -145,6 +145,12 @@ pub use writer::plan_to_parquet; /// custom reader is used, it supplies the metadata directly and this parameter /// is ignored. [`ParquetExecBuilder::with_metadata_size_hint`] for more details. /// +/// * External indexes: you can use custom external indexes to control exactly +/// what row groups and data ranges (pages) are read, by providing a custom +/// [`ParquetAccessPlan`] for any file. To do so, provide a `ParquetAccessPlan` +/// as the `extension` metadata on [`PartitionedFile`] when creating the +/// [`FileScanConfig`]. See [`PartitionedFile::with_extensions`] for more details. +/// /// # Execution Overview /// /// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`] diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 74442855da47..e4851eb899c9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -18,16 +18,18 @@ //! [`ParquetOpener`] for opening Parquet files use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate; +use crate::datasource::physical_plan::parquet::reader::ParquetFileReaderFactory; use crate::datasource::physical_plan::parquet::row_groups::RowGroupPlanBuilder; use crate::datasource::physical_plan::parquet::{ row_filter, should_enable_page_index, ParquetAccessPlan, }; use crate::datasource::physical_plan::{ - FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory, + FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, }; use crate::datasource::schema_adapter::SchemaAdapterFactory; use crate::physical_optimizer::pruning::PruningPredicate; use arrow_schema::{ArrowError, SchemaRef}; +use datafusion_common::{exec_err, Result}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{StreamExt, TryStreamExt}; @@ -66,6 +68,8 @@ impl FileOpener for ParquetOpener { &self.metrics, ); + let extensions = file_meta.extensions.as_ref().map(Arc::clone); + let reader: Box = self.parquet_file_reader_factory.create_reader( self.partition_index, @@ -138,7 +142,7 @@ impl FileOpener for ParquetOpener { let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); let rg_metadata = file_metadata.row_groups(); // track which row groups to actually read - let access_plan = ParquetAccessPlan::new_all(rg_metadata.len()); + let access_plan = create_starting_access_plan(extensions, rg_metadata.len())?; let mut row_groups = RowGroupPlanBuilder::new(access_plan); // if there is a range restricting what parts of the file to read if let Some(range) = file_range.as_ref() { @@ -211,3 +215,25 @@ impl FileOpener for ParquetOpener { })) } } + +/// Create a starting [`ParquetAccessPlan`]. If there is a plan provided as an +/// extension use that, otherwise create a plan that reads all row groups. +fn create_starting_access_plan( + extensions: Option>, + num_row_groups: usize, +) -> Result { + let provided_plan = + extensions.and_then(|e| e.downcast_ref::().cloned()); + let Some(plan) = provided_plan else { + return Ok(ParquetAccessPlan::new_all(num_row_groups)); + }; + + if plan.len() != num_row_groups { + return exec_err!( + "ParquetAccessPlan provided for parquet reader has {} row groups, but the file has {}", + plan.len(), + num_row_groups + ); + } + Ok(plan) +} diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 1d4876bf2476..cfd6bd0e6e40 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -56,7 +56,7 @@ impl RowGroupPlanBuilder { Self { access_plan } } - /// Return true if there are no row groups to scan + /// Return true if there are no row groups pub fn is_empty(&self) -> bool { self.access_plan.is_empty() }