diff --git a/core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala b/core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala
index 93b137615551..e40368eb619d 100644
--- a/core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala
+++ b/core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala
@@ -19,6 +19,8 @@ package org.apache.spark
 
 import scala.collection.JavaConverters._
 
+import com.fasterxml.jackson.core.util.MinimalPrettyPrinter
+
 import org.apache.spark.util.JsonProtocol.toJsonString
 import org.apache.spark.util.Utils
 
@@ -119,4 +121,16 @@ private[spark] object SparkThrowableHelper {
       }
     }
   }
+
+  def getMessage(throwable: Throwable): String = {
+    toJsonString { generator =>
+      val g = generator.setPrettyPrinter(new MinimalPrettyPrinter)
+      g.writeStartObject()
+      g.writeStringField("errorClass", throwable.getClass.getCanonicalName)
+      g.writeObjectFieldStart("messageParameters")
+      g.writeStringField("message", throwable.getMessage)
+      g.writeEndObject()
+      g.writeEndObject()
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 953c370297f0..1c36131ddb40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFuture}
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{ErrorMessageFormat, SparkContext, SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -115,7 +115,15 @@ object SQLExecution {
         throw e
     } finally {
       val endTime = System.nanoTime()
-      val event = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis())
+      val errorMessage = ex.map {
+        case e: SparkThrowable =>
+          SparkThrowableHelper.getMessage(e, ErrorMessageFormat.STANDARD)
+        case e =>
+          // Unexpected: the error is not a SparkThrowable; fall back to the generic formatter.
+          SparkThrowableHelper.getMessage(e)
+      }
+      val event = SparkListenerSQLExecutionEnd(
+        executionId, System.currentTimeMillis(), errorMessage)
       // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
       // parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
       // can specify the execution name in more places in the future, so that
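For context: the new `getMessage(Throwable)` overload wraps any exception in the same error-class JSON envelope used elsewhere in Spark. Below is a minimal standalone sketch of that output, using plain Jackson streaming instead of Spark's `toJsonString` helper; the object name is illustrative, not part of the patch.

```scala
import java.io.StringWriter

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter

object ErrorMessageSketch {
  // Wrap an arbitrary Throwable in the {"errorClass": ..., "messageParameters": ...} envelope.
  def format(throwable: Throwable): String = {
    val writer = new StringWriter()
    val g = new JsonFactory().createGenerator(writer).setPrettyPrinter(new MinimalPrettyPrinter)
    g.writeStartObject()
    g.writeStringField("errorClass", throwable.getClass.getCanonicalName)
    g.writeObjectFieldStart("messageParameters")
    g.writeStringField("message", throwable.getMessage)
    g.writeEndObject()
    g.writeEndObject()
    g.close()
    // e.g. {"errorClass":"java.lang.Exception","messageParameters":{"message":"test"}}
    writer.toString
  }
}
```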
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index b3f23cd1b5e0..b1e9ae78dd4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -41,15 +41,12 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
     val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()
 
     sqlStore.executionsList().foreach { e =>
-      val isRunning = e.completionTime.isEmpty ||
-        e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
-      val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
-      if (isRunning) {
-        running += e
-      } else if (isFailed) {
+      if (e.errorMessage.isDefined) {
         failed += e
-      } else {
+      } else if (e.completionTime.nonEmpty) {
         completed += e
+      } else {
+        running += e
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 9988df025b6a..badffbff6db2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -393,9 +393,10 @@ class SQLAppStatusListener(
   }
 
   private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
-    val SparkListenerSQLExecutionEnd(executionId, time) = event
+    val SparkListenerSQLExecutionEnd(executionId, time, errorMessage) = event
     Option(liveExecutions.get(executionId)).foreach { exec =>
       exec.completionTime = Some(new Date(time))
+      exec.errorMessage = errorMessage
       update(exec)
 
       // Aggregating metrics can be expensive for large queries, so do it asynchronously. The end
@@ -494,6 +495,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var driverAccumUpdates = Seq[(Long, Long)]()
 
   @volatile var metricsValues: Map[Long, String] = null
+  var errorMessage: Option[String] = None
 
   // Just in case job end and execution end arrive out of order, keep track of how many
   // end events arrived so that the listener can stop tracking the execution.
@@ -511,7 +513,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       completionTime,
       jobs,
       stages,
-      metricsValues)
+      metricsValues,
+      errorMessage)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 95035c08a2cb..e83aef0442e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -100,7 +100,8 @@ class SQLExecutionUIData(
    * from the SQL listener instance.
    */
   @JsonDeserialize(keyAs = classOf[JLong])
-  val metricValues: Map[Long, String]) {
+  val metricValues: Map[Long, String],
+  val errorMessage: Option[String]) {
 
   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
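The `AllExecutionsPage` change above reduces the classification to a simple precedence rule: an error message means failed, a completion time without an error means completed, anything else is still running. A self-contained sketch of that rule (the `ExecStatus` type is hypothetical, not part of the patch):

```scala
object ExecStatusSketch {
  sealed trait ExecStatus
  case object Running extends ExecStatus
  case object Completed extends ExecStatus
  case object Failed extends ExecStatus

  // The error message must be checked first: a failed execution also carries
  // a completion time, so reversing the order would report it as completed.
  def classify(errorMessage: Option[String], completionTime: Option[java.util.Date]): ExecStatus =
    if (errorMessage.isDefined) Failed
    else if (completionTime.nonEmpty) Completed
    else Running
}
```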
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index e3f51cbe3b00..86bb40fde2f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -56,7 +56,10 @@ case class SparkListenerSQLExecutionStart(
 }
 
 @DeveloperApi
-case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
+case class SparkListenerSQLExecutionEnd(
+    executionId: Long,
+    time: Long,
+    errorMessage: Option[String] = None)
   extends SparkListenerEvent {
 
   // The name of the execution, e.g. `df.collect` will trigger a SQL execution with name "collect".
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index 4fd8341b3f52..9f2b08a65eaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkThrowableHelper}
 import org.apache.spark.scheduler.SparkListenerEvent
 import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -74,20 +74,24 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
   test("SparkListenerSQLExecutionEnd backward compatibility") {
     spark = new TestSparkSession()
     val qe = spark.sql("select 1").queryExecution
-    val event = SparkListenerSQLExecutionEnd(1, 10)
+    val errorMessage = SparkThrowableHelper.getMessage(new Exception("test"))
+    val event = SparkListenerSQLExecutionEnd(1, 10, Some(errorMessage))
     event.duration = 1000
     event.executionName = Some("test")
     event.qe = qe
-    event.executionFailure = Some(new RuntimeException("test"))
+    event.executionFailure = Some(new Exception("test"))
     val json = JsonProtocol.sparkEventToJsonString(event)
+    // scalastyle:off line.size.limit
     assert(parse(json) == parse(
       """
        |{
        |  "Event" : "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd",
        |  "executionId" : 1,
-       |  "time" : 10
+       |  "time" : 10,
+       |  "errorMessage" : "{\"errorClass\":\"java.lang.Exception\",\"messageParameters\":{\"message\":\"test\"}}"
        |}
      """.stripMargin))
+    // scalastyle:on
     val readBack = JsonProtocol.sparkEventFromJson(json)
     event.duration = 0
     event.executionName = None
@@ -95,6 +99,35 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
     event.executionFailure = None
     assert(readBack == event)
   }
+
+  test("SPARK-40834: Use SparkListenerSQLExecutionEnd to track final SQL status in UI") {
+    // parse old event log using new SparkListenerSQLExecutionEnd
+    val executionEnd =
+      """
+       |{
+       |  "Event" : "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd",
+       |  "executionId" : 1,
+       |  "time" : 10
+       |}
+     """.stripMargin
+    val readBack = JsonProtocol.sparkEventFromJson(executionEnd)
+    assert(readBack == SparkListenerSQLExecutionEnd(1, 10))
+
+    // parse new event using old SparkListenerSQLExecutionEnd
+    // scalastyle:off line.size.limit
+    val newExecutionEnd =
+      """
+       |{
+       |  "Event" : "org.apache.spark.sql.execution.OldVersionSQLExecutionEnd",
+       |  "executionId" : 1,
+       |  "time" : 10,
+       |  "errorMessage" : "{\"errorClass\":\"java.lang.Exception\",\"messageParameters\":{\"message\":\"test\"}}"
+       |}
+     """.stripMargin
+    // scalastyle:on
+    val readBack2 = JsonProtocol.sparkEventFromJson(newExecutionEnd)
+    assert(readBack2 == OldVersionSQLExecutionEnd(1, 10))
+  }
 }
 
 private case class OldVersionSQLExecutionStart(
@@ -105,3 +138,6 @@ private case class OldVersionSQLExecutionStart(
     sparkPlanInfo: SparkPlanInfo,
     time: Long)
   extends SparkListenerEvent
+
+private case class OldVersionSQLExecutionEnd(executionId: Long, time: Long)
+  extends SparkListenerEvent
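The backward-compatibility tests above hinge on `errorMessage` being an optional field with a `None` default, so an event written by an older Spark simply lacks the key. A rough illustration of why that is safe, using plain Jackson (this snippet is an assumption for illustration, not Spark's actual deserialization path):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

object OldLogCompatSketch extends App {
  // An event written before this patch has no "errorMessage" key at all.
  val node = new ObjectMapper().readTree("""{"executionId" : 1, "time" : 10}""")
  // JsonNode.get returns null for an absent field, which maps naturally onto None.
  val errorMessage = Option(node.get("errorMessage")).map(_.asText())
  assert(errorMessage.isEmpty)
}
```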
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index 1f5cbb0e19ec..959a48d8c9ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -115,7 +115,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
       System.currentTimeMillis(),
       Map.empty))
     listener.onOtherEvent(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
+      executionId, System.currentTimeMillis(), Some("Oops")))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 6ce4ab3c3245..1cecb2a6ba94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
+import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.json4s.jackson.JsonMethods._
@@ -980,6 +981,30 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
       }
     }
   }
+
+  test("SPARK-40834: Use SparkListenerSQLExecutionEnd to track final SQL status in UI") {
+    var received = false
+    spark.sparkContext.addSparkListener(new SparkListener {
+      override def onOtherEvent(event: SparkListenerEvent): Unit = {
+        event match {
+          case SparkListenerSQLExecutionEnd(_, _, Some(errorMessage)) =>
+            val error = new ObjectMapper().readTree(errorMessage)
+            assert(error.get("errorClass").toPrettyString === "\"java.lang.Exception\"")
+            assert(error.path("messageParameters").get("message").toPrettyString === "\"test\"")
+            received = true
+          case _ =>
+        }
+      }
+    })
+
+    intercept[Exception] {
+      SQLExecution.withNewExecutionId(spark.range(1).queryExecution) {
+        throw new Exception("test")
+      }
+    }
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+    assert(received)
+  }
 }
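A side note on the assertions in the listener test above: Jackson's `get` returns `null` for a missing field, while `path` returns a placeholder "missing" node, so only `path` is safe to chain. A small sketch of the difference (object name is illustrative):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

object GetVsPathSketch extends App {
  val node = new ObjectMapper().readTree("""{"messageParameters" : {"message" : "test"}}""")
  assert(node.path("messageParameters").get("message").asText() == "test")
  assert(node.path("absent").isMissingNode) // safe to chain further path calls
  assert(node.get("absent") == null)        // chaining here would throw an NPE
}
```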
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
index 11201aadf67f..336779988fc8 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -93,7 +93,8 @@ object SqlResourceSuite {
       0 -> JobExecutionStatus.SUCCEEDED,
       1 -> JobExecutionStatus.SUCCEEDED),
     stages = Set[Int](),
-    metricValues = getMetricValues()
+    metricValues = getMetricValues(),
+    errorMessage = None
   )
 }
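Taken together, the patch makes the final status of a SQL execution observable from the end event alone. A hedged usage sketch of a listener consuming the new payload (the collector class is illustrative, not part of the change):

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

// Records the JSON error payload of every failed SQL execution.
class FailedSqlCollector extends SparkListener {
  val failures = mutable.Map.empty[Long, String]

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerSQLExecutionEnd(id, _, Some(error)) => failures(id) = error
    case _ => // not a SQL execution end, or it succeeded
  }
}

// Usage: spark.sparkContext.addSparkListener(new FailedSqlCollector)
```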