From bcaa235b95393e7858a11951943d545472124fd4 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 24 Jan 2022 20:18:34 +0300
Subject: [PATCH 01/17] Reuse UNSUPPORTED_FEATURE from literalTypeUnsupportedError

---
 core/src/main/resources/error/error-classes.json |  8 --------
 .../spark/sql/errors/QueryExecutionErrors.scala  | 11 ++++-------
 .../sql/errors/QueryExecutionErrorsSuite.scala   | 13 ++++++++++++-
 3 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 2e7831bdb415a..c40c0cc1c489f 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -141,10 +141,6 @@
     "message" : [ "Unrecognized SQL type %s" ],
     "sqlState" : "42000"
   },
-  "UNSUPPORTED_CHANGE_COLUMN" : {
-    "message" : [ "Please add an implementation for a column change here" ],
-    "sqlState" : "0A000"
-  },
   "UNSUPPORTED_DATATYPE" : {
     "message" : [ "Unsupported data type %s" ],
     "sqlState" : "0A000"
@@ -153,10 +149,6 @@
     "message" : [ "The feature is not supported: %s" ],
     "sqlState" : "0A000"
   },
-  "UNSUPPORTED_LITERAL_TYPE" : {
-    "message" : [ "Unsupported literal type %s %s" ],
-    "sqlState" : "0A000"
-  },
   "UNSUPPORTED_SIMPLE_STRING_WITH_NODE_ID" : {
     "message" : [ "%s does not implement simpleStringWithNodeId" ]
   },
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 384016216f668..fdba2709ec813 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -68,11 +68,6 @@ import org.apache.spark.util.CircularBuffer
  */
 object QueryExecutionErrors {
 
-  def columnChangeUnsupportedError(): Throwable = {
-    new SparkUnsupportedOperationException(errorClass = "UNSUPPORTED_CHANGE_COLUMN",
-      messageParameters = Array.empty)
-  }
-
   def logicalHintOperatorNotRemovedDuringAnalysisError(): Throwable = {
     new SparkIllegalStateException(errorClass = "INTERNAL_ERROR",
       messageParameters = Array(
@@ -257,8 +252,10 @@ object QueryExecutionErrors {
   }
 
   def literalTypeUnsupportedError(v: Any): RuntimeException = {
-    new SparkRuntimeException("UNSUPPORTED_LITERAL_TYPE",
-      Array(v.getClass.toString, v.toString))
+    new SparkRuntimeException(
+      errorClass = "UNSUPPORTED_FEATURE",
+      messageParameters = Array(
+        s"literal for '${v.toString}' of ${v.getClass.toString}."))
   }
 
   def noDefaultForDataTypeError(dataType: DataType): RuntimeException = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 5137614a366d1..acb06f4d848ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.errors
 
 import org.apache.spark.{SparkException, SparkRuntimeException}
 import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.test.SharedSparkSession
 
 class QueryExecutionErrorsSuite extends QueryTest with SharedSparkSession {
@@ -89,7 +90,7 @@ class QueryExecutionErrorsSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("UNSUPPORTED_MODE: unsupported combinations of AES modes and padding") {
+  test("UNSUPPORTED_FEATURE: unsupported combinations of AES modes and padding") {
     val key16 = "abcdefghijklmnop"
     val key32 = "abcdefghijklmnop12345678ABCDEFGH"
     val (df1, df2) = getAesInputs()
@@ -112,4 +113,14 @@ class QueryExecutionErrorsSuite extends QueryTest with SharedSparkSession {
     checkUnsupportedMode(df2.selectExpr(s"aes_decrypt(value16, '$key16', 'GCM', 'PKCS')"))
     checkUnsupportedMode(df2.selectExpr(s"aes_decrypt(value32, '$key32', 'ECB', 'None')"))
   }
+
+  test("UNSUPPORTED_FEATURE: creating a literal of unsupported type") {
+    val e = intercept[SparkRuntimeException] {
+      spark.range(1).select($"value" + Literal(java.time.Year.of(1000)))
+    }
+    assert(e.getErrorClass === "UNSUPPORTED_FEATURE")
+    assert(e.getSqlState === "0A000")
+    assert(e.getMessage.contains("The feature is not supported: " +
+      "literal for '1000' of class java.time.Year."))
+  }
 }
From f02699739643dd230bb04a3e36b871df03bfda11 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 24 Jan 2022 20:28:07 +0300
Subject: [PATCH 02/17] Fix style

---
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index fdba2709ec813..9c7d93de76c95 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -254,8 +254,7 @@ object QueryExecutionErrors {
   def literalTypeUnsupportedError(v: Any): RuntimeException = {
     new SparkRuntimeException(
       errorClass = "UNSUPPORTED_FEATURE",
-      messageParameters = Array(
-        s"literal for '${v.toString}' of ${v.getClass.toString}."))
+      messageParameters = Array(s"literal for '${v.toString}' of ${v.getClass.toString}."))
   }
 
   def noDefaultForDataTypeError(dataType: DataType): RuntimeException = {

From a903ad3aedbf3e17ac1c23cf4501afb034d1844e Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 25 Jan 2022 10:01:49 +0300
Subject: [PATCH 03/17] Fix DataFramePivotSuite

---
 .../test/scala/org/apache/spark/sql/DataFramePivotSuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index 32cbb8b457d86..fba8e3e63a86a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -331,7 +331,9 @@ class DataFramePivotSuite extends QueryTest with SharedSparkSession {
         .agg(sum($"sales.earnings"))
         .collect()
     }
-    assert(exception.getMessage.contains("Unsupported literal type"))
+    assert(exception.getMessage.contains("The feature is not supported: " +
+      "literal for '[dotnet,Dummies]' of class " +
+      "org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema"))
   }
 
   test("SPARK-26403: pivoting by array column") {

From 99c79134907468f6f3c063e1ad88578581b9193f Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 25 Jan 2022 10:10:15 +0300
Subject: [PATCH 04/17] Move a test from LiteralExpressionSuite

---
 .../expressions/LiteralExpressionSuite.scala   | 11 -----------
 .../sql/errors/QueryExecutionErrorsSuite.scala | 17 ++++++++++-------
 2 files changed, 10 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
index b1934a06dc1bf..6ce51f1eec8ca 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
@@ -231,17 +231,6 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkStructLiteral((Period.ZERO, ("abc", Duration.ofDays(1))))
   }
 
-  test("unsupported types (map and struct) in Literal.apply") {
-    def checkUnsupportedTypeInLiteral(v: Any): Unit = {
-      val errMsgMap = intercept[RuntimeException] {
-        Literal(v)
-      }
-      assert(errMsgMap.getMessage.startsWith("Unsupported literal type"))
-    }
-    checkUnsupportedTypeInLiteral(Map("key1" -> 1, "key2" -> 2))
-    checkUnsupportedTypeInLiteral(("mike", 29, 1.0))
-  }
-
   test("SPARK-24571: char literals") {
     checkEvaluation(Literal('X'), "X")
     checkEvaluation(Literal.create('0'), "0")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index acb06f4d848ac..787376f00aef8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -114,13 +114,16 @@ class QueryExecutionErrorsSuite extends QueryTest with SharedSparkSession {
     checkUnsupportedMode(df2.selectExpr(s"aes_decrypt(value32, '$key32', 'ECB', 'None')"))
   }
 
-  test("UNSUPPORTED_FEATURE: creating a literal of unsupported type") {
-    val e = intercept[SparkRuntimeException] {
-      spark.range(1).select($"value" + Literal(java.time.Year.of(1000)))
+  test("UNSUPPORTED_FEATURE: unsupported types (map and struct) in Literal.apply") {
+    def checkUnsupportedTypeInLiteral(v: Any): Unit = {
+      val e = intercept[SparkRuntimeException] {
+        Literal(v)
+      }
+      assert(e.getErrorClass === "UNSUPPORTED_FEATURE")
+      assert(e.getSqlState === "0A000")
+      assert(e.getMessage.contains("The feature is not supported: literal for "))
     }
-    assert(e.getErrorClass === "UNSUPPORTED_FEATURE")
-    assert(e.getSqlState === "0A000")
-    assert(e.getMessage.contains("The feature is not supported: " +
-      "literal for '1000' of class java.time.Year."))
+    checkUnsupportedTypeInLiteral(Map("key1" -> 1, "key2" -> 2))
+    checkUnsupportedTypeInLiteral(("mike", 29, 1.0))
   }
 }

From 32884dabcfd7c3cc59664e4d140997806b2749fb Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 25 Jan 2022 17:00:29 +0300
Subject: [PATCH 05/17] Use UNSUPPORTED_FEATURE instead of UNSUPPORTED_SIMPLE_STRING_WITH_NODE_ID

---
 core/src/main/resources/error/error-classes.json           | 3 ---
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 5 +++--
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index c40c0cc1c489f..d94818dfe82be 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -149,9 +149,6 @@
     "message" : [ "The feature is not supported: %s" ],
     "sqlState" : "0A000"
   },
-  "UNSUPPORTED_SIMPLE_STRING_WITH_NODE_ID" : {
-    "message" : [ "%s does not implement simpleStringWithNodeId" ]
-  },
   "UNSUPPORTED_TRANSACTION_BY_JDBC_SERVER" : {
     "message" : [ "The target JDBC server does not support transaction and can only support ALTER TABLE with a single action." ],
     "sqlState" : "0A000"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 9c7d93de76c95..4e17b3f88455f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -127,8 +127,9 @@ object QueryExecutionErrors {
   }
 
   def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
-    new SparkUnsupportedOperationException(errorClass = "UNSUPPORTED_SIMPLE_STRING_WITH_NODE_ID",
-      messageParameters = Array(nodeName))
+    new SparkUnsupportedOperationException(
+      errorClass = "UNSUPPORTED_FEATURE",
+      messageParameters = Array(s"$nodeName does not implement simpleStringWithNodeId"))
   }
 
   def evaluateUnevaluableAggregateUnsupportedError(

From 9e67cab14d9eb79f49b44c7bd932cabe94467727 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 25 Jan 2022 17:08:14 +0300
Subject: [PATCH 06/17] UNSUPPORTED_TRANSACTION_BY_JDBC_SERVER -> UNSUPPORTED_FEATURE

---
 core/src/main/resources/error/error-classes.json           | 4 ----
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 6 ++++--
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index d94818dfe82be..cbfa72cc399c3 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -149,10 +149,6 @@
     "message" : [ "The feature is not supported: %s" ],
     "sqlState" : "0A000"
   },
-  "UNSUPPORTED_TRANSACTION_BY_JDBC_SERVER" : {
-    "message" : [ "The target JDBC server does not support transaction and can only support ALTER TABLE with a single action." ],
-    "sqlState" : "0A000"
-  },
   "WRITING_JOB_ABORTED" : {
     "message" : [ "Writing job aborted" ],
     "sqlState" : "40000"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 4e17b3f88455f..383dd8d029862 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -781,8 +781,10 @@ object QueryExecutionErrors {
   }
 
   def transactionUnsupportedByJdbcServerError(): Throwable = {
-    new SparkSQLFeatureNotSupportedException(errorClass = "UNSUPPORTED_TRANSACTION_BY_JDBC_SERVER",
-      Array.empty)
+    new SparkSQLFeatureNotSupportedException(
+      errorClass = "UNSUPPORTED_FEATURE",
+      messageParameters = Array("the target JDBC server does not support transaction and " +
+        "can only support ALTER TABLE with a single action."))
   }
 
   def dataTypeUnsupportedYetError(dataType: DataType): Throwable = {
From fc7de5c708a067b20096fa0c7aaad1c72469596e Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 25 Jan 2022 17:37:23 +0300
Subject: [PATCH 07/17] IF_PARTITION_NOT_EXISTS_UNSUPPORTED -> UNSUPPORTED_FEATURE

---
 core/src/main/resources/error/error-classes.json          | 3 ---
 .../apache/spark/sql/errors/QueryCompilationErrors.scala  | 4 ++--
 .../org/apache/spark/sql/connector/InsertIntoTests.scala  | 6 ++++--
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index cbfa72cc399c3..a1ac99f1a0727 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -51,9 +51,6 @@
   "GROUPING_SIZE_LIMIT_EXCEEDED" : {
     "message" : [ "Grouping sets size cannot be greater than %s" ]
   },
-  "IF_PARTITION_NOT_EXISTS_UNSUPPORTED" : {
-    "message" : [ "Cannot write, IF NOT EXISTS is not supported for table: %s" ]
-  },
   "ILLEGAL_SUBSTRING" : {
     "message" : [ "%s cannot contain %s." ]
   },
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 14f8053233d45..201b8d6c5dc58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -93,8 +93,8 @@ object QueryCompilationErrors {
 
   def unsupportedIfNotExistsError(tableName: String): Throwable = {
     new AnalysisException(
-      errorClass = "IF_PARTITION_NOT_EXISTS_UNSUPPORTED",
-      messageParameters = Array(tableName))
+      errorClass = "UNSUPPORTED_FEATURE",
+      messageParameters = Array(s"IF NOT EXISTS for the table '$tableName' by INSERT INTO"))
   }
 
   def nonPartitionColError(partitionName: String): Throwable = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index 0dee48fbb5b92..5a0a4baa139fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -292,9 +292,11 @@ trait InsertIntoSQLOnlyTests
      }
 
      verifyTable(t1, spark.emptyDataFrame)
-     assert(exc.getMessage.contains("Cannot write, IF NOT EXISTS is not supported for table"))
+     assert(exc.getMessage.contains("The feature is not supported: " +
+       "IF NOT EXISTS for the table"))
      assert(exc.getMessage.contains(t1))
-     assert(exc.getErrorClass == "IF_PARTITION_NOT_EXISTS_UNSUPPORTED")
+     assert(exc.getErrorClass === "UNSUPPORTED_FEATURE")
+     assert(exc.getSqlState === "0A000")
    }
  }
 

From 0a0d19fa761deeec2cfd07f67a38eaa1b67257cb Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 25 Jan 2022 17:54:28 +0300
Subject: [PATCH 08/17] Use UNSUPPORTED_FEATURE instead of UNSUPPORTED_DATATYPE in dataTypeUnsupportedError()

---
 .../expressions/InterpretedUnsafeProjection.scala      | 2 +-
 .../apache/spark/sql/errors/QueryExecutionErrors.scala | 8 +++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
index d02d1e8b55b9d..9544030ccdc1e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
@@ -254,7 +254,7 @@ object InterpretedUnsafeProjection {
         (_, _) => {}
 
       case _ =>
-        throw QueryExecutionErrors.dataTypeUnsupportedError(dt)
+        throw QueryExecutionErrors.dataTypeUnsupportedForWriterFuncError(dt)
     }
 
     // Always wrap the writer with a null safe version.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 383dd8d029862..bfdddd9e3f0be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -138,9 +138,11 @@ object QueryExecutionErrors {
       messageParameters = Array(s"Cannot evaluate expression: $methodName: $unEvaluable"))
   }
 
-  def dataTypeUnsupportedError(dt: DataType): Throwable = {
-    new SparkException(errorClass = "UNSUPPORTED_DATATYPE",
-      messageParameters = Array(dt.typeName), null)
+  def dataTypeUnsupportedForWriterFuncError(dt: DataType): Throwable = {
+    new SparkRuntimeException(
+      errorClass = "UNSUPPORTED_FEATURE",
+      messageParameters = Array(s"the data type '${dt.typeName}' while generating " +
+        "a writer function for a struct field, array element, map key or map value."))
   }
 
   def dataTypeUnsupportedError(dataType: String, failure: String): Throwable = {

From 27cdaab028a0a100fbc6e064d314ae168c391535 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 26 Jan 2022 12:47:06 +0300
Subject: [PATCH 09/17] Use IllegalStateException instead of Spark's exception

---
 .../catalyst/expressions/InterpretedUnsafeProjection.scala | 3 ++-
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 7 -------
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
index 9544030ccdc1e..46608be103ad1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
@@ -254,7 +254,8 @@ object InterpretedUnsafeProjection {
         (_, _) => {}
 
       case _ =>
-        throw QueryExecutionErrors.dataTypeUnsupportedForWriterFuncError(dt)
+        throw new IllegalStateException(s"The data type '${dt.typeName}' is not supported in " +
+          "generating a writer function for a struct field, array element, map key or map value.")
     }
 
     // Always wrap the writer with a null safe version.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index bfdddd9e3f0be..d7dab4e49fcaf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -138,13 +138,6 @@ object QueryExecutionErrors {
       messageParameters = Array(s"Cannot evaluate expression: $methodName: $unEvaluable"))
   }
 
-  def dataTypeUnsupportedForWriterFuncError(dt: DataType): Throwable = {
-    new SparkRuntimeException(
-      errorClass = "UNSUPPORTED_FEATURE",
-      messageParameters = Array(s"the data type '${dt.typeName}' while generating " +
-        "a writer function for a struct field, array element, map key or map value."))
-  }
-
   def dataTypeUnsupportedError(dataType: String, failure: String): Throwable = {
     new SparkIllegalArgumentException(errorClass = "UNSUPPORTED_DATATYPE",
       messageParameters = Array(dataType + failure))
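
[Note, illustrative and not part of the patch] Patch 09 draws a line between user-facing error classes and internal invariants: the default branch of the writer dispatch is unreachable for validated plans, so it signals a Spark bug with a plain `IllegalStateException`. A minimal sketch of that pattern, with a hypothetical `writerFor` helper standing in for the real dispatch:

```scala
import org.apache.spark.sql.types.{DataType, IntegerType}

// Hypothetical helper mirroring the pattern: exhaustive dispatch where the
// fallback branch reports an internal invariant violation, not a user error.
def writerFor(dt: DataType): String = dt match {
  case IntegerType => "int writer"
  // ... one case per supported Catalyst type ...
  case _ =>
    throw new IllegalStateException(s"The data type '${dt.typeName}' is not supported in " +
      "generating a writer function for a struct field, array element, map key or map value.")
}
```
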
From 0702cccf7d9885ef474c9ee7c28de25c70634d78 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 26 Jan 2022 12:56:19 +0300
Subject: [PATCH 10/17] Literal -> lit

---
 .../expressions/InterpretedUnsafeProjection.scala    | 1 -
 .../spark/sql/errors/QueryExecutionErrorsSuite.scala | 8 +++-----
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
index 46608be103ad1..731ad16cc7d9f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
 import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{UserDefinedType, _}
 import org.apache.spark.unsafe.Platform
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 787376f00aef8..9928b74fdda90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.errors
 
 import org.apache.spark.{SparkException, SparkRuntimeException}
 import org.apache.spark.sql.{DataFrame, QueryTest}
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.test.SharedSparkSession
 
 class QueryExecutionErrorsSuite extends QueryTest with SharedSparkSession {
@@ -114,11 +114,9 @@ class QueryExecutionErrorsSuite extends QueryTest with SharedSparkSession {
     checkUnsupportedMode(df2.selectExpr(s"aes_decrypt(value32, '$key32', 'ECB', 'None')"))
   }
 
-  test("UNSUPPORTED_FEATURE: unsupported types (map and struct) in Literal.apply") {
+  test("UNSUPPORTED_FEATURE: unsupported types (map and struct) in lit()") {
     def checkUnsupportedTypeInLiteral(v: Any): Unit = {
-      val e = intercept[SparkRuntimeException] {
-        Literal(v)
-      }
+      val e = intercept[SparkRuntimeException] { lit(v) }
       assert(e.getErrorClass === "UNSUPPORTED_FEATURE")
       assert(e.getSqlState === "0A000")
       assert(e.getMessage.contains("The feature is not supported: literal for "))

From a2af415120be717883d0643c0f471aec7c7e958b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 26 Jan 2022 13:51:24 +0300
Subject: [PATCH 11/17] Remove simpleStringWithNodeIdUnsupportedError

---
 .../apache/spark/sql/catalyst/expressions/Expression.scala | 2 +-
 .../spark/sql/catalyst/expressions/codegen/javaCode.scala  | 2 +-
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 6 ------
 3 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 1d54efd7319e3..32b25f51b8efe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -315,7 +315,7 @@ abstract class Expression extends TreeNode[Expression] {
   }
 
   override def simpleStringWithNodeId(): String = {
-    throw QueryExecutionErrors.simpleStringWithNodeIdUnsupportedError(nodeName)
+    throw new IllegalStateException(s"$nodeName does not implement simpleStringWithNodeId")
   }
 
   protected def typeSuffix =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
index dbe9a810a493e..3651dc420fa21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
@@ -203,7 +203,7 @@ trait Block extends TreeNode[Block] with JavaCode {
   override def verboseString(maxFields: Int): String = toString
 
   override def simpleStringWithNodeId(): String = {
-    throw QueryExecutionErrors.simpleStringWithNodeIdUnsupportedError(nodeName)
+    throw new IllegalStateException(s"$nodeName does not implement simpleStringWithNodeId")
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index d7dab4e49fcaf..6fdb728bca249 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -126,12 +126,6 @@ object QueryExecutionErrors {
       messageParameters = Array.empty)
   }
 
-  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
-    new SparkUnsupportedOperationException(
-      errorClass = "UNSUPPORTED_FEATURE",
-      messageParameters = Array(s"$nodeName does not implement simpleStringWithNodeId"))
-  }
-
   def evaluateUnevaluableAggregateUnsupportedError(
       methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
     new SparkUnsupportedOperationException(errorClass = "INTERNAL_ERROR",

From c72f9b25de4d294188e88174de11d432715da8e0 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 26 Jan 2022 19:25:20 +0300
Subject: [PATCH 12/17] Add new test suite for compilation errors

---
 .../spark/sql/connector/InsertIntoTests.scala |  18 ---------
 .../connector/TestV2SessionCatalogBase.scala  |   2 +-
 .../sql/connector/V1WriteFallbackSuite.scala  |   2 +-
 .../errors/QueryCompilationErrorsSuite.scala  | 124 ++++++++++++++++++
 4 files changed, 126 insertions(+), 20 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index 5a0a4baa139fc..fc98cfd5138e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -282,24 +282,6 @@ trait InsertIntoSQLOnlyTests
     }
   }
 
-  test("InsertInto: IF PARTITION NOT EXISTS not supported") {
-    val t1 = s"${catalogAndNamespace}tbl"
-    withTableAndData(t1) { view =>
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
-
-      val exc = intercept[AnalysisException] {
-        sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 1) IF NOT EXISTS SELECT * FROM $view")
-      }
-
-      verifyTable(t1, spark.emptyDataFrame)
-      assert(exc.getMessage.contains("The feature is not supported: " +
-        "IF NOT EXISTS for the table"))
-      assert(exc.getMessage.contains(t1))
-      assert(exc.getErrorClass === "UNSUPPORTED_FEATURE")
-      assert(exc.getSqlState === "0A000")
-    }
-  }
-
   test("InsertInto: overwrite - dynamic clause - static mode") {
     withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
       val t1 = s"${catalogAndNamespace}tbl"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index bf2749d1afc53..9e834189c86ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.StructType
  * for testing DDL as well as write operations (through df.write.saveAsTable, df.write.insertInto
  * and SQL).
 */
-private[connector] trait TestV2SessionCatalogBase[T <: Table] extends DelegatingCatalogExtension {
+private[sql] trait TestV2SessionCatalogBase[T <: Table] extends DelegatingCatalogExtension {
 
   protected val tables: util.Map[Identifier, T] = new ConcurrentHashMap[Identifier, T]()
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index 9fbaf7890f8f8..4d6bb1f264922 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -231,7 +231,7 @@ class V1FallbackTableCatalog extends TestV2SessionCatalogBase[InMemoryTableWithV
   }
 }
 
-private object InMemoryV1Provider {
+private[sql] object InMemoryV1Provider {
   val tables: mutable.Map[String, InMemoryTableWithV1Fallback] = mutable.Map.empty
 
   def getTableData(spark: SparkSession, name: String): DataFrame = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
new file mode 100644
index 0000000000000..0e25181e00d46
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.errors
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2Provider, InMemoryTableSessionCatalog, InMemoryTableWithV1Fallback, InMemoryV1Provider, TestV2SessionCatalogBase, V1FallbackTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, InMemoryTable, Table}
+import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
+import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.SharedSparkSession
+
+abstract class QueryCompilationErrorsSuiteBase extends QueryTest with SharedSparkSession {
+  protected val v2Format: String
+  protected val catalogAndNamespace: String
+
+  /** Check that the results in `tableName` match the `expected` DataFrame. */
+  protected def verifyTable(tableName: String, expected: DataFrame): Unit
+
+  private def withTableAndData(tableName: String)(testFn: String => Unit): Unit = {
+    withTable(tableName) {
+      val viewName = "tmp_view"
+      val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
+      df.createOrReplaceTempView(viewName)
+      withTempView(viewName) {
+        testFn(viewName)
+      }
+    }
+  }
+
+  test("UNSUPPORTED_FEATURE: IF PARTITION NOT EXISTS not supported by INSERT") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTableAndData(t1) { view =>
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+
+      val exc = intercept[AnalysisException] {
+        sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 1) IF NOT EXISTS SELECT * FROM $view")
+      }
+
+      verifyTable(t1, spark.emptyDataFrame)
+      assert(exc.getMessage.contains("The feature is not supported: " +
+        "IF NOT EXISTS for the table"))
+      assert(exc.getMessage.contains(t1))
+      assert(exc.getErrorClass === "UNSUPPORTED_FEATURE")
+      assert(exc.getSqlState === "0A000")
+    }
+  }
+}
+
+class QueryCompilationErrorsDSv2Suite
+  extends QueryCompilationErrorsSuiteBase
+  with DatasourceV2SQLBase {
+
+  override protected val v2Format = classOf[FakeV2Provider].getName
+  override protected val catalogAndNamespace = "testcat.ns1.ns2."
+
+  override def verifyTable(tableName: String, expected: DataFrame): Unit = {
+    checkAnswer(spark.table(tableName), expected)
+  }
+}
+
+trait SessionCatalogTestBase[T <: Table, Catalog <: TestV2SessionCatalogBase[T]]
+  extends QueryTest
+  with SharedSparkSession
+  with BeforeAndAfter {
+
+  protected def catalog(name: String): CatalogPlugin =
+    spark.sessionState.catalogManager.catalog(name)
+  protected val v2Format: String = classOf[FakeV2Provider].getName
+  protected val catalogClassName: String = classOf[InMemoryTableSessionCatalog].getName
+
+  before {
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION.key, catalogClassName)
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    catalog(SESSION_CATALOG_NAME).asInstanceOf[Catalog].clearTables()
+    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+  }
+}
+
+class QueryCompilationErrorsDSv2SessionCatalogSuite
+  extends QueryCompilationErrorsSuiteBase
+  with SessionCatalogTestBase[InMemoryTable, InMemoryTableSessionCatalog] {
+
+  override protected val catalogAndNamespace = ""
+
+  override protected def verifyTable(tableName: String, expected: DataFrame): Unit = {
+    checkAnswer(spark.table(tableName), expected)
+    checkAnswer(sql(s"SELECT * FROM $tableName"), expected)
+    checkAnswer(sql(s"SELECT * FROM default.$tableName"), expected)
+    checkAnswer(sql(s"TABLE $tableName"), expected)
+  }
+}
+
+class QueryCompilationErrorsV1WriteFallbackSuite
+  extends QueryCompilationErrorsSuiteBase
+  with SessionCatalogTestBase[InMemoryTableWithV1Fallback, V1FallbackTableCatalog] {
+
+  override protected val v2Format = classOf[InMemoryV1Provider].getName
+  override protected val catalogClassName: String = classOf[V1FallbackTableCatalog].getName
+  override protected val catalogAndNamespace: String = ""
+
+  override protected def verifyTable(tableName: String, expected: DataFrame): Unit = {
+    checkAnswer(InMemoryV1Provider.getTableData(spark, s"default.$tableName"), expected)
+  }
+}
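
[Note, illustrative and not part of the patch] The new suite drives the compilation error end to end. A rough spark-shell equivalent, assuming a DSv2 test catalog registered under `testcat` and a temporary view `tmp_view` with matching columns (both hypothetical here, mirroring the suite's fixtures), would be:

```scala
// Assumes spark.sql.catalog.testcat points at a DSv2 test catalog and `foo`
// names its table provider; `tmp_view` is any temp view with (id, data).
sql("CREATE TABLE testcat.ns1.ns2.tbl (id bigint, data string) " +
  "USING foo PARTITIONED BY (id)")
sql("INSERT OVERWRITE TABLE testcat.ns1.ns2.tbl PARTITION (id = 1) " +
  "IF NOT EXISTS SELECT * FROM tmp_view")
// org.apache.spark.sql.AnalysisException: The feature is not supported:
// IF NOT EXISTS for the table 'testcat.ns1.ns2.tbl' by INSERT INTO
```
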
...SourceV2DataFrameSessionCatalogSuite.scala | 29 +---------- .../errors/QueryCompilationErrorsSuite.scala | 31 ++---------- 3 files changed, 53 insertions(+), 55 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SessionCatalogTestBase.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionCatalogTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionCatalogTestBase.scala new file mode 100644 index 0000000000000..69a3b55dd9200 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionCatalogTestBase.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatalog, TestV2SessionCatalogBase} +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Table} +import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME +import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION +import org.apache.spark.sql.test.SharedSparkSession + + +trait SessionCatalogTestBase[T <: Table, Catalog <: TestV2SessionCatalogBase[T]] + extends QueryTest + with SharedSparkSession + with BeforeAndAfter { + + protected def catalog(name: String): CatalogPlugin = + spark.sessionState.catalogManager.catalog(name) + protected val v2Format: String = classOf[FakeV2Provider].getName + protected val catalogClassName: String = classOf[InMemoryTableSessionCatalog].getName + + before { + spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION.key, catalogClassName) + } + + override def afterEach(): Unit = { + super.afterEach() + catalog(SESSION_CATALOG_NAME).asInstanceOf[Catalog].clearTables() + spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala index 3edc4b9502064..d5e414c179443 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -19,17 +19,12 @@ package org.apache.spark.sql.connector import java.util -import org.scalatest.BeforeAndAfter - -import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} +import org.apache.spark.sql.{DataFrame, SaveMode, SessionCatalogTestBase} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.connector.catalog._ -import 
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION -import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType class DataSourceV2DataFrameSessionCatalogSuite @@ -147,27 +142,7 @@ object InMemoryTableSessionCatalog { } private [connector] trait SessionCatalogTest[T <: Table, Catalog <: TestV2SessionCatalogBase[T]] - extends QueryTest - with SharedSparkSession - with BeforeAndAfter { - - protected def catalog(name: String): CatalogPlugin = { - spark.sessionState.catalogManager.catalog(name) - } - - protected val v2Format: String = classOf[FakeV2Provider].getName - - protected val catalogClassName: String = classOf[InMemoryTableSessionCatalog].getName - - before { - spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION.key, catalogClassName) - } - - override def afterEach(): Unit = { - super.afterEach() - catalog(SESSION_CATALOG_NAME).asInstanceOf[Catalog].clearTables() - spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) - } + extends SessionCatalogTestBase[T, Catalog] { protected def verifyTable(tableName: String, expected: DataFrame): Unit diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala index 0e25181e00d46..4c35144438c7f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala @@ -17,13 +17,9 @@ package org.apache.spark.sql.errors -import org.scalatest.BeforeAndAfter - -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest} -import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2Provider, InMemoryTableSessionCatalog, InMemoryTableWithV1Fallback, InMemoryV1Provider, TestV2SessionCatalogBase, V1FallbackTableCatalog} -import org.apache.spark.sql.connector.catalog.{CatalogPlugin, InMemoryTable, Table} -import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME -import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SessionCatalogTestBase} +import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2Provider, InMemoryTableSessionCatalog, InMemoryTableWithV1Fallback, InMemoryV1Provider, V1FallbackTableCatalog} +import org.apache.spark.sql.connector.catalog.InMemoryTable import org.apache.spark.sql.test.SharedSparkSession abstract class QueryCompilationErrorsSuiteBase extends QueryTest with SharedSparkSession { @@ -75,27 +71,6 @@ class QueryCompilationErrorsDSv2Suite } } -trait SessionCatalogTestBase[T <: Table, Catalog <: TestV2SessionCatalogBase[T]] - extends QueryTest - with SharedSparkSession - with BeforeAndAfter { - - protected def catalog(name: String): CatalogPlugin = - spark.sessionState.catalogManager.catalog(name) - protected val v2Format: String = classOf[FakeV2Provider].getName - protected val catalogClassName: String = classOf[InMemoryTableSessionCatalog].getName - - before { - spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION.key, catalogClassName) - } - - override def afterEach(): Unit = { - super.afterEach() - catalog(SESSION_CATALOG_NAME).asInstanceOf[Catalog].clearTables() - 
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) - } -} - class QueryCompilationErrorsDSv2SessionCatalogSuite extends QueryCompilationErrorsSuiteBase with SessionCatalogTestBase[InMemoryTable, InMemoryTableSessionCatalog] { From fedac2fb46e07408ca0921b932fcbcc2f1c0589d Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 27 Jan 2022 11:48:58 +0300 Subject: [PATCH 14/17] Remove duplicated V2 test suites. --- .../spark/sql/SessionCatalogTestBase.scala | 48 --------- ...SourceV2DataFrameSessionCatalogSuite.scala | 29 +++++- .../QueryCompilationErrorsDSv2Suite.scala | 54 ++++++++++ .../errors/QueryCompilationErrorsSuite.scala | 99 ------------------- 4 files changed, 81 insertions(+), 149 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SessionCatalogTestBase.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionCatalogTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionCatalogTestBase.scala deleted file mode 100644 index 69a3b55dd9200..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/SessionCatalogTestBase.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql - -import org.scalatest.BeforeAndAfter - -import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatalog, TestV2SessionCatalogBase} -import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Table} -import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME -import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION -import org.apache.spark.sql.test.SharedSparkSession - - -trait SessionCatalogTestBase[T <: Table, Catalog <: TestV2SessionCatalogBase[T]] - extends QueryTest - with SharedSparkSession - with BeforeAndAfter { - - protected def catalog(name: String): CatalogPlugin = - spark.sessionState.catalogManager.catalog(name) - protected val v2Format: String = classOf[FakeV2Provider].getName - protected val catalogClassName: String = classOf[InMemoryTableSessionCatalog].getName - - before { - spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION.key, catalogClassName) - } - - override def afterEach(): Unit = { - super.afterEach() - catalog(SESSION_CATALOG_NAME).asInstanceOf[Catalog].clearTables() - spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala index d5e414c179443..3edc4b9502064 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -19,12 +19,17 @@ package org.apache.spark.sql.connector import java.util -import org.apache.spark.sql.{DataFrame, SaveMode, SessionCatalogTestBase} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION +import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType class DataSourceV2DataFrameSessionCatalogSuite @@ -142,7 +147,27 @@ object InMemoryTableSessionCatalog { } private [connector] trait SessionCatalogTest[T <: Table, Catalog <: TestV2SessionCatalogBase[T]] - extends SessionCatalogTestBase[T, Catalog] { + extends QueryTest + with SharedSparkSession + with BeforeAndAfter { + + protected def catalog(name: String): CatalogPlugin = { + spark.sessionState.catalogManager.catalog(name) + } + + protected val v2Format: String = classOf[FakeV2Provider].getName + + protected val catalogClassName: String = classOf[InMemoryTableSessionCatalog].getName + + before { + spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION.key, catalogClassName) + } + + override def afterEach(): Unit = { + super.afterEach() + catalog(SESSION_CATALOG_NAME).asInstanceOf[Catalog].clearTables() + spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) + } protected def verifyTable(tableName: String, expected: DataFrame): Unit diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala new file mode 100644 index 0000000000000..61aeed27b0ad8 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.errors + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2Provider} +import org.apache.spark.sql.test.SharedSparkSession + +class QueryCompilationErrorsDSv2Suite + extends QueryTest + with SharedSparkSession + with DatasourceV2SQLBase { + + test("UNSUPPORTED_FEATURE: IF PARTITION NOT EXISTS not supported by INSERT") { + val v2Format = classOf[FakeV2Provider].getName + val catalogAndNamespace = "testcat.ns1.ns2." + val t1 = s"${catalogAndNamespace}tbl" + + withTable(t1) { + val view = "tmp_view" + val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") + df.createOrReplaceTempView(view) + withTempView(view) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)") + + val e = intercept[AnalysisException] { + sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 1) IF NOT EXISTS SELECT * FROM $view") + } + + checkAnswer(spark.table(t1), spark.emptyDataFrame) + assert(e.getMessage.contains("The feature is not supported: " + + "IF NOT EXISTS for the table")) + assert(e.getMessage.contains(t1)) + assert(e.getErrorClass === "UNSUPPORTED_FEATURE") + assert(e.getSqlState === "0A000") + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala deleted file mode 100644 index 4c35144438c7f..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.errors - -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SessionCatalogTestBase} -import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2Provider, InMemoryTableSessionCatalog, InMemoryTableWithV1Fallback, InMemoryV1Provider, V1FallbackTableCatalog} -import org.apache.spark.sql.connector.catalog.InMemoryTable -import org.apache.spark.sql.test.SharedSparkSession - -abstract class QueryCompilationErrorsSuiteBase extends QueryTest with SharedSparkSession { - protected val v2Format: String - protected val catalogAndNamespace: String - - /** Check that the results in `tableName` match the `expected` DataFrame. */ - protected def verifyTable(tableName: String, expected: DataFrame): Unit - - private def withTableAndData(tableName: String)(testFn: String => Unit): Unit = { - withTable(tableName) { - val viewName = "tmp_view" - val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") - df.createOrReplaceTempView(viewName) - withTempView(viewName) { - testFn(viewName) - } - } - } - - test("UNSUPPORTED_FEATURE: IF PARTITION NOT EXISTS not supported by INSERT") { - val t1 = s"${catalogAndNamespace}tbl" - withTableAndData(t1) { view => - sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)") - - val exc = intercept[AnalysisException] { - sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 1) IF NOT EXISTS SELECT * FROM $view") - } - - verifyTable(t1, spark.emptyDataFrame) - assert(exc.getMessage.contains("The feature is not supported: " + - "IF NOT EXISTS for the table")) - assert(exc.getMessage.contains(t1)) - assert(exc.getErrorClass === "UNSUPPORTED_FEATURE") - assert(exc.getSqlState === "0A000") - } - } -} - -class QueryCompilationErrorsDSv2Suite - extends QueryCompilationErrorsSuiteBase - with DatasourceV2SQLBase { - - override protected val v2Format = classOf[FakeV2Provider].getName - override protected val catalogAndNamespace = "testcat.ns1.ns2." 
- - override def verifyTable(tableName: String, expected: DataFrame): Unit = { - checkAnswer(spark.table(tableName), expected) - } -} - -class QueryCompilationErrorsDSv2SessionCatalogSuite - extends QueryCompilationErrorsSuiteBase - with SessionCatalogTestBase[InMemoryTable, InMemoryTableSessionCatalog] { - - override protected val catalogAndNamespace = "" - - override protected def verifyTable(tableName: String, expected: DataFrame): Unit = { - checkAnswer(spark.table(tableName), expected) - checkAnswer(sql(s"SELECT * FROM $tableName"), expected) - checkAnswer(sql(s"SELECT * FROM default.$tableName"), expected) - checkAnswer(sql(s"TABLE $tableName"), expected) - } -} - -class QueryCompilationErrorsV1WriteFallbackSuite - extends QueryCompilationErrorsSuiteBase - with SessionCatalogTestBase[InMemoryTableWithV1Fallback, V1FallbackTableCatalog] { - - override protected val v2Format = classOf[InMemoryV1Provider].getName - override protected val catalogClassName: String = classOf[V1FallbackTableCatalog].getName - override protected val catalogAndNamespace: String = "" - - override protected def verifyTable(tableName: String, expected: DataFrame): Unit = { - checkAnswer(InMemoryV1Provider.getTableData(spark, s"default.$tableName"), expected) - } -} From c0eee62ecc6d6de625f10d9deabd6db7c8d346d1 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 27 Jan 2022 11:51:32 +0300 Subject: [PATCH 15/17] Remove unnecessary changes --- .../apache/spark/sql/connector/TestV2SessionCatalogBase.scala | 2 +- .../org/apache/spark/sql/connector/V1WriteFallbackSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala index 9e834189c86ee..bf2749d1afc53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.types.StructType * for testing DDL as well as write operations (through df.write.saveAsTable, df.write.insertInto * and SQL). 
*/ -private[sql] trait TestV2SessionCatalogBase[T <: Table] extends DelegatingCatalogExtension { +private[connector] trait TestV2SessionCatalogBase[T <: Table] extends DelegatingCatalogExtension { protected val tables: util.Map[Identifier, T] = new ConcurrentHashMap[Identifier, T]() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala index 4d6bb1f264922..9fbaf7890f8f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala @@ -231,7 +231,7 @@ class V1FallbackTableCatalog extends TestV2SessionCatalogBase[InMemoryTableWithV } } -private[sql] object InMemoryV1Provider { +private object InMemoryV1Provider { val tables: mutable.Map[String, InMemoryTableWithV1Fallback] = mutable.Map.empty def getTableData(spark: SparkSession, name: String): DataFrame = { From ffab56d85354c4b2fe64039f5f7446785e240f70 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 27 Jan 2022 12:19:10 +0300 Subject: [PATCH 16/17] Match to entire error message --- .../sql/errors/QueryCompilationErrors.scala | 2 +- .../errors/QueryCompilationErrorsDSv2Suite.scala | 16 +++++++--------- .../sql/errors/QueryExecutionErrorsSuite.scala | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 201b8d6c5dc58..6c84aa6a592c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -94,7 +94,7 @@ object QueryCompilationErrors { def unsupportedIfNotExistsError(tableName: String): Throwable = { new AnalysisException( errorClass = "UNSUPPORTED_FEATURE", - messageParameters = Array(s"IF NOT EXISTS for the table '$tableName' by INSERT INTO")) + messageParameters = Array(s"IF NOT EXISTS for the table '$tableName' by INSERT INTO.")) } def nonPartitionColError(partitionName: String): Throwable = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala index 61aeed27b0ad8..bfea3f535dd94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala @@ -28,24 +28,22 @@ class QueryCompilationErrorsDSv2Suite test("UNSUPPORTED_FEATURE: IF PARTITION NOT EXISTS not supported by INSERT") { val v2Format = classOf[FakeV2Provider].getName - val catalogAndNamespace = "testcat.ns1.ns2." 
-    val t1 = s"${catalogAndNamespace}tbl"
+    val tbl = "testcat.ns1.ns2.tbl"
 
-    withTable(t1) {
+    withTable(tbl) {
       val view = "tmp_view"
       val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
       df.createOrReplaceTempView(view)
       withTempView(view) {
-        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+        sql(s"CREATE TABLE $tbl (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
 
         val e = intercept[AnalysisException] {
-          sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 1) IF NOT EXISTS SELECT * FROM $view")
+          sql(s"INSERT OVERWRITE TABLE $tbl PARTITION (id = 1) IF NOT EXISTS SELECT * FROM $view")
         }
 
-        checkAnswer(spark.table(t1), spark.emptyDataFrame)
-        assert(e.getMessage.contains("The feature is not supported: " +
-          "IF NOT EXISTS for the table"))
-        assert(e.getMessage.contains(t1))
+        checkAnswer(spark.table(tbl), spark.emptyDataFrame)
+        assert(e.getMessage === "The feature is not supported: " +
+          s"IF NOT EXISTS for the table '$tbl' by INSERT INTO.")
         assert(e.getErrorClass === "UNSUPPORTED_FEATURE")
         assert(e.getSqlState === "0A000")
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 9928b74fdda90..216deb4c92a23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -119,7 +119,7 @@ class QueryExecutionErrorsSuite extends QueryTest with SharedSparkSession {
       val e = intercept[SparkRuntimeException] { lit(v) }
       assert(e.getErrorClass === "UNSUPPORTED_FEATURE")
       assert(e.getSqlState === "0A000")
-      assert(e.getMessage.contains("The feature is not supported: literal for "))
+      assert(e.getMessage.matches("""The feature is not supported: literal for '.+' of .+\."""))
     }
     checkUnsupportedTypeInLiteral(Map("key1" -> 1, "key2" -> 2))
     checkUnsupportedTypeInLiteral(("mike", 29, 1.0))

From ff9a6a5ee217b793402912a60b0385f512a054f5 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Thu, 27 Jan 2022 12:40:29 +0300
Subject: [PATCH 17/17] Move a pivot test to QueryExecutionErrorsSuite

---
 .../spark/sql/DataFramePivotSuite.scala    | 13 ------------
 .../errors/QueryExecutionErrorsSuite.scala | 21 ++++++++++++++-----
 2 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index fba8e3e63a86a..bbdae29fa3b05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -323,19 +323,6 @@ class DataFramePivotSuite extends QueryTest with SharedSparkSession {
     checkAnswer(df, expected)
   }
 
-  test("pivoting column list") {
-    val exception = intercept[RuntimeException] {
-      trainingSales
-        .groupBy($"sales.year")
-        .pivot(struct(lower($"sales.course"), $"training"))
-        .agg(sum($"sales.earnings"))
-        .collect()
-    }
-    assert(exception.getMessage.contains("The feature is not supported: " +
-      "literal for '[dotnet,Dummies]' of class " +
-      "org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema"))
-  }
-
   test("SPARK-26403: pivoting by array column") {
     val df = Seq(
       (2, Seq.empty[String]),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 216deb4c92a23..4b2564034344a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.errors
 
 import org.apache.spark.{SparkException, SparkRuntimeException}
 import org.apache.spark.sql.{DataFrame, QueryTest}
-import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.functions.{lit, lower, struct, sum}
 import org.apache.spark.sql.test.SharedSparkSession
 
 class QueryExecutionErrorsSuite extends QueryTest with SharedSparkSession {
@@ -116,12 +116,23 @@ class QueryExecutionErrorsSuite extends QueryTest with SharedSparkSession {
 
   test("UNSUPPORTED_FEATURE: unsupported types (map and struct) in lit()") {
     def checkUnsupportedTypeInLiteral(v: Any): Unit = {
-      val e = intercept[SparkRuntimeException] { lit(v) }
-      assert(e.getErrorClass === "UNSUPPORTED_FEATURE")
-      assert(e.getSqlState === "0A000")
-      assert(e.getMessage.matches("""The feature is not supported: literal for '.+' of .+\."""))
+      val e1 = intercept[SparkRuntimeException] { lit(v) }
+      assert(e1.getErrorClass === "UNSUPPORTED_FEATURE")
+      assert(e1.getSqlState === "0A000")
+      assert(e1.getMessage.matches("""The feature is not supported: literal for '.+' of .+\."""))
     }
     checkUnsupportedTypeInLiteral(Map("key1" -> 1, "key2" -> 2))
     checkUnsupportedTypeInLiteral(("mike", 29, 1.0))
+
+    val e2 = intercept[SparkRuntimeException] {
+      trainingSales
+        .groupBy($"sales.year")
+        .pivot(struct(lower($"sales.course"), $"training"))
+        .agg(sum($"sales.earnings"))
+        .collect()
+    }
+    assert(e2.getMessage === "The feature is not supported: " +
+      "literal for '[dotnet,Dummies]' of class " +
+      "org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema.")
   }
 }
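
For a quick illustration of the behavior the last two patches lock down, below is a
minimal sketch of how the consolidated UNSUPPORTED_FEATURE error surfaces to user code.
It is not part of the patch series: it assumes a Spark build with this series applied,
the object name UnsupportedFeatureDemo is invented for the example, and no SparkSession
is created because lit() fails eagerly, at Column construction time.

import scala.util.{Failure, Try}

import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.functions.lit

object UnsupportedFeatureDemo extends App {
  // A Scala Map has no Catalyst literal representation, so lit() throws
  // immediately instead of producing a Column.
  Try(lit(Map("key1" -> 1, "key2" -> 2))) match {
    case Failure(e: SparkRuntimeException) =>
      // The generic error class carries the literal's details in the message.
      assert(e.getErrorClass == "UNSUPPORTED_FEATURE")
      assert(e.getSqlState == "0A000")
      // Prints something like:
      //   The feature is not supported: literal for 'Map(key1 -> 1, key2 -> 2)'
      //   of <the Map's runtime class>.
      println(e.getMessage)
    case other =>
      sys.error(s"expected a SparkRuntimeException, got $other")
  }
}

The exact-match assertions also explain the trailing period added to
unsupportedIfNotExistsError in patch 16: once a test compares e.getMessage with ===
against the fully rendered template, the message has to be the complete sentence, not
just a prefix that a contains() check would tolerate.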