Skip to content

Commit 5ca26dc

Browse files
committed
fix task metrics aggregation in local mode
1 parent f0f1ba0 commit 5ca26dc

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler._
2626
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
2727
import org.apache.spark.storage.BlockManagerId
2828
import org.apache.spark.ui.jobs.UIData._
29+
import org.apache.spark.util.Utils
2930

3031
/**
3132
* :: DeveloperApi ::
@@ -241,8 +242,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
241242
updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics,
242243
t.taskMetrics)
243244

244-
// Overwrite task metrics
245-
t.taskMetrics = Some(taskMetrics)
245+
// Overwrite task metrics with deepcopy
246+
// TODO: only serialize it in local mode
247+
t.taskMetrics = Some(Utils.deserialize[TaskMetrics](Utils.serialize(taskMetrics)))
246248
}
247249
}
248250
}

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
144144
val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
145145
val execId = "exe-1"
146146

147-
def makeTaskMetrics(base: Int) = {
148-
val taskMetrics = new TaskMetrics()
147+
def updateTaskMetrics(taskMetrics: TaskMetrics, base: Int) = {
149148
val shuffleReadMetrics = new ShuffleReadMetrics()
150149
val shuffleWriteMetrics = new ShuffleWriteMetrics()
151150
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
@@ -174,10 +173,16 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
174173
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
175174
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
176175

176+
val metrics4 = new TaskMetrics
177+
val metrics5 = new TaskMetrics
178+
val metrics6 = new TaskMetrics
179+
val metrics7 = new TaskMetrics
177180
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
178-
(1234L, 0, 0, makeTaskMetrics(0)),
179-
(1235L, 0, 0, makeTaskMetrics(100)),
180-
(1236L, 1, 0, makeTaskMetrics(200)))))
181+
(1234L, 0, 0, updateTaskMetrics(metrics4, 0)))))
182+
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
183+
(1235L, 0, 0, updateTaskMetrics(metrics5, 100)))))
184+
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
185+
(1236L, 1, 0, updateTaskMetrics(metrics6, 200)))))
181186

182187
var stage0Data = listener.stageIdToData.get((0, 0)).get
183188
var stage1Data = listener.stageIdToData.get((1, 0)).get
@@ -202,10 +207,10 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
202207

203208
// task that was included in a heartbeat
204209
listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
205-
makeTaskMetrics(300)))
210+
updateTaskMetrics(metrics4, 300)))
206211
// task that wasn't included in a heartbeat
207212
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1),
208-
makeTaskMetrics(400)))
213+
updateTaskMetrics(metrics7, 400)))
209214

210215
stage0Data = listener.stageIdToData.get((0, 0)).get
211216
stage1Data = listener.stageIdToData.get((1, 0)).get

0 commit comments

Comments (0)