@@ -253,53 +253,6 @@ def _test_func(self, input, func, expected, numSlices=None, sort=False):
253253 self .assertEqual (expected , result )
254254
255255
256- class TestTransform (PySparkStreamingTestCase ):
257- def setUp (self ):
258- PySparkStreamingTestCase .setUp (self )
259- self .timeout = 10
260-
261- def test_transform (self ):
262- input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
263-
264- def func (stream ):
265- return stream .transform (lambda r : r and r .map (str ))
266-
267- expected = map (lambda x : map (str , x ), input )
268- self ._test_func (input , func , expected )
269- self .assertEqual (expected , output )
270-
271- def _test_func (self , input , func , expected , numSlices = None ):
272- """
273- Start stream and return the result.
274- @param input: dataset for the test. This should be list of lists.
275- @param func: wrapped function. This function should return PythonDStream object.
276- @param expected: expected output for this testcase.
277- @param numSlices: the number of slices in the rdd in the dstream.
278- """
279- # Generate input stream with user-defined input.
280- input_stream = self .ssc ._makeStream (input , numSlices )
281- # Apply test function to stream.
282- stream = func (input_stream )
283- result = stream .collect ()
284- self .ssc .start ()
285-
286- start_time = time .time ()
287- # Loop until get the expected the number of the result from the stream.
288- while True :
289- current_time = time .time ()
290- # Check time out.
291- if (current_time - start_time ) > self .timeout :
292- break
293- # StreamingContext.awaitTermination is not used to wait because
294- # if py4j server is called every 50 milliseconds, it gets an error.
295- time .sleep (0.05 )
296- # Check if the output is the same length of expected output.
297- if len (expected ) == len (result ):
298- break
299-
300- return result
301-
302-
303256class TestStreamingContext (unittest .TestCase ):
304257 """
305258 Should we have conf property in SparkContext?
0 commit comments