@@ -35,25 +35,31 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
3535 self .ctx = ssc ._sc
3636 self ._jrdd_deserializer = jrdd_deserializer
3737
38+ def context (self ):
39+ """
40+ Return the StreamingContext associated with this DStream
41+ """
42+ return self ._ssc
43+
3844 def count (self ):
3945 """
4046 Return a new DStream which contains the number of elements in this DStream.
4147 """
42- return self ._mapPartitions (lambda i : [sum (1 for _ in i )])._sum ()
48+ return self .mapPartitions (lambda i : [sum (1 for _ in i )])._sum ()
4349
4450 def _sum (self ):
4551 """
4652 Add up the elements in this DStream.
4753 """
48- return self ._mapPartitions (lambda x : [sum (x )]).reduce (operator .add )
54+ return self .mapPartitions (lambda x : [sum (x )]).reduce (operator .add )
4955
5056 def print_ (self , label = None ):
5157 """
5258 Since print is reserved name for python, we cannot define a "print" method function.
5359 This function prints serialized data in RDD in DStream because Scala and Java cannot
54- deserialized pickled python object. Please use DStream.pyprint() instead to print results.
60+ deserialize pickled python objects. Please use DStream.pyprint() to print results.
5561
56- Call DStream.print().
62+ Call DStream.print() and this function will print byte arrays in the DStream
5763 """
5864 # a hack to call print function in DStream
5965 getattr (self ._jdstream , "print" )(label )
@@ -63,29 +69,32 @@ def filter(self, f):
6369 Return a new DStream containing only the elements that satisfy predicate.
6470 """
6571 def func (iterator ): return ifilter (f , iterator )
66- return self ._mapPartitions (func )
72+ return self .mapPartitions (func )
6773
6874 def flatMap (self , f , preservesPartitioning = False ):
6975 """
7076 Pass each value in the key-value pair DStream through flatMap function
7177 without changing the keys: this also retains the original RDD's partition.
7278 """
73- def func (s , iterator ): return chain .from_iterable (imap (f , iterator ))
79+ def func (s , iterator ):
80+ return chain .from_iterable (imap (f , iterator ))
7481 return self ._mapPartitionsWithIndex (func , preservesPartitioning )
7582
76- def map (self , f ):
83+ def map (self , f , preservesPartitioning = False ):
7784 """
7885 Return a new DStream by applying a function to each element of DStream.
7986 """
80- def func (iterator ): return imap (f , iterator )
81- return self ._mapPartitions (func )
87+ def func (iterator ):
88+ return imap (f , iterator )
89+ return self .mapPartitions (func , preservesPartitioning )
8290
83- def _mapPartitions (self , f ):
91+ def mapPartitions (self , f , preservesPartitioning = False ):
8492 """
8593 Return a new DStream by applying a function to each partition of this DStream.
8694 """
87- def func (s , iterator ): return f (iterator )
88- return self ._mapPartitionsWithIndex (func )
95+ def func (s , iterator ):
96+ return f (iterator )
97+ return self ._mapPartitionsWithIndex (func , preservesPartitioning )
8998
9099 def _mapPartitionsWithIndex (self , f , preservesPartitioning = False ):
91100 """
@@ -131,7 +140,7 @@ def combineLocally(iterator):
131140 else :
132141 combiners [k ] = mergeValue (combiners [k ], v )
133142 return combiners .iteritems ()
134- locally_combined = self ._mapPartitions (combineLocally )
143+ locally_combined = self .mapPartitions (combineLocally )
135144 shuffled = locally_combined .partitionBy (numPartitions )
136145
137146 def _mergeCombiners (iterator ):
@@ -143,7 +152,7 @@ def _mergeCombiners(iterator):
143152 combiners [k ] = mergeCombiners (combiners [k ], v )
144153 return combiners .iteritems ()
145154
146- return shuffled ._mapPartitions (_mergeCombiners )
155+ return shuffled .mapPartitions (_mergeCombiners )
147156
148157 def partitionBy (self , numPartitions , partitionFunc = None ):
149158 """
@@ -233,6 +242,34 @@ def takeAndPrint(rdd, time):
233242
234243 self .foreachRDD (takeAndPrint )
235244
245+ def mapValues (self , f ):
246+ """
247+ Pass each value in the key-value pair DStream through a map function
248+ without changing the keys; this also retains the original RDD's
249+ partitioning.
250+ """
251+ map_values_fn = lambda (k , v ): (k , f (v ))
252+ return self .map (map_values_fn , preservesPartitioning = True )
253+
254+ def flatMapValues (self , f ):
255+ """
256+ Pass each value in the key-value pair DStream through a flatMap function
257+ without changing the keys; this also retains the original RDD's
258+ partitioning.
259+ """
260+ flat_map_fn = lambda (k , v ): ((k , x ) for x in f (v ))
261+ return self .flatMap (flat_map_fn , preservesPartitioning = True )
262+
263+ def glom (self ):
264+ """
265+ Return a new DStream in which RDD is generated by applying glom() to RDD of
266+ this DStream. Applying glom() to an RDD coalesces all elements within each partition into
267+ a list.
268+ """
269+ def func (iterator ):
270+ yield list (iterator )
271+ return self .mapPartitions (func )
272+
236273 #def transform(self, func): - TD
237274 # from utils import RDDFunction
238275 # wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
@@ -242,7 +279,7 @@ def takeAndPrint(rdd, time):
242279 def _test_output (self , result ):
243280 """
244281 This function is only for test case.
245- Store data in a DStream to result to verify the result in tese case
282+ Store data in a DStream to result to verify the result in test case
246283 """
247284 def get_output (rdd , time ):
248285 taken = rdd .collect ()
@@ -305,4 +342,4 @@ def _jdstream(self):
305342 return self ._jdstream_val
306343
307344 def _is_pipelinable (self ):
308- return not ( self .is_cached )
345+ return not self .is_cached
0 commit comments