
Commit 3f6c512

Author: Ilya Ganelin (committed)
Revert "Completed refactoring to make vars in TaskMetrics class private"
This reverts commit 5525c20.
1 parent 58034fb commit 3f6c512
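
For context, the refactoring being reverted had wrapped each TaskMetrics field in a private var with a getter plus inc/dec mutators; this revert restores the original plain public vars. A minimal sketch of the two shapes, condensed to a single field (the class names here are hypothetical, used only for illustration):

    // Shape introduced by the reverted refactoring (simplified)
    class EncapsulatedMetrics extends Serializable {
      private var _executorRunTime: Long = _
      def executorRunTime = _executorRunTime
      def incExecutorRunTime(value: Long) = _executorRunTime += value
      def decExecutorRunTime(value: Long) = _executorRunTime -= value
    }

    // Shape restored by this revert (simplified)
    class PlainMetrics extends Serializable {
      var executorRunTime: Long = _
    }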

File tree

7 files changed: +37 -62 lines changed


core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 6 additions & 6 deletions
@@ -194,10 +194,10 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.incExecutorDeserializeTime(taskStart - deserializeStartTime)
-          m.incExecutorRunTime(taskFinish - taskStart)
-          m.incJvmGCTime(gcTime - startGCTime)
-          m.incResultSerializationTime(afterSerialization - beforeSerialization)
+          m.executorDeserializeTime = taskStart - deserializeStartTime
+          m.executorRunTime = taskFinish - taskStart
+          m.jvmGCTime = gcTime - startGCTime
+          m.resultSerializationTime = afterSerialization - beforeSerialization
         }
 
         val accumUpdates = Accumulators.values
@@ -248,8 +248,8 @@ private[spark] class Executor(
         val serviceTime = System.currentTimeMillis() - taskStart
         val metrics = attemptedTask.flatMap(t => t.metrics)
         for (m <- metrics) {
-          m.incExecutorRunTime(serviceTime)
-          m.incJvmGCTime(gcTime - startGCTime)
+          m.executorRunTime = serviceTime
+          m.jvmGCTime = gcTime - startGCTime
         }
         val reason = new ExceptionFailure(t, metrics)
         execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
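
Note the call-site semantics this revert changes: the reverted incExecutorRunTime-style mutators accumulated into the metric with +=, while the restored public vars are assigned with =, so repeated updates overwrite rather than add. A small self-contained illustration (both classes here are simplified stand-ins, not the real TaskMetrics):

    class Assigned { var executorRunTime: Long = 0L }
    class Accumulated {
      private var _executorRunTime: Long = 0L
      def executorRunTime = _executorRunTime
      def incExecutorRunTime(value: Long) = _executorRunTime += value
    }

    val a = new Assigned
    a.executorRunTime = 10L
    a.executorRunTime = 5L        // ends at 5: the second write overwrites the first

    val b = new Accumulated
    b.incExecutorRunTime(10L)
    b.incExecutorRunTime(5L)      // ends at 15: updates accumulate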

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 11 additions & 36 deletions
@@ -39,67 +39,42 @@ class TaskMetrics extends Serializable {
   /**
    * Host's name the task runs on
    */
-  private var _hostname: String = _
-  def hostname = _hostname
-  def setHostname(value : String) = _hostname = value
+  var hostname: String = _
 
   /**
    * Time taken on the executor to deserialize this task
    */
-  private var _executorDeserializeTime: Long = _
-  def executorDeserializeTime = _executorDeserializeTime
-  def incExecutorDeserializeTime(value: Long) = _executorDeserializeTime += value
-  def decExecutorDeserializeTime(value: Long) = _executorDeserializeTime -= value
+  var executorDeserializeTime: Long = _
 
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
    */
-  private var _executorRunTime: Long = _
-  def executorRunTime = _executorRunTime
-  def incExecutorRunTime(value: Long) = _executorRunTime += value
-  def decExecutorRunTime(value: Long) = _executorRunTime -= value
+  var executorRunTime: Long = _
 
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult
    */
-  private var _resultSize: Long = _
-  def resultSize = _resultSize
-  def incResultSize(value: Long) = _resultSize += value
-  def decResultSize(value: Long) = _resultSize -= value
+  var resultSize: Long = _
 
   /**
    * Amount of time the JVM spent in garbage collection while executing this task
    */
-  private var _jvmGCTime: Long = _
-  def jvmGCTime = _jvmGCTime
-  def incJvmGCTime(value: Long) = _jvmGCTime += value
-  def decJvmGCTime(value: Long) = _jvmGCTime -= value
+  var jvmGCTime: Long = _
 
   /**
    * Amount of time spent serializing the task result
    */
-  private var _resultSerializationTime: Long = _
-  def resultSerializationTime = _resultSerializationTime
-  def incResultSerializationTime(value: Long) = _resultSerializationTime += value
-  def decResultSerializationTime(value: Long) = _resultSerializationTime -= value
+  var resultSerializationTime: Long = _
 
   /**
    * The number of in-memory bytes spilled by this task
    */
-  private var _memoryBytesSpilled: Long = _
-  def memoryBytesSpilled = _memoryBytesSpilled
-  def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value
-  def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value
+  var memoryBytesSpilled: Long = _
 
   /**
    * The number of on-disk bytes spilled by this task
    */
-  private var _diskBytesSpilled: Long = _
-  def diskBytesSpilled = _diskBytesSpilled
-  def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value
-  def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value
+  var diskBytesSpilled: Long = _
 
   /**
    * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   final def run(attemptId: Long): T = {
     context = new TaskContextImpl(stageId, partitionId, attemptId, runningLocally = false)
     TaskContextHelper.setTaskContext(context)
-    context.taskMetrics.setHostname(Utils.localHostName())
+    context.taskMetrics.hostname = Utils.localHostName()
     taskThread = Thread.currentThread()
     if (_killed) {
       kill(interruptThread = false)

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
             (deserializedResult, size)
         }
 
-        result.metrics.incResultSize(size)
+        result.metrics.resultSize = size
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 8 additions & 8 deletions
@@ -592,14 +592,14 @@ private[spark] object JsonProtocol {
       return TaskMetrics.empty
     }
     val metrics = new TaskMetrics
-    metrics.setHostname((json \ "Host Name").extract[String])
-    metrics.incExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
-    metrics.incExecutorRunTime((json \ "Executor Run Time").extract[Long])
-    metrics.incResultSize((json \ "Result Size").extract[Long])
-    metrics.incJvmGCTime((json \ "JVM GC Time").extract[Long])
-    metrics.incResultSerializationTime((json \ "Result Serialization Time").extract[Long])
-    metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long])
-    metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
+    metrics.hostname = (json \ "Host Name").extract[String]
+    metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
+    metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
+    metrics.resultSize = (json \ "Result Size").extract[Long]
+    metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long]
+    metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
+    metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
+    metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
     metrics.setShuffleReadMetrics(
       Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
     metrics.shuffleWriteMetrics =
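
The taskMetricsFromJson changes above assign the restored public vars straight from json4s lookups. The (json \ "Key").extract[T] pattern needs an implicit Formats in scope; a standalone sketch of that extraction style, assuming json4s with the Jackson backend on the classpath (the sample JSON and val names are illustrative, not Spark's actual wiring):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    implicit val formats: Formats = DefaultFormats

    val json = parse("""{"Host Name": "localhost", "Executor Run Time": 42}""")
    val hostname = (json \ "Host Name").extract[String]              // "localhost"
    val executorRunTime = (json \ "Executor Run Time").extract[Long] // 42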

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -227,9 +227,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       shuffleReadMetrics.incRemoteBytesRead(base + 1)
       shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
       shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
-      taskMetrics.incExecutorRunTime(base + 4)
-      taskMetrics.incDiskBytesSpilled(base + 5)
-      taskMetrics.incMemoryBytesSpilled(base + 6)
+      taskMetrics.executorRunTime = base + 4
+      taskMetrics.diskBytesSpilled = base + 5
+      taskMetrics.memoryBytesSpilled = base + 6
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
       taskMetrics.inputMetrics = Some(inputMetrics)
       inputMetrics.incBytesRead(base + 7)

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 7 additions & 7 deletions
@@ -599,13 +599,13 @@ class JsonProtocolSuite extends FunSuite {
       hasHadoopInput: Boolean,
       hasOutput: Boolean) = {
     val t = new TaskMetrics
-    t.setHostname("localhost")
-    t.incExecutorDeserializeTime(a)
-    t.incExecutorRunTime(b)
-    t.incResultSize(c)
-    t.incJvmGCTime(d)
-    t.incResultSerializationTime(a + b)
-    t.incMemoryBytesSpilled(a + c)
+    t.hostname = "localhost"
+    t.executorDeserializeTime = a
+    t.executorRunTime = b
+    t.resultSize = c
+    t.jvmGCTime = d
+    t.resultSerializationTime = a + b
+    t.memoryBytesSpilled = a + c
 
     if (hasHadoopInput) {
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
