1717
1818package org .apache .spark
1919
20+ import scala .{Option , deprecated }
21+
2022import org .apache .spark .util .collection .{AppendOnlyMap , ExternalAppendOnlyMap }
2123
2224/**
@@ -34,8 +36,12 @@ case class Aggregator[K, V, C] (
3436 private val sparkConf = SparkEnv .get.conf
3537 private val externalSorting = sparkConf.getBoolean(" spark.shuffle.spill" , true )
3638
39+ @ deprecated(" use combineValuesByKey with TaskContext argument" , " 0.9.0" )
40+ def combineValuesByKey (iter : Iterator [_ <: Product2 [K , V ]]): Iterator [(K , C )] =
41+ combineValuesByKey(iter, null )
42+
3743 def combineValuesByKey (iter : Iterator [_ <: Product2 [K , V ]],
38- context : TaskContext ) : Iterator [(K , C )] = {
44+ context : TaskContext ): Iterator [(K , C )] = {
3945 if (! externalSorting) {
4046 val combiners = new AppendOnlyMap [K ,C ]
4147 var kv : Product2 [K , V ] = null
@@ -53,12 +59,17 @@ case class Aggregator[K, V, C] (
5359 val (k, v) = iter.next()
5460 combiners.insert(k, v)
5561 }
56- context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
57- context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
62+ // TODO: Make this non optional in a future release
63+ Option (context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
64+ Option (context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
5865 combiners.iterator
5966 }
6067 }
6168
69+ @ deprecated(" use combineCombinersByKey with TaskContext argument" , " 0.9.0" )
70+ def combineCombinersByKey (iter : Iterator [(K , C )]) : Iterator [(K , C )] =
71+ combineCombinersByKey(iter, null )
72+
6273 def combineCombinersByKey (iter : Iterator [(K , C )], context : TaskContext ) : Iterator [(K , C )] = {
6374 if (! externalSorting) {
6475 val combiners = new AppendOnlyMap [K ,C ]
@@ -77,8 +88,9 @@ case class Aggregator[K, V, C] (
7788 val (k, c) = iter.next()
7889 combiners.insert(k, c)
7990 }
80- context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
81- context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
91+ // TODO: Make this non optional in a future release
92+ Option (context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
93+ Option (context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
8294 combiners.iterator
8395 }
8496 }
0 commit comments