@@ -541,7 +541,12 @@ def test_oldhadoop(self):
541541 "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable" ,
542542 "mapred.output.dir" : basepath + "/olddataset/" }
543543 self .sc .parallelize (dict_data ).saveAsHadoopDataset (conf )
544- old_dataset = sorted (self .sc .sequenceFile (basepath + "/olddataset/" ).collect ())
544+ input_conf = {"mapred.input.dir" : basepath + "/olddataset/" }
545+ old_dataset = sorted (self .sc .hadoopRDD (
546+ "org.apache.hadoop.mapred.SequenceFileInputFormat" ,
547+ "org.apache.hadoop.io.IntWritable" ,
548+ "org.apache.hadoop.io.MapWritable" ,
549+ conf = input_conf ).collect ())
545550 self .assertEqual (old_dataset , dict_data )
546551
547552 def test_newhadoop (self ):
@@ -571,10 +576,13 @@ def test_newhadoop(self):
571576 "mapred.output.dir" : basepath + "/newdataset/" }
572577 self .sc .parallelize (array_data ).saveAsNewAPIHadoopDataset (conf ,
573578 valueConverter = "org.apache.spark.api.python.DoubleArrayToWritableConverter" )
574- new_dataset = sorted (self .sc .sequenceFile (
575- basepath + "/newdataset/" ,
576- valueClass = "org.apache.spark.api.python.DoubleArrayWritable" ,
577- valueConverter = "org.apache.spark.api.python.WritableToDoubleArrayConverter" ).collect ())
579+ input_conf = {"mapred.input.dir" : basepath + "/newdataset/" }
580+ new_dataset = sorted (self .sc .newAPIHadoopRDD (
581+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat" ,
582+ "org.apache.hadoop.io.IntWritable" ,
583+ "org.apache.spark.api.python.DoubleArrayWritable" ,
584+ valueConverter = "org.apache.spark.api.python.WritableToDoubleArrayConverter" ,
585+ conf = input_conf ).collect ())
578586 self .assertEqual (new_dataset , array_data )
579587
580588 def test_newolderror (self ):
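For readers following the change: both tests now read the written data back through the conf-driven readers (hadoopRDD / newAPIHadoopRDD) instead of sc.sequenceFile. Below is a minimal standalone sketch of that round-trip using the old (mapred) API; the SparkContext setup, temporary path, and toy data are illustrative assumptions, not part of the patch.

    # Sketch only: assumes a local Spark install with the stock Hadoop writables available.
    import tempfile

    from pyspark import SparkContext

    sc = SparkContext("local", "hadoop-dataset-sketch")
    path = tempfile.mkdtemp() + "/olddataset/"

    # Write (int, dict) pairs as a SequenceFile via the old mapred API,
    # driving the output entirely through the job conf.
    dict_data = [(1, {}), (2, {"row1": 1.0}), (3, {"row2": 2.0})]
    output_conf = {
        "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
        "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
        "mapred.output.value.class": "org.apache.hadoop.io.MapWritable",
        "mapred.output.dir": path,
    }
    sc.parallelize(dict_data).saveAsHadoopDataset(output_conf)

    # Read it back the way the updated test does: the input directory is
    # passed through the job conf rather than as a sequenceFile() path argument.
    input_conf = {"mapred.input.dir": path}
    read_back = sorted(sc.hadoopRDD(
        "org.apache.hadoop.mapred.SequenceFileInputFormat",
        "org.apache.hadoop.io.IntWritable",
        "org.apache.hadoop.io.MapWritable",
        conf=input_conf).collect())

    assert read_back == dict_data
    sc.stop()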