Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ license: |

- Since Spark 3.0, if files or subdirectories disappear during recursive directory listing (i.e. they appear in an intermediate listing but then cannot be read or listed during later phases of the recursive directory listing, due to either concurrent file deletions or object store consistency issues) then the listing will fail with an exception unless `spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous versions, these missing files or subdirectories would be ignored. Note that this change of behavior only applies during initial table file listing (or during `REFRESH TABLE`), not during query execution: the net change is that `spark.sql.files.ignoreMissingFiles` is now obeyed during table file listing / query planning, not only at query execution time.

- Since Spark 3.0, substitution order of nested WITH clauses is changed and an inner CTE definition takes precedence over an outer. In version 2.4 and earlier, `WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FROM t) SELECT * FROM t2` returns `1` while in version 3.0 it returns `2`. The previous behaviour can be restored by setting `spark.sql.legacy.ctePrecedence.enabled` to `true`.
- Since Spark 3.0, Spark throws an AnalysisException if name conflict is detected in the nested WITH clause. It forces the users to choose the specific substitution order they wanted, which is controlled by `spark.sql.legacy.ctePrecedence.enabled`. If set to false, inner CTE definitions take precedence over outer definitions. For example, set the config to `false`, `WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FROM t) SELECT * FROM t2` returns `2`, while setting it to `true`, the result is `1` which is the behavior in version 2.4 and earlier.

- Since Spark 3.0, the `add_months` function does not adjust the resulting date to a last day of month if the original date is a last day of months. For example, `select add_months(DATE'2019-02-28', 1)` results `2019-03-28`. In Spark version 2.4 and earlier, the resulting date is adjusted when the original date is a last day of months. For example, adding a month to `2019-02-28` results in `2019-03-31`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, With}
import org.apache.spark.sql.catalyst.rules.Rule
Expand All @@ -28,13 +29,55 @@ import org.apache.spark.sql.internal.SQLConf.LEGACY_CTE_PRECEDENCE_ENABLED
*/
object CTESubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
if (SQLConf.get.getConf(LEGACY_CTE_PRECEDENCE_ENABLED)) {
val isLegacy = SQLConf.get.getConf(LEGACY_CTE_PRECEDENCE_ENABLED)
if (isLegacy.isEmpty) {
if (detectConflictInNestedCTE(plan, inTraverse = false)) {
throw new AnalysisException("Name collision in nested CTE was founded in current plan, " +
s"please select the CTE precedence via ${LEGACY_CTE_PRECEDENCE_ENABLED.key}, " +
"see more detail in SPARK-28228.")
} else {
traverseAndSubstituteCTE(plan, inTraverse = false)
}
} else if (isLegacy.get) {
legacyTraverseAndSubstituteCTE(plan)
} else {
traverseAndSubstituteCTE(plan, false)
traverseAndSubstituteCTE(plan, inTraverse = false)
}
}

/**
* Check the plan to be traversed has naming conflicts in nested CTE or not, traverse through
* child, innerChildren and subquery for the current plan.
*/
private def detectConflictInNestedCTE(
plan: LogicalPlan,
inTraverse: Boolean,
cteNames: Set[String] = Set.empty): Boolean = {
plan.foreach {
case w @ With(child, relations) =>
val newNames = relations.map {
case (cteName, _) =>
if (cteNames.contains(cteName)) {
return true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can throw exception here so that we know which name is conflicting.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done in c03920a.

} else {
cteName
}
}.toSet
(w.innerChildren :+ child).foreach { p =>
Copy link
Contributor

@peter-toth peter-toth Feb 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be:

        child.transformExpressions {
          case e: SubqueryExpression =>
            assertNoNameConflictsInCTE(e.plan, inTraverse = true, cteNames ++ newNames)
            e
        }
        w.innerChildren.foreach { p =>
          assertNoNameConflictsInCTE(p, inTraverse = true, cteNames ++ newNames)
        }

If you check CTE in subquery shadows outer test cases you will see that legacy (https://github.com/apache/spark/blob/master/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out#L113-L151) and new results (https://github.com/apache/spark/blob/master/sql/core/src/test/resources/sql-tests/results/cte.sql.out#L248-L286) are the same. We shouldn't give AnalysisException in those cases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed checking! Yes, that makes sense, we should only give exception while legacy and non-legacy give different results. Done in 7249404

if (detectConflictInNestedCTE(p, inTraverse = true, cteNames ++ newNames)) return true
}

case other if inTraverse =>
other.transformExpressions {
case e: SubqueryExpression
if detectConflictInNestedCTE(e.plan, inTraverse = true, cteNames) => return true
}

case _ =>
}
false
}

private def legacyTraverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperatorsUp {
case With(child, relations) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2079,7 +2079,7 @@ object SQLConf {
.internal()
.doc("When true, outer CTE definitions takes precedence over inner definitions.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, it became three-state conf. Shall we mentioned more about false and empty?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds we should.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done in 7249404.

.booleanConf
.createWithDefault(false)
.createOptional

val LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC =
buildConf("spark.sql.legacy.arrayExistsFollowsThreeValuedLogic")
Expand Down
115 changes: 115 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/cte-nonlegacy.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
create temporary view t as select * from values 0, 1, 2 as t(id);
create temporary view t2 as select * from values 0, 1 as t(id);

-- CTE non-legacy substitution
SET spark.sql.legacy.ctePrecedence.enabled=false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use --SET spark.sql.legacy.ctePrecedence.enabled=false, so that this won't be treated as a query and appear in the result query.


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can use --IMPORT cte.sql to include the existing test

-- CTE in CTE definition
WITH t as (
WITH t2 AS (SELECT 1)
SELECT * FROM t2
)
SELECT * FROM t;

-- CTE in subquery
SELECT max(c) FROM (
WITH t(c) AS (SELECT 1)
SELECT * FROM t
);

-- CTE in subquery expression
SELECT (
WITH t AS (SELECT 1)
SELECT * FROM t
);

-- CTE in CTE definition shadows outer
WITH
t AS (SELECT 1),
t2 AS (
WITH t AS (SELECT 2)
SELECT * FROM t
)
SELECT * FROM t2;

-- CTE in CTE definition shadows outer 2
WITH
t(c) AS (SELECT 1),
t2 AS (
SELECT (
SELECT max(c) FROM (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
)
)
)
SELECT * FROM t2;

-- CTE in CTE definition shadows outer 3
WITH
t AS (SELECT 1),
t2 AS (
WITH t AS (SELECT 2),
t2 AS (
WITH t AS (SELECT 3)
SELECT * FROM t
)
SELECT * FROM t2
)
SELECT * FROM t2;

-- CTE in subquery shadows outer
WITH t(c) AS (SELECT 1)
SELECT max(c) FROM (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
);

-- CTE in subquery shadows outer 2
WITH t(c) AS (SELECT 1)
SELECT sum(c) FROM (
SELECT max(c) AS c FROM (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
)
);

-- CTE in subquery shadows outer 3
WITH t(c) AS (SELECT 1)
SELECT sum(c) FROM (
WITH t(c) AS (SELECT 2)
SELECT max(c) AS c FROM (
WITH t(c) AS (SELECT 3)
SELECT * FROM t
)
);

-- CTE in subquery expression shadows outer
WITH t AS (SELECT 1)
SELECT (
WITH t AS (SELECT 2)
SELECT * FROM t
);

-- CTE in subquery expression shadows outer 2
WITH t AS (SELECT 1)
SELECT (
SELECT (
WITH t AS (SELECT 2)
SELECT * FROM t
)
);

-- CTE in subquery expression shadows outer 3
WITH t AS (SELECT 1)
SELECT (
WITH t AS (SELECT 2)
SELECT (
WITH t AS (SELECT 3)
SELECT * FROM t
)
);

-- Clean up
DROP VIEW IF EXISTS t;
DROP VIEW IF EXISTS t2;
Loading