@@ -26,7 +26,6 @@ import scala.util.Random
2626
2727import kafka .serializer .StringDecoder
2828import kafka .utils .{ZKGroupTopicDirs , ZkUtils }
29- import org .apache .commons .io .FileUtils
3029import org .scalatest .{BeforeAndAfter , BeforeAndAfterAll , FunSuite }
3130import org .scalatest .concurrent .Eventually
3231
@@ -61,15 +60,12 @@ class ReliableKafkaStreamSuite extends FunSuite
6160 " group.id" -> groupId,
6261 " auto.offset.reset" -> " smallest"
6362 )
64- Utils .deleteRecursively(tempDirectory)
65- tearDownKafka ()
63+
64+ tempDirectory = Utils .createTempDir ()
6665 }
6766
6867 override def afterAll (): Unit = {
69- if (tempDirectory != null && tempDirectory.exists()) {
70- FileUtils .deleteDirectory(tempDirectory)
71- tempDirectory = null
72- }
68+ Utils .deleteRecursively(tempDirectory)
7369
7470 if (kafkaTestUtils != null ) {
7571 kafkaTestUtils.teardown()
@@ -79,7 +75,6 @@ class ReliableKafkaStreamSuite extends FunSuite
7975
8076 before {
8177 ssc = new StreamingContext (sparkConf, Milliseconds (500 ))
82- tempDirectory = Files .createTempDir()
8378 ssc.checkpoint(tempDirectory.getAbsolutePath)
8479 }
8580
@@ -90,7 +85,6 @@ class ReliableKafkaStreamSuite extends FunSuite
9085 }
9186 }
9287
93-
9488 test(" Reliable Kafka input stream with single topic" ) {
9589 val topic = " test-topic"
9690 kafkaTestUtils.createTopic(topic)
0 commit comments