@@ -121,7 +121,11 @@ object SQLExecution {
         SparkThrowableHelper.getMessage(e)
       }
       val event = SparkListenerSQLExecutionEnd(
-        executionId, System.currentTimeMillis(), errorMessage)
+        executionId,
+        System.currentTimeMillis(),
+        // Use empty string to indicate no error, as None may mean events generated by old
+        // versions of Spark.
+        errorMessage.orElse(Some("")))
Contributor: Better to add docs at `SparkListenerSQLExecutionEnd` noting that `Some("")` is not an error; otherwise developers may get confused.

Contributor (author): Good point.

       // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
       // parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
       // can specify the execution name in more places in the future, so that
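The net effect at the call site: `errorMessage` is `None` when the query succeeded and `Some(message)` when it failed, so the `orElse` rewrites only the success case. Illustrative values (the error text is invented):

    Option.empty[String].orElse(Some(""))    // Some("")               -> success, encoded as empty string
    Some("divide by zero").orElse(Some(""))  // Some("divide by zero") -> failure, passed through unchanged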
@@ -42,11 +42,24 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L

     sqlStore.executionsList().foreach { e =>
       if (e.errorMessage.isDefined) {
-        failed += e
-      } else if (e.completionTime.nonEmpty) {
-        completed += e
-      } else {
+        if (e.errorMessage.get.isEmpty) {
+          completed += e
+        } else {
+          failed += e
+        }
+      } else if (e.completionTime.isEmpty) {
         running += e
+      } else {
+        // When `completionTime` is present, it means the query execution is completed and
+        // `errorMessage` should be present as well. However, events generated by old versions of
+        // Spark do not have the `errorMessage` field. We have to check the status of this query
+        // execution's jobs.
+        val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+        if (isFailed) {
+          failed += e
+        } else {
+          completed += e
+        }
       }
     }
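Spelled out as a standalone helper, the branching above reduces to the following decision function. This is a sketch for illustration only; `classify` and `ExecStatus` are invented names, not part of the PR, and the parameters mirror the fields used above.

    import org.apache.spark.JobExecutionStatus

    sealed trait ExecStatus
    case object Running extends ExecStatus
    case object Completed extends ExecStatus
    case object Failed extends ExecStatus

    // errorMessage wins whenever it is present; job status is only a fallback
    // for events written by Spark versions that lacked the field.
    def classify(
        errorMessage: Option[String],
        completionTime: Option[java.util.Date],
        jobs: Map[Int, JobExecutionStatus]): ExecStatus = errorMessage match {
      case Some(msg) if msg.isEmpty => Completed   // Spark 3.4+: "" means no error
      case Some(_) => Failed                       // Spark 3.4+: non-empty message means failure
      case None if completionTime.isEmpty => Running
      case None =>
        // Legacy event log: completed, but no errorMessage; inspect the jobs instead.
        if (jobs.values.exists(_ == JobExecutionStatus.FAILED)) Failed else Completed
    }

Note that a present-but-empty errorMessage wins over failed jobs, so an execution with `Some("")` is classified as completed even if one of its jobs failed; that precedence is exactly what the new test below exercises.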

@@ -489,13 +489,13 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var metrics = Seq[SQLPlanMetric]()
   var submissionTime = -1L
   var completionTime: Option[Date] = None
+  var errorMessage: Option[String] = None

   var jobs = Map[Int, JobExecutionStatus]()
   var stages = Set[Int]()
   var driverAccumUpdates = Seq[(Long, Long)]()

   @volatile var metricsValues: Map[Long, String] = null
-  var errorMessage: Option[String] = None
Contributor (author): nit: move `errorMessage` closer to `completionTime`, as they both indicate the end of the execution.

   // Just in case job end and execution end arrive out of order, keep track of how many
   // end events arrived so that the listener can stop tracking the execution.
@@ -511,10 +511,10 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       metrics,
       submissionTime,
       completionTime,
+      errorMessage,
       jobs,
       stages,
-      metricsValues,
-      errorMessage)
+      metricsValues)
   }

 }
@@ -90,6 +90,7 @@ class SQLExecutionUIData(
     val metrics: Seq[SQLPlanMetric],
     val submissionTime: Long,
     val completionTime: Option[Date],
+    val errorMessage: Option[String],
Contributor (author): ditto.

     @JsonDeserialize(keyAs = classOf[Integer])
     val jobs: Map[Int, JobExecutionStatus],
     @JsonDeserialize(contentAs = classOf[Integer])
@@ -100,8 +101,7 @@ class SQLExecutionUIData(
      * from the SQL listener instance.
      */
     @JsonDeserialize(keyAs = classOf[JLong])
-    val metricValues: Map[Long, String],
-    val errorMessage: Option[String]) {
+    val metricValues: Map[Long, String]) {

   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
@@ -55,6 +55,9 @@ case class SparkListenerSQLExecutionStart(
 case class SparkListenerSQLExecutionEnd(
     executionId: Long,
     time: Long,
+    // For backward compatibility, the `errorMessage` will be None when we parse event logs
+    // generated by old versions of Spark. It should always be Some in Spark 3.4+ and empty string
+    // means there is no error during execution.
     errorMessage: Option[String] = None)
   extends SparkListenerEvent {
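For consumers of this event, the contract now has three cases. Below is a minimal consumer-side sketch of a listener that distinguishes them; the class `SqlOutcomeListener` and its println handling are hypothetical, not part of Spark or this PR.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

    class SqlOutcomeListener extends SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: SparkListenerSQLExecutionEnd =>
          e.errorMessage match {
            case Some("")  => println(s"SQL execution ${e.executionId} succeeded")
            case Some(msg) => println(s"SQL execution ${e.executionId} failed: $msg")
            case None =>
              // Event replayed from a pre-3.4 event log: the outcome must be
              // derived from job statuses, as AllExecutionsPage does above.
              println(s"SQL execution ${e.executionId} ended (legacy event, outcome unknown)")
          }
        case _ => // not a SQL execution end event; ignore
      }
    }

Such a listener would be registered with `spark.sparkContext.addSparkListener(new SqlOutcomeListener)`.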

@@ -26,6 +26,7 @@ import scala.xml.Node
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.scalatest.BeforeAndAfter

+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd, SparkListenerJobStart}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
@@ -35,6 +36,11 @@ import org.apache.spark.util.kvstore.InMemoryStore

 class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {

+  override def sparkConf: SparkConf = {
+    // Disable async kv store write in the UI, to make tests more stable here.
+    super.sparkConf.set(org.apache.spark.internal.config.Status.ASYNC_TRACKING_ENABLED, false)
+  }
+
   import testImplicits._

   var kvstore: ElementTrackingStore = _
@@ -60,6 +66,42 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     assert(!html.contains("1970/01/01"))
   }

+  test("SPARK-40834: prioritize `errorMessage` over job failures") {
+    val statusStore = createStatusStore
+    val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
+    when(tab.sqlStore).thenReturn(statusStore)
+
+    val request = mock(classOf[HttpServletRequest])
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+
+    Seq(Some(""), Some("testErrorMsg"), None).foreach { msg =>
+      val listener = statusStore.listener.get
+      val page = new AllExecutionsPage(tab)
+      val df = createTestDataFrame
+      listener.onOtherEvent(SparkListenerSQLExecutionStart(
+        0,
+        "test",
+        "test",
+        df.queryExecution.toString,
+        SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+        System.currentTimeMillis()))
+      listener.onJobStart(SparkListenerJobStart(
+        jobId = 0,
+        time = System.currentTimeMillis(),
+        stageInfos = Nil,
+        createProperties(0)))
+      listener.onJobEnd(SparkListenerJobEnd(
+        jobId = 0,
+        time = System.currentTimeMillis(),
+        JobFailed(new RuntimeException("Oops"))))
+      listener.onOtherEvent(SparkListenerSQLExecutionEnd(0, System.currentTimeMillis(), msg))
+      val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+      assert(html.contains("failed queries") == !msg.contains(""))
+    }
+  }
+
   test("sorting should be successful") {
     val statusStore = createStatusStore
     val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
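A note on the assertion in the SPARK-40834 test above: it leans on `Option.contains`, which checks for an exact element, so `msg.contains("")` is true only for `Some("")`. The resulting expectations (illustrative evaluation of the three test inputs):

    Some("").contains("")                // true  -> must not render under "failed queries"
    Some("testErrorMsg").contains("")    // false -> non-empty error, rendered as failed
    (None: Option[String]).contains("")  // false -> legacy event, the failed job decides

So the page must show a failed query for `Some("testErrorMsg")` and for `None`, but not for `Some("")`, even though a job failed in every iteration.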