diff --git a/Cargo.toml b/Cargo.toml index af975e42b44b7..5f93fe630254c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ ctor = "0.2.0" datafusion = { path = "datafusion/core" } datafusion-common = { path = "datafusion/common" } datafusion-expr = { path = "datafusion/expr" } +datafusion-functions = { path = "datafusion/functions" } datafusion-sql = { path = "datafusion/sql" } datafusion-optimizer = { path = "datafusion/optimizer" } datafusion-physical-expr = { path = "datafusion/physical-expr" } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 4015ba439e67d..ddf6ba8960795 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -40,7 +40,7 @@ backtrace = ["datafusion-common/backtrace"] compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"] default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"] -encoding_expressions = ["datafusion-physical-expr/encoding_expressions"] +encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] parquet = ["datafusion-common/parquet", "dep:parquet"] @@ -65,6 +65,7 @@ dashmap = { workspace = true } datafusion-common = { path = "../common", version = "32.0.0", features = ["object_store"], default-features = false } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-functions = { workspace = true } datafusion-optimizer = { path = "../optimizer", version = "32.0.0", default-features = false } datafusion-physical-expr = { path = "../physical-expr", version = "32.0.0", default-features = false } datafusion-physical-plan = { workspace = true } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 1df1924813b7a..8169c8fcb40e2 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -40,7 +40,6 @@ use datafusion_common::{ exec_err, not_impl_err, plan_datafusion_err, plan_err, tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}, }; -pub use datafusion_execution::registry::MutableFunctionRegistry; use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, @@ -796,6 +795,48 @@ impl SessionContext { .add_var_provider(variable_type, provider); } + /// Registers a scalar UDF within this context. + /// + /// Note in SQL queries, function names are looked up using + /// lowercase unless the query uses quotes. For example, + /// + /// - `SELECT MY_FUNC(x)...` will look for a function named `"my_func"` + /// - `SELECT "my_FUNC"(x)` will look for a function named `"my_FUNC"` + pub fn register_udf(&self, f: ScalarUDF) { + self.state + .write() + .scalar_functions + .insert(f.name().to_string(), Arc::new(f)); + } + + /// Registers an aggregate UDF within this context. + /// + /// Note in SQL queries, aggregate names are looked up using + /// lowercase unless the query uses quotes. For example, + /// + /// - `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"` + /// - `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"` + pub fn register_udaf(&self, f: AggregateUDF) { + self.state + .write() + .aggregate_functions + .insert(f.name.clone(), Arc::new(f)); + } + + /// Registers a window UDF within this context. + /// + /// Note in SQL queries, window function names are looked up using + /// lowercase unless the query uses quotes. For example, + /// + /// - `SELECT MY_UDWF(x)...` will look for a window function named `"my_udwf"` + /// - `SELECT "my_UDWF"(x)` will look for a window function named `"my_UDWF"` + pub fn register_udwf(&self, f: WindowUDF) { + self.state + .write() + .window_functions + .insert(f.name.clone(), Arc::new(f)); + } + /// Creates a [`DataFrame`] for reading a data source. /// /// For more control such as reading multiple files, you can use @@ -1117,50 +1158,6 @@ impl FunctionRegistry for SessionContext { } } -impl MutableFunctionRegistry for SessionContext { - /// Registers a scalar UDF within this context. - /// - /// Note in SQL queries, function names are looked up using - /// lowercase unless the query uses quotes. For example, - /// - /// - `SELECT MY_FUNC(x)...` will look for a function named `"my_func"` - /// - `SELECT "my_FUNC"(x)` will look for a function named `"my_FUNC"` - fn register_udf(&self, f: ScalarUDF) { - self.state - .write() - .scalar_functions - .insert(f.name().to_string(), Arc::new(f)); - } - - /// Registers an aggregate UDF within this context. - /// - /// Note in SQL queries, aggregate names are looked up using - /// lowercase unless the query uses quotes. For example, - /// - /// - `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"` - /// - `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"` - fn register_udaf(&self, f: AggregateUDF) { - self.state - .write() - .aggregate_functions - .insert(f.name.clone(), Arc::new(f)); - } - - /// Registers a window UDF within this context. - /// - /// Note in SQL queries, window function names are looked up using - /// lowercase unless the query uses quotes. For example, - /// - /// - `SELECT MY_UDWF(x)...` will look for a window function named `"my_udwf"` - /// - `SELECT "my_UDWF"(x)` will look for a window function named `"my_UDWF"` - fn register_udwf(&self, f: WindowUDF) { - self.state - .write() - .window_functions - .insert(f.name.clone(), Arc::new(f)); - } -} - /// A planner used to add extensions to DataFusion logical and physical plans. #[async_trait] pub trait QueryPlanner { @@ -1301,6 +1298,10 @@ impl SessionState { ); } + // register built in functions + let mut scalar_functions = HashMap::new(); + datafusion_functions::register_all(&mut scalar_functions); + SessionState { session_id, analyzer: Analyzer::new(), @@ -1308,7 +1309,7 @@ impl SessionState { physical_optimizers: PhysicalOptimizer::new(), query_planner: Arc::new(DefaultQueryPlanner {}), catalog_list, - scalar_functions: HashMap::new(), + scalar_functions, aggregate_functions: HashMap::new(), window_functions: HashMap::new(), serializer_registry: Arc::new(EmptySerializerRegistry), @@ -1318,19 +1319,7 @@ impl SessionState { table_factories, } } - /// Returns new [`SessionState`] using the provided - /// [`SessionConfig`] and [`RuntimeEnv`]. - #[deprecated( - since = "32.0.0", - note = "Use SessionState::new_with_config_rt_and_catalog_list" - )] - pub fn with_config_rt_and_catalog_list( - config: SessionConfig, - runtime: Arc, - catalog_list: Arc, - ) -> Self { - Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list) - } + fn register_default_schema( config: &SessionConfig, table_factories: &HashMap>, diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index 3ddfd1d20c065..5cd8b3870f818 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -26,9 +26,7 @@ //! ``` pub use crate::dataframe::DataFrame; -pub use crate::execution::context::{ - MutableFunctionRegistry, SQLOptions, SessionConfig, SessionContext, -}; +pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext}; pub use crate::execution::options::{ AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, }; diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index 05cba2547944c..fb0ecd02c6b09 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -35,7 +35,6 @@ use datafusion::{ }, assert_batches_eq, error::Result, - execution::MutableFunctionRegistry, logical_expr::{ AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature, StateTypeFunction, TypeSignature, Volatility, diff --git a/datafusion/core/tests/user_defined/user_defined_window_functions.rs b/datafusion/core/tests/user_defined/user_defined_window_functions.rs index b10ed8abc9a19..5f99391572174 100644 --- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs @@ -31,7 +31,6 @@ use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray}; use arrow_schema::DataType; use datafusion::{assert_batches_eq, prelude::SessionContext}; use datafusion_common::{Result, ScalarValue}; -use datafusion_execution::MutableFunctionRegistry; use datafusion_expr::{ function::PartitionEvaluatorFactory, PartitionEvaluator, ReturnTypeFunction, Signature, Volatility, WindowUDF, diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 19749b3402d44..a1a1551c2ca61 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -28,6 +28,6 @@ mod stream; mod task; pub use disk_manager::DiskManager; -pub use registry::{FunctionRegistry, MutableFunctionRegistry}; +pub use registry::FunctionRegistry; pub use stream::{RecordBatchStream, SendableRecordBatchStream}; pub use task::TaskContext; diff --git a/datafusion/execution/src/registry.rs b/datafusion/execution/src/registry.rs index 603cef49cdd79..9ba487e715b3b 100644 --- a/datafusion/execution/src/registry.rs +++ b/datafusion/execution/src/registry.rs @@ -36,36 +36,6 @@ pub trait FunctionRegistry { fn udwf(&self, name: &str) -> Result>; } -/// A Function registry that can have functions registered -pub trait MutableFunctionRegistry { - /// Registers a scalar UDF within this context. - /// - /// Note in SQL queries, function names are looked up using - /// lowercase unless the query uses quotes. For example, - /// - /// - `SELECT MY_FUNC(x)...` will look for a function named `"my_func"` - /// - `SELECT "my_FUNC"(x)` will look for a function named `"my_FUNC"` - fn register_udf(&self, f: ScalarUDF); - - /// Registers an aggregate UDF within this context. - /// - /// Note in SQL queries, aggregate names are looked up using - /// lowercase unless the query uses quotes. For example, - /// - /// - `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"` - /// - `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"` - fn register_udaf(&self, f: AggregateUDF); - - /// Registers a window UDF within this context. - /// - /// Note in SQL queries, window function names are looked up using - /// lowercase unless the query uses quotes. For example, - /// - /// - `SELECT MY_UDWF(x)...` will look for a window function named `"my_udwf"` - /// - `SELECT "my_UDWF"(x)` will look for a window function named `"my_UDWF"` - fn register_udwf(&self, f: WindowUDF); -} - /// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode]. pub trait SerializerRegistry: Send + Sync { /// Serialize this node to a byte array. This serialization should not include diff --git a/datafusion/functions/src/encoding/mod.rs b/datafusion/functions/src/encoding/mod.rs index 6c4309dad17e8..eaa745a1e1050 100644 --- a/datafusion/functions/src/encoding/mod.rs +++ b/datafusion/functions/src/encoding/mod.rs @@ -19,11 +19,11 @@ mod inner; use datafusion_common::arrow::datatypes::DataType; use datafusion_common::{plan_err, DataFusionError, Result}; -use datafusion_execution::registry::MutableFunctionRegistry; use datafusion_expr::TypeSignature::*; use datafusion_expr::{ ColumnarValue, FunctionImplementation, ScalarUDF, Signature, Volatility, }; +use std::collections::HashMap; use std::sync::{Arc, OnceLock}; use DataType::*; @@ -39,12 +39,14 @@ pub fn decode_udf() -> ScalarUDF { ScalarUDF::new_from_impl(Arc::new(DecodeFunc {})) } -/// Registers the `encode` and `decode` functions with the function registry -pub fn register(registry: &dyn MutableFunctionRegistry) -> Result<()> { - registry.register_udf(encode_udf()); - registry.register_udf(decode_udf()); +fn insert(registry: &mut HashMap>, udf: ScalarUDF) { + registry.insert(udf.name().to_string(), Arc::new(udf)); +} - Ok(()) +/// Registers the `encode` and `decode` functions with the function registry +pub fn register(registry: &mut HashMap>) { + insert(registry, encode_udf()); + insert(registry, decode_udf()); } struct EncodeFunc {} @@ -95,7 +97,7 @@ static DECODE_SIGNATURE: OnceLock = OnceLock::new(); impl FunctionImplementation for DecodeFunc { fn name(&self) -> &str { - "encode" + "decode" } fn signature(&self) -> &Signature { diff --git a/datafusion/functions/src/lib.rs b/datafusion/functions/src/lib.rs index ae7ce5b6aea4f..feecac18c0ee2 100644 --- a/datafusion/functions/src/lib.rs +++ b/datafusion/functions/src/lib.rs @@ -17,4 +17,13 @@ //! Several packages of built in functions for DataFusion +use datafusion_expr::ScalarUDF; +use std::collections::HashMap; +use std::sync::Arc; + pub mod encoding; + +/// Registers all "built in" functions from this crate with the provided registry +pub fn register_all(registry: &mut HashMap>) { + encoding::register(registry); +} diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index c5f777823fc3d..ca801df337f14 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -31,7 +31,6 @@ use datafusion::datasource::provider::TableProviderFactory; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; -use datafusion::execution::MutableFunctionRegistry; use datafusion::physical_plan::functions::make_scalar_function; use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig, SessionContext}; use datafusion::test_util::{TestTableFactory, TestTableProvider}; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index ada660d8c20a2..e3cde25631698 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -25,7 +25,6 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::execution::context::ExecutionProps; -use datafusion::execution::MutableFunctionRegistry; use datafusion::logical_expr::{ create_udf, BuiltinScalarFunction, JoinType, Operator, Volatility, }; diff --git a/datafusion/proto/tests/cases/serialize.rs b/datafusion/proto/tests/cases/serialize.rs index bdc2a0cee061c..f32c81527925d 100644 --- a/datafusion/proto/tests/cases/serialize.rs +++ b/datafusion/proto/tests/cases/serialize.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow::array::ArrayRef; use arrow::datatypes::DataType; -use datafusion::execution::{FunctionRegistry, MutableFunctionRegistry}; +use datafusion::execution::FunctionRegistry; use datafusion::physical_plan::functions::make_scalar_function; use datafusion::prelude::SessionContext; use datafusion_expr::{col, create_udf, lit};