diff --git a/common/utils/src/main/java/org/apache/spark/SparkThrowable.java b/common/utils/src/main/java/org/apache/spark/SparkThrowable.java index 39808f58b08a..a798ed3980c7 100644 --- a/common/utils/src/main/java/org/apache/spark/SparkThrowable.java +++ b/common/utils/src/main/java/org/apache/spark/SparkThrowable.java @@ -60,6 +60,12 @@ default boolean isInternalError() { return SparkThrowableHelper.isInternalError(this.getCondition()); } + // If null, the error message is not for a breaking change + default BreakingChangeInfo getBreakingChangeInfo() { + return SparkThrowableHelper.getBreakingChangeInfo( + this.getCondition()).getOrElse(() -> null); + } + default Map getMessageParameters() { return new HashMap<>(); } diff --git a/common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala b/common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala index 2ff811298901..8a9adf7f87ad 100644 --- a/common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala +++ b/common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala @@ -75,6 +75,22 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) { matches.map(m => m.stripSuffix(">").stripPrefix("<")) } + def getBreakingChangeInfo(errorClass: String): Option[BreakingChangeInfo] = { + val errorClasses = errorClass.split('.') + errorClasses match { + case Array(mainClass) => + errorInfoMap.get(mainClass).flatMap(_.breakingChangeInfo) + case Array(mainClass, subClass) => + errorInfoMap.get(mainClass).flatMap{ + errorInfo => + errorInfo.subClass.flatMap(_.get(subClass)) + .flatMap(_.breakingChangeInfo) + .orElse(errorInfo.breakingChangeInfo) + } + case _ => None + } + } + def getMessageTemplate(errorClass: String): String = { val errorClasses = errorClass.split("\\.") assert(errorClasses.length == 1 || errorClasses.length == 2) @@ -128,7 +144,7 @@ private object ErrorClassesJsonReader { val map = mapper.readValue(url, new TypeReference[Map[String, ErrorInfo]]() {}) val errorClassWithDots = map.collectFirst { case (errorClass, _) if errorClass.contains('.') => errorClass - case (_, ErrorInfo(_, Some(map), _)) if map.keys.exists(_.contains('.')) => + case (_, ErrorInfo(_, Some(map), _, _)) if map.keys.exists(_.contains('.')) => map.keys.collectFirst { case s if s.contains('.') => s }.get } if (errorClassWithDots.isEmpty) { @@ -147,14 +163,17 @@ private object ErrorClassesJsonReader { * @param subClass SubClass associated with this class. * @param message Message format with optional placeholders (e.g. <parm>). * The error message is constructed by concatenating the lines with newlines. + * @param breakingChangeInfo Additional metadata if the error is due to a breaking change. */ private case class ErrorInfo( message: Seq[String], subClass: Option[Map[String, ErrorSubInfo]], - sqlState: Option[String]) { + sqlState: Option[String], + breakingChangeInfo: Option[BreakingChangeInfo] = None) { // For compatibility with multi-line error messages @JsonIgnore - val messageTemplate: String = message.mkString("\n") + val messageTemplate: String = message.mkString("\n") + + breakingChangeInfo.map(_.migrationMessage.mkString(" ", "\n", "")).getOrElse("") } /** @@ -162,13 +181,41 @@ private case class ErrorInfo( * * @param message Message format with optional placeholders (e.g. <parm>). * The error message is constructed by concatenating the lines with newlines. + * @param breakingChangeInfo Additional metadata if the error is due to a breaking change. */ -private case class ErrorSubInfo(message: Seq[String]) { +private case class ErrorSubInfo( + message: Seq[String], + breakingChangeInfo: Option[BreakingChangeInfo] = None) { // For compatibility with multi-line error messages @JsonIgnore - val messageTemplate: String = message.mkString("\n") + val messageTemplate: String = message.mkString("\n") + + breakingChangeInfo.map(_.migrationMessage.mkString(" ", "\n", "")).getOrElse("") } +/** + * Additional information if the error was caused by a breaking change. + * + * @param migrationMessage A message explaining how the user can migrate their job to work + * with the breaking change. + * @param mitigationConfig A spark config flag that can be used to mitigate the + * breaking change. + * @param needsAudit If true, the breaking change should be inspected manually. + * If false, the spark job should be retried by setting the + * mitigationConfig. + */ +case class BreakingChangeInfo( + migrationMessage: Seq[String], + mitigationConfig: Option[MitigationConfig] = None, + needsAudit: Boolean = true +) + +/** + * A spark config flag that can be used to mitigate a breaking change. + * @param key The spark config key. + * @param value The spark config value that mitigates the breaking change. + */ +case class MitigationConfig(key: String, value: String) + /** * Information associated with an error state / SQLSTATE. * diff --git a/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala b/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala index b6c2b176de62..58df9f916026 100644 --- a/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala +++ b/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala @@ -73,6 +73,14 @@ private[spark] object SparkThrowableHelper { errorReader.getMessageParameters(errorClass) } + def getBreakingChangeInfo(errorClass: String): Option[BreakingChangeInfo] = { + if (errorClass == null) { + None + } else { + errorReader.getBreakingChangeInfo(errorClass) + } + } + def isInternalError(errorClass: String): Boolean = { errorClass != null && errorClass.startsWith("INTERNAL_ERROR") } @@ -99,6 +107,19 @@ private[spark] object SparkThrowableHelper { g.writeStringField("errorClass", errorClass) if (format == STANDARD) { g.writeStringField("messageTemplate", errorReader.getMessageTemplate(errorClass)) + errorReader.getBreakingChangeInfo(errorClass).foreach { breakingChangeInfo => + g.writeObjectFieldStart("breakingChangeInfo") + g.writeStringField("migrationMessage", + breakingChangeInfo.migrationMessage.mkString("\n")) + breakingChangeInfo.mitigationConfig.foreach { mitigationConfig => + g.writeObjectFieldStart("mitigationConfig") + g.writeStringField("key", mitigationConfig.key) + g.writeStringField("value", mitigationConfig.value) + g.writeEndObject() + } + g.writeBooleanField("needsAudit", breakingChangeInfo.needsAudit) + g.writeEndObject() + } } val sqlState = e.getSqlState if (sqlState != null) g.writeStringField("sqlState", sqlState) diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala index bf62052e36da..61ffe9b4bd93 100644 --- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala @@ -504,6 +504,90 @@ class SparkThrowableSuite extends SparkFunSuite { } } + test("breaking changes info") { + assert(SparkThrowableHelper.getBreakingChangeInfo(null).isEmpty) + + val nonBreakingChangeError = new SparkException( + errorClass = "CANNOT_PARSE_DECIMAL", + messageParameters = Map.empty[String, String], + cause = null) + assert(nonBreakingChangeError.getBreakingChangeInfo == null) + + withTempDir { dir => + val json = new File(dir, "errors.json") + Files.writeString( + json.toPath, + """ + |{ + | "TEST_ERROR": { + | "message": [ + | "Error message 1 with ." + | ], + | "breakingChangeInfo": { + | "migrationMessage": [ + | "Migration message with ." + | ], + | "mitigationConfig": { + | "key": "config.key1", + | "value": "config.value1" + | }, + | "needsAudit": false + | } + | }, + | "TEST_ERROR_WITH_SUBCLASS": { + | "message": [ + | "Error message 2 with ." + | ], + | "subClass": { + | "SUBCLASS": { + | "message": [ + | "Subclass message with ." + | ], + | "breakingChangeInfo": { + | "migrationMessage": [ + | "Subclass migration message with ." + | ], + | "mitigationConfig": { + | "key": "config.key2", + | "value": "config.value2" + | }, + | "needsAudit": true + | } + | } + | } + | } + |} + |""".stripMargin, + StandardCharsets.UTF_8) + + val error1Params = Map("param1" -> "value1", "param2" -> "value2") + val error2Params = Map("param1" -> "value1", "param2" -> "value2", "param3" -> "value3") + + val reader = + new ErrorClassesJsonReader(Seq(errorJsonFilePath.toUri.toURL, json.toURI.toURL)) + val errorMessage = reader.getErrorMessage("TEST_ERROR", error1Params) + assert(errorMessage == "Error message 1 with value1. Migration message with value2.") + val breakingChangeInfo = reader.getBreakingChangeInfo("TEST_ERROR") + assert( + breakingChangeInfo.contains( + BreakingChangeInfo( + Seq("Migration message with ."), + Some(MitigationConfig("config.key1", "config.value1")), + needsAudit = false))) + val errorMessage2 = + reader.getErrorMessage("TEST_ERROR_WITH_SUBCLASS.SUBCLASS", error2Params) + assert( + errorMessage2 == "Error message 2 with value1. Subclass message with value2." + + " Subclass migration message with value3.") + val breakingChangeInfo2 = reader.getBreakingChangeInfo("TEST_ERROR_WITH_SUBCLASS.SUBCLASS") + assert( + breakingChangeInfo2.contains( + BreakingChangeInfo( + Seq("Subclass migration message with ."), + Some(MitigationConfig("config.key2", "config.value2"))))) + } + } + test("detect unused message parameters") { checkError( exception = intercept[SparkException] { diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py index b0455b64dabf..84da3c345e92 100644 --- a/python/pyspark/errors/exceptions/base.py +++ b/python/pyspark/errors/exceptions/base.py @@ -17,7 +17,7 @@ import warnings from abc import ABC, abstractmethod from enum import Enum -from typing import Dict, Optional, TypeVar, cast, Iterable, TYPE_CHECKING, List +from typing import Any, Dict, Optional, TypeVar, cast, Iterable, TYPE_CHECKING, List from pyspark.errors.exceptions.tblib import Traceback from pyspark.errors.utils import ErrorClassesReader @@ -138,6 +138,23 @@ def getMessage(self) -> str: """ return f"[{self.getCondition()}] {self._message}" + def getBreakingChangeInfo(self) -> Optional[Dict[str, Any]]: + """ + Returns the breaking change info for an error, or None. + + Breaking change info is a dict with two fields: + + migration_message: list of str + A message explaining how the user can migrate their job to work + with the breaking change. + + mitigation_config: + A dict with key: str and value: str fields. + A spark config flag that can be used to mitigate the + breaking change. + """ + return self._error_reader.get_breaking_change_info(self._errorClass) + def getQueryContext(self) -> List["QueryContext"]: """ Returns :class:`QueryContext`. diff --git a/python/pyspark/errors/exceptions/connect.py b/python/pyspark/errors/exceptions/connect.py index 45c96a84f14d..e3f3f71db2c3 100644 --- a/python/pyspark/errors/exceptions/connect.py +++ b/python/pyspark/errors/exceptions/connect.py @@ -17,7 +17,7 @@ import grpc import json from grpc import StatusCode -from typing import Dict, List, Optional, TYPE_CHECKING +from typing import Any, Dict, List, Optional, TYPE_CHECKING from pyspark.errors.exceptions.base import ( AnalysisException as BaseAnalysisException, @@ -95,6 +95,7 @@ def _convert_exception( display_server_stacktrace = display_server_stacktrace if stacktrace else False contexts = None + breaking_change_info = None if resp and resp.HasField("root_error_idx"): root_error = resp.errors[resp.root_error_idx] if hasattr(root_error, "spark_throwable"): @@ -105,6 +106,20 @@ def _convert_exception( else DataFrameQueryContext(c) for c in root_error.spark_throwable.query_contexts ] + # Extract breaking change info if present + if hasattr( + root_error.spark_throwable, "breaking_change_info" + ) and root_error.spark_throwable.HasField("breaking_change_info"): + bci = root_error.spark_throwable.breaking_change_info + breaking_change_info = { + "migration_message": list(bci.migration_message), + "needs_audit": bci.needs_audit if bci.HasField("needs_audit") else True, + } + if bci.HasField("mitigation_config"): + breaking_change_info["mitigation_config"] = { + "key": bci.mitigation_config.key, + "value": bci.mitigation_config.value, + } if "org.apache.spark.api.python.PythonException" in classes: return PythonException( @@ -134,6 +149,7 @@ def _convert_exception( display_server_stacktrace=display_server_stacktrace, contexts=contexts, grpc_status_code=grpc_status_code, + breaking_change_info=breaking_change_info, ) # Return UnknownException if there is no matched exception class @@ -147,6 +163,7 @@ def _convert_exception( display_server_stacktrace=display_server_stacktrace, contexts=contexts, grpc_status_code=grpc_status_code, + breaking_change_info=breaking_change_info, ) @@ -193,6 +210,7 @@ def __init__( display_server_stacktrace: bool = False, contexts: Optional[List[BaseQueryContext]] = None, grpc_status_code: grpc.StatusCode = StatusCode.UNKNOWN, + breaking_change_info: Optional[Dict[str, Any]] = None, ) -> None: if contexts is None: contexts = [] @@ -221,6 +239,7 @@ def __init__( self._display_stacktrace: bool = display_server_stacktrace self._contexts: List[BaseQueryContext] = contexts self._grpc_status_code = grpc_status_code + self._breaking_change_info: Optional[Dict[str, Any]] = breaking_change_info self._log_exception() def getSqlState(self) -> Optional[str]: @@ -241,6 +260,15 @@ def getMessage(self) -> str: def getGrpcStatusCode(self) -> grpc.StatusCode: return self._grpc_status_code + def getBreakingChangeInfo(self) -> Optional[Dict[str, Any]]: + """ + Returns the breaking change info for an error, or None. + + For Spark Connect exceptions, this returns the breaking change info + received from the server, rather than looking it up from local error files. + """ + return self._breaking_change_info + def __str__(self) -> str: return self.getMessage() @@ -263,6 +291,7 @@ def __init__( display_server_stacktrace: bool = False, contexts: Optional[List[BaseQueryContext]] = None, grpc_status_code: grpc.StatusCode = StatusCode.UNKNOWN, + breaking_change_info: Optional[Dict[str, Any]] = None, ) -> None: super().__init__( message=message, @@ -274,6 +303,7 @@ def __init__( display_server_stacktrace=display_server_stacktrace, contexts=contexts, grpc_status_code=grpc_status_code, + breaking_change_info=breaking_change_info, ) diff --git a/python/pyspark/errors/tests/test_connect_errors_conversion.py b/python/pyspark/errors/tests/test_connect_errors_conversion.py index f80a6301d61b..b6f19e315fcc 100644 --- a/python/pyspark/errors/tests/test_connect_errors_conversion.py +++ b/python/pyspark/errors/tests/test_connect_errors_conversion.py @@ -189,6 +189,182 @@ def test_convert_exception_fallback(self): ) self.assertEqual(exception.getGrpcStatusCode(), StatusCode.UNKNOWN) + def test_convert_exception_with_breaking_change_info(self): + """Test that breaking change info is correctly extracted from protobuf response.""" + import pyspark.sql.connect.proto as pb2 + from google.rpc.error_details_pb2 import ErrorInfo + from grpc import StatusCode + + # Create mock FetchErrorDetailsResponse with breaking change info + resp = pb2.FetchErrorDetailsResponse() + resp.root_error_idx = 0 + + error = resp.errors.add() + error.message = "Test error with breaking change" + error.error_type_hierarchy.append("org.apache.spark.SparkException") + + # Add SparkThrowable with breaking change info + spark_throwable = error.spark_throwable + spark_throwable.error_class = "TEST_BREAKING_CHANGE_ERROR" + + # Add breaking change info + bci = spark_throwable.breaking_change_info + bci.migration_message.append("Please update your code to use new API") + bci.migration_message.append("See documentation for details") + bci.needs_audit = False + + # Add mitigation config + mitigation_config = bci.mitigation_config + mitigation_config.key = "spark.sql.legacy.behavior.enabled" + mitigation_config.value = "true" + + info = ErrorInfo() + info.reason = "org.apache.spark.SparkException" + info.metadata["classes"] = '["org.apache.spark.SparkException"]' + + exception = convert_exception( + info=info, + truncated_message="Test error", + resp=resp, + grpc_status_code=StatusCode.INTERNAL, + ) + + # Verify breaking change info is correctly extracted + breaking_change_info = exception.getBreakingChangeInfo() + self.assertIsNotNone(breaking_change_info) + self.assertEqual( + breaking_change_info["migration_message"], + ["Please update your code to use new API", "See documentation for details"], + ) + self.assertEqual(breaking_change_info["needs_audit"], False) + self.assertIn("mitigation_config", breaking_change_info) + self.assertEqual( + breaking_change_info["mitigation_config"]["key"], + "spark.sql.legacy.behavior.enabled", + ) + self.assertEqual(breaking_change_info["mitigation_config"]["value"], "true") + + def test_convert_exception_without_breaking_change_info(self): + """Test that getBreakingChangeInfo returns None when no breaking change info.""" + import pyspark.sql.connect.proto as pb2 + from google.rpc.error_details_pb2 import ErrorInfo + from grpc import StatusCode + + # Create mock FetchErrorDetailsResponse without breaking change info + resp = pb2.FetchErrorDetailsResponse() + resp.root_error_idx = 0 + + error = resp.errors.add() + error.message = "Test error without breaking change" + error.error_type_hierarchy.append("org.apache.spark.SparkException") + + # Add SparkThrowable without breaking change info + spark_throwable = error.spark_throwable + spark_throwable.error_class = "REGULAR_ERROR" + + info = ErrorInfo() + info.reason = "org.apache.spark.SparkException" + info.metadata["classes"] = '["org.apache.spark.SparkException"]' + + exception = convert_exception( + info=info, + truncated_message="Test error", + resp=resp, + grpc_status_code=StatusCode.INTERNAL, + ) + + # Verify breaking change info is None + breaking_change_info = exception.getBreakingChangeInfo() + self.assertIsNone(breaking_change_info) + + def test_breaking_change_info_storage_in_exception(self): + """Test SparkConnectGrpcException correctly stores and retrieves breaking change info.""" + from pyspark.errors.exceptions.connect import SparkConnectGrpcException + + breaking_change_info = { + "migration_message": ["Test migration message"], + "mitigation_config": {"key": "test.config.key", "value": "test.config.value"}, + "needs_audit": True, + } + + exception = SparkConnectGrpcException( + message="Test error", errorClass="TEST_ERROR", breaking_change_info=breaking_change_info + ) + + stored_info = exception.getBreakingChangeInfo() + self.assertEqual(stored_info, breaking_change_info) + + def test_breaking_change_info_inheritance(self): + """Test that subclasses of SparkConnectGrpcException + correctly inherit breaking change info.""" + from pyspark.errors.exceptions.connect import AnalysisException, UnknownException + + breaking_change_info = { + "migration_message": ["Inheritance test message"], + "needs_audit": False, + } + + # Test AnalysisException + analysis_exception = AnalysisException( + message="Analysis error with breaking change", + errorClass="TEST_ANALYSIS_ERROR", + breaking_change_info=breaking_change_info, + ) + + stored_info = analysis_exception.getBreakingChangeInfo() + self.assertEqual(stored_info, breaking_change_info) + + # Test UnknownException + unknown_exception = UnknownException( + message="Unknown error with breaking change", + errorClass="TEST_UNKNOWN_ERROR", + breaking_change_info=breaking_change_info, + ) + + stored_info = unknown_exception.getBreakingChangeInfo() + self.assertEqual(stored_info, breaking_change_info) + + def test_breaking_change_info_without_mitigation_config(self): + """Test breaking change info that only has migration messages.""" + import pyspark.sql.connect.proto as pb2 + from google.rpc.error_details_pb2 import ErrorInfo + from grpc import StatusCode + + # Create mock FetchErrorDetailsResponse with breaking change info (no mitigation config) + resp = pb2.FetchErrorDetailsResponse() + resp.root_error_idx = 0 + + error = resp.errors.add() + error.message = "Test error with breaking change" + error.error_type_hierarchy.append("org.apache.spark.SparkException") + + # Add SparkThrowable with breaking change info + spark_throwable = error.spark_throwable + spark_throwable.error_class = "TEST_BREAKING_CHANGE_ERROR" + + # Add breaking change info without mitigation config + bci = spark_throwable.breaking_change_info + bci.migration_message.append("Migration message only") + bci.needs_audit = True + + info = ErrorInfo() + info.reason = "org.apache.spark.SparkException" + info.metadata["classes"] = '["org.apache.spark.SparkException"]' + + exception = convert_exception( + info=info, + truncated_message="Test error", + resp=resp, + grpc_status_code=StatusCode.INTERNAL, + ) + + # Verify breaking change info is correctly extracted + breaking_change_info = exception.getBreakingChangeInfo() + self.assertIsNotNone(breaking_change_info) + self.assertEqual(breaking_change_info["migration_message"], ["Migration message only"]) + self.assertEqual(breaking_change_info["needs_audit"], True) + self.assertNotIn("mitigation_config", breaking_change_info) + if __name__ == "__main__": import unittest diff --git a/python/pyspark/errors/tests/test_errors.py b/python/pyspark/errors/tests/test_errors.py index 721bfb6a0f35..d0565a40d222 100644 --- a/python/pyspark/errors/tests/test_errors.py +++ b/python/pyspark/errors/tests/test_errors.py @@ -54,6 +54,60 @@ def test_invalid_error_class(self): with self.assertRaisesRegex(ValueError, "Cannot find main error class"): PySparkValueError(errorClass="invalid", messageParameters={}) + def test_breaking_change_info(self): + # Test retrieving the breaking change info for an error. + error_reader = ErrorClassesReader() + error_reader.error_info_map = { + "TEST_ERROR": { + "message": ["Error message 1 with ."], + "breaking_change_info": { + "migration_message": ["Migration message with ."], + "mitigation_config": {"key": "config.key1", "value": "config.value1"}, + "needsAudit": False, + }, + }, + "TEST_ERROR_WITH_SUB_CLASS": { + "message": ["Error message 2 with ."], + "sub_class": { + "SUBCLASS": { + "message": ["Subclass message with ."], + "breaking_change_info": { + "migration_message": ["Subclass migration message with ."], + "mitigation_config": { + "key": "config.key2", + "value": "config.value2", + }, + "needsAudit": True, + }, + } + }, + }, + } + error_message1 = error_reader.get_error_message( + "TEST_ERROR", {"param": "value1", "param2": "value2"} + ) + self.assertEqual( + error_message1, "Error message 1 with value1. Migration message with value2." + ) + error_message2 = error_reader.get_error_message( + "TEST_ERROR_WITH_SUB_CLASS.SUBCLASS", + {"param": "value1", "param2": "value2", "param3": "value3"}, + ) + self.assertEqual( + error_message2, + "Error message 2 with value1. Subclass message with value2." + " Subclass migration message with value3.", + ) + breaking_change_info1 = error_reader.get_breaking_change_info("TEST_ERROR") + self.assertEqual( + breaking_change_info1, error_reader.error_info_map["TEST_ERROR"]["breaking_change_info"] + ) + breaking_change_info2 = error_reader.get_breaking_change_info( + "TEST_ERROR_WITH_SUB_CLASS.SUBCLASS" + ) + subclass_map = error_reader.error_info_map["TEST_ERROR_WITH_SUB_CLASS"]["sub_class"] + self.assertEqual(breaking_change_info2, subclass_map["SUBCLASS"]["breaking_change_info"]) + if __name__ == "__main__": import unittest diff --git a/python/pyspark/errors/utils.py b/python/pyspark/errors/utils.py index 053b2c91370e..43bd871c564d 100644 --- a/python/pyspark/errors/utils.py +++ b/python/pyspark/errors/utils.py @@ -167,6 +167,10 @@ def get_message_template(self, errorClass: str) -> str: raise ValueError(f"Cannot find main error class '{main_error_class}'") main_message_template = "\n".join(main_error_class_info_map["message"]) + if "breaking_change_info" in main_error_class_info_map: + main_message_template += " " + "\n".join( + main_error_class_info_map["breaking_change_info"]["migration_message"] + ) has_sub_class = len_error_classes == 2 @@ -182,10 +186,43 @@ def get_message_template(self, errorClass: str) -> str: raise ValueError(f"Cannot find sub error class '{sub_error_class}'") sub_message_template = "\n".join(sub_error_class_info_map["message"]) + if "breaking_change_info" in sub_error_class_info_map: + sub_message_template += " " + "\n".join( + sub_error_class_info_map["breaking_change_info"]["migration_message"] + ) message_template = main_message_template + " " + sub_message_template return message_template + def get_breaking_change_info(self, errorClass: Optional[str]) -> Optional[Dict[str, Any]]: + """ + Returns the breaking change info for an error if it is present. + """ + if errorClass is None: + return None + error_classes = errorClass.split(".") + len_error_classes = len(error_classes) + assert len_error_classes in (1, 2) + + main_error_class = error_classes[0] + if main_error_class in self.error_info_map: + main_error_class_info_map = self.error_info_map[main_error_class] + else: + raise ValueError(f"Cannot find main error class '{main_error_class}'") + + if len_error_classes == 2: + sub_error_class = error_classes[1] + main_error_class_subclass_info_map = main_error_class_info_map["sub_class"] + if sub_error_class in main_error_class_subclass_info_map: + sub_error_class_info_map = main_error_class_subclass_info_map[sub_error_class] + else: + raise ValueError(f"Cannot find sub error class '{sub_error_class}'") + if "breaking_change_info" in sub_error_class_info_map: + return sub_error_class_info_map["breaking_change_info"] + if "breaking_change_info" in main_error_class_info_map: + return main_error_class_info_map["breaking_change_info"] + return None + def _capture_call_site(depth: int) -> str: """ diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py index f690adf40c6b..851d74df137e 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.py +++ b/python/pyspark/sql/connect/proto/base_pb2.py @@ -45,7 +45,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto\x1a\x16spark/connect/ml.proto\x1a\x1dspark/connect/pipelines.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\xf5\x14\n\x12\x41nalyzePlanRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x11 \x01(\tH\x01R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12$\n\x0b\x63lient_type\x18\x03 \x01(\tH\x02R\nclientType\x88\x01\x01\x12\x42\n\x06schema\x18\x04 \x01(\x0b\x32(.spark.connect.AnalyzePlanRequest.SchemaH\x00R\x06schema\x12\x45\n\x07\x65xplain\x18\x05 \x01(\x0b\x32).spark.connect.AnalyzePlanRequest.ExplainH\x00R\x07\x65xplain\x12O\n\x0btree_string\x18\x06 \x01(\x0b\x32,.spark.connect.AnalyzePlanRequest.TreeStringH\x00R\ntreeString\x12\x46\n\x08is_local\x18\x07 \x01(\x0b\x32).spark.connect.AnalyzePlanRequest.IsLocalH\x00R\x07isLocal\x12R\n\x0cis_streaming\x18\x08 \x01(\x0b\x32-.spark.connect.AnalyzePlanRequest.IsStreamingH\x00R\x0bisStreaming\x12O\n\x0binput_files\x18\t \x01(\x0b\x32,.spark.connect.AnalyzePlanRequest.InputFilesH\x00R\ninputFiles\x12U\n\rspark_version\x18\n \x01(\x0b\x32..spark.connect.AnalyzePlanRequest.SparkVersionH\x00R\x0csparkVersion\x12I\n\tddl_parse\x18\x0b \x01(\x0b\x32*.spark.connect.AnalyzePlanRequest.DDLParseH\x00R\x08\x64\x64lParse\x12X\n\x0esame_semantics\x18\x0c \x01(\x0b\x32/.spark.connect.AnalyzePlanRequest.SameSemanticsH\x00R\rsameSemantics\x12U\n\rsemantic_hash\x18\r \x01(\x0b\x32..spark.connect.AnalyzePlanRequest.SemanticHashH\x00R\x0csemanticHash\x12\x45\n\x07persist\x18\x0e \x01(\x0b\x32).spark.connect.AnalyzePlanRequest.PersistH\x00R\x07persist\x12K\n\tunpersist\x18\x0f \x01(\x0b\x32+.spark.connect.AnalyzePlanRequest.UnpersistH\x00R\tunpersist\x12_\n\x11get_storage_level\x18\x10 \x01(\x0b\x32\x31.spark.connect.AnalyzePlanRequest.GetStorageLevelH\x00R\x0fgetStorageLevel\x12M\n\x0bjson_to_ddl\x18\x12 \x01(\x0b\x32+.spark.connect.AnalyzePlanRequest.JsonToDDLH\x00R\tjsonToDdl\x1a\x31\n\x06Schema\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\xbb\x02\n\x07\x45xplain\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12X\n\x0c\x65xplain_mode\x18\x02 \x01(\x0e\x32\x35.spark.connect.AnalyzePlanRequest.Explain.ExplainModeR\x0b\x65xplainMode"\xac\x01\n\x0b\x45xplainMode\x12\x1c\n\x18\x45XPLAIN_MODE_UNSPECIFIED\x10\x00\x12\x17\n\x13\x45XPLAIN_MODE_SIMPLE\x10\x01\x12\x19\n\x15\x45XPLAIN_MODE_EXTENDED\x10\x02\x12\x18\n\x14\x45XPLAIN_MODE_CODEGEN\x10\x03\x12\x15\n\x11\x45XPLAIN_MODE_COST\x10\x04\x12\x1a\n\x16\x45XPLAIN_MODE_FORMATTED\x10\x05\x1aZ\n\nTreeString\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12\x19\n\x05level\x18\x02 \x01(\x05H\x00R\x05level\x88\x01\x01\x42\x08\n\x06_level\x1a\x32\n\x07IsLocal\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\x36\n\x0bIsStreaming\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\x35\n\nInputFiles\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\x0e\n\x0cSparkVersion\x1a)\n\x08\x44\x44LParse\x12\x1d\n\nddl_string\x18\x01 \x01(\tR\tddlString\x1ay\n\rSameSemantics\x12\x34\n\x0btarget_plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\ntargetPlan\x12\x32\n\nother_plan\x18\x02 \x01(\x0b\x32\x13.spark.connect.PlanR\totherPlan\x1a\x37\n\x0cSemanticHash\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\x97\x01\n\x07Persist\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x12\x45\n\rstorage_level\x18\x02 \x01(\x0b\x32\x1b.spark.connect.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level\x1an\n\tUnpersist\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x12\x1f\n\x08\x62locking\x18\x02 \x01(\x08H\x00R\x08\x62locking\x88\x01\x01\x42\x0b\n\t_blocking\x1a\x46\n\x0fGetStorageLevel\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x1a,\n\tJsonToDDL\x12\x1f\n\x0bjson_string\x18\x01 \x01(\tR\njsonStringB\t\n\x07\x61nalyzeB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\xca\x0e\n\x13\x41nalyzePlanResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x0f \x01(\tR\x13serverSideSessionId\x12\x43\n\x06schema\x18\x02 \x01(\x0b\x32).spark.connect.AnalyzePlanResponse.SchemaH\x00R\x06schema\x12\x46\n\x07\x65xplain\x18\x03 \x01(\x0b\x32*.spark.connect.AnalyzePlanResponse.ExplainH\x00R\x07\x65xplain\x12P\n\x0btree_string\x18\x04 \x01(\x0b\x32-.spark.connect.AnalyzePlanResponse.TreeStringH\x00R\ntreeString\x12G\n\x08is_local\x18\x05 \x01(\x0b\x32*.spark.connect.AnalyzePlanResponse.IsLocalH\x00R\x07isLocal\x12S\n\x0cis_streaming\x18\x06 \x01(\x0b\x32..spark.connect.AnalyzePlanResponse.IsStreamingH\x00R\x0bisStreaming\x12P\n\x0binput_files\x18\x07 \x01(\x0b\x32-.spark.connect.AnalyzePlanResponse.InputFilesH\x00R\ninputFiles\x12V\n\rspark_version\x18\x08 \x01(\x0b\x32/.spark.connect.AnalyzePlanResponse.SparkVersionH\x00R\x0csparkVersion\x12J\n\tddl_parse\x18\t \x01(\x0b\x32+.spark.connect.AnalyzePlanResponse.DDLParseH\x00R\x08\x64\x64lParse\x12Y\n\x0esame_semantics\x18\n \x01(\x0b\x32\x30.spark.connect.AnalyzePlanResponse.SameSemanticsH\x00R\rsameSemantics\x12V\n\rsemantic_hash\x18\x0b \x01(\x0b\x32/.spark.connect.AnalyzePlanResponse.SemanticHashH\x00R\x0csemanticHash\x12\x46\n\x07persist\x18\x0c \x01(\x0b\x32*.spark.connect.AnalyzePlanResponse.PersistH\x00R\x07persist\x12L\n\tunpersist\x18\r \x01(\x0b\x32,.spark.connect.AnalyzePlanResponse.UnpersistH\x00R\tunpersist\x12`\n\x11get_storage_level\x18\x0e \x01(\x0b\x32\x32.spark.connect.AnalyzePlanResponse.GetStorageLevelH\x00R\x0fgetStorageLevel\x12N\n\x0bjson_to_ddl\x18\x10 \x01(\x0b\x32,.spark.connect.AnalyzePlanResponse.JsonToDDLH\x00R\tjsonToDdl\x1a\x39\n\x06Schema\x12/\n\x06schema\x18\x01 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x1a\x30\n\x07\x45xplain\x12%\n\x0e\x65xplain_string\x18\x01 \x01(\tR\rexplainString\x1a-\n\nTreeString\x12\x1f\n\x0btree_string\x18\x01 \x01(\tR\ntreeString\x1a$\n\x07IsLocal\x12\x19\n\x08is_local\x18\x01 \x01(\x08R\x07isLocal\x1a\x30\n\x0bIsStreaming\x12!\n\x0cis_streaming\x18\x01 \x01(\x08R\x0bisStreaming\x1a"\n\nInputFiles\x12\x14\n\x05\x66iles\x18\x01 \x03(\tR\x05\x66iles\x1a(\n\x0cSparkVersion\x12\x18\n\x07version\x18\x01 \x01(\tR\x07version\x1a;\n\x08\x44\x44LParse\x12/\n\x06parsed\x18\x01 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06parsed\x1a\'\n\rSameSemantics\x12\x16\n\x06result\x18\x01 \x01(\x08R\x06result\x1a&\n\x0cSemanticHash\x12\x16\n\x06result\x18\x01 \x01(\x05R\x06result\x1a\t\n\x07Persist\x1a\x0b\n\tUnpersist\x1aS\n\x0fGetStorageLevel\x12@\n\rstorage_level\x18\x01 \x01(\x0b\x32\x1b.spark.connect.StorageLevelR\x0cstorageLevel\x1a*\n\tJsonToDDL\x12\x1d\n\nddl_string\x18\x01 \x01(\tR\tddlStringB\x08\n\x06result"\x83\x06\n\x12\x45xecutePlanRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x08 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12&\n\x0coperation_id\x18\x06 \x01(\tH\x01R\x0boperationId\x88\x01\x01\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x02R\nclientType\x88\x01\x01\x12X\n\x0frequest_options\x18\x05 \x03(\x0b\x32/.spark.connect.ExecutePlanRequest.RequestOptionR\x0erequestOptions\x12\x12\n\x04tags\x18\x07 \x03(\tR\x04tags\x1a\x85\x02\n\rRequestOption\x12K\n\x10reattach_options\x18\x01 \x01(\x0b\x32\x1e.spark.connect.ReattachOptionsH\x00R\x0freattachOptions\x12^\n\x17result_chunking_options\x18\x02 \x01(\x0b\x32$.spark.connect.ResultChunkingOptionsH\x00R\x15resultChunkingOptions\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x10\n\x0erequest_optionB)\n\'_client_observed_server_side_session_idB\x0f\n\r_operation_idB\x0e\n\x0c_client_type"\xf1\x19\n\x13\x45xecutePlanResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x0f \x01(\tR\x13serverSideSessionId\x12!\n\x0coperation_id\x18\x0c \x01(\tR\x0boperationId\x12\x1f\n\x0bresponse_id\x18\r \x01(\tR\nresponseId\x12P\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchH\x00R\narrowBatch\x12\x63\n\x12sql_command_result\x18\x05 \x01(\x0b\x32\x33.spark.connect.ExecutePlanResponse.SqlCommandResultH\x00R\x10sqlCommandResult\x12~\n#write_stream_operation_start_result\x18\x08 \x01(\x0b\x32..spark.connect.WriteStreamOperationStartResultH\x00R\x1fwriteStreamOperationStartResult\x12q\n\x1estreaming_query_command_result\x18\t \x01(\x0b\x32*.spark.connect.StreamingQueryCommandResultH\x00R\x1bstreamingQueryCommandResult\x12k\n\x1cget_resources_command_result\x18\n \x01(\x0b\x32(.spark.connect.GetResourcesCommandResultH\x00R\x19getResourcesCommandResult\x12\x87\x01\n&streaming_query_manager_command_result\x18\x0b \x01(\x0b\x32\x31.spark.connect.StreamingQueryManagerCommandResultH\x00R"streamingQueryManagerCommandResult\x12\x87\x01\n&streaming_query_listener_events_result\x18\x10 \x01(\x0b\x32\x31.spark.connect.StreamingQueryListenerEventsResultH\x00R"streamingQueryListenerEventsResult\x12\\\n\x0fresult_complete\x18\x0e \x01(\x0b\x32\x31.spark.connect.ExecutePlanResponse.ResultCompleteH\x00R\x0eresultComplete\x12\x87\x01\n&create_resource_profile_command_result\x18\x11 \x01(\x0b\x32\x31.spark.connect.CreateResourceProfileCommandResultH\x00R"createResourceProfileCommandResult\x12\x65\n\x12\x65xecution_progress\x18\x12 \x01(\x0b\x32\x34.spark.connect.ExecutePlanResponse.ExecutionProgressH\x00R\x11\x65xecutionProgress\x12\x64\n\x19\x63heckpoint_command_result\x18\x13 \x01(\x0b\x32&.spark.connect.CheckpointCommandResultH\x00R\x17\x63heckpointCommandResult\x12L\n\x11ml_command_result\x18\x14 \x01(\x0b\x32\x1e.spark.connect.MlCommandResultH\x00R\x0fmlCommandResult\x12X\n\x15pipeline_event_result\x18\x15 \x01(\x0b\x32".spark.connect.PipelineEventResultH\x00R\x13pipelineEventResult\x12^\n\x17pipeline_command_result\x18\x16 \x01(\x0b\x32$.spark.connect.PipelineCommandResultH\x00R\x15pipelineCommandResult\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x12]\n\x10observed_metrics\x18\x06 \x03(\x0b\x32\x32.spark.connect.ExecutePlanResponse.ObservedMetricsR\x0fobservedMetrics\x12/\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x1aG\n\x10SqlCommandResult\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x1a\xf8\x01\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x12&\n\x0cstart_offset\x18\x03 \x01(\x03H\x00R\x0bstartOffset\x88\x01\x01\x12$\n\x0b\x63hunk_index\x18\x04 \x01(\x03H\x01R\nchunkIndex\x88\x01\x01\x12\x32\n\x13num_chunks_in_batch\x18\x05 \x01(\x03H\x02R\x10numChunksInBatch\x88\x01\x01\x42\x0f\n\r_start_offsetB\x0e\n\x0c_chunk_indexB\x16\n\x14_num_chunks_in_batch\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 \x01(\tR\nmetricType\x1a\x8d\x01\n\x0fObservedMetrics\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x39\n\x06values\x18\x02 \x03(\x0b\x32!.spark.connect.Expression.LiteralR\x06values\x12\x12\n\x04keys\x18\x03 \x03(\tR\x04keys\x12\x17\n\x07plan_id\x18\x04 \x01(\x03R\x06planId\x1a\x10\n\x0eResultComplete\x1a\xcd\x02\n\x11\x45xecutionProgress\x12V\n\x06stages\x18\x01 \x03(\x0b\x32>.spark.connect.ExecutePlanResponse.ExecutionProgress.StageInfoR\x06stages\x12,\n\x12num_inflight_tasks\x18\x02 \x01(\x03R\x10numInflightTasks\x1a\xb1\x01\n\tStageInfo\x12\x19\n\x08stage_id\x18\x01 \x01(\x03R\x07stageId\x12\x1b\n\tnum_tasks\x18\x02 \x01(\x03R\x08numTasks\x12.\n\x13num_completed_tasks\x18\x03 \x01(\x03R\x11numCompletedTasks\x12(\n\x10input_bytes_read\x18\x04 \x01(\x03R\x0einputBytesRead\x12\x12\n\x04\x64one\x18\x05 \x01(\x08R\x04\x64oneB\x0f\n\rresponse_type"A\n\x08KeyValue\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x19\n\x05value\x18\x02 \x01(\tH\x00R\x05value\x88\x01\x01\x42\x08\n\x06_value"\xaf\t\n\rConfigRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x08 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\x44\n\toperation\x18\x03 \x01(\x0b\x32&.spark.connect.ConfigRequest.OperationR\toperation\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x01R\nclientType\x88\x01\x01\x1a\xf2\x03\n\tOperation\x12\x34\n\x03set\x18\x01 \x01(\x0b\x32 .spark.connect.ConfigRequest.SetH\x00R\x03set\x12\x34\n\x03get\x18\x02 \x01(\x0b\x32 .spark.connect.ConfigRequest.GetH\x00R\x03get\x12W\n\x10get_with_default\x18\x03 \x01(\x0b\x32+.spark.connect.ConfigRequest.GetWithDefaultH\x00R\x0egetWithDefault\x12G\n\nget_option\x18\x04 \x01(\x0b\x32&.spark.connect.ConfigRequest.GetOptionH\x00R\tgetOption\x12>\n\x07get_all\x18\x05 \x01(\x0b\x32#.spark.connect.ConfigRequest.GetAllH\x00R\x06getAll\x12:\n\x05unset\x18\x06 \x01(\x0b\x32".spark.connect.ConfigRequest.UnsetH\x00R\x05unset\x12P\n\ris_modifiable\x18\x07 \x01(\x0b\x32).spark.connect.ConfigRequest.IsModifiableH\x00R\x0cisModifiableB\t\n\x07op_type\x1a\\\n\x03Set\x12-\n\x05pairs\x18\x01 \x03(\x0b\x32\x17.spark.connect.KeyValueR\x05pairs\x12\x1b\n\x06silent\x18\x02 \x01(\x08H\x00R\x06silent\x88\x01\x01\x42\t\n\x07_silent\x1a\x19\n\x03Get\x12\x12\n\x04keys\x18\x01 \x03(\tR\x04keys\x1a?\n\x0eGetWithDefault\x12-\n\x05pairs\x18\x01 \x03(\x0b\x32\x17.spark.connect.KeyValueR\x05pairs\x1a\x1f\n\tGetOption\x12\x12\n\x04keys\x18\x01 \x03(\tR\x04keys\x1a\x30\n\x06GetAll\x12\x1b\n\x06prefix\x18\x01 \x01(\tH\x00R\x06prefix\x88\x01\x01\x42\t\n\x07_prefix\x1a\x1b\n\x05Unset\x12\x12\n\x04keys\x18\x01 \x03(\tR\x04keys\x1a"\n\x0cIsModifiable\x12\x12\n\x04keys\x18\x01 \x03(\tR\x04keysB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\xaf\x01\n\x0e\x43onfigResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x04 \x01(\tR\x13serverSideSessionId\x12-\n\x05pairs\x18\x02 \x03(\x0b\x32\x17.spark.connect.KeyValueR\x05pairs\x12\x1a\n\x08warnings\x18\x03 \x03(\tR\x08warnings"\xea\x07\n\x13\x41\x64\x64\x41rtifactsRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12V\n&client_observed_server_side_session_id\x18\x07 \x01(\tH\x01R!clientObservedServerSideSessionId\x88\x01\x01\x12$\n\x0b\x63lient_type\x18\x06 \x01(\tH\x02R\nclientType\x88\x01\x01\x12@\n\x05\x62\x61tch\x18\x03 \x01(\x0b\x32(.spark.connect.AddArtifactsRequest.BatchH\x00R\x05\x62\x61tch\x12Z\n\x0b\x62\x65gin_chunk\x18\x04 \x01(\x0b\x32\x37.spark.connect.AddArtifactsRequest.BeginChunkedArtifactH\x00R\nbeginChunk\x12H\n\x05\x63hunk\x18\x05 \x01(\x0b\x32\x30.spark.connect.AddArtifactsRequest.ArtifactChunkH\x00R\x05\x63hunk\x1a\x35\n\rArtifactChunk\x12\x12\n\x04\x64\x61ta\x18\x01 \x01(\x0cR\x04\x64\x61ta\x12\x10\n\x03\x63rc\x18\x02 \x01(\x03R\x03\x63rc\x1ao\n\x13SingleChunkArtifact\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x44\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x30.spark.connect.AddArtifactsRequest.ArtifactChunkR\x04\x64\x61ta\x1a]\n\x05\x42\x61tch\x12T\n\tartifacts\x18\x01 \x03(\x0b\x32\x36.spark.connect.AddArtifactsRequest.SingleChunkArtifactR\tartifacts\x1a\xc1\x01\n\x14\x42\x65ginChunkedArtifact\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x1f\n\x0btotal_bytes\x18\x02 \x01(\x03R\ntotalBytes\x12\x1d\n\nnum_chunks\x18\x03 \x01(\x03R\tnumChunks\x12U\n\rinitial_chunk\x18\x04 \x01(\x0b\x32\x30.spark.connect.AddArtifactsRequest.ArtifactChunkR\x0cinitialChunkB\t\n\x07payloadB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\x90\x02\n\x14\x41\x64\x64\x41rtifactsResponse\x12\x1d\n\nsession_id\x18\x02 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12Q\n\tartifacts\x18\x01 \x03(\x0b\x32\x33.spark.connect.AddArtifactsResponse.ArtifactSummaryR\tartifacts\x1aQ\n\x0f\x41rtifactSummary\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12*\n\x11is_crc_successful\x18\x02 \x01(\x08R\x0fisCrcSuccessful"\xc6\x02\n\x17\x41rtifactStatusesRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x05 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12$\n\x0b\x63lient_type\x18\x03 \x01(\tH\x01R\nclientType\x88\x01\x01\x12\x14\n\x05names\x18\x04 \x03(\tR\x05namesB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\xe0\x02\n\x18\x41rtifactStatusesResponse\x12\x1d\n\nsession_id\x18\x02 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12Q\n\x08statuses\x18\x01 \x03(\x0b\x32\x35.spark.connect.ArtifactStatusesResponse.StatusesEntryR\x08statuses\x1as\n\rStatusesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ArtifactStatusesResponse.ArtifactStatusR\x05value:\x02\x38\x01\x1a(\n\x0e\x41rtifactStatus\x12\x16\n\x06\x65xists\x18\x01 \x01(\x08R\x06\x65xists"\xdb\x04\n\x10InterruptRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x07 \x01(\tH\x01R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12$\n\x0b\x63lient_type\x18\x03 \x01(\tH\x02R\nclientType\x88\x01\x01\x12T\n\x0einterrupt_type\x18\x04 \x01(\x0e\x32-.spark.connect.InterruptRequest.InterruptTypeR\rinterruptType\x12%\n\roperation_tag\x18\x05 \x01(\tH\x00R\x0coperationTag\x12#\n\x0coperation_id\x18\x06 \x01(\tH\x00R\x0boperationId"\x80\x01\n\rInterruptType\x12\x1e\n\x1aINTERRUPT_TYPE_UNSPECIFIED\x10\x00\x12\x16\n\x12INTERRUPT_TYPE_ALL\x10\x01\x12\x16\n\x12INTERRUPT_TYPE_TAG\x10\x02\x12\x1f\n\x1bINTERRUPT_TYPE_OPERATION_ID\x10\x03\x42\x0b\n\tinterruptB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\x90\x01\n\x11InterruptResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12\'\n\x0finterrupted_ids\x18\x02 \x03(\tR\x0einterruptedIds"5\n\x0fReattachOptions\x12"\n\x0creattachable\x18\x01 \x01(\x08R\x0creattachable"\xb5\x01\n\x15ResultChunkingOptions\x12;\n\x1a\x61llow_arrow_batch_chunking\x18\x01 \x01(\x08R\x17\x61llowArrowBatchChunking\x12@\n\x1apreferred_arrow_chunk_size\x18\x02 \x01(\x03H\x00R\x17preferredArrowChunkSize\x88\x01\x01\x42\x1d\n\x1b_preferred_arrow_chunk_size"\x96\x03\n\x16ReattachExecuteRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x06 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12!\n\x0coperation_id\x18\x03 \x01(\tR\x0boperationId\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x01R\nclientType\x88\x01\x01\x12-\n\x10last_response_id\x18\x05 \x01(\tH\x02R\x0elastResponseId\x88\x01\x01\x42)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_typeB\x13\n\x11_last_response_id"\xc9\x04\n\x15ReleaseExecuteRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x07 \x01(\tH\x01R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12!\n\x0coperation_id\x18\x03 \x01(\tR\x0boperationId\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x02R\nclientType\x88\x01\x01\x12R\n\x0brelease_all\x18\x05 \x01(\x0b\x32/.spark.connect.ReleaseExecuteRequest.ReleaseAllH\x00R\nreleaseAll\x12X\n\rrelease_until\x18\x06 \x01(\x0b\x32\x31.spark.connect.ReleaseExecuteRequest.ReleaseUntilH\x00R\x0creleaseUntil\x1a\x0c\n\nReleaseAll\x1a/\n\x0cReleaseUntil\x12\x1f\n\x0bresponse_id\x18\x01 \x01(\tR\nresponseIdB\t\n\x07releaseB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\xa5\x01\n\x16ReleaseExecuteResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12&\n\x0coperation_id\x18\x02 \x01(\tH\x00R\x0boperationId\x88\x01\x01\x42\x0f\n\r_operation_id"\xd4\x01\n\x15ReleaseSessionRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12$\n\x0b\x63lient_type\x18\x03 \x01(\tH\x00R\nclientType\x88\x01\x01\x12\'\n\x0f\x61llow_reconnect\x18\x04 \x01(\x08R\x0e\x61llowReconnectB\x0e\n\x0c_client_type"l\n\x16ReleaseSessionResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x02 \x01(\tR\x13serverSideSessionId"\xcc\x02\n\x18\x46\x65tchErrorDetailsRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x05 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\x19\n\x08\x65rror_id\x18\x03 \x01(\tR\x07\x65rrorId\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x01R\nclientType\x88\x01\x01\x42)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\x93\x0c\n\x19\x46\x65tchErrorDetailsResponse\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12\x1d\n\nsession_id\x18\x04 \x01(\tR\tsessionId\x12)\n\x0eroot_error_idx\x18\x01 \x01(\x05H\x00R\x0crootErrorIdx\x88\x01\x01\x12\x46\n\x06\x65rrors\x18\x02 \x03(\x0b\x32..spark.connect.FetchErrorDetailsResponse.ErrorR\x06\x65rrors\x1a\xae\x01\n\x11StackTraceElement\x12\'\n\x0f\x64\x65\x63laring_class\x18\x01 \x01(\tR\x0e\x64\x65\x63laringClass\x12\x1f\n\x0bmethod_name\x18\x02 \x01(\tR\nmethodName\x12 \n\tfile_name\x18\x03 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12\x1f\n\x0bline_number\x18\x04 \x01(\x05R\nlineNumberB\x0c\n\n_file_name\x1a\xf0\x02\n\x0cQueryContext\x12\x64\n\x0c\x63ontext_type\x18\n \x01(\x0e\x32\x41.spark.connect.FetchErrorDetailsResponse.QueryContext.ContextTypeR\x0b\x63ontextType\x12\x1f\n\x0bobject_type\x18\x01 \x01(\tR\nobjectType\x12\x1f\n\x0bobject_name\x18\x02 \x01(\tR\nobjectName\x12\x1f\n\x0bstart_index\x18\x03 \x01(\x05R\nstartIndex\x12\x1d\n\nstop_index\x18\x04 \x01(\x05R\tstopIndex\x12\x1a\n\x08\x66ragment\x18\x05 \x01(\tR\x08\x66ragment\x12\x1b\n\tcall_site\x18\x06 \x01(\tR\x08\x63\x61llSite\x12\x18\n\x07summary\x18\x07 \x01(\tR\x07summary"%\n\x0b\x43ontextType\x12\x07\n\x03SQL\x10\x00\x12\r\n\tDATAFRAME\x10\x01\x1a\x99\x03\n\x0eSparkThrowable\x12$\n\x0b\x65rror_class\x18\x01 \x01(\tH\x00R\nerrorClass\x88\x01\x01\x12}\n\x12message_parameters\x18\x02 \x03(\x0b\x32N.spark.connect.FetchErrorDetailsResponse.SparkThrowable.MessageParametersEntryR\x11messageParameters\x12\\\n\x0equery_contexts\x18\x03 \x03(\x0b\x32\x35.spark.connect.FetchErrorDetailsResponse.QueryContextR\rqueryContexts\x12 \n\tsql_state\x18\x04 \x01(\tH\x01R\x08sqlState\x88\x01\x01\x1a\x44\n\x16MessageParametersEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x0e\n\x0c_error_classB\x0c\n\n_sql_state\x1a\xdb\x02\n\x05\x45rror\x12\x30\n\x14\x65rror_type_hierarchy\x18\x01 \x03(\tR\x12\x65rrorTypeHierarchy\x12\x18\n\x07message\x18\x02 \x01(\tR\x07message\x12[\n\x0bstack_trace\x18\x03 \x03(\x0b\x32:.spark.connect.FetchErrorDetailsResponse.StackTraceElementR\nstackTrace\x12 \n\tcause_idx\x18\x04 \x01(\x05H\x00R\x08\x63\x61useIdx\x88\x01\x01\x12\x65\n\x0fspark_throwable\x18\x05 \x01(\x0b\x32\x37.spark.connect.FetchErrorDetailsResponse.SparkThrowableH\x01R\x0esparkThrowable\x88\x01\x01\x42\x0c\n\n_cause_idxB\x12\n\x10_spark_throwableB\x11\n\x0f_root_error_idx"Z\n\x17\x43heckpointCommandResult\x12?\n\x08relation\x18\x01 \x01(\x0b\x32#.spark.connect.CachedRemoteRelationR\x08relation2\xb2\x07\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x12G\n\x06\x43onfig\x12\x1c.spark.connect.ConfigRequest\x1a\x1d.spark.connect.ConfigResponse"\x00\x12[\n\x0c\x41\x64\x64\x41rtifacts\x12".spark.connect.AddArtifactsRequest\x1a#.spark.connect.AddArtifactsResponse"\x00(\x01\x12\x63\n\x0e\x41rtifactStatus\x12&.spark.connect.ArtifactStatusesRequest\x1a\'.spark.connect.ArtifactStatusesResponse"\x00\x12P\n\tInterrupt\x12\x1f.spark.connect.InterruptRequest\x1a .spark.connect.InterruptResponse"\x00\x12`\n\x0fReattachExecute\x12%.spark.connect.ReattachExecuteRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12_\n\x0eReleaseExecute\x12$.spark.connect.ReleaseExecuteRequest\x1a%.spark.connect.ReleaseExecuteResponse"\x00\x12_\n\x0eReleaseSession\x12$.spark.connect.ReleaseSessionRequest\x1a%.spark.connect.ReleaseSessionResponse"\x00\x12h\n\x11\x46\x65tchErrorDetails\x12\'.spark.connect.FetchErrorDetailsRequest\x1a(.spark.connect.FetchErrorDetailsResponse"\x00\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto\x1a\x16spark/connect/ml.proto\x1a\x1dspark/connect/pipelines.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\xf5\x14\n\x12\x41nalyzePlanRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x11 \x01(\tH\x01R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12$\n\x0b\x63lient_type\x18\x03 \x01(\tH\x02R\nclientType\x88\x01\x01\x12\x42\n\x06schema\x18\x04 \x01(\x0b\x32(.spark.connect.AnalyzePlanRequest.SchemaH\x00R\x06schema\x12\x45\n\x07\x65xplain\x18\x05 \x01(\x0b\x32).spark.connect.AnalyzePlanRequest.ExplainH\x00R\x07\x65xplain\x12O\n\x0btree_string\x18\x06 \x01(\x0b\x32,.spark.connect.AnalyzePlanRequest.TreeStringH\x00R\ntreeString\x12\x46\n\x08is_local\x18\x07 \x01(\x0b\x32).spark.connect.AnalyzePlanRequest.IsLocalH\x00R\x07isLocal\x12R\n\x0cis_streaming\x18\x08 \x01(\x0b\x32-.spark.connect.AnalyzePlanRequest.IsStreamingH\x00R\x0bisStreaming\x12O\n\x0binput_files\x18\t \x01(\x0b\x32,.spark.connect.AnalyzePlanRequest.InputFilesH\x00R\ninputFiles\x12U\n\rspark_version\x18\n \x01(\x0b\x32..spark.connect.AnalyzePlanRequest.SparkVersionH\x00R\x0csparkVersion\x12I\n\tddl_parse\x18\x0b \x01(\x0b\x32*.spark.connect.AnalyzePlanRequest.DDLParseH\x00R\x08\x64\x64lParse\x12X\n\x0esame_semantics\x18\x0c \x01(\x0b\x32/.spark.connect.AnalyzePlanRequest.SameSemanticsH\x00R\rsameSemantics\x12U\n\rsemantic_hash\x18\r \x01(\x0b\x32..spark.connect.AnalyzePlanRequest.SemanticHashH\x00R\x0csemanticHash\x12\x45\n\x07persist\x18\x0e \x01(\x0b\x32).spark.connect.AnalyzePlanRequest.PersistH\x00R\x07persist\x12K\n\tunpersist\x18\x0f \x01(\x0b\x32+.spark.connect.AnalyzePlanRequest.UnpersistH\x00R\tunpersist\x12_\n\x11get_storage_level\x18\x10 \x01(\x0b\x32\x31.spark.connect.AnalyzePlanRequest.GetStorageLevelH\x00R\x0fgetStorageLevel\x12M\n\x0bjson_to_ddl\x18\x12 \x01(\x0b\x32+.spark.connect.AnalyzePlanRequest.JsonToDDLH\x00R\tjsonToDdl\x1a\x31\n\x06Schema\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\xbb\x02\n\x07\x45xplain\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12X\n\x0c\x65xplain_mode\x18\x02 \x01(\x0e\x32\x35.spark.connect.AnalyzePlanRequest.Explain.ExplainModeR\x0b\x65xplainMode"\xac\x01\n\x0b\x45xplainMode\x12\x1c\n\x18\x45XPLAIN_MODE_UNSPECIFIED\x10\x00\x12\x17\n\x13\x45XPLAIN_MODE_SIMPLE\x10\x01\x12\x19\n\x15\x45XPLAIN_MODE_EXTENDED\x10\x02\x12\x18\n\x14\x45XPLAIN_MODE_CODEGEN\x10\x03\x12\x15\n\x11\x45XPLAIN_MODE_COST\x10\x04\x12\x1a\n\x16\x45XPLAIN_MODE_FORMATTED\x10\x05\x1aZ\n\nTreeString\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12\x19\n\x05level\x18\x02 \x01(\x05H\x00R\x05level\x88\x01\x01\x42\x08\n\x06_level\x1a\x32\n\x07IsLocal\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\x36\n\x0bIsStreaming\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\x35\n\nInputFiles\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\x0e\n\x0cSparkVersion\x1a)\n\x08\x44\x44LParse\x12\x1d\n\nddl_string\x18\x01 \x01(\tR\tddlString\x1ay\n\rSameSemantics\x12\x34\n\x0btarget_plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\ntargetPlan\x12\x32\n\nother_plan\x18\x02 \x01(\x0b\x32\x13.spark.connect.PlanR\totherPlan\x1a\x37\n\x0cSemanticHash\x12\'\n\x04plan\x18\x01 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x1a\x97\x01\n\x07Persist\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x12\x45\n\rstorage_level\x18\x02 \x01(\x0b\x32\x1b.spark.connect.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level\x1an\n\tUnpersist\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x12\x1f\n\x08\x62locking\x18\x02 \x01(\x08H\x00R\x08\x62locking\x88\x01\x01\x42\x0b\n\t_blocking\x1a\x46\n\x0fGetStorageLevel\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x1a,\n\tJsonToDDL\x12\x1f\n\x0bjson_string\x18\x01 \x01(\tR\njsonStringB\t\n\x07\x61nalyzeB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\xca\x0e\n\x13\x41nalyzePlanResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x0f \x01(\tR\x13serverSideSessionId\x12\x43\n\x06schema\x18\x02 \x01(\x0b\x32).spark.connect.AnalyzePlanResponse.SchemaH\x00R\x06schema\x12\x46\n\x07\x65xplain\x18\x03 \x01(\x0b\x32*.spark.connect.AnalyzePlanResponse.ExplainH\x00R\x07\x65xplain\x12P\n\x0btree_string\x18\x04 \x01(\x0b\x32-.spark.connect.AnalyzePlanResponse.TreeStringH\x00R\ntreeString\x12G\n\x08is_local\x18\x05 \x01(\x0b\x32*.spark.connect.AnalyzePlanResponse.IsLocalH\x00R\x07isLocal\x12S\n\x0cis_streaming\x18\x06 \x01(\x0b\x32..spark.connect.AnalyzePlanResponse.IsStreamingH\x00R\x0bisStreaming\x12P\n\x0binput_files\x18\x07 \x01(\x0b\x32-.spark.connect.AnalyzePlanResponse.InputFilesH\x00R\ninputFiles\x12V\n\rspark_version\x18\x08 \x01(\x0b\x32/.spark.connect.AnalyzePlanResponse.SparkVersionH\x00R\x0csparkVersion\x12J\n\tddl_parse\x18\t \x01(\x0b\x32+.spark.connect.AnalyzePlanResponse.DDLParseH\x00R\x08\x64\x64lParse\x12Y\n\x0esame_semantics\x18\n \x01(\x0b\x32\x30.spark.connect.AnalyzePlanResponse.SameSemanticsH\x00R\rsameSemantics\x12V\n\rsemantic_hash\x18\x0b \x01(\x0b\x32/.spark.connect.AnalyzePlanResponse.SemanticHashH\x00R\x0csemanticHash\x12\x46\n\x07persist\x18\x0c \x01(\x0b\x32*.spark.connect.AnalyzePlanResponse.PersistH\x00R\x07persist\x12L\n\tunpersist\x18\r \x01(\x0b\x32,.spark.connect.AnalyzePlanResponse.UnpersistH\x00R\tunpersist\x12`\n\x11get_storage_level\x18\x0e \x01(\x0b\x32\x32.spark.connect.AnalyzePlanResponse.GetStorageLevelH\x00R\x0fgetStorageLevel\x12N\n\x0bjson_to_ddl\x18\x10 \x01(\x0b\x32,.spark.connect.AnalyzePlanResponse.JsonToDDLH\x00R\tjsonToDdl\x1a\x39\n\x06Schema\x12/\n\x06schema\x18\x01 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x1a\x30\n\x07\x45xplain\x12%\n\x0e\x65xplain_string\x18\x01 \x01(\tR\rexplainString\x1a-\n\nTreeString\x12\x1f\n\x0btree_string\x18\x01 \x01(\tR\ntreeString\x1a$\n\x07IsLocal\x12\x19\n\x08is_local\x18\x01 \x01(\x08R\x07isLocal\x1a\x30\n\x0bIsStreaming\x12!\n\x0cis_streaming\x18\x01 \x01(\x08R\x0bisStreaming\x1a"\n\nInputFiles\x12\x14\n\x05\x66iles\x18\x01 \x03(\tR\x05\x66iles\x1a(\n\x0cSparkVersion\x12\x18\n\x07version\x18\x01 \x01(\tR\x07version\x1a;\n\x08\x44\x44LParse\x12/\n\x06parsed\x18\x01 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06parsed\x1a\'\n\rSameSemantics\x12\x16\n\x06result\x18\x01 \x01(\x08R\x06result\x1a&\n\x0cSemanticHash\x12\x16\n\x06result\x18\x01 \x01(\x05R\x06result\x1a\t\n\x07Persist\x1a\x0b\n\tUnpersist\x1aS\n\x0fGetStorageLevel\x12@\n\rstorage_level\x18\x01 \x01(\x0b\x32\x1b.spark.connect.StorageLevelR\x0cstorageLevel\x1a*\n\tJsonToDDL\x12\x1d\n\nddl_string\x18\x01 \x01(\tR\tddlStringB\x08\n\x06result"\x83\x06\n\x12\x45xecutePlanRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x08 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12&\n\x0coperation_id\x18\x06 \x01(\tH\x01R\x0boperationId\x88\x01\x01\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x02R\nclientType\x88\x01\x01\x12X\n\x0frequest_options\x18\x05 \x03(\x0b\x32/.spark.connect.ExecutePlanRequest.RequestOptionR\x0erequestOptions\x12\x12\n\x04tags\x18\x07 \x03(\tR\x04tags\x1a\x85\x02\n\rRequestOption\x12K\n\x10reattach_options\x18\x01 \x01(\x0b\x32\x1e.spark.connect.ReattachOptionsH\x00R\x0freattachOptions\x12^\n\x17result_chunking_options\x18\x02 \x01(\x0b\x32$.spark.connect.ResultChunkingOptionsH\x00R\x15resultChunkingOptions\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x10\n\x0erequest_optionB)\n\'_client_observed_server_side_session_idB\x0f\n\r_operation_idB\x0e\n\x0c_client_type"\xf1\x19\n\x13\x45xecutePlanResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x0f \x01(\tR\x13serverSideSessionId\x12!\n\x0coperation_id\x18\x0c \x01(\tR\x0boperationId\x12\x1f\n\x0bresponse_id\x18\r \x01(\tR\nresponseId\x12P\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchH\x00R\narrowBatch\x12\x63\n\x12sql_command_result\x18\x05 \x01(\x0b\x32\x33.spark.connect.ExecutePlanResponse.SqlCommandResultH\x00R\x10sqlCommandResult\x12~\n#write_stream_operation_start_result\x18\x08 \x01(\x0b\x32..spark.connect.WriteStreamOperationStartResultH\x00R\x1fwriteStreamOperationStartResult\x12q\n\x1estreaming_query_command_result\x18\t \x01(\x0b\x32*.spark.connect.StreamingQueryCommandResultH\x00R\x1bstreamingQueryCommandResult\x12k\n\x1cget_resources_command_result\x18\n \x01(\x0b\x32(.spark.connect.GetResourcesCommandResultH\x00R\x19getResourcesCommandResult\x12\x87\x01\n&streaming_query_manager_command_result\x18\x0b \x01(\x0b\x32\x31.spark.connect.StreamingQueryManagerCommandResultH\x00R"streamingQueryManagerCommandResult\x12\x87\x01\n&streaming_query_listener_events_result\x18\x10 \x01(\x0b\x32\x31.spark.connect.StreamingQueryListenerEventsResultH\x00R"streamingQueryListenerEventsResult\x12\\\n\x0fresult_complete\x18\x0e \x01(\x0b\x32\x31.spark.connect.ExecutePlanResponse.ResultCompleteH\x00R\x0eresultComplete\x12\x87\x01\n&create_resource_profile_command_result\x18\x11 \x01(\x0b\x32\x31.spark.connect.CreateResourceProfileCommandResultH\x00R"createResourceProfileCommandResult\x12\x65\n\x12\x65xecution_progress\x18\x12 \x01(\x0b\x32\x34.spark.connect.ExecutePlanResponse.ExecutionProgressH\x00R\x11\x65xecutionProgress\x12\x64\n\x19\x63heckpoint_command_result\x18\x13 \x01(\x0b\x32&.spark.connect.CheckpointCommandResultH\x00R\x17\x63heckpointCommandResult\x12L\n\x11ml_command_result\x18\x14 \x01(\x0b\x32\x1e.spark.connect.MlCommandResultH\x00R\x0fmlCommandResult\x12X\n\x15pipeline_event_result\x18\x15 \x01(\x0b\x32".spark.connect.PipelineEventResultH\x00R\x13pipelineEventResult\x12^\n\x17pipeline_command_result\x18\x16 \x01(\x0b\x32$.spark.connect.PipelineCommandResultH\x00R\x15pipelineCommandResult\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x12]\n\x10observed_metrics\x18\x06 \x03(\x0b\x32\x32.spark.connect.ExecutePlanResponse.ObservedMetricsR\x0fobservedMetrics\x12/\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x1aG\n\x10SqlCommandResult\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x1a\xf8\x01\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x12&\n\x0cstart_offset\x18\x03 \x01(\x03H\x00R\x0bstartOffset\x88\x01\x01\x12$\n\x0b\x63hunk_index\x18\x04 \x01(\x03H\x01R\nchunkIndex\x88\x01\x01\x12\x32\n\x13num_chunks_in_batch\x18\x05 \x01(\x03H\x02R\x10numChunksInBatch\x88\x01\x01\x42\x0f\n\r_start_offsetB\x0e\n\x0c_chunk_indexB\x16\n\x14_num_chunks_in_batch\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 \x01(\tR\nmetricType\x1a\x8d\x01\n\x0fObservedMetrics\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x39\n\x06values\x18\x02 \x03(\x0b\x32!.spark.connect.Expression.LiteralR\x06values\x12\x12\n\x04keys\x18\x03 \x03(\tR\x04keys\x12\x17\n\x07plan_id\x18\x04 \x01(\x03R\x06planId\x1a\x10\n\x0eResultComplete\x1a\xcd\x02\n\x11\x45xecutionProgress\x12V\n\x06stages\x18\x01 \x03(\x0b\x32>.spark.connect.ExecutePlanResponse.ExecutionProgress.StageInfoR\x06stages\x12,\n\x12num_inflight_tasks\x18\x02 \x01(\x03R\x10numInflightTasks\x1a\xb1\x01\n\tStageInfo\x12\x19\n\x08stage_id\x18\x01 \x01(\x03R\x07stageId\x12\x1b\n\tnum_tasks\x18\x02 \x01(\x03R\x08numTasks\x12.\n\x13num_completed_tasks\x18\x03 \x01(\x03R\x11numCompletedTasks\x12(\n\x10input_bytes_read\x18\x04 \x01(\x03R\x0einputBytesRead\x12\x12\n\x04\x64one\x18\x05 \x01(\x08R\x04\x64oneB\x0f\n\rresponse_type"A\n\x08KeyValue\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x19\n\x05value\x18\x02 \x01(\tH\x00R\x05value\x88\x01\x01\x42\x08\n\x06_value"\xaf\t\n\rConfigRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x08 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\x44\n\toperation\x18\x03 \x01(\x0b\x32&.spark.connect.ConfigRequest.OperationR\toperation\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x01R\nclientType\x88\x01\x01\x1a\xf2\x03\n\tOperation\x12\x34\n\x03set\x18\x01 \x01(\x0b\x32 .spark.connect.ConfigRequest.SetH\x00R\x03set\x12\x34\n\x03get\x18\x02 \x01(\x0b\x32 .spark.connect.ConfigRequest.GetH\x00R\x03get\x12W\n\x10get_with_default\x18\x03 \x01(\x0b\x32+.spark.connect.ConfigRequest.GetWithDefaultH\x00R\x0egetWithDefault\x12G\n\nget_option\x18\x04 \x01(\x0b\x32&.spark.connect.ConfigRequest.GetOptionH\x00R\tgetOption\x12>\n\x07get_all\x18\x05 \x01(\x0b\x32#.spark.connect.ConfigRequest.GetAllH\x00R\x06getAll\x12:\n\x05unset\x18\x06 \x01(\x0b\x32".spark.connect.ConfigRequest.UnsetH\x00R\x05unset\x12P\n\ris_modifiable\x18\x07 \x01(\x0b\x32).spark.connect.ConfigRequest.IsModifiableH\x00R\x0cisModifiableB\t\n\x07op_type\x1a\\\n\x03Set\x12-\n\x05pairs\x18\x01 \x03(\x0b\x32\x17.spark.connect.KeyValueR\x05pairs\x12\x1b\n\x06silent\x18\x02 \x01(\x08H\x00R\x06silent\x88\x01\x01\x42\t\n\x07_silent\x1a\x19\n\x03Get\x12\x12\n\x04keys\x18\x01 \x03(\tR\x04keys\x1a?\n\x0eGetWithDefault\x12-\n\x05pairs\x18\x01 \x03(\x0b\x32\x17.spark.connect.KeyValueR\x05pairs\x1a\x1f\n\tGetOption\x12\x12\n\x04keys\x18\x01 \x03(\tR\x04keys\x1a\x30\n\x06GetAll\x12\x1b\n\x06prefix\x18\x01 \x01(\tH\x00R\x06prefix\x88\x01\x01\x42\t\n\x07_prefix\x1a\x1b\n\x05Unset\x12\x12\n\x04keys\x18\x01 \x03(\tR\x04keys\x1a"\n\x0cIsModifiable\x12\x12\n\x04keys\x18\x01 \x03(\tR\x04keysB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\xaf\x01\n\x0e\x43onfigResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x04 \x01(\tR\x13serverSideSessionId\x12-\n\x05pairs\x18\x02 \x03(\x0b\x32\x17.spark.connect.KeyValueR\x05pairs\x12\x1a\n\x08warnings\x18\x03 \x03(\tR\x08warnings"\xea\x07\n\x13\x41\x64\x64\x41rtifactsRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12V\n&client_observed_server_side_session_id\x18\x07 \x01(\tH\x01R!clientObservedServerSideSessionId\x88\x01\x01\x12$\n\x0b\x63lient_type\x18\x06 \x01(\tH\x02R\nclientType\x88\x01\x01\x12@\n\x05\x62\x61tch\x18\x03 \x01(\x0b\x32(.spark.connect.AddArtifactsRequest.BatchH\x00R\x05\x62\x61tch\x12Z\n\x0b\x62\x65gin_chunk\x18\x04 \x01(\x0b\x32\x37.spark.connect.AddArtifactsRequest.BeginChunkedArtifactH\x00R\nbeginChunk\x12H\n\x05\x63hunk\x18\x05 \x01(\x0b\x32\x30.spark.connect.AddArtifactsRequest.ArtifactChunkH\x00R\x05\x63hunk\x1a\x35\n\rArtifactChunk\x12\x12\n\x04\x64\x61ta\x18\x01 \x01(\x0cR\x04\x64\x61ta\x12\x10\n\x03\x63rc\x18\x02 \x01(\x03R\x03\x63rc\x1ao\n\x13SingleChunkArtifact\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x44\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x30.spark.connect.AddArtifactsRequest.ArtifactChunkR\x04\x64\x61ta\x1a]\n\x05\x42\x61tch\x12T\n\tartifacts\x18\x01 \x03(\x0b\x32\x36.spark.connect.AddArtifactsRequest.SingleChunkArtifactR\tartifacts\x1a\xc1\x01\n\x14\x42\x65ginChunkedArtifact\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x1f\n\x0btotal_bytes\x18\x02 \x01(\x03R\ntotalBytes\x12\x1d\n\nnum_chunks\x18\x03 \x01(\x03R\tnumChunks\x12U\n\rinitial_chunk\x18\x04 \x01(\x0b\x32\x30.spark.connect.AddArtifactsRequest.ArtifactChunkR\x0cinitialChunkB\t\n\x07payloadB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\x90\x02\n\x14\x41\x64\x64\x41rtifactsResponse\x12\x1d\n\nsession_id\x18\x02 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12Q\n\tartifacts\x18\x01 \x03(\x0b\x32\x33.spark.connect.AddArtifactsResponse.ArtifactSummaryR\tartifacts\x1aQ\n\x0f\x41rtifactSummary\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12*\n\x11is_crc_successful\x18\x02 \x01(\x08R\x0fisCrcSuccessful"\xc6\x02\n\x17\x41rtifactStatusesRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x05 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12$\n\x0b\x63lient_type\x18\x03 \x01(\tH\x01R\nclientType\x88\x01\x01\x12\x14\n\x05names\x18\x04 \x03(\tR\x05namesB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\xe0\x02\n\x18\x41rtifactStatusesResponse\x12\x1d\n\nsession_id\x18\x02 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12Q\n\x08statuses\x18\x01 \x03(\x0b\x32\x35.spark.connect.ArtifactStatusesResponse.StatusesEntryR\x08statuses\x1as\n\rStatusesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ArtifactStatusesResponse.ArtifactStatusR\x05value:\x02\x38\x01\x1a(\n\x0e\x41rtifactStatus\x12\x16\n\x06\x65xists\x18\x01 \x01(\x08R\x06\x65xists"\xdb\x04\n\x10InterruptRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x07 \x01(\tH\x01R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12$\n\x0b\x63lient_type\x18\x03 \x01(\tH\x02R\nclientType\x88\x01\x01\x12T\n\x0einterrupt_type\x18\x04 \x01(\x0e\x32-.spark.connect.InterruptRequest.InterruptTypeR\rinterruptType\x12%\n\roperation_tag\x18\x05 \x01(\tH\x00R\x0coperationTag\x12#\n\x0coperation_id\x18\x06 \x01(\tH\x00R\x0boperationId"\x80\x01\n\rInterruptType\x12\x1e\n\x1aINTERRUPT_TYPE_UNSPECIFIED\x10\x00\x12\x16\n\x12INTERRUPT_TYPE_ALL\x10\x01\x12\x16\n\x12INTERRUPT_TYPE_TAG\x10\x02\x12\x1f\n\x1bINTERRUPT_TYPE_OPERATION_ID\x10\x03\x42\x0b\n\tinterruptB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\x90\x01\n\x11InterruptResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12\'\n\x0finterrupted_ids\x18\x02 \x03(\tR\x0einterruptedIds"5\n\x0fReattachOptions\x12"\n\x0creattachable\x18\x01 \x01(\x08R\x0creattachable"\xb5\x01\n\x15ResultChunkingOptions\x12;\n\x1a\x61llow_arrow_batch_chunking\x18\x01 \x01(\x08R\x17\x61llowArrowBatchChunking\x12@\n\x1apreferred_arrow_chunk_size\x18\x02 \x01(\x03H\x00R\x17preferredArrowChunkSize\x88\x01\x01\x42\x1d\n\x1b_preferred_arrow_chunk_size"\x96\x03\n\x16ReattachExecuteRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x06 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12!\n\x0coperation_id\x18\x03 \x01(\tR\x0boperationId\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x01R\nclientType\x88\x01\x01\x12-\n\x10last_response_id\x18\x05 \x01(\tH\x02R\x0elastResponseId\x88\x01\x01\x42)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_typeB\x13\n\x11_last_response_id"\xc9\x04\n\x15ReleaseExecuteRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x07 \x01(\tH\x01R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12!\n\x0coperation_id\x18\x03 \x01(\tR\x0boperationId\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x02R\nclientType\x88\x01\x01\x12R\n\x0brelease_all\x18\x05 \x01(\x0b\x32/.spark.connect.ReleaseExecuteRequest.ReleaseAllH\x00R\nreleaseAll\x12X\n\rrelease_until\x18\x06 \x01(\x0b\x32\x31.spark.connect.ReleaseExecuteRequest.ReleaseUntilH\x00R\x0creleaseUntil\x1a\x0c\n\nReleaseAll\x1a/\n\x0cReleaseUntil\x12\x1f\n\x0bresponse_id\x18\x01 \x01(\tR\nresponseIdB\t\n\x07releaseB)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\xa5\x01\n\x16ReleaseExecuteResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12&\n\x0coperation_id\x18\x02 \x01(\tH\x00R\x0boperationId\x88\x01\x01\x42\x0f\n\r_operation_id"\xd4\x01\n\x15ReleaseSessionRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12$\n\x0b\x63lient_type\x18\x03 \x01(\tH\x00R\nclientType\x88\x01\x01\x12\'\n\x0f\x61llow_reconnect\x18\x04 \x01(\x08R\x0e\x61llowReconnectB\x0e\n\x0c_client_type"l\n\x16ReleaseSessionResponse\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12\x33\n\x16server_side_session_id\x18\x02 \x01(\tR\x13serverSideSessionId"\xcc\x02\n\x18\x46\x65tchErrorDetailsRequest\x12\x1d\n\nsession_id\x18\x01 \x01(\tR\tsessionId\x12V\n&client_observed_server_side_session_id\x18\x05 \x01(\tH\x00R!clientObservedServerSideSessionId\x88\x01\x01\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\x19\n\x08\x65rror_id\x18\x03 \x01(\tR\x07\x65rrorId\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x01R\nclientType\x88\x01\x01\x42)\n\'_client_observed_server_side_session_idB\x0e\n\x0c_client_type"\xd9\x0f\n\x19\x46\x65tchErrorDetailsResponse\x12\x33\n\x16server_side_session_id\x18\x03 \x01(\tR\x13serverSideSessionId\x12\x1d\n\nsession_id\x18\x04 \x01(\tR\tsessionId\x12)\n\x0eroot_error_idx\x18\x01 \x01(\x05H\x00R\x0crootErrorIdx\x88\x01\x01\x12\x46\n\x06\x65rrors\x18\x02 \x03(\x0b\x32..spark.connect.FetchErrorDetailsResponse.ErrorR\x06\x65rrors\x1a\xae\x01\n\x11StackTraceElement\x12\'\n\x0f\x64\x65\x63laring_class\x18\x01 \x01(\tR\x0e\x64\x65\x63laringClass\x12\x1f\n\x0bmethod_name\x18\x02 \x01(\tR\nmethodName\x12 \n\tfile_name\x18\x03 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12\x1f\n\x0bline_number\x18\x04 \x01(\x05R\nlineNumberB\x0c\n\n_file_name\x1a\xf0\x02\n\x0cQueryContext\x12\x64\n\x0c\x63ontext_type\x18\n \x01(\x0e\x32\x41.spark.connect.FetchErrorDetailsResponse.QueryContext.ContextTypeR\x0b\x63ontextType\x12\x1f\n\x0bobject_type\x18\x01 \x01(\tR\nobjectType\x12\x1f\n\x0bobject_name\x18\x02 \x01(\tR\nobjectName\x12\x1f\n\x0bstart_index\x18\x03 \x01(\x05R\nstartIndex\x12\x1d\n\nstop_index\x18\x04 \x01(\x05R\tstopIndex\x12\x1a\n\x08\x66ragment\x18\x05 \x01(\tR\x08\x66ragment\x12\x1b\n\tcall_site\x18\x06 \x01(\tR\x08\x63\x61llSite\x12\x18\n\x07summary\x18\x07 \x01(\tR\x07summary"%\n\x0b\x43ontextType\x12\x07\n\x03SQL\x10\x00\x12\r\n\tDATAFRAME\x10\x01\x1a\xa6\x04\n\x0eSparkThrowable\x12$\n\x0b\x65rror_class\x18\x01 \x01(\tH\x00R\nerrorClass\x88\x01\x01\x12}\n\x12message_parameters\x18\x02 \x03(\x0b\x32N.spark.connect.FetchErrorDetailsResponse.SparkThrowable.MessageParametersEntryR\x11messageParameters\x12\\\n\x0equery_contexts\x18\x03 \x03(\x0b\x32\x35.spark.connect.FetchErrorDetailsResponse.QueryContextR\rqueryContexts\x12 \n\tsql_state\x18\x04 \x01(\tH\x01R\x08sqlState\x88\x01\x01\x12r\n\x14\x62reaking_change_info\x18\x05 \x01(\x0b\x32;.spark.connect.FetchErrorDetailsResponse.BreakingChangeInfoH\x02R\x12\x62reakingChangeInfo\x88\x01\x01\x1a\x44\n\x16MessageParametersEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x0e\n\x0c_error_classB\x0c\n\n_sql_stateB\x17\n\x15_breaking_change_info\x1a\xfa\x01\n\x12\x42reakingChangeInfo\x12+\n\x11migration_message\x18\x01 \x03(\tR\x10migrationMessage\x12k\n\x11mitigation_config\x18\x02 \x01(\x0b\x32\x39.spark.connect.FetchErrorDetailsResponse.MitigationConfigH\x00R\x10mitigationConfig\x88\x01\x01\x12$\n\x0bneeds_audit\x18\x03 \x01(\x08H\x01R\nneedsAudit\x88\x01\x01\x42\x14\n\x12_mitigation_configB\x0e\n\x0c_needs_audit\x1a:\n\x10MitigationConfig\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value\x1a\xdb\x02\n\x05\x45rror\x12\x30\n\x14\x65rror_type_hierarchy\x18\x01 \x03(\tR\x12\x65rrorTypeHierarchy\x12\x18\n\x07message\x18\x02 \x01(\tR\x07message\x12[\n\x0bstack_trace\x18\x03 \x03(\x0b\x32:.spark.connect.FetchErrorDetailsResponse.StackTraceElementR\nstackTrace\x12 \n\tcause_idx\x18\x04 \x01(\x05H\x00R\x08\x63\x61useIdx\x88\x01\x01\x12\x65\n\x0fspark_throwable\x18\x05 \x01(\x0b\x32\x37.spark.connect.FetchErrorDetailsResponse.SparkThrowableH\x01R\x0esparkThrowable\x88\x01\x01\x42\x0c\n\n_cause_idxB\x12\n\x10_spark_throwableB\x11\n\x0f_root_error_idx"Z\n\x17\x43heckpointCommandResult\x12?\n\x08relation\x18\x01 \x01(\x0b\x32#.spark.connect.CachedRemoteRelationR\x08relation2\xb2\x07\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x12G\n\x06\x43onfig\x12\x1c.spark.connect.ConfigRequest\x1a\x1d.spark.connect.ConfigResponse"\x00\x12[\n\x0c\x41\x64\x64\x41rtifacts\x12".spark.connect.AddArtifactsRequest\x1a#.spark.connect.AddArtifactsResponse"\x00(\x01\x12\x63\n\x0e\x41rtifactStatus\x12&.spark.connect.ArtifactStatusesRequest\x1a\'.spark.connect.ArtifactStatusesResponse"\x00\x12P\n\tInterrupt\x12\x1f.spark.connect.InterruptRequest\x1a .spark.connect.InterruptResponse"\x00\x12`\n\x0fReattachExecute\x12%.spark.connect.ReattachExecuteRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12_\n\x0eReleaseExecute\x12$.spark.connect.ReleaseExecuteRequest\x1a%.spark.connect.ReleaseExecuteResponse"\x00\x12_\n\x0eReleaseSession\x12$.spark.connect.ReleaseSessionRequest\x1a%.spark.connect.ReleaseSessionResponse"\x00\x12h\n\x11\x46\x65tchErrorDetails\x12\'.spark.connect.FetchErrorDetailsRequest\x1a(.spark.connect.FetchErrorDetailsResponse"\x00\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -237,7 +237,7 @@ _globals["_FETCHERRORDETAILSREQUEST"]._serialized_start = 15049 _globals["_FETCHERRORDETAILSREQUEST"]._serialized_end = 15381 _globals["_FETCHERRORDETAILSRESPONSE"]._serialized_start = 15384 - _globals["_FETCHERRORDETAILSRESPONSE"]._serialized_end = 16939 + _globals["_FETCHERRORDETAILSRESPONSE"]._serialized_end = 17393 _globals["_FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT"]._serialized_start = 15613 _globals["_FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT"]._serialized_end = 15787 _globals["_FETCHERRORDETAILSRESPONSE_QUERYCONTEXT"]._serialized_start = 15790 @@ -245,17 +245,21 @@ _globals["_FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE"]._serialized_start = 16121 _globals["_FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE"]._serialized_end = 16158 _globals["_FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE"]._serialized_start = 16161 - _globals["_FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE"]._serialized_end = 16570 + _globals["_FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE"]._serialized_end = 16711 _globals[ "_FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY" - ]._serialized_start = 16472 + ]._serialized_start = 16588 _globals[ "_FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY" - ]._serialized_end = 16540 - _globals["_FETCHERRORDETAILSRESPONSE_ERROR"]._serialized_start = 16573 - _globals["_FETCHERRORDETAILSRESPONSE_ERROR"]._serialized_end = 16920 - _globals["_CHECKPOINTCOMMANDRESULT"]._serialized_start = 16941 - _globals["_CHECKPOINTCOMMANDRESULT"]._serialized_end = 17031 - _globals["_SPARKCONNECTSERVICE"]._serialized_start = 17034 - _globals["_SPARKCONNECTSERVICE"]._serialized_end = 17980 + ]._serialized_end = 16656 + _globals["_FETCHERRORDETAILSRESPONSE_BREAKINGCHANGEINFO"]._serialized_start = 16714 + _globals["_FETCHERRORDETAILSRESPONSE_BREAKINGCHANGEINFO"]._serialized_end = 16964 + _globals["_FETCHERRORDETAILSRESPONSE_MITIGATIONCONFIG"]._serialized_start = 16966 + _globals["_FETCHERRORDETAILSRESPONSE_MITIGATIONCONFIG"]._serialized_end = 17024 + _globals["_FETCHERRORDETAILSRESPONSE_ERROR"]._serialized_start = 17027 + _globals["_FETCHERRORDETAILSRESPONSE_ERROR"]._serialized_end = 17374 + _globals["_CHECKPOINTCOMMANDRESULT"]._serialized_start = 17395 + _globals["_CHECKPOINTCOMMANDRESULT"]._serialized_end = 17485 + _globals["_SPARKCONNECTSERVICE"]._serialized_start = 17488 + _globals["_SPARKCONNECTSERVICE"]._serialized_end = 18434 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi index 93dff6639665..62aa597aed2a 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.pyi +++ b/python/pyspark/sql/connect/proto/base_pb2.pyi @@ -3732,6 +3732,7 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message): MESSAGE_PARAMETERS_FIELD_NUMBER: builtins.int QUERY_CONTEXTS_FIELD_NUMBER: builtins.int SQL_STATE_FIELD_NUMBER: builtins.int + BREAKING_CHANGE_INFO_FIELD_NUMBER: builtins.int error_class: builtins.str """Succinct, human-readable, unique, and consistent representation of the error category.""" @property @@ -3750,6 +3751,9 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message): """Portable error identifier across SQL engines If null, error class or SQLSTATE is not set. """ + @property + def breaking_change_info(self) -> global___FetchErrorDetailsResponse.BreakingChangeInfo: + """Additional information if the error was caused by a breaking change.""" def __init__( self, *, @@ -3760,14 +3764,20 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message): ] | None = ..., sql_state: builtins.str | None = ..., + breaking_change_info: global___FetchErrorDetailsResponse.BreakingChangeInfo + | None = ..., ) -> None: ... def HasField( self, field_name: typing_extensions.Literal[ + "_breaking_change_info", + b"_breaking_change_info", "_error_class", b"_error_class", "_sql_state", b"_sql_state", + "breaking_change_info", + b"breaking_change_info", "error_class", b"error_class", "sql_state", @@ -3777,10 +3787,14 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_breaking_change_info", + b"_breaking_change_info", "_error_class", b"_error_class", "_sql_state", b"_sql_state", + "breaking_change_info", + b"breaking_change_info", "error_class", b"error_class", "message_parameters", @@ -3792,6 +3806,13 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message): ], ) -> None: ... @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_breaking_change_info", b"_breaking_change_info" + ], + ) -> typing_extensions.Literal["breaking_change_info"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_error_class", b"_error_class"] ) -> typing_extensions.Literal["error_class"] | None: ... @@ -3800,6 +3821,94 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_sql_state", b"_sql_state"] ) -> typing_extensions.Literal["sql_state"] | None: ... + class BreakingChangeInfo(google.protobuf.message.Message): + """BreakingChangeInfo defines the schema for breaking change information.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MIGRATION_MESSAGE_FIELD_NUMBER: builtins.int + MITIGATION_CONFIG_FIELD_NUMBER: builtins.int + NEEDS_AUDIT_FIELD_NUMBER: builtins.int + @property + def migration_message( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """A message explaining how the user can migrate their job to work + with the breaking change. + """ + @property + def mitigation_config(self) -> global___FetchErrorDetailsResponse.MitigationConfig: + """A spark config flag that can be used to mitigate the breaking change.""" + needs_audit: builtins.bool + """If true, the breaking change should be inspected manually. + If false, the spark job should be retried by setting the mitigationConfig. + """ + def __init__( + self, + *, + migration_message: collections.abc.Iterable[builtins.str] | None = ..., + mitigation_config: global___FetchErrorDetailsResponse.MitigationConfig | None = ..., + needs_audit: builtins.bool | None = ..., + ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "_mitigation_config", + b"_mitigation_config", + "_needs_audit", + b"_needs_audit", + "mitigation_config", + b"mitigation_config", + "needs_audit", + b"needs_audit", + ], + ) -> builtins.bool: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "_mitigation_config", + b"_mitigation_config", + "_needs_audit", + b"_needs_audit", + "migration_message", + b"migration_message", + "mitigation_config", + b"mitigation_config", + "needs_audit", + b"needs_audit", + ], + ) -> None: ... + @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal["_mitigation_config", b"_mitigation_config"], + ) -> typing_extensions.Literal["mitigation_config"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_needs_audit", b"_needs_audit"] + ) -> typing_extensions.Literal["needs_audit"] | None: ... + + class MitigationConfig(google.protobuf.message.Message): + """MitigationConfig defines a spark config flag that can be used to mitigate a breaking change.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + """The spark config key.""" + value: builtins.str + """The spark config value that mitigates the breaking change.""" + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField( + self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] + ) -> None: ... + class Error(google.protobuf.message.Message): """Error defines the schema for the representing exception.""" diff --git a/sql/connect/common/src/main/protobuf/spark/connect/base.proto b/sql/connect/common/src/main/protobuf/spark/connect/base.proto index 89d22f7455db..0e96512aae73 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/base.proto @@ -1095,6 +1095,32 @@ message FetchErrorDetailsResponse { // Portable error identifier across SQL engines // If null, error class or SQLSTATE is not set. optional string sql_state = 4; + + // Additional information if the error was caused by a breaking change. + optional BreakingChangeInfo breaking_change_info = 5; + } + + // BreakingChangeInfo defines the schema for breaking change information. + message BreakingChangeInfo { + // A message explaining how the user can migrate their job to work + // with the breaking change. + repeated string migration_message = 1; + + // A spark config flag that can be used to mitigate the breaking change. + optional MitigationConfig mitigation_config = 2; + + // If true, the breaking change should be inspected manually. + // If false, the spark job should be retried by setting the mitigationConfig. + optional bool needs_audit = 3; + } + + // MitigationConfig defines a spark config flag that can be used to mitigate a breaking change. + message MitigationConfig { + // The spark config key. + string key = 1; + + // The spark config value that mitigates the breaking change. + string value = 2; } // Error defines the schema for the representing exception. diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala index 76f91ee71059..0c2c2940e685 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala @@ -142,6 +142,25 @@ private[connect] object ErrorUtils extends Logging { if (sparkThrowable.getSqlState != null) { sparkThrowableBuilder.setSqlState(sparkThrowable.getSqlState) } + // Add breaking change info if present + if (sparkThrowable.getBreakingChangeInfo != null) { + val breakingChangeInfo = sparkThrowable.getBreakingChangeInfo + val breakingChangeInfoBuilder = FetchErrorDetailsResponse.BreakingChangeInfo + .newBuilder() + import scala.jdk.CollectionConverters._ + breakingChangeInfoBuilder + .addAllMigrationMessage(breakingChangeInfo.migrationMessage.asJava) + if (breakingChangeInfo.mitigationConfig.isDefined) { + val mitigationConfig = breakingChangeInfo.mitigationConfig.get + val mitigationConfigBuilder = FetchErrorDetailsResponse.MitigationConfig + .newBuilder() + .setKey(mitigationConfig.key) + .setValue(mitigationConfig.value) + breakingChangeInfoBuilder.setMitigationConfig(mitigationConfigBuilder.build()) + } + breakingChangeInfoBuilder.setNeedsAudit(breakingChangeInfo.needsAudit) + sparkThrowableBuilder.setBreakingChangeInfo(breakingChangeInfoBuilder.build()) + } sparkThrowableBuilder.putAllMessageParameters(sparkThrowable.getMessageParameters) builder.setSparkThrowable(sparkThrowableBuilder.build()) case _ => diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala index 33315682bd73..f389b753ad01 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala @@ -24,6 +24,7 @@ import scala.util.Try import io.grpc.stub.StreamObserver +import org.apache.spark.{BreakingChangeInfo, MitigationConfig, SparkThrowable} import org.apache.spark.connect.proto import org.apache.spark.connect.proto.FetchErrorDetailsResponse import org.apache.spark.sql.AnalysisException @@ -34,6 +35,31 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.ThreadUtils +/** + * Test SparkThrowable implementation for testing breaking change info serialization + */ +class TestSparkThrowableWithBreakingChange( + val errorClass: String, + val messageParams: Map[String, String], + val breakingChangeInfo: Option[BreakingChangeInfo]) + extends Exception(s"Test error for $errorClass") + with SparkThrowable { + + override def getCondition: String = errorClass + override def getErrorClass: String = errorClass + override def getMessageParameters: java.util.Map[String, String] = { + import scala.jdk.CollectionConverters._ + messageParams.asJava + } + + override def getBreakingChangeInfo: BreakingChangeInfo = { + breakingChangeInfo match { + case Some(info) => info + case None => null + } + } +} + private class FetchErrorDetailsResponseObserver(p: Promise[FetchErrorDetailsResponse]) extends StreamObserver[FetchErrorDetailsResponse] { override def onNext(v: FetchErrorDetailsResponse): Unit = p.success(v) @@ -205,4 +231,143 @@ class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelp assert(sparkThrowableProto.getMessageParametersMap == testError.getMessageParameters) assert(sparkThrowableProto.getSqlState == testError.getSqlState) } + + test("breaking change info is not present when error has no breaking change") { + val testError = new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.DESC_TABLE_COLUMN_JSON", + messageParameters = Map.empty) + val errorId = UUID.randomUUID().toString() + + val sessionHolder = SparkConnectService + .getOrCreateIsolatedSession(userId, sessionId, None) + sessionHolder.errorIdToError.put(errorId, testError) + + val response = fetchErrorDetails(userId, sessionId, errorId) + + assert(response.hasRootErrorIdx) + val sparkThrowableProto = response.getErrors(0).getSparkThrowable + assert(!sparkThrowableProto.hasBreakingChangeInfo) + } + + test("breaking change info serialization works for errors with breaking change") { + // This test would require creating a SparkThrowable that has breaking change info + // Since we need to test with actual breaking change info, we need to either: + // 1. Use a real error class that has breaking change info, or + // 2. Create a mock SparkThrowable that returns breaking change info + + // For now, we'll create a simple test that verifies the serialization path exists + // A full test would require setting up test error classes with breaking change info + // in a test error-conditions.json file + + val testError = Try(spark.sql("select x")).failed.get.asInstanceOf[AnalysisException] + val errorId = UUID.randomUUID().toString() + + SparkConnectService + .getOrCreateIsolatedSession(userId, sessionId, None) + .errorIdToError + .put(errorId, testError) + + val response = fetchErrorDetails(userId, sessionId, errorId) + assert(response.hasRootErrorIdx) + + // Verify that the breaking_change_info field exists in the protobuf schema + // Even if this particular error doesn't have breaking change info, + // the protobuf should support the field + val sparkThrowableProto = response.getErrors(0).getSparkThrowable + + // This test verifies that our protobuf changes don't break existing functionality + assert(sparkThrowableProto.getErrorClass == testError.errorClass.get) + + // TODO: Once we have test error classes with breaking change info, + // we should add a more comprehensive test that verifies: + // - sparkThrowableProto.hasBreakingChangeInfo == true + // - sparkThrowableProto.getBreakingChangeInfo.getMigrationMessageCount > 0 + // - sparkThrowableProto.getBreakingChangeInfo.getNeedsAudit == expected_value + // - If mitigation config exists: + // sparkThrowableProto.getBreakingChangeInfo.hasSparkConfig + } + + test("throwableToFetchErrorDetailsResponse includes breaking change info") { + val migrationMessages = + Seq("Please update your code to use new API", "See documentation for details") + val mitigationConfig = + Some(MitigationConfig("spark.sql.legacy.behavior.enabled", "true")) + val breakingChangeInfo = + BreakingChangeInfo(migrationMessages, mitigationConfig, needsAudit = false) + + val testError = new TestSparkThrowableWithBreakingChange( + "TEST_BREAKING_CHANGE_ERROR", + Map("param" -> "value"), + Some(breakingChangeInfo)) + + val response = + ErrorUtils.throwableToFetchErrorDetailsResponse(testError, serverStackTraceEnabled = false) + + assert(response.hasRootErrorIdx) + assert(response.getRootErrorIdx == 0) + assert(response.getErrorsCount == 1) + + val error = response.getErrors(0) + assert(error.hasSparkThrowable) + + val sparkThrowableProto = error.getSparkThrowable + assert(sparkThrowableProto.getErrorClass == "TEST_BREAKING_CHANGE_ERROR") + assert(sparkThrowableProto.hasBreakingChangeInfo) + + val bciProto = sparkThrowableProto.getBreakingChangeInfo + assert(bciProto.getMigrationMessageCount == 2) + assert(bciProto.getMigrationMessage(0) == "Please update your code to use new API") + assert(bciProto.getMigrationMessage(1) == "See documentation for details") + assert(bciProto.getNeedsAudit == false) + + assert(bciProto.hasMitigationConfig) + val mitigationConfigProto = bciProto.getMitigationConfig + assert(mitigationConfigProto.getKey == "spark.sql.legacy.behavior.enabled") + assert(mitigationConfigProto.getValue == "true") + } + + test("throwableToFetchErrorDetailsResponse without breaking change info") { + val testError = + new TestSparkThrowableWithBreakingChange("REGULAR_ERROR", Map("param" -> "value"), None) + + val response = + ErrorUtils.throwableToFetchErrorDetailsResponse(testError, serverStackTraceEnabled = false) + + assert(response.hasRootErrorIdx) + val sparkThrowableProto = response.getErrors(0).getSparkThrowable + assert(sparkThrowableProto.getErrorClass == "REGULAR_ERROR") + assert(!sparkThrowableProto.hasBreakingChangeInfo) + } + + test( + "throwableToFetchErrorDetailsResponse with breaking change info without mitigation config") { + val migrationMessages = Seq("Migration message only") + val breakingChangeInfo = BreakingChangeInfo(migrationMessages, None, needsAudit = true) + + val testError = new TestSparkThrowableWithBreakingChange( + "TEST_BREAKING_CHANGE_NO_MITIGATION", + Map.empty, + Some(breakingChangeInfo)) + + val response = + ErrorUtils.throwableToFetchErrorDetailsResponse(testError, serverStackTraceEnabled = false) + + val bciProto = response.getErrors(0).getSparkThrowable.getBreakingChangeInfo + assert(bciProto.getMigrationMessageCount == 1) + assert(bciProto.getMigrationMessage(0) == "Migration message only") + assert(bciProto.getNeedsAudit == true) + assert(!bciProto.hasMitigationConfig) + } + + test("throwableToFetchErrorDetailsResponse with non-SparkThrowable") { + val testError = new RuntimeException("Regular runtime exception") + + val response = + ErrorUtils.throwableToFetchErrorDetailsResponse(testError, serverStackTraceEnabled = false) + + assert(response.hasRootErrorIdx) + val error = response.getErrors(0) + assert(!error.hasSparkThrowable) + assert(error.getMessage == "Regular runtime exception") + } }