diff --git a/Cargo.toml b/Cargo.toml index 9e7971bdc1e8..517ae427609f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ exclude = ["datafusion-cli", "dev/depcheck"] members = [ "datafusion/common", "datafusion/common-runtime", + "datafusion/catalog", "datafusion/core", "datafusion/expr", "datafusion/execution", @@ -88,6 +89,7 @@ chrono = { version = "0.4.34", default-features = false } ctor = "0.2.0" dashmap = "6.0.1" datafusion = { path = "datafusion/core", version = "40.0.0", default-features = false } +datafusion-catalog = { path = "datafusion/catalog", version = "40.0.0" } datafusion-common = { path = "datafusion/common", version = "40.0.0", default-features = false } datafusion-common-runtime = { path = "datafusion/common-runtime", version = "40.0.0" } datafusion-execution = { path = "datafusion/execution", version = "40.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index a4e87f99b5c3..8f8ecaadaa87 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1143,6 +1143,7 @@ dependencies = [ "bzip2", "chrono", "dashmap", + "datafusion-catalog", "datafusion-common", "datafusion-common-runtime", "datafusion-execution", @@ -1182,6 +1183,18 @@ dependencies = [ "zstd 0.13.2", ] +[[package]] +name = "datafusion-catalog" +version = "40.0.0" +dependencies = [ + "arrow-schema", + "async-trait", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-plan", +] + [[package]] name = "datafusion-cli" version = "40.0.0" diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index b83f65975610..273eb30d3a71 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -20,8 +20,7 @@ use std::sync::{Arc, Weak}; use crate::object_storage::{get_object_store, AwsOptions, GcpOptions}; -use datafusion::catalog::schema::SchemaProvider; -use datafusion::catalog::{CatalogProvider, CatalogProviderList}; +use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; use datafusion::common::plan_datafusion_err; use datafusion::datasource::listing::{ ListingTable, ListingTableConfig, ListingTableUrl, @@ -237,7 +236,7 @@ fn substitute_tilde(cur: String) -> String { mod tests { use super::*; - use datafusion::catalog::schema::SchemaProvider; + use datafusion::catalog::SchemaProvider; use datafusion::prelude::SessionContext; fn setup_context() -> (SessionContext, Arc) { diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index 806e2bb39cd4..a85c43f3576f 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -22,11 +22,11 @@ use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; use async_trait::async_trait; +use datafusion::catalog::Session; use datafusion::common::{plan_err, Column}; use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; -use datafusion::execution::context::SessionState; use datafusion::logical_expr::Expr; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; @@ -234,7 +234,7 @@ impl TableProvider for ParquetMetadataTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn Session, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index 9bf71e52c3de..903defafe3ab 100644 --- 
a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -19,6 +19,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use arrow_schema::SchemaRef; use async_trait::async_trait; use bytes::Bytes; +use datafusion::catalog::Session; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::parquet::{ ParquetAccessPlan, ParquetExecBuilder, @@ -27,7 +28,6 @@ use datafusion::datasource::physical_plan::{ parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig, }; use datafusion::datasource::TableProvider; -use datafusion::execution::context::SessionState; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::arrow_reader::{ ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, @@ -271,7 +271,7 @@ impl IndexTableProvider { /// to a single predicate like `a = 1 AND b = 2` suitable for execution fn filters_to_predicate( &self, - state: &SessionState, + state: &dyn Session, filters: &[Expr], ) -> Result> { let df_schema = DFSchema::try_from(self.schema())?; @@ -463,7 +463,7 @@ impl TableProvider for IndexTableProvider { async fn scan( &self, - state: &SessionState, + state: &dyn Session, projection: Option<&Vec>, filters: &[Expr], limit: Option, diff --git a/datafusion-examples/examples/catalog.rs b/datafusion-examples/examples/catalog.rs index b9188e1cd5e0..f9ead592c7ea 100644 --- a/datafusion-examples/examples/catalog.rs +++ b/datafusion-examples/examples/catalog.rs @@ -19,10 +19,7 @@ use async_trait::async_trait; use datafusion::{ arrow::util::pretty, - catalog::{ - schema::SchemaProvider, - {CatalogProvider, CatalogProviderList}, - }, + catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}, datasource::{ file_format::{csv::CsvFormat, FileFormat}, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index cfb49b023159..0f7748b13365 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -26,7 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::{provider_as_source, TableProvider, TableType}; use datafusion::error::Result; -use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::execution::context::TaskContext; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, @@ -37,6 +37,7 @@ use datafusion_expr::LogicalPlanBuilder; use datafusion_physical_expr::EquivalenceProperties; use async_trait::async_trait; +use datafusion::catalog::Session; use tokio::time::timeout; /// This example demonstrates executing a simple query against a custom datasource @@ -175,7 +176,7 @@ impl TableProvider for CustomDataSource { async fn scan( &self, - _state: &SessionState, + _state: &dyn Session, projection: Option<&Vec>, // filters and limit can be used here to inject some push-down operations if needed _filters: &[Expr], diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index 668eda047444..91e178f1f1a5 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -23,13 +23,13 
@@ use arrow::datatypes::Int32Type; use arrow::util::pretty::pretty_format_batches; use arrow_schema::SchemaRef; use async_trait::async_trait; +use datafusion::catalog::Session; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::{ parquet::StatisticsConverter, {FileScanConfig, ParquetExec}, }; use datafusion::datasource::TableProvider; -use datafusion::execution::context::SessionState; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::{ arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter, @@ -222,7 +222,7 @@ impl TableProvider for IndexTableProvider { async fn scan( &self, - state: &SessionState, + state: &dyn Session, projection: Option<&Vec>, filters: &[Expr], limit: Option, diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index c68c21fab169..fe7f37cc00e3 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -20,10 +20,11 @@ use arrow::csv::ReaderBuilder; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::catalog::Session; use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; -use datafusion::execution::context::{ExecutionProps, SessionState}; +use datafusion::execution::context::ExecutionProps; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; @@ -35,7 +36,6 @@ use std::fs::File; use std::io::Seek; use std::path::Path; use std::sync::Arc; - // To define your own table function, you only need to do the following 3 things: // 1. Implement your own [`TableProvider`] // 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`] @@ -95,7 +95,7 @@ impl TableProvider for LocalCsvTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn Session, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml new file mode 100644 index 000000000000..2ebca511c5c8 --- /dev/null +++ b/datafusion/catalog/Cargo.toml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "datafusion-catalog" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +arrow-schema = { workspace = true } +async-trait = "0.1.41" +datafusion-common = { workspace = true } +datafusion-execution = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-physical-plan = { workspace = true } + +[lints] +workspace = true diff --git a/datafusion/catalog/src/catalog.rs b/datafusion/catalog/src/catalog.rs new file mode 100644 index 000000000000..026c3c008f59 --- /dev/null +++ b/datafusion/catalog/src/catalog.rs @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +pub use crate::schema::SchemaProvider; +use datafusion_common::not_impl_err; +use datafusion_common::Result; + +/// Represents a catalog, comprising a number of named schemas. +/// +/// # Catalog Overview +/// +/// To plan and execute queries, DataFusion needs a "Catalog" that provides +/// metadata such as which schemas and tables exist, their columns and data +/// types, and how to access the data. +/// +/// The Catalog API consists of: +/// * [`CatalogProviderList`]: a collection of `CatalogProvider`s +/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems) +/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems) +/// * [`TableProvider`]: individual tables +/// +/// # Implementing Catalogs +/// +/// To implement a catalog, you implement at least one of the [`CatalogProviderList`], +/// [`CatalogProvider`] and [`SchemaProvider`] traits and register them +/// appropriately in the `SessionContext`. +/// +/// DataFusion comes with a simple in-memory catalog implementation, +/// `MemoryCatalogProvider`, that is used by default and has no persistence. +/// DataFusion does not include more complex Catalog implementations because +/// catalog management is a key design choice for most data systems, and thus +/// it is unlikely that any general-purpose catalog implementation will work +/// well across many use cases. +/// +/// # Implementing "Remote" catalogs +/// +/// Sometimes catalog information is stored remotely and requires a network call +/// to retrieve. For example, the [Delta Lake] table format stores table +/// metadata in files on S3 that must first be downloaded to discover what +/// schemas and tables exist. +/// +/// [Delta Lake]: https://delta.io/ +/// +/// The [`CatalogProvider`] can support this use case, but it takes some care.
+/// The planning APIs in DataFusion are not `async` and thus network IO cannot +/// be performed "lazily" / "on demand" during query planning. The rationale for +/// this design is that using remote procedure calls for all catalog accesses +/// required for query planning would likely result in multiple network calls +/// per plan, resulting in very poor planning performance. +/// +/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs, +/// you need to provide an in-memory snapshot of the required metadata. Most +/// systems typically either already have this information cached locally or can +/// batch access to the remote catalog to retrieve multiple schemas and tables +/// in a single network call. +/// +/// Note that [`SchemaProvider::table`] is an `async` function in order to +/// simplify implementing simple [`SchemaProvider`]s. For many table formats it +/// is easy to list all available tables but there is additional non-trivial +/// access required to read table details (e.g. statistics). +/// +/// The pattern that DataFusion itself uses to plan SQL queries is to walk over +/// the query to find all table references, +/// perform the required remote catalog accesses in parallel, and then plan the +/// query using that snapshot. +/// +/// # Example Catalog Implementations +/// +/// Here are some examples of how to implement custom catalogs: +/// +/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider +/// that treats files and directories on a filesystem as tables. +/// +/// * The [`catalog.rs`] example: a simple directory-based catalog. +/// +/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can +/// read from Delta Lake tables +/// +/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html +/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75 +/// [`catalog.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/catalog.rs +/// [delta-rs]: https://github.com/delta-io/delta-rs +/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123 +/// +/// [`TableProvider`]: crate::TableProvider + +pub trait CatalogProvider: Sync + Send { + /// Returns the catalog provider as [`Any`] + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Retrieves the list of available schema names in this catalog. + fn schema_names(&self) -> Vec<String>; + + /// Retrieves a specific schema from the catalog by name, provided it exists. + fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>; + + /// Adds a new schema to this catalog. + /// + /// If a schema of the same name existed before, it is replaced in + /// the catalog and returned. + /// + /// By default returns a "Not Implemented" error + fn register_schema( + &self, + name: &str, + schema: Arc<dyn SchemaProvider>, + ) -> Result<Option<Arc<dyn SchemaProvider>>> { + // use variables to avoid unused variable warnings + let _ = name; + let _ = schema; + not_impl_err!("Registering new schemas is not supported") + } + + /// Removes a schema from this catalog. Implementations of this method should return + /// errors if the schema exists but cannot be dropped. For example, in DataFusion's + /// default in-memory catalog, `MemoryCatalogProvider`, a non-empty schema + /// will only be successfully dropped when `cascade` is true.
+ /// This is equivalent to how DROP SCHEMA works in PostgreSQL. + /// + /// Implementations of this method should return None if schema with `name` + /// does not exist. + /// + /// By default returns a "Not Implemented" error + fn deregister_schema( + &self, + _name: &str, + _cascade: bool, + ) -> Result<Option<Arc<dyn SchemaProvider>>> { + not_impl_err!("Deregistering schemas is not supported") + } +} + +/// Represents a list of named [`CatalogProvider`]s. +/// +/// Please see the documentation on `CatalogProvider` for details of +/// implementing a custom catalog. +pub trait CatalogProviderList: Sync + Send { + /// Returns the catalog list as [`Any`] + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Adds a new catalog to this catalog list. + /// If a catalog of the same name existed before, it is replaced in the list and returned. + fn register_catalog( + &self, + name: String, + catalog: Arc<dyn CatalogProvider>, + ) -> Option<Arc<dyn CatalogProvider>>; + + /// Retrieves the list of available catalog names + fn catalog_names(&self) -> Vec<String>; + + /// Retrieves a specific catalog by name, provided it exists. + fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>; +} diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs new file mode 100644 index 000000000000..fe76b5dc9c64 --- /dev/null +++ b/datafusion/catalog/src/lib.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod catalog; +mod schema; +mod session; +mod table; + +pub use catalog::*; +pub use schema::*; +pub use session::*; +pub use table::*; diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/catalog/src/schema.rs similarity index 95% rename from datafusion/core/src/catalog/schema.rs rename to datafusion/catalog/src/schema.rs index 7d76b3fa4f19..21bca9fa828d 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/catalog/src/schema.rs @@ -23,11 +23,8 @@ use datafusion_common::{exec_err, DataFusionError}; use std::any::Any; use std::sync::Arc; -use crate::datasource::TableProvider; -use crate::error::Result; - -// backwards compatibility -pub use super::MemorySchemaProvider; +use crate::table::TableProvider; +use datafusion_common::Result; /// Represents a schema, comprising a number of named tables. /// diff --git a/datafusion/catalog/src/session.rs b/datafusion/catalog/src/session.rs new file mode 100644 index 000000000000..05d2684ed3e0 --- /dev/null +++ b/datafusion/catalog/src/session.rs @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use datafusion_common::config::ConfigOptions; +use datafusion_common::{DFSchema, Result}; +use datafusion_execution::config::SessionConfig; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_execution::TaskContext; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF}; +use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr}; +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; + +/// Interface for accessing [`SessionState`] from the catalog. +/// +/// This trait provides access to the information needed to plan and execute +/// queries, such as configuration, functions, and runtime environment. See the +/// documentation on [`SessionState`] for more information. +/// +/// Historically, the `SessionState` struct was passed directly to catalog +/// traits such as [`TableProvider`], which required a direct dependency on the +/// DataFusion core. The interface required is now defined by this trait. See +/// [#10782] for more details. +/// +/// [#10782]: https://github.com/apache/datafusion/issues/10782 +/// +/// # Migration from `SessionState` +/// +/// Using trait methods is preferred, as the implementation may change in future +/// versions. However, you can downcast a `Session` to a `SessionState` as shown +/// in the example below. If you find yourself needing to do this, please open +/// an issue on the DataFusion repository so we can extend the trait to provide +/// the required information. +/// +/// ``` +/// # use datafusion_catalog::Session; +/// # use datafusion_common::{Result, exec_datafusion_err}; +/// # struct SessionState {} +/// // Given a `Session` reference, get the concrete `SessionState` reference +/// // Note: this may stop working in future versions. +/// fn session_state_from_session(session: &dyn Session) -> Result<&SessionState> { +/// session.as_any() +/// .downcast_ref::<SessionState>() +/// .ok_or_else(|| exec_datafusion_err!("Failed to downcast Session to SessionState")) +/// } +/// ``` +/// +/// [`SessionState`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html +/// [`TableProvider`]: crate::TableProvider +#[async_trait] +pub trait Session: Send + Sync { + /// Return the session ID + fn session_id(&self) -> &str; + + /// Return the [`SessionConfig`] + fn config(&self) -> &SessionConfig; + + /// Return the [`ConfigOptions`] + fn config_options(&self) -> &ConfigOptions { + self.config().options() + } + + /// Creates a physical [`ExecutionPlan`] from a [`LogicalPlan`]. + /// + /// Note: this will optimize the provided plan first. + /// + /// This function will error for [`LogicalPlan`]s such as catalog DDL like + /// `CREATE TABLE`, which do not have corresponding physical plans and must + /// be handled by another layer, typically the `SessionContext`.
+ async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + ) -> Result<Arc<dyn ExecutionPlan>>; + + /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type + /// coercion and function rewrites. + /// + /// Note: The expression is not simplified or otherwise optimized: `a = 1 + 2` will not be simplified to `a = 3` as this is a more involved process. + /// See the [expr_api] example for how to simplify expressions. + /// + /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs + fn create_physical_expr( + &self, + expr: Expr, + df_schema: &DFSchema, + ) -> Result<Arc<dyn PhysicalExpr>>; + + /// Return a reference to the registered scalar functions + fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>>; + + /// Return a reference to the registered aggregate functions + fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>>; + + /// Return a reference to the registered window functions + fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>>; + + /// Return the runtime env + fn runtime_env(&self) -> &Arc<RuntimeEnv>; + + /// Return the execution properties + fn execution_props(&self) -> &ExecutionProps; + + fn as_any(&self) -> &dyn Any; +} + +/// Create a new task context instance from a `Session` +impl From<&dyn Session> for TaskContext { + fn from(state: &dyn Session) -> Self { + let task_id = None; + TaskContext::new( + task_id, + state.session_id().to_string(), + state.config().clone(), + state.scalar_functions().clone(), + state.aggregate_functions().clone(), + state.window_functions().clone(), + state.runtime_env().clone(), + ) + } +} diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs new file mode 100644 index 000000000000..792315642a00 --- /dev/null +++ b/datafusion/catalog/src/table.rs @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use crate::session::Session; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion_common::Result; +use datafusion_common::{not_impl_err, Constraints, Statistics}; +use datafusion_expr::{ + CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType, +}; +use datafusion_physical_plan::ExecutionPlan; + +/// Source table +#[async_trait] +pub trait TableProvider: Sync + Send { + /// Returns the table provider as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Get a reference to the schema for this table + fn schema(&self) -> SchemaRef; + + /// Get a reference to the constraints of the table. + /// Returns: + /// - `None` for tables that do not support constraints. + /// - `Some(&Constraints)` for tables supporting constraints. + /// Therefore, a `Some(&Constraints::empty())` return value indicates that + /// this table supports constraints, but there are no constraints.
+ fn constraints(&self) -> Option<&Constraints> { + None + } + + /// Get the type of this table for metadata/catalog purposes. + fn table_type(&self) -> TableType; + + /// Get the create statement used to create this table, if available. + fn get_table_definition(&self) -> Option<&str> { + None + } + + /// Get the [`LogicalPlan`] of this table, if available + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + None + } + + /// Get the default value for a column, if available. + fn get_column_default(&self, _column: &str) -> Option<&Expr> { + None + } + + /// Create an [`ExecutionPlan`] for scanning the table with optionally + /// specified `projection`, `filter` and `limit`, described below. + /// + /// The `ExecutionPlan` is responsible for scanning the datasource's + /// partitions in a streaming, parallelized fashion. + /// + /// # Projection + /// + /// If specified, only a subset of columns should be returned, in the order + /// specified. The projection is a set of indexes of the fields in + /// [`Self::schema`]. + /// + /// DataFusion provides the projection to scan only the columns actually + /// used in the query to improve performance, an optimization called + /// "Projection Pushdown". Some datasources, such as Parquet, can use this + /// information to go significantly faster when only a subset of columns is + /// required. + /// + /// # Filters + /// + /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the + /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for + /// which *all* of the `Expr`s evaluate to `true` must be returned (aka the + /// expressions are `AND`ed together). + /// + /// To enable filter pushdown you must override + /// [`Self::supports_filters_pushdown`], as the default implementation does + /// not support pushdown and `filters` will be empty. + /// + /// DataFusion pushes filtering into the scans whenever possible + /// ("Filter Pushdown"), and depending on the format and its + /// implementation, evaluating the predicate during the scan + /// can increase performance significantly. + /// + /// ## Note: Some columns may appear *only* in Filters + /// + /// In certain cases, a query may only use a certain column in a Filter that + /// has been completely pushed down to the scan. In this case, the + /// projection will not contain all the columns found in the filter + /// expressions.
+ /// + /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`, + /// + /// ```text + /// ┌────────────────────┐ + /// │ Projection(t.a) │ + /// └────────────────────┘ + /// ▲ + /// │ + /// │ + /// ┌────────────────────┐ Filter ┌────────────────────┐ Projection ┌────────────────────┐ + /// │ Filter(t.b > 5) │────Pushdown──▶ │ Projection(t.a) │ ───Pushdown───▶ │ Projection(t.a) │ + /// └────────────────────┘ └────────────────────┘ └────────────────────┘ + /// ▲ ▲ ▲ + /// │ │ │ + /// │ │ ┌────────────────────┐ + /// ┌────────────────────┐ ┌────────────────────┐ │ Scan │ + /// │ Scan │ │ Scan │ │ filter=(t.b > 5) │ + /// └────────────────────┘ │ filter=(t.b > 5) │ │ projection=(t.a) │ + /// └────────────────────┘ └────────────────────┘ + /// + /// Initial Plan If `TableProviderFilterPushDown` Projection pushdown notes that + /// returns true, filter pushdown the scan only needs t.a + /// pushes the filter into the scan + /// BUT internally evaluating the + /// predicate still requires t.b + /// ``` + /// + /// # Limit + /// + /// If `limit` is specified, the scan must produce *at least* this many rows + /// (though it may return more). Like Projection Pushdown and Filter + /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as + /// possible, called "Limit Pushdown" as some sources can use this + /// information to improve their performance. Note that if there are any + /// Inexact filters pushed down, the LIMIT cannot be pushed down. This is + /// because inexact filters do not guarantee that every filtered row is + /// removed, so applying the limit could lead to too few rows being available + /// to return as a final result. + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec<usize>>, + filters: &[Expr], + limit: Option<usize>, + ) -> Result<Arc<dyn ExecutionPlan>>; + + /// Specify if DataFusion should provide filter expressions to the + /// TableProvider to apply *during* the scan. + /// + /// Some TableProviders can evaluate filters more efficiently than the + /// `Filter` operator in DataFusion, for example by using an index. + /// + /// # Parameters and Return Value + /// + /// The return `Vec` must have one element for each element of the `filters` + /// argument. The value of each element indicates if the TableProvider can + /// apply the corresponding filter during the scan. The position in the return + /// value corresponds to the expression in the `filters` parameter. + /// + /// If the length of the resulting `Vec` does not match the `filters` input, + /// an error will be returned. + /// + /// Each element in the resulting `Vec` is one of the following: + /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter + /// during scan + /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan + /// + /// By default, this function returns [`Unsupported`] for all filters, + /// meaning no filters will be provided to [`Self::scan`].
+ /// + /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported + /// [`Exact`]: TableProviderFilterPushDown::Exact + /// [`Inexact`]: TableProviderFilterPushDown::Inexact + /// + /// # Example + /// + /// ```rust + /// # use std::any::Any; + /// # use std::sync::Arc; + /// # use arrow_schema::SchemaRef; + /// # use async_trait::async_trait; + /// # use datafusion_catalog::{TableProvider, Session}; + /// # use datafusion_common::Result; + /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; + /// # use datafusion_physical_plan::ExecutionPlan; + /// // Define a struct that implements the TableProvider trait + /// struct TestDataSource {} + /// + /// #[async_trait] + /// impl TableProvider for TestDataSource { + /// # fn as_any(&self) -> &dyn Any { todo!() } + /// # fn schema(&self) -> SchemaRef { todo!() } + /// # fn table_type(&self) -> TableType { todo!() } + /// # async fn scan(&self, s: &dyn Session, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> { + /// todo!() + /// # } + /// // Override the supports_filters_pushdown to evaluate which expressions + /// // to accept as pushdown predicates. + /// fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> { + /// // Process each filter + /// let support: Vec<_> = filters.iter().map(|expr| { + /// match expr { + /// // This example only supports a between expr with a single column named "c1". + /// Expr::Between(between_expr) => { + /// between_expr.expr + /// .try_into_col() + /// .map(|column| { + /// if column.name == "c1" { + /// TableProviderFilterPushDown::Exact + /// } else { + /// TableProviderFilterPushDown::Unsupported + /// } + /// }) + /// // If there is no column in the expr set the filter to unsupported. + /// .unwrap_or(TableProviderFilterPushDown::Unsupported) + /// } + /// _ => { + /// // For all other cases return Unsupported. + /// TableProviderFilterPushDown::Unsupported + /// } + /// } + /// }).collect(); + /// Ok(support) + /// } + /// } + /// ``` + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result<Vec<TableProviderFilterPushDown>> { + Ok(vec![ + TableProviderFilterPushDown::Unsupported; + filters.len() + ]) + } + + /// Get statistics for this table, if available + fn statistics(&self) -> Option<Statistics> { + None + } + + /// Return an [`ExecutionPlan`] to insert data into this table, if + /// supported. + /// + /// The returned plan should return a single row in a UInt64 + /// column called "count" such as the following + /// + /// ```text + /// +-------+ + /// | count | + /// +-------+ + /// | 6 | + /// +-------+ + /// ``` + /// + /// # See Also + /// + /// See [`DataSinkExec`] for the common pattern of inserting + /// streams of `RecordBatch`es as files to an ObjectStore. + /// + /// [`DataSinkExec`]: datafusion_physical_plan::insert::DataSinkExec + async fn insert_into( + &self, + _state: &dyn Session, + _input: Arc<dyn ExecutionPlan>, + _overwrite: bool, + ) -> Result<Arc<dyn ExecutionPlan>> { + not_impl_err!("Insert into not implemented for this table") + } +} + +/// A factory which creates [`TableProvider`]s at runtime given a URL. +/// +/// For example, this can be used to create a table "on the fly" +/// from a directory of files only when that name is referenced.
+#[async_trait] +pub trait TableProviderFactory: Sync + Send { + /// Create a TableProvider with the given URL + async fn create( + &self, + state: &dyn Session, + cmd: &CreateExternalTable, + ) -> Result<Arc<dyn TableProvider>>; +} diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 98d501794f77..09b90a56d2aa 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -98,6 +98,7 @@ bytes = { workspace = true } bzip2 = { version = "0.4.3", optional = true } chrono = { workspace = true } dashmap = { workspace = true } +datafusion-catalog = { workspace = true } datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog_common/information_schema.rs similarity index 99% rename from datafusion/core/src/catalog/information_schema.rs rename to datafusion/core/src/catalog_common/information_schema.rs index a79f62e742be..d086ce900cc3 100644 --- a/datafusion/core/src/catalog/information_schema.rs +++ b/datafusion/core/src/catalog_common/information_schema.rs @@ -29,8 +29,8 @@ use arrow::{ record_batch::RecordBatch, }; +use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider}; use crate::datasource::streaming::StreamingTable; -use crate::datasource::TableProvider; use crate::execution::context::TaskContext; use crate::logical_expr::TableType; use crate::physical_plan::stream::RecordBatchStreamAdapter; @@ -40,8 +40,6 @@ use crate::{ physical_plan::streaming::PartitionStream, }; -use super::{schema::SchemaProvider, CatalogProviderList}; - pub(crate) const INFORMATION_SCHEMA: &str = "information_schema"; pub(crate) const TABLES: &str = "tables"; pub(crate) const VIEWS: &str = "views"; diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog_common/listing_schema.rs similarity index 98% rename from datafusion/core/src/catalog/listing_schema.rs rename to datafusion/core/src/catalog_common/listing_schema.rs index 373fe788c721..5b91f963ca24 100644 --- a/datafusion/core/src/catalog/listing_schema.rs +++ b/datafusion/core/src/catalog_common/listing_schema.rs @@ -22,9 +22,7 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::{Arc, Mutex}; -use crate::catalog::schema::SchemaProvider; -use crate::datasource::provider::TableProviderFactory; -use crate::datasource::TableProvider; +use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory}; use crate::execution::context::SessionState; use datafusion_common::{Constraints, DFSchema, DataFusionError, TableReference}; diff --git a/datafusion/core/src/catalog/memory.rs b/datafusion/core/src/catalog_common/memory.rs similarity index 97% rename from datafusion/core/src/catalog/memory.rs rename to datafusion/core/src/catalog_common/memory.rs index 3af823913a29..6d8bddec4547 100644 --- a/datafusion/core/src/catalog/memory.rs +++ b/datafusion/core/src/catalog_common/memory.rs @@ -18,9 +18,9 @@ //! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory //! implementations of [`CatalogProviderList`] and [`CatalogProvider`].
-use crate::catalog::schema::SchemaProvider; -use crate::catalog::{CatalogProvider, CatalogProviderList}; -use crate::datasource::TableProvider; +use crate::catalog::{ + CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider, +}; use async_trait::async_trait; use dashmap::DashMap; use datafusion_common::{exec_err, DataFusionError}; @@ -201,11 +201,10 @@ impl SchemaProvider for MemorySchemaProvider { #[cfg(test)] mod test { use super::*; - use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider}; use crate::catalog::CatalogProvider; + use crate::catalog_common::memory::MemorySchemaProvider; use crate::datasource::empty::EmptyTable; use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; - use crate::datasource::TableProvider; use crate::prelude::SessionContext; use arrow_schema::Schema; use datafusion_common::assert_batches_eq; diff --git a/datafusion/core/src/catalog/mod.rs b/datafusion/core/src/catalog_common/mod.rs similarity index 59% rename from datafusion/core/src/catalog/mod.rs rename to datafusion/core/src/catalog_common/mod.rs index fc50b4214d6d..b8414378862e 100644 --- a/datafusion/core/src/catalog/mod.rs +++ b/datafusion/core/src/catalog_common/mod.rs @@ -17,11 +17,6 @@ //! Interfaces and default implementations of catalogs and schemas. //! -//! Traits: -//! * [`CatalogProviderList`]: a collection of `CatalogProvider`s -//! * [`CatalogProvider`]: a collection of [`SchemaProvider`]s (sometimes called a "database" in other systems) -//! * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems) -//! //! Implementations //! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`] //! * Information schema: [`information_schema`] @@ -29,180 +24,22 @@ pub mod information_schema; pub mod listing_schema; -mod memory; -pub mod schema; +pub mod memory; +pub use crate::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; pub use memory::{ MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider, }; -pub use schema::SchemaProvider; pub use datafusion_sql::{ResolvedTableReference, TableReference}; -use datafusion_common::{not_impl_err, Result}; -use std::any::Any; use std::collections::BTreeSet; use std::ops::ControlFlow; -use std::sync::Arc; - -/// Represent a list of named [`CatalogProvider`]s. -/// -/// Please see the documentation on `CatalogProvider` for details of -/// implementing a custom catalog. -pub trait CatalogProviderList: Sync + Send { - /// Returns the catalog list as [`Any`] - /// so that it can be downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// Adds a new catalog to this catalog list - /// If a catalog of the same name existed before, it is replaced in the list and returned. - fn register_catalog( - &self, - name: String, - catalog: Arc, - ) -> Option>; - - /// Retrieves the list of available catalog names - fn catalog_names(&self) -> Vec; - - /// Retrieves a specific catalog by name, provided it exists. - fn catalog(&self, name: &str) -> Option>; -} /// See [`CatalogProviderList`] #[deprecated(since = "35.0.0", note = "use [`CatalogProviderList`] instead")] pub trait CatalogList: CatalogProviderList {} -/// Represents a catalog, comprising a number of named schemas. 
-/// -/// # Catalog Overview -/// -/// To plan and execute queries, DataFusion needs a "Catalog" that provides -/// metadata such as which schemas and tables exist, their columns and data -/// types, and how to access the data. -/// -/// The Catalog API consists: -/// * [`CatalogProviderList`]: a collection of `CatalogProvider`s -/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems) -/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems) -/// * [`TableProvider]`: individual tables -/// -/// # Implementing Catalogs -/// -/// To implement a catalog, you implement at least one of the [`CatalogProviderList`], -/// [`CatalogProvider`] and [`SchemaProvider`] traits and register them -/// appropriately the [`SessionContext`]. -/// -/// [`SessionContext`]: crate::execution::context::SessionContext -/// -/// DataFusion comes with a simple in-memory catalog implementation, -/// [`MemoryCatalogProvider`], that is used by default and has no persistence. -/// DataFusion does not include more complex Catalog implementations because -/// catalog management is a key design choice for most data systems, and thus -/// it is unlikely that any general-purpose catalog implementation will work -/// well across many use cases. -/// -/// # Implementing "Remote" catalogs -/// -/// Sometimes catalog information is stored remotely and requires a network call -/// to retrieve. For example, the [Delta Lake] table format stores table -/// metadata in files on S3 that must be first downloaded to discover what -/// schemas and tables exist. -/// -/// [Delta Lake]: https://delta.io/ -/// -/// The [`CatalogProvider`] can support this use case, but it takes some care. -/// The planning APIs in DataFusion are not `async` and thus network IO can not -/// be performed "lazily" / "on demand" during query planning. The rationale for -/// this design is that using remote procedure calls for all catalog accesses -/// required for query planning would likely result in multiple network calls -/// per plan, resulting in very poor planning performance. -/// -/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs, -/// you need to provide an in memory snapshot of the required metadata. Most -/// systems typically either already have this information cached locally or can -/// batch access to the remote catalog to retrieve multiple schemas and tables -/// in a single network call. -/// -/// Note that [`SchemaProvider::table`] is an `async` function in order to -/// simplify implementing simple [`SchemaProvider`]s. For many table formats it -/// is easy to list all available tables but there is additional non trivial -/// access required to read table details (e.g. statistics). -/// -/// The pattern that DataFusion itself uses to plan SQL queries is to walk over -/// the query to [find all table references], -/// performing required remote catalog in parallel, and then plans the query -/// using that snapshot. -/// -/// [find all table references]: resolve_table_references -/// -/// # Example Catalog Implementations -/// -/// Here are some examples of how to implement custom catalogs: -/// -/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider -/// that treats files and directories on a filesystem as tables. -/// -/// * The [`catalog.rs`]: a simple directory based catalog. 
-/// -/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can -/// read from Delta Lake tables -/// -/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html -/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75 -/// [`catalog.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/catalog.rs -/// [delta-rs]: https://github.com/delta-io/delta-rs -/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123 -/// -/// [`TableProvider]: crate::datasource::TableProvider - -pub trait CatalogProvider: Sync + Send { - /// Returns the catalog provider as [`Any`] - /// so that it can be downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// Retrieves the list of available schema names in this catalog. - fn schema_names(&self) -> Vec; - - /// Retrieves a specific schema from the catalog by name, provided it exists. - fn schema(&self, name: &str) -> Option>; - - /// Adds a new schema to this catalog. - /// - /// If a schema of the same name existed before, it is replaced in - /// the catalog and returned. - /// - /// By default returns a "Not Implemented" error - fn register_schema( - &self, - name: &str, - schema: Arc, - ) -> Result>> { - // use variables to avoid unused variable warnings - let _ = name; - let _ = schema; - not_impl_err!("Registering new schemas is not supported") - } - - /// Removes a schema from this catalog. Implementations of this method should return - /// errors if the schema exists but cannot be dropped. For example, in DataFusion's - /// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema - /// will only be successfully dropped when `cascade` is true. - /// This is equivalent to how DROP SCHEMA works in PostgreSQL. - /// - /// Implementations of this method should return None if schema with `name` - /// does not exist. - /// - /// By default returns a "Not Implemented" error - fn deregister_schema( - &self, - _name: &str, - _cascade: bool, - ) -> Result>> { - not_impl_err!("Deregistering new schemas is not supported") - } -} - /// Collects all tables and views referenced in the SQL statement. CTEs are collected separately. /// This can be used to determine which tables need to be in the catalog for a query to be planned. 
/// @@ -215,7 +52,7 @@ pub trait CatalogProvider: Sync + Send { /// /// ``` /// # use datafusion_sql::parser::DFParser; -/// # use datafusion::catalog::resolve_table_references; +/// # use datafusion::catalog_common::resolve_table_references; /// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)"; /// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); /// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); @@ -229,7 +66,7 @@ pub trait CatalogProvider: Sync + Send { /// /// ``` /// # use datafusion_sql::parser::DFParser; -/// # use datafusion::catalog::resolve_table_references; +/// # use datafusion::catalog_common::resolve_table_references; /// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;"; /// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap(); /// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap(); diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index e1021d06261f..8feccfb43d6b 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -58,6 +58,7 @@ use datafusion_expr::{ use datafusion_functions_aggregate::expr_fn::{avg, count, median, stddev, sum}; use async_trait::async_trait; +use datafusion_catalog::Session; /// Contains options that control how data is /// written out from a DataFrame @@ -1657,7 +1658,7 @@ impl TableProvider for DataFrameTableProvider { async fn scan( &self, - state: &SessionState, + state: &dyn Session, projection: Option<&Vec>, filters: &[Expr], limit: Option, diff --git a/datafusion/core/src/datasource/cte_worktable.rs b/datafusion/core/src/datasource/cte_worktable.rs index afc4536f068e..d7d224828dda 100644 --- a/datafusion/core/src/datasource/cte_worktable.rs +++ b/datafusion/core/src/datasource/cte_worktable.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use datafusion_catalog::Session; use datafusion_physical_plan::work_table::WorkTableExec; use crate::{ @@ -31,7 +32,6 @@ use crate::{ }; use crate::datasource::{TableProvider, TableType}; -use crate::execution::context::SessionState; /// The temporary working table where the previous iteration of a recursive query is stored /// Naming is based on PostgreSQL's implementation. 
@@ -77,7 +77,7 @@ impl TableProvider for CteWorkTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn Session, _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 5100987520ee..d831dd006031 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -22,11 +22,11 @@ use std::sync::Arc; use arrow::datatypes::*; use async_trait::async_trait; +use datafusion_catalog::Session; use datafusion_common::project_schema; use crate::datasource::{TableProvider, TableType}; use crate::error::Result; -use crate::execution::context::SessionState; use crate::logical_expr::Expr; use crate::physical_plan::{empty::EmptyExec, ExecutionPlan}; @@ -69,7 +69,7 @@ impl TableProvider for EmptyTable { async fn scan( &self, - _state: &SessionState, + _state: &dyn Session, projection: Option<&Vec>, _filters: &[Expr], _limit: Option, diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 500f20af474f..7154b50b9dd9 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -73,7 +73,7 @@ pub trait FileFormatFactory: Sync + Send + GetExt + Debug { /// from the [`TableProvider`]. This helps code re-utilization across /// providers that support the same file formats. /// -/// [`TableProvider`]: crate::datasource::provider::TableProvider +/// [`TableProvider`]: crate::catalog::TableProvider #[async_trait] pub trait FileFormat: Send + Sync + fmt::Debug { /// Returns the table provider as [`Any`](std::any::Any) so that it can be diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index b91a4bd09c55..3af4d41bcf03 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -24,9 +24,8 @@ use std::{any::Any, sync::Arc}; use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; use super::PartitionedFile; -use crate::datasource::{ - create_ordering, get_statistics_with_limit, TableProvider, TableType, -}; +use crate::catalog::TableProvider; +use crate::datasource::{create_ordering, get_statistics_with_limit, TableType}; use crate::datasource::{ file_format::{file_compression_type::FileCompressionType, FileFormat}, listing::ListingTableUrl, @@ -52,6 +51,7 @@ use datafusion_physical_expr::{ }; use async_trait::async_trait; +use datafusion_catalog::Session; use futures::{future, stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; @@ -736,13 +736,16 @@ impl TableProvider for ListingTable { async fn scan( &self, - state: &SessionState, + state: &dyn Session, projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> Result> { - let (mut partitioned_file_lists, statistics) = - self.list_files_for_scan(state, filters, limit).await?; + // TODO remove downcast_ref from here? 
+ let session_state = state.as_any().downcast_ref::<SessionState>().unwrap(); + let (mut partitioned_file_lists, statistics) = self + .list_files_for_scan(session_state, filters, limit) + .await?; // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { @@ -805,7 +808,7 @@ impl TableProvider for ListingTable { self.options .format .create_physical_plan( - state, + session_state, FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema)) .with_file_groups(partitioned_file_lists) .with_statistics(statistics) @@ -852,7 +855,7 @@ impl TableProvider for ListingTable { async fn insert_into( &self, - state: &SessionState, + state: &dyn Session, input: Arc<dyn ExecutionPlan>, overwrite: bool, ) -> Result<Arc<dyn ExecutionPlan>> { @@ -878,8 +881,10 @@ impl TableProvider for ListingTable { // Get the object store for the table path. let store = state.runtime_env().object_store(table_path)?; + // TODO remove downcast_ref from here? + let session_state = state.as_any().downcast_ref::<SessionState>().unwrap(); let file_list_stream = pruned_partition_list( - state, + session_state, store.as_ref(), table_path, &[], @@ -890,7 +895,7 @@ impl TableProvider for ListingTable { let file_groups = file_list_stream.try_collect::<Vec<_>>().await?; let keep_partition_by_columns = - state.config().options().execution.keep_partition_by_columns; + state.config_options().execution.keep_partition_by_columns; // Sink related option, apart from format let config = FileSinkConfig { @@ -926,7 +931,7 @@ impl TableProvider for ListingTable { self.options() .format - .create_writer_physical_plan(input, state, config, order_requirements) + .create_writer_physical_plan(input, session_state, config, order_requirements) .await } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 1d4d08481895..ce52dd98166e 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -20,11 +20,10 @@ use std::path::Path; use std::sync::Arc; +use crate::catalog::{TableProvider, TableProviderFactory}; use crate::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; -use crate::datasource::provider::TableProviderFactory; -use crate::datasource::TableProvider; use crate::execution::context::SessionState; use arrow::datatypes::{DataType, SchemaRef}; @@ -33,6 +32,7 @@ use datafusion_common::{config_datafusion_err, Result}; use datafusion_expr::CreateExternalTable; use async_trait::async_trait; +use datafusion_catalog::Session; /// A `TableProviderFactory` capable of creating new `ListingTable`s #[derive(Debug, Default)] @@ -49,16 +49,18 @@ impl ListingTableFactory { impl TableProviderFactory for ListingTableFactory { async fn create( &self, - state: &SessionState, + state: &dyn Session, cmd: &CreateExternalTable, ) -> Result<Arc<dyn TableProvider>> { - let file_format = state + // TODO remove downcast_ref from here. Should file format factory be an extension to session state? + let session_state = state.as_any().downcast_ref::<SessionState>().unwrap(); + let file_format = session_state + .get_file_format_factory(cmd.file_type.as_str()) .ok_or(config_datafusion_err!( "Unable to create table with format {}! Could not find FileFormat.", cmd.file_type ))?
-            .create(state, &cmd.options)?;
+            .create(session_state, &cmd.options)?;
 
         let file_extension = get_extension(cmd.location.as_str());
 
@@ -114,10 +116,12 @@ impl TableProviderFactory for ListingTableFactory {
             .with_table_partition_cols(table_partition_cols)
             .with_file_sort_order(cmd.order_exprs.clone());
 
-        options.validate_partitions(state, &table_path).await?;
+        options
+            .validate_partitions(session_state, &table_path)
+            .await?;
 
         let resolved_schema = match provided_schema {
-            None => options.infer_schema(state, &table_path).await?,
+            None => options.infer_schema(session_state, &table_path).await?,
             Some(s) => s,
         };
         let config = ListingTableConfig::new(table_path)
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 5c4928209559..44e01e71648a 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -42,6 +42,7 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
+use datafusion_catalog::Session;
 use futures::StreamExt;
 use log::debug;
 use parking_lot::Mutex;
@@ -206,7 +207,7 @@ impl TableProvider for MemTable {
     async fn scan(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
@@ -258,7 +259,7 @@ impl TableProvider for MemTable {
     /// * A plan that returns the number of rows written.
     async fn insert_into(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
         overwrite: bool,
     ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index c28788eed458..1c9924735735 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -43,8 +43,8 @@ pub use self::default_table_source::{
     provider_as_source, source_as_provider, DefaultTableSource,
 };
 pub use self::memory::MemTable;
-pub use self::provider::TableProvider;
 pub use self::view::ViewTable;
+pub use crate::catalog::TableProvider;
 pub use crate::logical_expr::TableType;
 pub use statistics::get_statistics_with_limit;
 
diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs
index 7c58aded3108..9d4b67632a01 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -17,285 +17,17 @@
 
 //! Data source traits
 
-use std::any::Any;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use datafusion_common::{not_impl_err, Constraints, Statistics};
-use datafusion_expr::{CreateExternalTable, LogicalPlan};
+use datafusion_catalog::Session;
+use datafusion_expr::CreateExternalTable;
 pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
 
-use crate::arrow::datatypes::SchemaRef;
+use crate::catalog::{TableProvider, TableProviderFactory};
 use crate::datasource::listing_table_factory::ListingTableFactory;
 use crate::datasource::stream::StreamTableFactory;
 use crate::error::Result;
-use crate::execution::context::SessionState;
-use crate::logical_expr::Expr;
-use crate::physical_plan::ExecutionPlan;
-
-/// Source table
-#[async_trait]
-pub trait TableProvider: Sync + Send {
-    /// Returns the table provider as [`Any`](std::any::Any) so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// Get a reference to the schema for this table
-    fn schema(&self) -> SchemaRef;
-
-    /// Get a reference to the constraints of the table.
-    /// Returns:
-    /// - `None` for tables that do not support constraints.
-    /// - `Some(&Constraints)` for tables supporting constraints.
-    /// Therefore, a `Some(&Constraints::empty())` return value indicates that
-    /// this table supports constraints, but there are no constraints.
-    fn constraints(&self) -> Option<&Constraints> {
-        None
-    }
-
-    /// Get the type of this table for metadata/catalog purposes.
-    fn table_type(&self) -> TableType;
-
-    /// Get the create statement used to create this table, if available.
-    fn get_table_definition(&self) -> Option<&str> {
-        None
-    }
-
-    /// Get the [`LogicalPlan`] of this table, if available
-    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
-        None
-    }
-
-    /// Get the default value for a column, if available.
-    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
-        None
-    }
-
-    /// Create an [`ExecutionPlan`] for scanning the table with optionally
-    /// specified `projection`, `filter` and `limit`, described below.
-    ///
-    /// The `ExecutionPlan` is responsible for scanning the datasource's
-    /// partitions in a streaming, parallelized fashion.
-    ///
-    /// # Projection
-    ///
-    /// If specified, only a subset of columns should be returned, in the order
-    /// specified. The projection is a set of indexes of the fields in
-    /// [`Self::schema`].
-    ///
-    /// DataFusion provides the projection to scan only the columns actually
-    /// used in the query to improve performance, an optimization called
-    /// "Projection Pushdown". Some datasources, such as Parquet, can use this
-    /// information to go significantly faster when only a subset of columns is
-    /// required.
-    ///
-    /// # Filters
-    ///
-    /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the
-    /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for
-    /// which *all* of the `Expr`s evaluate to `true` must be returned (aka the
-    /// expressions are `AND`ed together).
-    ///
-    /// To enable filter pushdown you must override
-    /// [`Self::supports_filters_pushdown`] as the default implementation does
-    /// not and `filters` will be empty.
-    ///
-    /// DataFusion pushes filtering into the scans whenever possible
-    /// ("Filter Pushdown"), and depending on the format and the
-    /// implementation of the format, evaluating the predicate during the scan
-    /// can increase performance significantly.
-    ///
-    /// ## Note: Some columns may appear *only* in Filters
-    ///
-    /// In certain cases, a query may only use a certain column in a Filter that
-    /// has been completely pushed down to the scan. In this case, the
-    /// projection will not contain all the columns found in the filter
-    /// expressions.
-    ///
-    /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`,
-    ///
-    /// ```text
-    /// ┌────────────────────┐
-    /// │  Projection(t.a)   │
-    /// └────────────────────┘
-    ///            ▲
-    ///            │
-    ///            │
-    /// ┌────────────────────┐     Filter     ┌────────────────────┐   Projection    ┌────────────────────┐
-    /// │  Filter(t.b > 5)   │────Pushdown──▶ │  Projection(t.a)   │ ───Pushdown───▶ │  Projection(t.a)   │
-    /// └────────────────────┘                └────────────────────┘                 └────────────────────┘
-    ///            ▲                                    ▲                                      ▲
-    ///            │                                    │                                      │
-    ///            │                                    │                          ┌────────────────────┐
-    /// ┌────────────────────┐               ┌────────────────────┐                │        Scan        │
-    /// │        Scan        │               │        Scan        │                │  filter=(t.b > 5)  │
-    /// └────────────────────┘               │  filter=(t.b > 5)  │                │  projection=(t.a)  │
-    ///                                      └────────────────────┘                └────────────────────┘
-    ///
-    /// Initial Plan          If `TableProviderFilterPushDown`      Projection pushdown notes that
-    ///                       returns true, filter pushdown         the scan only needs t.a
-    ///                       pushes the filter into the scan
-    ///                                                             BUT internally evaluating the
-    ///                                                             predicate still requires t.b
-    /// ```
-    ///
-    /// # Limit
-    ///
-    /// If `limit` is specified, the scan must produce *at least* this many
-    /// rows (though it may return more). Like Projection Pushdown and Filter
-    /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as
-    /// possible, called "Limit Pushdown" as some sources can use this
-    /// information to improve their performance. Note that if there are any
-    /// Inexact filters pushed down, the LIMIT cannot be pushed down. This is
-    /// because inexact filters do not guarantee that every filtered row is
-    /// removed, so applying the limit could lead to too few rows being available
-    /// to return as a final result.
-    async fn scan(
-        &self,
-        state: &SessionState,
-        projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>>;
-
-    /// Specify if DataFusion should provide filter expressions to the
-    /// TableProvider to apply *during* the scan.
-    ///
-    /// Some TableProviders can evaluate filters more efficiently than the
-    /// `Filter` operator in DataFusion, for example by using an index.
-    ///
-    /// # Parameters and Return Value
-    ///
-    /// The return `Vec` must have one element for each element of the `filters`
-    /// argument. The value of each element indicates if the TableProvider can
-    /// apply the corresponding filter during the scan. The position in the return
-    /// value corresponds to the expression in the `filters` parameter.
-    ///
-    /// If the length of the resulting `Vec` does not match the `filters` input
-    /// an error will be thrown.
-    ///
-    /// Each element in the resulting `Vec` is one of the following:
-    /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter
-    ///   during scan
-    /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan
-    ///
-    /// By default, this function returns [`Unsupported`] for all filters,
-    /// meaning no filters will be provided to [`Self::scan`].
-    ///
-    /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
-    /// [`Exact`]: TableProviderFilterPushDown::Exact
-    /// [`Inexact`]: TableProviderFilterPushDown::Inexact
-    /// # Example
-    ///
-    /// ```rust
-    /// # use std::any::Any;
-    /// # use std::sync::Arc;
-    /// # use arrow_schema::SchemaRef;
-    /// # use async_trait::async_trait;
-    /// # use datafusion::datasource::TableProvider;
-    /// # use datafusion::error::{Result, DataFusionError};
-    /// # use datafusion::execution::context::SessionState;
-    /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
-    /// # use datafusion_physical_plan::ExecutionPlan;
-    /// // Define a struct that implements the TableProvider trait
-    /// struct TestDataSource {}
-    ///
-    /// #[async_trait]
-    /// impl TableProvider for TestDataSource {
-    /// # fn as_any(&self) -> &dyn Any { todo!() }
-    /// # fn schema(&self) -> SchemaRef { todo!() }
-    /// # fn table_type(&self) -> TableType { todo!() }
-    /// # async fn scan(&self, s: &SessionState, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> {
-    ///         todo!()
-    /// # }
-    ///     // Override the supports_filters_pushdown to evaluate which expressions
-    ///     // to accept as pushdown predicates.
-    ///     fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
-    ///         // Process each filter
-    ///         let support: Vec<_> = filters.iter().map(|expr| {
-    ///             match expr {
-    ///                 // This example only supports a between expr with a single column named "c1".
-    ///                 Expr::Between(between_expr) => {
-    ///                     between_expr.expr
-    ///                         .try_into_col()
-    ///                         .map(|column| {
-    ///                             if column.name == "c1" {
-    ///                                 TableProviderFilterPushDown::Exact
-    ///                             } else {
-    ///                                 TableProviderFilterPushDown::Unsupported
-    ///                             }
-    ///                         })
-    ///                         // If there is no column in the expr set the filter to unsupported.
-    ///                         .unwrap_or(TableProviderFilterPushDown::Unsupported)
-    ///                 }
-    ///                 _ => {
-    ///                     // For all other cases return Unsupported.
-    ///                     TableProviderFilterPushDown::Unsupported
-    ///                 }
-    ///             }
-    ///         }).collect();
-    ///         Ok(support)
-    ///     }
-    /// }
-    /// ```
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> Result<Vec<TableProviderFilterPushDown>> {
-        Ok(vec![
-            TableProviderFilterPushDown::Unsupported;
-            filters.len()
-        ])
-    }
-
-    /// Get statistics for this table, if available
-    fn statistics(&self) -> Option<Statistics> {
-        None
-    }
-
-    /// Return an [`ExecutionPlan`] to insert data into this table, if
-    /// supported.
-    ///
-    /// The returned plan should return a single row in a UInt64
-    /// column called "count" such as the following
-    ///
-    /// ```text
-    /// +-------+
-    /// | count |
-    /// +-------+
-    /// | 6     |
-    /// +-------+
-    /// ```
-    ///
-    /// # See Also
-    ///
-    /// See [`DataSinkExec`] for the common pattern of inserting
-    /// streams of `RecordBatch`es as files to an ObjectStore.
-    ///
-    /// [`DataSinkExec`]: crate::physical_plan::insert::DataSinkExec
-    async fn insert_into(
-        &self,
-        _state: &SessionState,
-        _input: Arc<dyn ExecutionPlan>,
-        _overwrite: bool,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        not_impl_err!("Insert into not implemented for this table")
-    }
-}
-
-/// A factory which creates [`TableProvider`]s at runtime given a URL.
-///
-/// For example, this can be used to create a table "on the fly"
-/// from a directory of files only when that name is referenced.
-#[async_trait]
-pub trait TableProviderFactory: Sync + Send {
-    /// Create a TableProvider with the given url
-    async fn create(
-        &self,
-        state: &SessionState,
-        cmd: &CreateExternalTable,
-    ) -> Result<Arc<dyn TableProvider>>;
-}
 
 /// The default [`TableProviderFactory`]
 ///
@@ -318,7 +50,7 @@ impl DefaultTableFactory {
 impl TableProviderFactory for DefaultTableFactory {
     async fn create(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>> {
         let mut unbounded = cmd.unbounded;
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index 9cfdb7bb1168..682565aea909 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -25,9 +25,8 @@ use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use crate::datasource::provider::TableProviderFactory;
-use crate::datasource::{create_ordering, TableProvider};
-use crate::execution::context::SessionState;
+use crate::catalog::{TableProvider, TableProviderFactory};
+use crate::datasource::create_ordering;
 
 use arrow_array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
 use arrow_schema::SchemaRef;
@@ -42,6 +41,7 @@ use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 
 use async_trait::async_trait;
+use datafusion_catalog::Session;
 use futures::StreamExt;
 
 /// A [`TableProviderFactory`] for [`StreamTable`]
@@ -52,7 +52,7 @@ pub struct StreamTableFactory {}
 impl TableProviderFactory for StreamTableFactory {
     async fn create(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>> {
         let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
@@ -322,7 +322,7 @@ impl TableProvider for StreamTable {
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         limit: Option<usize>,
@@ -347,7 +347,7 @@ impl TableProvider for StreamTable {
     async fn insert_into(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
         _overwrite: bool,
     ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/core/src/datasource/streaming.rs
index 205faee43334..f9ded357b5a5 100644
--- a/datafusion/core/src/datasource/streaming.rs
+++ b/datafusion/core/src/datasource/streaming.rs
@@ -23,14 +23,13 @@ use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
-use datafusion_common::{plan_err, Result};
-use datafusion_expr::{Expr, TableType};
-use log::debug;
-
 use crate::datasource::TableProvider;
-use crate::execution::context::SessionState;
 use crate::physical_plan::streaming::{PartitionStream, StreamingTableExec};
 use crate::physical_plan::ExecutionPlan;
+use datafusion_catalog::Session;
+use datafusion_common::{plan_err, Result};
+use datafusion_expr::{Expr, TableType};
+use log::debug;
 
 /// A [`TableProvider`] that streams a set of [`PartitionStream`]
 pub struct StreamingTable {
@@ -85,7 +84,7 @@ impl TableProvider for StreamingTable {
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         limit: Option<usize>,
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index 3f024a6b4cb7..98d118c027b7 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -21,6 +21,7 @@ use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use datafusion_catalog::Session;
 use datafusion_common::Column;
 use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
 
@@ -31,7 +32,6 @@ use crate::{
 };
 
 use crate::datasource::{TableProvider, TableType};
-use crate::execution::context::SessionState;
 
 /// An implementation of `TableProvider` that uses another logical plan.
 pub struct ViewTable {
@@ -103,7 +103,7 @@ impl TableProvider for ViewTable {
     async fn scan(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 18db4dc8eb0a..9b889c37ab52 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -23,16 +23,18 @@ use std::sync::{Arc, Weak};
 
 use super::options::ReadOptions;
 use crate::{
-    catalog::listing_schema::ListingSchemaProvider,
-    catalog::schema::MemorySchemaProvider,
-    catalog::{CatalogProvider, CatalogProviderList, MemoryCatalogProvider},
+    catalog::{
+        CatalogProvider, CatalogProviderList, TableProvider, TableProviderFactory,
+    },
+    catalog_common::listing_schema::ListingSchemaProvider,
+    catalog_common::memory::MemorySchemaProvider,
+    catalog_common::MemoryCatalogProvider,
     dataframe::DataFrame,
     datasource::{
         function::{TableFunction, TableFunctionImpl},
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
-        provider::TableProviderFactory,
     },
-    datasource::{provider_as_source, MemTable, TableProvider, ViewTable},
+    datasource::{provider_as_source, MemTable, ViewTable},
     error::{DataFusionError, Result},
     execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry},
     logical_expr::AggregateUDF,
@@ -1579,7 +1581,7 @@ mod tests {
 
     use datafusion_common_runtime::SpawnedTask;
 
-    use crate::catalog::schema::SchemaProvider;
+    use crate::catalog::SchemaProvider;
     use crate::execution::session_state::SessionStateBuilder;
     use crate::physical_planner::PhysicalPlanner;
     use async_trait::async_trait;
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index cc2b44cf1933..226e8085341e 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -17,13 +17,14 @@
 
 //! [`SessionState`]: information required to run queries in a session
 
-use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
-use crate::catalog::schema::SchemaProvider;
-use crate::catalog::{CatalogProviderList, MemoryCatalogProviderList};
+use crate::catalog::{CatalogProviderList, SchemaProvider, TableProviderFactory};
+use crate::catalog_common::information_schema::{
+    InformationSchemaProvider, INFORMATION_SCHEMA,
+};
+use crate::catalog_common::MemoryCatalogProviderList;
 use crate::datasource::cte_worktable::CteWorkTable;
 use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
 use crate::datasource::function::{TableFunction, TableFunctionImpl};
-use crate::datasource::provider::TableProviderFactory;
 use crate::datasource::provider_as_source;
 use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
 use crate::execution::SessionStateDefaults;
@@ -32,6 +33,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
 use arrow_schema::{DataType, SchemaRef};
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
+use datafusion_catalog::Session;
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
 use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
@@ -68,6 +70,7 @@ use itertools::Itertools;
 use log::{debug, info};
 use sqlparser::ast::Expr as SQLExpr;
 use sqlparser::dialect::dialect_from_str;
+use std::any::Any;
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
@@ -144,7 +147,7 @@ pub struct SessionState {
     /// `CREATE EXTERNAL TABLE ... STORED AS <FORMAT>` for custom file
     /// formats other than those built into DataFusion
     ///
-    /// [`TableProvider`]: crate::datasource::provider::TableProvider
+    /// [`TableProvider`]: crate::catalog::TableProvider
    table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
     /// Runtime environment
    runtime_env: Arc<RuntimeEnv>,
@@ -180,6 +183,56 @@ impl Debug for SessionState {
     }
 }
 
+#[async_trait]
+impl Session for SessionState {
+    fn session_id(&self) -> &str {
+        self.session_id()
+    }
+
+    fn config(&self) -> &SessionConfig {
+        self.config()
+    }
+
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        self.create_physical_plan(logical_plan).await
+    }
+
+    fn create_physical_expr(
+        &self,
+        expr: Expr,
+        df_schema: &DFSchema,
+    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
+        self.create_physical_expr(expr, df_schema)
+    }
+
+    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
+        self.scalar_functions()
+    }
+
+    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
+        self.aggregate_functions()
+    }
+
+    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
+        self.window_functions()
+    }
+
+    fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+        self.runtime_env()
+    }
+
+    fn execution_props(&self) -> &ExecutionProps {
+        self.execution_props()
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
 impl SessionState {
     /// Returns new [`SessionState`] using the provided
     /// [`SessionConfig`] and [`RuntimeEnv`].
@@ -465,14 +518,14 @@ impl SessionState {
     ///
     /// See [`catalog::resolve_table_references`] for more information.
     ///
-    /// [`catalog::resolve_table_references`]: crate::catalog::resolve_table_references
+    /// [`catalog::resolve_table_references`]: crate::catalog_common::resolve_table_references
     pub fn resolve_table_references(
         &self,
         statement: &datafusion_sql::parser::Statement,
     ) -> datafusion_common::Result<Vec<TableReference>> {
         let enable_ident_normalization =
             self.config.options().sql_parser.enable_ident_normalization;
-        let (table_refs, _) = crate::catalog::resolve_table_references(
+        let (table_refs, _) = crate::catalog_common::resolve_table_references(
             statement,
             enable_ident_normalization,
         )?;
diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs
index b7e7b5f0955f..07420afe842f 100644
--- a/datafusion/core/src/execution/session_state_defaults.rs
+++ b/datafusion/core/src/execution/session_state_defaults.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::catalog::listing_schema::ListingSchemaProvider;
-use crate::catalog::{CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider};
+use crate::catalog::{CatalogProvider, TableProviderFactory};
+use crate::catalog_common::listing_schema::ListingSchemaProvider;
+use crate::catalog_common::{MemoryCatalogProvider, MemorySchemaProvider};
 use crate::datasource::file_format::arrow::ArrowFormatFactory;
 use crate::datasource::file_format::avro::AvroFormatFactory;
 use crate::datasource::file_format::csv::CsvFormatFactory;
@@ -24,7 +25,7 @@ use crate::datasource::file_format::json::JsonFormatFactory;
 #[cfg(feature = "parquet")]
 use crate::datasource::file_format::parquet::ParquetFormatFactory;
 use crate::datasource::file_format::FileFormatFactory;
-use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory};
+use crate::datasource::provider::DefaultTableFactory;
 use crate::execution::context::SessionState;
 #[cfg(feature = "nested_expressions")]
 use crate::functions_nested;
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 59a110646276..cf5a184e3416 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -500,7 +500,7 @@ pub const DATAFUSION_VERSION: &str = env!("CARGO_PKG_VERSION");
 extern crate core;
 extern crate sqlparser;
 
-pub mod catalog;
+pub mod catalog_common;
 pub mod dataframe;
 pub mod datasource;
 pub mod error;
@@ -535,6 +535,11 @@ pub use common::config;
 
 // NB datafusion execution is re-exported in the `execution` module
 
+/// re-export of [`datafusion_catalog`] crate
+pub mod catalog {
+    pub use datafusion_catalog::*;
+}
+
 /// re-export of [`datafusion_expr`] crate
 pub mod logical_expr {
     pub use datafusion_expr::*;
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index ba0509f3f51a..042febf32fd1 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -29,12 +29,12 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use crate::catalog::{TableProvider, TableProviderFactory};
 use crate::dataframe::DataFrame;
-use crate::datasource::provider::TableProviderFactory;
 use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
-use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
+use crate::datasource::{empty::EmptyTable, provider_as_source};
 use crate::error::Result;
-use crate::execution::context::{SessionState, TaskContext};
+use crate::execution::context::TaskContext;
 use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use crate::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
@@ -49,9 +49,9 @@ use datafusion_expr::{CreateExternalTable, Expr, TableType};
 use datafusion_physical_expr::EquivalenceProperties;
 
 use async_trait::async_trait;
+use datafusion_catalog::Session;
 use futures::Stream;
 use tempfile::TempDir;
-
 // backwards compatibility
 #[cfg(feature = "parquet")]
 pub use datafusion_common::test_util::parquet_test_data;
@@ -177,7 +177,7 @@ pub struct TestTableFactory {}
 impl TableProviderFactory for TestTableFactory {
     async fn create(
         &self,
-        _: &SessionState,
+        _: &dyn Session,
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>> {
         Ok(Arc::new(TestTableProvider {
@@ -213,7 +213,7 @@ impl TableProvider for TestTableProvider {
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         _projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs
index eebc946ccb68..7c051ffaa7e1 100644
--- a/datafusion/core/tests/custom_sources_cases/mod.rs
+++ b/datafusion/core/tests/custom_sources_cases/mod.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result;
-use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
+use datafusion::execution::context::{SessionContext, TaskContext};
 use datafusion::logical_expr::{
     col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
 };
@@ -43,6 +43,7 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion_physical_plan::{ExecutionMode, PlanProperties};
 
 use async_trait::async_trait;
+use datafusion_catalog::Session;
 use futures::stream::Stream;
 
 mod provider_filter_pushdown;
@@ -212,7 +213,7 @@ impl TableProvider for CustomTableProvider {
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index b5506b7c12f6..e91bb023ef38 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -21,9 +21,10 @@ use std::sync::Arc;
 use arrow::array::{Int32Builder, Int64Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use datafusion::datasource::provider::{TableProvider, TableType};
+use datafusion::catalog::TableProvider;
+use datafusion::datasource::provider::TableType;
 use datafusion::error::Result;
-use datafusion::execution::context::{SessionState, TaskContext};
+use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::TableProviderFilterPushDown;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
@@ -39,6 +40,7 @@ use datafusion_functions_aggregate::expr_fn::count;
 use datafusion_physical_expr::EquivalenceProperties;
 
 use async_trait::async_trait;
+use datafusion_catalog::Session;
 
 fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
     let mut builder = Int32Builder::with_capacity(num_rows);
@@ -162,7 +164,7 @@ impl TableProvider for CustomProvider {
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         _: Option<usize>,
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs
index 2d42b03bfed8..41d182a3767b 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -20,7 +20,7 @@
 use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::execution::context::{SessionState, TaskContext};
+use datafusion::execution::context::TaskContext;
 use datafusion::{
     datasource::{TableProvider, TableType},
     error::Result,
@@ -36,6 +36,7 @@ use datafusion_common::{project_schema, stats::Precision};
 use datafusion_physical_expr::EquivalenceProperties;
 
 use async_trait::async_trait;
+use datafusion_catalog::Session;
 
 /// This is a testing structure for statistics
 /// It will act both as a table provider and execution plan
@@ -89,7 +90,7 @@ impl TableProvider for StatisticsValidation {
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         // limit is ignored because it is not mandatory for a `TableProvider` to honor it
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index bc2c3315da59..f62a019eb960 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -35,7 +35,6 @@ use tokio::fs::File;
 
 use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::{MemTable, TableProvider};
-use datafusion::execution::context::SessionState;
 use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::execution::session_state::SessionStateBuilder;
@@ -45,6 +44,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use datafusion_common::{assert_contains, Result};
 
 use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_catalog::Session;
 use datafusion_execution::TaskContext;
 use test_utils::AccessLogGenerator;
 
@@ -792,7 +792,7 @@ impl TableProvider for SortedTableProvider {
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
index 1e8d30cab638..5fd3b7a03384 100644
--- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
@@ -24,11 +24,11 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::function::TableFunctionImpl;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
-use datafusion::execution::context::SessionState;
 use datafusion::execution::TaskContext;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::SessionContext;
+use datafusion_catalog::Session;
 use datafusion_common::{assert_batches_eq, DFSchema, ScalarValue};
 use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};
 use std::fs::File;
@@ -127,7 +127,7 @@ impl TableProvider for SimpleCsvTable {
     async fn scan(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
@@ -161,7 +161,7 @@ impl TableProvider for SimpleCsvTable {
 }
 
 impl SimpleCsvTable {
-    async fn interpreter_expr(&self, state: &SessionState) -> Result<Expr> {
+    async fn interpreter_expr(&self, state: &dyn Session) -> Result<Expr> {
         use datafusion::logical_expr::expr_rewriter::normalize_col;
         use datafusion::logical_expr::utils::columnize_expr;
         let plan = LogicalPlan::EmptyRelation(EmptyRelation {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 9c81c4852783..1bd6e9ad34b4 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -30,12 +30,11 @@ use std::fmt::{self, Debug, Formatter};
 use std::sync::Arc;
 use std::vec;
 
+use datafusion::catalog::{TableProvider, TableProviderFactory};
 use datafusion::datasource::file_format::arrow::ArrowFormatFactory;
 use datafusion::datasource::file_format::csv::CsvFormatFactory;
 use datafusion::datasource::file_format::parquet::ParquetFormatFactory;
 use datafusion::datasource::file_format::{format_as_file_type, DefaultFileType};
-use datafusion::datasource::provider::TableProviderFactory;
-use datafusion::datasource::TableProvider;
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::execution::FunctionRegistry;
 use datafusion::functions_aggregate::count::count_udaf;
diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs
index dd27727e3ad5..224a0e18eac4 100644
--- a/datafusion/sqllogictest/src/test_context.rs
+++ b/datafusion/sqllogictest/src/test_context.rs
@@ -27,12 +27,12 @@ use arrow::array::{
 };
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
-use datafusion::execution::context::SessionState;
 use datafusion::logical_expr::{create_udf, ColumnarValue, Expr, ScalarUDF, Volatility};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionConfig;
 use datafusion::{
-    catalog::{schema::MemorySchemaProvider, CatalogProvider, MemoryCatalogProvider},
+    catalog::CatalogProvider,
+    catalog_common::{memory::MemoryCatalogProvider, memory::MemorySchemaProvider},
     datasource::{MemTable, TableProvider, TableType},
     prelude::{CsvReadOptions, SessionContext},
 };
@@ -40,6 +40,7 @@ use datafusion_common::cast::as_float64_array;
 use datafusion_common::DataFusionError;
 
 use async_trait::async_trait;
+use datafusion::catalog::Session;
 use log::info;
 use tempfile::TempDir;
 
@@ -221,7 +222,7 @@ pub async fn register_temp_table(ctx: &SessionContext) {
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         _: Option<&Vec<usize>>,
         _: &[Expr],
         _: Option<usize>,
diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md
index f53ac6cfae97..a250e880913c 100644
--- a/docs/source/library-user-guide/custom-table-providers.md
+++ b/docs/source/library-user-guide/custom-table-providers.md
@@ -112,7 +112,7 @@ impl CustomDataSource {
 impl TableProvider for CustomDataSource {
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         // filters and limit can be used here to inject some push-down operations if needed
        _filters: &[Expr],
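
To make the migration concrete, here is a minimal sketch of a custom `TableProvider` written against the new `scan(&self, &dyn Session, ...)` signature used throughout this diff. It assumes DataFusion 40 and the `datafusion::catalog` re-export added in `lib.rs` above; `EchoTable` is a hypothetical name, not part of the change, and the optional downcast mirrors the `ListingTable` TODO above.

```rust
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::catalog::Session;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical provider that returns no rows; only the trait wiring matters here.
struct EchoTable {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for EchoTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Providers that still need SessionState-specific APIs can recover it,
        // mirroring the downcast `ListingTable` uses above.
        if let Some(_session_state) = state.as_any().downcast_ref::<SessionState>() {
            // SessionState-only calls would go here.
        }
        // Produce an empty, possibly projected, scan.
        let exec = MemoryExec::try_new(&[vec![]], self.schema(), projection.cloned())?;
        Ok(Arc::new(exec))
    }
}
```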
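The `impl Session for SessionState` block above defines which session APIs remain reachable through the trait object. Below is a sketch of provider-side code consuming them; the helper `plan_predicate` is hypothetical, and `config_options()` is the accessor that replaces the old `state.config().options()` chain, as seen in `ListingTable::insert_into`.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::Session;
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::prelude::{col, lit};

fn plan_predicate(state: &dyn Session) -> Result<Arc<dyn PhysicalExpr>> {
    // Configuration is reachable without a downcast.
    let batch_size = state.config_options().execution.batch_size;
    println!("session {} uses batch_size={}", state.session_id(), batch_size);

    // Expression planning also goes through the trait object.
    let schema = Schema::new(vec![Field::new("b", DataType::Int64, false)]);
    let df_schema = DFSchema::try_from(schema)?;
    state.create_physical_expr(col("b").gt(lit(5i64)), &df_schema)
}
```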
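For downstream users, the visible effect of this diff is a set of import-path moves. A before/after sketch assembled from the hunks above:

```rust
// Before this change:
// use datafusion::catalog::schema::SchemaProvider;
// use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider};
// use datafusion::datasource::provider::TableProviderFactory;

// After this change, the traits live in the re-exported `datafusion-catalog` crate:
use datafusion::catalog::{
    CatalogProvider, CatalogProviderList, SchemaProvider, Session, TableProvider,
    TableProviderFactory,
};
// ...while the in-memory implementations moved to `catalog_common`:
use datafusion::catalog_common::{MemoryCatalogProvider, MemorySchemaProvider};
```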