Commit ec65502

Updates from matei's review
1 parent: 00bc81e

5 files changed: 31 additions, 10 deletions


core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala

Lines changed: 3 additions & 3 deletions
@@ -39,15 +39,15 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf
 
   override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) {
     outputsMerged += 1
-    taskResult.foreach{ case (key,value) =>
+    taskResult.foreach { case (key, value) =>
       sums.changeValue(key, value, _ + value)
     }
   }
 
   override def currentResult(): Map[T, BoundedDouble] = {
     if (outputsMerged == totalOutputs) {
       val result = new JHashMap[T, BoundedDouble](sums.size)
-      sums.foreach{ case (key,sum) =>
+      sums.foreach { case (key, sum) =>
         result(key) = new BoundedDouble(sum, 1.0, sum, sum)
       }
       result
@@ -57,7 +57,7 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf
       val p = outputsMerged.toDouble / totalOutputs
       val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
       val result = new JHashMap[T, BoundedDouble](sums.size)
-      sums.foreach{ case (key, sum) =>
+      sums.foreach { case (key, sum) =>
         val mean = (sum + 1 - p) / p
         val variance = (sum + 1) * (1 - p) / (p * p)
         val stdev = math.sqrt(variance)
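
The else branch above runs when only a fraction p of the task outputs have been merged: each observed sum is extrapolated to (sum + 1 - p) / p and wrapped in a normal-approximation confidence interval. Below is a minimal standalone sketch of that arithmetic; the interval shape mean ± confFactor * stdev is an assumption (the hunk cuts off before the BoundedDouble is built), and commons-math3's NormalDistribution stands in for Spark's Probability.normalInverse.

import org.apache.commons.math3.distribution.NormalDistribution

object ApproxCountSketch {
  // sum: occurrences of one key seen so far; p: fraction of task outputs merged.
  // Returns (estimated total, low bound, high bound).
  def estimate(sum: Long, p: Double, confidence: Double): (Double, Double, Double) = {
    val mean = (sum + 1 - p) / p                    // extrapolated count, as in the diff
    val variance = (sum + 1) * (1 - p) / (p * p)    // variance of the estimate, as in the diff
    val stdev = math.sqrt(variance)
    // Two-sided z value for the requested confidence (about 1.96 at 0.95),
    // standing in for Probability.normalInverse(1 - (1 - confidence) / 2).
    val confFactor = new NormalDistribution(0, 1)
      .inverseCumulativeProbability(1 - (1 - confidence) / 2)
    (mean, mean - confFactor * stdev, mean + confFactor * stdev)  // assumed interval shape
  }

  def main(args: Array[String]): Unit = {
    // A key seen 40 times after half the tasks merge:
    println(estimate(40L, 0.5, 0.95))  // prints roughly (81.0, 63.25, 98.75)
  }
}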

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 3 additions & 3 deletions
@@ -836,13 +836,13 @@ abstract class RDD[T: ClassTag](
     // TODO: This should perhaps be distributed by default.
     def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
       val map = new OpenHashMap[T,Long]
-      iter.foreach{
+      iter.foreach {
         t => map.changeValue(t, 1L, _ + 1L)
       }
       Iterator(map)
     }
     def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
-      m2.foreach{ case (key, value) =>
+      m2.foreach { case (key, value) =>
         m1.changeValue(key, value, _ + value)
       }
       m1
@@ -865,7 +865,7 @@
     }
     val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) =>
       val map = new OpenHashMap[T,Long]
-      iter.foreach{
+      iter.foreach {
         t => map.changeValue(t, 1L, _ + 1L)
       }
       map
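
Both hunks implement the same two-phase pattern (evidently for countByValue-style operations): build one hash map of counts per partition, then merge the per-partition maps pairwise. A self-contained sketch of that pattern, substituting scala.collection.mutable.HashMap for OpenHashMap, which this same commit makes private[spark]:

import scala.collection.mutable

object CountByValueSketch {
  // Per-partition counting, mirroring countPartition above; update + getOrElse
  // plays the role of OpenHashMap.changeValue(t, 1L, _ + 1L).
  def countPartition[T](iter: Iterator[T]): mutable.HashMap[T, Long] = {
    val map = new mutable.HashMap[T, Long]
    iter.foreach { t =>
      map.update(t, map.getOrElse(t, 0L) + 1L)
    }
    map
  }

  // Pairwise merge, mirroring mergeMaps above: fold m2's counts into m1.
  def mergeMaps[T](m1: mutable.HashMap[T, Long],
                   m2: mutable.HashMap[T, Long]): mutable.HashMap[T, Long] = {
    m2.foreach { case (key, value) =>
      m1.update(key, m1.getOrElse(key, 0L) + value)
    }
    m1
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Iterator("a", "b", "a"), Iterator("b", "c"))
    val total = partitions.map(p => countPartition(p)).reduce((a, b) => mergeMaps(a, b))
    println(total)  // HashMap(a -> 2, b -> 2, c -> 1), iteration order may vary
  }
}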

core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.util.collection
 
 import java.util.{Arrays, Comparator}
+
 import com.google.common.hash.Hashing
 
 import org.apache.spark.annotation.DeveloperApi

core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.annotation.DeveloperApi
  * Under the hood, it uses our OpenHashSet implementation.
  */
 @DeveloperApi
+private[spark]
 class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
     initialCapacity: Int)
   extends Iterable[(K, V)]
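
With private[spark] added, OpenHashMap stays annotated @DeveloperApi but is no longer reachable from user code. For reference, the changeValue(key, defaultValue, mergeValue) upsert that the other files in this commit lean on can be approximated over a standard map; the helper below is hypothetical, not Spark API:

import scala.collection.mutable

object ChangeValueSketch {
  // Same contract as OpenHashMap.changeValue: store defaultValue if the key
  // is absent, otherwise replace the old value with mergeValue(oldValue).
  def changeValue[K, V](map: mutable.HashMap[K, V], key: K,
                        defaultValue: => V, mergeValue: V => V): V = {
    val newValue = map.get(key) match {
      case Some(old) => mergeValue(old)
      case None      => defaultValue
    }
    map.update(key, newValue)
    newValue
  }

  def main(args: Array[String]): Unit = {
    val counts = new mutable.HashMap[String, Long]
    Seq("a", "b", "a").foreach(w => changeValue(counts, w, 1L, _ + 1L))
    println(counts)  // HashMap(a -> 2, b -> 1), iteration order may vary
  }
}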

streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala

Lines changed: 23 additions & 4 deletions
@@ -30,11 +30,30 @@ object RawTextHelper {
    */
   def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
     val map = new OpenHashMap[String,Long]
-    val tokenized = iter.flatMap(_.split(" ").filterNot(_.isEmpty))
-    tokenized.foreach{ s =>
-      map.changeValue(s, 1L, _ + 1L)
+    var i = 0
+    var j = 0
+    while (iter.hasNext) {
+      val s = iter.next()
+      i = 0
+      while (i < s.length) {
+        j = i
+        while (j < s.length && s.charAt(j) != ' ') {
+          j += 1
+        }
+        if (j > i) {
+          val w = s.substring(i, j)
+          map.changeValue(w, 1L, _ + 1L)
+        }
+        i = j
+        while (i < s.length && s.charAt(i) == ' ') {
+          i += 1
+        }
+      }
+      map.toIterator.map {
+        case (k, v) => (k, v)
+      }
     }
-    map.iterator
+    map.toIterator.map{case (k, v) => (k, v)}
   }
 
   /**
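
The rewrite drops the split/flatMap tokenizer, which allocated an array per line, in favor of an index-based scan: advance j to the next space, count the word, then skip any run of spaces. A standalone sketch of that inner loop under the same single-space-delimiter assumption:

import scala.collection.mutable

object SplitAndCountSketch {
  // Index-based word counting over one line, mirroring the while loops above.
  // Each word is materialized exactly once, by substring; no intermediate arrays.
  def countWords(s: String, map: mutable.HashMap[String, Long]): Unit = {
    var i = 0
    while (i < s.length) {
      var j = i
      while (j < s.length && s.charAt(j) != ' ') {
        j += 1  // scan to the end of the current word
      }
      if (j > i) {
        val w = s.substring(i, j)
        map.update(w, map.getOrElse(w, 0L) + 1L)  // changeValue(w, 1L, _ + 1L)
      }
      i = j
      while (i < s.length && s.charAt(i) == ' ') {
        i += 1  // skip a run of spaces
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val map = new mutable.HashMap[String, Long]
    Iterator("to be or", "not to  be").foreach(countWords(_, map))
    println(map)  // HashMap(to -> 2, be -> 2, not -> 1, or -> 1), order may vary
  }
}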
