Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private[sql] class StreamingQueryStatusListener(
querySummary.id,
querySummary.runId,
isActive = false,
querySummary.exception,
event.exception,
querySummary.startTimestamp,
Some(curTime)
), checkTriggers = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,40 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
store.write(testData)
store.close(closeParent = false)
}

test("SPARK-43973: onQueryTerminated should pick up exception info") {
val kvStore = new ElementTrackingStore(createStore(), sparkConf)
val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore)
val queryStore = new StreamingQueryStatusStore(kvStore)

// succeed (no exception) case
val id1 = UUID.randomUUID()
val runId1 = UUID.randomUUID()
val startEvent1 = new StreamingQueryListener.QueryStartedEvent(
id1, runId1, "test1", "2023-01-01T20:50:00.800Z")
listener.onQueryStarted(startEvent1)
val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None, None)
listener.onQueryTerminated(terminateEvent1)

// failure (has exception) case
val id2 = UUID.randomUUID()
val runId2 = UUID.randomUUID()
val startEvent2 = new StreamingQueryListener.QueryStartedEvent(
id2, runId2, "test2", "2023-01-02T20:54:20.827Z")
listener.onQueryStarted(startEvent2)
val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(
id2, runId2, Option("ExampleException"), Option("EXAMPLE_ERROR_CLASS"))
listener.onQueryTerminated(terminateEvent2)

// check results
val (activeQueries, stoppedQueries) = queryStore.allQueryUIData.partition(_.summary.isActive)
assert(activeQueries.isEmpty)
val (finishedQueries, failedQueries) = stoppedQueries.partition(_.summary.exception.isEmpty)
assert(finishedQueries.size == 1)
assert(failedQueries.size == 1)
assert(failedQueries.head.summary.exception == Option("ExampleException"))
// there's no UI state for errorClassOnException yet; should check it as well when it's added
}
}

class StreamingQueryStatusListenerWithDiskStoreSuite extends StreamingQueryStatusListenerSuite {
Expand Down