diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py
index 5fdcab3dfaf2..9b0ae4938a17 100644
--- a/python/pyspark/sql/streaming/listener.py
+++ b/python/pyspark/sql/streaming/listener.py
@@ -270,6 +270,10 @@ def __init__(self, jevent: JavaObject) -> None:
         self._runId: uuid.UUID = uuid.UUID(jevent.runId().toString())
         jexception = jevent.exception()
         self._exception: Optional[str] = jexception.get() if jexception.isDefined() else None
+        jerrorclass = jevent.errorClassOnException()
+        self._errorClassOnException: Optional[str] = (
+            jerrorclass.get() if jerrorclass.isDefined() else None
+        )
 
     @property
     def id(self) -> uuid.UUID:
@@ -295,6 +299,19 @@ def exception(self) -> Optional[str]:
         """
         return self._exception
 
+    @property
+    def errorClassOnException(self) -> Optional[str]:
+        """
+        The error class from the exception if the query was terminated
+        with an exception which is a part of the error class framework.
+        If the query was terminated without an exception, or the
+        exception is not a part of the error class framework, it will be
+        `None`.
+
+        .. versionadded:: 3.5.0
+        """
+        return self._errorClassOnException
+
 
 class StreamingQueryProgress:
     """
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
index df1ad780aefc..71d76bc4e8d5 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
@@ -65,7 +65,7 @@ def get_number_of_public_methods(clz):
             get_number_of_public_methods(
                 "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent"
             ),
-            13,
+            14,
             msg,
         )
         self.assertEquals(
@@ -149,6 +149,7 @@ def check_terminated_event(self, event):
         self.assertTrue(isinstance(event.runId, uuid.UUID))
         # TODO: Needs a test for exception.
         self.assertEquals(event.exception, None)
+        self.assertEquals(event.errorClassOnException, None)
 
     def check_streaming_query_progress(self, progress):
         """Check StreamingQueryProgress"""
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e5077fa7f7bc..88fb28f0da4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -31,7 +31,7 @@ import scala.util.control.NonFatal
 import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -353,8 +353,15 @@ abstract class StreamExecution(
 
       // Notify others
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
+      val errorClassOpt = exception.flatMap {
+        _.cause match {
+          case t: SparkThrowable => Some(t.getErrorClass)
+          case _ => None
+        }
+      }
       postEvent(
-        new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
+        new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString),
+          errorClassOpt))
 
       // Delete the temp checkpoint when either force delete enabled or the query didn't fail
       if (deleteCheckpointOnStop &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index aec5afe24c9e..61a0ef1b98e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -154,11 +154,22 @@ object StreamingQueryListener {
    * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
    * @param exception The exception message of the query if the query was terminated
    *                  with an exception. Otherwise, it will be `None`.
+   * @param errorClassOnException The error class from the exception if the query was terminated
+   *                              with an exception which is a part of the error class framework.
+   *                              If the query was terminated without an exception, or the
+   *                              exception is not a part of the error class framework, it will be
+   *                              `None`.
    * @since 2.1.0
    */
   @Evolving
   class QueryTerminatedEvent private[sql](
       val id: UUID,
       val runId: UUID,
-      val exception: Option[String]) extends Event
+      val exception: Option[String],
+      val errorClassOnException: Option[String]) extends Event {
+    // compatibility with versions prior to 3.5.0
+    def this(id: UUID, runId: UUID, exception: Option[String]) = {
+      this(id, runId, exception, None)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 88d321af49be..6826d161d408 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -165,6 +165,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
         listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
         listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
+        listeners.foreach(listener => assert(listener.terminationEvent.errorClassOnException
+          === None))
       }
       listeners.foreach(listener => listener.checkAsyncErrors())
       listeners.foreach(listener => listener.reset())
@@ -190,6 +192,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
         listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
         listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
+        listeners.foreach(listener => assert(listener.terminationEvent.errorClassOnException
+          === None))
       }
       listeners.foreach(listener => listener.checkAsyncErrors())
       listeners.foreach(listener => listener.reset())
@@ -280,11 +284,13 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       assert(newEvent.id === event.id)
       assert(newEvent.runId === event.runId)
       assert(newEvent.exception === event.exception)
+      assert(newEvent.errorClassOnException === event.errorClassOnException)
     }
 
-    val exception = new RuntimeException("exception")
+    val exception = SparkException.internalError("testpurpose")
     testSerialization(
-      new QueryTerminatedEvent(UUID.randomUUID, UUID.randomUUID, Some(exception.getMessage)))
+      new QueryTerminatedEvent(UUID.randomUUID, UUID.randomUUID,
+        Some(exception.getMessage), Some(exception.getErrorClass)))
   }
 
   test("only one progress event per interval when no data") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index 58e04eb285a3..7044373362ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -104,7 +104,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     }
 
     // handle terminate event
-    val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None)
+    val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None, None)
     listener.onQueryTerminated(terminateEvent)
 
     assert(!queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.isActive)
@@ -126,7 +126,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     listener.onQueryStarted(startEvent0)
 
     // handle terminate event
-    val terminateEvent0 = new StreamingQueryListener.QueryTerminatedEvent(id, runId0, None)
+    val terminateEvent0 = new StreamingQueryListener.QueryTerminatedEvent(id, runId0, None, None)
     listener.onQueryTerminated(terminateEvent0)
 
     // handle second time start
@@ -178,19 +178,19 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     val (id3, runId3) = addNewQuery()
     assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 0)
 
-    val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None)
+    val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None, None)
     listener.onQueryTerminated(terminateEvent1)
     checkInactiveQueryStatus(1, Seq(id1))
     // SPARK-41972: having a short sleep here to make sure the end time of query 2 is larger than
     // query 1.
     Thread.sleep(20)
-    val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(id2, runId2, None)
+    val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(id2, runId2, None, None)
     listener.onQueryTerminated(terminateEvent2)
     checkInactiveQueryStatus(2, Seq(id1, id2))
     // SPARK-41972: having a short sleep here to make sure the end time of query 3 is larger than
     // query 2.
     Thread.sleep(20)
-    val terminateEvent3 = new StreamingQueryListener.QueryTerminatedEvent(id3, runId3, None)
+    val terminateEvent3 = new StreamingQueryListener.QueryTerminatedEvent(id3, runId3, None, None)
     listener.onQueryTerminated(terminateEvent3)
     checkInactiveQueryStatus(2, Seq(id2, id3))
   }
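
For context, a minimal sketch of how the new field could be consumed from PySpark once this change lands. It is illustrative only and not part of the patch: the listener class name `TerminationLogger` is made up, and an active SparkSession named `spark` is assumed.

    from pyspark.sql.streaming import StreamingQueryListener


    class TerminationLogger(StreamingQueryListener):
        """Illustrative listener that reports why a streaming query terminated."""

        def onQueryStarted(self, event):
            pass

        def onQueryProgress(self, event):
            pass

        def onQueryIdle(self, event):
            pass

        def onQueryTerminated(self, event):
            # errorClassOnException is None unless the query failed with an
            # exception that belongs to the error class framework.
            if event.errorClassOnException is not None:
                print(f"query {event.id} failed with error class {event.errorClassOnException}")
            elif event.exception is not None:
                print(f"query {event.id} failed: {event.exception}")
            else:
                print(f"query {event.id} stopped without an error")


    spark.streams.addListener(TerminationLogger())  # `spark` is an existing SparkSession

On the error-class path the class name can drive coarse-grained alerting or metrics, while the full exception string of the failure remains available through the existing `exception` field.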