
Commit 2d47bd3

Fixed to record job submission time and completion time correctly
1 parent 14e3f11 commit 2d47bd3

7 files changed: 42 additions, 22 deletions

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 6 deletions
@@ -661,7 +661,7 @@ class DAGScheduler(
       // completion events or stage abort
       stageIdToStage -= s.id
       jobIdToStageIds -= job.jobId
-      listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, Some(clock.getTime()), jobResult))
     }
   }

@@ -710,7 +710,7 @@ class DAGScheduler(
       stage.latestInfo.stageFailed(stageFailedMessage)
       listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     }
-    listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    listenerBus.post(SparkListenerJobEnd(job.jobId, Some(clock.getTime()), JobFailed(error)))
   }
 }

@@ -749,17 +749,20 @@ class DAGScheduler(
     logInfo("Missing parents: " + getMissingParentStages(finalStage))
     val shouldRunLocally =
       localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
+    val jobSubmissionTime = Some(clock.getTime())
     if (shouldRunLocally) {
       // Compute very short actions like first() or take() with no parent stages locally.
-      listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
+      listenerBus.post(
+        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
       runLocally(job)
     } else {
       jobIdToActiveJob(jobId) = job
       activeJobs += job
       finalStage.resultOfJob = Some(job)
       val stageIds = jobIdToStageIds(jobId).toArray
       val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
-      listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
+      listenerBus.post(
+        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
       submitStage(finalStage)
     }
   }

@@ -965,7 +968,8 @@ class DAGScheduler(
     if (job.numFinished == job.numPartitions) {
       markStageAsFinished(stage)
       cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
+      listenerBus.post(
+        SparkListenerJobEnd(job.jobId, Some(clock.getTime()), JobSucceeded))
     }

     // taskSucceeded runs some user code that might throw an exception. Make sure

@@ -1234,7 +1238,7 @@ class DAGScheduler(
     if (ableToCancelStages) {
       job.listener.jobFailed(error)
       cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, Some(clock.getTime()), JobFailed(error)))
     }
   }
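
With this change, every job start and end event posted by the DAGScheduler carries a timestamp taken from the scheduler's clock. As a minimal sketch of how a custom listener might consume the new fields (the listener class and its names are illustrative, not part of this commit):

    import scala.collection.mutable
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

    // Illustrative listener that derives job duration from the new timestamps.
    // Both fields are Option[Long] epoch milliseconds; they are None when
    // replaying event logs written before this change.
    class JobDurationListener extends SparkListener {
      private val submitted = mutable.Map[Int, Long]()

      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        jobStart.time.foreach(t => submitted(jobStart.jobId) = t)

      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
        for (start <- submitted.remove(jobEnd.jobId); end <- jobEnd.time) {
          println(s"Job ${jobEnd.jobId} finished in ${end - start} ms")
        }
    }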

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 6 additions & 1 deletion
@@ -58,6 +58,7 @@ case class SparkListenerTaskEnd(
 @DeveloperApi
 case class SparkListenerJobStart(
     jobId: Int,
+    time: Option[Long],
     stageInfos: Seq[StageInfo],
     properties: Properties = null)
   extends SparkListenerEvent {

@@ -67,7 +68,11 @@ case class SparkListenerJobStart(
 }

 @DeveloperApi
-case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
+case class SparkListenerJobEnd(
+    jobId: Int,
+    time: Option[Long],
+    jobResult: JobResult)
+  extends SparkListenerEvent

 @DeveloperApi
 case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
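
Both events now take time: Option[Long] as their second field. A small construction sketch under that assumption (the IDs and timestamps are illustrative):

    import java.util.Properties
    import org.apache.spark.scheduler.{JobSucceeded, SparkListenerJobEnd, SparkListenerJobStart}

    // Some(...) wraps epoch milliseconds; None means the time was not recorded.
    val start = SparkListenerJobStart(10, Some(System.currentTimeMillis()), Seq.empty, new Properties())
    val end = SparkListenerJobEnd(10, Some(System.currentTimeMillis()), JobSucceeded)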

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 3 additions & 3 deletions
@@ -153,14 +153,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     val jobData: JobUIData =
       new JobUIData(
         jobId = jobStart.jobId,
-        startTime = Some(System.currentTimeMillis),
+        startTime = jobStart.time,
         endTime = None,
         stageIds = jobStart.stageIds,
         jobGroup = jobGroup,
         status = JobExecutionStatus.RUNNING)
     // Compute (a potential underestimate of) the number of tasks that will be run by this job.
     // This may be an underestimate because the job start event references all of the result
-    // stages's transitive stage dependencies, but some of these stages might be skipped if their
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
     // output is available from earlier runs.
     // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
     jobData.numTasks = {

@@ -186,7 +186,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
       new JobUIData(jobId = jobEnd.jobId)
     }
-    jobData.endTime = Some(System.currentTimeMillis())
+    jobData.endTime = jobEnd.time
     jobEnd.jobResult match {
       case JobSucceeded =>
         completedJobs += jobData
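
Since startTime and endTime now come from the events instead of the listener's own System.currentTimeMillis, UI code must tolerate None, for example when rendering jobs replayed from event logs written before this change. A minimal sketch of such a fallback (the helper name is hypothetical):

    import java.util.Date

    // Hypothetical display helper: format an optional epoch-millisecond
    // timestamp, falling back to a placeholder when no time was recorded.
    def formatJobTime(time: Option[Long]): String =
      time.map(t => new Date(t).toString).getOrElse("Unknown")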

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 6 additions & 2 deletions
@@ -136,6 +136,7 @@ private[spark] object JsonProtocol {
     val properties = propertiesToJson(jobStart.properties)
     ("Event" -> Utils.getFormattedClassName(jobStart)) ~
     ("Job ID" -> jobStart.jobId) ~
+    ("Submission Time" -> jobStart.time) ~
     ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in Spark 1.2.0
     ("Stage IDs" -> jobStart.stageIds) ~
     ("Properties" -> properties)

@@ -145,6 +146,7 @@ private[spark] object JsonProtocol {
     val jobResult = jobResultToJson(jobEnd.jobResult)
     ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
     ("Job ID" -> jobEnd.jobId) ~
+    ("Completion Time" -> jobEnd.time) ~
     ("Job Result" -> jobResult)
   }

@@ -469,20 +471,22 @@ private[spark] object JsonProtocol {
   def jobStartFromJson(json: JValue): SparkListenerJobStart = {
     val jobId = (json \ "Job ID").extract[Int]
+    val submissionTime = (json \ "Submission Time").extractOpt[Long]
     val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
     val properties = propertiesFromJson(json \ "Properties")
     // The "Stage Infos" field was added in Spark 1.2.0
     val stageInfos = Utils.jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
         stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
       }
-    SparkListenerJobStart(jobId, stageInfos, properties)
+    SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }

   def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
     val jobId = (json \ "Job ID").extract[Int]
+    val completionTime = (json \ "Completion Time").extractOpt[Long]
     val jobResult = jobResultFromJson(json \ "Job Result")
-    SparkListenerJobEnd(jobId, jobResult)
+    SparkListenerJobEnd(jobId, completionTime, jobResult)
   }

   def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
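
Because the read side uses extractOpt rather than extract, events logged before this change still deserialize, with the new fields defaulting to None. A small sketch of that behavior, assuming json4s on the classpath as in JsonProtocol itself:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    implicit val formats = DefaultFormats

    // An event logged before this change has no "Completion Time" field;
    // extractOpt yields None instead of throwing.
    val oldJson = parse("""{"Event": "SparkListenerJobEnd", "Job ID": 20}""")
    val completionTime = (oldJson \ "Completion Time").extractOpt[Long] // None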

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 6 additions & 4 deletions
@@ -34,6 +34,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000

+  val jobCompletionTime = Option(1421191296660L)
+
   before {
     sc = new SparkContext("local", "SparkListenerSuite")
   }

@@ -44,7 +46,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     bus.addListener(counter)

     // Listener bus hasn't started yet, so posting events should not increment counter
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 0)

     // Starting listener bus should flush all buffered events

@@ -54,7 +56,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     // After listener bus has stopped, posting events should not increment counter
     bus.stop()
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 5)

     // Listener bus must not be started twice

@@ -99,7 +101,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     bus.addListener(blockingListener)
     bus.start()
-    bus.post(SparkListenerJobEnd(0, JobSucceeded))
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))

     listenerStarted.acquire()
     // Listener should be blocked after start

@@ -345,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     bus.start()

     // Post events to all listeners, and wait until the queue is drained
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))

     // The exception should be caught, and the event should be propagated to other listeners

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 4 additions & 2 deletions
@@ -28,6 +28,8 @@ import org.apache.spark.util.Utils

 class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {

+  val jobSubmissionTime = Option(1421191042750L)
+  val jobCompletionTime = Option(1421191296660L)

   private def createStageStartEvent(stageId: Int) = {
     val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")

@@ -46,12 +48,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val stageInfos = stageIds.map { stageId =>
       new StageInfo(stageId, 0, stageId.toString, 0, null, "")
     }
-    SparkListenerJobStart(jobId, stageInfos)
+    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
   }

   private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
     val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
-    SparkListenerJobEnd(jobId, result)
+    SparkListenerJobEnd(jobId, jobCompletionTime, result)
   }

   private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 7 additions & 4 deletions
@@ -33,6 +33,9 @@ import org.apache.spark.storage._

 class JsonProtocolSuite extends FunSuite {

+  val jobSubmissionTime = Option(1421191042750L)
+  val jobCompletionTime = Option(1421191296660L)
+
   test("SparkListenerEvent") {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)

@@ -53,9 +56,9 @@ class JsonProtocolSuite extends FunSuite {
       val stageIds = Seq[Int](1, 2, 3, 4)
       val stageInfos = stageIds.map(x =>
         makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
-      SparkListenerJobStart(10, stageInfos, properties)
+      SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     }
-    val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
+    val jobEnd = SparkListenerJobEnd(20, jobCompletionTime, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
       "JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
       "Spark Properties" -> Seq(("Job throughput", "80000 jobs/s, regardless of job type")),

@@ -240,10 +243,10 @@ class JsonProtocolSuite extends FunSuite {
     val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
     val dummyStageInfos =
       stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
-    val jobStart = SparkListenerJobStart(10, stageInfos, properties)
+    val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
     val expectedJobStart =
-      SparkListenerJobStart(10, dummyStageInfos, properties)
+      SparkListenerJobStart(10, jobSubmissionTime, dummyStageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
   }
