@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
2121
2222import org .scalatest .{BeforeAndAfter , BeforeAndAfterAll , FunSuite }
2323
24- import org .apache .spark .{LocalSparkContext , SparkContext , SparkEnv }
24+ import org .apache .spark .{LocalSparkContext , SparkConf , SparkContext , SparkEnv }
2525import org .apache .spark .storage .TaskResultBlockId
2626
2727/**
@@ -55,27 +55,20 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
5555/**
5656 * Tests related to handling task results (both direct and indirect).
5757 */
58- class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
59- with LocalSparkContext {
58+ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
6059
61- override def beforeAll {
62- // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
63- // as we can make it) so the tests don't take too long.
64- System .setProperty(" spark.akka.frameSize" , " 1" )
65- }
66-
67- override def afterAll {
68- System .clearProperty(" spark.akka.frameSize" )
69- }
60+ // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
61+ // as we can make it) so the tests don't take too long.
62+ def conf : SparkConf = new SparkConf ().set(" spark.akka.frameSize" , " 1" )
7063
7164 test(" handling results smaller than Akka frame size" ) {
72- sc = new SparkContext (" local" , " test" )
65+ sc = new SparkContext (" local" , " test" , conf )
7366 val result = sc.parallelize(Seq (1 ), 1 ).map(x => 2 * x).reduce((x, y) => x)
7467 assert(result === 2 )
7568 }
7669
7770 test(" handling results larger than Akka frame size" ) {
78- sc = new SparkContext (" local" , " test" )
71+ sc = new SparkContext (" local" , " test" , conf )
7972 val akkaFrameSize =
8073 sc.env.actorSystem.settings.config.getBytes(" akka.remote.netty.tcp.maximum-frame-size" ).toInt
8174 val result = sc.parallelize(Seq (1 ), 1 ).map(x => 1 .to(akkaFrameSize).toArray).reduce((x, y) => x)
@@ -89,7 +82,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
8982 test(" task retried if result missing from block manager" ) {
9083 // Set the maximum number of task failures to > 0, so that the task set isn't aborted
9184 // after the result is missing.
92- sc = new SparkContext (" local[1,2]" , " test" )
85+ sc = new SparkContext (" local[1,2]" , " test" , conf )
9386 // If this test hangs, it's probably because no resource offers were made after the task
9487 // failed.
9588 val scheduler : TaskSchedulerImpl = sc.taskScheduler match {
0 commit comments