Skip to content

Commit 6cff9c4

Browse files
committed
Review feedback
1 parent fb2dde0 commit 6cff9c4

File tree

5 files changed

+46
-19
lines changed

5 files changed

+46
-19
lines changed

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -163,6 +163,16 @@ object DataReadMethod extends Enumeration with Serializable {
163163
val Memory, Disk, Hadoop, Network = Value
164164
}
165165

166+
/**
167+
* :: DeveloperApi ::
168+
* Method by which output data was written.
169+
*/
170+
@DeveloperApi
171+
object DataWriteMethod extends Enumeration with Serializable {
172+
type DataWriteMethod = Value
173+
val Hadoop = Value
174+
}
175+
166176
/**
167177
* :: DeveloperApi ::
168178
* Metrics about reading input data.
@@ -180,7 +190,7 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
180190
* Metrics about writing output data.
181191
*/
182192
@DeveloperApi
183-
case class OutputMetrics() {
193+
case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
184194
/**
185195
* Total bytes written
186196
*/

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 29 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
4040
import org.apache.spark.SparkContext._
4141
import org.apache.spark.annotation.Experimental
4242
import org.apache.spark.deploy.SparkHadoopUtil
43-
import org.apache.spark.executor.OutputMetrics
43+
import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
4444
import org.apache.spark.partial.{BoundedDouble, PartialResult}
4545
import org.apache.spark.serializer.Serializer
4646
import org.apache.spark.util.Utils
@@ -978,13 +978,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
978978
val committer = format.getOutputCommitter(hadoopContext)
979979
committer.setupTask(hadoopContext)
980980

981-
val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
982-
.map(new Path(_))
983-
.flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
984-
val outputMetrics = new OutputMetrics()
985-
if (bytesWrittenCallback.isDefined) {
986-
context.taskMetrics.outputMetrics = Some(outputMetrics)
987-
}
981+
val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
988982

989983
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
990984
try {
@@ -1061,13 +1055,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
10611055
// around by taking a mod. We expect that no task will be attempted 2 billion times.
10621056
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
10631057

1064-
val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
1065-
.map(new Path(_))
1066-
.flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
1067-
val outputMetrics = new OutputMetrics()
1068-
if (bytesWrittenCallback.isDefined) {
1069-
context.taskMetrics.outputMetrics = Some(outputMetrics)
1070-
}
1058+
val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
10711059

10721060
writer.setup(context.stageId, context.partitionId, attemptNumber)
10731061
writer.open()
@@ -1098,6 +1086,32 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
10981086
writer.commitJob()
10991087
}
11001088

1089+
private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
1090+
: (OutputMetrics, Option[() => Long]) = {
1091+
val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
1092+
.map(new Path(_))
1093+
.flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
1094+
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
1095+
if (bytesWrittenCallback.isDefined) {
1096+
context.taskMetrics.outputMetrics = Some(outputMetrics)
1097+
}
1098+
(outputMetrics, bytesWrittenCallback)
1099+
}
1100+
1101+
/*
1102+
private def maybeUpdateOutputMetrics(recordsWritten: Long) {
1103+
// Update bytes written metric every few records
1104+
if (recordsSinceMetricsUpdate ==
1105+
PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES
1106+
&& bytesWrittenCallback.isDefined) {
1107+
recordsSinceMetricsUpdate = 0
1108+
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
1109+
} else {
1110+
recordsSinceMetricsUpdate += 1
1111+
}
1112+
1113+
}*/
1114+
11011115
/**
11021116
* Return an RDD with the keys of each tuple.
11031117
*/

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -272,6 +272,7 @@ private[spark] object JsonProtocol {
272272
}
273273

274274
def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = {
275+
("Data Write Method" -> outputMetrics.writeMethod.toString) ~
275276
("Bytes Written" -> outputMetrics.bytesWritten)
276277
}
277278

@@ -621,7 +622,8 @@ private[spark] object JsonProtocol {
621622
}
622623

623624
def outputMetricsFromJson(json: JValue): OutputMetrics = {
624-
val metrics = new OutputMetrics()
625+
val metrics = new OutputMetrics(
626+
DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
625627
metrics.bytesWritten = (json \ "Bytes Written").extract[Long]
626628
metrics
627629
}

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -159,7 +159,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
159159
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
160160
taskMetrics.inputMetrics = Some(inputMetrics)
161161
inputMetrics.bytesRead = base + 7
162-
val outputMetrics = new OutputMetrics()
162+
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
163163
taskMetrics.outputMetrics = Some(outputMetrics)
164164
outputMetrics.bytesWritten = base + 8
165165
taskMetrics

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -617,7 +617,7 @@ class JsonProtocolSuite extends FunSuite {
617617
t.setShuffleReadMetrics(Some(sr))
618618
}
619619
if (hasOutput) {
620-
val outputMetrics = new OutputMetrics()
620+
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
621621
outputMetrics.bytesWritten = a + b + c
622622
t.outputMetrics = Some(outputMetrics)
623623
} else {
@@ -1022,6 +1022,7 @@ class JsonProtocolSuite extends FunSuite {
10221022
| "Bytes Read": 2100
10231023
| },
10241024
| "Output Metrics": {
1025+
| "Data Write Method": "Hadoop",
10251026
| "Bytes Written": 1200
10261027
| },
10271028
| "Updated Blocks": [

0 commit comments

Comments (0)