@@ -24,11 +24,12 @@ import scala.io.Source
 import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
-import org.apache.hadoop.mapred.FileAlreadyExistsException
+import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.Job
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
 
 class FileSuite extends FunSuite with LocalSparkContext {
 
@@ -236,18 +237,44 @@ class FileSuite extends FunSuite with LocalSparkContext {
     val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
     }
   }
 
   test("prevent user from overwriting the non-empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath + "/output")
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
     assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
     }
   }
+
+  test("save Hadoop Dataset through old Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val job = new JobConf()
+    job.setOutputKeyClass(classOf[String])
+    job.setOutputValueClass(classOf[String])
+    job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
+    job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
+    randomRDD.saveAsHadoopDataset(job)
+    assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
+  }
+
+  test("save Hadoop Dataset through new Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val job = new Job(sc.hadoopConfiguration)
+    job.setOutputKeyClass(classOf[String])
+    job.setOutputValueClass(classOf[String])
+    job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
+    job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
+    randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+    assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
+  }
 }
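
A minimal sketch of how the two save paths exercised by the new tests are typically driven from application code, assuming an existing SparkContext named sc; the pair RDD contents and output directories below are illustrative placeholders, mirroring the tests above.

// Minimal sketch, assuming an existing SparkContext `sc`; paths are hypothetical.
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")), 1)

// Old ("mapred") API: all job settings travel through a JobConf.
val jobConf = new JobConf()
jobConf.setOutputKeyClass(classOf[String])
jobConf.setOutputValueClass(classOf[String])
jobConf.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
jobConf.set("mapred.output.dir", "/tmp/outputDataset_old")  // hypothetical path
pairs.saveAsHadoopDataset(jobConf)

// New ("mapreduce") API: settings travel through a Job's Configuration.
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
job.getConfiguration.set("mapred.output.dir", "/tmp/outputDataset_new")  // hypothetical path
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)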