@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.concurrent.Await
 
 import akka.actor._
+import akka.testkit.TestActorRef
 import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.MapStatus
@@ -100,7 +101,25 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   }
 
   test("remote fetch") {
-    val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(conf)
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+
+    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.driver.port", boundPort.toString)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+    val slaveTracker = new MapOutputTracker(conf)
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
     masterTracker.registerShuffle(10, 1)
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
@@ -113,7 +132,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
-        Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+           Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
 
     masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
     masterTracker.incrementEpoch()
@@ -128,42 +147,25 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
     newConf.set("spark.akka.askTimeout", "1") // Fail fast
-    val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(newConf)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    val actorSystem = ActorSystem("test")
+    val actorRef = TestActorRef[MapOutputTrackerMasterActor](
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+    val masterActor = actorRef.underlyingActor
 
     // Frame size should be ~123B, and no exception should be thrown
     masterTracker.registerShuffle(10, 1)
     masterTracker.registerMapOutput(10, 0, new MapStatus(
       BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
-    slaveTracker.getServerStatuses(10, 0)
+    masterActor.receive(GetMapOutputStatuses(10))
 
     // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception
     masterTracker.registerShuffle(20, 100)
     (0 until 100).foreach { i =>
       masterTracker.registerMapOutput(20, i, new MapStatus(
         BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
     }
-    intercept[SparkException] { slaveTracker.getServerStatuses(20, 0) }
-  }
-
-  private def setUpMasterSlaveSystem(conf: SparkConf) = {
-    val hostname = "localhost"
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-
-    // Will be cleared by LocalSparkContext
-    System.setProperty("spark.driver.port", boundPort.toString)
-
-    val masterTracker = new MapOutputTrackerMaster(conf)
-    masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
-
-    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-    val slaveTracker = new MapOutputTracker(conf)
-    val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
-    val timeout = AkkaUtils.lookupTimeout(conf)
-    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
-    (masterTracker, slaveTracker)
+    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
   }
 }
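
For context on the pattern this diff adopts: akka-testkit's TestActorRef (the import added at the top) constructs the actor with a calling-thread dispatcher and exposes the actor instance through underlyingActor, so a test can drive its receive logic synchronously and have exceptions surface directly on the test thread, instead of standing up a second "slave" actor system and going through remoting. The sketch below is illustrative only and is not part of the commit; TestActorRefSketch, Echo, and EchoActor are made-up stand-ins for MapOutputTrackerMasterActor and GetMapOutputStatuses, and it assumes a pre-2.4 Akka (system.shutdown()), matching the Akka era this code builds against.

import akka.actor.{Actor, ActorSystem}
import akka.testkit.TestActorRef

object TestActorRefSketch extends App {
  // Hypothetical message and actor, standing in for GetMapOutputStatuses /
  // MapOutputTrackerMasterActor in the test above.
  case class Echo(payload: String)

  class EchoActor extends Actor {
    var last: Option[String] = None
    def receive = {
      case Echo(p) if p.length > 8 => throw new IllegalArgumentException("payload too large")
      case Echo(p)                 => last = Some(p)
    }
  }

  implicit val system = ActorSystem("sketch")

  // TestActorRef runs the actor on the calling thread; underlyingActor gives
  // direct access to the actor instance and its state.
  val ref = TestActorRef[EchoActor](new EchoActor)
  val actor = ref.underlyingActor

  // Invoke the receive partial function directly, as the new test does with
  // masterActor.receive(GetMapOutputStatuses(10)).
  actor.receive(Echo("ok"))
  assert(actor.last == Some("ok"))

  // Failures show up as ordinary exceptions on the test thread, which is what
  // lets the suite wrap the call in intercept[SparkException] { ... } above.
  try actor.receive(Echo("definitely too large")) catch {
    case e: IllegalArgumentException => println(s"caught: ${e.getMessage}")
  }

  system.shutdown()
}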