-
Notifications
You must be signed in to change notification settings - Fork 29k
Spark 1246 add min max to stat counter #144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
4916016
eaf89d9
29981f2
37a7dea
1e7056d
ed67136
a5c13b0
1a97558
21dd366
5d96799
82cde0e
fd3fd4b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -477,6 +477,28 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { | |
| new java.util.ArrayList(arr) | ||
| } | ||
|
|
||
| /** | ||
| * Returns the maximum element from this RDD as defined by the specified | ||
| * Comparator[T]. | ||
| * @params comp the comparator that defines ordering | ||
| * @return the maximum of the RDD | ||
| * */ | ||
| def max(comp: Comparator[T]): T = { | ||
| import scala.collection.JavaConversions._ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you need to import this if you're going to call Ordering.comparatorToOrdering directly -- was it necessary? It was in some other methods because they used other conversions |
||
| rdd.max()(Ordering.comparatorToOrdering(comp)) | ||
| } | ||
|
|
||
| /** | ||
| * Returns the minimum element from this RDD as defined by the specified | ||
| * Comparator[T]. | ||
| * @params comp the comparator that defines ordering | ||
| * @return the minimum of the RDD | ||
| * */ | ||
| def min(comp: Comparator[T]): T = { | ||
| import scala.collection.JavaConversions._ | ||
| rdd.min()(Ordering.comparatorToOrdering(comp)) | ||
| } | ||
|
|
||
| /** | ||
| * Returns the first K elements from this RDD using the | ||
| * natural ordering for T while maintain the order. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { | |
| private var n: Long = 0 // Running count of our values | ||
| private var mu: Double = 0 // Running mean of our values | ||
| private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2) | ||
| private var max_v: Double = Double.NegativeInfinity // Running max of our values | ||
| private var min_v: Double = Double.PositiveInfinity // Running min of our values | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Small style nit, both here and in Python -- we don't use underscores in variable names usually, so call these |
||
|
|
||
| merge(values) | ||
|
|
||
|
|
@@ -41,6 +43,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { | |
| n += 1 | ||
| mu += delta / n | ||
| m2 += delta * (value - mu) | ||
| max_v = math.max(max_v, value) | ||
| min_v = math.min(min_v, value) | ||
| this | ||
| } | ||
|
|
||
|
|
@@ -58,7 +62,9 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { | |
| if (n == 0) { | ||
| mu = other.mu | ||
| m2 = other.m2 | ||
| n = other.n | ||
| n = other.n | ||
| max_v = other.max_v | ||
| min_v = other.min_v | ||
| } else if (other.n != 0) { | ||
| val delta = other.mu - mu | ||
| if (other.n * 10 < n) { | ||
|
|
@@ -70,6 +76,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { | |
| } | ||
| m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n) | ||
| n += other.n | ||
| max_v = math.max(max_v, other.max_v) | ||
| min_v = math.min(min_v, other.min_v) | ||
| } | ||
| this | ||
| } | ||
|
|
@@ -81,6 +89,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { | |
| other.n = n | ||
| other.mu = mu | ||
| other.m2 = m2 | ||
| other.max_v = max_v | ||
| other.min_v = min_v | ||
| other | ||
| } | ||
|
|
||
|
|
@@ -90,6 +100,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { | |
|
|
||
| def sum: Double = n * mu | ||
|
|
||
| def max: Double = max_v | ||
|
|
||
| def min: Double = min_v | ||
|
|
||
| /** Return the variance of the values. */ | ||
| def variance: Double = { | ||
| if (n == 0) { | ||
|
|
@@ -121,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { | |
| def sampleStdev: Double = math.sqrt(sampleVariance) | ||
|
|
||
| override def toString: String = { | ||
| "(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev) | ||
| "(count: %d, mean: %f, stdev: %f, max: %f, min: $f)".format(count, mean, stdev, max, min) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be |
||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add doc comments to these and the Scala versions