@@ -22,25 +22,23 @@ def count(self):
2222 """
2323
2424 """
-        pass
-        #TODO: make sure count implementation, thiis different from what pyspark does
-        #return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
+        # TODO: make sure count implementation, this is different from what pyspark does
+        return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum()
 
     def _sum(self):
         """
         """
-        pass
-        #return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 
     def print_(self):
         """
-        Since print is reserved name for python, we cannot make a print method function.
+        Since print is a reserved name in Python, we cannot define a print method.
         This function prints serialized data in RDD in DStream because Scala and Java cannot
-        deserialized pickled python object. Please use DStream.pyprint() instead to print result.
+        deserialize pickled Python objects. Please use DStream.pyprint() instead to print results.
 
         Call DStream.print().
         """
-        #hack to call print function in DStream
+        # a hack to call the print function in DStream
         getattr(self._jdstream, "print")()
 
     def filter(self, f):
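For intuition, the new count() composes two per-partition steps: count each partition locally with _mapPartitions, then total the partial counts with _sum(). Below is a minimal plain-Python sketch of that pattern; the partitions list and variable names are illustrative stand-ins, not DStream API:

# Illustrative sketch only: a list of lists standing in for an RDD's partitions.
partitions = [[10, 20, 30], [40, 50], [60]]

# _mapPartitions step: emit one local count per partition.
partial_counts = [sum(1 for _ in part) for part in partitions]  # [3, 2, 1]

# _sum step: fold the partial counts into the overall count.
total = sum(partial_counts)
assert total == 6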
@@ -79,17 +77,23 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
7977 """
8078 return PipelinedDStream (self , f , preservesPartitioning )
8179
+    def reduce(self, func):
+        """
+        Return a new DStream where each RDD has a single element
+        generated by reducing the elements of each RDD with func.
+        """
+        return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])
+
     def reduceByKey(self, func, numPartitions=None):
         """
         Merge the value for each key using an associative reduce function.
 
         This will also perform the merging locally on each mapper before
-        sending resuls to reducer, similarly to a "combiner" in MapReduce.
+        sending results to the reducer, similarly to a "combiner" in MapReduce.
 
         Output will be hash-partitioned with C{numPartitions} partitions, or
         the default parallelism level if C{numPartitions} is not specified.
         """
-        return self.combineByKey(lambda x:x, func, func, numPartitions)
+        return self.combineByKey(lambda x: x, func, func, numPartitions)
     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                      numPartitions=None):
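The added reduce() above reuses the keyed shuffle machinery: it tags every element with the dummy key None, lets reduceByKey fold all values sharing that key into one (in a single partition), then strips the key with a final map. A plain-Python sketch of the same trick, with illustrative names rather than the DStream API:

import operator

values = [1, 2, 3, 4]
func = operator.add  # any associative reduce function

# map step: pair every element with the same dummy key.
keyed = [(None, v) for v in values]

# reduceByKey step: fold together all values that share a key.
merged = {}
for k, v in keyed:
    merged[k] = v if k not in merged else func(merged[k], v)

# final map step: drop the dummy key, keeping only the reduced value.
result = [v for _, v in merged.items()]
assert result == [10]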
@@ -99,6 +103,7 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
99103 """
100104 if numPartitions is None :
101105 numPartitions = self ._defaultReducePartitions ()
106+
102107 def combineLocally (iterator ):
103108 combiners = {}
104109 for x in iterator :
@@ -116,6 +121,7 @@ def combineLocally(iterator):
             return combiners.iteritems()
         locally_combined = self._mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
+
         def _mergeCombiners(iterator):
             combiners = {}
             for (k, v) in iterator:
@@ -124,6 +130,7 @@ def _mergeCombiners(iterator):
                 else:
                     combiners[k] = mergeCombiners(combiners[k], v)
             return combiners.iteritems()
+
         return shuffled._mapPartitions(_mergeCombiners)
 
     def partitionBy(self, numPartitions, partitionFunc=None):
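Taken together, combineByKey runs in three phases: a map-side combine inside each partition (combineLocally), a hash shuffle of the partial combiners by key (partitionBy), and a reduce-side merge (_mergeCombiners). The sketch below replays those phases over plain lists; combine_by_key and its inputs are hypothetical stand-ins, assuming each partition is a list of (key, value) pairs:

def combine_by_key(partitions, createCombiner, mergeValue, mergeCombiners, numPartitions):
    # Phase 1: combine locally within each input partition.
    locally_combined = []
    for part in partitions:
        combiners = {}
        for k, v in part:
            if k not in combiners:
                combiners[k] = createCombiner(v)
            else:
                combiners[k] = mergeValue(combiners[k], v)
        locally_combined.extend(combiners.items())

    # Phase 2: "shuffle" -- hash-partition the partial combiners by key.
    shuffled = [[] for _ in range(numPartitions)]
    for k, c in locally_combined:
        shuffled[hash(k) % numPartitions].append((k, c))

    # Phase 3: merge combiners that landed in the same output partition.
    out = []
    for part in shuffled:
        combiners = {}
        for k, c in part:
            combiners[k] = c if k not in combiners else mergeCombiners(combiners[k], c)
        out.extend(combiners.items())
    return out

# Word-count style usage: sum the values seen for each key.
parts = [[("a", 1), ("b", 1)], [("a", 1)]]
print(combine_by_key(parts, lambda v: v, lambda c, v: c + v, lambda a, b: a + b, 2))
# -> [('a', 2), ('b', 1)], order depending on hashing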