Skip to content

Commit 96b047b

Browse files
committed
Add aggregate to python rdd
1 parent ca4bf8c commit 96b047b

File tree

1 file changed

+26
-2
lines changed

1 file changed

+26
-2
lines changed

python/pyspark/rdd.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ def _collect_iterator_through_file(self, iterator):
524524
def reduce(self, f):
525525
"""
526526
Reduces the elements of this RDD using the specified commutative and
527-
associative binary operator.
527+
associative binary operator. Currently reduces partitions locally.
528528
529529
>>> from operator import add
530530
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
@@ -566,8 +566,32 @@ def func(iterator):
566566
vals = self.mapPartitions(func).collect()
567567
return reduce(op, vals, zeroValue)
568568

569-
# TODO: aggregate
569+
def aggregate(self, zeroValue, seqOp, combOp):
    """
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given combine functions and a neutral "zero
    value."

    The function C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should not
    modify C{t2}.

    The first function (seqOp) can return a different result type, U, than
    the type of this RDD. Thus, we need one operation for merging a T into
    a U and one operation for merging two U's.

    >>> seqOp = (lambda x, y: (x[0]+y, x[1] + 1))
    >>> combOp = (lambda x, y: (x[0]+y[0], x[1] + y[1]))
    >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
    (10, 4)
    """
    def func(iterator):
        # Fold this partition's elements into the zero value with seqOp.
        acc = zeroValue
        for obj in iterator:
            acc = seqOp(acc, obj)
        # Always emit the accumulator: a partition whose aggregate is
        # legitimately None (or an empty partition) must still contribute
        # its value, otherwise combOp would silently skip it.
        yield acc

    # Use fold (not reduce) for the final merge so that an RDD with no
    # partitions returns zeroValue instead of raising on an empty sequence.
    return self.mapPartitions(func).fold(zeroValue, combOp)
571595
def sum(self):
572596
"""
573597
Add up the elements in this RDD.

0 commit comments

Comments
 (0)