diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index d27a2cbde972..f5110aeeb083 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -879,6 +879,11 @@ "Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses" ] }, + "UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE" : { + "message" : [ + "Correlated column reference '' cannot be type" + ] + }, "UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY" : { "message" : [ "Correlated scalar subqueries can only be used in filters, aggregations, projections, and UPDATE/MERGE/DELETE commands" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala index e61ad8e1fab7..53c09a3f68d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala @@ -19,12 +19,13 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.SubExprUtils._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE -import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.util.collection.Utils /** @@ -360,7 +361,20 @@ object DecorrelateInnerQuery extends PredicateHelper { // +- Aggregate [a1] [a1 AS a'] // +- OuterQuery val conditions = outerReferenceMap.map { - case (o, a) => EqualNullSafe(a, OuterReference(o)) + case (o, a) => + val cond = EqualNullSafe(a, OuterReference(o)) + // SPARK-40615: Certain data types (e.g. MapType) do not support ordering, so + // the EqualNullSafe join condition can become unresolved. + if (!cond.resolved) { + if (!RowOrdering.isOrderable(a.dataType)) { + throw QueryCompilationErrors.unsupportedCorrelatedReferenceDataTypeError( + o, a.dataType, plan.origin) + } else { + throw SparkException.internalError(s"Unable to decorrelate subquery: " + + s"join condition '${cond.sql}' cannot be resolved.") + } + } + cond } (domainJoin, conditions.toSeq, AttributeMap(outerReferenceMap)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index cfb18c03760f..20859ffd3e34 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -1944,6 +1944,17 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { messageParameters = Map("function" -> funcStr)) } + def unsupportedCorrelatedReferenceDataTypeError( + expr: Expression, + dataType: DataType, + origin: Origin): Throwable = { + new AnalysisException( + errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + + "UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE", + origin = origin, + messageParameters = Map("expr" -> expr.sql, "dataType" -> dataType.typeName)) + } + def functionCannotProcessInputError( unbound: UnboundFunction, arguments: Seq[Expression], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 80b9fc767d56..54ab77950440 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -2336,4 +2336,30 @@ class SubquerySuite extends QueryTest checkAnswer(sql("select(select sum((select sum(col) from t)) from t)"), Row(null)) } } + + test("SPARK-40615: Check unsupported data type when decorrelating subqueries") { + withTempView("v1", "v2") { + sql( + """ + |create temp view v1(x) as values + |from_json('{"a":1, "b":2}', 'map') t(x) + |""".stripMargin) + sql( + """ + |create temp view v2(x) as values + |from_json('{"b":0, "c":2}', 'map') t(x) + |""".stripMargin) + + // Can use non-orderable data type in one row subquery that can be collapsed. + checkAnswer( + sql("select (select a + a from (select x['a'] as a)) from v1"), + Row(2)) + + // Cannot use non-orderable data type in one row subquery that cannot be collapsed. + val error = intercept[AnalysisException] { + sql("select (select a + a from (select upper(x['a']) as a)) from v1").collect() + } + assert(error.getMessage.contains("Correlated column reference 'v1.x' cannot be map type")) + } + } }