Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects, NewInstance, UnresolvedMapObjects}
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
import org.apache.spark.sql.catalyst.rules._
Expand Down Expand Up @@ -1257,217 +1256,16 @@ class Analyzer(
}

/**
* Validates to make sure the outer references appearing inside the subquery
* are legal. This function also returns the list of expressions
* that contain outer references. These outer references would be kept as children
* of subquery expressions by the caller of this function.
*/
private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
// Walks the (boolean-simplified) subquery plan `sub`, calls failAnalysis for every
// unsupported correlation pattern it finds, and accumulates the outer-reference
// expressions taken from the correlated predicates of Filter nodes.
// The accumulated buffer is returned as the result.
val outerReferences = ArrayBuffer.empty[Expression]

// Validate that correlated aggregate expressions do not contain a mixture
// of outer and local references.
def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
expr.foreach {
case a: AggregateExpression if containsOuter(a) =>
// Attributes wrapped in OuterReference inside this aggregate.
val outer = a.collect { case OuterReference(e) => e.toAttribute }
// Everything the aggregate references that is not one of those outer attributes.
val local = a.references -- outer
if (local.nonEmpty) {
val msg =
s"""
|Found an aggregate expression in a correlated predicate that has both
|outer and local references, which is not supported yet.
|Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql},
|Outer references: ${outer.map(_.sql).mkString(", ")},
|Local references: ${local.map(_.sql).mkString(", ")}.
""".stripMargin.replace("\n", " ").trim()
failAnalysis(msg)
}
case _ =>
}
}

// Make sure a plan's subtree does not contain outer references.
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (hasOuterReferences(p)) {
failAnalysis(s"Accessing outer query column is not allowed in:\n$p")
}
}

// Make sure a plan's expressions do not contain:
// 1. Aggregate expressions that have a mixture of outer and local references.
// 2. Expressions containing outer references on plan nodes other than Filter.
def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
s"clauses:\n$p")
}
}

// SPARK-17348: A potential incorrect result case.
// When a correlated predicate is a non-equality predicate,
// certain operators are not permitted from the operator
// hosting the correlated predicate up to the operator on the outer table.
// Otherwise, the pull up of the correlated predicate
// will generate a plan with different semantics,
// which could return an incorrect result.
// Currently we check for Aggregate and Window operators.
//
// Below is an example of a Logical Plan during the Analyzer phase that
// shows this problem. Pulling the correlated predicate [outer(c2#77) >= ..]
// through the Aggregate (or Window) operator could alter the result of
// the Aggregate.
//
// Project [c1#76]
// +- Project [c1#87, c2#88]
// : (Aggregate or Window operator)
// : +- Filter [outer(c2#77) >= c2#88)]
// : +- SubqueryAlias t2, `t2`
// : +- Project [_1#84 AS c1#87, _2#85 AS c2#88]
// : +- LocalRelation [_1#84, _2#85]
// +- SubqueryAlias t1, `t1`
// +- Project [_1#73 AS c1#76, _2#74 AS c2#77]
// +- LocalRelation [_1#73, _2#74]
def failOnNonEqualCorrelatedPredicate(found: Boolean, p: LogicalPlan): Unit = {
if (found) {
// Report a non-supported case as an exception
failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$p")
}
}

// Set to true by the Filter case below as soon as any visited Filter carries a
// correlated predicate that is not EqualTo / EqualNullSafe; it is never reset.
// NOTE(review): the flag is shared by the whole traversal, so once it is set every
// Aggregate visited afterwards is rejected, whether or not it sits above that Filter —
// confirm this is the intended (conservative) behavior.
var foundNonEqualCorrelatedPred : Boolean = false

// Simplify the predicates before validating any unsupported correlation patterns
// in the plan.
BooleanSimplification(sub).foreachUp {

// Whitelist operators allowed in a correlated subquery
// There are 4 categories:
// 1. Operators that are allowed anywhere in a correlated subquery, and,
// by definition of the operators, they either do not contain
// any columns or cannot host outer references.
// 2. Operators that are allowed anywhere in a correlated subquery
// so long as they do not host outer references.
// 3. Operators that need special handling. These operators are
// Project, Filter, Join, Aggregate, and Generate.
// 4. Any operators that are not in the above list. They are allowed
// in a correlated subquery only if they are not on a correlation path.
// In other words, these operators are allowed only under a correlation point.
//
// A correlation path is defined as the sub-tree of all the operators that
// are on the path from the operator hosting the correlated expressions
// up to the operator producing the correlated values.

// Category 1:
// ResolvedHint, Distinct, LeafNode, Repartition, and SubqueryAlias
case _: ResolvedHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>

// Category 2:
// These operators can be anywhere in a correlated subquery,
// so long as they do not host outer references in the operators.
case s: Sort =>
failOnInvalidOuterReference(s)
case r: RepartitionByExpression =>
failOnInvalidOuterReference(r)

// Category 3:
// Filter is one of the two operators allowed to host correlated expressions.
// The other operator is Join. Filter can be anywhere in a correlated subquery.
// NOTE(review): failOnInvalidOuterReference rejects outer references in the
// expressions of every node that is not a Filter, which includes a Join's
// condition — confirm whether the statement about Join above still holds.
case f: Filter =>
// Find all predicates with an outer reference.
val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter)

// Find any non-equality correlated predicates
foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
case _: EqualTo | _: EqualNullSafe => false
case _ => true
}

