2323to focusing on streaming test case
2424
2525"""
26- from fileinput import input
27- from glob import glob
2826from itertools import chain
2927import os
30- import re
31- import shutil
32- import subprocess
33- import sys
34- import tempfile
3528import time
3629import unittest
37- import zipfile
3830import operator
3931
4032from pyspark .context import SparkContext
4436
4537SPARK_HOME = os .environ ["SPARK_HOME" ]
4638
39+
class StreamOutput:
    """
    Container for the output collected from a stream.
    """
    # Class-level list: every access through ``StreamOutput.result`` sees the
    # same object. The test suite rebinds it to a fresh list in ``setUp`` and
    # hands it to the stream's ``_test_output`` hook in ``_run_stream``.
    result = []
5245
46+
5347class PySparkStreamingTestCase (unittest .TestCase ):
5448 def setUp (self ):
5549 class_name = self .__class__ .__name__
@@ -69,6 +63,7 @@ def tearDownClass(cls):
6963 time .sleep (5 )
7064 SparkContext ._gateway ._shutdown_callback_server ()
7165
66+
7267class TestBasicOperationsSuite (PySparkStreamingTestCase ):
7368 """
7469 Input and output of this TestBasicOperationsSuite is the equivalent to
@@ -77,7 +72,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
def setUp(self):
    """Run the base fixture set-up, reset the output list and set the timeout."""
    PySparkStreamingTestCase.setUp(self)
    # Rebind the shared class attribute so each test starts with an empty
    # output list.
    StreamOutput.result = []
    self.timeout = 10  # seconds
8176
8277 def tearDown (self ):
8378 PySparkStreamingTestCase .tearDown (self )
@@ -88,7 +83,8 @@ def tearDownClass(cls):
8883
8984 def test_map (self ):
9085 """Basic operation test for DStream.map"""
91- test_input = [range (1 ,5 ), range (5 ,9 ), range (9 , 13 )]
86+ test_input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
87+
9288 def test_func (dstream ):
9389 return dstream .map (lambda x : str (x ))
9490 expected_output = map (lambda x : map (lambda y : str (y ), x ), test_input )
@@ -97,17 +93,19 @@ def test_func(dstream):
9793
def test_flatMap(self):
    """Basic operation test for DStream.flatMap"""
    test_input = [range(1, 5), range(5, 9), range(9, 13)]

    def test_func(dstream):
        return dstream.flatMap(lambda x: (x, x * 2))

    # For every batch, each element x contributes x followed by x * 2.
    # A list comprehension is used instead of nested map() calls so the
    # expectation is a concrete list of lists even where map() is lazy.
    expected_output = [
        [value for x in batch for value in (x, x * 2)]
        for batch in test_input
    ]
    output = self._run_stream(test_input, test_func, expected_output)
    self.assertEqual(expected_output, output)
107104
108105 def test_filter (self ):
109106 """Basic operation test for DStream.filter"""
110- test_input = [range (1 ,5 ), range (5 ,9 ), range (9 , 13 )]
107+ test_input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
108+
111109 def test_func (dstream ):
112110 return dstream .filter (lambda x : x % 2 == 0 )
113111 expected_output = map (lambda x : filter (lambda y : y % 2 == 0 , x ), test_input )
@@ -116,7 +114,8 @@ def test_func(dstream):
116114
117115 def test_count (self ):
118116 """Basic operation test for DStream.count"""
119- test_input = [[], [1 ], range (1 , 3 ), range (1 ,4 ), range (1 ,5 )]
117+ test_input = [[], [1 ], range (1 , 3 ), range (1 , 4 ), range (1 , 5 )]
118+
120119 def test_func (dstream ):
121120 return dstream .count ()
122121 expected_output = map (lambda x : [len (x )], test_input )
@@ -125,7 +124,8 @@ def test_func(dstream):
125124
126125 def test_reduce (self ):
127126 """Basic operation test for DStream.reduce"""
128- test_input = [range (1 ,5 ), range (5 ,9 ), range (9 , 13 )]
127+ test_input = [range (1 , 5 ), range (5 , 9 ), range (9 , 13 )]
128+
129129 def test_func (dstream ):
130130 return dstream .reduce (operator .add )
131131 expected_output = map (lambda x : [reduce (operator .add , x )], test_input )
@@ -135,19 +135,20 @@ def test_func(dstream):
def test_reduceByKey(self):
    """Basic operation test for DStream.reduceByKey"""
    test_input = [["a", "a", "b"], ["", ""], []]

    def test_func(dstream):
        # Pair every element with 1, then sum the ones per key.
        return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)

    # NOTE(review): the comparison below is order-sensitive within each
    # batch; presumably the stream yields keys in this order -- confirm.
    expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
    output = self._run_stream(test_input, test_func, expected_output)
    self.assertEqual(expected_output, output)
143144
144145 def _run_stream (self , test_input , test_func , expected_output ):
145146 """Start stream and return the output"""
146147 # Generate input stream with user-defined input
147148 test_input_stream = self .ssc ._testInputStream (test_input )
148- # Applyed test function to stream
149+ # Applied test function to stream
149150 test_stream = test_func (test_input_stream )
150- # Add job to get outpuf from stream
151+ # Add job to get output from stream
151152 test_stream ._test_output (StreamOutput .result )
152153 self .ssc .start ()
153154
0 commit comments