Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,53 +31,44 @@ object CTESubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan, inTraverse = false)
traverseAndSubstituteCTE(plan, inTraverse = false)
assertNoNameConflictsInCTE(plan)
traverseAndSubstituteCTE(plan)
case LegacyBehaviorPolicy.LEGACY =>
legacyTraverseAndSubstituteCTE(plan)
case LegacyBehaviorPolicy.CORRECTED =>
traverseAndSubstituteCTE(plan, inTraverse = false)
traverseAndSubstituteCTE(plan)
}
}

/**
* Check the plan to be traversed has naming conflicts in nested CTE or not, traverse through
* child, innerChildren and subquery for the current plan.
* child, innerChildren and subquery expressions for the current plan.
*/
private def assertNoNameConflictsInCTE(
plan: LogicalPlan,
inTraverse: Boolean,
cteNames: Set[String] = Set.empty): Unit = {
plan.foreach {
outerCTERelationNames: Set[String] = Set.empty,
namesInSubqueries: Set[String] = Set.empty): Unit = {
plan match {
case w @ With(child, relations) =>
val newNames = relations.map {
case (cteName, _) =>
if (cteNames.contains(cteName)) {
if (outerCTERelationNames.contains(cteName)) {
throw new AnalysisException(s"Name $cteName is ambiguous in nested CTE. " +
s"Please set ${LEGACY_CTE_PRECEDENCE_POLICY.key} to CORRECTED so that name " +
"defined in inner CTE takes precedence. If set it to LEGACY, outer CTE " +
"definitions will take precedence. See more details in SPARK-28228.")
} else {
cteName
}
}.toSet
child.transformExpressions {
case e: SubqueryExpression =>
assertNoNameConflictsInCTE(e.plan, inTraverse = true, cteNames ++ newNames)
e
}
w.innerChildren.foreach { p =>
assertNoNameConflictsInCTE(p, inTraverse = true, cteNames ++ newNames)
}

case other if inTraverse =>
other.transformExpressions {
case e: SubqueryExpression =>
assertNoNameConflictsInCTE(e.plan, inTraverse = true, cteNames)
e
}
}.toSet ++ namesInSubqueries
assertNoNameConflictsInCTE(child, outerCTERelationNames, newNames)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 3 kinds of conflicts:

  1. conflicts in the children, e.g. with t as (...) select * from (with t as (...) select * from t)
  2. conflicts in other CTE relations, e.g. with t as (...), t2 as (with t as (...) select * from t) ...
  3. conflicts in the subqueries of the children, e.g. with t as (...) select ... where ... in (with t as (...) select * from t)

The third one can be handled correctly by 2.4 as well, so we shouldn't fail.

Can you briefly explain how you check the first 2 conflicts and skip the third one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought 3. is not handled correctly in 2.4. (I haven't tested it on Spark 2.4.) But it certainly produces different results in Spark 3 in the 2 different modes.

Copy link
Contributor Author

@peter-toth peter-toth Apr 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked 3. in Spark 2.4 with the query in the description and it returns 1 so is not handled correctly and we should fail.

  1. In this case the legacy and corrected mode returns the same result due to the same substitution order so we shouldn't throw an exception. If we call the possible conflicting set of names coming from outer CTEs as outerCTERelationNames then I handle this case by not extending the set with new names when I traverse down the child of a With (https://github.com/apache/spark/pull/28318/files#diff-d0bfa3367c63988ad7cf33397e643e75R64) or when I traverse down the children of any other node (https://github.com/apache/spark/pull/28318/files#diff-d0bfa3367c63988ad7cf33397e643e75R70-R71).

  2. and 3. In these cases, the two modes have different substitution order and so possibly different results. Obviously we need an extended set of names (newNames) when we encounter a With and switch to that set (pass in as a new outerCTERelationNames) when we traverse down inner children (2.)( https://github.com/apache/spark/pull/28318/files#diff-d0bfa3367c63988ad7cf33397e643e75R65) or traverse down subquery expressions (3.)(https://github.com/apache/spark/pull/28318/files#diff-d0bfa3367c63988ad7cf33397e643e75R68-R69)

So what is the contents of outerCTERelationNames? The set of CTE names coming from outer CTEs. It doesn't contain the CTEs coming from a parent With from the same LogicalPlan (1.). But it contains outer CTEs coming from Withs from outer LogicalPlans (2., 3.)
And the content of namesInSubqueries is simply all CTE names from all outer Withs, no matter if the With is the same plan or in an outer plan.

So I think what we need to do here is basically skip the 1. and check 2. and 3.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @peter-toth .

w.innerChildren.foreach(assertNoNameConflictsInCTE(_, newNames, newNames))

case _ =>
case other =>
other.subqueries.foreach(
assertNoNameConflictsInCTE(_, namesInSubqueries, namesInSubqueries))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be assertNoNameConflictsInCTE(_, namesInSubqueries, Nil)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case we wouldn't report

WITH t AS (SELECT 1)
SELECT (
  SELECT (
    WITH t AS (SELECT 2)
    SELECT * FROM t
  )
)

as ambiguous, but it returns 1 in legacy and 2 in corrected modes.

other.children.foreach(
assertNoNameConflictsInCTE(_, outerCTERelationNames, namesInSubqueries))
}
}

Expand Down Expand Up @@ -131,37 +122,27 @@ object CTESubstitution extends Rule[LogicalPlan] {
* SELECT * FROM t
* )
* @param plan the plan to be traversed
* @param inTraverse whether the current traverse is called from another traverse, only in this
* case name collision can occur
* @return the plan where CTE substitution is applied
*/
private def traverseAndSubstituteCTE(plan: LogicalPlan, inTraverse: Boolean): LogicalPlan = {
private def traverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperatorsUp {
case With(child: LogicalPlan, relations) =>
// child might contain an inner CTE that has priority so traverse and substitute inner CTEs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an incorrect assumption that only direct child can contain a nested, conflicting CTE as subquery expression. As the added new test case shows such a conflicting CTE can be in any node under With.

// in child first
val traversedChild: LogicalPlan = child transformExpressions {
case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan, true))
}

// Substitute CTE definitions from last to first as a CTE definition can reference a
// previous one
relations.foldRight(traversedChild) {
relations.foldRight(child) {
case ((cteName, ctePlan), currentPlan) =>
// A CTE definition might contain an inner CTE that has priority, so traverse and
// substitute CTE defined in ctePlan.
// A CTE definition might not be used at all or might be used multiple times. To avoid
// computation if it is not used and to avoid multiple recomputation if it is used
// multiple times we use a lazy construct with call-by-name parameter passing.
lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan, true)
lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan)
substituteCTE(currentPlan, cteName, substitutedCTEPlan)
}

// CTE name collision can occur only when inTraverse is true, it helps to avoid eager CTE
// substitution in a subquery expression.
case other if inTraverse =>
case other =>
other.transformExpressions {
case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan, true))
case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan))
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,11 @@ SELECT (
SELECT * FROM t
)
);

-- CTE in subquery expression shadows outer 4
WITH t(c) AS (SELECT 1)
SELECT * FROM t
WHERE c IN (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
);
15 changes: 14 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 12
-- Number of queries: 13


-- !query
Expand Down Expand Up @@ -166,3 +166,16 @@ SELECT (
struct<scalarsubquery():int>
-- !query output
1


-- !query
WITH t(c) AS (SELECT 1)
SELECT * FROM t
WHERE c IN (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
)
-- !query schema
struct<c:int>
-- !query output
1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding a legacy result.

16 changes: 15 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 12
-- Number of queries: 13


-- !query
Expand Down Expand Up @@ -172,3 +172,17 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.;


-- !query
WITH t(c) AS (SELECT 1)
SELECT * FROM t
WHERE c IN (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 12
-- Number of queries: 13


-- !query
Expand Down Expand Up @@ -166,3 +166,16 @@ SELECT (
struct<scalarsubquery():int>
-- !query output
3


-- !query
WITH t(c) AS (SELECT 1)
SELECT * FROM t
WHERE c IN (
WITH t(c) AS (SELECT 2)
SELECT * FROM t
)
-- !query schema
struct<c:int>
-- !query output