Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom operator support #11137

Closed
wants to merge 11 commits into from
Closed

Conversation

samuelcolvin
Copy link
Contributor

@samuelcolvin samuelcolvin commented Jun 26, 2024

I think this is ready to review.

WIP to decide if we progress cc @alamb.

This is based on #11132 since I need Operator to be non-copyable. If we decide to progress I can rebase this, or cherry pick and create a new branch.

You can see this without the churn of #11132 at samuelcolvin#1.

high level question

Is all the effort here worth it, should we just add the 20 or so extra operators from the sql library to Operator?

Advantages of this route:

  • more flexibility in how operators behave, what their signatures is, precedence, negation etc.
  • easier to use custom operators or new operators adding to the SQL library without waiting for datafusion to support them

Disadvantages:

  • a lot of faff here it's not that bad

To to

  • How should ParseCustomOperator be passed into the SQL parser, it definitely shouldn't be as it is now, perhaps we should have an equivalent of register_function_rewrite like, e.g. register_custom_operators? DONE
  • is the hack with WrapCustomOperator necessary and acceptable, maybe someone else's Rust foo could avoid this? I think what we have is good
  • should CustomOperator provide a get_udf method, then we rewrite to that function automatically, rather than relying on register_function_rewrite? I don't think so, the current write logic is more powerful
  • what should we do about from_proto_binary_op, we can't keep the same signature and support custom operators, this might be easy to fix - done by adding the register methods to FunctionRegistry
  • is it okay to rely on name() of the operator to implement equality, ordering and hashing? I think it's good
  • Needs tests - basic tests are done, LMK what more is needed

Example Usage:

Here's a trivial example of usage that just replaces the "->" operator with string concat:
use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::common::config::ConfigOptions;
use datafusion::common::tree_node::Transformed;
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::expr_rewriter::FunctionRewrite;
use datafusion::logical_expr::{
    CustomOperator, Operator, ParseCustomOperator, WrapCustomOperator,
};
use datafusion::prelude::*;
use datafusion::sql::sqlparser::ast::BinaryOperator;

#[derive(Debug)]
enum MyCustomOperator {
    /// Question, like `?`
    Question,
    /// Arrow, like `->`
    Arrow,
    /// Long arrow, like `->>`
    LongArrow,
}

impl std::fmt::Display for MyCustomOperator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            MyCustomOperator::Question => write!(f, "?"),
            MyCustomOperator::Arrow => write!(f, "->"),
            MyCustomOperator::LongArrow => write!(f, "->>"),
        }
    }
}

impl CustomOperator for MyCustomOperator {
    fn binary_signature(
        &self,
        lhs: &DataType,
        rhs: &DataType,
    ) -> Result<(DataType, DataType, DataType)> {
        Ok((lhs.clone(), rhs.clone(), lhs.clone()))
    }

    fn op_to_sql(&self) -> Result<BinaryOperator> {
        match self {
            MyCustomOperator::Question => Ok(BinaryOperator::Question),
            MyCustomOperator::Arrow => Ok(BinaryOperator::Arrow),
            MyCustomOperator::LongArrow => Ok(BinaryOperator::LongArrow),
        }
    }

    fn name(&self) -> &'static str {
        match self {
            MyCustomOperator::Question => "Question",
            MyCustomOperator::Arrow => "Arrow",
            MyCustomOperator::LongArrow => "LongArrow",
        }
    }
}

impl TryFrom<&str> for MyCustomOperator {
    type Error = ();

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        match value {
            "Question" => Ok(MyCustomOperator::Question),
            "Arrow" => Ok(MyCustomOperator::Arrow),
            "LongArrow" => Ok(MyCustomOperator::LongArrow),
            _ => Err(()),
        }
    }
}

#[derive(Debug)]
struct CustomOperatorParser;

impl ParseCustomOperator for CustomOperatorParser {
    fn name(&self) -> &str {
        "PostgresParseCustomOperator"
    }

    fn op_from_ast(&self, op: &BinaryOperator) -> Result<Option<Operator>> {
        match op {
            BinaryOperator::Question => Ok(Some(MyCustomOperator::Question.into())),
            BinaryOperator::Arrow => Ok(Some(MyCustomOperator::Arrow.into())),
            BinaryOperator::LongArrow => Ok(Some(MyCustomOperator::LongArrow.into())),
            _ => Ok(None),
        }
    }

    fn op_from_name(&self, raw_op: &str) -> Result<Option<Operator>> {
        if let Ok(op) = MyCustomOperator::try_from(raw_op) {
            Ok(Some(op.into()))
        } else {
            Ok(None)
        }
    }
}

impl FunctionRewrite for CustomOperatorParser {
    fn name(&self) -> &str {
        "PostgresParseCustomOperator"
    }

