Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,11 @@
"Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses<treeNode>"
]
},
"UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE" : {
"message" : [
"Correlated column reference '<expr>' cannot be <dataType> type"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 this is better!

]
},
"UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY" : {
"message" : [
"Correlated scalar subqueries can only be used in filters, aggregations, projections, and UPDATE/MERGE/DELETE commands<treeNode>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package org.apache.spark.sql.catalyst.optimizer

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.util.collection.Utils

/**
Expand Down Expand Up @@ -360,7 +361,20 @@ object DecorrelateInnerQuery extends PredicateHelper {
// +- Aggregate [a1] [a1 AS a']
// +- OuterQuery
val conditions = outerReferenceMap.map {
case (o, a) => EqualNullSafe(a, OuterReference(o))
case (o, a) =>
val cond = EqualNullSafe(a, OuterReference(o))
// SPARK-40615: Certain data types (e.g. MapType) do not support ordering, so
// the EqualNullSafe join condition can become unresolved.
if (!cond.resolved) {
if (!RowOrdering.isOrderable(a.dataType)) {
throw QueryCompilationErrors.unsupportedCorrelatedReferenceDataTypeError(
o, a.dataType, plan.origin)
} else {
throw SparkException.internalError(s"Unable to decorrelate subquery: " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I saw Wenchen's comment above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s"join condition '${cond.sql}' cannot be resolved.")
}
}
cond
}
(domainJoin, conditions.toSeq, AttributeMap(outerReferenceMap))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1944,6 +1944,17 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
messageParameters = Map("function" -> funcStr))
}

def unsupportedCorrelatedReferenceDataTypeError(
expr: Expression,
dataType: DataType,
origin: Origin): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"UNSUPPORTED_CORRELATED_REFERENCE_DATA_TYPE",
origin = origin,
messageParameters = Map("expr" -> expr.sql, "dataType" -> dataType.typeName))
}

def functionCannotProcessInputError(
unbound: UnboundFunction,
arguments: Seq[Expression],
Expand Down
26 changes: 26 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2336,4 +2336,30 @@ class SubquerySuite extends QueryTest
checkAnswer(sql("select(select sum((select sum(col) from t)) from t)"), Row(null))
}
}

test("SPARK-40615: Check unsupported data type when decorrelating subqueries") {
withTempView("v1", "v2") {
sql(
"""
|create temp view v1(x) as values
|from_json('{"a":1, "b":2}', 'map<string,int>') t(x)
|""".stripMargin)
sql(
"""
|create temp view v2(x) as values
|from_json('{"b":0, "c":2}', 'map<string,int>') t(x)
|""".stripMargin)

// Can use non-orderable data type in one row subquery that can be collapsed.
checkAnswer(
sql("select (select a + a from (select x['a'] as a)) from v1"),
Row(2))

// Cannot use non-orderable data type in one row subquery that cannot be collapsed.
val error = intercept[AnalysisException] {
sql("select (select a + a from (select upper(x['a']) as a)) from v1").collect()
}
assert(error.getMessage.contains("Correlated column reference 'v1.x' cannot be map type"))
}
}
}