@@ -17,6 +17,7 @@
 
 from itertools import chain, ifilter, imap
 import operator
+from datetime import datetime
 
 from pyspark import RDD
 from pyspark.storagelevel import StorageLevel
@@ -54,17 +55,6 @@ def sum(self):
         """
         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 
-    def print_(self, label=None):
-        """
-        Since print is reserved name for python, we cannot define a "print" method function.
-        This function prints serialized data in RDD in DStream because Scala and Java cannot
-        deserialized pickled python object. Please use DStream.pyprint() to print results.
-
-        Call DStream.print() and this function will print byte array in the DStream
-        """
-        # a hack to call print function in DStream
-        getattr(self._jdstream, "print")(label)
-
     def filter(self, f):
         """
         Return a new DStream containing only the elements that satisfy predicate.
@@ -154,19 +144,15 @@ def foreachRDD(self, func):
         jfunc = RDDFunction(self.ctx, func, self._jrdd_deserializer)
         self.ctx._jvm.PythonForeachDStream(self._jdstream.dstream(), jfunc)
 
-    def pyprint(self):
+    def pprint(self):
         """
         Print the first ten elements of each RDD generated in this DStream. This is an output
         operator, so this DStream will be registered as an output stream and there materialized.
         """
         def takeAndPrint(rdd, time):
-            """
-            Closure to take element from RDD and print first 10 elements.
-            This closure is called by py4j callback server.
-            """
             taken = rdd.take(11)
             print "-------------------------------------------"
-            print "Time: %s" % (str(time))
+            print "Time: %s" % datetime.fromtimestamp(time / 1000.0)
             print "-------------------------------------------"
             for record in taken[:10]:
                 print record
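The new timestamp line assumes the batch time arrives from Py4J as epoch milliseconds; dividing by 1000.0 yields the float seconds that `datetime.fromtimestamp()` expects. A minimal sketch of just that conversion, using a hypothetical batch time:

```python
from datetime import datetime

time_ms = 1409788800000  # hypothetical batch time in epoch milliseconds
print "Time: %s" % datetime.fromtimestamp(time_ms / 1000.0)
# prints something like: Time: 2014-09-04 00:00:00 (local timezone)
```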
@@ -176,6 +162,20 @@ def takeAndPrint(rdd, time):
 
         self.foreachRDD(takeAndPrint)
 
+    def collect(self):
+        """
+        Collect each RDD into the returned list.
+
+        :return: list, which will have the collected items.
+        """
+        result = []
+
+        def get_output(rdd, time):
+            r = rdd.collect()
+            result.append(r)
+        self.foreachRDD(get_output)
+        return result
+
     def mapValues(self, f):
         """
         Pass each value in the key-value pair RDD through a map function
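Since `collect()` only registers a `foreachRDD` callback, the list it returns stays empty until the StreamingContext has processed batches. A hedged usage sketch; `ssc` and the socket source are assumptions for illustration:

```python
lines = ssc.socketTextStream("localhost", 9999)
result = lines.map(lambda line: line.upper()).collect()  # empty for now

ssc.start()
# ... after a few batches have run ...
print result  # one inner list per collected RDD, e.g. [[u'HI'], [], [u'OK']]
```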
@@ -196,9 +196,9 @@ def flatMapValues(self, f):
 
     def glom(self):
         """
-        Return a new DStream in which RDD is generated by applying glom() to RDD of
-        this DStream. Applying glom() to an RDD coalesces all elements within each partition into
-        an list.
+        Return a new DStream in which each RDD is generated by applying
+        glom() to the corresponding RDD of this DStream. Applying glom()
+        to an RDD coalesces all elements within each partition into a list.
         """
         def func(iterator):
             yield list(iterator)
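The partition-coalescing behavior is the same as `RDD.glom()`, so a quick sketch on a plain RDD illustrates it (`sc` is an assumed SparkContext):

```python
rdd = sc.parallelize([1, 2, 3, 4], 2)  # two partitions: [1, 2] and [3, 4]
print rdd.glom().collect()             # [[1, 2], [3, 4]] -- one list per partition
```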
@@ -228,11 +228,11 @@ def checkpoint(self, interval):
         Mark this DStream for checkpointing. It will be saved to a file inside the
         checkpoint directory set with L{SparkContext.setCheckpointDir()}
 
-        @param interval: Time interval after which generated RDD will be checkpointed
-                         interval has to be pyspark.streaming.duration.Duration
+        @param interval: time in seconds, after which the generated RDD will
+                         be checkpointed
         """
         self.is_checkpointed = True
-        self._jdstream.checkpoint(interval._jduration)
+        self._jdstream.checkpoint(self._ssc._jduration(interval))
         return self
 
     def groupByKey(self, numPartitions=None):
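With this change the caller passes plain seconds, and the wrapping into a Java Duration happens internally via `self._ssc._jduration(interval)`. A hedged before/after sketch; `ssc` and the checkpoint directory are assumptions for illustration:

```python
ssc.checkpoint("/tmp/spark-checkpoint")           # assumed checkpoint directory
stream = ssc.socketTextStream("localhost", 9999)

# old API (removed): stream.checkpoint(Duration(10000))
stream.checkpoint(10)  # new API: checkpoint generated RDDs every 10 seconds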
@@ -245,7 +245,6 @@ def groupByKey(self, numPartitions=None):
         Note: If you are grouping in order to perform an aggregation (such as a
         sum or average) over each key, using reduceByKey will provide much
         better performance.
-
         """
         return self.transform(lambda rdd: rdd.groupByKey(numPartitions))
 
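The docstring's performance note holds because reduceByKey combines values map-side before the shuffle, while groupByKey ships every value across the network. A sketch of the two, assuming a hypothetical `pairs` DStream of (word, 1) tuples:

```python
sums_fast = pairs.reduceByKey(lambda a, b: a + b)             # combines before shuffle
sums_slow = pairs.groupByKey().mapValues(lambda vs: sum(vs))  # shuffles every value
```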
@@ -288,15 +287,6 @@ def saveAsPickleFile(rdd, time):
 
         return self.foreachRDD(saveAsPickleFile)
 
-    def collect(self):
-        result = []
-
-        def get_output(rdd, time):
-            r = rdd.collect()
-            result.append(r)
-        self.foreachRDD(get_output)
-        return result
-
     def transform(self, func):
         return TransformedDStream(self, lambda a, t: func(a), True)
 