@@ -2698,7 +2698,7 @@ object EliminateUnions extends Rule[LogicalPlan] {
* rule can't work for those parameters.
*/
object CleanupAliases extends Rule[LogicalPlan] {
private def trimAliases(e: Expression): Expression = {
def trimAliases(e: Expression): Expression = {
e.transformDown {
case Alias(child, _) => child
case MultiAlias(child, _) => child
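For context, trimAliases only strips Alias and MultiAlias wrappers so that checks such as foldability see the underlying expression; making it public lets the subquery rewrite further down reuse it. A minimal sketch of the behavior (assuming a REPL with Spark's Catalyst internals on the classpath, and this PR applied so the method is public):

import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Literal}

val aliased = Alias(Add(Literal(1), Literal(2)), "three")()
aliased.foldable                             // false: named expressions are never foldable
CleanupAliases.trimAliases(aliased).foldable // true: an Add of two literals is
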
@@ -650,7 +650,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
*/
private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child)))
if p2.outputSet.subsetOf(child.outputSet) =>
if p2.outputSet.subsetOf(child.outputSet) &&
// We only remove attribute-only project.
p2.projectList.forall(_.isInstanceOf[AttributeReference]) =>
Contributor:

I am not sure about this change. This may cause a serious perf regression.

Contributor:

How can we remove a project that's not attribute-only?

Contributor:

I'd say it was wrong previously, but if a project's output has the same expr IDs as its child, it's usually attribute-only.

Contributor:
Mmmmh... I may be missing something, but I'd imagine a case like this:

select a, b from
(select a, b, very_expensive_operation as c from ... where a = 1)

Before this change, this would be optimized as:

select a, b from
(select a, b from ... where a = 1)

while after the change it is not. Am I wrong?

viirya (Member, Author), Jul 25, 2019:

In the above case, there is an Alias in the project list, so it's not an attribute-only project. And I think it also creates a new attribute c, so p2.outputSet.subsetOf(child.outputSet) is not met either.

I think the rules in ColumnPruning will trim very_expensive_operation in the end.

Contributor:

I see now, sorry. Why do we need this? It seems like a change unrelated to the fix in this PR, doesn't it?

viirya (Member, Author):

Oh, the issue was seen in a previous comment (33441a3); it has been overwritten now.

We added a column for the count bug. The column checks an always-true leading column, alwaysTrueExpr, and returns a special value if alwaysTrueExpr is null, to simulate the empty-input case.

This column reuses the expr ID of the original output in the subquery. In the non-foldable expression case, the added column, sitting in a potential Project-Filter-Project, would be trimmed by removeProjectBeforeFilter, because the second project meets p2.outputSet.subsetOf(child.outputSet).

My original fix was to create a new expr ID and replace the original expr ID with it in the subquery. That looked complicated. This seems a simpler fix, and looks reasonable.
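To illustrate the trimming issue (the plan shape, names, and expr IDs below are hypothetical, reconstructed from this description):

// Hypothetical Project-Filter-Project produced by the count-bug rewrite:
//
//   Project [cnt#1]                                                  <- p1
//   +- Filter (cnt#1 > 0)                                            <- f
//      +- Project [if (isnull(alwaysTrue#2)) 0 else cnt#1 AS cnt#1]  <- p2
//         +- Join LeftOuter ...
//
// p2's only output reuses expr ID cnt#1, so p2.outputSet is a subset of its
// child's output set even though p2 is not attribute-only. Under the old
// condition, removeProjectBeforeFilter would drop p2 and lose the IF that
// simulates the empty-input case; requiring an attribute-only project
// prevents that.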

p1.copy(child = f.copy(child = child))
}
}
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -316,25 +317,41 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
newExpression.asInstanceOf[E]
}

/**
* Checks if the given expression is foldable. If it is, evaluates it and returns it as a
* literal. If not, returns the original expression without evaluation.
*/
private def tryEvalExpr(expr: Expression): Expression = {
// Remove any Alias over the given expression, because Alias is not foldable.
if (!CleanupAliases.trimAliases(expr).foldable) {
// SPARK-28441: Some expressions, like PythonUDF, can't be statically evaluated.
// They need to be evaluated at query runtime.
expr
} else {
Literal.create(expr.eval(), expr.dataType)
}
}
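A minimal sketch of the two branches (tryEvalExpr is private, so this mirrors its logic; the unbound attribute stands in for a runtime-only expression such as a PythonUDF):

import org.apache.spark.sql.catalyst.expressions.{Add, Alias, AttributeReference, Literal}
import org.apache.spark.sql.types.IntegerType

// Foldable: the Alias is trimmed and the Add of two literals folds.
val folded = Alias(Add(Literal(1), Literal(2)), "a")()
Literal.create(folded.eval(), folded.dataType)   // Literal(3, IntegerType)

// Non-foldable: an unbound attribute can't be statically evaluated, so
// tryEvalExpr returns it unchanged for evaluation at query runtime.
val runtimeOnly = AttributeReference("x", IntegerType)()
runtimeOnly.foldable                             // false
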

/**
* Statically evaluate an expression containing zero or more placeholders, given a set
* of bindings for placeholder values.
* of bindings for placeholder values, if the expression is evaluable. If it is not,
* returns the expression with the bindings substituted, for evaluation at runtime.
*/
private def evalExpr(expr: Expression, bindings: Map[ExprId, Option[Any]]) : Option[Any] = {
private def bindingExpr(
expr: Expression,
bindings: Map[ExprId, Expression]): Expression = {
val rewrittenExpr = expr transform {
case r: AttributeReference =>
bindings(r.exprId) match {
case Some(v) => Literal.create(v, r.dataType)
case None => Literal.default(NullType)
}
bindings.getOrElse(r.exprId, Literal.default(NullType))
}
Option(rewrittenExpr.eval())

tryEvalExpr(rewrittenExpr)
}
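A sketch of the substitution bindingExpr performs (names are illustrative):

import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, ExprId, Expression, Literal}
import org.apache.spark.sql.types.{IntegerType, NullType}

val a = AttributeReference("a", IntegerType)()
val expr = Add(a, Literal(1))
val bindings: Map[ExprId, Expression] = Map(a.exprId -> Literal(2))

// Same substitution as bindingExpr: attributes become their bound
// expressions, missing bindings become a NULL literal.
val bound = expr.transform {
  case r: AttributeReference => bindings.getOrElse(r.exprId, Literal.default(NullType))
}
// bound = Add(2, 1) is foldable, so tryEvalExpr folds it to Literal(3);
// a non-foldable binding would instead pass through for runtime evaluation.
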

/**
* Statically evaluate an expression containing one or more aggregates on an empty input.
*/
private def evalAggOnZeroTups(expr: Expression) : Option[Any] = {
private def evalAggOnZeroTups(expr: Expression) : Expression = {
// AggregateExpressions are Unevaluable, so we need to replace all aggregates
// in the expression with the value they would return for zero input tuples.
// Also replace attribute refs (for example, for grouping columns) with NULL.
@@ -344,7 +361,8 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {

case _: AttributeReference => Literal.default(NullType)
}
Option(rewrittenExpr.eval())

tryEvalExpr(rewrittenExpr)
}
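The zero-tuple values come from each aggregate's defaultResult (the substitution itself sits in the lines collapsed from this diff): COUNT over no rows is 0, while most other aggregates are NULL. A quick sketch, assuming Catalyst internals of roughly this vintage:

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Sum}

Count(Literal(1)).defaultResult // Some(Literal(0L)): COUNT over no rows is 0
Sum(Literal(1)).defaultResult   // None: SUM over no rows is NULL, so the
                                // rewrite uses Literal.default(NullType)
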

/**
@@ -354,36 +372,51 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
* [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]]. If the checks in
* CheckAnalysis become less restrictive, this method will need to change.
*/
private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Any] = {
private def evalSubqueryOnZeroTups(plan: LogicalPlan) : Option[Expression] = {
// Inputs to this method will start with a chain of zero or more SubqueryAlias
// and Project operators, followed by an optional Filter, followed by an
// Aggregate. Traverse the operators recursively.
def evalPlan(lp : LogicalPlan) : Map[ExprId, Option[Any]] = lp match {
def evalPlan(lp : LogicalPlan) : Map[ExprId, Expression] = lp match {
case SubqueryAlias(_, child) => evalPlan(child)
case Filter(condition, child) =>
val bindings = evalPlan(child)
if (bindings.isEmpty) bindings
else {
val exprResult = evalExpr(condition, bindings).getOrElse(false)
.asInstanceOf[Boolean]
if (exprResult) bindings else Map.empty
if (bindings.isEmpty) {
bindings
} else {
val bindCondition = bindingExpr(condition, bindings)

if (!bindCondition.foldable) {
// We can't evaluate the condition statically. Evaluate it at query runtime.
bindings.map { case (id, expr) =>
val newExpr = If(bindCondition, expr, Literal.create(null, expr.dataType))
(id, newExpr)
}
} else {
// The bound condition can be evaluated.
bindCondition.eval() match {
// For filter condition, null is the same as false.
case null | false => Map.empty
case true => bindings
}
}
}

case Project(projectList, child) =>
val bindings = evalPlan(child)
if (bindings.isEmpty) {
bindings
} else {
projectList.map(ne => (ne.exprId, evalExpr(ne, bindings))).toMap
projectList.map(ne => (ne.exprId, bindingExpr(ne, bindings))).toMap
}

case Aggregate(_, aggExprs, _) =>
// Some of the expressions under the Aggregate node are the join columns
// for joining with the outer query block. Fill those expressions in with
// nulls and statically evaluate the remainder.
aggExprs.map {
case ref: AttributeReference => (ref.exprId, None)
case alias @ Alias(_: AttributeReference, _) => (alias.exprId, None)
case ref: AttributeReference => (ref.exprId, Literal.create(null, ref.dataType))
case alias @ Alias(_: AttributeReference, _) =>
(alias.exprId, Literal.create(null, alias.dataType))
case ne => (ne.exprId, evalAggOnZeroTups(ne))
}.toMap

@@ -394,7 +427,10 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
val resultMap = evalPlan(plan)

// By convention, the scalar subquery result is the leftmost field.
resultMap.getOrElse(plan.output.head.exprId, None)
resultMap.get(plan.output.head.exprId) match {
case Some(Literal(null, _)) | None => None
case o => o
}
}
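When the Filter condition is not foldable, the branch above defers it to query runtime by wrapping each binding in an If, so the binding yields NULL whenever the condition does not hold; conceptually (the attribute is a stand-in for a runtime-only predicate such as a PythonUDF result):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, If, Literal}
import org.apache.spark.sql.types.BooleanType

val pred = AttributeReference("pred", BooleanType)()
val binding = Literal(0L)

// Mirrors the Filter case above: keep the value if pred holds, else NULL.
val deferred = If(pred, binding, Literal.create(null, binding.dataType))
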

/**
@@ -473,7 +509,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
currentChild.output :+
Alias(
If(IsNull(alwaysTrueRef),
Literal.create(resultWithZeroTups.get, origOutput.dataType),
resultWithZeroTups.get,
aggValRef), origOutput.name)(exprId = origOutput.exprId),
Join(currentChild,
Project(query.output :+ alwaysTrueExpr, query),
@@ -494,11 +530,11 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
case op => sys.error(s"Unexpected operator $op in correlated subquery")
}

// CASE WHEN alwayTrue IS NULL THEN resultOnZeroTups
// CASE WHEN alwaysTrue IS NULL THEN resultOnZeroTups
// WHEN NOT (original HAVING clause expr) THEN CAST(null AS <type of aggVal>)
// ELSE (aggregate value) END AS (original column name)
val caseExpr = Alias(CaseWhen(Seq(
(IsNull(alwaysTrueRef), Literal.create(resultWithZeroTups.get, origOutput.dataType)),
(IsNull(alwaysTrueRef), resultWithZeroTups.get),
(Not(havingNode.get.condition), Literal.create(null, aggValRef.dataType))),
aggValRef),
origOutput.name)(exprId = origOutput.exprId)