Skip to content

Commit 4916016

Browse files
committed
Added histogram method to RDD; added max and min to StatCounter
1 parent 3eb009f commit 4916016

File tree

2 files changed

+77
-2
lines changed

2 files changed

+77
-2
lines changed

python/pyspark/rdd.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import sys
2525
import shlex
2626
import traceback
27+
from bisect import bisect_right
2728
from subprocess import Popen, PIPE
2829
from tempfile import NamedTemporaryFile
2930
from threading import Thread
@@ -534,6 +535,7 @@ def func(iterator):
534535
return reduce(op, vals, zeroValue)
535536

536537
# TODO: aggregate
538+
537539

538540
def sum(self):
539541
"""
@@ -610,6 +612,60 @@ def sampleVariance(self):
610612
"""
611613
return self.stats().sampleVariance()
612614

615+
def getBuckets(self, bucketCount):
    """
    Compute bucket boundaries evenly spaced between the min and max of
    the RDD, for use by histogram().

    The boundaries start at the minimum and advance by a fixed step of
    (max - min) / bucketCount, stopping before the maximum, so the
    maximum itself is never a boundary. For integer data the step is
    floored; the result may then hold more than bucketCount boundaries,
    and is empty when max - min is smaller than bucketCount.

    For example, with a minimum of 1 and a maximum of 100,
    getBuckets(2) returns [1, 50, 99].

    :param bucketCount: number of buckets to aim for; must be at least 1
    :return: ascending list of boundaries, or an empty list when the
             minimum is not strictly below the maximum
    :raises ValueError: if bucketCount is less than 1
    """
    # Local import keeps this method self-contained.
    from numbers import Integral

    if bucketCount < 1:
        raise ValueError("bucketCount must be at least 1")

    # use the statscounter as a quick way of getting max and min
    mm_stats = self.stats()
    lowest = mm_stats.min()
    highest = mm_stats.max()

    # Nothing to split: every value is equal, or min/max do not describe
    # a real range (e.g. no values were counted).
    if not lowest < highest:
        return []

    span = highest - lowest
    integral = isinstance(lowest, Integral) and isinstance(highest, Integral)
    if integral:
        # Floor division keeps integer boundaries on integer data.
        step = span // bucketCount
    else:
        # span is non-integral here, so this is a true division.
        step = span / bucketCount

    if step == 0:
        return []
    if integral:
        return list(range(lowest, highest, step))
    # range() only accepts integers, so build the boundaries arithmetically.
    return [lowest + i * step for i in range(bucketCount)]
635+
636+
def histogram(self, bucketCount, buckets=None):
    """
    Count how many elements of the RDD fall into each bucket.

    Each result key is a (low, high) tuple. For neighbouring boundaries
    b[i] and b[i+1] the key is (b[i], b[i+1] - 1). Elements below the
    first boundary are counted under (-inf, b[0] - 1), and elements at or
    above the last boundary under (b[-1], inf). Buckets that receive no
    elements do not appear in the result.

    :param bucketCount: number of buckets passed to getBuckets() when
                        no explicit buckets are supplied
    :param buckets: optional list of bucket boundaries in ascending
                    order (bisect_right requires a sorted sequence)
    :return: dictionary mapping (low, high) tuples to element counts
    :raises ValueError: if fewer than two boundaries are available
    """
    if not buckets:
        buckets = self.getBuckets(bucketCount)
    if len(buckets) < 2:
        raise ValueError("requires more than 1 bucket")

    # Count the elements of one partition per bucket.
    def histogramPartition(iterator):
        counters = defaultdict(int)
        last = len(buckets)
        for obj in iterator:
            k = bisect_right(buckets, obj)
            if k == 0:
                # below the first boundary
                key = (float("-inf"), buckets[0] - 1)
            elif k == last:
                # at or above the last boundary
                key = (buckets[-1], float("inf"))
            else:
                key = (buckets[k - 1], buckets[k] - 1)
            counters[key] += 1
        yield counters

    # Merge two per-partition results, keeping keys seen in only one of
    # them (these were previously dropped, losing their counts).
    def mergeCounters(d1, d2):
        for key, count in d2.items():
            d1[key] = d1.get(key, 0) + count
        return d1

    return self.mapPartitions(histogramPartition).reduce(mergeCounters)
667+
668+
613669
def countByValue(self):
614670
"""
615671
Return the count of each unique value in this RDD as a dictionary of

python/pyspark/statcounter.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ def __init__(self, values=[]):
2626
self.n = 0L # Running count of our values
2727
self.mu = 0.0 # Running mean of our values
2828
self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2)
29-
29+
self.max_v = float("-inf")
30+
self.min_v = float("inf")
31+
3032
for v in values:
3133
self.merge(v)
3234

@@ -36,6 +38,11 @@ def merge(self, value):
3638
self.n += 1
3739
self.mu += delta / self.n
3840
self.m2 += delta * (value - self.mu)
41+
if self.max_v < value:
42+
self.max_v = value
43+
if self.min_v > value:
44+
self.min_v = value
45+
3946
return self
4047

4148
# Merge another StatCounter into this one, adding up the internal statistics.
@@ -49,7 +56,10 @@ def mergeStats(self, other):
4956
if self.n == 0:
5057
self.mu = other.mu
5158
self.m2 = other.m2
52-
self.n = other.n
59+
self.n = other.n
60+
self.max_v = other.max_v
61+
self.min_v = other.min_v
62+
5363
elif other.n != 0:
5464
delta = other.mu - self.mu
5565
if other.n * 10 < self.n:
@@ -58,6 +68,9 @@ def mergeStats(self, other):
5868
self.mu = other.mu - (delta * self.n) / (self.n + other.n)
5969
else:
6070
self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)
71+
72+
self.max_v = max(self.max_v, other.max_v)
73+
self.min_v = min(self.min_v, other.min_v)
6174

6275
self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
6376
self.n += other.n
@@ -76,6 +89,12 @@ def mean(self):
7689
def sum(self):
    # Recover the total from the running count and the running mean.
    count, mean = self.n, self.mu
    return count * mean
7891

92+
def min(self):
    # Return the smallest value recorded so far.
    smallest = self.min_v
    return smallest
94+
95+
def max(self):
    # Return the largest value recorded so far.
    largest = self.max_v
    return largest
97+
7998
# Return the variance of the values.
8099
def variance(self):
81100
if self.n == 0:

0 commit comments

Comments
 (0)