Skip to content

Commit

Permalink
Stop copying LogicalPlan and Exprs in DecorrelatePredicateSubquery (#…
Browse files Browse the repository at this point in the history
…10318)

* Fix build with missing `use`

* Avoid clones in `DecorrelatePredicateSubquery`
  • Loading branch information
alamb authored May 1, 2024
1 parent 7c1c794 commit e3487ee
Showing 1 changed file with 97 additions and 71 deletions.
168 changes: 97 additions & 71 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@ use crate::utils::replace_qualified_name;
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{plan_err, Result};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, plan_err, Result};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
use datafusion_expr::utils::{conjunction, split_conjunction};
use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned};
use datafusion_expr::{
exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
LogicalPlan, LogicalPlanBuilder, Operator,
};

use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use log::debug;

/// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins
Expand All @@ -49,6 +50,16 @@ impl DecorrelatePredicateSubquery {
Self::default()
}

fn rewrite_subquery(
&self,
mut subquery: Subquery,
config: &dyn OptimizerConfig,
) -> Result<Subquery> {
subquery.subquery =
Arc::new(self.rewrite(unwrap_arc(subquery.subquery), config)?.data);
Ok(subquery)
}

/// Finds expressions that have the predicate subqueries (and recurses when found)
///
/// # Arguments
Expand All @@ -59,40 +70,32 @@ impl DecorrelatePredicateSubquery {
/// Returns a tuple (subqueries, non-subquery expressions)
fn extract_subquery_exprs(
&self,
predicate: &Expr,
predicate: Expr,
config: &dyn OptimizerConfig,
) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let filters = split_conjunction(predicate); // TODO: add ExistenceJoin to support disjunctions
let filters = split_conjunction_owned(predicate); // TODO: add ExistenceJoin to support disjunctions

let mut subqueries = vec![];
let mut others = vec![];
for it in filters.iter() {
for it in filters.into_iter() {
match it {
Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
}) => {
let subquery_plan = self
.try_optimize(&subquery.subquery, config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let new_subquery = subquery.with_plan(subquery_plan);
let new_subquery = self.rewrite_subquery(subquery, config)?;
subqueries.push(SubqueryInfo::new_with_in_expr(
new_subquery,
(**expr).clone(),
*negated,
*expr,
negated,
));
}
Expr::Exists(Exists { subquery, negated }) => {
let subquery_plan = self
.try_optimize(&subquery.subquery, config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let new_subquery = subquery.with_plan(subquery_plan);
subqueries.push(SubqueryInfo::new(new_subquery, *negated));
let new_subquery = self.rewrite_subquery(subquery, config)?;
subqueries.push(SubqueryInfo::new(new_subquery, negated));
}
_ => others.push((*it).clone()),
expr => others.push(expr),
}
}

Expand All @@ -103,62 +106,85 @@ impl DecorrelatePredicateSubquery {
impl OptimizerRule for DecorrelatePredicateSubquery {
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let (subqueries, mut other_exprs) =
self.extract_subquery_exprs(&filter.predicate, config)?;
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
return Ok(None);
}
internal_err!("Should have called DecorrelatePredicateSubquery::rewrite")
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = filter.input.as_ref().clone();
for subquery in subqueries {
if let Some(plan) =
build_join(&subquery, &cur_input, config.alias_generator())?
{
cur_input = plan;
} else {
// If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter
let sub_query_expr = match subquery {
SubqueryInfo {
query,
where_in_expr: Some(expr),
negated: false,
} => in_subquery(expr, query.subquery.clone()),
SubqueryInfo {
query,
where_in_expr: Some(expr),
negated: true,
} => not_in_subquery(expr, query.subquery.clone()),
SubqueryInfo {
query,
where_in_expr: None,
negated: false,
} => exists(query.subquery.clone()),
SubqueryInfo {
query,
where_in_expr: None,
negated: true,
} => not_exists(query.subquery.clone()),
};
other_exprs.push(sub_query_expr);
}
}
fn supports_rewrite(&self) -> bool {
true
}

let expr = conjunction(other_exprs);
if let Some(expr) = expr {
let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
cur_input = LogicalPlan::Filter(new_filter);
}
Ok(Some(cur_input))
fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let LogicalPlan::Filter(filter) = plan else {
return Ok(Transformed::no(plan));
};

// if there are no subqueries in the predicate, return the original plan
let has_subqueries = split_conjunction(&filter.predicate)
.iter()
.any(|expr| matches!(expr, Expr::InSubquery(_) | Expr::Exists(_)));
if !has_subqueries {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

let Filter {
predicate, input, ..
} = filter;
let (subqueries, mut other_exprs) =
self.extract_subquery_exprs(predicate, config)?;
if subqueries.is_empty() {
return internal_err!(
"can not find expected subqueries in DecorrelatePredicateSubquery"
);
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = unwrap_arc(input);
for subquery in subqueries {
if let Some(plan) =
build_join(&subquery, &cur_input, config.alias_generator())?
{
cur_input = plan;
} else {
// If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter
let sub_query_expr = match subquery {
SubqueryInfo {
query,
where_in_expr: Some(expr),
negated: false,
} => in_subquery(expr, query.subquery),
SubqueryInfo {
query,
where_in_expr: Some(expr),
negated: true,
} => not_in_subquery(expr, query.subquery),
SubqueryInfo {
query,
where_in_expr: None,
negated: false,
} => exists(query.subquery),
SubqueryInfo {
query,
where_in_expr: None,
negated: true,
} => not_exists(query.subquery),
};
other_exprs.push(sub_query_expr);
}
_ => Ok(None),
}

let expr = conjunction(other_exprs);
if let Some(expr) = expr {
let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
cur_input = LogicalPlan::Filter(new_filter);
}
Ok(Transformed::yes(cur_input))
}

fn name(&self) -> &str {
Expand Down

0 comments on commit e3487ee

Please sign in to comment.