Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark

import scala.collection.JavaConverters._

import com.fasterxml.jackson.core.util.MinimalPrettyPrinter

import org.apache.spark.util.JsonProtocol.toJsonString
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -119,4 +121,16 @@ private[spark] object SparkThrowableHelper {
}
}
}

def getMessage(throwable: Throwable): String = {
toJsonString { generator =>
val g = generator.setPrettyPrinter(new MinimalPrettyPrinter)
g.writeStartObject()
g.writeStringField("errorClass", throwable.getClass.getCanonicalName)
g.writeObjectFieldStart("messageParameters")
g.writeStringField("message", throwable.getMessage)
g.writeEndObject()
g.writeEndObject()
}
Comment on lines +125 to +134
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan this follows the standard error message JSON format.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFuture}
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkContext
import org.apache.spark.{ErrorMessageFormat, SparkContext, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
Expand Down Expand Up @@ -115,7 +115,15 @@ object SQLExecution {
throw e
} finally {
val endTime = System.nanoTime()
val event = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis())
val errorMessage = ex.map {
case e: SparkThrowable =>
SparkThrowableHelper.getMessage(e, ErrorMessageFormat.STANDARD)
case e =>
// unexpected behavior
SparkThrowableHelper.getMessage(e)
}
val event = SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis(), errorMessage)
// Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
// parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
// can specify the execution name in more places in the future, so that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()

sqlStore.executionsList().foreach { e =>
val isRunning = e.completionTime.isEmpty ||
e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
if (isRunning) {
running += e
} else if (isFailed) {
if (e.errorMessage.isDefined) {
failed += e
} else {
} else if (e.completionTime.nonEmpty) {
completed += e
} else {
running += e
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,10 @@ class SQLAppStatusListener(
}

private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
val SparkListenerSQLExecutionEnd(executionId, time) = event
val SparkListenerSQLExecutionEnd(executionId, time, errorMessage) = event
Option(liveExecutions.get(executionId)).foreach { exec =>
exec.completionTime = Some(new Date(time))
exec.errorMessage = errorMessage
update(exec)

// Aggregating metrics can be expensive for large queries, so do it asynchronously. The end
Expand Down Expand Up @@ -494,6 +495,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
var driverAccumUpdates = Seq[(Long, Long)]()

@volatile var metricsValues: Map[Long, String] = null
var errorMessage: Option[String] = None

// Just in case job end and execution end arrive out of order, keep track of how many
// end events arrived so that the listener can stop tracking the execution.
Expand All @@ -511,7 +513,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
completionTime,
jobs,
stages,
metricsValues)
metricsValues,
errorMessage)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class SQLExecutionUIData(
* from the SQL listener instance.
*/
@JsonDeserialize(keyAs = classOf[JLong])
val metricValues: Map[Long, String]) {
val metricValues: Map[Long, String],
val errorMessage: Option[String]) {

@JsonIgnore @KVIndex("completionTime")
private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ case class SparkListenerSQLExecutionStart(
}

@DeveloperApi
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
case class SparkListenerSQLExecutionEnd(
executionId: Long,
time: Long,
errorMessage: Option[String] = None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this more, I think we should use Some("") to indicate no error, since None may also come from event logs produced by an older version of Spark that doesn't have this error message field. @ulysses-you

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a followup: #38747

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this break binary compatibility? I think it should have an overloaded constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a public class. It generates a JSON string, for which backward compatibility matters, and this PR added tests for it.

extends SparkListenerEvent {

// The name of the execution, e.g. `df.collect` will trigger a SQL execution with name "collect".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkFunSuite, SparkThrowableHelper}
import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.sql.LocalSparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
Expand Down Expand Up @@ -74,27 +74,60 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
test("SparkListenerSQLExecutionEnd backward compatibility") {
spark = new TestSparkSession()
val qe = spark.sql("select 1").queryExecution
val event = SparkListenerSQLExecutionEnd(1, 10)
val errorMessage = SparkThrowableHelper.getMessage(new Exception("test"))
val event = SparkListenerSQLExecutionEnd(1, 10, Some(errorMessage))
event.duration = 1000
event.executionName = Some("test")
event.qe = qe
event.executionFailure = Some(new RuntimeException("test"))
event.executionFailure = Some(new Exception("test"))
val json = JsonProtocol.sparkEventToJsonString(event)
// scalastyle:off line.size.limit
assert(parse(json) == parse(
"""
|{
| "Event" : "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd",
| "executionId" : 1,
| "time" : 10
| "time" : 10,
| "errorMessage" : "{\"errorClass\":\"java.lang.Exception\",\"messageParameters\":{\"message\":\"test\"}}"
|}
""".stripMargin))
// scalastyle:on
val readBack = JsonProtocol.sparkEventFromJson(json)
event.duration = 0
event.executionName = None
event.qe = null
event.executionFailure = None
assert(readBack == event)
}

test("SPARK-40834: Use SparkListenerSQLExecutionEnd to track final SQL status in UI") {
// parse old event log using new SparkListenerSQLExecutionEnd
val executionEnd =
"""
|{
| "Event" : "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd",
| "executionId" : 1,
| "time" : 10
|}
""".stripMargin
val readBack = JsonProtocol.sparkEventFromJson(executionEnd)
assert(readBack == SparkListenerSQLExecutionEnd(1, 10))

// parse new event using old SparkListenerSQLExecutionEnd
// scalastyle:off line.size.limit
val newExecutionEnd =
"""
|{
| "Event" : "org.apache.spark.sql.execution.OldVersionSQLExecutionEnd",
| "executionId" : 1,
| "time" : 10,
| "errorMessage" : "{\"errorClass\":\"java.lang.Exception\",\"messageParameters\":{\"message\":\"test\"}}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this \"messageParameters\":{\"message\":\"test\"} odd...
Shouldn't it just be an error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just followed the style of the error JSON format so that we stay consistent with SparkThrowable. We expect all Java throwables to be wrapped by SparkThrowable, so we should never reach this branch.

|}
""".stripMargin
// scalastyle:on
val readBack2 = JsonProtocol.sparkEventFromJson(newExecutionEnd)
assert(readBack2 == OldVersionSQLExecutionEnd(1, 10))
}
}

private case class OldVersionSQLExecutionStart(
Expand All @@ -105,3 +138,6 @@ private case class OldVersionSQLExecutionStart(
sparkPlanInfo: SparkPlanInfo,
time: Long)
extends SparkListenerEvent

// Test-only stand-in with the two-field shape (executionId, time) and no `errorMessage`.
// The SPARK-40834 test names it as the "Event" class of a JSON payload that carries the
// extra "errorMessage" field and expects the result to equal OldVersionSQLExecutionEnd(1, 10).
private case class OldVersionSQLExecutionEnd(executionId: Long, time: Long)
extends SparkListenerEvent
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
System.currentTimeMillis(),
Map.empty))
listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
executionId, System.currentTimeMillis(), Some("Oops")))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.Properties

import scala.collection.mutable.{ArrayBuffer, ListBuffer}

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.jackson.JsonMethods._
Expand Down Expand Up @@ -980,6 +981,30 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
}
}
}

