Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ object ScalarSubquery {
case _ => false
}.isDefined
}

def hasScalarSubquery(e: Expression): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can remove this now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Sure.

e.find {
case _: ScalarSubquery => true
case _ => false
}.isDefined
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
EliminateOuterJoin,
PushPredicateThroughJoin,
PushDownPredicate,
PushDownLeftSemiAntiJoin,
LimitPushDown,
ColumnPruning,
InferFiltersFromConstraints,
Expand Down Expand Up @@ -1016,24 +1017,13 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// This also applies to Aggregate.
case Filter(condition, project @ Project(fields, grandChild))
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>

// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
val aliasMap = AttributeMap(fields.collect {
case a: Alias => (a.toAttribute, a.child)
})

val aliasMap = getAliasMap(project)
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

case filter @ Filter(condition, aggregate: Aggregate)
if aggregate.aggregateExpressions.forall(_.deterministic)
&& aggregate.groupingExpressions.nonEmpty =>
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression
val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
case a: Alias if a.child.find(_.isInstanceOf[AggregateExpression]).isEmpty =>
(a.toAttribute, a.child)
})
val aliasMap = getAliasMap(aggregate)

// For each filter, expand the alias and check if the filter can be evaluated using
// attributes produced by the aggregate operator's child operator.
Expand Down Expand Up @@ -1131,7 +1121,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
}
}

