1717
1818package org .apache .spark
1919
20- import org .scalatest .BeforeAndAfter
2120import org .scalatest .FunSuite
2221import org .scalatest .concurrent .Timeouts ._
2322import org .scalatest .Matchers
2423import org .scalatest .time .{Millis , Span }
2524
2625import org .apache .spark .storage .{RDDBlockId , StorageLevel }
26+ import org .apache .spark .util .ResetSystemProperties
2727
2828class NotSerializableClass
2929class NotSerializableExn (val notSer : NotSerializableClass ) extends Throwable () {}
3030
3131
32- class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
32+ class DistributedSuite extends FunSuite with Matchers with ResetSystemProperties
3333 with LocalSparkContext {
3434
3535 val clusterUrl = " local-cluster[2,1,512]"
3636
37- after {
38- System .clearProperty(" spark.reducer.maxMbInFlight" )
39- System .clearProperty(" spark.storage.memoryFraction" )
40- }
41-
4237 test(" task throws not serializable exception" ) {
4338 // Ensures that executors do not crash when an exn is not serializable. If executors crash,
4439 // this test will hang. Correct behavior is that executors don't crash but fail tasks
@@ -92,7 +87,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
9287 val groups = pairs.groupByKey(2 ).map(x => (x._1, x._2.size)).collect()
9388 assert(groups.length === 16 )
9489 assert(groups.map(_._2).sum === 2000 )
95- // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
9690 }
9791
9892 test(" accumulators" ) {
@@ -210,15 +204,13 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
210204 }
211205
212206 test(" compute without caching when no partitions fit in memory" ) {
213- System .setProperty(" spark.storage.memoryFraction" , " 0.0001" )
214207 sc = new SparkContext (clusterUrl, " test" )
215208 // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
216209 // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
217210 val data = sc.parallelize(1 to 4000000 , 2 ).persist(StorageLevel .MEMORY_ONLY_SER )
218211 assert(data.count() === 4000000 )
219212 assert(data.count() === 4000000 )
220213 assert(data.count() === 4000000 )
221- System .clearProperty(" spark.storage.memoryFraction" )
222214 }
223215
224216 test(" compute when only some partitions fit in memory" ) {
@@ -231,7 +223,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
231223 assert(data.count() === 4000000 )
232224 assert(data.count() === 4000000 )
233225 assert(data.count() === 4000000 )
234- System .clearProperty(" spark.storage.memoryFraction" )
235226 }
236227
237228 test(" passing environment variables to cluster" ) {
0 commit comments