@@ -539,8 +539,7 @@ def test_sequencefiles(self):

         ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
         self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
-        doubles = sorted(
-            self.sc.sequenceFile(basepath + "/sfdouble/").collect())
+        doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
         self.assertEqual(doubles, ed)

         ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
@@ -725,25 +724,25 @@ def test_unbatched_save_and_read(self):
         self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
             basepath + "/unbatched/")

-        unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
-                                                         batchSize=1).collect())
+        unbatched_sequence = sorted(self.sc.sequenceFile(
+            basepath + "/unbatched/",
+            batchSize=1).collect())
         self.assertEqual(unbatched_sequence, ei)

-        unbatched_hadoopFile = sorted(
-            self.sc.hadoopFile(basepath + "/unbatched/",
-                               "org.apache.hadoop.mapred.SequenceFileInputFormat",
-                               "org.apache.hadoop.io.IntWritable",
-                               "org.apache.hadoop.io.Text",
-                               batchSize=1).collect())
+        unbatched_hadoopFile = sorted(self.sc.hadoopFile(
+            basepath + "/unbatched/",
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
         self.assertEqual(unbatched_hadoopFile, ei)

-        unbatched_newAPIHadoopFile = sorted(
-            self.sc.newAPIHadoopFile(
-                basepath + "/unbatched/",
-                "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
-                "org.apache.hadoop.io.IntWritable",
-                "org.apache.hadoop.io.Text",
-                batchSize=1).collect())
+        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
+            basepath + "/unbatched/",
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
         self.assertEqual(unbatched_newAPIHadoopFile, ei)

         oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
@@ -949,9 +948,8 @@ def test_module_dependency(self):
            |def myfunc(x):
            |    return x + 1
            """)
-        proc = subprocess.Popen(
-            [self.sparkSubmit, "--py-files", zip, script],
-            stdout=subprocess.PIPE)
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
+                                stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 3, 4]", out)
@@ -969,10 +967,9 @@ def test_module_dependency_on_cluster(self):
            |def myfunc(x):
            |    return x + 1
            """)
-        proc = subprocess.Popen(
-            [self.sparkSubmit, "--py-files", zip, "--master",
-             "local-cluster[1,1,512]", script],
-            stdout=subprocess.PIPE)
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+                                 "local-cluster[1,1,512]", script],
+                                stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 3, 4]", out)