Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,11 @@
"Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses<treeNode>"
]
},
"UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE" : {
"message" : [
"Correlated column reference '<expr>' cannot be <dataType> type"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 this is better!

]
},
"UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY" : {
"message" : [
"Correlated scalar subqueries can only be used in filters, aggregations, projections, and UPDATE/MERGE/DELETE commands<treeNode>"
Expand Down Expand Up @@ -3133,4 +3138,4 @@
"<className> must override either <m1> or <m2>"
]
}
}
}
Copy link
Contributor

@amaliujia amaliujia Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: revert this.

Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package org.apache.spark.sql.catalyst.optimizer

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.util.collection.Utils

/**
Expand Down Expand Up @@ -360,7 +361,20 @@ object DecorrelateInnerQuery extends PredicateHelper {
// +- Aggregate [a1] [a1 AS a']
// +- OuterQuery
val conditions = outerReferenceMap.map {
case (o, a) => EqualNullSafe(a, OuterReference(o))
case (o, a) =>
val cond = EqualNullSafe(a, OuterReference(o))
// SPARK-40615: Certain data types (e.g. MapType) do not support ordering, so
// the EqualNullSafe join condition can become unresolved.
if (!cond.resolved) {
if (!RowOrdering.isOrderable(a.dataType)) {
throw QueryCompilationErrors.unsupportedCorrelatedReferenceDataTypeError(
o, a.dataType, plan.origin)
} else {
throw SparkException.internalError(s"Unable to decorrelate subquery: " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I saw Wenchen's comment above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s"join condition '${cond.sql}' cannot be resolved.")
}
}
cond
}
(domainJoin, conditions.toSeq, AttributeMap(outerReferenceMap))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1944,6 +1944,17 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
messageParameters = Map("function" -> funcStr))
}

def unsupportedCorrelatedReferenceDataTypeError(
expr: Expression,
dataType: DataType,
origin: Origin): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE",
origin = origin,
messageParameters = Map("expr" -> expr.sql, "dataType" -> dataType.typeName))
}

def functionCannotProcessInputError(
unbound: UnboundFunction,
arguments: Seq[Expression],
Expand Down
26 changes: 26 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2336,4 +2336,30 @@ class SubquerySuite extends QueryTest
checkAnswer(sql("select(select sum((select sum(col) from t)) from t)"), Row(null))
}
}

test("SPARK-40615: Check unsupported data type when decorrelating subqueries") {
withTempView("v1", "v2") {
sql(
"""
|create temp view v1(x) as values
|from_json('{"a":1, "b":2}', 'map<string,int>') t(x)
|""".stripMargin)
sql(
"""
|create temp view v2(x) as values
|from_json('{"b":0, "c":2}', 'map<string,int>') t(x)
|""".stripMargin)

// Can use non-orderable data type in one row subquery that can be collapsed.
checkAnswer(
sql("select (select a + a from (select x['a'] as a)) from v1"),
Row(2))

// Cannot use non-orderable data type in one row subquery that cannot be collapsed.
val error = intercept[AnalysisException] {
sql("select (select a + a from (select upper(x['a']) as a)) from v1").collect()
}
assert(error.getMessage.contains("Correlated column reference 'v1.x' cannot be map type"))
}
}
}