@@ -56,10 +56,6 @@ import org.apache.spark.sql.types._
object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
Member

Does this also mean that the comment "This batch must be executed after the RewriteSubquery batch, which creates joins." is no longer definitely true?

Contributor Author (@cloud-fan, Jun 11, 2020)

It's still true: a correlated subquery becomes a join, and that join may have new join keys (see the sketch after this thread).

Member

I see. Makes sense.
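
For intuition, a standalone Scala check (not from the PR) of why new join keys over doubles need normalization: -0.0 and 0.0 compare equal as values, but their binary representations differ, so bit-level hashing or comparison on a freshly created join key can split equal values apart.

object NegativeZeroDemo extends App {
  // -0.0 and 0.0 are equal under IEEE 754 value comparison...
  assert(-0.0 == 0.0)
  // ...but their raw bit patterns differ, so anything that hashes or
  // compares the bits directly treats them as two distinct keys unless
  // they are normalized first.
  val neg = java.lang.Double.doubleToRawLongBits(-0.0) // 0x8000000000000000
  val pos = java.lang.Double.doubleToRawLongBits(0.0)  // 0x0000000000000000
  assert(neg != pos)
  println(f"-0.0 bits = $neg%016x, 0.0 bits = $pos%016x")
}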


  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // A subquery will be rewritten into join later, and will go through this rule
    // eventually. Here we skip subquery, as we only need to run this rule once.
    case _: Subquery => plan
Member

How about adding tests for the subquery case in NormalizeFloatingPointNumbersSuite, too?

Contributor Author

No, we can't.

This fix relies on the rule OptimizeSubqueries, which is an inner object of the Optimizer class because it needs to rerun the entire optimizer on each subquery plan. So we can't use OptimizeSubqueries in NormalizeFloatingPointNumbersSuite.
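
For context, a minimal sketch of the coupling described above, assuming a simplified optimizer (the real OptimizeSubqueries in Spark differs in details such as how it wraps the subquery plan):

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

abstract class SimplifiedOptimizer extends RuleExecutor[LogicalPlan] {
  // The rule must be an inner object: it reruns the enclosing optimizer's
  // entire rule pipeline (including NormalizeFloatingNumbers) on each
  // subquery plan, which is why it cannot be used standalone in
  // NormalizeFloatingPointNumbersSuite.
  object OptimizeSubqueries extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
      case s: SubqueryExpression =>
        s.withNewPlan(SimplifiedOptimizer.this.execute(s.plan))
    }
  }
}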


    case _ => plan transform {
      case w: Window if w.partitionSpec.exists(p => needNormalize(p)) =>
        // Although the `windowExpressions` may refer to `partitionSpec` expressions, we don't need
18 changes: 18 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3503,6 +3503,24 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
checkAnswer(sql("select CAST(-32768 as short) DIV CAST (-1 as short)"),
Seq(Row(Short.MinValue.toLong * -1)))
}

test("normalize special floating numbers in subquery") {
withTempView("v1", "v2", "v3") {
Seq(-0.0).toDF("d").createTempView("v1")
Seq(0.0).toDF("d").createTempView("v2")
spark.range(2).createTempView("v3")

// non-correlated subquery
checkAnswer(sql("SELECT (SELECT v1.d FROM v1 JOIN v2 ON v1.d = v2.d)"), Row(-0.0))
// correlated subquery
checkAnswer(
sql(
"""
|SELECT id FROM v3 WHERE EXISTS
| (SELECT v1.d FROM v1 JOIN v2 ON v1.d = v2.d WHERE id > 0)
|""".stripMargin), Row(1))
}
}
}

case class Foo(bar: Option[String])
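
As a hedged follow-up to the test above (assuming the same temp views are registered; exact plan text varies across Spark versions), one way to confirm the fix by hand is to print the optimized plan of the correlated query and check that the rewritten join's keys wrap `d` in a normalization expression such as knownfloatingpointnormalized:

// Assumes v1, v2, v3 from the test above; plan text is version-dependent.
spark.sql(
  """
    |SELECT id FROM v3 WHERE EXISTS
    | (SELECT v1.d FROM v1 JOIN v2 ON v1.d = v2.d WHERE id > 0)
    |""".stripMargin).explain(true)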