Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions python/pyspark/sql/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ def __init__(self, jevent: JavaObject) -> None:
self._runId: uuid.UUID = uuid.UUID(jevent.runId().toString())
jexception = jevent.exception()
self._exception: Optional[str] = jexception.get() if jexception.isDefined() else None
jerrorclass = jevent.errorClassOnException()
self._errorClassOnException: Optional[str] = (
jerrorclass.get() if jerrorclass.isDefined() else None
)

@property
def id(self) -> uuid.UUID:
Expand All @@ -295,6 +299,19 @@ def exception(self) -> Optional[str]:
"""
return self._exception

@property
def errorClassOnException(self) -> Optional[str]:
"""
The error class from the exception if the query was terminated
with an exception which is a part of error class framework.
If the query was terminated without an exception, or the
exception is not a part of error class framework, it will be
`None`.

.. versionadded:: 3.5.0
"""
return self._errorClassOnException


class StreamingQueryProgress:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def get_number_of_public_methods(clz):
get_number_of_public_methods(
"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent"
),
13,
14,
msg,
)
self.assertEquals(
Expand Down Expand Up @@ -149,6 +149,7 @@ def check_terminated_event(self, event):
self.assertTrue(isinstance(event.runId, uuid.UUID))
# TODO: Needs a test for exception.
self.assertEquals(event.exception, None)
self.assertEquals(event.errorClassOnException, None)

def check_streaming_query_progress(self, progress):
"""Check StreamingQueryProgress"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.util.control.NonFatal
import com.google.common.util.concurrent.UncheckedExecutionException
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand Down Expand Up @@ -353,8 +353,15 @@ abstract class StreamExecution(

// Notify others
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
val errorClassOpt = exception.flatMap {
_.cause match {
case t: SparkThrowable => Some(t.getErrorClass)
case _ => None
}
}
postEvent(
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString),
errorClassOpt))

// Delete the temp checkpoint when either force delete enabled or the query didn't fail
if (deleteCheckpointOnStop &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,22 @@ object StreamingQueryListener {
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param exception The exception message of the query if the query was terminated
* with an exception. Otherwise, it will be `None`.
* @param errorClassOnException The error class from the exception if the query was terminated
* with an exception which is a part of error class framework.
Comment on lines +157 to +158
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is error class framework a public API that end-users understand or can find document?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SparkException and SparkThrowable are all public API where we expose to API doc. It's just that we lack about documentation for these classes. That's something we have to fix; we could probably refer to SparkThrowable.getErrorClass here, but it's not documented so wouldn't help unfortunately.

Copy link
Contributor Author

@HeartSaVioR HeartSaVioR May 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we think it'd be better to not mention about error class framework and say "this gives an error classification if the exception is available and classified"? If then I can avoid mentioning error class framework.

@HyukjinKwon WDYT? cc. @MaxGekk

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://spark.apache.org/docs/latest/sql-error-conditions.html

Above is the landing page on website. It's in SQL guide doc.

* If the query was terminated without an exception, or the
* exception is not a part of error class framework, it will be
* `None`.
* @since 2.1.0
*/
@Evolving
class QueryTerminatedEvent private[sql](
val id: UUID,
val runId: UUID,
val exception: Option[String]) extends Event
val exception: Option[String],
val errorClassOnException: Option[String]) extends Event {
// compatibility with versions in prior to 3.5.0
def this(id: UUID, runId: UUID, exception: Option[String]) = {
this(id, runId, exception, None)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
listeners.foreach(listener => assert(listener.terminationEvent.errorClassOnException
=== None))
}
listeners.foreach(listener => listener.checkAsyncErrors())
listeners.foreach(listener => listener.reset())
Expand All @@ -190,6 +192,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
listeners.foreach(listener => assert(listener.terminationEvent.errorClassOnException
=== None))
}
listeners.foreach(listener => listener.checkAsyncErrors())
listeners.foreach(listener => listener.reset())
Expand Down Expand Up @@ -280,11 +284,13 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(newEvent.id === event.id)
assert(newEvent.runId === event.runId)
assert(newEvent.exception === event.exception)
assert(newEvent.errorClassOnException === event.errorClassOnException)
}

val exception = new RuntimeException("exception")
val exception = SparkException.internalError("testpurpose")
testSerialization(
new QueryTerminatedEvent(UUID.randomUUID, UUID.randomUUID, Some(exception.getMessage)))
new QueryTerminatedEvent(UUID.randomUUID, UUID.randomUUID,
Some(exception.getMessage), Some(exception.getErrorClass)))
}

test("only one progress event per interval when no data") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
}

// handle terminate event
val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None)
val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None, None)
listener.onQueryTerminated(terminateEvent)

assert(!queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.isActive)
Expand All @@ -126,7 +126,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
listener.onQueryStarted(startEvent0)

// handle terminate event
val terminateEvent0 = new StreamingQueryListener.QueryTerminatedEvent(id, runId0, None)
val terminateEvent0 = new StreamingQueryListener.QueryTerminatedEvent(id, runId0, None, None)
listener.onQueryTerminated(terminateEvent0)

// handle second time start
Expand Down Expand Up @@ -178,19 +178,19 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
val (id3, runId3) = addNewQuery()
assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 0)

val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None)
val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None, None)
listener.onQueryTerminated(terminateEvent1)
checkInactiveQueryStatus(1, Seq(id1))
// SPARK-41972: having a short sleep here to make sure the end time of query 2 is larger than
// query 1.
Thread.sleep(20)
val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(id2, runId2, None)
val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(id2, runId2, None, None)
listener.onQueryTerminated(terminateEvent2)
checkInactiveQueryStatus(2, Seq(id1, id2))
// SPARK-41972: having a short sleep here to make sure the end time of query 3 is larger than
// query 2.
Thread.sleep(20)
val terminateEvent3 = new StreamingQueryListener.QueryTerminatedEvent(id3, runId3, None)
val terminateEvent3 = new StreamingQueryListener.QueryTerminatedEvent(id3, runId3, None, None)
listener.onQueryTerminated(terminateEvent3)
checkInactiveQueryStatus(2, Seq(id2, id3))
}
Expand Down