Support Expr creation for ScalarUDF: Resolve function calls by name during planning #8157

Closed
2010YOUY01 opened this issue Nov 14, 2023 · 9 comments · Fixed by #8705
Labels
enhancement New feature or request

Comments

@2010YOUY01
Contributor

Is your feature request related to a problem or challenge?

Motivation

There is ongoing work migrating BuiltinScalarFunctions to ScalarUDF in #8045, and we noticed one Expr-related issue during prototyping:

We can use the Expr API to create built-in scalar functions directly:

let expr1 = abs(lit(-1));
let expr2 = call_fn("abs", vec![lit(-1)]);

We want to keep using this API after the functions are migrated to a ScalarUDF-based implementation. That is not possible now, because these Expr constructors are stateless while ScalarUDFs are registered inside SessionState. It would only be possible with something like:

let ctx = create_context()?;
ctx.register_udf(abs_impl);
let expr2 = ctx.call_fn("abs", vec![lit(-1)]);

Describe the solution you'd like

To continue supporting the original stateless Expr creation API, we can create an Expr that holds only the function name as a string, and resolve the function during logical optimization.
Then call_fn("abs", vec![lit(-1)]) can be supported (right now it only supports BuiltinScalarFunction, not UDFs).
The macro-based API abs(lit(-1)) can also be supported if we hard-code all possible function names within the core (should we do that?).

A potential implementation:

  1. For let expr2 = call_fn("abs", vec![lit(-1)]);, create a ScalarUDF expression with a dummy implementation.
  2. Add an AnalyzerRule (a mandatory logical optimizer rule) that resolves this UDF name using the functions registered in SessionState.
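The two steps above can be sketched with a self-contained toy model. Everything here is hypothetical and deliberately simplified, not DataFusion's API: `ScalarUdf`, `Expr`, `call_fn`, `resolve_placeholders`, and `evaluate` are illustrative names, the registry is assumed to be a plain `HashMap` holding the functions registered in `SessionState`, and functions operate on `i64` values only.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for a scalar UDF (hypothetical, not DataFusion's ScalarUDF).
pub struct ScalarUdf {
    pub name: String,
    /// `true` for the dummy created by `call_fn`, `false` for a real implementation.
    pub is_placeholder: bool,
    /// Toy signature: i64 arguments in, one i64 out.
    pub fun: fn(&[i64]) -> Result<i64, String>,
}

/// Minimal expression tree with literals and UDF calls only.
pub enum Expr {
    Literal(i64),
    Call { udf: Arc<ScalarUdf>, args: Vec<Expr> },
}

/// Dummy implementation: fails if the analyzer never replaced the placeholder.
fn unresolved(_args: &[i64]) -> Result<i64, String> {
    Err("function was not resolved by the analyzer".to_string())
}

/// Step 1: stateless constructor. Only the name is known at this point.
pub fn call_fn(name: &str, args: Vec<Expr>) -> Expr {
    let udf = ScalarUdf { name: name.to_string(), is_placeholder: true, fun: unresolved };
    Expr::Call { udf: Arc::new(udf), args }
}

/// Step 2: analyzer-style pass that swaps each placeholder for the registered UDF.
pub fn resolve_placeholders(
    expr: Expr,
    registry: &HashMap<String, Arc<ScalarUdf>>,
) -> Result<Expr, String> {
    match expr {
        Expr::Literal(v) => Ok(Expr::Literal(v)),
        Expr::Call { udf, args } => {
            // Resolve arguments first so nested calls are handled too
            let args = args
                .into_iter()
                .map(|arg| resolve_placeholders(arg, registry))
                .collect::<Result<Vec<_>, _>>()?;
            let udf = if udf.is_placeholder {
                registry
                    .get(&udf.name)
                    .cloned()
                    .ok_or_else(|| format!("unknown function: {}", udf.name))?
            } else {
                udf
            };
            Ok(Expr::Call { udf, args })
        }
    }
}

/// Toy evaluator so the effect of resolution is observable.
pub fn evaluate(expr: &Expr) -> Result<i64, String> {
    match expr {
        Expr::Literal(v) => Ok(*v),
        Expr::Call { udf, args } => {
            let values = args.iter().map(evaluate).collect::<Result<Vec<_>, _>>()?;
            (udf.fun)(values.as_slice())
        }
    }
}
```

Evaluating `call_fn("abs", vec![Expr::Literal(-1)])` before the pass hits the dummy implementation and returns an error; after resolving against a registry that contains a real `abs`, it evaluates to `1`.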

Issue:

Function implementations currently live inside SessionState but outside SessionConfig, and the analyzer can only access SessionConfig.
We have to move the function registry into SessionConfig first (or is there any better way?)

pub struct SessionState {
    // ...
    /// Scalar functions that are registered with the context
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    /// Session configuration
    config: SessionConfig,
    // ...
}

// `self.options()` is `ConfigOptions`
let analyzed_plan = self
    .analyzer
    .execute_and_check(plan, self.options(), |_, _| {})?;

cc @alamb I wonder if you have any thoughts on this approach 🤔

Describe alternatives you've considered

No response

Additional context

No response

2010YOUY01 added the enhancement (New feature or request) label on Nov 14, 2023
@alamb
Contributor

alamb commented Nov 14, 2023

Thanks @2010YOUY01 !

> We have to move the function registry into SessionConfig first (or is there any better way?)

What if we changed how Expr::ScalarFunction looks (and removed Expr::ScalarUDF)?

enum Expr {
...
    /// Represents the call of a built-in, or UDF scalar function with a set of arguments.
    ScalarFunction(ScalarFunction),
...
}

Instead of

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
    /// The function
    pub fun: built_in_function::BuiltinScalarFunction,
    /// List of expressions to feed to the functions as arguments
    pub args: Vec<Expr>,
}

Make it look like

pub enum ScalarFunctionDefinition {
  /// Resolved to a built in scalar function
  /// (will be removed long term)
  BuiltIn(built_in_function::BuiltinScalarFunction),
  /// Resolved to a user defined function
  UDF(ScalarUDF),
  /// A scalar function that will be called by name
  Name(Arc<str>),
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
    /// The function
    pub fun: ScalarFunctionDefinition,
    /// List of expressions to feed to the functions as arguments
    pub args: Vec<Expr>,
}

And that way an expr function like abs could look like

fn abs(arg: Expr) -> Expr {
    Expr::ScalarFunction(ScalarFunction {
        fun: ScalarFunctionDefinition::Name("abs".into()),
        args: vec![arg],
    })
}
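To make the proposal concrete, here is a compilable toy version of the proposed types together with the stateless `abs` expr_fn. The types are simplified stand-ins rather than the real `datafusion_expr` definitions; the `UDF` variant holds an `Arc<ScalarUdf>` here (the proposal writes `UDF(ScalarUDF)`), and `is_resolved` is a hypothetical helper showing how a later pass could detect calls that are still bound only by name.

```rust
use std::sync::Arc;

/// Toy stand-ins for the proposed types; not the real datafusion_expr definitions.
#[derive(Debug, Clone, PartialEq)]
pub enum BuiltinScalarFunction {
    Abs,
}

#[derive(Debug, PartialEq)]
pub struct ScalarUdf {
    pub name: String,
}

#[derive(Debug, Clone, PartialEq)]
pub enum ScalarFunctionDefinition {
    /// Resolved to a built-in scalar function
    BuiltIn(BuiltinScalarFunction),
    /// Resolved to a user-defined function
    UDF(Arc<ScalarUdf>),
    /// Not resolved yet: only the name is known
    Name(Arc<str>),
}

#[derive(Debug, Clone, PartialEq)]
pub struct ScalarFunction {
    pub fun: ScalarFunctionDefinition,
    pub args: Vec<Expr>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Literal(i64),
    ScalarFunction(ScalarFunction),
}

pub fn lit(v: i64) -> Expr {
    Expr::Literal(v)
}

/// Stateless expr_fn: records only the function name, so no SessionState is needed.
pub fn abs(arg: Expr) -> Expr {
    Expr::ScalarFunction(ScalarFunction {
        fun: ScalarFunctionDefinition::Name("abs".into()),
        args: vec![arg],
    })
}

impl Expr {
    /// True if every function call in this expression is bound to an implementation.
    pub fn is_resolved(&self) -> bool {
        match self {
            Expr::Literal(_) => true,
            Expr::ScalarFunction(f) => {
                !matches!(f.fun, ScalarFunctionDefinition::Name(_))
                    && f.args.iter().all(Expr::is_resolved)
            }
        }
    }
}
```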

I am not sure how large a change this would be; we would have to try it and see what it looks like.

@alamb
Contributor

alamb commented Nov 14, 2023

> and the analyzer can only access SessionConfig.

I think passing an impl FunctionRegistry to the analyzer rule would be another way, so we could pass SessionState to that rule without the rule having to know about the concrete SessionState type.
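A minimal sketch of that decoupling, assuming a hypothetical `FunctionLookup` trait loosely modeled on the `udf(name)` lookup in DataFusion's `FunctionRegistry`. The rule is generic over the trait, so it never names the concrete session type; `SessionStateLike` and `ResolveFunctionNames` are illustrative names, and a list of function names stands in for a real logical plan.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Toy UDF; stands in for DataFusion's ScalarUDF.
pub struct ScalarUdf {
    pub name: String,
}

/// Hypothetical lookup trait, loosely modeled on FunctionRegistry::udf.
pub trait FunctionLookup {
    fn udf(&self, name: &str) -> Result<Arc<ScalarUdf>, String>;
}

/// Toy session state that owns the registered functions.
pub struct SessionStateLike {
    pub scalar_functions: HashMap<String, Arc<ScalarUdf>>,
}

impl FunctionLookup for SessionStateLike {
    fn udf(&self, name: &str) -> Result<Arc<ScalarUdf>, String> {
        self.scalar_functions
            .get(name)
            .cloned()
            .ok_or_else(|| format!("no UDF named '{name}'"))
    }
}

/// Hypothetical analyzer rule: it only sees the trait, never the concrete session type.
pub struct ResolveFunctionNames;

impl ResolveFunctionNames {
    /// Resolve every name (standing in for a plan) against any registry implementation.
    pub fn resolve_all(
        &self,
        names: &[&str],
        registry: &impl FunctionLookup,
    ) -> Result<Vec<Arc<ScalarUdf>>, String> {
        names.iter().map(|name| registry.udf(name)).collect()
    }
}
```

Because the rule depends only on the trait, tests can hand it a small mock registry instead of a full session.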

@alamb
Contributor

alamb commented Nov 28, 2023

@2010YOUY01 I wonder if you still plan to work on this item? If not, no worries I can do it, but I wanted to check with you before doing so

@2010YOUY01
Contributor Author

> @2010YOUY01 I wonder if you still plan to work on this item? If not, no worries I can do it, but I wanted to check with you before doing so

Please feel free to proceed if you would like it to get done sooner, thank you @alamb 🙏🏼
I will catch up next week when I have more available time to help

alamb changed the title from "Support Expr creation for ScalarUDF" to "Support Expr creation for ScalarUDF: Resolve function calls by name during planning" on Dec 5, 2023
@edmondop
Contributor

edmondop commented Dec 6, 2023

@alamb I checked out main, and the structs and enums there are already written as you recommended:

pub enum ScalarFunctionDefinition {
  /// Resolved to a built in scalar function
  /// (will be removed long term)
  BuiltIn(built_in_function::BuiltinScalarFunction),
  /// Resolved to a user defined function
  UDF(ScalarUDF),
  /// A scalar function that will be called by name
  Name(Arc<str>),
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
    /// The function
    pub fun: ScalarFunctionDefinition,
    /// List of expressions to feed to the functions as arguments
    pub args: Vec<Expr>,
}

So I am not sure I understood correctly. I can see that call_fn resolves the function at the call site rather than during planning, and changing that behavior would probably mean changing this code. What type of Expr should this code return?

/// Calls a named built in function
/// ```
/// use datafusion_expr::{col, lit, call_fn};
///
/// // create the expression sin(x) < 0.2
/// let expr = call_fn("sin", vec![col("x")]).unwrap().lt(lit(0.2));
/// ```
pub fn call_fn(name: impl AsRef<str>, args: Vec<Expr>) -> Result<Expr> {
    match name.as_ref().parse::<BuiltinScalarFunction>() {
        Ok(fun) => Ok(Expr::ScalarFunction(ScalarFunction::new(fun, args))),
        Err(e) => Err(e),
    }
}
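The current behavior can be modeled in isolation to show why UDFs are rejected: the name is parsed into the built-in enum at the call site via `FromStr`, so any name that is not a built-in fails before a session or registry is ever involved. This is a hypothetical, simplified model; the enum variants and error text are illustrative, not DataFusion's.

```rust
use std::str::FromStr;

/// Toy built-in function enum (a stand-in for BuiltinScalarFunction).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BuiltinScalarFunction {
    Abs,
    Sin,
}

impl FromStr for BuiltinScalarFunction {
    type Err = String;

    fn from_str(name: &str) -> Result<Self, Self::Err> {
        match name {
            "abs" => Ok(Self::Abs),
            "sin" => Ok(Self::Sin),
            other => Err(format!("there is no built-in function named {other}")),
        }
    }
}

#[derive(Debug, PartialEq)]
pub enum Expr {
    Column(String),
    ScalarFunction { fun: BuiltinScalarFunction, args: Vec<Expr> },
}

/// Eager resolution: the name becomes a built-in right here, so a UDF name
/// (which only a session registry would know about) is rejected immediately.
pub fn call_fn(name: impl AsRef<str>, args: Vec<Expr>) -> Result<Expr, String> {
    let fun = name.as_ref().parse::<BuiltinScalarFunction>()?;
    Ok(Expr::ScalarFunction { fun, args })
}
```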

@alamb
Contributor

alamb commented Dec 6, 2023

> So I am not sure I understood correctly. I can see that call_fn resolves the function at the call site rather than during planning, and changing that behavior would probably mean changing this code. What type of Expr should this code return?

I think the idea would be to make it something like this:

/// Calls a named function
/// ```
/// use datafusion_expr::{col, lit, call_fn};
///
/// // create the expression sin(x) < 0.2
/// let expr = call_fn("sin", vec![col("x")]).unwrap().lt(lit(0.2));
/// ```
pub fn call_fn(name: impl AsRef<str>, args: Vec<Expr>) -> Result<Expr> {
    // Record only the name; a later analyzer pass resolves it
    Ok(Expr::ScalarFunction(ScalarFunction {
        fun: ScalarFunctionDefinition::Name(Arc::from(name.as_ref())),
        args,
    }))
}

And then add code to the optimizer that replaces instances of `ScalarFunctionDefinition::Name` with either `ScalarFunctionDefinition::BuiltIn` or `ScalarFunctionDefinition::UDF`.

Perhaps you can use the same logic here: https://github.com/apache/arrow-datafusion/blob/d9d8ddd5f770817f325190c4c0cc02436e7777e6/datafusion/sql/src/expr/function.rs#L66-L76

BTW, the point of doing this is so we can remove `BuiltinScalarFunction` incrementally and eventually treat all functions the same way.
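A sketch of such a pass over a simplified expression tree, assuming a hypothetical `resolve_names` function and a `HashMap` of registered UDFs. It rewrites every `Name` bottom-up into `UDF` or `BuiltIn` and reports unknown names as errors. The sketch checks registered UDFs before built-ins, so a UDF can shadow a built-in of the same name; that ordering is an assumption here, and the linked SQL-planner logic should be treated as the reference.

```rust
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BuiltinScalarFunction {
    Abs,
}

impl FromStr for BuiltinScalarFunction {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "abs" => Ok(Self::Abs),
            _ => Err(()),
        }
    }
}

#[derive(Debug, PartialEq)]
pub struct ScalarUdf {
    pub name: String,
}

#[derive(Debug, PartialEq)]
pub enum ScalarFunctionDefinition {
    BuiltIn(BuiltinScalarFunction),
    UDF(Arc<ScalarUdf>),
    Name(Arc<str>),
}

#[derive(Debug, PartialEq)]
pub enum Expr {
    Literal(i64),
    ScalarFunction { fun: ScalarFunctionDefinition, args: Vec<Expr> },
}

/// Analyzer-style pass: rewrite every `Name` into `UDF` or `BuiltIn`, bottom-up.
pub fn resolve_names(
    expr: Expr,
    udfs: &HashMap<String, Arc<ScalarUdf>>,
) -> Result<Expr, String> {
    match expr {
        Expr::Literal(v) => Ok(Expr::Literal(v)),
        Expr::ScalarFunction { fun, args } => {
            let args = args
                .into_iter()
                .map(|arg| resolve_names(arg, udfs))
                .collect::<Result<Vec<_>, _>>()?;
            let fun = match fun {
                ScalarFunctionDefinition::Name(name) => {
                    if let Some(udf) = udfs.get(&*name) {
                        // Registered UDFs are checked first in this sketch
                        ScalarFunctionDefinition::UDF(Arc::clone(udf))
                    } else if let Ok(builtin) = name.parse::<BuiltinScalarFunction>() {
                        ScalarFunctionDefinition::BuiltIn(builtin)
                    } else {
                        return Err(format!("invalid function '{name}'"));
                    }
                }
                // BuiltIn and UDF are already resolved; keep them unchanged
                already_resolved => already_resolved,
            };
            Ok(Expr::ScalarFunction { fun, args })
        }
    }
}
```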

@alamb
Contributor

alamb commented Dec 15, 2023

Update: @edmondop tried to implement what this issue suggests (❤️), and we got a bit hung up on keeping the expr_api backwards compatible. See #8447 (comment).

I took another shot at implementing expr_fns such as encode() and decode() in #8046, and I think I got it to work without changing the public API. It does require moving the expr_fns into the same module as the functions, but I think that is unavoidable if we want to pull functions into their own modules.
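A rough sketch of that layout, under the assumption that each function module owns both the implementation and its expr_fn. The `math_functions` module, `abs_udf`, and the toy `ScalarUdf`/`Expr` types are hypothetical; `std::sync::OnceLock` is used to share one UDF instance, so the expr_fn binds to the implementation directly and no by-name resolution pass is needed.

```rust
use std::sync::Arc;

/// Toy UDF type (hypothetical); in DataFusion this role is played by ScalarUDF.
pub struct ScalarUdf {
    pub name: &'static str,
    pub fun: fn(&[i64]) -> i64,
}

pub enum Expr {
    Literal(i64),
    Call { udf: Arc<ScalarUdf>, args: Vec<Expr> },
}

/// Hypothetical module that owns both the `abs` implementation and its expr_fn.
pub mod math_functions {
    use super::{Expr, ScalarUdf};
    use std::sync::{Arc, OnceLock};

    fn abs_impl(args: &[i64]) -> i64 {
        args[0].abs()
    }

    /// Shared `abs` UDF instance, created on first use.
    pub fn abs_udf() -> Arc<ScalarUdf> {
        static ABS: OnceLock<Arc<ScalarUdf>> = OnceLock::new();
        ABS.get_or_init(|| Arc::new(ScalarUdf { name: "abs", fun: abs_impl }))
            .clone()
    }

    /// expr_fn bound directly to the UDF: no by-name lookup is needed later.
    pub fn abs(arg: Expr) -> Expr {
        Expr::Call { udf: abs_udf(), args: vec![arg] }
    }
}

/// Toy evaluator.
pub fn evaluate(expr: &Expr) -> i64 {
    match expr {
        Expr::Literal(v) => *v,
        Expr::Call { udf, args } => {
            let values: Vec<i64> = args.iter().map(evaluate).collect();
            (udf.fun)(values.as_slice())
        }
    }
}
```

With this layout, a misspelled call such as `math_functions::abz(...)` fails to compile, whereas a misspelled `Name("abz")` would only be caught when the name is resolved.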

Therefore I think we might be able to close this issue as "won't fix". What do you think, @edmondop and @2010YOUY01? If we do that, we should consider what to do with the existing Name function definition.

@edmondop
Contributor

How does this affect the effort of having a single API for scalar functions, removing the separation between BuiltIn and UDF?

@alamb
Contributor

alamb commented Dec 16, 2023

> How does this affect the effort of having a single API for scalar functions, removing the separation between BuiltIn and UDF?

I think it offers a way to still have a single API for scalar functions without having to do String --> ScalarUDF resolution as an analysis pass.

It also has the nice property that expr_fns like concat() or now() will only compile if the functions are actually available, rather than failing at runtime during string resolution.
