Skip to content

Commit 095862a

Browse files
tejasapatilhvanhovell
authored andcommitted
[SPARK-17271][SQL] Planner adds un-necessary Sort even if child ordering is semantically same as required ordering
## What changes were proposed in this pull request? Jira : https://issues.apache.org/jira/browse/SPARK-17271 Planner is adding un-needed SORT operation due to bug in the way comparison for `SortOrder` is done at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L253 `SortOrder` needs to be compared semantically because `Expression` within two `SortOrder` can be "semantically equal" but not literally equal objects. eg. In case of `sql("SELECT * FROM table1 a JOIN table2 b ON a.col1=b.col1")` Expression in required SortOrder: ``` AttributeReference( name = "col1", dataType = LongType, nullable = false ) (exprId = exprId, qualifier = Some("a") ) ``` Expression in child SortOrder: ``` AttributeReference( name = "col1", dataType = LongType, nullable = false ) (exprId = exprId) ``` Notice that the output column has a qualifier but the child attribute does not but the inherent expression is the same and hence in this case we can say that the child satisfies the required sort order. This PR includes following changes: - Added a `semanticEquals` method to `SortOrder` so that it can compare underlying child expressions semantically (and not using default Object.equals) - Fixed `EnsureRequirements` to use semantic comparison of SortOrder ## How was this patch tested? - Added a test case to `PlannerSuite`. Ran rest tests in `PlannerSuite` Author: Tejas Patil <[email protected]> Closes #14841 from tejasapatil/SPARK-17271_sort_order_equals_bug.
1 parent e07baf1 commit 095862a

File tree

3 files changed

+52
-2
lines changed

3 files changed

+52
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ case class SortOrder(child: Expression, direction: SortDirection)
6161
override def sql: String = child.sql + " " + direction.sql
6262

6363
def isAscending: Boolean = direction == Ascending
64+
65+
def semanticEquals(other: SortOrder): Boolean =
66+
(direction == other.direction) && child.semanticEquals(other.child)
6467
}
6568

6669
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,16 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
250250
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
251251
if (requiredOrdering.nonEmpty) {
252252
// If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
253-
if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
253+
val orderingMatched = if (requiredOrdering.length > child.outputOrdering.length) {
254+
false
255+
} else {
256+
requiredOrdering.zip(child.outputOrdering).forall {
257+
case (requiredOrder, childOutputOrder) =>
258+
requiredOrder.semanticEquals(childOutputOrder)
259+
}
260+
}
261+
262+
if (!orderingMatched) {
254263
SortExec(requiredOrdering, global = false, child = child)
255264
} else {
256265
child

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
2020
import org.apache.spark.rdd.RDD
2121
import org.apache.spark.sql.{execution, DataFrame, Row}
2222
import org.apache.spark.sql.catalyst.InternalRow
23-
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder}
23+
import org.apache.spark.sql.catalyst.expressions._
2424
import org.apache.spark.sql.catalyst.plans.Inner
2525
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
2626
import org.apache.spark.sql.catalyst.plans.physical._
@@ -444,6 +444,44 @@ class PlannerSuite extends SharedSQLContext {
444444
}
445445
}
446446

447+
test("EnsureRequirements skips sort when required ordering is semantically equal to " +
448+
"existing ordering") {
449+
val exprId: ExprId = NamedExpression.newExprId
450+
val attribute1 =
451+
AttributeReference(
452+
name = "col1",
453+
dataType = LongType,
454+
nullable = false
455+
) (exprId = exprId,
456+
qualifier = Some("col1_qualifier")
457+
)
458+
459+
val attribute2 =
460+
AttributeReference(
461+
name = "col1",
462+
dataType = LongType,
463+
nullable = false
464+
) (exprId = exprId)
465+
466+
val orderingA1 = SortOrder(attribute1, Ascending)
467+
val orderingA2 = SortOrder(attribute2, Ascending)
468+
469+
assert(orderingA1 != orderingA2, s"$orderingA1 should NOT equal to $orderingA2")
470+
assert(orderingA1.semanticEquals(orderingA2),
471+
s"$orderingA1 should be semantically equal to $orderingA2")
472+
473+
val inputPlan = DummySparkPlan(
474+
children = DummySparkPlan(outputOrdering = Seq(orderingA1)) :: Nil,
475+
requiredChildOrdering = Seq(Seq(orderingA2)),
476+
requiredChildDistribution = Seq(UnspecifiedDistribution)
477+
)
478+
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
479+
assertDistributionRequirementsAreSatisfied(outputPlan)
480+
if (outputPlan.collect { case s: SortExec => true }.nonEmpty) {
481+
fail(s"No sorts should have been added:\n$outputPlan")
482+
}
483+
}
484+
447485
// This is a regression test for SPARK-11135
448486
test("EnsureRequirements adds sort when required ordering isn't a prefix of existing ordering") {
449487
val orderingA = SortOrder(Literal(1), Ascending)

0 commit comments

Comments
 (0)