Closed
Changes from 3 commits
@@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
val newJoin = joinType match {
case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
case Cross => join.copy(
  left = maybePushLocalLimit(exp, left),
  right = maybePushLocalLimit(exp, right))
Contributor

How about an inner join without a condition?

Contributor

Maybe we can match InnerLike when the condition is empty.
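As a rough illustration of this suggestion, the sketch below models the rule on a toy plan ADT. Every type here (`Relation`, `Join`, `LocalLimit`, the join type objects) is a simplified, hypothetical stand-in rather than one of Spark's Catalyst classes, and the join condition is reduced to an `Option[String]`. It only shows the shape of a match on `InnerLike` guarded by an empty condition.

```scala
// Toy model only: these are hypothetical stand-ins, not Spark's Catalyst classes.
object LimitPushDownSketch {
  sealed trait JoinType
  sealed trait InnerLike extends JoinType
  case object Inner extends InnerLike
  case object Cross extends InnerLike
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class LocalLimit(n: Int, child: Plan) extends Plan
  // The condition is modeled as an optional string for illustration.
  case class Join(left: Plan, right: Plan, joinType: JoinType, condition: Option[String])
    extends Plan

  def pushLimit(plan: Plan): Plan = plan match {
    case LocalLimit(n, join @ Join(left, right, joinType, condition)) =>
      val newJoin = joinType match {
        case RightOuter => join.copy(right = LocalLimit(n, right))
        case LeftOuter => join.copy(left = LocalLimit(n, left))
        // Inner or Cross with no condition: limit both inputs.
        case _: InnerLike if condition.isEmpty =>
          join.copy(left = LocalLimit(n, left), right = LocalLimit(n, right))
        case _ => join
      }
      LocalLimit(n, newJoin)
    case other => other
  }
}
```

With these toy types, a limit over `Join(Relation("x"), Relation("y"), Inner, None)` ends up with both inputs wrapped in `LocalLimit`, while the same join with a condition is left unchanged.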

Author @guoxiaolongzte, Nov 26, 2018

A = {(a, 0), (b, 1), (c, 2), (d, 0), (e, 1), (f, 2)}
B = {(e, 1), (f, 2)}

Query: A inner join B limit 2
If limit 2 is pushed down to A, the join becomes {(a, 0), (b, 1)} inner join {(e, 1), (f, 2)} and the result is empty. But the real result is not empty.
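The example above can be replayed on plain Scala collections. This is a minimal sketch, not Spark code: the comment does not state a join condition, so full-row equality is assumed here, and `take` stands in for a limit that keeps the first rows.

```scala
object InnerJoinLimitSketch {
  type Row = (String, Int)

  val a: Seq[Row] = Seq(("a", 0), ("b", 1), ("c", 2), ("d", 0), ("e", 1), ("f", 2))
  val b: Seq[Row] = Seq(("e", 1), ("f", 2))

  // Inner join on full-row equality (an assumed condition, see the lead-in).
  def innerJoin(l: Seq[Row], r: Seq[Row]): Seq[(Row, Row)] =
    for (x <- l; y <- r if x == y) yield (x, y)

  // LIMIT 2 applied after the join.
  val withoutPushdown: Seq[(Row, Row)] = innerJoin(a, b).take(2)

  // LIMIT 2 pushed down to A before the join.
  val withPushdown: Seq[(Row, Row)] = innerJoin(a.take(2), b).take(2)
}
```

Under that assumption, `withoutPushdown` holds two rows while `withPushdown` is empty, which is the mismatch the comment describes.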

Contributor

An inner join without a condition is literally a cross join.

Author @guoxiaolongzte, Nov 26, 2018

When spark.sql.crossJoin.enabled=true is set, are Inner, LeftOuter, RightOuter, and FullOuter joins without a condition all literally cross joins?
@cloud-fan

Author @guoxiaolongzte, Nov 26, 2018

I think that when spark.sql.crossJoin.enabled=true is set, for an Inner, LeftOuter, RightOuter, or FullOuter join without a condition, the limit should be pushed down on both sides, just like the cross join limit in this PR.
Is this correct?

Author

@cloud-fan
Please give me some advice. Thank you.

Contributor @cloud-fan, Nov 27, 2018

If there is no join condition, I think the join type doesn't matter and we can always push down limits. We may need to look into left anti join though.
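For the cross join case specifically, the reasoning can be checked on plain collections: after cutting each side to n rows, the product still has at least min(n, |L| * |R|) rows. The sketch below is an illustration under assumptions, not Spark code; the helper names are hypothetical, `take` stands in for a limit, and it says nothing about the other join types.

```scala
object CrossJoinLimitSketch {
  // Condition-free join: every left row paired with every right row.
  def crossJoin[A, B](l: Seq[A], r: Seq[B]): Seq[(A, B)] =
    for (x <- l; y <- r) yield (x, y)

  // LIMIT n applied over the full cross join.
  def limitAfter[A, B](l: Seq[A], r: Seq[B], n: Int): Seq[(A, B)] =
    crossJoin(l, r).take(n)

  // LIMIT n applied over a cross join whose inputs were each cut to n rows first.
  def limitBothSides[A, B](l: Seq[A], r: Seq[B], n: Int): Seq[(A, B)] =
    crossJoin(l.take(n), r.take(n)).take(n)
}
```

In this model the two functions return the same number of rows for any inputs, including the case where one side is empty.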

Author @guoxiaolongzte, Nov 28, 2018

There are two tables as follows:
CREATE TABLE test1(id int, name int);
CREATE TABLE test2(id int, name int);

test1 table data:
2,2
1,1

test2 table data:
2,2
3,3
4,4

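Execute the SQL select * from test1 t1 left anti join test2 t2 on t1.id=t2.id limit 1; The result:
1,1

But:
If we push limit 1 to the left side, only the first row (2,2) is kept on the left. It matches (2,2) in test2, so the result is empty, which is not correct.
If we push limit 1 to the right side, the result can also be incorrect: if the row kept on the right is not (2,2), then (2,2) from test1 is no longer filtered out and may be returned.

So the limit should not be pushed down for left anti join. The same logic applies to left semi join.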
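The left-side case from this example can be reproduced on plain Scala collections. This is a minimal sketch rather than Spark code: `leftAnti` is a hypothetical helper that joins on the first column (id), and `take(1)` stands in for a limit that keeps the first row in the order the data is listed above.

```scala
object LeftAntiLimitSketch {
  type Row = (Int, Int) // (id, name)

  val test1: Seq[Row] = Seq((2, 2), (1, 1))
  val test2: Seq[Row] = Seq((2, 2), (3, 3), (4, 4))

  // Left anti join on id: keep left rows whose id has no match on the right.
  def leftAnti(l: Seq[Row], r: Seq[Row]): Seq[Row] =
    l.filterNot(x => r.exists(y => y._1 == x._1))

  // LIMIT 1 applied after the join.
  val expected: Seq[Row] = leftAnti(test1, test2).take(1)

  // LIMIT 1 pushed down to the left side before the join.
  val leftPushed: Seq[Row] = leftAnti(test1.take(1), test2).take(1)
}
```

Here `expected` contains (1, 1), while `leftPushed` is empty because the only row kept on the left, (2, 2), has a match in test2.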

case _ => join
}
LocalLimit(exp, newJoin)
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Add
-import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, PlanTest, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, LeftOuter, PlanTest, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

@@ -171,4 +171,25 @@ class LimitPushdownSuite extends PlanTest {
// No pushdown for FULL OUTER JOINS.
comparePlans(optimized, originalQuery)
}

test("cross join") {
val originalQuery = x.join(y, Cross).limit(1)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Limit(1, LocalLimit(1, x).join(LocalLimit(1, y), Cross)).analyze
comparePlans(optimized, correctAnswer)
}

test("cross join and left sides are limited") {
val originalQuery = x.limit(2).join(y, Cross).limit(1)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Limit(1, LocalLimit(1, x).join(LocalLimit(1, y), Cross)).analyze
comparePlans(optimized, correctAnswer)
}

test("cross join and right sides are limited") {
val originalQuery = x.join(y.limit(2), Cross).limit(1)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Limit(1, LocalLimit(1, x).join(LocalLimit(1, y), Cross)).analyze
comparePlans(optimized, correctAnswer)
}
}