3232import sys
3333
3434if sys .version_info [:2 ] <= (2 , 6 ):
35- import unittest2 as unittest
36- else :
37- import unittest
35+ import unittest2 as unittest
36+ else :
37+ import unittest
3838
3939from pyspark .context import SparkContext
4040from pyspark .streaming .context import StreamingContext
@@ -57,7 +57,7 @@ def tearDown(self):
5757
5858 @classmethod
5959 def tearDownClass (cls ):
60- # Make sure to shut down the callback server
60+ # Make sure to shut down the callback server
6161 SparkContext ._gateway ._shutdown_callback_server ()
6262
6363
@@ -71,7 +71,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
7171
7272 All test inputs should be a list of lists (3 lists by default). This list represents a stream.
7373 Every batch interval, the first object of the list is chosen to make a DStream.
74- e.g. the first list in the list is the input of the first batch.
74+ e.g. the first list in the list is the input of the first batch.
7575 Please see the BasicTestSuits in Scala which is close to this implementation.
7676 """
7777 def setUp (self ):
@@ -112,7 +112,7 @@ def test_flatMap_batch(self):
112112
113113 def test_func (dstream ):
114114 return dstream .flatMap (lambda x : (x , x * 2 ))
115- expected_output = map (lambda x : list (chain .from_iterable ((map (lambda y : [y , y * 2 ], x )))),
115+ expected_output = map (lambda x : list (chain .from_iterable ((map (lambda y : [y , y * 2 ], x )))),
116116 test_input )
117117 output = self ._run_stream (test_input , test_func , expected_output )
118118 self .assertEqual (expected_output , output )
@@ -191,12 +191,12 @@ def test_func(dstream):
191191 def test_reduceByKey_batch (self ):
192192 """Basic operation test for DStream.reduceByKey with batch deserializer."""
193193 test_input = [[("a" , 1 ), ("a" , 1 ), ("b" , 1 ), ("b" , 1 )],
194- [("" , 1 ),("" , 1 ), ("" , 1 ), ("" , 1 )],
194+ [("" , 1 ), ("" , 1 ), ("" , 1 ), ("" , 1 )],
195195 [(1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )]]
196196
197197 def test_func (dstream ):
198198 return dstream .reduceByKey (operator .add )
199- expected_output = [[("a" , 2 ), ("b" , 2 )], [("" , 4 )], [(1 , 2 ), (2 , 2 ), (3 , 1 )]]
199+ expected_output = [[("a" , 2 ), ("b" , 2 )], [("" , 4 )], [(1 , 2 ), (2 , 2 ), (3 , 1 )]]
200200 output = self ._run_stream (test_input , test_func , expected_output )
201201 for result in (output , expected_output ):
202202 self ._sort_result_based_on_key (result )
@@ -216,13 +216,13 @@ def test_func(dstream):
216216
217217 def test_mapValues_batch (self ):
218218 """Basic operation test for DStream.mapValues with batch deserializer."""
219- test_input = [[("a" , 2 ), ("b" , 2 ), ("c" , 1 ), ("d" , 1 )],
219+ test_input = [[("a" , 2 ), ("b" , 2 ), ("c" , 1 ), ("d" , 1 )],
220220 [("" , 4 ), (1 , 1 ), (2 , 2 ), (3 , 3 )],
221221 [(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )]]
222222
223223 def test_func (dstream ):
224224 return dstream .mapValues (lambda x : x + 10 )
225- expected_output = [[("a" , 12 ), ("b" , 12 ), ("c" , 11 ), ("d" , 11 )],
225+ expected_output = [[("a" , 12 ), ("b" , 12 ), ("c" , 11 ), ("d" , 11 )],
226226 [("" , 14 ), (1 , 11 ), (2 , 12 ), (3 , 13 )],
227227 [(1 , 11 ), (2 , 11 ), (3 , 11 ), (4 , 11 )]]
228228 output = self ._run_stream (test_input , test_func , expected_output )
@@ -250,7 +250,8 @@ def test_flatMapValues_batch(self):
250250
251251 def test_func (dstream ):
252252 return dstream .flatMapValues (lambda x : (x , x + 10 ))
253- expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 2 ), ("b" , 12 ), ("c" , 1 ), ("c" , 11 ), ("d" , 1 ), ("d" , 11 )],
253+ expected_output = [[("a" , 2 ), ("a" , 12 ), ("b" , 2 ), ("b" , 12 ),
254+ ("c" , 1 ), ("c" , 11 ), ("d" , 1 ), ("d" , 11 )],
254255 [("" , 4 ), ("" , 14 ), (1 , 1 ), (1 , 11 ), (2 , 1 ), (2 , 11 ), (3 , 1 ), (3 , 11 )],
255256 [(1 , 1 ), (1 , 11 ), (2 , 1 ), (2 , 11 ), (3 , 1 ), (3 , 11 ), (4 , 1 ), (4 , 11 )]]
256257 output = self ._run_stream (test_input , test_func , expected_output )
@@ -344,7 +345,7 @@ def test_func(dstream):
344345
345346 def test_groupByKey_batch (self ):
346347 """Basic operation test for DStream.groupByKey with batch deserializer."""
347- test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
348+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
348349 [(1 , 1 ), (1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )],
349350 [("a" , 1 ), ("a" , 1 ), ("b" , 1 ), ("" , 1 ), ("" , 1 ), ("" , 1 )]]
350351
@@ -361,7 +362,7 @@ def test_func(dstream):
361362
362363 def test_groupByKey_unbatch (self ):
363364 """Basic operation test for DStream.groupByKey with unbatch deserializer."""
364- test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 )],
365+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 )],
365366 [(1 , 1 ), (1 , 1 ), ("" , 1 )],
366367 [("a" , 1 ), ("a" , 1 ), ("b" , 1 )]]
367368
@@ -378,12 +379,13 @@ def test_func(dstream):
378379
379380 def test_combineByKey_batch (self ):
380381 """Basic operation test for DStream.combineByKey with batch deserializer."""
381- test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
382- [(1 , 1 ), (1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )],
382+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 ), (4 , 1 )],
383+ [(1 , 1 ), (1 , 1 ), (1 , 1 ), (2 , 1 ), (2 , 1 ), (3 , 1 )],
383384 [("a" , 1 ), ("a" , 1 ), ("b" , 1 ), ("" , 1 ), ("" , 1 ), ("" , 1 )]]
384385
385386 def test_func (dstream ):
386- def add (a , b ): return a + str (b )
387+ def add (a , b ):
388+ return a + str (b )
387389 return dstream .combineByKey (str , add , add )
388390 expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" ), (4 , "1" )],
389391 [(1 , "111" ), (2 , "11" ), (3 , "1" )],
@@ -395,10 +397,13 @@ def add(a, b): return a + str(b)
395397
396398 def test_combineByKey_unbatch (self ):
397399 """Basic operation test for DStream.combineByKey with unbatch deserializer."""
398- test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 )], [(1 , 1 ), (1 , 1 ), ("" , 1 )], [("a" , 1 ), ("a" , 1 ), ("b" , 1 )]]
400+ test_input = [[(1 , 1 ), (2 , 1 ), (3 , 1 )],
401+ [(1 , 1 ), (1 , 1 ), ("" , 1 )],
402+ [("a" , 1 ), ("a" , 1 ), ("b" , 1 )]]
399403
400404 def test_func (dstream ):
401- def add (a , b ): return a + str (b )
405+ def add (a , b ):
406+ return a + str (b )
402407 return dstream .combineByKey (str , add , add )
403408 expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" )],
404409 [(1 , "11" ), ("" , "1" )],
@@ -445,7 +450,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
445450 # Check time out.
446451 if (current_time - start_time ) > self .timeout :
447452 break
448- # StreamingContext.awaitTermination is not used to wait because
453+ # StreamingContext.awaitTermination is not used to wait because
449454 # if py4j server is called every 50 milliseconds, it gets an error.
450455 time .sleep (0.05 )
451456 # Check if the output is the same length as the expected output.
0 commit comments