1818package org .apache .spark .streaming
1919
2020import java .io .File
21+ import java .nio .charset .Charset
2122
2223import scala .collection .mutable .ArrayBuffer
2324import scala .reflect .ClassTag
24- import org .apache .commons .io .FileUtils
2525import com .google .common .io .Files
2626import org .apache .hadoop .fs .{Path , FileSystem }
2727import org .apache .hadoop .conf .Configuration
2828import org .apache .spark .streaming .StreamingContext ._
2929import org .apache .spark .streaming .dstream .{DStream , FileInputDStream }
3030import org .apache .spark .streaming .util .ManualClock
3131import org .apache .spark .util .Utils
32- import org .apache .spark .SparkConf
3332
3433/**
3534 * This test suites tests the checkpointing functionality of DStreams -
@@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase {
4645
4746 override def beforeFunction () {
4847 super .beforeFunction()
49- FileUtils .deleteDirectory (new File (checkpointDir))
48+ Utils .deleteRecursively (new File (checkpointDir))
5049 }
5150
5251 override def afterFunction () {
5352 super .afterFunction()
5453 if (ssc != null ) ssc.stop()
55- FileUtils .deleteDirectory (new File (checkpointDir))
54+ Utils .deleteRecursively (new File (checkpointDir))
5655 }
5756
5857 test(" basic rdd checkpoints + dstream graph checkpoint recovery" ) {
@@ -256,7 +255,7 @@ class CheckpointSuite extends TestSuiteBase {
256255 // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
257256 Thread .sleep(1000 )
258257 for (i <- Seq (1 , 2 , 3 )) {
259- FileUtils .writeStringToFile( new File (testDir, i.toString), i.toString + " \n " )
258+ Files .write(i + " \n " , new File (testDir, i.toString), Charset .forName( " UTF-8 " ) )
260259 // wait to make sure that the file is written such that it gets shown in the file listings
261260 Thread .sleep(1000 )
262261 }
@@ -273,7 +272,7 @@ class CheckpointSuite extends TestSuiteBase {
273272
274273 // Create files while the master is down
275274 for (i <- Seq (4 , 5 , 6 )) {
276- FileUtils .writeStringToFile( new File (testDir, i.toString), i.toString + " \n " )
275+ Files .write(i + " \n " , new File (testDir, i.toString), Charset .forName( " UTF-8 " ) )
277276 Thread .sleep(1000 )
278277 }
279278
@@ -289,7 +288,7 @@ class CheckpointSuite extends TestSuiteBase {
289288 // Restart stream computation
290289 ssc.start()
291290 for (i <- Seq (7 , 8 , 9 )) {
292- FileUtils .writeStringToFile( new File (testDir, i.toString), i.toString + " \n " )
291+ Files .write(i + " \n " , new File (testDir, i.toString), Charset .forName( " UTF-8 " ) )
293292 Thread .sleep(1000 )
294293 }
295294 Thread .sleep(1000 )
0 commit comments