Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 259 additions & 0 deletions rust/datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,179 @@ impl Expr {

visitor.post_visit(self)
}

/// Performs a depth first walk of an expression and its children
/// to rewrite an expression, consuming `self` and producing a new
/// [`Expr`].
///
/// Implements a modified version of the [visitor
/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) to
/// separate algorithms from the structure of the `Expr` tree and
/// make it easier to write new, efficient expression
/// transformation algorithms.
///
/// For an expression tree such as
/// ```text
/// BinaryExpr (GT)
///    left: Column("foo")
///    right: Column("bar")
/// ```
///
/// The nodes are visited using the following order
/// ```text
/// pre_visit(BinaryExpr(GT))
/// pre_visit(Column("foo"))
/// mutate(Column("foo"))
/// pre_visit(Column("bar"))
/// mutate(Column("bar"))
/// mutate(BinaryExpr(GT))
/// ```
///
/// Children are rewritten in field order; for `InList` the tested
/// expression is rewritten first, followed by every element of the
/// list.
///
/// If `Ok(false)` is returned on a call to `pre_visit`, no
/// children of that expression are visited, nor is `mutate`
/// called on that expression; the expression is returned unchanged.
///
/// # Errors
///
/// If an `Err` result is returned by `pre_visit` or `mutate`,
/// recursion is stopped immediately and the error is returned.
pub fn rewrite<R>(self, rewriter: &mut R) -> Result<Self>
where
    R: ExprRewriter,
{
    // Give the rewriter the chance to prune this whole subtree.
    if !rewriter.pre_visit(&self)? {
        return Ok(self);
    };

    // recurse into all sub expressions (and cover all expression types)
    let expr = match self {
        Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
        // Leaf expressions have no children to rewrite.
        Expr::Column(name) => Expr::Column(name),
        Expr::ScalarVariable(names) => Expr::ScalarVariable(names),
        Expr::Literal(value) => Expr::Literal(value),
        Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
            left: rewrite_boxed(left, rewriter)?,
            op,
            right: rewrite_boxed(right, rewriter)?,
        },
        Expr::Not(expr) => Expr::Not(rewrite_boxed(expr, rewriter)?),
        Expr::IsNotNull(expr) => Expr::IsNotNull(rewrite_boxed(expr, rewriter)?),
        Expr::IsNull(expr) => Expr::IsNull(rewrite_boxed(expr, rewriter)?),
        Expr::Negative(expr) => Expr::Negative(rewrite_boxed(expr, rewriter)?),
        Expr::Between {
            expr,
            low,
            high,
            negated,
        } => Expr::Between {
            expr: rewrite_boxed(expr, rewriter)?,
            low: rewrite_boxed(low, rewriter)?,
            high: rewrite_boxed(high, rewriter)?,
            negated,
        },
        Expr::Case {
            expr,
            when_then_expr,
            else_expr,
        } => {
            let expr = rewrite_option_box(expr, rewriter)?;
            // Each (when, then) pair is rewritten in order; the first
            // error aborts the whole collection.
            let when_then_expr = when_then_expr
                .into_iter()
                .map(|(when, then)| {
                    Ok((
                        rewrite_boxed(when, rewriter)?,
                        rewrite_boxed(then, rewriter)?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?;

            let else_expr = rewrite_option_box(else_expr, rewriter)?;

            Expr::Case {
                expr,
                when_then_expr,
                else_expr,
            }
        }
        Expr::Cast { expr, data_type } => Expr::Cast {
            expr: rewrite_boxed(expr, rewriter)?,
            data_type,
        },
        Expr::Sort {
            expr,
            asc,
            nulls_first,
        } => Expr::Sort {
            expr: rewrite_boxed(expr, rewriter)?,
            asc,
            nulls_first,
        },
        Expr::ScalarFunction { args, fun } => Expr::ScalarFunction {
            args: rewrite_vec(args, rewriter)?,
            fun,
        },
        Expr::ScalarUDF { args, fun } => Expr::ScalarUDF {
            args: rewrite_vec(args, rewriter)?,
            fun,
        },
        Expr::AggregateFunction {
            args,
            fun,
            distinct,
        } => Expr::AggregateFunction {
            args: rewrite_vec(args, rewriter)?,
            fun,
            distinct,
        },
        Expr::AggregateUDF { args, fun } => Expr::AggregateUDF {
            args: rewrite_vec(args, rewriter)?,
            fun,
        },
        Expr::InList {
            expr,
            list,
            negated,
        } => Expr::InList {
            expr: rewrite_boxed(expr, rewriter)?,
            // The list elements are sub expressions too and must be
            // offered to the rewriter, otherwise rewrites silently
            // skip everything inside `IN (...)`.
            list: rewrite_vec(list, rewriter)?,
            negated,
        },
        Expr::Wildcard => Expr::Wildcard,
    };

    // now rewrite this expression itself
    rewriter.mutate(expr)
}
}

/// Rewrites the expression held in `boxed_expr` with `rewriter` and
/// returns the result in a new `Box`.
///
/// Any error produced while rewriting is returned as is.
// The box is taken by value on purpose so the expression can be moved
// out of it, hence the clippy allowance.
#[allow(clippy::boxed_local)]
fn rewrite_boxed<R>(boxed_expr: Box<Expr>, rewriter: &mut R) -> Result<Box<Expr>>
where
    R: ExprRewriter,
{
    // TODO: It might be possible to avoid an allocation (the
    // Box::new) below by reusing the box.
    (*boxed_expr).rewrite(rewriter).map(Box::new)
}

/// Rewrites an optional boxed expression with `rewriter`.
///
/// `None` is passed through untouched; `Some` is rewritten via
/// `rewrite_boxed` and any error from that call is returned.
fn rewrite_option_box<R>(
    option_box: Option<Box<Expr>>,
    rewriter: &mut R,
) -> Result<Option<Box<Expr>>>
where
    R: ExprRewriter,
{
    match option_box {
        Some(boxed) => rewrite_boxed(boxed, rewriter).map(Some),
        None => Ok(None),
    }
}

/// Rewrites every `Expr` of `v`, in order, with the rewriter.
///
/// Stops at the first expression whose rewrite fails and returns
/// that error.
fn rewrite_vec<R>(v: Vec<Expr>, rewriter: &mut R) -> Result<Vec<Expr>>
where
    R: ExprRewriter,
{
    let mut rewritten = Vec::with_capacity(v.len());
    for expr in v {
        rewritten.push(expr.rewrite(rewriter)?);
    }
    Ok(rewritten)
}

/// Controls how the visitor recursion should proceed.
Expand All @@ -589,6 +762,22 @@ pub trait ExpressionVisitor: Sized {
}
}

/// Trait for potentially recursively rewriting an [`Expr`] expression
/// tree. When passed to `Expr::rewrite`, `ExprRewriter::mutate` is
/// invoked on the nodes of the expression tree that `Expr::rewrite`
/// recurses into, children before their parent. See the comments on
/// `Expr::rewrite` for details on its use.
pub trait ExprRewriter: Sized {
/// Invoked before any children of `expr` are rewritten /
/// visited. Default implementation returns `Ok(true)`.
///
/// Returning `Ok(false)` makes `Expr::rewrite` return `expr`
/// unchanged: its children are not visited and `mutate` is not
/// called on it. Returning an `Err` stops the rewrite.
fn pre_visit(&mut self, _expr: &Expr) -> Result<bool> {
Ok(true)
}

/// Invoked after all children of `expr` have been mutated and
/// returns a potentially modified expr. Returning an `Err` stops
/// the rewrite.
fn mutate(&mut self, expr: Expr) -> Result<Expr>;
}

pub struct CaseBuilder {
expr: Option<Box<Expr>>,
when_expr: Vec<Expr>,
Expand Down Expand Up @@ -1180,4 +1369,74 @@ mod tests {
.end();
assert!(maybe_expr.is_err());
}

/// Checks the order in which `pre_visit` and `mutate` are invoked
/// while rewriting a simple binary expression.
#[test]
fn rewriter_visit() {
    // parent is pre-visited first, then each child is pre-visited and
    // mutated, and finally the parent itself is mutated
    let expected = vec![
        "Previsited #state Eq Utf8(\"CO\")",
        "Previsited #state",
        "Mutated #state",
        "Previsited Utf8(\"CO\")",
        "Mutated Utf8(\"CO\")",
        "Mutated #state Eq Utf8(\"CO\")",
    ];

    let mut recorder = RecordingRewriter::default();
    let expr = col("state").eq(lit("CO"));
    expr.rewrite(&mut recorder).unwrap();

    assert_eq!(recorder.v, expected)
}

/// Rewriter that leaves expressions untouched and records, in call
/// order, a line for every `pre_visit` and `mutate` invocation.
#[derive(Default)]
struct RecordingRewriter {
    // one entry per callback invocation
    v: Vec<String>,
}

impl ExprRewriter for RecordingRewriter {
    fn pre_visit(&mut self, expr: &Expr) -> Result<bool> {
        let entry = format!("Previsited {:?}", expr);
        self.v.push(entry);
        // always descend into the children
        Ok(true)
    }

    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
        let entry = format!("Mutated {:?}", expr);
        self.v.push(entry);
        Ok(expr)
    }
}

/// Checks that a rewriter can replace nodes deep inside an
/// expression and leaves non matching nodes alone.
#[test]
fn rewriter_rewrite() {
    let mut foo_to_bar = FooBarRewriter {};

    // a "foo" literal is rewritten to "bar"
    let input = col("state").eq(lit("foo"));
    let expected = col("state").eq(lit("bar"));
    assert_eq!(input.rewrite(&mut foo_to_bar).unwrap(), expected);

    // any other literal is not rewritten
    let input = col("state").eq(lit("baz"));
    let expected = col("state").eq(lit("baz"));
    assert_eq!(input.rewrite(&mut foo_to_bar).unwrap(), expected);
}

/// Rewriter that turns every "foo" string literal into "bar" and
/// returns all other expressions as they are.
struct FooBarRewriter {}

impl ExprRewriter for FooBarRewriter {
    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
        match expr {
            // non null utf8 literal: swap the text when it is "foo"
            Expr::Literal(ScalarValue::Utf8(Some(text))) => {
                let replacement = if text == "foo" {
                    "bar".to_string()
                } else {
                    text
                };
                Ok(lit(replacement))
            }
            // every other literal or expression is returned unchanged
            other => Ok(other),
        }
    }
}
}
2 changes: 1 addition & 1 deletion rust/datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub use expr::{
count_distinct, create_udaf, create_udf, exp, exprlist_to_fields, floor, in_list,
length, lit, ln, log10, log2, lower, ltrim, max, md5, min, octet_length, or, round,
rtrim, sha224, sha256, sha384, sha512, signum, sin, sqrt, substr, sum, tan, trim,
trunc, upper, when, Expr, ExpressionVisitor, Literal, Recursion,
trunc, upper, when, Expr, ExprRewriter, ExpressionVisitor, Literal, Recursion,
};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
Expand Down
Loading