From 189c44f49b10820b22445bed3a57b0946fdc8cd7 Mon Sep 17 00:00:00 2001 From: Kris Mok Date: Mon, 5 Jun 2023 22:32:28 +0000 Subject: [PATCH] fix Streaming UI mishandling of failed queries --- .../ui/StreamingQueryStatusListener.scala | 2 +- .../StreamingQueryStatusListenerSuite.scala | 34 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala index e92c504d5c5c..09553cc2a9dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -109,7 +109,7 @@ private[sql] class StreamingQueryStatusListener( querySummary.id, querySummary.runId, isActive = false, - querySummary.exception, + event.exception, querySummary.startTimestamp, Some(curTime) ), checkTriggers = true) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala index 7044373362ee..1eb3ce6bd82e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala @@ -267,6 +267,40 @@ class StreamingQueryStatusListenerSuite extends StreamTest { store.write(testData) store.close(closeParent = false) } + + test("SPARK-43973: onQueryTerminated should pick up exception info") { + val kvStore = new ElementTrackingStore(createStore(), sparkConf) + val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore) + val queryStore = new StreamingQueryStatusStore(kvStore) + + // succeed (no exception) case + val id1 = UUID.randomUUID() + val runId1 = UUID.randomUUID() + val startEvent1 = new StreamingQueryListener.QueryStartedEvent( + id1, runId1, "test1", "2023-01-01T20:50:00.800Z") + listener.onQueryStarted(startEvent1) + val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None, None) + listener.onQueryTerminated(terminateEvent1) + + // failure (has exception) case + val id2 = UUID.randomUUID() + val runId2 = UUID.randomUUID() + val startEvent2 = new StreamingQueryListener.QueryStartedEvent( + id2, runId2, "test2", "2023-01-02T20:54:20.827Z") + listener.onQueryStarted(startEvent2) + val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent( + id2, runId2, Option("ExampleException"), Option("EXAMPLE_ERROR_CLASS")) + listener.onQueryTerminated(terminateEvent2) + + // check results + val (activeQueries, stoppedQueries) = queryStore.allQueryUIData.partition(_.summary.isActive) + assert(activeQueries.isEmpty) + val (finishedQueries, failedQueries) = stoppedQueries.partition(_.summary.exception.isEmpty) + assert(finishedQueries.size == 1) + assert(failedQueries.size == 1) + assert(failedQueries.head.summary.exception == Option("ExampleException")) + // there's no UI state for errorClassOnException yet; should check it as well when it's added + } } class StreamingQueryStatusListenerWithDiskStoreSuite extends StreamingQueryStatusListenerSuite {