failOnInvalidOuterReference(f)
// The aggregate expressions are treated in a special way by getOuterReferences. If the
// aggregate expression contains only outer reference attributes then the entire aggregate
// expression is isolated as an OuterReference.
// i.e min(OuterReference(b)) => OuterReference(min(b))
outerReferences ++= getOuterReferences(correlated)

// Project cannot host any correlated expressions
// but can be anywhere in a correlated subquery.
case p: Project =>
failOnInvalidOuterReference(p)

// Aggregate cannot host any correlated expressions.
// It can be on a correlation path if the correlation contains
// only equality correlated predicates.
// It cannot be on a correlation path if the correlation has
// non-equality correlated predicates.
case a: Aggregate =>
failOnInvalidOuterReference(a)
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)

// Join can host correlated expressions.
case j @ Join(left, right, joinType, _) =>
joinType match {
// Inner join, like Filter, can be anywhere.
case _: InnerLike =>
failOnInvalidOuterReference(j)

// Left outer join's right operand cannot be on a correlation path.
// LeftAnti and ExistenceJoin are special cases of LeftOuter.
// Note that ExistenceJoin cannot be expressed externally in both SQL and DataFrame
// so it should not show up here in Analysis phase. This is just a safety net.
//
// LeftSemi does not allow output from the right operand.
// Any correlated references in the subplan
// of the right operand cannot be pulled up.
case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(right)

// Likewise, Right outer join's left operand cannot be on a correlation path.
case RightOuter =>
failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(left)

// Any other join types not explicitly listed above,
// including Full outer join, are treated as Category 4.
case _ =>
failOnOuterReferenceInSubTree(j)
}

// Generator with join=true, i.e., expressed with
// LATERAL VIEW [OUTER], is similar to inner join:
// correlation is allowed underneath it,
// but it must not host any outer references itself.
// Note:
// Generator with join=false is treated as Category 4.
case g: Generate if g.join =>
failOnInvalidOuterReference(g)

// Category 4: Any other operators not in the above 3 categories
// cannot be on a correlation path, that is they are allowed only
// under a correlation point but they and their descendant operators
// are not allowed to have any correlated expressions.
case p =>
failOnOuterReferenceInSubTree(p)
}
// Everything collected from the correlated Filter predicates during the traversal.
outerReferences
}

/**
* Resolves the subquery. The subquery is resolved using its outer plans. This method
* will resolve the subquery by alternating between the regular analyzer and by applying the
* resolveOuterReferences rule.
* Resolves the subquery plan that is referenced in a subquery expression. The normal
* attribute references are resolved using regular analyzer and the outer references are
* resolved from the outer plans using the resolveOuterReferences method.
*
* Outer references from the correlated predicates are updated as children of
* Subquery expression.
*/
private def resolveSubQuery(
e: SubqueryExpression,
plans: Seq[LogicalPlan],
requiredColumns: Int = 0)(
plans: Seq[LogicalPlan])(
f: (LogicalPlan, Seq[Expression]) => SubqueryExpression): SubqueryExpression = {
// Step 1: Resolve the outer expressions.
var previous: LogicalPlan = null
Expand All @@ -1488,15 +1286,8 @@ class Analyzer(
// Step 2: If the subquery plan is fully resolved, pull the outer references and record
// them as children of SubqueryExpression.
if (current.resolved) {
// Make sure the resolved query has the required number of output columns. This is only
// needed for Scalar and IN subqueries.
if (requiredColumns > 0 && requiredColumns != current.output.size) {
failAnalysis(s"The number of columns in the subquery (${current.output.size}) " +
s"does not match the required number of columns ($requiredColumns)")
}
// Validate the outer reference and record the outer references as children of
// subquery expression.
f(current, checkAndGetOuterReferences(current))
// Record the outer references as children of subquery expression.
f(current, SubExprUtils.getOuterReferences(current))
} else {
e.withNewPlan(current)
}
Expand All @@ -1514,16 +1305,11 @@ class Analyzer(
private def resolveSubQueries(plan: LogicalPlan, plans: Seq[LogicalPlan]): LogicalPlan = {
// Rewrites the expressions of `plan`: every ScalarSubquery, Exists, or IN-with-ListQuery
// whose subquery plan is not yet resolved is handed to resolveSubQuery together with
// `plans`, and the constructor passed in the second argument list rebuilds the
// expression while keeping its original exprId.
// NOTE(review): several statements below appear twice, once passing an explicit
// column count to resolveSubQuery and once without it. This looks like the before/after
// sides of a change rendered together — confirm which variant is intended.
plan transformExpressions {
case s @ ScalarSubquery(sub, _, exprId) if !sub.resolved =>
resolveSubQuery(s, plans, 1)(ScalarSubquery(_, _, exprId))
resolveSubQuery(s, plans)(ScalarSubquery(_, _, exprId))
case e @ Exists(sub, _, exprId) if !sub.resolved =>
resolveSubQuery(e, plans)(Exists(_, _, exprId))
// The IN case additionally requires the left-hand side `value` to be resolved already.
case In(value, Seq(l @ ListQuery(sub, _, exprId))) if value.resolved && !sub.resolved =>
// Get the left hand side expressions.
val expressions = value match {
case cns : CreateNamedStruct => cns.valExprs
case expr => Seq(expr)
}
val expr = resolveSubQuery(l, plans, expressions.size)(ListQuery(_, _, exprId))
val expr = resolveSubQuery(l, plans)(ListQuery(_, _, exprId))
In(value, Seq(expr))
}
}
Expand Down
Loading