    fn rewrite(
        &self,
        expr: Expr,
        _schema: &DFSchema,
        _config: &ConfigOptions,
    ) -> Result<Transformed<Expr>> {
        if let Expr::BinaryExpr(bin_expr) = &expr {
            if let Operator::Custom(WrapCustomOperator(op)) = &bin_expr.op {
                if let Ok(pg_op) = MyCustomOperator::try_from(op.name()) {
                    // return BinaryExpr with a different operator
                    let mut bin_expr = bin_expr.clone();
                    bin_expr.op = match pg_op {
                        MyCustomOperator::Arrow => Operator::StringConcat,
                        MyCustomOperator::LongArrow => Operator::Plus,
                        MyCustomOperator::Question => Operator::Minus,
                    };
                    return Ok(Transformed::yes(Expr::BinaryExpr(bin_expr)));
                }
            }
        }
        Ok(Transformed::no(expr))
    }
}

async fn run(sql: &str) -> Result<()> {
    let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "postgres");
    let mut ctx = SessionContext::new_with_config(config);
    ctx.register_function_rewrite(Arc::new(CustomOperatorParser))?;
    ctx.register_parse_custom_operator(Arc::new(CustomOperatorParser))?;
    let df = ctx.sql(sql).await?;
    df.show().await
}

#[tokio::main]
async fn main() {
    run("select 'foo'->'bar';").await.unwrap();
    run("select 1->>2;").await.unwrap();
    run("select 1 ? 2;").await.unwrap();
}

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate substrait labels Jun 26, 2024
@samuelcolvin
Copy link
Contributor Author

This seems to be working well with datafusion-functions-json, see datafusion-contrib/datafusion-functions-json#22

@alamb
Copy link
Contributor

alamb commented Jun 28, 2024

I plan to review this tomorrow morning

@samuelcolvin
Copy link
Contributor Author

samuelcolvin commented Jun 28, 2024

I've found an issue whilst working on datafusion-contrib/datafusion-functions-json#22

Lexical Precedence doesn't work correctly

See datafusion-contrib/datafusion-functions-json@f573a39

The following works as expected in postgres:

select '{"a": "b"}'::json->>'a'='b'
-- True

But with datafusion

select '{"a": "b"}'->>'a'='b'
-- Error during planning: Unexpected argument type to 'LongArrow' at position 2, expected string or int, got Boolean.

Same happens for other common code like '{"a": "b"}'->>'a' is null.

Happens for both -> and ->>.

I tried setting precedence on the operator to the max value, but it didn't help.


Same issue applies to the existing at arrows:

> select [1,2,3]@>[1] is null;
-- Error during planning: Cannot infer common array type for arrow operation List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) @> Boolean

@samuelcolvin
Copy link
Contributor Author

Precedence issue looks like it's an issue in sqlparser - sqlparser-rs/sqlparser-rs#814 (comment) 😞

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @samuelcolvin

I think this PR looks really quite good to be honest. It is well commented / documented / structured ❤️

However, to respond to your high level question

Is all the effort here worth it, should we just add the 20 or so extra operators from the sql library to Operator?

I personally think this is a very reasonable alternative and given what I am seeing in this PR from what it would required to support custom operators, I would tend to agree that simply adding new enum variants to Operators might be the best way to go

I think some PRs from @jayzhan211 recently may be related (e.g. #11155) where the planner is translating operators into function calls for array operators. Maybe we could extend that idea so instead of hard coding the function names in the planner (which isn't ideal from my perspective) the translation of "operator --> function" comes from a table or trait or something 🤔

What do you think @jayzhan211 ?

@alamb
Copy link
Contributor

alamb commented Jun 28, 2024

cc @phillipleblanc as I think you may be interested in this too

@alamb
Copy link
Contributor

alamb commented Jun 28, 2024

An also related conversation maybe: #10534

@samuelcolvin
Copy link
Contributor Author

Is all the effort here worth it, should we just add the 20 or so extra operators from the sql library to Operator?

@alamb sorry I wasn't clear, having implemented this, I'm in favour of sticking to this API, it'll make customising how operators behave much easier than hard coding them all within datafusion. It'll also be much easier to avoid any breaking changes for those who don't want to support more operators.

I'll let you make the final decision.

@alamb
Copy link
Contributor

alamb commented Jun 28, 2024

I am working on an alternate proposal too -- stay tuned

@alamb
Copy link
Contributor

alamb commented Jun 28, 2024

Here is an alternate idea #11168

@alamb
Copy link
Contributor

alamb commented Jul 1, 2024

@jayzhan211's alternate PR here #11180 is looking good in my opinion

@samuelcolvin
Copy link
Contributor Author

closing this, I'll follow up on #11180.

This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions sql SQL Planner substrait
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants