-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-53507][CONNECT] Add breaking change info to errors #52256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
eba0c8f
0e89beb
1d53754
0fb89df
aaf91fc
82b63c0
99d7bb6
d235da2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -75,6 +75,22 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) { | |||||||||||
| matches.map(m => m.stripSuffix(">").stripPrefix("<")) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| def getBreakingChangeInfo(errorClass: String): Option[BreakingChangeInfo] = { | ||||||||||||
| val errorClasses = errorClass.split('.') | ||||||||||||
| errorClasses match { | ||||||||||||
| case Array(mainClass) => | ||||||||||||
| errorInfoMap.get(mainClass).flatMap(_.breakingChangeInfo) | ||||||||||||
| case Array(mainClass, subClass) => | ||||||||||||
| errorInfoMap.get(mainClass).flatMap{ | ||||||||||||
| errorInfo => | ||||||||||||
| errorInfo.subClass.flatMap(_.get(subClass)) | ||||||||||||
| .flatMap(_.breakingChangeInfo) | ||||||||||||
| .orElse(errorInfo.breakingChangeInfo) | ||||||||||||
| } | ||||||||||||
| case _ => None | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| def getMessageTemplate(errorClass: String): String = { | ||||||||||||
| val errorClasses = errorClass.split("\\.") | ||||||||||||
| assert(errorClasses.length == 1 || errorClasses.length == 2) | ||||||||||||
|
|
@@ -128,7 +144,7 @@ private object ErrorClassesJsonReader { | |||||||||||
| val map = mapper.readValue(url, new TypeReference[Map[String, ErrorInfo]]() {}) | ||||||||||||
| val errorClassWithDots = map.collectFirst { | ||||||||||||
| case (errorClass, _) if errorClass.contains('.') => errorClass | ||||||||||||
| case (_, ErrorInfo(_, Some(map), _)) if map.keys.exists(_.contains('.')) => | ||||||||||||
| case (_, ErrorInfo(_, Some(map), _, _)) if map.keys.exists(_.contains('.')) => | ||||||||||||
| map.keys.collectFirst { case s if s.contains('.') => s }.get | ||||||||||||
| } | ||||||||||||
| if (errorClassWithDots.isEmpty) { | ||||||||||||
|
|
@@ -147,28 +163,58 @@ private object ErrorClassesJsonReader { | |||||||||||
| * @param subClass SubClass associated with this class. | ||||||||||||
| * @param message Message format with optional placeholders (e.g. <parm>). | ||||||||||||
| * The error message is constructed by concatenating the lines with newlines. | ||||||||||||
| * @param breakingChangeInfo Additional metadata if the error is due to a breaking change. | ||||||||||||
| */ | ||||||||||||
| private case class ErrorInfo( | ||||||||||||
| message: Seq[String], | ||||||||||||
| subClass: Option[Map[String, ErrorSubInfo]], | ||||||||||||
| sqlState: Option[String]) { | ||||||||||||
| sqlState: Option[String], | ||||||||||||
| breakingChangeInfo: Option[BreakingChangeInfo] = None) { | ||||||||||||
| // For compatibility with multi-line error messages | ||||||||||||
| @JsonIgnore | ||||||||||||
| val messageTemplate: String = message.mkString("\n") | ||||||||||||
| val messageTemplate: String = message.mkString("\n") + | ||||||||||||
| breakingChangeInfo.map(_.migrationMessage.mkString(" ", "\n", "")).getOrElse("") | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Information associated with an error subclass. | ||||||||||||
| * | ||||||||||||
| * @param message Message format with optional placeholders (e.g. <parm>). | ||||||||||||
| * The error message is constructed by concatenating the lines with newlines. | ||||||||||||
| * @param breakingChangeInfo Additional metadata if the error is due to a breaking change. | ||||||||||||
| */ | ||||||||||||
| private case class ErrorSubInfo(message: Seq[String]) { | ||||||||||||
| private case class ErrorSubInfo(message: Seq[String], | ||||||||||||
| breakingChangeInfo: Option[BreakingChangeInfo] = None) { | ||||||||||||
|
||||||||||||
| private case class ErrorSubInfo(message: Seq[String], | |
| breakingChangeInfo: Option[BreakingChangeInfo] = None) { | |
| private case class ErrorSubInfo( | |
| message: Seq[String], | |
| breakingChangeInfo: Option[BreakingChangeInfo] = None) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just realize that we expose it as a public API via SparkThrowable.getBreakingChangeInfo. We shouldn't expose a case class as public API as it has a wide API surface, including the companion object.
We should follow SparkThrowable and define it in Java.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: 4 spaces indentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| case class MitigationSparkConfig(key: String, value: String) | |
| case class MitigationConfig(key: String, value: String) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -73,6 +73,14 @@ private[spark] object SparkThrowableHelper { | |||||
| errorReader.getMessageParameters(errorClass) | ||||||
| } | ||||||
|
|
||||||
| def getBreakingChangeInfo(errorClass: String): Option[BreakingChangeInfo] = { | ||||||
| if (errorClass == null) { | ||||||
| None | ||||||
| } else { | ||||||
| errorReader.getBreakingChangeInfo(errorClass) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| def isInternalError(errorClass: String): Boolean = { | ||||||
| errorClass != null && errorClass.startsWith("INTERNAL_ERROR") | ||||||
| } | ||||||
|
|
@@ -99,6 +107,19 @@ private[spark] object SparkThrowableHelper { | |||||
| g.writeStringField("errorClass", errorClass) | ||||||
| if (format == STANDARD) { | ||||||
| g.writeStringField("messageTemplate", errorReader.getMessageTemplate(errorClass)) | ||||||
| errorReader.getBreakingChangeInfo(errorClass).foreach{ breakingChangeInfo => | ||||||
|
||||||
| errorReader.getBreakingChangeInfo(errorClass).foreach{ breakingChangeInfo => | |
| errorReader.getBreakingChangeInfo(errorClass).foreach { breakingChangeInfo => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just for my education: this can write JSON array? The JSON writer can recognize duplicated object field names automatically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not an array, it's just an Option
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |
| import grpc | ||
| import json | ||
| from grpc import StatusCode | ||
| from typing import Dict, List, Optional, TYPE_CHECKING | ||
| from typing import Any, Dict, List, Optional, TYPE_CHECKING | ||
|
|
||
| from pyspark.errors.exceptions.base import ( | ||
| AnalysisException as BaseAnalysisException, | ||
|
|
@@ -95,6 +95,7 @@ def _convert_exception( | |
| display_server_stacktrace = display_server_stacktrace if stacktrace else False | ||
|
|
||
| contexts = None | ||
| breaking_change_info = None | ||
| if resp and resp.HasField("root_error_idx"): | ||
| root_error = resp.errors[resp.root_error_idx] | ||
| if hasattr(root_error, "spark_throwable"): | ||
|
|
@@ -105,6 +106,22 @@ def _convert_exception( | |
| else DataFrameQueryContext(c) | ||
| for c in root_error.spark_throwable.query_contexts | ||
| ] | ||
| # Extract breaking change info if present | ||
| if hasattr( | ||
| root_error.spark_throwable, "breaking_change_info" | ||
| ) and root_error.spark_throwable.HasField("breaking_change_info"): | ||
| bci = root_error.spark_throwable.breaking_change_info | ||
| breaking_change_info = { | ||
| "migration_message": list(bci.migration_message), | ||
| "auto_mitigation": bci.auto_mitigation | ||
| if bci.HasField("auto_mitigation") | ||
| else False, | ||
| } | ||
| if bci.HasField("mitigation_spark_config"): | ||
| breaking_change_info["mitigation_spark_config"] = { | ||
| "key": bci.mitigation_spark_config.key, | ||
| "value": bci.mitigation_spark_config.value, | ||
| } | ||
|
|
||
| if "org.apache.spark.api.python.PythonException" in classes: | ||
| return PythonException( | ||
|
|
@@ -134,6 +151,7 @@ def _convert_exception( | |
| display_server_stacktrace=display_server_stacktrace, | ||
| contexts=contexts, | ||
| grpc_status_code=grpc_status_code, | ||
| breaking_change_info=breaking_change_info, | ||
| ) | ||
|
|
||
| # Return UnknownException if there is no matched exception class | ||
|
|
@@ -147,6 +165,7 @@ def _convert_exception( | |
| display_server_stacktrace=display_server_stacktrace, | ||
| contexts=contexts, | ||
| grpc_status_code=grpc_status_code, | ||
| breaking_change_info=breaking_change_info, | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -193,6 +212,7 @@ def __init__( | |
| display_server_stacktrace: bool = False, | ||
| contexts: Optional[List[BaseQueryContext]] = None, | ||
| grpc_status_code: grpc.StatusCode = StatusCode.UNKNOWN, | ||
| breaking_change_info: Optional[Dict[str, Any]] = None, | ||
| ) -> None: | ||
| if contexts is None: | ||
| contexts = [] | ||
|
|
@@ -221,6 +241,7 @@ def __init__( | |
| self._display_stacktrace: bool = display_server_stacktrace | ||
| self._contexts: List[BaseQueryContext] = contexts | ||
| self._grpc_status_code = grpc_status_code | ||
| self._breaking_change_info: Optional[Dict[str, Any]] = breaking_change_info | ||
| self._log_exception() | ||
|
|
||
| def getSqlState(self) -> Optional[str]: | ||
|
|
@@ -241,6 +262,15 @@ def getMessage(self) -> str: | |
| def getGrpcStatusCode(self) -> grpc.StatusCode: | ||
| return self._grpc_status_code | ||
|
|
||
| def getBreakingChangeInfo(self) -> Optional[Dict[str, Any]]: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we add a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a proto class defined in |
||
| """ | ||
| Returns the breaking change info for an error, or None. | ||
|
|
||
| For Spark Connect exceptions, this returns the breaking change info | ||
| received from the server, rather than looking it up from local error files. | ||
| """ | ||
| return self._breaking_change_info | ||
|
|
||
| def __str__(self) -> str: | ||
| return self.getMessage() | ||
|
|
||
|
|
@@ -263,6 +293,7 @@ def __init__( | |
| display_server_stacktrace: bool = False, | ||
| contexts: Optional[List[BaseQueryContext]] = None, | ||
| grpc_status_code: grpc.StatusCode = StatusCode.UNKNOWN, | ||
| breaking_change_info: Optional[Dict[str, Any]] = None, | ||
| ) -> None: | ||
| super().__init__( | ||
| message=message, | ||
|
|
@@ -274,6 +305,7 @@ def __init__( | |
| display_server_stacktrace=display_server_stacktrace, | ||
| contexts=contexts, | ||
| grpc_status_code=grpc_status_code, | ||
| breaking_change_info=breaking_change_info, | ||
| ) | ||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: shall we add an
\nbetween the main error message and the breaking change message?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense to use a space
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this may look weird as the migration message itself can be multi lines
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I based this code on the existing logic for joining the subclass message:
spark/common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
Line 111 in 0fb89df
That logic uses a space so I think it makes sense to match that for consistency.
In the common case where the message is a single line, I think a newline is more confusing than a space.