Skip to content

Commit 4fcc214

Browse files
wangzhenhua authored and cloud-fan committed
[SPARK-20124][SQL] Join reorder should keep the same order of final project attributes
## What changes were proposed in this pull request?

The join reorder algorithm should keep exactly the same order of output attributes in the top project. For example, if the user wants to select a, b, c, then after reordering we should output a, b, c in the same order as specified by the user, instead of b, a, c or any other order.

## How was this patch tested?

A new test case is added in `JoinReorderSuite`.

Author: wangzhenhua <[email protected]>

Closes #17453 from wzhfy/keepOrderInProject.
1 parent 91559d2 commit 4fcc214

File tree

3 files changed

+31
-10
lines changed

3 files changed

+31
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala

Lines changed: 16 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -40,10 +40,10 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
4040
val result = plan transformDown {
4141
// Start reordering with a joinable item, which is an InnerLike join with conditions.
4242
case j @ Join(_, _, _: InnerLike, Some(cond)) =>
43-
reorder(j, j.outputSet)
43+
reorder(j, j.output)
4444
case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond)))
4545
if projectList.forall(_.isInstanceOf[Attribute]) =>
46-
reorder(p, p.outputSet)
46+
reorder(p, p.output)
4747
}
4848
// After reordering is finished, convert OrderedJoin back to Join
4949
result transformDown {
@@ -52,7 +52,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
5252
}
5353
}
5454

55-
private def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
55+
private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
5656
val (items, conditions) = extractInnerJoins(plan)
5757
// TODO: Compute the set of star-joins and use them in the join enumeration
5858
// algorithm to prune un-optimal plan choices.
@@ -140,7 +140,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
140140
conf: SQLConf,
141141
items: Seq[LogicalPlan],
142142
conditions: Set[Expression],
143-
topOutput: AttributeSet): LogicalPlan = {
143+
output: Seq[Attribute]): LogicalPlan = {
144144

145145
val startTime = System.nanoTime()
146146
// Level i maintains all found plans for i + 1 items.
@@ -152,9 +152,10 @@ object JoinReorderDP extends PredicateHelper with Logging {
152152

153153
// Build plans for next levels until the last level has only one plan. This plan contains
154154
// all items that can be joined, so there's no need to continue.
155+
val topOutputSet = AttributeSet(output)
155156
while (foundPlans.size < items.length && foundPlans.last.size > 1) {
156157
// Build plans for the next level.
157-
foundPlans += searchLevel(foundPlans, conf, conditions, topOutput)
158+
foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet)
158159
}
159160

160161
val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
@@ -163,7 +164,14 @@ object JoinReorderDP extends PredicateHelper with Logging {
163164

164165
// The last level must have one and only one plan, because all items are joinable.
165166
assert(foundPlans.size == items.length && foundPlans.last.size == 1)
166-
foundPlans.last.head._2.plan
167+
foundPlans.last.head._2.plan match {
168+
case p @ Project(projectList, j: Join) if projectList != output =>
169+
assert(topOutputSet == p.outputSet)
170+
// Keep the same order of final output attributes.
171+
p.copy(projectList = output)
172+
case finalPlan =>
173+
finalPlan
174+
}
167175
}
168176

169177
/** Find all possible plans at the next level, based on existing levels. */
@@ -254,10 +262,10 @@ object JoinReorderDP extends PredicateHelper with Logging {
254262
val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
255263
val remainingConds = conditions -- collectedJoinConds
256264
val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
257-
val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains)
265+
val neededFromNewJoin = newJoin.output.filter(neededAttr.contains)
258266
val newPlan =
259267
if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
260-
Project(neededFromNewJoin.toSeq, newJoin)
268+
Project(neededFromNewJoin, newJoin)
261269
} else {
262270
newJoin
263271
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -198,6 +198,19 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
198198
assertEqualPlans(originalPlan, bestPlan)
199199
}
200200

201+
test("keep the order of attributes in the final output") {
202+
val outputLists = Seq("t1.k-1-2", "t1.v-1-10", "t3.v-1-100").permutations
203+
while (outputLists.hasNext) {
204+
val expectedOrder = outputLists.next().map(nameToAttr)
205+
val expectedPlan =
206+
t1.join(t3, Inner, Some(nameToAttr("t1.v-1-10") === nameToAttr("t3.v-1-100")))
207+
.join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
208+
.select(expectedOrder: _*)
209+
// The plan should not change after optimization
210+
assertEqualPlans(expectedPlan, expectedPlan)
211+
}
212+
}
213+
201214
private def assertEqualPlans(
202215
originalPlan: LogicalPlan,
203216
groundTruthBestPlan: LogicalPlan): Unit = {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -126,8 +126,8 @@ abstract class PlanTest extends SparkFunSuite with PredicateHelper {
126126
case (j1: Join, j2: Join) =>
127127
(sameJoinPlan(j1.left, j2.left) && sameJoinPlan(j1.right, j2.right)) ||
128128
(sameJoinPlan(j1.left, j2.right) && sameJoinPlan(j1.right, j2.left))
129-
case _ if plan1.children.nonEmpty && plan2.children.nonEmpty =>
130-
(plan1.children, plan2.children).zipped.forall { case (c1, c2) => sameJoinPlan(c1, c2) }
129+
case (p1: Project, p2: Project) =>
130+
p1.projectList == p2.projectList && sameJoinPlan(p1.child, p2.child)
131131
case _ =>
132132
plan1 == plan2
133133
}

0 commit comments

Comments
 (0)