Skip to content

Commit fb2dde0

Browse files
committed
1 parent 4af5c7e commit fb2dde0

File tree

16 files changed

+335
-31
lines changed

16 files changed

+335
-31
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.deploy
1919

20+
import java.lang.reflect.Method
2021
import java.security.PrivilegedExceptionAction
2122

2223
import org.apache.hadoop.conf.Configuration
@@ -133,14 +134,9 @@ class SparkHadoopUtil extends Logging {
133134
*/
134135
private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
135136
: Option[() => Long] = {
136-
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
137-
val scheme = qualifiedPath.toUri().getScheme()
138-
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
139137
try {
140-
val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
141-
val statisticsDataClass =
142-
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
143-
val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
138+
val threadStats = getFileSystemThreadStatistics(path, conf)
139+
val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
144140
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
145141
val baselineBytesRead = f()
146142
Some(() => f() - baselineBytesRead)
@@ -151,6 +147,42 @@ class SparkHadoopUtil extends Logging {
151147
}
152148
}
153149
}
150+
151+
/**
 * Returns a function that can be called to find Hadoop FileSystem bytes written. If
 * getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
 * return the bytes written on r since t. Reflection is required because thread-level FileSystem
 * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
 *
 * @param path a path on the target file system; its qualified URI scheme selects which
 *             FileSystem statistics are consulted
 * @param conf Hadoop configuration used to resolve the FileSystem for `path`
 * @return Some(callback) reporting bytes written since this call, or None if the required
 *         statistics class or method can't be found
 */
private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
  : Option[() => Long] = {
  try {
    val threadStats = getFileSystemThreadStatistics(path, conf)
    val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
    // Sums the per-thread counter over every matching statistics object.
    val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
    // Snapshot taken now so the callback reports only bytes written after this point.
    val baselineBytesWritten = f()
    Some(() => f() - baselineBytesWritten)
  } catch {
    // The method lookup goes through Class.forName, which throws ClassNotFoundException
    // (not NoSuchMethodException) when the StatisticsData class itself is missing; treat
    // both as "thread-level statistics unavailable" instead of letting one escape.
    case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
      logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
      None
  }
}
173+
174+
private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
175+
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
176+
val scheme = qualifiedPath.toUri().getScheme()
177+
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
178+
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
179+
}
180+
181+
/**
 * Reflectively looks up a method declared on Hadoop's
 * `FileSystem.Statistics.StatisticsData` class.
 *
 * @param methodName name of the no-argument method to look up
 * @return the matching `Method` handle
 */
private def getFileSystemThreadStatisticsMethod(methodName: String): Method =
  Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
    .getDeclaredMethod(methodName)
154186
}
155187

156188
object SparkHadoopUtil {

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ class TaskMetrics extends Serializable {
8282
*/
8383
var inputMetrics: Option[InputMetrics] = None
8484

85+
/**
86+
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
87+
* data was written are stored here.
88+
*/
89+
var outputMetrics: Option[OutputMetrics] = None
90+
8591
/**
8692
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
8793
* This includes read metrics aggregated over all the task's shuffle dependencies.
@@ -169,6 +175,18 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
169175
var bytesRead: Long = 0L
170176
}
171177

178+
/**
179+
* :: DeveloperApi ::
180+
* Metrics about writing output data.
181+
*/
182+
@DeveloperApi
183+
case class OutputMetrics() {
  /** Number of bytes written so far; starts at zero and is updated in place. */
  var bytesWritten: Long = 0L
}
189+
172190
/**
173191
* :: DeveloperApi ::
174192
* Metrics pertaining to shuffle data read in a given task.

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import scala.reflect.ClassTag
2828

2929
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
3030
import org.apache.hadoop.conf.{Configurable, Configuration}
31-
import org.apache.hadoop.fs.FileSystem
31+
import org.apache.hadoop.fs.{Path, FileSystem}
3232
import org.apache.hadoop.io.SequenceFile.CompressionType
3333
import org.apache.hadoop.io.compress.CompressionCodec
3434
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
@@ -40,6 +40,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
4040
import org.apache.spark.SparkContext._
4141
import org.apache.spark.annotation.Experimental
4242
import org.apache.spark.deploy.SparkHadoopUtil
43+
import org.apache.spark.executor.OutputMetrics
4344
import org.apache.spark.partial.{BoundedDouble, PartialResult}
4445
import org.apache.spark.serializer.Serializer
4546
import org.apache.spark.util.Utils
@@ -961,30 +962,52 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
961962
}
962963

963964
val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
965+
val config = wrappedConf.value
964966
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
965967
// around by taking a mod. We expect that no task will be attempted 2 billion times.
966968
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
967969
/* "reduce task" <split #> <attempt # = spark task #> */
968970
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
969971
attemptNumber)
970-
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
972+
val hadoopContext = newTaskAttemptContext(config, attemptId)
971973
val format = outfmt.newInstance
972974
format match {
973-
case c: Configurable => c.setConf(wrappedConf.value)
975+
case c: Configurable => c.setConf(config)
974976
case _ => ()
975977
}
976978
val committer = format.getOutputCommitter(hadoopContext)
977979
committer.setupTask(hadoopContext)
980+
981+
val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
982+
.map(new Path(_))
983+
.flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
984+
val outputMetrics = new OutputMetrics()
985+
if (bytesWrittenCallback.isDefined) {
986+
context.taskMetrics.outputMetrics = Some(outputMetrics)
987+
}
988+
978989
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
979990
try {
991+
var recordsSinceMetricsUpdate = 0
980992
while (iter.hasNext) {
981993
val pair = iter.next()
982994
writer.write(pair._1, pair._2)
995+
996+
// Update bytes written metric every few records
997+
if (recordsSinceMetricsUpdate ==
998+
PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES
999+
&& bytesWrittenCallback.isDefined) {
1000+
recordsSinceMetricsUpdate = 0
1001+
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
1002+
} else {
1003+
recordsSinceMetricsUpdate += 1
1004+
}
9831005
}
9841006
} finally {
9851007
writer.close(hadoopContext)
9861008
}
9871009
committer.commitTask(hadoopContext)
1010+
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
9881011
1
9891012
} : Int
9901013