test("SPARK-40834: Use SparkListenerSQLExecutionEnd to track final SQL status in UI") {
  // Written by the listener callback and read by the test body below; @volatile keeps the
  // write visible if the two run on different threads.
  @volatile var capturedError: Option[String] = None
  val listener = new SparkListener {
    override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
      case SparkListenerSQLExecutionEnd(_, _, Some(errorMessage)) =>
        // Only record the payload here; it is verified in the test body so that a failed
        // assertion is reported by the test itself.
        capturedError = Some(errorMessage)
      case _ =>
    }
  }

  spark.sparkContext.addSparkListener(listener)
  try {
    intercept[Exception] {
      SQLExecution.withNewExecutionId(spark.range(1).queryExecution) {
        throw new Exception("test")
      }
    }
    spark.sparkContext.listenerBus.waitUntilEmpty(10000)

    val errorMessage = capturedError.getOrElse(
      fail("No SparkListenerSQLExecutionEnd event carrying an error message was received"))
    val error = new ObjectMapper().readTree(errorMessage)
    assert(error.get("errorClass").toPrettyString === "\"java.lang.Exception\"")
    assert(error.path("messageParameters").get("message").toPrettyString === "\"test\"")
  } finally {
    // Unregister the listener so it does not stay attached to the SparkContext after this test.
    spark.sparkContext.removeSparkListener(listener)
  }
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ object SqlResourceSuite {
0 -> JobExecutionStatus.SUCCEEDED,
1 -> JobExecutionStatus.SUCCEEDED),
stages = Set[Int](),
metricValues = getMetricValues()
metricValues = getMetricValues(),
errorMessage = None
)
}

Expand Down