Skip to content

Commit f3f1e14

Browse files
zsxwingcloud-fan
authored andcommitted
[SPARK-23326][WEBUI] schedulerDelay should return 0 when the task is running
## What changes were proposed in this pull request? When a task is still running, metrics like executorRunTime are not available. Then `schedulerDelay` will be almost the same as `duration` and that's confusing. This PR makes `schedulerDelay` return 0 when the task is running which is the same behavior as 2.2. ## How was this patch tested? `AppStatusUtilsSuite.schedulerDelay` Author: Shixiong Zhu <[email protected]> Closes #20493 from zsxwing/SPARK-23326.
1 parent c2766b0 commit f3f1e14

File tree

2 files changed

+98
-2
lines changed

2 files changed

+98
-2
lines changed

core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,23 @@
1717

1818
package org.apache.spark.status
1919

20-
import org.apache.spark.status.api.v1.{TaskData, TaskMetrics}
20+
import org.apache.spark.status.api.v1.TaskData
2121

2222
private[spark] object AppStatusUtils {
2323

24+
private val TASK_FINISHED_STATES = Set("FAILED", "KILLED", "SUCCESS")
25+
26+
private def isTaskFinished(task: TaskData): Boolean = {
27+
TASK_FINISHED_STATES.contains(task.status)
28+
}
29+
2430
def schedulerDelay(task: TaskData): Long = {
25-
if (task.taskMetrics.isDefined && task.duration.isDefined) {
31+
if (isTaskFinished(task) && task.taskMetrics.isDefined && task.duration.isDefined) {
2632
val m = task.taskMetrics.get
2733
schedulerDelay(task.launchTime.getTime(), fetchStart(task), task.duration.get,
2834
m.executorDeserializeTime, m.resultSerializationTime, m.executorRunTime)
2935
} else {
36+
// The task is still running and the metrics like executorRunTime are not available.
3037
0L
3138
}
3239
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.status
18+
19+
import java.util.Date
20+
21+
import org.apache.spark.SparkFunSuite
22+
import org.apache.spark.status.api.v1.{TaskData, TaskMetrics}
23+
24+
class AppStatusUtilsSuite extends SparkFunSuite {
25+
26+
test("schedulerDelay") {
27+
val runningTask = new TaskData(
28+
taskId = 0,
29+
index = 0,
30+
attempt = 0,
31+
launchTime = new Date(1L),
32+
resultFetchStart = None,
33+
duration = Some(100L),
34+
executorId = "1",
35+
host = "localhost",
36+
status = "RUNNING",
37+
taskLocality = "PROCESS_LOCAL",
38+
speculative = false,
39+
accumulatorUpdates = Nil,
40+
errorMessage = None,
41+
taskMetrics = Some(new TaskMetrics(
42+
executorDeserializeTime = 0L,
43+
executorDeserializeCpuTime = 0L,
44+
executorRunTime = 0L,
45+
executorCpuTime = 0L,
46+
resultSize = 0L,
47+
jvmGcTime = 0L,
48+
resultSerializationTime = 0L,
49+
memoryBytesSpilled = 0L,
50+
diskBytesSpilled = 0L,
51+
peakExecutionMemory = 0L,
52+
inputMetrics = null,
53+
outputMetrics = null,
54+
shuffleReadMetrics = null,
55+
shuffleWriteMetrics = null)))
56+
assert(AppStatusUtils.schedulerDelay(runningTask) === 0L)
57+
58+
val finishedTask = new TaskData(
59+
taskId = 0,
60+
index = 0,
61+
attempt = 0,
62+
launchTime = new Date(1L),
63+
resultFetchStart = None,
64+
duration = Some(100L),
65+
executorId = "1",
66+
host = "localhost",
67+
status = "SUCCESS",
68+
taskLocality = "PROCESS_LOCAL",
69+
speculative = false,
70+
accumulatorUpdates = Nil,
71+
errorMessage = None,
72+
taskMetrics = Some(new TaskMetrics(
73+
executorDeserializeTime = 5L,
74+
executorDeserializeCpuTime = 3L,
75+
executorRunTime = 90L,
76+
executorCpuTime = 10L,
77+
resultSize = 100L,
78+
jvmGcTime = 10L,
79+
resultSerializationTime = 2L,
80+
memoryBytesSpilled = 0L,
81+
diskBytesSpilled = 0L,
82+
peakExecutionMemory = 100L,
83+
inputMetrics = null,
84+
outputMetrics = null,
85+
shuffleReadMetrics = null,
86+
shuffleWriteMetrics = null)))
87+
assert(AppStatusUtils.schedulerDelay(finishedTask) === 3L)
88+
}
89+
}

0 commit comments

Comments
 (0)