Skip to content

Commit 6e1bfb8

Browse files
committed
Tweaks testuite to create spark contxt lazily to prevent contxt leaks.
1 parent 9c86a61 commit 6e1bfb8

File tree

1 file changed

+15
-11
lines changed

1 file changed

+15
-11
lines changed

streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,24 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
2727
import com.google.common.io.Files
2828
import org.apache.hadoop.conf.Configuration
2929

30-
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
30+
import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, StreamBlockId}
3131
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
3232

3333
class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
3434
val conf = new SparkConf()
3535
.setMaster("local[2]")
3636
.setAppName(this.getClass.getSimpleName)
37-
val sparkContext = new SparkContext(conf)
3837
val hadoopConf = new Configuration()
39-
val blockManager = sparkContext.env.blockManager
4038
// Since the same BM is reused in all tests, use an atomic int to generate ids
4139
val idGenerator = new AtomicInteger(0)
40+
41+
var sparkContext: SparkContext = null
42+
var blockManager: BlockManager = null
4243
var file: File = null
4344
var dir: File = null
4445

4546
before {
47+
blockManager = sparkContext.env.blockManager
4648
dir = Files.createTempDir()
4749
file = new File(dir, "BlockManagerWrite")
4850
}
@@ -52,6 +54,10 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAn
5254
dir.delete()
5355
}
5456

57+
override def beforeAll(): Unit = {
58+
sparkContext = new SparkContext(conf)
59+
}
60+
5561
override def afterAll(): Unit = {
5662
// Copied from LocalSparkContext which can't be imported since spark-core test-jar does not
5763
// get imported properly by sbt even if it is created.
@@ -60,19 +66,19 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAn
6066
}
6167

6268
test("Data available in BM and HDFS") {
63-
doTestHDFSBackedRDD(5, 5, 20, 5)
69+
testHDFSBackedRDD(5, 5, 20, 5)
6470
}
6571

6672
test("Data available in in BM but not in HDFS") {
67-
doTestHDFSBackedRDD(5, 0, 20, 5)
73+
testHDFSBackedRDD(5, 0, 20, 5)
6874
}
6975

7076
test("Data available in in HDFS and not in BM") {
71-
doTestHDFSBackedRDD(0, 5, 20, 5)
77+
testHDFSBackedRDD(0, 5, 20, 5)
7278
}
7379

7480
test("Data partially available in BM, and the rest in HDFS") {
75-
doTestHDFSBackedRDD(3, 2, 20, 5)
81+
testHDFSBackedRDD(3, 2, 20, 5)
7682
}
7783

7884
/**
@@ -82,7 +88,7 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAn
8288
* @param blockCount Number of blocks to write (therefore, total # of events per block =
8389
* total/blockCount
8490
*/
85-
private def doTestHDFSBackedRDD(
91+
private def testHDFSBackedRDD(
8692
writeToBMCount: Int,
8793
writeToHDFSCount: Int,
8894
total: Int,
@@ -152,8 +158,6 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAn
152158
}
153159

154160
private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = {
155-
(0 until count).map {
156-
_ => new WriteAheadLogFileSegment("random", 0l, 0)
157-
}
161+
(0 until count).map { _ => new WriteAheadLogFileSegment("random", 0l, 0) }
158162
}
159163
}

0 commit comments

Comments
 (0)