@@ -1005,6 +1028,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
10051028
def saveAsHadoopDataset(conf: JobConf) {
10061029
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
10071030
val hadoopConf = conf
1031+
val wrappedConf = new SerializableWritable(hadoopConf)
10081032
val outputFormatInstance = hadoopConf.getOutputFormat
10091033
val keyClass = hadoopConf.getOutputKeyClass
10101034
val valueClass = hadoopConf.getOutputValueClass
@@ -1032,21 +1056,42 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
10321056
writer.preSetup()
10331057

10341058
val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
1059+
val config = wrappedConf.value
10351060
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
10361061
// around by taking a mod. We expect that no task will be attempted 2 billion times.
10371062
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
10381063

1064+
val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
1065+
.map(new Path(_))
1066+
.flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
1067+
val outputMetrics = new OutputMetrics()
1068+
if (bytesWrittenCallback.isDefined) {
1069+
context.taskMetrics.outputMetrics = Some(outputMetrics)
1070+
}
1071+
10391072
writer.setup(context.stageId, context.partitionId, attemptNumber)
10401073
writer.open()
10411074
try {
1075+
var recordsSinceMetricsUpdate = 0
10421076
while (iter.hasNext) {
10431077
val record = iter.next()
10441078
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
1079+
1080+
// Update bytes written metric every few records
1081+
if (recordsSinceMetricsUpdate ==
1082+
PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES
1083+
&& bytesWrittenCallback.isDefined) {
1084+
recordsSinceMetricsUpdate = 0
1085+
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
1086+
} else {
1087+
recordsSinceMetricsUpdate += 1
1088+
}
10451089
}
10461090
} finally {
10471091
writer.close()
10481092
}
10491093
writer.commit()
1094+
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
10501095
}
10511096

10521097
self.context.runJob(self, writeToFile)
@@ -1069,3 +1114,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
10691114

10701115
private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
10711116
}
1117+
1118+
private[spark] object PairRDDFunctions {
  /**
   * Number of records the Hadoop save paths write between consecutive refreshes of the
   * bytes-written output metric.
   */
  val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES: Int = 256
}

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
158158
" INPUT_BYTES=" + metrics.bytesRead
159159
case None => ""
160160
}
161+
val outputMetrics = taskMetrics.outputMetrics match {
162+
case Some(metrics) =>
163+
" OUTPUT_BYTES=" + metrics.bytesWritten
164+
case None => ""
165+
}
161166
val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
162167
case Some(metrics) =>
163168
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -173,7 +178,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
173178
" SHUFFLE_WRITE_TIME=" + metrics.shuffleWriteTime
174179
case None => ""
175180
}
176-
stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics +
181+
stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics + outputMetrics +
177182
shuffleReadMetrics + writeMetrics)
178183
}
179184

core/src/main/scala/org/apache/spark/ui/ToolTips.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ private[spark] object ToolTips {
2929

3030
val INPUT = "Bytes read from Hadoop or from Spark storage."
3131

32+
val OUTPUT = "Bytes written to Hadoop."
33+
3234
val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
3335

3436
val SHUFFLE_READ =

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
4848
val executorToTasksFailed = HashMap[String, Int]()
4949
val executorToDuration = HashMap[String, Long]()
5050
val executorToInputBytes = HashMap[String, Long]()
51+
val executorToOutputBytes = HashMap[String, Long]()
5152
val executorToShuffleRead = HashMap[String, Long]()
5253
val executorToShuffleWrite = HashMap[String, Long]()
5354

@@ -78,6 +79,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
7879
executorToInputBytes(eid) =
7980
executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
8081
}
82+
metrics.outputMetrics.foreach { outputMetrics =>
83+
executorToOutputBytes(eid) =
84+
executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
85+
}
8186
metrics.shuffleReadMetrics.foreach { shuffleRead =>
8287
executorToShuffleRead(eid) =
8388
executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead

core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
4545
<th>Failed Tasks</th>
4646
<th>Succeeded Tasks</th>
4747
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
48+
<th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
4849
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
4950
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th>
5051
<th>Shuffle Spill (Memory)</th>
@@ -77,6 +78,8 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
7778
<td>{v.succeededTasks}</td>
7879
<td sorttable_customkey={v.inputBytes.toString}>
7980
{Utils.bytesToString(v.inputBytes)}</td>
81+
<td sorttable_customkey={v.outputBytes.toString}>
82+
{Utils.bytesToString(v.outputBytes)}</td>
8083
<td sorttable_customkey={v.shuffleRead.toString}>
8184
{Utils.bytesToString(v.shuffleRead)}</td>
8285
<td sorttable_customkey={v.shuffleWrite.toString}>

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
259259
stageData.inputBytes += inputBytesDelta
260260
execSummary.inputBytes += inputBytesDelta
261261

262+
val outputBytesDelta =
263+
(taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L)
264+
- oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L))
265+
stageData.outputBytes += outputBytesDelta
266+
execSummary.outputBytes += outputBytesDelta
267+
262268
val diskSpillDelta =
263269
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
264270
stageData.diskBytesSpilled += diskSpillDelta

0 commit comments

Comments
 (0)