Commit 747f538
Merge pull request alteryx#83 from ewencp/pyspark-accumulator-add-method
Add an add() method to pyspark accumulators.

Add a regular method for adding a term to accumulators in pyspark. Currently, if you have a non-global accumulator, adding to it is awkward: the += operator can't be used on non-global accumulators captured via closure, because it involves an assignment, so the only way to add to one is to call __iadd__ directly. Adding this method lets you write code like this:

    def main():
        sc = SparkContext()
        accum = sc.accumulator(0)
        rdd = sc.parallelize([1, 2, 3])

        def f(x):
            accum.add(x)

        rdd.foreach(f)
        print accum.value

where using accum += x instead would have caused UnboundLocalError exceptions in the workers. Currently it would have to be written as accum.__iadd__(x).
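To see why += fails here while a plain method call works, the closure problem can be reproduced without Spark at all. The sketch below uses a toy Accumulator stand-in (a simplified assumption, not pyspark's real class): += desugars to an assignment, which makes Python treat the name as local to the enclosing function at compile time, so reading it raises UnboundLocalError.

```python
class Accumulator:
    """Toy stand-in for pyspark.accumulators.Accumulator (simplified)."""
    def __init__(self, value):
        self._value = value

    def add(self, term):
        """Adds a term to this accumulator's value (the method this PR adds)."""
        self._value += term

    def __iadd__(self, term):
        self.add(term)
        return self

    @property
    def value(self):
        return self._value

accum = Accumulator(0)

def f_bad(x):
    # 'accum += x' compiles to 'accum = accum.__iadd__(x)'; the assignment
    # marks 'accum' as local to f_bad, so the read raises UnboundLocalError.
    accum += x

def f_good(x):
    # A plain method call involves no assignment, so the closure
    # simply reads the outer 'accum' and works as expected.
    accum.add(x)

try:
    f_bad(1)
except UnboundLocalError as e:
    print("f_bad:", e)

for x in [1, 2, 3]:
    f_good(x)
print("f_good total:", accum.value)  # prints 6
```

The same distinction is why the commit's f(x) body uses accum.add(x): inside a Spark worker, += on a captured accumulator would hit exactly this UnboundLocalError.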
2 parents 6511bbe + 7eaa56d commit 747f538

File tree

1 file changed: +12 -1 lines changed

python/pyspark/accumulators.py

Lines changed: 12 additions & 1 deletion
@@ -42,6 +42,13 @@
 >>> a.value
 13
 
+>>> b = sc.accumulator(0)
+>>> def g(x):
+...     b.add(x)
+>>> rdd.foreach(g)
+>>> b.value
+6
+
 >>> from pyspark.accumulators import AccumulatorParam
 >>> class VectorAccumulatorParam(AccumulatorParam):
 ...     def zero(self, value):
@@ -139,9 +146,13 @@ def value(self, value):
         raise Exception("Accumulator.value cannot be accessed inside tasks")
         self._value = value
 
+    def add(self, term):
+        """Adds a term to this accumulator's value"""
+        self._value = self.accum_param.addInPlace(self._value, term)
+
     def __iadd__(self, term):
         """The += operator; adds a term to this accumulator's value"""
-        self._value = self.accum_param.addInPlace(self._value, term)
+        self.add(term)
         return self
 
     def __str__(self):
