@@ -20,9 +20,7 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
 
     def count(self):
         """
-
         """
-        # TODO: make sure count implementation, this different from what pyspark does
         return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum()
 
     def _sum(self):
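Note on the surviving count() implementation: it sizes each partition independently, then sums the partial counts via _sum(). A minimal, Spark-free sketch of the same idea (the partition contents here are invented for illustration):

```python
# Each "partition" is just a list here; real partitions are iterators.
partitions = [[1, 2, 3], [4, 5], [6]]

def count_partition(iterator):
    # Mirrors the lambda passed to _mapPartitions above: emit a
    # one-element list holding this partition's size.
    return [sum(1 for _ in iterator)]

partial_counts = [c for part in partitions for c in count_partition(iter(part))]
print(sum(partial_counts))  # 6 -- the cross-partition total that _sum() computes
```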
@@ -79,7 +77,6 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
 
     def reduce(self, func):
         """
-
         """
         return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])
 
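The reduce() one-liner works by funneling every element under a single dummy key so that reduceByKey can merge them all in one partition. A hedged, stand-alone simulation of that chain (functools.reduce stands in for reduceByKey's pairwise merging):

```python
from functools import reduce as functools_reduce

def simulated_reduce(elements, func):
    # map(lambda x: (None, x)): tag every element with the same dummy key
    keyed = [(None, x) for x in elements]
    # reduceByKey(func, 1): with one key and one partition, this collapses
    # the whole dataset into a single (None, merged_value) pair
    merged = functools_reduce(lambda a, b: (None, func(a[1], b[1])), keyed)
    # map(lambda x: x[1]): strip the dummy key
    return merged[1]

print(simulated_reduce([1, 2, 3, 4], lambda a, b: a + b))  # 10
```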
@@ -107,12 +104,6 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
-
-                #TODO for count operation make sure count implementation
-                # This is different from what pyspark does
-                #if isinstance(x, int):
-                #    x = ("", x)
-
                 (k, v) = x
                 if k not in combiners:
                     combiners[k] = createCombiner(v)
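With the int special-casing gone, combineLocally assumes every element is a (k, v) pair. A self-contained sketch of the remaining dictionary loop, using hypothetical list-building combiner functions:

```python
def combine_locally(iterator, createCombiner, mergeValue):
    # Same loop as the diff above: the first sighting of a key creates
    # a combiner; every later sighting merges into it.
    combiners = {}
    for (k, v) in iterator:
        if k not in combiners:
            combiners[k] = createCombiner(v)
        else:
            combiners[k] = mergeValue(combiners[k], v)
    return combiners.items()

pairs = [("a", 1), ("b", 2), ("a", 3)]
print(dict(combine_locally(pairs, lambda v: [v], lambda c, v: c + [v])))
# {'a': [1, 3], 'b': [2]}
```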
@@ -142,6 +133,7 @@ def partitionBy(self, numPartitions, partitionFunc=None):
 
         if partitionFunc is None:
             partitionFunc = lambda x: 0 if x is None else hash(x)
+
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
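The comment above describes the bucketing trick that keeps Python-to-Java traffic at O(numPartitions). A rough sketch of what that bucketing looks like, assuming elements are grouped in Python before crossing the language boundary (the helper name and sample data are made up):

```python
from collections import defaultdict

def make_buckets(items, numPartitions, partitionFunc=None):
    if partitionFunc is None:
        # Same default as the diff: None hashes to bucket 0.
        partitionFunc = lambda x: 0 if x is None else hash(x)
    buckets = defaultdict(list)
    for item in items:
        buckets[partitionFunc(item) % numPartitions].append(item)
    # One (splitNumber, [objects]) pair per bucket crosses to Java,
    # instead of one object per element.
    return list(buckets.items())

print(make_buckets(["a", "b", "c", None], 2))
```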
@@ -228,7 +220,6 @@ def takeAndPrint(rdd, time):
 
         self.foreachRDD(takeAndPrint)
 
-
     #def transform(self, func):
     #    from utils import RDDFunction
     #    wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)