diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 6476c7fc9c5e..0affceab971f 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -904,7 +904,7 @@
       },
       "NON_STRING_TYPE" : {
         "message" : [
-          "all arguments must be strings."
+          "all arguments of the function <funcName> must be strings."
         ]
       },
       "NULL_TYPE" : {
@@ -6222,7 +6222,7 @@
       "Detected implicit cartesian product for <joinType> join between logical plans",
       "<leftPlan>",
       "and",
-      "rightPlan",
+      "<rightPlan>",
       "Join condition is missing or trivial.",
       "Either: use the CROSS JOIN syntax to allow cartesian products between these relations, or: enable implicit cartesian products by setting the configuration variable spark.sql.crossJoin.enabled=true."
     ]
   },
@@ -7827,7 +7827,7 @@
   },
   "_LEGACY_ERROR_TEMP_3055" : {
     "message" : [
-      "ScalarFunction '' neither implement magic method nor override 'produceResult'"
+      "ScalarFunction <scalarFunc> neither implement magic method nor override 'produceResult'"
     ]
   },
   "_LEGACY_ERROR_TEMP_3056" : {
diff --git a/common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala b/common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
index a1934dcf7a00..e2dd0da1aac8 100644
--- a/common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
+++ b/common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
@@ -19,7 +19,6 @@ package org.apache.spark
 
 import java.net.URL
 
-import scala.collection.immutable.Map
 import scala.jdk.CollectionConverters._
 
 import com.fasterxml.jackson.annotation.JsonIgnore
@@ -52,7 +51,7 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
     val sub = new StringSubstitutor(sanitizedParameters.asJava)
     sub.setEnableUndefinedVariableException(true)
     sub.setDisableSubstitutionInValues(true)
-    try {
+    val errorMessage = try {
       sub.replace(ErrorClassesJsonReader.TEMPLATE_REGEX.replaceAllIn(
         messageTemplate, "\\$\\{$1\\}"))
     } catch {
@@ -61,6 +60,17 @@
           s"MessageTemplate: $messageTemplate, " +
           s"Parameters: $messageParameters", i)
     }
+    if (util.SparkEnvUtils.isTesting) {
+      val placeHoldersNum = ErrorClassesJsonReader.TEMPLATE_REGEX.findAllIn(messageTemplate).length
+      if (placeHoldersNum < sanitizedParameters.size) {
+        throw SparkException.internalError(
+          s"Found unused message parameters of the error class '$errorClass'. " +
+            s"Its error message format has $placeHoldersNum placeholders, " +
+            s"but the passed message parameters map has ${sanitizedParameters.size} items. " +
+            "Consider adding placeholders to the error format or removing unused message parameters.")
+      }
+    }
+    errorMessage
   }
 
   def getMessageParameters(errorClass: String): Seq[String] = {
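Note: the check added to `ErrorClassesJsonReader.getErrorMessage` above runs only when `SparkEnvUtils.isTesting` is set, so production paths pay no extra cost. A minimal standalone sketch of the same counting logic (the object and method names are illustrative, and the regex is assumed to match Spark's `<param>` placeholder syntax):

```scala
// Sketch, not Spark code: count <name> placeholders in a template and flag
// parameter maps that carry more entries than the template can consume.
object PlaceholderCheck {
  private val TemplateRegex = "<([a-zA-Z0-9_-]+)>".r

  def findUnused(template: String, parameters: Map[String, String]): Option[String] = {
    val placeholders = TemplateRegex.findAllIn(template).length
    if (placeholders < parameters.size) {
      Some(s"template has $placeholders placeholders, but ${parameters.size} parameters were passed")
    } else None
  }

  def main(args: Array[String]): Unit = {
    // The extra "op" entry has no matching placeholder, so it is reported.
    println(findUnused(
      "Cannot up cast <expression> from <sourceType> to <targetType>.",
      Map(
        "expression" -> "CAST('aaa' AS LONG)",
        "sourceType" -> "STRING",
        "targetType" -> "LONG",
        "op" -> "CAST")))
  }
}
```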
" + + "Consider to add placeholders to the error format or remove unused message parameters.") + } + } + errorMessage } def getMessageParameters(errorClass: String): Seq[String] = { diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala index 758262ead7f1..27b1ee014a71 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala @@ -334,8 +334,6 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L assert(exception.getErrorClass != null) assert(exception.getMessageParameters().get("id") == query.id.toString) assert(exception.getMessageParameters().get("runId") == query.runId.toString) - assert(!exception.getMessageParameters().get("startOffset").isEmpty) - assert(!exception.getMessageParameters().get("endOffset").isEmpty) assert(exception.getCause.isInstanceOf[SparkException]) assert(exception.getCause.getCause.isInstanceOf[SparkException]) assert( @@ -374,8 +372,6 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L assert(exception.getErrorClass != null) assert(exception.getMessageParameters().get("id") == query.id.toString) assert(exception.getMessageParameters().get("runId") == query.runId.toString) - assert(!exception.getMessageParameters().get("startOffset").isEmpty) - assert(!exception.getMessageParameters().get("endOffset").isEmpty) assert(exception.getCause.isInstanceOf[SparkException]) assert(exception.getCause.getCause.isInstanceOf[SparkException]) assert( diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala index d99589c171c3..269438d9a6b0 100644 --- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala @@ -207,13 +207,6 @@ class SparkThrowableSuite extends SparkFunSuite { } assert(e.getErrorClass === "INTERNAL_ERROR") assert(e.getMessageParameters().get("message").contains("Undefined error message parameter")) - - // Does not fail with too many args (expects 0 args) - assert(getMessage("DIVIDE_BY_ZERO", Map("config" -> "foo", "a" -> "bar")) == - "[DIVIDE_BY_ZERO] Division by zero. " + - "Use `try_divide` to tolerate divisor being 0 and return NULL instead. " + - "If necessary set foo to \"false\" " + - "to bypass this error. SQLSTATE: 22012") } test("Error message is formatted") { @@ -504,7 +497,7 @@ class SparkThrowableSuite extends SparkFunSuite { |{ | "MISSING_PARAMETER" : { | "message" : [ - | "Parameter ${param} is missing." + | "Parameter is missing." 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 41841269b6a8..4b1b9e02a242 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -371,13 +371,7 @@ abstract class StreamExecution(
         messageParameters = Map(
           "id" -> id.toString,
           "runId" -> runId.toString,
-          "message" -> message,
-          "queryDebugString" -> toDebugString(includeLogicalPlan = isInitialized),
-          "startOffset" -> getLatestExecutionContext().startOffsets.toOffsetSeq(
-            sources.toSeq, getLatestExecutionContext().offsetSeqMetadata).toString,
-          "endOffset" -> getLatestExecutionContext().endOffsets.toOffsetSeq(
-            sources.toSeq, getLatestExecutionContext().offsetSeqMetadata).toString
-        ))
+          "message" -> message))
       errorClassOpt = e match {
         case t: SparkThrowable => Option(t.getErrorClass)
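The `StreamExecution` change above is a knock-on effect of the new check: the STREAM_FAILED message format only consumes the `id`, `runId` and `message` placeholders, so the `queryDebugString`, `startOffset` and `endOffset` entries could never be rendered, and the test-mode check would now reject them (the two offset assertions in `ClientStreamingQuerySuite` were dropped for the same reason). A sketch of the mismatch, with the template shape assumed for illustration rather than copied from error-conditions.json:

```scala
// Illustrative template shape for STREAM_FAILED; only the parameter names
// (id, runId, message) are taken from the diff above.
object StreamFailedParams {
  private val TemplateRegex = "<([a-zA-Z0-9_-]+)>".r
  private val template =
    "Query [id = <id>, runId = <runId>] terminated with exception: <message>"

  def main(args: Array[String]): Unit = {
    val oldParams =
      Set("id", "runId", "message", "queryDebugString", "startOffset", "endOffset")
    val used = TemplateRegex.findAllMatchIn(template).map(_.group(1)).toSet
    // The three entries the template can never render:
    println(oldParams -- used) // Set(queryDebugString, startOffset, endOffset)
  }
}
```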