@@ -238,29 +238,37 @@ def textFileStream(self, directory):
 
     def _check_serializers(self, rdds):
         # make sure they all have the same serializer
-        if len(set(rdd._jrdd_deserializer for rdd in rdds)):
+        if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:
             for i in range(len(rdds)):
                 # reset them to sc.serializer
                 rdds[i] = rdds[i].map(lambda x: x, preservesPartitioning=True)
 
-    def queueStream(self, queue, oneAtATime=True, default=None):
+    def queueStream(self, rdds, oneAtATime=True, default=None):
         """
         Create an input stream from a queue of RDDs or list. In each batch,
         it will process either one or all of the RDDs returned by the queue.
 
         NOTE: changes to the queue after the stream is created will not be recognized.
-        @param queue      Queue of RDDs
-        @tparam T         Type of objects in the RDD
+
+        @param rdds       Queue of RDDs
+        @param oneAtATime pick one RDD per batch, or all of them at once
+        @param default    The default RDD if no more RDDs are left in the queue
         """
-        if queue and not isinstance(queue[0], RDD):
-            rdds = [self._sc.parallelize(input) for input in queue]
-        else:
-            rdds = queue
+        if default and not isinstance(default, RDD):
+            default = self._sc.parallelize(default)
+
+        if not rdds and default:
+            rdds = [default]
+
+        if rdds and not isinstance(rdds[0], RDD):
+            rdds = [self._sc.parallelize(input) for input in rdds]
         self._check_serializers(rdds)
+
         jrdds = ListConverter().convert([r._jrdd for r in rdds],
                                         SparkContext._gateway._gateway_client)
         queue = self._jvm.PythonDStream.toRDDQueue(jrdds)
         if default:
+            default = default._reserialize(rdds[0]._jrdd_deserializer)
             jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd)
         else:
             jdstream = self._jssc.queueStream(queue, oneAtATime)
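
For reference, a minimal usage sketch of the revised `queueStream` API. This driver script is illustrative only, not part of the patch; the app name, batch interval, and queue contents are made up:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "QueueStreamDemo")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Plain Python lists are accepted too: queueStream parallelizes them into RDDs.
rdd_queue = [sc.parallelize(range(i * 10, (i + 1) * 10)) for i in range(3)]

# oneAtATime=True: each batch consumes one RDD from the queue. Once the
# queue is drained, `default` (reserialized to match the queue's RDDs,
# per the change above) feeds every subsequent batch.
stream = ssc.queueStream(rdd_queue, oneAtATime=True, default=sc.parallelize([-1]))
stream.pprint()

ssc.start()
ssc.awaitTermination()  # runs until stopped externally
```

The identity `map(lambda x: x)` in `_check_serializers` is what makes the queue uniform: mapping produces a new RDD whose deserializer is `sc.serializer`, so a queue mixing differently-serialized RDDs is normalized before being handed to the JVM.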