1818package org .apache .spark
1919
2020import java .io .{ObjectInputStream , Serializable }
21+ import java .util .concurrent .atomic .AtomicLong
2122
2223import scala .collection .generic .Growable
2324import scala .collection .mutable .Map
@@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
228229 */
229230class Accumulator [T ](@ transient initialValue : T , param : AccumulatorParam [T ], name : Option [String ])
230231 extends Accumulable [T ,T ](initialValue, param, name) {
232+
231233 def this (initialValue : T , param : AccumulatorParam [T ]) = this (initialValue, param, None )
232234}
233235
@@ -244,6 +246,36 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
244246 }
245247}
246248
249+ object AccumulatorParam {
250+
251+ // The following implicit objects were in SparkContext before 1.2 and users had to
252+ // `import SparkContext._` to enable them. Now we move them here to make the compiler find
253+ // them automatically. However, as there are duplicate codes in SparkContext for backward
254+ // compatibility, please update them accordingly if you modify the following implicit objects.
255+
256+ implicit object DoubleAccumulatorParam extends AccumulatorParam [Double ] {
257+ def addInPlace (t1 : Double , t2 : Double ): Double = t1 + t2
258+ def zero (initialValue : Double ) = 0.0
259+ }
260+
261+ implicit object IntAccumulatorParam extends AccumulatorParam [Int ] {
262+ def addInPlace (t1 : Int , t2 : Int ): Int = t1 + t2
263+ def zero (initialValue : Int ) = 0
264+ }
265+
266+ implicit object LongAccumulatorParam extends AccumulatorParam [Long ] {
267+ def addInPlace (t1 : Long , t2 : Long ) = t1 + t2
268+ def zero (initialValue : Long ) = 0L
269+ }
270+
271+ implicit object FloatAccumulatorParam extends AccumulatorParam [Float ] {
272+ def addInPlace (t1 : Float , t2 : Float ) = t1 + t2
273+ def zero (initialValue : Float ) = 0f
274+ }
275+
276+ // TODO: Add AccumulatorParams for other types, e.g. lists and strings
277+ }
278+
247279// TODO: The multi-thread support in accumulators is kind of lame; check
248280// if there's a more intuitive way of doing it right
249281private object Accumulators {
@@ -252,7 +284,7 @@ private object Accumulators {
252284 val localAccums = Map [Thread , Map [Long , Accumulable [_, _]]]()
253285 var lastId : Long = 0
254286
255- def newId : Long = synchronized {
287+ def newId () : Long = synchronized {
256288 lastId += 1
257289 lastId
258290 }
0 commit comments