Skip to content

Commit ed67136

Browse files
committed
broke min/max out into separate transaction, added to rdd.py
1 parent 1e7056d commit ed67136

File tree

1 file changed

+18
-59
lines changed

1 file changed

+18
-59
lines changed

python/pyspark/rdd.py

Lines changed: 18 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,24 @@ def func(iterator):
537537
# TODO: aggregate
538538

539539

540+
def max(self):
541+
"""
542+
Find the maximum item in this RDD.
543+
544+
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
545+
43.0
546+
"""
547+
return self.stats().max()
548+
549+
def min(self):
550+
"""
551+
Find the maximum item in this RDD.
552+
553+
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
554+
1.0
555+
"""
556+
return self.stats().min()
557+
540558
def sum(self):
541559
"""
542560
Add up the elements in this RDD.
@@ -612,65 +630,6 @@ def sampleVariance(self):
612630
"""
613631
return self.stats().sampleVariance()
614632

615-
def _getBuckets(self, bucketCount):
616-
#use the statscounter as a quick way of getting max and min
617-
mm_stats = self.stats()
618-
min = mm_stats.min()
619-
max = mm_stats.max()
620-
621-
increment = (max-min)/bucketCount
622-
buckets = range(min,min)
623-
if increment != 0:
624-
buckets = range(min,max, increment)
625-
626-
return {"min":min, "max":max, "buckets":buckets}
627-
628-
def histogram(self, bucketCount, buckets=None):
629-
"""
630-
Compute a histogram of the data using bucketCount number of buckets
631-
evenly spaced between the min and max of the RDD.
632-
633-
>>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
634-
defaultdict(<type 'int'>, {(67, inf): 2, (1, 33): 6, (34, 66): 2})
635-
"""
636-
min = float("-inf")
637-
max = float("inf")
638-
evenBuckets = False
639-
if not buckets:
640-
b = self._getBuckets(bucketCount)
641-
buckets = b["buckets"]
642-
min = b["min"]
643-
max = b["max"]
644-
645-
if len(buckets) < 2:
646-
raise ValueError("requires more than 1 bucket")
647-
if len(buckets) % 2 == 0:
648-
evenBuckets = True
649-
# histogram partition
650-
def histogramPartition(iterator):
651-
counters = defaultdict(int)
652-
for obj in iterator:
653-
k = bisect_right(buckets, obj)
654-
if k < len(buckets) and k > 0:
655-
key = (buckets[k-1], buckets[k]-1)
656-
elif k == len(buckets):
657-
key = (buckets[k-1], max)
658-
elif k == 0:
659-
key = (min, buckets[k]-1)
660-
counters[key] += 1
661-
yield counters
662-
663-
# merge counters
664-
def mergeCounters(d1, d2):
665-
for k in d2.keys():
666-
if k in d1:
667-
d1[k] += d2[k]
668-
return d1
669-
670-
#map partitions(histogram_partition(bucketFunction)).reduce(mergeCounters)
671-
return self.mapPartitions(histogramPartition).reduce(mergeCounters)
672-
673-
674633
def countByValue(self):
675634
"""
676635
Return the count of each unique value in this RDD as a dictionary of

0 commit comments

Comments
 (0)