Skip to content

Commit ccbaf25

Browse files
committed
add key to top()
1 parent ad7e374 commit ccbaf25

File tree

1 file changed

+6
-10
lines changed

1 file changed

+6
-10
lines changed

python/pyspark/rdd.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,7 @@ def mergeMaps(m1, m2):
938938
return m1
939939
return self.mapPartitions(countPartition).reduce(mergeMaps)
940940

941-
def top(self, num):
941+
def top(self, num, key=None):
942942
"""
943943
Get the top N elements from a RDD.
944944
@@ -947,20 +947,16 @@ def top(self, num):
947947
[12]
948948
>>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
949949
[6, 5]
950+
>>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
951+
[4, 3, 2]
950952
"""
951953
def topIterator(iterator):
952-
q = []
953-
for k in iterator:
954-
if len(q) < num:
955-
heapq.heappush(q, k)
956-
else:
957-
heapq.heappushpop(q, k)
958-
yield q
954+
yield heapq.nlargest(num, iterator, key=key)
959955

960956
def merge(a, b):
961-
return next(topIterator(a + b))
957+
return heapq.nlargest(num, a + b, key=key)
962958

963-
return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
959+
return self.mapPartitions(topIterator).reduce(merge)
964960

965961
def takeOrdered(self, num, key=None):
966962
"""

0 commit comments

Comments
 (0)