Skip to content

Commit dd91e08

Browse files
committed
add comp argument for RDD.max() and RDD.min()
1 parent 050f8d0 commit dd91e08

File tree

1 file changed

+29
-7
lines changed

1 file changed

+29
-7
lines changed

python/pyspark/rdd.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -810,23 +810,45 @@ def func(iterator):
810810

811811
return self.mapPartitions(func).fold(zeroValue, combOp)
812812

813-
def max(self):
813+
def max(self, comp=None):
814814
"""
815815
Find the maximum item in this RDD.
816816
817-
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
817+
@param comp: A function used to compare two elements, the builtin `cmp`
818+
will be used by default.
819+
820+
>>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
821+
>>> rdd.max()
818822
43.0
823+
>>> rdd.max(lambda a, b: cmp(str(a), str(b)))
824+
5.0
819825
"""
820-
return self.reduce(max)
826+
if comp is not None:
827+
func = lambda a, b: a if comp(a, b) >= 0 else b
828+
else:
829+
func = max
821830

822-
def min(self):
831+
return self.reduce(func)
832+
833+
def min(self, comp=None):
823834
"""
824835
Find the minimum item in this RDD.
825836
826-
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
827-
1.0
837+
@param comp: A function used to compare two elements, the builtin `cmp`
838+
will be used by default.
839+
840+
>>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
841+
>>> rdd.min()
842+
2.0
843+
>>> rdd.min(lambda a, b: cmp(str(a), str(b)))
844+
10.0
828845
"""
829-
return self.reduce(min)
846+
if comp is not None:
847+
func = lambda a, b: a if comp(a, b) <= 0 else b
848+
else:
849+
func = min
850+
851+
return self.reduce(func)
830852

831853
def sum(self):
832854
"""

0 commit comments

Comments
 (0)