diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 4db7c85b4ff3..bb4cea474fe0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -121,7 +121,11 @@ object SQLExecution {
               SparkThrowableHelper.getMessage(e)
           }
           val event = SparkListenerSQLExecutionEnd(
-            executionId, System.currentTimeMillis(), errorMessage)
+            executionId,
+            System.currentTimeMillis(),
+            // Use empty string to indicate no error, as None may mean events generated by old
+            // versions of Spark.
+            errorMessage.orElse(Some("")))
           // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
           // parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
           // can specify the execution name in more places in the future, so that
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index b1e9ae78dd4e..a7adc9431c35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -42,11 +42,24 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
 
     sqlStore.executionsList().foreach { e =>
       if (e.errorMessage.isDefined) {
-        failed += e
-      } else if (e.completionTime.nonEmpty) {
-        completed += e
-      } else {
+        if (e.errorMessage.get.isEmpty) {
+          completed += e
+        } else {
+          failed += e
+        }
+      } else if (e.completionTime.isEmpty) {
         running += e
+      } else {
+        // When `completionTime` is present, it means the query execution is completed and
+        // `errorMessage` should be present as well. However, events generated by old versions of
+        // Spark do not have the `errorMessage` field. We have to check the status of this query
+        // execution's jobs.
+        val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+        if (isFailed) {
+          failed += e
+        } else {
+          completed += e
+        }
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index badffbff6db2..f79045bc6a89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -489,13 +489,13 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var metrics = Seq[SQLPlanMetric]()
   var submissionTime = -1L
   var completionTime: Option[Date] = None
+  var errorMessage: Option[String] = None
 
   var jobs = Map[Int, JobExecutionStatus]()
   var stages = Set[Int]()
   var driverAccumUpdates = Seq[(Long, Long)]()
 
   @volatile var metricsValues: Map[Long, String] = null
 
-  var errorMessage: Option[String] = None
   // Just in case job end and execution end arrive out of order, keep track of how many
   // end events arrived so that the listener can stop tracking the execution.
@@ -511,10 +511,10 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       metrics,
       submissionTime,
       completionTime,
+      errorMessage,
       jobs,
       stages,
-      metricsValues,
-      errorMessage)
+      metricsValues)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index e83aef0442e8..30e81bf113b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -90,6 +90,7 @@ class SQLExecutionUIData(
     val metrics: Seq[SQLPlanMetric],
     val submissionTime: Long,
     val completionTime: Option[Date],
+    val errorMessage: Option[String],
     @JsonDeserialize(keyAs = classOf[Integer])
     val jobs: Map[Int, JobExecutionStatus],
     @JsonDeserialize(contentAs = classOf[Integer])
@@ -100,8 +101,7 @@ class SQLExecutionUIData(
      * from the SQL listener instance.
      */
     @JsonDeserialize(keyAs = classOf[JLong])
-    val metricValues: Map[Long, String],
-    val errorMessage: Option[String]) {
+    val metricValues: Map[Long, String]) {
 
   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index f007ab2e8b52..ce665e118938 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -55,6 +55,9 @@ case class SparkListenerSQLExecutionStart(
 case class SparkListenerSQLExecutionEnd(
     executionId: Long,
     time: Long,
+    // For backward compatibility, the `errorMessage` will be None when we parse event logs
+    // generated by old versions of Spark. It should always be Some in Spark 3.4+ and empty string
+    // means there is no error during execution.
     errorMessage: Option[String] = None)
   extends SparkListenerEvent {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index 959a48d8c9ac..495a473a0141 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -26,6 +26,7 @@ import scala.xml.Node
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd, SparkListenerJobStart}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
@@ -35,6 +36,11 @@ import org.apache.spark.util.kvstore.InMemoryStore
 
 class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
 
+  override def sparkConf: SparkConf = {
+    // Disable async kv store write in the UI, to make tests more stable here.
+    super.sparkConf.set(org.apache.spark.internal.config.Status.ASYNC_TRACKING_ENABLED, false)
+  }
+
   import testImplicits._
 
   var kvstore: ElementTrackingStore = _
@@ -60,6 +66,42 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     assert(!html.contains("1970/01/01"))
   }
 
+  test("SPARK-40834: prioritize `errorMessage` over job failures") {
+    val statusStore = createStatusStore
+    val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
+    when(tab.sqlStore).thenReturn(statusStore)
+
+    val request = mock(classOf[HttpServletRequest])
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+
+    Seq(Some(""), Some("testErrorMsg"), None).foreach { msg =>
+      val listener = statusStore.listener.get
+      val page = new AllExecutionsPage(tab)
+      val df = createTestDataFrame
+      listener.onOtherEvent(SparkListenerSQLExecutionStart(
+        0,
+        "test",
+        "test",
+        df.queryExecution.toString,
+        SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+        System.currentTimeMillis()))
+      listener.onJobStart(SparkListenerJobStart(
+        jobId = 0,
+        time = System.currentTimeMillis(),
+        stageInfos = Nil,
+        createProperties(0)))
+      listener.onJobEnd(SparkListenerJobEnd(
+        jobId = 0,
+        time = System.currentTimeMillis(),
+        JobFailed(new RuntimeException("Oops"))))
+      listener.onOtherEvent(SparkListenerSQLExecutionEnd(0, System.currentTimeMillis(), msg))
+      val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+      assert(html.contains("failed queries") == !msg.contains(""))
+    }
+  }
+
   test("sorting should be successful") {
     val statusStore = createStatusStore
     val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)