private def canPushThrough(p: UnaryNode): Boolean = p match {
def getAliasMap(plan: Project): AttributeMap[Expression] = {
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
AttributeMap(plan.projectList.collect { case a: Alias => (a.toAttribute, a.child) })
}

def getAliasMap(plan: Aggregate): AttributeMap[Expression] = {
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression
val aliasMap = plan.aggregateExpressions.collect {
case a: Alias if a.child.find(_.isInstanceOf[AggregateExpression]).isEmpty =>
(a.toAttribute, a.child)
}
AttributeMap(aliasMap)
}

def canPushThrough(p: UnaryNode): Boolean = p match {
// Note that some operators (e.g. project, aggregate, union) are being handled separately
// (earlier in this rule).
case _: AppendColumns => true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

/**
* This rule is a variant of [[PushDownPredicate]] which can handle
* pushing down Left semi and Left Anti joins below the following operators.
* 1) Project
* 2) Window
* 3) Union
* 4) Aggregate
* 5) Other permissible unary operators. please see [[PushDownPredicate.canPushThrough]].
*/
object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// LeftSemi/LeftAnti over Project
case Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if pList.forall(_.deterministic) &&
!pList.exists(ScalarSubquery.hasScalarSubquery)&&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will remove this line after we finish refactoring the subquery rewrite, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan We would keep this after the refactoring. Currently , except is planned using anti join. Here is the test that exhibits the problem.

SELECT (SELECT min(k) FROM t2 WHERE t2.k = t1.k) min_t2 FROM t1
MINUS
SELECT (SELECT min(k) FROM t2) abs_min_t2 FROM t1 WHERE  t1.k = 'one'

After the except operator is replaced .. the plan is :

GlobalLimit 21
+- LocalLimit 21
   +- Project [cast(min_t2#245 as string) AS min_t2#254]
      +- Distinct
         +- Join LeftAnti, (min_t2#245 <=> abs_min_t2#247)
            :- Project [scalar-subquery#244 [(k#242 = k#240)] AS min_t2#245]
            :  :  +- Aggregate [k#242], [min(k#242) AS min(k)#249, k#242]
            :  :     +- Project [k#242]
            :  :        +- LocalRelation [k#242, v#243]
            :  +- Project [k#240, v#241]
            :     +- LocalRelation [k#240, v#241]
            +- Project [scalar-subquery#246 [] AS abs_min_t2#247]
               :  +- Aggregate [min(k#242) AS min(k)#251]
               :     +- Project [k#242]
               :        +- LocalRelation [k#242, v#243]
               +- Filter (k#240 = one)
                  +- Project [k#240, v#241]
                     +- LocalRelation [k#240, v#241]

Here we are not pushing down the leftanti operator below project.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we convert all correlated subqueries to joins before we go to the main optimizer batch?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Feb 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan We will be moving the RewritePredicateSubquery which does the work for converting IN and EXISTS subqueries to semi/anti joins. However, Scalar subquery are treated differently in the sense that its handled by a different rule (RewriteCorrelatedScalarSubquery) and are planned using Left outer joins. As part of this work, i wasn't planning on changing Scalar subquery code. Also one thing to note is that, the rule to rewrite scalar subquery is already in the default batch and is run as a fixedPoint.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah sorry I misread the code. Yea scalar subquery will still be there.

Then why can't we push left anti join through project with scalar subqurey? scalr subquery is similar to a literal which doesn't depend on anything from the child plan.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan So letting join pass through scalar subquery ends up with plan like following :

   +- Project [scalar-subquery#244 [(k#242 = k#240)] AS min_t2#245]
            :  +- Aggregate [k#242], [min(k#242) AS min(k)#249, k#242]
            :     +- Project [k#242]
            :        +- LocalRelation [k#242, v#243]
            +- Project [k#240, v#241]
               +- Join LeftAnti, (scalar-subquery#244 [(k#242 = k#240)] <=> abs_min_t2#247)
                  :  +- Aggregate [k#242], [min(k#242) AS min(k)#249, k#242]
                  :     +- Project [k#242]
                  :        +- LocalRelation [k#242, v#243]
                  :- LocalRelation [k#240, v#241]
                  +- Project [scalar-subquery#246 [] AS abs_min_t2#247]
                     :  +- Aggregate [min(k#242) AS min(k)#251]
                     :     +- Project [k#242]
                     :        +- LocalRelation [k#242, v#243]
                     +- Project [k#240, v#241]
                        +- Filter (k#240 = one)
                           +- LocalRelation [k#240, v#241]

And things go totally wrong :-). firstly, join ends up hosting a scalar-sub expression which we don't allow (Project, Aggregate, Filter are the only ones that allow). We get a TreeNodeException..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan One thing i forgot Wenchen is that, we will be introducing a new rule that pushes down LeftSemi and LeftAnti joins below Join. Once we have that, even though we will skip scalar subqueries in this rule, once they are changed to left outer join after RewriteCorrelatedScalarSubquery, we will be able to push LeftSemi and Left anti joins when applicable. So i think we will get a good plan eventually.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RewriteCorrelatedScalarSubquery only touches correlated scalar subquery, what about non-correlated scalar subquery? I think it's fine to have non-correlated scalar subquery in the project list, we can still pushdown left anti/semi joins.

Copy link
Contributor Author

@dilipbiswal dilipbiswal Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Yeah.. We should be able to pushdown in the non-correlated case. If it is okay with you, i would like to study this and take it as a follow-up. The reason is, at the moment, i don't know if we are generating an optimal plan in this case. Here is the plan before and after for your reference.
Before :

Join LeftSemi, (sum#7L = cast(d#3 as bigint))
:- Project [scalar-subquery#6 [] AS sum#7L]
:  :  +- Aggregate [b#1], [sum(cast(c#2 as bigint)) AS sum#5L]
:  :     +- LocalRelation <empty>, [a#0, b#1, c#2]
:  +- LocalRelation <empty>, [a#0, b#1, c#2]
+- LocalRelation <empty>, [d#3]

After the pushdown

Project [scalar-subquery#6 [] AS sum#7L]
:  +- Aggregate [b#1], [sum(cast(c#2 as bigint)) AS sum#5L]
:     +- LocalRelation <empty>, [a#0, b#1, c#2]
+- Join LeftSemi, (scalar-subquery#6 [] = cast(d#3 as bigint))
   :  +- Aggregate [b#1], [sum(cast(c#2 as bigint)) AS sum#5L]
   :     +- LocalRelation <empty>, [a#0, b#1, c#2]
   :- LocalRelation <empty>, [a#0, b#1, c#2]
   +- LocalRelation <empty>, [d#3]

I wanted to go a little defensive in the first pass. If the plan looks okay to you then
i can make the change. Please let me know.

canPushThroughCondition(Seq(gChild), joinCond, rightOp) =>
if (joinCond.isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for curiosity, does left anti/semi join always a condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Ha.. i had the same question a couple of days back. So i quickly tried :

select * from t1 left semi join t2

We end up getting all the rows from t1 (if i remember correctly).

Copy link
Contributor

@cloud-fan cloud-fan Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for left-anti join, it returns no result.

Then it makes me think that, we should always pushdown the join if the condition is empty. For left semi join it's just a noop, and for left-anti join it helps a lot. You already did it in the rule, except https://github.com/apache/spark/pull/23750/files#diff-44d3a3f876bcf811fdbf71fce1f7072aR192

A new optimizer rule can be: we turn left-semi join to the left child if join condition is empty, and turn left-anti to empty relation if join condition is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan When i click on the link it shows me show many diffs. Were you referring me to a few lines when you said "except" ?

2ndly, what can i say ? When i said i did try the left semi join on empty join conditions i wrote this in my notes :
"Explore if there any optimization opportunity when there are empty join condition. Is the join necessary.. need to study more" :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 2ndly, it's just an orthogonal optimizer rule, you are welcome to do it in another PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Sure wenchen. I will do it.

// No join condition, just push down the Join below Project
p.copy(child = Join(gChild, rightOp, joinType, joinCond, hint))
} else {
val aliasMap = PushDownPredicate.getAliasMap(p)
val newJoinCond = if (aliasMap.nonEmpty) {
Option(replaceAlias(joinCond.get, aliasMap))
} else {
joinCond
}
p.copy(child = Join(gChild, rightOp, joinType, newJoinCond, hint))
}

// LeftSemi/LeftAnti over Aggregate
case join @ Join(agg: Aggregate, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if agg.aggregateExpressions.forall(_.deterministic) && agg.groupingExpressions.nonEmpty =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will aggregateExpressions contain subquery?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Yeah.. aggregate expressions can host scalar subqueries. In and Exists can be hosted in only filter. I am thinking, if i need to stop the pushdown if scalar sub queries just like we do for project. Let me test this some more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have a conclusion now?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Yeah.. we do need the same check for aggregate expressions as well. I have already added the check.

if (joinCond.isEmpty) {
// No join condition, just push down Join below Aggregate
agg.copy(child = Join(agg.child, rightOp, joinType, joinCond, hint))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @maryannxue shall we keep the join hint when pushdown join through an operator?

} else {
val aliasMap = PushDownPredicate.getAliasMap(agg)

// For each join condition, expand the alias and check if the condition can be evaluated
// using attributes produced by the aggregate operator's child operator.
val (pushDown, stayUp) = splitConjunctivePredicates(joinCond.get).partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
cond.references.nonEmpty &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the same comment in the previous review though, I still have a question here....: Is it ok to push down non-deterministic exprs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu In my knowledge, join conditions cannot have non-deterministic expressions ? Its ensured in checkAnalysis.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I see. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we pushdown constant join conditions?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Hmmn.. thats how the original logic was.. coming to think of it wenchen, wouldn't we have close to 0% chance of ever having a join conditions with constants only :-) ?

replaced.references.subsetOf(agg.child.outputSet ++ rightOp.outputSet)
}

// Check if the remaining predicates do not contain columns from subquery
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't mention subquery here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also explain what can go wrong if remaining predicates contain columns from right side.

val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet)

if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
val newAgg = agg.copy(child = Join(agg.child, rightOp, joinType, Option(replaced), hint))
// If there is no more filter to stay up, just return the Aggregate over Join.
// Otherwise, create "Filter(stayUp) <- Aggregate <- Join(pushDownPredicate)".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left-anti join outputs records that do NOT satisfy the join condition. Let's say the join condition is a && b, and this rule turns Join(Aggregate, ..., a && b) to Filter(b, Aggregate(Join(..., ..., a))).

This seems problematic. Previously we get result satisfying Not(a && b), now we get Not(a) && b. @hvanhovell is this your concern?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that, we can't push down partial left-anti join

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Thank you. I understand it better now.Let me test this out a bit and plan a follow-up.
@hvanhovell Thanks a lot for pointing this out.

if (stayUp.isEmpty) newAgg else Filter(stayUp.reduce(And), newAgg)
} else {
// The join condition is not a subset of the Aggregate's GROUP BY columns,
// no push down.
join
}
}

// LeftSemi/LeftAnti over Window
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

case join @ Join(w: Window, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will Window.windowExpressions contain correlated subqueries?

e.g.

SELECT (SELECT min(k) FROM t2 WHERE t2.k = t1.k) min_t2 + max(k) over (...) FROM t1

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan No.. windows expression can't contain correlated subqueries.
Edit: Actually i am not sure Wenchen. to the best of my knowledge only a few operators can host correlated subquery expressions. Project, Filter and Aggregate is the ones i know of.
Edit2:
project, filter and aggregate can have correlated scalar subqueries
only filter can have correlated in/exists subqueries.

if (joinCond.isEmpty) {
// No join condition, just push down Join below Window
w.copy(child = Join(w.child, rightOp, joinType, joinCond, hint))
} else {
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) ++
rightOp.outputSet

val (pushDown, stayUp) = splitConjunctivePredicates(joinCond.get).partition { cond =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: Is it ok to push down non-deterministic exprs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu Please see my answer above.

cond.references.subsetOf(partitionAttrs)
}

// Check if the remaining predicates do not contain columns from subquery
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet)

if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
val predicate = pushDown.reduce(And)
val newPlan = w.copy(child = Join(w.child, rightOp, joinType, Option(predicate), hint))
if (stayUp.isEmpty) newPlan else Filter(stayUp.reduce(And), newPlan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dilipbiswal this does hold with left anti joins? If a predicate is part of the condition then it means it should be filtered out right, and not retained?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell Thanks for reviewing. Can you please help illustrate the problem with an example ? So if the join was in filter form (in an subquery expression), we do push it down, right ? We don't distinguish between semi or anti joins ?

} else {
// The join condition is not a subset of the Window's PARTITION BY clause,
// no push down.
join
}
}

// LeftSemi/LeftAnti over Union
case join @ Join(union: Union, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if canPushThroughCondition(union.children, joinCond, rightOp) =>
if (joinCond.isEmpty) {
// Push down the Join below Union
val newGrandChildren = union.children.map { Join(_, rightOp, joinType, joinCond, hint) }
union.withNewChildren(newGrandChildren)
} else {
val pushDown = splitConjunctivePredicates(joinCond.get)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: Is it ok to push down non-deterministic exprs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu Please see my answer above.


if (pushDown.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how could pushDown be empty?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's just val pushDown = splitConjunctivePredicates(joinCond.get)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan You r right. I will change.

val pushDownCond = pushDown.reduceLeft(And)
val output = union.output
val newGrandChildren = union.children.map { grandchild =>
val newCond = pushDownCond transform {
case e if output.exists(_.semanticEquals(e)) =>
grandchild.output(output.indexWhere(_.semanticEquals(e)))
}
assert(newCond.references.subsetOf(grandchild.outputSet ++ rightOp.outputSet))
Join(grandchild, rightOp, joinType, Option(newCond), hint)
}
union.withNewChildren(newGrandChildren)
} else {
// Nothing to push down
join
}
}

// LeftSemi/LeftAnti over UnaryNode
case join @ Join(u: UnaryNode, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if PushDownPredicate.canPushThrough(u) && u.expressions.forall(_.deterministic) =>
pushDownJoin(join, u.child) { joinCond =>
u.withNewChildren(Seq(Join(u.child, rightOp, joinType, Option(joinCond), hint)))
}
}

/**
* Check if we can safely push a join through a project or union by making sure that attributes
* referred in join condition do not contain the same attributes as the plan they are moved
* into. This can happen when the plan and predicate subquery have the same source.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it still mentions subquery.

*/
private def canPushThroughCondition(plans: Seq[LogicalPlan], condition: Option[Expression],
rightOp: LogicalPlan): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

def func(
    para1: T,
    para2: U,...

val attributes = AttributeSet(plans.flatMap(_.output))
if (condition.isDefined) {
val matched = condition.get.references.intersect(rightOp.outputSet).intersect(attributes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be rightOp.outputSet.intersect(attributes).isEmpty? It's a self-join even if there is no join condition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, the self-join is problematic only if we can't rewrite join condition correctly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should mention it in the method doc, so that other reviewers won't get confused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I had this in the function prologue. Did you want it improved ?

This function makes sure that the join condition refers to attributes that are not 
ambiguous(i.e present in both the legs of the join) or else the resultant plan will be invalid.

matched.isEmpty
} else {
true
}
}


private def pushDownJoin(
join: Join,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 space indentation.

grandchild: LogicalPlan)(insertFilter: Expression => LogicalPlan): LogicalPlan = {
val (pushDown, stayUp) = if (join.condition.isDefined) {
splitConjunctivePredicates(join.condition.get)
.partition {_.references.subsetOf(grandchild.outputSet ++ join.right.outputSet)}
} else {
(Nil, Nil)
}

if (pushDown.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't match the other cases, that we always push down the join if join condition is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Can you please explain a bit ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I will fix it wenchen.

val newChild = insertFilter(pushDown.reduceLeft(And))
if (stayUp.nonEmpty) {
Filter(stayUp.reduceLeft(And), newChild)
} else {
newChild
}
} else {
join
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,10 @@ object LeftExistence {
case _ => None
}
}

object LeftSemiOrAnti {
def unapply(joinType: JoinType): Option[JoinType] = joinType match {
case LeftSemi | LeftAnti => Some(joinType)
case _ => None
}
}
Loading