@@ -57,6 +57,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   with SparkHadoopMapReduceUtil
   with Serializable
 {
+  import PairRDDFunctions._
+
   /**
    * Generic function to combine the elements for each key using a custom set of aggregation
    * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
@@ -982,20 +984,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
-        var recordsSinceMetricsUpdate = 0
+        var recordsWritten = 0L
         while (iter.hasNext) {
           val pair = iter.next()
           writer.write(pair._1, pair._2)
 
           // Update bytes written metric every few records
-          if (recordsSinceMetricsUpdate ==
-            PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES
-            && bytesWrittenCallback.isDefined) {
-            recordsSinceMetricsUpdate = 0
-            bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
-          } else {
-            recordsSinceMetricsUpdate += 1
-          }
+          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+          recordsWritten += 1
         }
       } finally {
         writer.close(hadoopContext)
@@ -1060,20 +1056,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       writer.setup(context.stageId, context.partitionId, attemptNumber)
       writer.open()
       try {
-        var recordsSinceMetricsUpdate = 0
+        var recordsWritten = 0L
         while (iter.hasNext) {
           val record = iter.next()
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
 
           // Update bytes written metric every few records
-          if (recordsSinceMetricsUpdate ==
-            PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES
-            && bytesWrittenCallback.isDefined) {
-            recordsSinceMetricsUpdate = 0
-            bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
-          } else {
-            recordsSinceMetricsUpdate += 1
-          }
+          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+          recordsWritten += 1
         }
       } finally {
         writer.close()
@@ -1098,19 +1088,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     (outputMetrics, bytesWrittenCallback)
   }
 
-  /*
-  private def maybeUpdateOutputMetrics(recordsWritten: Long) {
-    // Update bytes written metric every few records
-    if (recordsSinceMetricsUpdate ==
-      PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES
-      && bytesWrittenCallback.isDefined) {
-      recordsSinceMetricsUpdate = 0
+  private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
+      outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
+    if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
+        && bytesWrittenCallback.isDefined) {
       bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
-    } else {
-      recordsSinceMetricsUpdate += 1
     }
-
-  }*/
+  }
 
   /**
    * Return an RDD with the keys of each tuple.
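
For readers skimming the diff: below is a minimal, self-contained Scala sketch of the throttling pattern the new `maybeUpdateOutputMetrics` helper implements, where the bytes-written callback is polled only once every `RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES` records rather than on every write. Everything in the sketch (the constant's value, the `OutputMetrics` stand-in, the `main` harness) is an illustrative assumption, not Spark's actual API.

```scala
object MetricsThrottleSketch {
  // Illustrative stand-in for PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES.
  val RecordsBetweenUpdates = 256

  // Illustrative stand-in for OutputMetrics: a mutable holder for the last polled value.
  final class OutputMetrics { var bytesWritten: Long = 0L }

  // Same shape as the helper in the diff: poll the (possibly expensive)
  // callback only when the running record count hits a multiple of N.
  def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
      outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
    if (recordsWritten % RecordsBetweenUpdates == 0 && bytesWrittenCallback.isDefined) {
      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
    }
  }

  def main(args: Array[String]): Unit = {
    var polls = 0              // how many times the callback actually ran
    var bytesOnDisk = 0L       // pretend filesystem statistic
    val callback: Option[() => Long] = Some(() => { polls += 1; bytesOnDisk })
    val metrics = new OutputMetrics

    var recordsWritten = 0L
    while (recordsWritten < 1000) {
      bytesOnDisk += 8         // pretend each record writes 8 bytes
      maybeUpdateOutputMetrics(callback, metrics, recordsWritten)
      recordsWritten += 1
    }
    // Fires at recordsWritten = 0, 256, 512, 768: 4 polls for 1000 writes,
    // and metrics.bytesWritten lags the true total by less than one interval.
    println(s"polls=$polls bytesWritten=${metrics.bytesWritten} actual=$bytesOnDisk")
  }
}
```

Keying the check on `recordsWritten % N == 0` instead of a reset-on-fire counter is what lets the helper take the running count as a plain parameter and stay stateless; the previous version had to mutate `recordsSinceMetricsUpdate` inline at each of the two write loops, which is why the same eight-line `if/else` appeared twice in the old code.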