diff --git a/Cargo.lock b/Cargo.lock index 00bd64f21eb11..5f9853427d480 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1913,12 +1913,15 @@ dependencies = [ "datafusion-catalog", "datafusion-common", "datafusion-datasource", + "datafusion-datasource-parquet", "datafusion-execution", "datafusion-expr", "datafusion-physical-expr", + "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", "datafusion-physical-plan", "futures", + "itertools 0.14.0", "log", "object_store", "tokio", diff --git a/datafusion-examples/examples/custom_file_casts.rs b/datafusion-examples/examples/custom_file_casts.rs index 65ca096820640..4d97ecd91dc64 100644 --- a/datafusion-examples/examples/custom_file_casts.rs +++ b/datafusion-examples/examples/custom_file_casts.rs @@ -25,7 +25,7 @@ use datafusion::common::not_impl_err; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion::common::{Result, ScalarValue}; use datafusion::datasource::listing::{ - ListingTable, ListingTableConfig, ListingTableUrl, + ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl, }; use datafusion::execution::context::SessionContext; use datafusion::execution::object_store::ObjectStoreUrl; diff --git a/datafusion-examples/examples/json_shredding.rs b/datafusion-examples/examples/json_shredding.rs index c7d0146a001f7..a2e83bc9510ab 100644 --- a/datafusion-examples/examples/json_shredding.rs +++ b/datafusion-examples/examples/json_shredding.rs @@ -27,7 +27,7 @@ use datafusion::common::tree_node::{ }; use datafusion::common::{assert_contains, exec_datafusion_err, Result}; use datafusion::datasource::listing::{ - ListingTable, ListingTableConfig, ListingTableUrl, + ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl, }; use datafusion::execution::context::SessionContext; use datafusion::execution::object_store::ObjectStoreUrl; diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index 69f952ae98407..4eaeed675a206 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -39,14 +39,17 @@ datafusion-datasource = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } futures = { workspace = true } +itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } tokio = { workspace = true } [dev-dependencies] +datafusion-datasource-parquet = { workspace = true } [lints] workspace = true @@ -54,3 +57,6 @@ workspace = true [lib] name = "datafusion_catalog_listing" path = "src/mod.rs" + +[package.metadata.cargo-machete] +ignored = ["datafusion-datasource-parquet"] diff --git a/datafusion/catalog-listing/src/config.rs b/datafusion/catalog-listing/src/config.rs new file mode 100644 index 0000000000000..90f44de4fdbc8 --- /dev/null +++ b/datafusion/catalog-listing/src/config.rs @@ -0,0 +1,360 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::options::ListingOptions; +use arrow::datatypes::{DataType, Schema, SchemaRef}; +use datafusion_catalog::Session; +use datafusion_common::{config_err, internal_err}; +use datafusion_datasource::file_compression_type::FileCompressionType; +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +use datafusion_datasource::ListingTableUrl; +use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; +use std::str::FromStr; +use std::sync::Arc; + +/// Indicates the source of the schema for a [`crate::ListingTable`] +// PartialEq required for assert_eq! in tests +#[derive(Debug, Clone, Copy, PartialEq, Default)] +pub enum SchemaSource { + /// Schema is not yet set (initial state) + #[default] + Unset, + /// Schema was inferred from first table_path + Inferred, + /// Schema was specified explicitly via with_schema + Specified, +} + +/// Configuration for creating a [`crate::ListingTable`] +/// +/// # Schema Evolution Support +/// +/// This configuration supports schema evolution through the optional +/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need: +/// +/// - **Type coercion requirements**: When you need custom logic for converting between +/// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8) +/// - **Column mapping**: You need to map columns with a legacy name to a new name +/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`. +/// +/// If not specified, a [`datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory`] +/// will be used, which handles basic schema compatibility cases. +/// +#[derive(Debug, Clone, Default)] +pub struct ListingTableConfig { + /// Paths on the `ObjectStore` for creating [`crate::ListingTable`]. + /// They should share the same schema and object store. + pub table_paths: Vec, + /// Optional `SchemaRef` for the to be created [`crate::ListingTable`]. + /// + /// See details on [`ListingTableConfig::with_schema`] + pub file_schema: Option, + /// Optional [`ListingOptions`] for the to be created [`crate::ListingTable`]. + /// + /// See details on [`ListingTableConfig::with_listing_options`] + pub options: Option, + /// Tracks the source of the schema information + pub(crate) schema_source: SchemaSource, + /// Optional [`SchemaAdapterFactory`] for creating schema adapters + pub(crate) schema_adapter_factory: Option>, + /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters + pub(crate) expr_adapter_factory: Option>, +} + +impl ListingTableConfig { + /// Creates new [`ListingTableConfig`] for reading the specified URL + pub fn new(table_path: ListingTableUrl) -> Self { + Self { + table_paths: vec![table_path], + ..Default::default() + } + } + + /// Creates new [`ListingTableConfig`] with multiple table paths. 
+ /// + /// See `ListingTableConfigExt::infer_options` for details on what happens with multiple paths + pub fn new_with_multi_paths(table_paths: Vec) -> Self { + Self { + table_paths, + ..Default::default() + } + } + + /// Returns the source of the schema for this configuration + pub fn schema_source(&self) -> SchemaSource { + self.schema_source + } + /// Set the `schema` for the overall [`crate::ListingTable`] + /// + /// [`crate::ListingTable`] will automatically coerce, when possible, the schema + /// for individual files to match this schema. + /// + /// If a schema is not provided, it is inferred using + /// [`Self::infer_schema`]. + /// + /// If the schema is provided, it must contain only the fields in the file + /// without the table partitioning columns. + /// + /// # Example: Specifying Table Schema + /// ```rust + /// # use std::sync::Arc; + /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions}; + /// # use datafusion_datasource::ListingTableUrl; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// # use arrow::datatypes::{Schema, Field, DataType}; + /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); + /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); + /// let schema = Arc::new(Schema::new(vec![ + /// Field::new("id", DataType::Int64, false), + /// Field::new("name", DataType::Utf8, true), + /// ])); + /// + /// let config = ListingTableConfig::new(table_paths) + /// .with_listing_options(listing_options) // Set options first + /// .with_schema(schema); // Then set schema + /// ``` + pub fn with_schema(self, schema: SchemaRef) -> Self { + // Note: We preserve existing options state, but downstream code may expect + // options to be set. Consider calling with_listing_options() or infer_options() + // before operations that require options to be present. + debug_assert!( + self.options.is_some() || cfg!(test), + "ListingTableConfig::with_schema called without options set. \ + Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code." + ); + + Self { + file_schema: Some(schema), + schema_source: SchemaSource::Specified, + ..self + } + } + + /// Add `listing_options` to [`ListingTableConfig`] + /// + /// If not provided, format and other options are inferred via + /// `ListingTableConfigExt::infer_options`. + /// + /// # Example: Configuring Parquet Files with Custom Options + /// ```rust + /// # use std::sync::Arc; + /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions}; + /// # use datafusion_datasource::ListingTableUrl; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); + /// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) + /// .with_file_extension(".parquet") + /// .with_collect_stat(true); + /// + /// let config = ListingTableConfig::new(table_paths) + /// .with_listing_options(options); // Configure file format and options + /// ``` + pub fn with_listing_options(self, listing_options: ListingOptions) -> Self { + // Note: This method properly sets options, but be aware that downstream + // methods like infer_schema() and try_new() require both schema and options + // to be set to function correctly. + debug_assert!( + !self.table_paths.is_empty() || cfg!(test), + "ListingTableConfig::with_listing_options called without table_paths set. 
\ + Consider calling new() or new_with_multi_paths() first to establish table paths." + ); + + Self { + options: Some(listing_options), + ..self + } + } + + /// Returns a tuple of `(file_extension, optional compression_extension)` + /// + /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))` + /// For example a path ending with blah.test.csv returns `("csv", None)` + pub fn infer_file_extension_and_compression_type( + path: &str, + ) -> datafusion_common::Result<(String, Option)> { + let mut exts = path.rsplit('.'); + + let split = exts.next().unwrap_or(""); + + let file_compression_type = FileCompressionType::from_str(split) + .unwrap_or(FileCompressionType::UNCOMPRESSED); + + if file_compression_type.is_compressed() { + let split2 = exts.next().unwrap_or(""); + Ok((split2.to_string(), Some(split.to_string()))) + } else { + Ok((split.to_string(), None)) + } + } + + /// Infer the [`SchemaRef`] based on `table_path`s. + /// + /// This method infers the table schema using the first `table_path`. + /// See [`ListingOptions::infer_schema`] for more details + /// + /// # Errors + /// * if `self.options` is not set. See [`Self::with_listing_options`] + pub async fn infer_schema( + self, + state: &dyn Session, + ) -> datafusion_common::Result { + match self.options { + Some(options) => { + let ListingTableConfig { + table_paths, + file_schema, + options: _, + schema_source, + schema_adapter_factory, + expr_adapter_factory: physical_expr_adapter_factory, + } = self; + + let (schema, new_schema_source) = match file_schema { + Some(schema) => (schema, schema_source), // Keep existing source if schema exists + None => { + if let Some(url) = table_paths.first() { + ( + options.infer_schema(state, url).await?, + SchemaSource::Inferred, + ) + } else { + (Arc::new(Schema::empty()), SchemaSource::Inferred) + } + } + }; + + Ok(Self { + table_paths, + file_schema: Some(schema), + options: Some(options), + schema_source: new_schema_source, + schema_adapter_factory, + expr_adapter_factory: physical_expr_adapter_factory, + }) + } + None => internal_err!("No `ListingOptions` set for inferring schema"), + } + } + + /// Infer the partition columns from `table_paths`. + /// + /// # Errors + /// * if `self.options` is not set. See [`Self::with_listing_options`] + pub async fn infer_partitions_from_path( + self, + state: &dyn Session, + ) -> datafusion_common::Result { + match self.options { + Some(options) => { + let Some(url) = self.table_paths.first() else { + return config_err!("No table path found"); + }; + let partitions = options + .infer_partitions(state, url) + .await? + .into_iter() + .map(|col_name| { + ( + col_name, + DataType::Dictionary( + Box::new(DataType::UInt16), + Box::new(DataType::Utf8), + ), + ) + }) + .collect::>(); + let options = options.with_table_partition_cols(partitions); + Ok(Self { + table_paths: self.table_paths, + file_schema: self.file_schema, + options: Some(options), + schema_source: self.schema_source, + schema_adapter_factory: self.schema_adapter_factory, + expr_adapter_factory: self.expr_adapter_factory, + }) + } + None => config_err!("No `ListingOptions` set for inferring schema"), + } + } + + /// Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`] + /// + /// The schema adapter factory is used to create schema adapters that can + /// handle schema evolution and type conversions when reading files with + /// different schemas than the table schema. + /// + /// If not provided, a default schema adapter factory will be used. 
+ /// + /// # Example: Custom Schema Adapter for Type Coercion + /// ```rust + /// # use std::sync::Arc; + /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions}; + /// # use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; + /// # use datafusion_datasource::ListingTableUrl; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; + /// # + /// # #[derive(Debug)] + /// # struct MySchemaAdapterFactory; + /// # impl SchemaAdapterFactory for MySchemaAdapterFactory { + /// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box { + /// # unimplemented!() + /// # } + /// # } + /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); + /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); + /// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + /// let config = ListingTableConfig::new(table_paths) + /// .with_listing_options(listing_options) + /// .with_schema(table_schema) + /// .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory)); + /// ``` + pub fn with_schema_adapter_factory( + self, + schema_adapter_factory: Arc, + ) -> Self { + Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self + } + } + + /// Get the [`SchemaAdapterFactory`] for this configuration + pub fn schema_adapter_factory(&self) -> Option<&Arc> { + self.schema_adapter_factory.as_ref() + } + + /// Set the [`PhysicalExprAdapterFactory`] for the [`crate::ListingTable`] + /// + /// The expression adapter factory is used to create physical expression adapters that can + /// handle schema evolution and type conversions when evaluating expressions + /// with different schemas than the table schema. + /// + /// If not provided, a default physical expression adapter factory will be used unless a custom + /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used. + /// + /// See for details on this transition. + pub fn with_expr_adapter_factory( + self, + expr_adapter_factory: Arc, + ) -> Self { + Self { + expr_adapter_factory: Some(expr_adapter_factory), + ..self + } + } +} diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs index 1322577b207ab..90d04b46b8067 100644 --- a/datafusion/catalog-listing/src/mod.rs +++ b/datafusion/catalog-listing/src/mod.rs @@ -24,4 +24,11 @@ // https://github.com/apache/datafusion/issues/11143 #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] +mod config; pub mod helpers; +mod options; +mod table; + +pub use config::{ListingTableConfig, SchemaSource}; +pub use options::ListingOptions; +pub use table::ListingTable; diff --git a/datafusion/catalog-listing/src/options.rs b/datafusion/catalog-listing/src/options.rs new file mode 100644 index 0000000000000..3cbf3573e9519 --- /dev/null +++ b/datafusion/catalog-listing/src/options.rs @@ -0,0 +1,411 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, SchemaRef}; +use datafusion_catalog::Session; +use datafusion_common::plan_err; +use datafusion_datasource::file_format::FileFormat; +use datafusion_datasource::ListingTableUrl; +use datafusion_execution::config::SessionConfig; +use datafusion_expr::SortExpr; +use futures::StreamExt; +use futures::{future, TryStreamExt}; +use itertools::Itertools; +use std::sync::Arc; + +/// Options for creating a [`crate::ListingTable`] +#[derive(Clone, Debug)] +pub struct ListingOptions { + /// A suffix on which files should be filtered (leave empty to + /// keep all files on the path) + pub file_extension: String, + /// The file format + pub format: Arc, + /// The expected partition column names in the folder structure. + /// See [Self::with_table_partition_cols] for details + pub table_partition_cols: Vec<(String, DataType)>, + /// Set true to try to guess statistics from the files. + /// This can add a lot of overhead as it will usually require files + /// to be opened and at least partially parsed. + pub collect_stat: bool, + /// Group files to avoid that the number of partitions exceeds + /// this limit + pub target_partitions: usize, + /// Optional pre-known sort order(s). Must be `SortExpr`s. + /// + /// DataFusion may take advantage of this ordering to omit sorts + /// or use more efficient algorithms. Currently sortedness must be + /// provided if it is known by some external mechanism, but may in + /// the future be automatically determined, for example using + /// parquet metadata. + /// + /// See + /// + /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`) + /// where each ordering consists of an individual lexicographic + /// ordering (encapsulated by a `Vec`). If there aren't + /// multiple equivalent orderings, the outer `Vec` will have a + /// single element. + pub file_sort_order: Vec>, +} + +impl ListingOptions { + /// Creates an options instance with the given format + /// Default values: + /// - use default file extension filter + /// - no input partition to discover + /// - one target partition + /// - do not collect statistics + pub fn new(format: Arc) -> Self { + Self { + file_extension: format.get_ext(), + format, + table_partition_cols: vec![], + collect_stat: false, + target_partitions: 1, + file_sort_order: vec![], + } + } + + /// Set options from [`SessionConfig`] and returns self. + /// + /// Currently this sets `target_partitions` and `collect_stat` + /// but if more options are added in the future that need to be coordinated + /// they will be synchronized through this method. + pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self { + self = self.with_target_partitions(config.target_partitions()); + self = self.with_collect_stat(config.collect_statistics()); + self + } + + /// Set file extension on [`ListingOptions`] and returns self. 
+ /// + /// # Example + /// ``` + /// # use std::sync::Arc; + /// # use datafusion_catalog_listing::ListingOptions; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::default() + /// )) + /// .with_file_extension(".parquet"); + /// + /// assert_eq!(listing_options.file_extension, ".parquet"); + /// ``` + pub fn with_file_extension(mut self, file_extension: impl Into) -> Self { + self.file_extension = file_extension.into(); + self + } + + /// Optionally set file extension on [`ListingOptions`] and returns self. + /// + /// If `file_extension` is `None`, the file extension will not be changed + /// + /// # Example + /// ``` + /// # use std::sync::Arc; + /// # use datafusion_catalog_listing::ListingOptions; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// + /// let extension = Some(".parquet"); + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::default() + /// )) + /// .with_file_extension_opt(extension); + /// + /// assert_eq!(listing_options.file_extension, ".parquet"); + /// ``` + pub fn with_file_extension_opt(mut self, file_extension: Option) -> Self + where + S: Into, + { + if let Some(file_extension) = file_extension { + self.file_extension = file_extension.into(); + } + self + } + + /// Set `table partition columns` on [`ListingOptions`] and returns self. + /// + /// "partition columns," used to support [Hive Partitioning], are + /// columns added to the data that is read, based on the folder + /// structure where the data resides. + /// + /// For example, give the following files in your filesystem: + /// + /// ```text + /// /mnt/nyctaxi/year=2022/month=01/tripdata.parquet + /// /mnt/nyctaxi/year=2021/month=12/tripdata.parquet + /// /mnt/nyctaxi/year=2021/month=11/tripdata.parquet + /// ``` + /// + /// A [`crate::ListingTable`] created at `/mnt/nyctaxi/` with partition + /// columns "year" and "month" will include new `year` and `month` + /// columns while reading the files. The `year` column would have + /// value `2022` and the `month` column would have value `01` for + /// the rows read from + /// `/mnt/nyctaxi/year=2022/month=01/tripdata.parquet` + /// + ///# Notes + /// + /// - If only one level (e.g. `year` in the example above) is + /// specified, the other levels are ignored but the files are + /// still read. + /// + /// - Files that don't follow this partitioning scheme will be + /// ignored. + /// + /// - Since the columns have the same value for all rows read from + /// each individual file (such as dates), they are typically + /// dictionary encoded for efficiency. You may use + /// [`wrap_partition_type_in_dict`] to request a + /// dictionary-encoded type. + /// + /// - The partition columns are solely extracted from the file path. Especially they are NOT part of the parquet files itself. 
+ /// + /// # Example + /// + /// ``` + /// # use std::sync::Arc; + /// # use arrow::datatypes::DataType; + /// # use datafusion_expr::col; + /// # use datafusion_catalog_listing::ListingOptions; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// + /// // listing options for files with paths such as `/mnt/data/col_a=x/col_b=y/data.parquet` + /// // `col_a` and `col_b` will be included in the data read from those files + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::default() + /// )) + /// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8), + /// ("col_b".to_string(), DataType::Utf8)]); + /// + /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8), + /// ("col_b".to_string(), DataType::Utf8)]); + /// ``` + /// + /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html + /// [`wrap_partition_type_in_dict`]: datafusion_datasource::file_scan_config::wrap_partition_type_in_dict + pub fn with_table_partition_cols( + mut self, + table_partition_cols: Vec<(String, DataType)>, + ) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + + /// Set stat collection on [`ListingOptions`] and returns self. + /// + /// ``` + /// # use std::sync::Arc; + /// # use datafusion_catalog_listing::ListingOptions; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::default() + /// )) + /// .with_collect_stat(true); + /// + /// assert_eq!(listing_options.collect_stat, true); + /// ``` + pub fn with_collect_stat(mut self, collect_stat: bool) -> Self { + self.collect_stat = collect_stat; + self + } + + /// Set number of target partitions on [`ListingOptions`] and returns self. + /// + /// ``` + /// # use std::sync::Arc; + /// # use datafusion_catalog_listing::ListingOptions; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::default() + /// )) + /// .with_target_partitions(8); + /// + /// assert_eq!(listing_options.target_partitions, 8); + /// ``` + pub fn with_target_partitions(mut self, target_partitions: usize) -> Self { + self.target_partitions = target_partitions; + self + } + + /// Set file sort order on [`ListingOptions`] and returns self. + /// + /// ``` + /// # use std::sync::Arc; + /// # use datafusion_expr::col; + /// # use datafusion_catalog_listing::ListingOptions; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// + /// // Tell datafusion that the files are sorted by column "a" + /// let file_sort_order = vec![vec![ + /// col("a").sort(true, true) + /// ]]; + /// + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::default() + /// )) + /// .with_file_sort_order(file_sort_order.clone()); + /// + /// assert_eq!(listing_options.file_sort_order, file_sort_order); + /// ``` + pub fn with_file_sort_order(mut self, file_sort_order: Vec>) -> Self { + self.file_sort_order = file_sort_order; + self + } + + /// Infer the schema of the files at the given path on the provided object store. + /// + /// If the table_path contains one or more files (i.e. it is a directory / + /// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`] + /// + /// Note: The inferred schema does not include any partitioning columns. 
+ /// + /// This method is called as part of creating a [`crate::ListingTable`]. + pub async fn infer_schema<'a>( + &'a self, + state: &dyn Session, + table_path: &'a ListingTableUrl, + ) -> datafusion_common::Result { + let store = state.runtime_env().object_store(table_path)?; + + let files: Vec<_> = table_path + .list_all_files(state, store.as_ref(), &self.file_extension) + .await? + // Empty files cannot affect schema but may throw when trying to read for it + .try_filter(|object_meta| future::ready(object_meta.size > 0)) + .try_collect() + .await?; + + let schema = self.format.infer_schema(state, &store, &files).await?; + + Ok(schema) + } + + /// Infers the partition columns stored in `LOCATION` and compares + /// them with the columns provided in `PARTITIONED BY` to help prevent + /// accidental corrupts of partitioned tables. + /// + /// Allows specifying partial partitions. + pub async fn validate_partitions( + &self, + state: &dyn Session, + table_path: &ListingTableUrl, + ) -> datafusion_common::Result<()> { + if self.table_partition_cols.is_empty() { + return Ok(()); + } + + if !table_path.is_collection() { + return plan_err!( + "Can't create a partitioned table backed by a single file, \ + perhaps the URL is missing a trailing slash?" + ); + } + + let inferred = self.infer_partitions(state, table_path).await?; + + // no partitioned files found on disk + if inferred.is_empty() { + return Ok(()); + } + + let table_partition_names = self + .table_partition_cols + .iter() + .map(|(col_name, _)| col_name.clone()) + .collect_vec(); + + if inferred.len() < table_partition_names.len() { + return plan_err!( + "Inferred partitions to be {:?}, but got {:?}", + inferred, + table_partition_names + ); + } + + // match prefix to allow creating tables with partial partitions + for (idx, col) in table_partition_names.iter().enumerate() { + if &inferred[idx] != col { + return plan_err!( + "Inferred partitions to be {:?}, but got {:?}", + inferred, + table_partition_names + ); + } + } + + Ok(()) + } + + /// Infer the partitioning at the given path on the provided object store. + /// For performance reasons, it doesn't read all the files on disk + /// and therefore may fail to detect invalid partitioning. + pub async fn infer_partitions( + &self, + state: &dyn Session, + table_path: &ListingTableUrl, + ) -> datafusion_common::Result> { + let store = state.runtime_env().object_store(table_path)?; + + // only use 10 files for inference + // This can fail to detect inconsistent partition keys + // A DFS traversal approach of the store can help here + let files: Vec<_> = table_path + .list_all_files(state, store.as_ref(), &self.file_extension) + .await? 
+ .take(10) + .try_collect() + .await?; + + let stripped_path_parts = files.iter().map(|file| { + table_path + .strip_prefix(&file.location) + .unwrap() + .collect_vec() + }); + + let partition_keys = stripped_path_parts + .map(|path_parts| { + path_parts + .into_iter() + .rev() + .skip(1) // get parents only; skip the file itself + .rev() + // Partitions are expected to follow the format "column_name=value", so we + // should ignore any path part that cannot be parsed into the expected format + .filter(|s| s.contains('=')) + .map(|s| s.split('=').take(1).collect()) + .collect_vec() + }) + .collect_vec(); + + match partition_keys.into_iter().all_equal_value() { + Ok(v) => Ok(v), + Err(None) => Ok(vec![]), + Err(Some(diff)) => { + let mut sorted_diff = [diff.0, diff.1]; + sorted_diff.sort(); + plan_err!("Found mixed partition values on disk {:?}", sorted_diff) + } + } + } +} diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs new file mode 100644 index 0000000000000..e9ac1bf097a22 --- /dev/null +++ b/datafusion/catalog-listing/src/table.rs @@ -0,0 +1,788 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
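The `infer_partitions` logic above relies on a simple path convention: each directory segment of the form `column=value` between the table root and the file name contributes one partition column, and all sampled files must agree on the same column list. A minimal standalone sketch of that convention (plain Rust, no DataFusion types; the helper name is illustrative, not part of this patch):

```rust
// Derive Hive-style partition column names from a file path relative to the
// table root, mirroring the `column=value` convention used by
// `ListingOptions::infer_partitions` above.
fn partition_columns(relative_path: &str) -> Vec<String> {
    let segments: Vec<&str> = relative_path.split('/').collect();
    segments
        .iter()
        .take(segments.len().saturating_sub(1)) // parents only; skip the file name itself
        .filter(|segment| segment.contains('=')) // keep only `column=value` segments
        .map(|segment| segment.split('=').next().unwrap_or("").to_string())
        .collect()
}

fn main() {
    // The example paths from the docs above both yield ["year", "month"],
    // so inference over a set of such files would succeed.
    assert_eq!(
        partition_columns("year=2022/month=01/tripdata.parquet"),
        vec!["year".to_string(), "month".to_string()]
    );
    // A path with no `column=value` segments contributes no partition columns.
    assert!(partition_columns("tripdata.parquet").is_empty());
}
```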
+ +use crate::config::SchemaSource; +use crate::helpers::{expr_applicable_for_cols, pruned_partition_list}; +use crate::{ListingOptions, ListingTableConfig}; +use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef}; +use async_trait::async_trait; +use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; +use datafusion_common::stats::Precision; +use datafusion_common::{ + internal_datafusion_err, plan_err, project_schema, Constraints, DataFusionError, + SchemaExt, Statistics, +}; +use datafusion_datasource::file::FileSource; +use datafusion_datasource::file_groups::FileGroup; +use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; +use datafusion_datasource::file_sink_config::FileSinkConfig; +use datafusion_datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, +}; +use datafusion_datasource::{ + compute_all_files_statistics, ListingTableUrl, PartitionedFile, +}; +use datafusion_execution::cache::cache_manager::FileStatisticsCache; +use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; +use datafusion_expr::dml::InsertOp; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_physical_expr::create_lex_ordering; +use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::empty::EmptyExec; +use datafusion_physical_plan::ExecutionPlan; +use futures::{future, stream, Stream, StreamExt, TryStreamExt}; +use object_store::ObjectStore; +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; + +/// Built in [`TableProvider`] that reads data from one or more files as a single table. +/// +/// The files are read using an [`ObjectStore`] instance, for example from +/// local files or objects from AWS S3. +/// +/// # Features: +/// * Reading multiple files as a single table +/// * Hive style partitioning (e.g., directories named `date=2024-06-01`) +/// * Merges schemas from files with compatible but not identical schemas (see [`ListingTableConfig::file_schema`]) +/// * `limit`, `filter` and `projection` pushdown for formats that support it (e.g., +/// Parquet) +/// * Statistics collection and pruning based on file metadata +/// * Pre-existing sort order (see [`ListingOptions::file_sort_order`]) +/// * Metadata caching to speed up repeated queries (see [`FileMetadataCache`]) +/// * Statistics caching (see [`FileStatisticsCache`]) +/// +/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache +/// +/// # Reading Directories and Hive Style Partitioning +/// +/// For example, given the `table1` directory (or object store prefix) +/// +/// ```text +/// table1 +/// ├── file1.parquet +/// └── file2.parquet +/// ``` +/// +/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as +/// a single table, merging the schemas if the files have compatible but not +/// identical schemas. +/// +/// Given the `table2` directory (or object store prefix) +/// +/// ```text +/// table2 +/// ├── date=2024-06-01 +/// │ ├── file3.parquet +/// │ └── file4.parquet +/// └── date=2024-06-02 +/// └── file5.parquet +/// ``` +/// +/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and +/// `file5.parquet` as a single table, again merging schemas if necessary. +/// +/// Given the hive style partitioning structure (e.g,. 
directories named +/// `date=2024-06-01` and `date=2026-06-02`), `ListingTable` also adds a `date` +/// column when reading the table: +/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01` +/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`. +/// +/// If the query has a predicate like `WHERE date = '2024-06-01'` +/// only the corresponding directory will be read. +/// +/// # See Also +/// +/// 1. [`ListingTableConfig`]: Configuration options +/// 1. [`DataSourceExec`]: `ExecutionPlan` used by `ListingTable` +/// +/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec +/// +/// # Caching Metadata +/// +/// Some formats, such as Parquet, use the `FileMetadataCache` to cache file +/// metadata that is needed to execute but expensive to read, such as row +/// groups and statistics. The cache is scoped to the `SessionContext` and can +/// be configured via the [runtime config options]. +/// +/// [runtime config options]: https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings +/// +/// # Example: Read a directory of parquet files using a [`ListingTable`] +/// +/// ```no_run +/// # use datafusion_common::Result; +/// # use std::sync::Arc; +/// # use datafusion_catalog::TableProvider; +/// # use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; +/// # use datafusion_datasource::ListingTableUrl; +/// # use datafusion_datasource_parquet::file_format::ParquetFormat;/// # +/// # use datafusion_catalog::Session; +/// async fn get_listing_table(session: &dyn Session) -> Result> { +/// let table_path = "/path/to/parquet"; +/// +/// // Parse the path +/// let table_path = ListingTableUrl::parse(table_path)?; +/// +/// // Create default parquet options +/// let file_format = ParquetFormat::new(); +/// let listing_options = ListingOptions::new(Arc::new(file_format)) +/// .with_file_extension(".parquet"); +/// +/// // Resolve the schema +/// let resolved_schema = listing_options +/// .infer_schema(session, &table_path) +/// .await?; +/// +/// let config = ListingTableConfig::new(table_path) +/// .with_listing_options(listing_options) +/// .with_schema(resolved_schema); +/// +/// // Create a new TableProvider +/// let provider = Arc::new(ListingTable::try_new(config)?); +/// +/// # Ok(provider) +/// # } +/// ``` +#[derive(Debug, Clone)] +pub struct ListingTable { + table_paths: Vec, + /// `file_schema` contains only the columns physically stored in the data files themselves. + /// - Represents the actual fields found in files like Parquet, CSV, etc. 
+ /// - Used when reading the raw data from files + file_schema: SchemaRef, + /// `table_schema` combines `file_schema` + partition columns + /// - Partition columns are derived from directory paths (not stored in files) + /// - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet` + table_schema: SchemaRef, + /// Indicates how the schema was derived (inferred or explicitly specified) + schema_source: SchemaSource, + /// Options used to configure the listing table such as the file format + /// and partitioning information + options: ListingOptions, + /// The SQL definition for this table, if any + definition: Option, + /// Cache for collected file statistics + collected_statistics: FileStatisticsCache, + /// Constraints applied to this table + constraints: Constraints, + /// Column default expressions for columns that are not physically present in the data files + column_defaults: HashMap, + /// Optional [`SchemaAdapterFactory`] for creating schema adapters + schema_adapter_factory: Option>, + /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters + expr_adapter_factory: Option>, +} + +impl ListingTable { + /// Create new [`ListingTable`] + /// + /// See documentation and example on [`ListingTable`] and [`ListingTableConfig`] + pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result { + // Extract schema_source before moving other parts of the config + let schema_source = config.schema_source(); + + let file_schema = config + .file_schema + .ok_or_else(|| internal_datafusion_err!("No schema provided."))?; + + let options = config + .options + .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?; + + // Add the partition columns to the file schema + let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned()); + for (part_col_name, part_col_type) in &options.table_partition_cols { + builder.push(Field::new(part_col_name, part_col_type.clone(), false)); + } + + let table_schema = Arc::new( + builder + .finish() + .with_metadata(file_schema.metadata().clone()), + ); + + let table = Self { + table_paths: config.table_paths, + file_schema, + table_schema, + schema_source, + options, + definition: None, + collected_statistics: Arc::new(DefaultFileStatisticsCache::default()), + constraints: Constraints::default(), + column_defaults: HashMap::new(), + schema_adapter_factory: config.schema_adapter_factory, + expr_adapter_factory: config.expr_adapter_factory, + }; + + Ok(table) + } + + /// Assign constraints + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + + /// Assign column defaults + pub fn with_column_defaults( + mut self, + column_defaults: HashMap, + ) -> Self { + self.column_defaults = column_defaults; + self + } + + /// Set the [`FileStatisticsCache`] used to cache parquet file statistics. + /// + /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics + /// multiple times in the same session. + /// + /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query. 
+ pub fn with_cache(mut self, cache: Option) -> Self { + self.collected_statistics = + cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default())); + self + } + + /// Specify the SQL definition for this table, if any + pub fn with_definition(mut self, definition: Option) -> Self { + self.definition = definition; + self + } + + /// Get paths ref + pub fn table_paths(&self) -> &Vec { + &self.table_paths + } + + /// Get options ref + pub fn options(&self) -> &ListingOptions { + &self.options + } + + /// Get the schema source + pub fn schema_source(&self) -> SchemaSource { + self.schema_source + } + + /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`] + /// + /// The schema adapter factory is used to create schema adapters that can + /// handle schema evolution and type conversions when reading files with + /// different schemas than the table schema. + /// + /// # Example: Adding Schema Evolution Support + /// ```rust + /// # use std::sync::Arc; + /// # use datafusion_catalog_listing::{ListingTable, ListingTableConfig, ListingOptions}; + /// # use datafusion_datasource::ListingTableUrl; + /// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter}; + /// # use datafusion_datasource_parquet::file_format::ParquetFormat; + /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; + /// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); + /// # let options = ListingOptions::new(Arc::new(ParquetFormat::default())); + /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + /// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema); + /// # let table = ListingTable::try_new(config).unwrap(); + /// let table_with_evolution = table + /// .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory)); + /// ``` + /// See [`ListingTableConfig::with_schema_adapter_factory`] for an example of custom SchemaAdapterFactory. + pub fn with_schema_adapter_factory( + self, + schema_adapter_factory: Arc, + ) -> Self { + Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self + } + } + + /// Get the [`SchemaAdapterFactory`] for this table + pub fn schema_adapter_factory(&self) -> Option<&Arc> { + self.schema_adapter_factory.as_ref() + } + + /// Creates a schema adapter for mapping between file and table schemas + /// + /// Uses the configured schema adapter factory if available, otherwise falls back + /// to the default implementation. + fn create_schema_adapter(&self) -> Box { + let table_schema = self.schema(); + match &self.schema_adapter_factory { + Some(factory) => { + factory.create_with_projected_schema(Arc::clone(&table_schema)) + } + None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)), + } + } + + /// Creates a file source and applies schema adapter factory if available + fn create_file_source_with_schema_adapter( + &self, + ) -> datafusion_common::Result> { + let mut source = self.options.format.file_source(); + // Apply schema adapter to source if available + // + // The source will use this SchemaAdapter to adapt data batches as they flow up the plan. + // Note: ListingTable also creates a SchemaAdapter in `scan()` but that is only used to adapt collected statistics. 
+ if let Some(factory) = &self.schema_adapter_factory { + source = source.with_schema_adapter_factory(Arc::clone(factory))?; + } + Ok(source) + } + + /// If file_sort_order is specified, creates the appropriate physical expressions + pub fn try_create_output_ordering( + &self, + execution_props: &ExecutionProps, + ) -> datafusion_common::Result> { + create_lex_ordering( + &self.table_schema, + &self.options.file_sort_order, + execution_props, + ) + } +} + +// Expressions can be used for partition pruning if they can be evaluated using +// only the partition columns and there are partition columns. +fn can_be_evaluated_for_partition_pruning( + partition_column_names: &[&str], + expr: &Expr, +) -> bool { + !partition_column_names.is_empty() + && expr_applicable_for_cols(partition_column_names, expr) +} + +#[async_trait] +impl TableProvider for ListingTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.table_schema) + } + + fn constraints(&self) -> Option<&Constraints> { + Some(&self.constraints) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> datafusion_common::Result> { + let options = ScanArgs::default() + .with_projection(projection.map(|p| p.as_slice())) + .with_filters(Some(filters)) + .with_limit(limit); + Ok(self.scan_with_args(state, options).await?.into_inner()) + } + + async fn scan_with_args<'a>( + &self, + state: &dyn Session, + args: ScanArgs<'a>, + ) -> datafusion_common::Result { + let projection = args.projection().map(|p| p.to_vec()); + let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default(); + let limit = args.limit(); + + // extract types of partition columns + let table_partition_cols = self + .options + .table_partition_cols + .iter() + .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) + .collect::>>()?; + + let table_partition_col_names = table_partition_cols + .iter() + .map(|field| field.name().as_str()) + .collect::>(); + + // If the filters can be resolved using only partition cols, there is no need to + // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated + let (partition_filters, filters): (Vec<_>, Vec<_>) = + filters.iter().cloned().partition(|filter| { + can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter) + }); + + // We should not limit the number of partitioned files to scan if there are filters and limit + // at the same time. This is because the limit should be applied after the filters are applied. 
+ let statistic_file_limit = if filters.is_empty() { limit } else { None }; + + let (mut partitioned_file_lists, statistics) = self + .list_files_for_scan(state, &partition_filters, statistic_file_limit) + .await?; + + // if no files need to be read, return an `EmptyExec` + if partitioned_file_lists.is_empty() { + let projected_schema = project_schema(&self.schema(), projection.as_ref())?; + return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); + } + + let output_ordering = self.try_create_output_ordering(state.execution_props())?; + match state + .config_options() + .execution + .split_file_groups_by_statistics + .then(|| { + output_ordering.first().map(|output_ordering| { + FileScanConfig::split_groups_by_statistics_with_target_partitions( + &self.table_schema, + &partitioned_file_lists, + output_ordering, + self.options.target_partitions, + ) + }) + }) + .flatten() + { + Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), + Some(Ok(new_groups)) => { + if new_groups.len() <= self.options.target_partitions { + partitioned_file_lists = new_groups; + } else { + log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered") + } + } + None => {} // no ordering required + }; + + let Some(object_store_url) = + self.table_paths.first().map(ListingTableUrl::object_store) + else { + return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new( + Schema::empty(), + ))))); + }; + + let file_source = self.create_file_source_with_schema_adapter()?; + + // create the execution plan + let plan = self + .options + .format + .create_physical_plan( + state, + FileScanConfigBuilder::new( + object_store_url, + Arc::clone(&self.file_schema), + file_source, + ) + .with_file_groups(partitioned_file_lists) + .with_constraints(self.constraints.clone()) + .with_statistics(statistics) + .with_projection(projection) + .with_limit(limit) + .with_output_ordering(output_ordering) + .with_table_partition_cols(table_partition_cols) + .with_expr_adapter(self.expr_adapter_factory.clone()) + .build(), + ) + .await?; + + Ok(ScanResult::new(plan)) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion_common::Result> { + let partition_column_names = self + .options + .table_partition_cols + .iter() + .map(|col| col.0.as_str()) + .collect::>(); + filters + .iter() + .map(|filter| { + if can_be_evaluated_for_partition_pruning(&partition_column_names, filter) + { + // if filter can be handled by partition pruning, it is exact + return Ok(TableProviderFilterPushDown::Exact); + } + + Ok(TableProviderFilterPushDown::Inexact) + }) + .collect() + } + + fn get_table_definition(&self) -> Option<&str> { + self.definition.as_deref() + } + + async fn insert_into( + &self, + state: &dyn Session, + input: Arc, + insert_op: InsertOp, + ) -> datafusion_common::Result> { + // Check that the schema of the plan matches the schema of this table. + self.schema() + .logically_equivalent_names_and_types(&input.schema())?; + + let table_path = &self.table_paths()[0]; + if !table_path.is_collection() { + return plan_err!( + "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \ + To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE" + ); + } + + // Get the object store for the table path. 
+ let store = state.runtime_env().object_store(table_path)?; + + let file_list_stream = pruned_partition_list( + state, + store.as_ref(), + table_path, + &[], + &self.options.file_extension, + &self.options.table_partition_cols, + ) + .await?; + + let file_group = file_list_stream.try_collect::>().await?.into(); + let keep_partition_by_columns = + state.config_options().execution.keep_partition_by_columns; + + // Sink related option, apart from format + let config = FileSinkConfig { + original_url: String::default(), + object_store_url: self.table_paths()[0].object_store(), + table_paths: self.table_paths().clone(), + file_group, + output_schema: self.schema(), + table_partition_cols: self.options.table_partition_cols.clone(), + insert_op, + keep_partition_by_columns, + file_extension: self.options().format.get_ext(), + }; + + let orderings = self.try_create_output_ordering(state.execution_props())?; + // It is sufficient to pass only one of the equivalent orderings: + let order_requirements = orderings.into_iter().next().map(Into::into); + + self.options() + .format + .create_writer_physical_plan(input, state, config, order_requirements) + .await + } + + fn get_column_default(&self, column: &str) -> Option<&Expr> { + self.column_defaults.get(column) + } +} + +impl ListingTable { + /// Get the list of files for a scan as well as the file level statistics. + /// The list is grouped to let the execution plan know how the files should + /// be distributed to different threads / executors. + pub async fn list_files_for_scan<'a>( + &'a self, + ctx: &'a dyn Session, + filters: &'a [Expr], + limit: Option, + ) -> datafusion_common::Result<(Vec, Statistics)> { + let store = if let Some(url) = self.table_paths.first() { + ctx.runtime_env().object_store(url)? + } else { + return Ok((vec![], Statistics::new_unknown(&self.file_schema))); + }; + // list files (with partitions) + let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| { + pruned_partition_list( + ctx, + store.as_ref(), + table_path, + filters, + &self.options.file_extension, + &self.options.table_partition_cols, + ) + })) + .await?; + let meta_fetch_concurrency = + ctx.config_options().execution.meta_fetch_concurrency; + let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency); + // collect the statistics if required by the config + let files = file_list + .map(|part_file| async { + let part_file = part_file?; + let statistics = if self.options.collect_stat { + self.do_collect_statistics(ctx, &store, &part_file).await? 
+ } else { + Arc::new(Statistics::new_unknown(&self.file_schema)) + }; + Ok(part_file.with_statistics(statistics)) + }) + .boxed() + .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency); + + let (file_group, inexact_stats) = + get_files_with_limit(files, limit, self.options.collect_stat).await?; + + let file_groups = file_group.split_files(self.options.target_partitions); + let (mut file_groups, mut stats) = compute_all_files_statistics( + file_groups, + self.schema(), + self.options.collect_stat, + inexact_stats, + )?; + + let schema_adapter = self.create_schema_adapter(); + let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?; + + stats.column_statistics = + schema_mapper.map_column_statistics(&stats.column_statistics)?; + file_groups.iter_mut().try_for_each(|file_group| { + if let Some(stat) = file_group.statistics_mut() { + stat.column_statistics = + schema_mapper.map_column_statistics(&stat.column_statistics)?; + } + Ok::<_, DataFusionError>(()) + })?; + Ok((file_groups, stats)) + } + + /// Collects statistics for a given partitioned file. + /// + /// This method first checks if the statistics for the given file are already cached. + /// If they are, it returns the cached statistics. + /// If they are not, it infers the statistics from the file and stores them in the cache. + async fn do_collect_statistics( + &self, + ctx: &dyn Session, + store: &Arc, + part_file: &PartitionedFile, + ) -> datafusion_common::Result> { + match self + .collected_statistics + .get_with_extra(&part_file.object_meta.location, &part_file.object_meta) + { + Some(statistics) => Ok(statistics), + None => { + let statistics = self + .options + .format + .infer_stats( + ctx, + store, + Arc::clone(&self.file_schema), + &part_file.object_meta, + ) + .await?; + let statistics = Arc::new(statistics); + self.collected_statistics.put_with_extra( + &part_file.object_meta.location, + Arc::clone(&statistics), + &part_file.object_meta, + ); + Ok(statistics) + } + } + } +} + +/// Processes a stream of partitioned files and returns a `FileGroup` containing the files. +/// +/// This function collects files from the provided stream until either: +/// 1. The stream is exhausted +/// 2. The accumulated number of rows exceeds the provided `limit` (if specified) +/// +/// # Arguments +/// * `files` - A stream of `Result` items to process +/// * `limit` - An optional row count limit. If provided, the function will stop collecting files +/// once the accumulated number of rows exceeds this limit +/// * `collect_stats` - Whether to collect and accumulate statistics from the files +/// +/// # Returns +/// A `Result` containing a `FileGroup` with the collected files +/// and a boolean indicating whether the statistics are inexact. +/// +/// # Note +/// The function will continue processing files if statistics are not available or if the +/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated +/// but files will still be collected. +async fn get_files_with_limit( + files: impl Stream>, + limit: Option, + collect_stats: bool, +) -> datafusion_common::Result<(FileGroup, bool)> { + let mut file_group = FileGroup::default(); + // Fusing the stream allows us to call next safely even once it is finished. 
+ let mut all_files = Box::pin(files.fuse()); + enum ProcessingState { + ReadingFiles, + ReachedLimit, + } + + let mut state = ProcessingState::ReadingFiles; + let mut num_rows = Precision::Absent; + + while let Some(file_result) = all_files.next().await { + // Early exit if we've already reached our limit + if matches!(state, ProcessingState::ReachedLimit) { + break; + } + + let file = file_result?; + + // Update file statistics regardless of state + if collect_stats { + if let Some(file_stats) = &file.statistics { + num_rows = if file_group.is_empty() { + // For the first file, just take its row count + file_stats.num_rows + } else { + // For subsequent files, accumulate the counts + num_rows.add(&file_stats.num_rows) + }; + } + } + + // Always add the file to our group + file_group.push(file); + + // Check if we've hit the limit (if one was specified) + if let Some(limit) = limit { + if let Precision::Exact(row_count) = num_rows { + if row_count > limit { + state = ProcessingState::ReachedLimit; + } + } + } + } + // If we still have files in the stream, it means that the limit kicked + // in, and the statistic could have been different had we processed the + // files in a different order. + let inexact_stats = all_files.next().await.is_some(); + Ok((file_group, inexact_stats)) +} diff --git a/datafusion/core/src/datasource/dynamic_file.rs b/datafusion/core/src/datasource/dynamic_file.rs index b30d53e586911..256a11ba693b5 100644 --- a/datafusion/core/src/datasource/dynamic_file.rs +++ b/datafusion/core/src/datasource/dynamic_file.rs @@ -20,6 +20,7 @@ use std::sync::Arc; +use crate::datasource::listing::ListingTableConfigExt; use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; use crate::datasource::TableProvider; use crate::error::Result; diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index a58db55bccb61..c206566a65941 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -20,7 +20,8 @@ mod table; pub use datafusion_catalog_listing::helpers; +pub use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; pub use datafusion_datasource::{ FileRange, ListingTableUrl, PartitionedFile, PartitionedFileStream, }; -pub use table::{ListingOptions, ListingTable, ListingTableConfig}; +pub use table::ListingTableConfigExt; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 3ce58938d77e4..1af6055f1824a 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -15,226 +15,42 @@ // specific language governing permissions and limitations // under the License. -//! The table implementation. 
- -use super::{ - helpers::{expr_applicable_for_cols, pruned_partition_list}, - ListingTableUrl, PartitionedFile, -}; -use crate::{ - datasource::file_format::{file_compression_type::FileCompressionType, FileFormat}, - datasource::physical_plan::FileSinkConfig, - execution::context::SessionState, -}; -use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; -use arrow_schema::Schema; +use crate::execution::SessionState; use async_trait::async_trait; -use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; -use datafusion_common::{ - config_datafusion_err, config_err, internal_datafusion_err, internal_err, plan_err, - project_schema, stats::Precision, Constraints, DataFusionError, Result, SchemaExt, -}; -use datafusion_datasource::{ - compute_all_files_statistics, - file::FileSource, - file_groups::FileGroup, - file_scan_config::{FileScanConfig, FileScanConfigBuilder}, - schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory}, -}; -use datafusion_execution::{ - cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache}, - config::SessionConfig, -}; -use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::{ - dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType, -}; -use datafusion_physical_expr::create_lex_ordering; -use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; -use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}; -use futures::{future, stream, Stream, StreamExt, TryStreamExt}; -use itertools::Itertools; -use object_store::ObjectStore; -use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc}; - -/// Indicates the source of the schema for a [`ListingTable`] -// PartialEq required for assert_eq! in tests -#[derive(Debug, Clone, Copy, PartialEq, Default)] -pub enum SchemaSource { - /// Schema is not yet set (initial state) - #[default] - Unset, - /// Schema was inferred from first table_path - Inferred, - /// Schema was specified explicitly via with_schema - Specified, -} +use datafusion_catalog_listing::{ListingOptions, ListingTableConfig}; +use datafusion_common::{config_datafusion_err, internal_datafusion_err}; +use datafusion_session::Session; +use futures::StreamExt; +use std::collections::HashMap; -/// Configuration for creating a [`ListingTable`] -/// -/// # Schema Evolution Support -/// -/// This configuration supports schema evolution through the optional -/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need: +/// Extension trait for [`ListingTableConfig`] that supports inferring schemas /// -/// - **Type coercion requirements**: When you need custom logic for converting between -/// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8) -/// - **Column mapping**: You need to map columns with a legacy name to a new name -/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`. -/// -/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles -/// basic schema compatibility cases. -/// -#[derive(Debug, Clone, Default)] -pub struct ListingTableConfig { - /// Paths on the `ObjectStore` for creating `ListingTable`. - /// They should share the same schema and object store. - pub table_paths: Vec, - /// Optional `SchemaRef` for the to be created `ListingTable`. 
- /// - /// See details on [`ListingTableConfig::with_schema`] - pub file_schema: Option, - /// Optional [`ListingOptions`] for the to be created [`ListingTable`]. - /// - /// See details on [`ListingTableConfig::with_listing_options`] - pub options: Option, - /// Tracks the source of the schema information - schema_source: SchemaSource, - /// Optional [`SchemaAdapterFactory`] for creating schema adapters - schema_adapter_factory: Option>, - /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters - expr_adapter_factory: Option>, -} - -impl ListingTableConfig { - /// Creates new [`ListingTableConfig`] for reading the specified URL - pub fn new(table_path: ListingTableUrl) -> Self { - Self { - table_paths: vec![table_path], - ..Default::default() - } - } - - /// Creates new [`ListingTableConfig`] with multiple table paths. - /// - /// See [`Self::infer_options`] for details on what happens with multiple paths - pub fn new_with_multi_paths(table_paths: Vec) -> Self { - Self { - table_paths, - ..Default::default() - } - } - - /// Returns the source of the schema for this configuration - pub fn schema_source(&self) -> SchemaSource { - self.schema_source - } - /// Set the `schema` for the overall [`ListingTable`] - /// - /// [`ListingTable`] will automatically coerce, when possible, the schema - /// for individual files to match this schema. - /// - /// If a schema is not provided, it is inferred using - /// [`Self::infer_schema`]. - /// - /// If the schema is provided, it must contain only the fields in the file - /// without the table partitioning columns. - /// - /// # Example: Specifying Table Schema - /// ```rust - /// # use std::sync::Arc; - /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; - /// # use datafusion::datasource::file_format::parquet::ParquetFormat; - /// # use arrow::datatypes::{Schema, Field, DataType}; - /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); - /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); - /// let schema = Arc::new(Schema::new(vec![ - /// Field::new("id", DataType::Int64, false), - /// Field::new("name", DataType::Utf8, true), - /// ])); - /// - /// let config = ListingTableConfig::new(table_paths) - /// .with_listing_options(listing_options) // Set options first - /// .with_schema(schema); // Then set schema - /// ``` - pub fn with_schema(self, schema: SchemaRef) -> Self { - // Note: We preserve existing options state, but downstream code may expect - // options to be set. Consider calling with_listing_options() or infer_options() - // before operations that require options to be present. - debug_assert!( - self.options.is_some() || cfg!(test), - "ListingTableConfig::with_schema called without options set. \ - Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code." - ); - - Self { - file_schema: Some(schema), - schema_source: SchemaSource::Specified, - ..self - } - } - - /// Add `listing_options` to [`ListingTableConfig`] - /// - /// If not provided, format and other options are inferred via - /// [`Self::infer_options`]. 
- /// - /// # Example: Configuring Parquet Files with Custom Options - /// ```rust - /// # use std::sync::Arc; - /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; - /// # use datafusion::datasource::file_format::parquet::ParquetFormat; - /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); - /// let options = ListingOptions::new(Arc::new(ParquetFormat::default())) - /// .with_file_extension(".parquet") - /// .with_collect_stat(true); - /// - /// let config = ListingTableConfig::new(table_paths) - /// .with_listing_options(options); // Configure file format and options - /// ``` - pub fn with_listing_options(self, listing_options: ListingOptions) -> Self { - // Note: This method properly sets options, but be aware that downstream - // methods like infer_schema() and try_new() require both schema and options - // to be set to function correctly. - debug_assert!( - !self.table_paths.is_empty() || cfg!(test), - "ListingTableConfig::with_listing_options called without table_paths set. \ - Consider calling new() or new_with_multi_paths() first to establish table paths." - ); - - Self { - options: Some(listing_options), - ..self - } - } - - /// Returns a tuple of `(file_extension, optional compression_extension)` - /// - /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))` - /// For example a path ending with blah.test.csv returns `("csv", None)` - fn infer_file_extension_and_compression_type( - path: &str, - ) -> Result<(String, Option)> { - let mut exts = path.rsplit('.'); - - let split = exts.next().unwrap_or(""); - - let file_compression_type = FileCompressionType::from_str(split) - .unwrap_or(FileCompressionType::UNCOMPRESSED); - - if file_compression_type.is_compressed() { - let split2 = exts.next().unwrap_or(""); - Ok((split2.to_string(), Some(split.to_string()))) - } else { - Ok((split.to_string(), None)) - } - } - +/// This trait exists because the following inference methods only +/// work for [`SessionState`] implementations of [`Session`]. +/// See [`ListingTableConfig`] for the remaining inference methods. +#[async_trait] +pub trait ListingTableConfigExt { /// Infer `ListingOptions` based on `table_path` and file suffix. /// /// The format is inferred based on the first `table_path`. - pub async fn infer_options(self, state: &dyn Session) -> Result { + async fn infer_options( + self, + state: &dyn Session, + ) -> datafusion_common::Result; + + /// Convenience method to call both [`Self::infer_options`] and [`ListingTableConfig::infer_schema`] + async fn infer( + self, + state: &dyn Session, + ) -> datafusion_common::Result; +} + +#[async_trait] +impl ListingTableConfigExt for ListingTableConfig { + async fn infer_options( + self, + state: &dyn Session, + ) -> datafusion_common::Result { let store = if let Some(url) = self.table_paths.first() { state.runtime_env().object_store(url)? } else { @@ -281,1299 +97,19 @@ impl ListingTableConfig { .with_target_partitions(state.config().target_partitions()) .with_collect_stat(state.config().collect_statistics()); - Ok(Self { - table_paths: self.table_paths, - file_schema: self.file_schema, - options: Some(listing_options), - schema_source: self.schema_source, - schema_adapter_factory: self.schema_adapter_factory, - expr_adapter_factory: self.expr_adapter_factory, - }) - } - - /// Infer the [`SchemaRef`] based on `table_path`s. - /// - /// This method infers the table schema using the first `table_path`. 
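The extension trait introduced above keeps `infer_options` and `infer` available for `SessionState`-backed sessions while the rest of `ListingTableConfig` moves to `datafusion-catalog-listing`. A hedged usage sketch (the path and table name are placeholders):

```rust
use std::sync::Arc;
use datafusion::datasource::listing::{
    ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl,
};
use datafusion::prelude::SessionContext;

async fn register_inferred_table(ctx: &SessionContext) -> datafusion::error::Result<()> {
    // `ListingTableConfigExt` must be in scope for `infer_options`/`infer` to resolve.
    let table_path = ListingTableUrl::parse("file:///path/to/parquet/")?;

    // `infer` chains `infer_options` (file format, extension, session settings)
    // with `infer_schema` (merged schema of the files found under the path).
    let config = ListingTableConfig::new(table_path).infer(&ctx.state()).await?;

    let table = ListingTable::try_new(config)?;
    let _ = ctx.register_table("inferred_table", Arc::new(table))?;
    Ok(())
}
```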
- /// See [`ListingOptions::infer_schema`] for more details - /// - /// # Errors - /// * if `self.options` is not set. See [`Self::with_listing_options`] - pub async fn infer_schema(self, state: &dyn Session) -> Result { - match self.options { - Some(options) => { - let ListingTableConfig { - table_paths, - file_schema, - options: _, - schema_source, - schema_adapter_factory, - expr_adapter_factory: physical_expr_adapter_factory, - } = self; - - let (schema, new_schema_source) = match file_schema { - Some(schema) => (schema, schema_source), // Keep existing source if schema exists - None => { - if let Some(url) = table_paths.first() { - ( - options.infer_schema(state, url).await?, - SchemaSource::Inferred, - ) - } else { - (Arc::new(Schema::empty()), SchemaSource::Inferred) - } - } - }; - - Ok(Self { - table_paths, - file_schema: Some(schema), - options: Some(options), - schema_source: new_schema_source, - schema_adapter_factory, - expr_adapter_factory: physical_expr_adapter_factory, - }) - } - None => internal_err!("No `ListingOptions` set for inferring schema"), - } + Ok(self.with_listing_options(listing_options)) } - /// Convenience method to call both [`Self::infer_options`] and [`Self::infer_schema`] - pub async fn infer(self, state: &dyn Session) -> Result { + async fn infer(self, state: &dyn Session) -> datafusion_common::Result { self.infer_options(state).await?.infer_schema(state).await } - - /// Infer the partition columns from `table_paths`. - /// - /// # Errors - /// * if `self.options` is not set. See [`Self::with_listing_options`] - pub async fn infer_partitions_from_path(self, state: &dyn Session) -> Result { - match self.options { - Some(options) => { - let Some(url) = self.table_paths.first() else { - return config_err!("No table path found"); - }; - let partitions = options - .infer_partitions(state, url) - .await? - .into_iter() - .map(|col_name| { - ( - col_name, - DataType::Dictionary( - Box::new(DataType::UInt16), - Box::new(DataType::Utf8), - ), - ) - }) - .collect::>(); - let options = options.with_table_partition_cols(partitions); - Ok(Self { - table_paths: self.table_paths, - file_schema: self.file_schema, - options: Some(options), - schema_source: self.schema_source, - schema_adapter_factory: self.schema_adapter_factory, - expr_adapter_factory: self.expr_adapter_factory, - }) - } - None => config_err!("No `ListingOptions` set for inferring schema"), - } - } - - /// Set the [`SchemaAdapterFactory`] for the [`ListingTable`] - /// - /// The schema adapter factory is used to create schema adapters that can - /// handle schema evolution and type conversions when reading files with - /// different schemas than the table schema. - /// - /// If not provided, a default schema adapter factory will be used. 
- /// - /// # Example: Custom Schema Adapter for Type Coercion - /// ```rust - /// # use std::sync::Arc; - /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl}; - /// # use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter}; - /// # use datafusion::datasource::file_format::parquet::ParquetFormat; - /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; - /// # - /// # #[derive(Debug)] - /// # struct MySchemaAdapterFactory; - /// # impl SchemaAdapterFactory for MySchemaAdapterFactory { - /// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box { - /// # unimplemented!() - /// # } - /// # } - /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap(); - /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); - /// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); - /// let config = ListingTableConfig::new(table_paths) - /// .with_listing_options(listing_options) - /// .with_schema(table_schema) - /// .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory)); - /// ``` - pub fn with_schema_adapter_factory( - self, - schema_adapter_factory: Arc, - ) -> Self { - Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self - } - } - - /// Get the [`SchemaAdapterFactory`] for this configuration - pub fn schema_adapter_factory(&self) -> Option<&Arc> { - self.schema_adapter_factory.as_ref() - } - - /// Set the [`PhysicalExprAdapterFactory`] for the [`ListingTable`] - /// - /// The expression adapter factory is used to create physical expression adapters that can - /// handle schema evolution and type conversions when evaluating expressions - /// with different schemas than the table schema. - /// - /// If not provided, a default physical expression adapter factory will be used unless a custom - /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used. - /// - /// See for details on this transition. - pub fn with_expr_adapter_factory( - self, - expr_adapter_factory: Arc, - ) -> Self { - Self { - expr_adapter_factory: Some(expr_adapter_factory), - ..self - } - } -} - -/// Options for creating a [`ListingTable`] -#[derive(Clone, Debug)] -pub struct ListingOptions { - /// A suffix on which files should be filtered (leave empty to - /// keep all files on the path) - pub file_extension: String, - /// The file format - pub format: Arc, - /// The expected partition column names in the folder structure. - /// See [Self::with_table_partition_cols] for details - pub table_partition_cols: Vec<(String, DataType)>, - /// Set true to try to guess statistics from the files. - /// This can add a lot of overhead as it will usually require files - /// to be opened and at least partially parsed. - pub collect_stat: bool, - /// Group files to avoid that the number of partitions exceeds - /// this limit - pub target_partitions: usize, - /// Optional pre-known sort order(s). Must be `SortExpr`s. - /// - /// DataFusion may take advantage of this ordering to omit sorts - /// or use more efficient algorithms. Currently sortedness must be - /// provided if it is known by some external mechanism, but may in - /// the future be automatically determined, for example using - /// parquet metadata. 
- /// - /// See - /// - /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`) - /// where each ordering consists of an individual lexicographic - /// ordering (encapsulated by a `Vec`). If there aren't - /// multiple equivalent orderings, the outer `Vec` will have a - /// single element. - pub file_sort_order: Vec>, -} - -impl ListingOptions { - /// Creates an options instance with the given format - /// Default values: - /// - use default file extension filter - /// - no input partition to discover - /// - one target partition - /// - do not collect statistics - pub fn new(format: Arc) -> Self { - Self { - file_extension: format.get_ext(), - format, - table_partition_cols: vec![], - collect_stat: false, - target_partitions: 1, - file_sort_order: vec![], - } - } - - /// Set options from [`SessionConfig`] and returns self. - /// - /// Currently this sets `target_partitions` and `collect_stat` - /// but if more options are added in the future that need to be coordinated - /// they will be synchronized through this method. - pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self { - self = self.with_target_partitions(config.target_partitions()); - self = self.with_collect_stat(config.collect_statistics()); - self - } - - /// Set file extension on [`ListingOptions`] and returns self. - /// - /// # Example - /// ``` - /// # use std::sync::Arc; - /// # use datafusion::prelude::SessionContext; - /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; - /// - /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::default() - /// )) - /// .with_file_extension(".parquet"); - /// - /// assert_eq!(listing_options.file_extension, ".parquet"); - /// ``` - pub fn with_file_extension(mut self, file_extension: impl Into) -> Self { - self.file_extension = file_extension.into(); - self - } - - /// Optionally set file extension on [`ListingOptions`] and returns self. - /// - /// If `file_extension` is `None`, the file extension will not be changed - /// - /// # Example - /// ``` - /// # use std::sync::Arc; - /// # use datafusion::prelude::SessionContext; - /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; - /// let extension = Some(".parquet"); - /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::default() - /// )) - /// .with_file_extension_opt(extension); - /// - /// assert_eq!(listing_options.file_extension, ".parquet"); - /// ``` - pub fn with_file_extension_opt(mut self, file_extension: Option) -> Self - where - S: Into, - { - if let Some(file_extension) = file_extension { - self.file_extension = file_extension.into(); - } - self - } - - /// Set `table partition columns` on [`ListingOptions`] and returns self. - /// - /// "partition columns," used to support [Hive Partitioning], are - /// columns added to the data that is read, based on the folder - /// structure where the data resides. - /// - /// For example, give the following files in your filesystem: - /// - /// ```text - /// /mnt/nyctaxi/year=2022/month=01/tripdata.parquet - /// /mnt/nyctaxi/year=2021/month=12/tripdata.parquet - /// /mnt/nyctaxi/year=2021/month=11/tripdata.parquet - /// ``` - /// - /// A [`ListingTable`] created at `/mnt/nyctaxi/` with partition - /// columns "year" and "month" will include new `year` and `month` - /// columns while reading the files. 
The `year` column would have - /// value `2022` and the `month` column would have value `01` for - /// the rows read from - /// `/mnt/nyctaxi/year=2022/month=01/tripdata.parquet` - /// - ///# Notes - /// - /// - If only one level (e.g. `year` in the example above) is - /// specified, the other levels are ignored but the files are - /// still read. - /// - /// - Files that don't follow this partitioning scheme will be - /// ignored. - /// - /// - Since the columns have the same value for all rows read from - /// each individual file (such as dates), they are typically - /// dictionary encoded for efficiency. You may use - /// [`wrap_partition_type_in_dict`] to request a - /// dictionary-encoded type. - /// - /// - The partition columns are solely extracted from the file path. Especially they are NOT part of the parquet files itself. - /// - /// # Example - /// - /// ``` - /// # use std::sync::Arc; - /// # use arrow::datatypes::DataType; - /// # use datafusion::prelude::col; - /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; - /// - /// // listing options for files with paths such as `/mnt/data/col_a=x/col_b=y/data.parquet` - /// // `col_a` and `col_b` will be included in the data read from those files - /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::default() - /// )) - /// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8), - /// ("col_b".to_string(), DataType::Utf8)]); - /// - /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8), - /// ("col_b".to_string(), DataType::Utf8)]); - /// ``` - /// - /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html - /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict - pub fn with_table_partition_cols( - mut self, - table_partition_cols: Vec<(String, DataType)>, - ) -> Self { - self.table_partition_cols = table_partition_cols; - self - } - - /// Set stat collection on [`ListingOptions`] and returns self. - /// - /// ``` - /// # use std::sync::Arc; - /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; - /// - /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::default() - /// )) - /// .with_collect_stat(true); - /// - /// assert_eq!(listing_options.collect_stat, true); - /// ``` - pub fn with_collect_stat(mut self, collect_stat: bool) -> Self { - self.collect_stat = collect_stat; - self - } - - /// Set number of target partitions on [`ListingOptions`] and returns self. - /// - /// ``` - /// # use std::sync::Arc; - /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; - /// - /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::default() - /// )) - /// .with_target_partitions(8); - /// - /// assert_eq!(listing_options.target_partitions, 8); - /// ``` - pub fn with_target_partitions(mut self, target_partitions: usize) -> Self { - self.target_partitions = target_partitions; - self - } - - /// Set file sort order on [`ListingOptions`] and returns self. 
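One follow-up on the partition-column notes above: each partition column holds a single repeated value per file, which is why the docs recommend a dictionary-encoded type (the helper they point to is `wrap_partition_type_in_dict`). A small sketch that builds the dictionary type directly; the column names are examples only:

```rust
use std::sync::Arc;
use arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

// Declare Hive-style partition columns with a dictionary-encoded string type,
// mirroring what partition inference in this module produces for such columns.
fn partitioned_listing_options() -> ListingOptions {
    let dict_utf8 =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    ListingOptions::new(Arc::new(ParquetFormat::default())).with_table_partition_cols(vec![
        ("year".to_string(), dict_utf8.clone()),
        ("month".to_string(), dict_utf8),
    ])
}
```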
- /// - /// ``` - /// # use std::sync::Arc; - /// # use datafusion::prelude::col; - /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; - /// - /// // Tell datafusion that the files are sorted by column "a" - /// let file_sort_order = vec![vec![ - /// col("a").sort(true, true) - /// ]]; - /// - /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::default() - /// )) - /// .with_file_sort_order(file_sort_order.clone()); - /// - /// assert_eq!(listing_options.file_sort_order, file_sort_order); - /// ``` - pub fn with_file_sort_order(mut self, file_sort_order: Vec>) -> Self { - self.file_sort_order = file_sort_order; - self - } - - /// Infer the schema of the files at the given path on the provided object store. - /// - /// If the table_path contains one or more files (i.e. it is a directory / - /// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`] - /// - /// Note: The inferred schema does not include any partitioning columns. - /// - /// This method is called as part of creating a [`ListingTable`]. - pub async fn infer_schema<'a>( - &'a self, - state: &dyn Session, - table_path: &'a ListingTableUrl, - ) -> Result { - let store = state.runtime_env().object_store(table_path)?; - - let files: Vec<_> = table_path - .list_all_files(state, store.as_ref(), &self.file_extension) - .await? - // Empty files cannot affect schema but may throw when trying to read for it - .try_filter(|object_meta| future::ready(object_meta.size > 0)) - .try_collect() - .await?; - - let schema = self.format.infer_schema(state, &store, &files).await?; - - Ok(schema) - } - - /// Infers the partition columns stored in `LOCATION` and compares - /// them with the columns provided in `PARTITIONED BY` to help prevent - /// accidental corrupts of partitioned tables. - /// - /// Allows specifying partial partitions. - pub async fn validate_partitions( - &self, - state: &dyn Session, - table_path: &ListingTableUrl, - ) -> Result<()> { - if self.table_partition_cols.is_empty() { - return Ok(()); - } - - if !table_path.is_collection() { - return plan_err!( - "Can't create a partitioned table backed by a single file, \ - perhaps the URL is missing a trailing slash?" - ); - } - - let inferred = self.infer_partitions(state, table_path).await?; - - // no partitioned files found on disk - if inferred.is_empty() { - return Ok(()); - } - - let table_partition_names = self - .table_partition_cols - .iter() - .map(|(col_name, _)| col_name.clone()) - .collect_vec(); - - if inferred.len() < table_partition_names.len() { - return plan_err!( - "Inferred partitions to be {:?}, but got {:?}", - inferred, - table_partition_names - ); - } - - // match prefix to allow creating tables with partial partitions - for (idx, col) in table_partition_names.iter().enumerate() { - if &inferred[idx] != col { - return plan_err!( - "Inferred partitions to be {:?}, but got {:?}", - inferred, - table_partition_names - ); - } - } - - Ok(()) - } - - /// Infer the partitioning at the given path on the provided object store. - /// For performance reasons, it doesn't read all the files on disk - /// and therefore may fail to detect invalid partitioning. 
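The validation above only requires the `PARTITIONED BY` columns to form a prefix of the partition directories found on disk, so partially partitioned declarations are accepted. Stripped of the object-store plumbing, the check is roughly:

```rust
// Sketch of the prefix rule used when validating declared partition columns
// against the partition directories inferred from disk.
fn partitions_match(inferred: &[&str], declared: &[&str]) -> bool {
    inferred.is_empty()
        || (declared.len() <= inferred.len()
            && declared.iter().zip(inferred).all(|(d, i)| d == i))
}

fn main() {
    assert!(partitions_match(&["year", "month"], &["year"])); // partial prefix is fine
    assert!(partitions_match(&[], &["year"])); // nothing partitioned on disk yet
    assert!(!partitions_match(&["year", "month"], &["month"])); // order mismatch
}
```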
- pub(crate) async fn infer_partitions( - &self, - state: &dyn Session, - table_path: &ListingTableUrl, - ) -> Result> { - let store = state.runtime_env().object_store(table_path)?; - - // only use 10 files for inference - // This can fail to detect inconsistent partition keys - // A DFS traversal approach of the store can help here - let files: Vec<_> = table_path - .list_all_files(state, store.as_ref(), &self.file_extension) - .await? - .take(10) - .try_collect() - .await?; - - let stripped_path_parts = files.iter().map(|file| { - table_path - .strip_prefix(&file.location) - .unwrap() - .collect_vec() - }); - - let partition_keys = stripped_path_parts - .map(|path_parts| { - path_parts - .into_iter() - .rev() - .skip(1) // get parents only; skip the file itself - .rev() - // Partitions are expected to follow the format "column_name=value", so we - // should ignore any path part that cannot be parsed into the expected format - .filter(|s| s.contains('=')) - .map(|s| s.split('=').take(1).collect()) - .collect_vec() - }) - .collect_vec(); - - match partition_keys.into_iter().all_equal_value() { - Ok(v) => Ok(v), - Err(None) => Ok(vec![]), - Err(Some(diff)) => { - let mut sorted_diff = [diff.0, diff.1]; - sorted_diff.sort(); - plan_err!("Found mixed partition values on disk {:?}", sorted_diff) - } - } - } -} - -/// Built in [`TableProvider`] that reads data from one or more files as a single table. -/// -/// The files are read using an [`ObjectStore`] instance, for example from -/// local files or objects from AWS S3. -/// -/// # Features: -/// * Reading multiple files as a single table -/// * Hive style partitioning (e.g., directories named `date=2024-06-01`) -/// * Merges schemas from files with compatible but not identical schemas (see [`ListingTableConfig::file_schema`]) -/// * `limit`, `filter` and `projection` pushdown for formats that support it (e.g., -/// Parquet) -/// * Statistics collection and pruning based on file metadata -/// * Pre-existing sort order (see [`ListingOptions::file_sort_order`]) -/// * Metadata caching to speed up repeated queries (see [`FileMetadataCache`]) -/// * Statistics caching (see [`FileStatisticsCache`]) -/// -/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache -/// -/// # Reading Directories and Hive Style Partitioning -/// -/// For example, given the `table1` directory (or object store prefix) -/// -/// ```text -/// table1 -/// ├── file1.parquet -/// └── file2.parquet -/// ``` -/// -/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as -/// a single table, merging the schemas if the files have compatible but not -/// identical schemas. -/// -/// Given the `table2` directory (or object store prefix) -/// -/// ```text -/// table2 -/// ├── date=2024-06-01 -/// │ ├── file3.parquet -/// │ └── file4.parquet -/// └── date=2024-06-02 -/// └── file5.parquet -/// ``` -/// -/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and -/// `file5.parquet` as a single table, again merging schemas if necessary. -/// -/// Given the hive style partitioning structure (e.g,. directories named -/// `date=2024-06-01` and `date=2026-06-02`), `ListingTable` also adds a `date` -/// column when reading the table: -/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01` -/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`. -/// -/// If the query has a predicate like `WHERE date = '2024-06-01'` -/// only the corresponding directory will be read. 
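Partition detection above relies purely on the `column=value` directory names between the table root and each file. Reduced to plain string handling (the paths are made up):

```rust
// Extract Hive-style partition column names from a path relative to the table root.
fn partition_columns(relative_path: &str) -> Vec<&str> {
    let segments: Vec<&str> = relative_path.split('/').collect();
    segments
        .iter()
        .take(segments.len().saturating_sub(1)) // drop the file name itself
        .filter(|s| s.contains('=')) // only "column=value" directories count
        .filter_map(|s| s.split('=').next())
        .collect()
}

fn main() {
    assert_eq!(
        partition_columns("year=2024/month=06/part-0.parquet"),
        vec!["year", "month"]
    );
    // Directories that are not in "column=value" form are ignored.
    assert_eq!(partition_columns("misc/part-0.parquet"), Vec::<&str>::new());
}
```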
-/// -/// # See Also -/// -/// 1. [`ListingTableConfig`]: Configuration options -/// 1. [`DataSourceExec`]: `ExecutionPlan` used by `ListingTable` -/// -/// [`DataSourceExec`]: crate::datasource::source::DataSourceExec -/// -/// # Caching Metadata -/// -/// Some formats, such as Parquet, use the `FileMetadataCache` to cache file -/// metadata that is needed to execute but expensive to read, such as row -/// groups and statistics. The cache is scoped to the [`SessionContext`] and can -/// be configured via the [runtime config options]. -/// -/// [`SessionContext`]: crate::prelude::SessionContext -/// [runtime config options]: https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings -/// -/// # Example: Read a directory of parquet files using a [`ListingTable`] -/// -/// ```no_run -/// # use datafusion::prelude::SessionContext; -/// # use datafusion::error::Result; -/// # use std::sync::Arc; -/// # use datafusion::datasource::{ -/// # listing::{ -/// # ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, -/// # }, -/// # file_format::parquet::ParquetFormat, -/// # }; -/// # #[tokio::main] -/// # async fn main() -> Result<()> { -/// let ctx = SessionContext::new(); -/// let session_state = ctx.state(); -/// let table_path = "/path/to/parquet"; -/// -/// // Parse the path -/// let table_path = ListingTableUrl::parse(table_path)?; -/// -/// // Create default parquet options -/// let file_format = ParquetFormat::new(); -/// let listing_options = ListingOptions::new(Arc::new(file_format)) -/// .with_file_extension(".parquet"); -/// -/// // Resolve the schema -/// let resolved_schema = listing_options -/// .infer_schema(&session_state, &table_path) -/// .await?; -/// -/// let config = ListingTableConfig::new(table_path) -/// .with_listing_options(listing_options) -/// .with_schema(resolved_schema); -/// -/// // Create a new TableProvider -/// let provider = Arc::new(ListingTable::try_new(config)?); -/// -/// // This provider can now be read as a dataframe: -/// let df = ctx.read_table(provider.clone()); -/// -/// // or registered as a named table: -/// ctx.register_table("my_table", provider); -/// -/// # Ok(()) -/// # } -/// ``` -#[derive(Debug, Clone)] -pub struct ListingTable { - table_paths: Vec, - /// `file_schema` contains only the columns physically stored in the data files themselves. - /// - Represents the actual fields found in files like Parquet, CSV, etc. 
- /// - Used when reading the raw data from files - file_schema: SchemaRef, - /// `table_schema` combines `file_schema` + partition columns - /// - Partition columns are derived from directory paths (not stored in files) - /// - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet` - table_schema: SchemaRef, - /// Indicates how the schema was derived (inferred or explicitly specified) - schema_source: SchemaSource, - /// Options used to configure the listing table such as the file format - /// and partitioning information - options: ListingOptions, - /// The SQL definition for this table, if any - definition: Option, - /// Cache for collected file statistics - collected_statistics: FileStatisticsCache, - /// Constraints applied to this table - constraints: Constraints, - /// Column default expressions for columns that are not physically present in the data files - column_defaults: HashMap, - /// Optional [`SchemaAdapterFactory`] for creating schema adapters - schema_adapter_factory: Option>, - /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters - expr_adapter_factory: Option>, -} - -impl ListingTable { - /// Create new [`ListingTable`] - /// - /// See documentation and example on [`ListingTable`] and [`ListingTableConfig`] - pub fn try_new(config: ListingTableConfig) -> Result { - // Extract schema_source before moving other parts of the config - let schema_source = config.schema_source(); - - let file_schema = config - .file_schema - .ok_or_else(|| internal_datafusion_err!("No schema provided."))?; - - let options = config - .options - .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?; - - // Add the partition columns to the file schema - let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned()); - for (part_col_name, part_col_type) in &options.table_partition_cols { - builder.push(Field::new(part_col_name, part_col_type.clone(), false)); - } - - let table_schema = Arc::new( - builder - .finish() - .with_metadata(file_schema.metadata().clone()), - ); - - let table = Self { - table_paths: config.table_paths, - file_schema, - table_schema, - schema_source, - options, - definition: None, - collected_statistics: Arc::new(DefaultFileStatisticsCache::default()), - constraints: Constraints::default(), - column_defaults: HashMap::new(), - schema_adapter_factory: config.schema_adapter_factory, - expr_adapter_factory: config.expr_adapter_factory, - }; - - Ok(table) - } - - /// Assign constraints - pub fn with_constraints(mut self, constraints: Constraints) -> Self { - self.constraints = constraints; - self - } - - /// Assign column defaults - pub fn with_column_defaults( - mut self, - column_defaults: HashMap, - ) -> Self { - self.column_defaults = column_defaults; - self - } - - /// Set the [`FileStatisticsCache`] used to cache parquet file statistics. - /// - /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics - /// multiple times in the same session. - /// - /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query. 
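In `try_new` above the table schema is the file schema plus one non-nullable field per partition column, since partition values always come from the directory path rather than the files. Roughly (a sketch, not the exact helper):

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder};

// Combine the columns physically stored in the files with Hive-style partition
// columns to form the table schema exposed to queries.
fn table_schema_with_partitions(
    file_schema: &Schema,
    partition_cols: &[(String, DataType)],
) -> Arc<Schema> {
    let mut builder = SchemaBuilder::from(file_schema.clone());
    for (name, data_type) in partition_cols {
        // Partition values are derived from the path, so they are always present.
        builder.push(Field::new(name, data_type.clone(), false));
    }
    Arc::new(builder.finish().with_metadata(file_schema.metadata().clone()))
}
```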
- pub fn with_cache(mut self, cache: Option) -> Self { - self.collected_statistics = - cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default())); - self - } - - /// Specify the SQL definition for this table, if any - pub fn with_definition(mut self, definition: Option) -> Self { - self.definition = definition; - self - } - - /// Get paths ref - pub fn table_paths(&self) -> &Vec { - &self.table_paths - } - - /// Get options ref - pub fn options(&self) -> &ListingOptions { - &self.options - } - - /// Get the schema source - pub fn schema_source(&self) -> SchemaSource { - self.schema_source - } - - /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`] - /// - /// The schema adapter factory is used to create schema adapters that can - /// handle schema evolution and type conversions when reading files with - /// different schemas than the table schema. - /// - /// # Example: Adding Schema Evolution Support - /// ```rust - /// # use std::sync::Arc; - /// # use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingOptions, ListingTableUrl}; - /// # use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter}; - /// # use datafusion::datasource::file_format::parquet::ParquetFormat; - /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType}; - /// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap(); - /// # let options = ListingOptions::new(Arc::new(ParquetFormat::default())); - /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); - /// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema); - /// # let table = ListingTable::try_new(config).unwrap(); - /// let table_with_evolution = table - /// .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory)); - /// ``` - /// See [`ListingTableConfig::with_schema_adapter_factory`] for an example of custom SchemaAdapterFactory. - pub fn with_schema_adapter_factory( - self, - schema_adapter_factory: Arc, - ) -> Self { - Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self - } - } - - /// Get the [`SchemaAdapterFactory`] for this table - pub fn schema_adapter_factory(&self) -> Option<&Arc> { - self.schema_adapter_factory.as_ref() - } - - /// Creates a schema adapter for mapping between file and table schemas - /// - /// Uses the configured schema adapter factory if available, otherwise falls back - /// to the default implementation. - fn create_schema_adapter(&self) -> Box { - let table_schema = self.schema(); - match &self.schema_adapter_factory { - Some(factory) => { - factory.create_with_projected_schema(Arc::clone(&table_schema)) - } - None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)), - } - } - - /// Creates a file source and applies schema adapter factory if available - fn create_file_source_with_schema_adapter(&self) -> Result> { - let mut source = self.options.format.file_source(); - // Apply schema adapter to source if available - // - // The source will use this SchemaAdapter to adapt data batches as they flow up the plan. - // Note: ListingTable also creates a SchemaAdapter in `scan()` but that is only used to adapt collected statistics. 
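That note deserves a tiny illustration: the adapter created here is only used to re-order (and pad) per-column statistics from the file schema's layout into the table schema's layout. The essence, with plain vectors standing in for the real `SchemaMapper` API:

```rust
// Map per-column statistics from file-schema order into table-schema order.
// `file_to_table[i]` gives, for table column `i`, its index in the file schema
// (or None if the file does not contain that column).
fn map_column_stats<T: Clone + Default>(
    file_stats: &[T],
    file_to_table: &[Option<usize>],
) -> Vec<T> {
    file_to_table
        .iter()
        .copied()
        .map(|idx| idx.and_then(|i| file_stats.get(i).cloned()).unwrap_or_default())
        .collect()
}

fn main() {
    // File has columns [b, a]; the table wants [a, b, c] and c is missing from the file.
    let file_stats = vec![10usize, 20];
    let mapped = map_column_stats(&file_stats, &[Some(1), Some(0), None]);
    assert_eq!(mapped, vec![20, 10, 0]);
}
```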
- if let Some(factory) = &self.schema_adapter_factory { - source = source.with_schema_adapter_factory(Arc::clone(factory))?; - } - Ok(source) - } - - /// If file_sort_order is specified, creates the appropriate physical expressions - fn try_create_output_ordering( - &self, - execution_props: &ExecutionProps, - ) -> Result> { - create_lex_ordering( - &self.table_schema, - &self.options.file_sort_order, - execution_props, - ) - } -} - -// Expressions can be used for partition pruning if they can be evaluated using -// only the partition columns and there are partition columns. -fn can_be_evaluated_for_partition_pruning( - partition_column_names: &[&str], - expr: &Expr, -) -> bool { - !partition_column_names.is_empty() - && expr_applicable_for_cols(partition_column_names, expr) -} - -#[async_trait] -impl TableProvider for ListingTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.table_schema) - } - - fn constraints(&self) -> Option<&Constraints> { - Some(&self.constraints) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - state: &dyn Session, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Result> { - let options = ScanArgs::default() - .with_projection(projection.map(|p| p.as_slice())) - .with_filters(Some(filters)) - .with_limit(limit); - Ok(self.scan_with_args(state, options).await?.into_inner()) - } - - async fn scan_with_args<'a>( - &self, - state: &dyn Session, - args: ScanArgs<'a>, - ) -> Result { - let projection = args.projection().map(|p| p.to_vec()); - let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default(); - let limit = args.limit(); - - // extract types of partition columns - let table_partition_cols = self - .options - .table_partition_cols - .iter() - .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) - .collect::>>()?; - - let table_partition_col_names = table_partition_cols - .iter() - .map(|field| field.name().as_str()) - .collect::>(); - - // If the filters can be resolved using only partition cols, there is no need to - // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated - let (partition_filters, filters): (Vec<_>, Vec<_>) = - filters.iter().cloned().partition(|filter| { - can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter) - }); - - // We should not limit the number of partitioned files to scan if there are filters and limit - // at the same time. This is because the limit should be applied after the filters are applied. 
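The partition split above (and the matching `supports_filters_pushdown` logic later in this impl) comes down to one rule: a filter that touches only partition columns is answered exactly by pruning the file listing, while anything else is pushed down as inexact and re-checked after the scan. A DataFusion-free sketch of that classification:

```rust
// Simplified sketch, not DataFusion's API: classify a filter by the columns it
// references, mirroring the Exact/Inexact pushdown decision described above.
#[derive(Debug, PartialEq)]
enum PushDown {
    Exact,   // fully answered by partition pruning
    Inexact, // pushed down, but re-evaluated after the scan
}

fn classify(filter_columns: &[&str], partition_cols: &[&str]) -> PushDown {
    let only_partition_cols = !partition_cols.is_empty()
        && filter_columns.iter().all(|c| partition_cols.contains(c));
    if only_partition_cols {
        PushDown::Exact
    } else {
        PushDown::Inexact
    }
}

fn main() {
    let partition_cols = ["year", "month"];
    assert_eq!(classify(&["year"], &partition_cols), PushDown::Exact);
    assert_eq!(classify(&["year", "amount"], &partition_cols), PushDown::Inexact);
}
```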
- let statistic_file_limit = if filters.is_empty() { limit } else { None }; - - let (mut partitioned_file_lists, statistics) = self - .list_files_for_scan(state, &partition_filters, statistic_file_limit) - .await?; - - // if no files need to be read, return an `EmptyExec` - if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection.as_ref())?; - return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); - } - - let output_ordering = self.try_create_output_ordering(state.execution_props())?; - match state - .config_options() - .execution - .split_file_groups_by_statistics - .then(|| { - output_ordering.first().map(|output_ordering| { - FileScanConfig::split_groups_by_statistics_with_target_partitions( - &self.table_schema, - &partitioned_file_lists, - output_ordering, - self.options.target_partitions, - ) - }) - }) - .flatten() - { - Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), - Some(Ok(new_groups)) => { - if new_groups.len() <= self.options.target_partitions { - partitioned_file_lists = new_groups; - } else { - log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered") - } - } - None => {} // no ordering required - }; - - let Some(object_store_url) = - self.table_paths.first().map(ListingTableUrl::object_store) - else { - return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new( - Schema::empty(), - ))))); - }; - - let file_source = self.create_file_source_with_schema_adapter()?; - - // create the execution plan - let plan = self - .options - .format - .create_physical_plan( - state, - FileScanConfigBuilder::new( - object_store_url, - Arc::clone(&self.file_schema), - file_source, - ) - .with_file_groups(partitioned_file_lists) - .with_constraints(self.constraints.clone()) - .with_statistics(statistics) - .with_projection(projection) - .with_limit(limit) - .with_output_ordering(output_ordering) - .with_table_partition_cols(table_partition_cols) - .with_expr_adapter(self.expr_adapter_factory.clone()) - .build(), - ) - .await?; - - Ok(ScanResult::new(plan)) - } - - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> Result> { - let partition_column_names = self - .options - .table_partition_cols - .iter() - .map(|col| col.0.as_str()) - .collect::>(); - filters - .iter() - .map(|filter| { - if can_be_evaluated_for_partition_pruning(&partition_column_names, filter) - { - // if filter can be handled by partition pruning, it is exact - return Ok(TableProviderFilterPushDown::Exact); - } - - Ok(TableProviderFilterPushDown::Inexact) - }) - .collect() - } - - fn get_table_definition(&self) -> Option<&str> { - self.definition.as_deref() - } - - async fn insert_into( - &self, - state: &dyn Session, - input: Arc, - insert_op: InsertOp, - ) -> Result> { - // Check that the schema of the plan matches the schema of this table. - self.schema() - .logically_equivalent_names_and_types(&input.schema())?; - - let table_path = &self.table_paths()[0]; - if !table_path.is_collection() { - return plan_err!( - "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \ - To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE" - ); - } - - // Get the object store for the table path. 
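The trailing-slash requirement enforced at the top of `insert_into` above can be checked before attempting a write. A small illustration (the paths are made up, and only the `is_collection` distinction is shown):

```rust
use datafusion::datasource::listing::ListingTableUrl;

// Appends go through the table's directory URL, so the URL must denote a
// collection of files (note the trailing slash), not a single object.
fn main() -> datafusion::error::Result<()> {
    let dir = ListingTableUrl::parse("file:///data/events/")?;
    let single_file = ListingTableUrl::parse("file:///data/events/part-0.parquet")?;
    // `is_collection` is the check `insert_into` performs before writing new files.
    println!("dir: {}, file: {}", dir.is_collection(), single_file.is_collection());
    Ok(())
}
```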
- let store = state.runtime_env().object_store(table_path)?; - - let file_list_stream = pruned_partition_list( - state, - store.as_ref(), - table_path, - &[], - &self.options.file_extension, - &self.options.table_partition_cols, - ) - .await?; - - let file_group = file_list_stream.try_collect::>().await?.into(); - let keep_partition_by_columns = - state.config_options().execution.keep_partition_by_columns; - - // Sink related option, apart from format - let config = FileSinkConfig { - original_url: String::default(), - object_store_url: self.table_paths()[0].object_store(), - table_paths: self.table_paths().clone(), - file_group, - output_schema: self.schema(), - table_partition_cols: self.options.table_partition_cols.clone(), - insert_op, - keep_partition_by_columns, - file_extension: self.options().format.get_ext(), - }; - - let orderings = self.try_create_output_ordering(state.execution_props())?; - // It is sufficient to pass only one of the equivalent orderings: - let order_requirements = orderings.into_iter().next().map(Into::into); - - self.options() - .format - .create_writer_physical_plan(input, state, config, order_requirements) - .await - } - - fn get_column_default(&self, column: &str) -> Option<&Expr> { - self.column_defaults.get(column) - } -} - -impl ListingTable { - /// Get the list of files for a scan as well as the file level statistics. - /// The list is grouped to let the execution plan know how the files should - /// be distributed to different threads / executors. - async fn list_files_for_scan<'a>( - &'a self, - ctx: &'a dyn Session, - filters: &'a [Expr], - limit: Option, - ) -> Result<(Vec, Statistics)> { - let store = if let Some(url) = self.table_paths.first() { - ctx.runtime_env().object_store(url)? - } else { - return Ok((vec![], Statistics::new_unknown(&self.file_schema))); - }; - // list files (with partitions) - let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| { - pruned_partition_list( - ctx, - store.as_ref(), - table_path, - filters, - &self.options.file_extension, - &self.options.table_partition_cols, - ) - })) - .await?; - let meta_fetch_concurrency = - ctx.config_options().execution.meta_fetch_concurrency; - let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency); - // collect the statistics if required by the config - let files = file_list - .map(|part_file| async { - let part_file = part_file?; - let statistics = if self.options.collect_stat { - self.do_collect_statistics(ctx, &store, &part_file).await? 
- } else { - Arc::new(Statistics::new_unknown(&self.file_schema)) - }; - Ok(part_file.with_statistics(statistics)) - }) - .boxed() - .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency); - - let (file_group, inexact_stats) = - get_files_with_limit(files, limit, self.options.collect_stat).await?; - - let file_groups = file_group.split_files(self.options.target_partitions); - let (mut file_groups, mut stats) = compute_all_files_statistics( - file_groups, - self.schema(), - self.options.collect_stat, - inexact_stats, - )?; - - let schema_adapter = self.create_schema_adapter(); - let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?; - - stats.column_statistics = - schema_mapper.map_column_statistics(&stats.column_statistics)?; - file_groups.iter_mut().try_for_each(|file_group| { - if let Some(stat) = file_group.statistics_mut() { - stat.column_statistics = - schema_mapper.map_column_statistics(&stat.column_statistics)?; - } - Ok::<_, DataFusionError>(()) - })?; - Ok((file_groups, stats)) - } - - /// Collects statistics for a given partitioned file. - /// - /// This method first checks if the statistics for the given file are already cached. - /// If they are, it returns the cached statistics. - /// If they are not, it infers the statistics from the file and stores them in the cache. - async fn do_collect_statistics( - &self, - ctx: &dyn Session, - store: &Arc, - part_file: &PartitionedFile, - ) -> Result> { - match self - .collected_statistics - .get_with_extra(&part_file.object_meta.location, &part_file.object_meta) - { - Some(statistics) => Ok(statistics), - None => { - let statistics = self - .options - .format - .infer_stats( - ctx, - store, - Arc::clone(&self.file_schema), - &part_file.object_meta, - ) - .await?; - let statistics = Arc::new(statistics); - self.collected_statistics.put_with_extra( - &part_file.object_meta.location, - Arc::clone(&statistics), - &part_file.object_meta, - ); - Ok(statistics) - } - } - } -} - -/// Processes a stream of partitioned files and returns a `FileGroup` containing the files. -/// -/// This function collects files from the provided stream until either: -/// 1. The stream is exhausted -/// 2. The accumulated number of rows exceeds the provided `limit` (if specified) -/// -/// # Arguments -/// * `files` - A stream of `Result` items to process -/// * `limit` - An optional row count limit. If provided, the function will stop collecting files -/// once the accumulated number of rows exceeds this limit -/// * `collect_stats` - Whether to collect and accumulate statistics from the files -/// -/// # Returns -/// A `Result` containing a `FileGroup` with the collected files -/// and a boolean indicating whether the statistics are inexact. -/// -/// # Note -/// The function will continue processing files if statistics are not available or if the -/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated -/// but files will still be collected. -async fn get_files_with_limit( - files: impl Stream>, - limit: Option, - collect_stats: bool, -) -> Result<(FileGroup, bool)> { - let mut file_group = FileGroup::default(); - // Fusing the stream allows us to call next safely even once it is finished. 
- let mut all_files = Box::pin(files.fuse()); - enum ProcessingState { - ReadingFiles, - ReachedLimit, - } - - let mut state = ProcessingState::ReadingFiles; - let mut num_rows = Precision::Absent; - - while let Some(file_result) = all_files.next().await { - // Early exit if we've already reached our limit - if matches!(state, ProcessingState::ReachedLimit) { - break; - } - - let file = file_result?; - - // Update file statistics regardless of state - if collect_stats { - if let Some(file_stats) = &file.statistics { - num_rows = if file_group.is_empty() { - // For the first file, just take its row count - file_stats.num_rows - } else { - // For subsequent files, accumulate the counts - num_rows.add(&file_stats.num_rows) - }; - } - } - - // Always add the file to our group - file_group.push(file); - - // Check if we've hit the limit (if one was specified) - if let Some(limit) = limit { - if let Precision::Exact(row_count) = num_rows { - if row_count > limit { - state = ProcessingState::ReachedLimit; - } - } - } - } - // If we still have files in the stream, it means that the limit kicked - // in, and the statistic could have been different had we processed the - // files in a different order. - let inexact_stats = all_files.next().await.is_some(); - Ok((file_group, inexact_stats)) } #[cfg(test)] mod tests { - use super::*; #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; + use crate::datasource::listing::table::ListingTableConfigExt; use crate::prelude::*; use crate::{ datasource::{ @@ -1587,21 +123,34 @@ mod tests { }, }; use arrow::{compute::SortOptions, record_batch::RecordBatch}; + use arrow_schema::{DataType, Field, Schema, SchemaRef}; + use datafusion_catalog::TableProvider; + use datafusion_catalog_listing::{ + ListingOptions, ListingTable, ListingTableConfig, SchemaSource, + }; use datafusion_common::{ - assert_contains, + assert_contains, plan_err, stats::Precision, test_util::{batches_to_string, datafusion_test_data}, - ColumnStatistics, ScalarValue, + ColumnStatistics, DataFusionError, Result, ScalarValue, }; + use datafusion_datasource::file_compression_type::FileCompressionType; + use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::schema_adapter::{ SchemaAdapter, SchemaAdapterFactory, SchemaMapper, }; + use datafusion_datasource::ListingTableUrl; + use datafusion_expr::dml::InsertOp; use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; use datafusion_physical_expr::expressions::binary; use datafusion_physical_expr::PhysicalSortExpr; + use datafusion_physical_expr_common::sort_expr::LexOrdering; + use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::{collect, ExecutionPlanProperties}; use rstest::rstest; + use std::collections::HashMap; use std::io::Write; + use std::sync::Arc; use tempfile::TempDir; use url::Url; @@ -1638,10 +187,13 @@ mod tests { let ctx = SessionContext::new(); let testdata = datafusion_test_data(); let filename = format!("{testdata}/aggregate_simple.csv"); - let table_path = ListingTableUrl::parse(filename).unwrap(); + let table_path = ListingTableUrl::parse(filename)?; // Test default schema source - let config = ListingTableConfig::new(table_path.clone()); + let format = CsvFormat::default(); + let options = ListingOptions::new(Arc::new(format)); + let config = + ListingTableConfig::new(table_path.clone()).with_listing_options(options); assert_eq!(config.schema_source(), SchemaSource::Unset); // Test schema source after setting a schema explicitly @@ 
-1650,18 +202,13 @@ mod tests { assert_eq!(config_with_schema.schema_source(), SchemaSource::Specified); // Test schema source after inferring schema - let format = CsvFormat::default(); - let options = ListingOptions::new(Arc::new(format)); - let config_with_options = config.with_listing_options(options.clone()); - assert_eq!(config_with_options.schema_source(), SchemaSource::Unset); + assert_eq!(config.schema_source(), SchemaSource::Unset); - let config_with_inferred = config_with_options.infer_schema(&ctx.state()).await?; + let config_with_inferred = config.infer_schema(&ctx.state()).await?; assert_eq!(config_with_inferred.schema_source(), SchemaSource::Inferred); // Test schema preservation through operations - let config_with_schema_and_options = config_with_schema - .clone() - .with_listing_options(options.clone()); + let config_with_schema_and_options = config_with_schema.clone(); assert_eq!( config_with_schema_and_options.schema_source(), SchemaSource::Specified @@ -1836,7 +383,7 @@ mod tests { .with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)]) .with_target_partitions(4); - let table_path = ListingTableUrl::parse("test:///table/").unwrap(); + let table_path = ListingTableUrl::parse("test:///table/")?; let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); let config = ListingTableConfig::new(table_path) @@ -1872,7 +419,7 @@ mod tests { ) -> Result> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{testdata}/{name}"); - let table_path = ListingTableUrl::parse(filename).unwrap(); + let table_path = ListingTableUrl::parse(filename)?; let config = ListingTableConfig::new(table_path) .infer(&ctx.state()) @@ -1899,7 +446,7 @@ mod tests { let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); - let table_path = ListingTableUrl::parse(table_prefix).unwrap(); + let table_path = ListingTableUrl::parse(table_prefix)?; let config = ListingTableConfig::new(table_path) .with_listing_options(opt) .with_schema(Arc::new(schema)); @@ -2458,7 +1005,7 @@ mod tests { async fn test_infer_options_compressed_csv() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{testdata}/csv/aggregate_test_100.csv.gz"); - let table_path = ListingTableUrl::parse(filename).unwrap(); + let table_path = ListingTableUrl::parse(filename)?; let ctx = SessionContext::new(); @@ -2479,12 +1026,15 @@ mod tests { let testdata = datafusion_test_data(); let filename = format!("{testdata}/aggregate_simple.csv"); - let table_path = ListingTableUrl::parse(filename).unwrap(); + let table_path = ListingTableUrl::parse(filename)?; let provided_schema = create_test_schema(); - let config = - ListingTableConfig::new(table_path).with_schema(Arc::clone(&provided_schema)); + let format = CsvFormat::default(); + let options = ListingOptions::new(Arc::new(format)); + let config = ListingTableConfig::new(table_path) + .with_listing_options(options) + .with_schema(Arc::clone(&provided_schema)); let config = config.infer(&ctx.state()).await?; @@ -2549,8 +1099,8 @@ mod tests { table_path1.clone(), table_path2.clone(), ]) - .with_schema(schema_3cols) - .with_listing_options(options.clone()); + .with_listing_options(options.clone()) + .with_schema(schema_3cols); let config2 = config2.infer_schema(&ctx.state()).await?; assert_eq!(config2.schema_source(), SchemaSource::Specified); @@ -2573,8 +1123,8 @@ mod tests { table_path1.clone(), table_path2.clone(), ]) - .with_schema(schema_4cols) - 
.with_listing_options(options.clone()); + .with_listing_options(options.clone()) + .with_schema(schema_4cols); let config3 = config3.infer_schema(&ctx.state()).await?; assert_eq!(config3.schema_source(), SchemaSource::Specified); @@ -2739,7 +1289,7 @@ mod tests { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); - let table_path = ListingTableUrl::parse(filename).unwrap(); + let table_path = ListingTableUrl::parse(filename)?; let ctx = SessionContext::new(); let state = ctx.state(); @@ -2885,7 +1435,7 @@ mod tests { let format = JsonFormat::default(); let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(false); let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); - let table_path = ListingTableUrl::parse("test:///table/").unwrap(); + let table_path = ListingTableUrl::parse("test:///table/")?; let config = ListingTableConfig::new(table_path) .with_listing_options(opt) @@ -3099,7 +1649,7 @@ mod tests { let format = JsonFormat::default(); let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat); let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); - let table_path = ListingTableUrl::parse("test:///table/").unwrap(); + let table_path = ListingTableUrl::parse("test:///table/")?; let config = ListingTableConfig::new(table_path) .with_listing_options(opt) diff --git a/datafusion/core/tests/catalog/memory.rs b/datafusion/core/tests/catalog/memory.rs index ea9e71fc37467..06ed141b2e8bd 100644 --- a/datafusion/core/tests/catalog/memory.rs +++ b/datafusion/core/tests/catalog/memory.rs @@ -19,7 +19,7 @@ use arrow::datatypes::Schema; use datafusion::catalog::CatalogProvider; use datafusion::datasource::empty::EmptyTable; use datafusion::datasource::listing::{ - ListingTable, ListingTableConfig, ListingTableUrl, + ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl, }; use datafusion::prelude::SessionContext; use datafusion_catalog::memory::*; diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index 4ae2fa9b4c399..40fc6176e212b 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -23,7 +23,9 @@ use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion::assert_batches_eq; use datafusion::common::Result; -use datafusion::datasource::listing::{ListingTable, ListingTableConfig}; +use datafusion::datasource::listing::{ + ListingTable, ListingTableConfig, ListingTableConfigExt, +}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::DataFusionError;
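The reworked tests above now attach `ListingOptions` before checking `schema_source()`, since that bookkeeping lives in `datafusion-catalog-listing`. In application code the transitions look roughly like this (the path, format, and field are placeholders):

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableConfig, ListingTableUrl};

fn main() -> datafusion::error::Result<()> {
    let url = ListingTableUrl::parse("file:///path/to/data.csv")?; // placeholder path
    let options = ListingOptions::new(Arc::new(CsvFormat::default()));

    // A freshly built config reports SchemaSource::Unset ...
    let config = ListingTableConfig::new(url).with_listing_options(options);
    println!("{:?}", config.schema_source());

    // ... and switches to SchemaSource::Specified once a schema is supplied
    // explicitly (inferring one instead would yield SchemaSource::Inferred).
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, true)]));
    let config = config.with_schema(schema);
    println!("{:?}", config.schema_source());
    Ok(())
}
```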