
Commit a8d53af

zsxwing authored and rxin committed
[SPARK-5124][Core] A standard RPC interface and an Akka implementation
This PR adds a standard internal RPC interface for Spark and an Akka implementation. See [the design document](https://issues.apache.org/jira/secure/attachment/12698710/Pluggable%20RPC%20-%20draft%202.pdf) for more details. I will split the whole work into multiple PRs to make code review easier. This is the first PR, and it avoids touching too many files.

Author: zsxwing <[email protected]>

Closes #4588 from zsxwing/rpc-part1 and squashes the following commits:

fe3df4c [zsxwing] Move registerEndpoint and use actorSystem.dispatcher in asyncSetupEndpointRefByURI
f6f3287 [zsxwing] Remove RpcEndpointRef.toURI
8bd1097 [zsxwing] Fix docs and the code style
f459380 [zsxwing] Add RpcAddress.fromURI and rename urls to uris
b221398 [zsxwing] Move send methods above ask methods
15cfd7b [zsxwing] Merge branch 'master' into rpc-part1
9ffa997 [zsxwing] Fix MiMa tests
78a1733 [zsxwing] Merge remote-tracking branch 'origin/master' into rpc-part1
385b9c3 [zsxwing] Fix the code style and add docs
2cc3f78 [zsxwing] Add an asynchronous version of setupEndpointRefByUrl
e8dfec3 [zsxwing] Remove 'sendWithReply(message: Any, sender: RpcEndpointRef): Unit'
08564ae [zsxwing] Add RpcEnvFactory to create RpcEnv
e5df4ca [zsxwing] Handle AkkaFailure(e) in Actor
ec7c5b0 [zsxwing] Fix docs
7fc95e1 [zsxwing] Implement askWithReply in RpcEndpointRef
9288406 [zsxwing] Document thread-safety for setupThreadSafeEndpoint
3007c09 [zsxwing] Move setupDriverEndpointRef to RpcUtils and rename to makeDriverRef
c425022 [zsxwing] Fix the code style
5f87700 [zsxwing] Move the logic of processing a message to a private function
3e56123 [zsxwing] Use lazy to eliminate CountDownLatch
07f128f [zsxwing] Remove ActionScheduler.scala
4d34191 [zsxwing] Remove scheduler from RpcEnv
7cdd95e [zsxwing] Add docs for RpcEnv
51e6667 [zsxwing] Add 'sender' to RpcCallContext and rename the parameter of receiveAndReply to 'context'
ffc1280 [zsxwing] Rename 'fail' to 'sendFailure' and other minor code style changes
28e6d0f [zsxwing] Add onXXX for network events and remove the companion objects of network events
3751c97 [zsxwing] Rename RpcResponse to RpcCallContext
fe7d1ff [zsxwing] Add explicit reply in rpc
7b9e0c9 [zsxwing] Fix the indentation
04a106e [zsxwing] Remove NopCancellable and add a const NOP in object SettableCancellable
2a579f4 [zsxwing] Remove RpcEnv.systemName
155b987 [zsxwing] Change newURI to uriOf and add some comments
45b2317 [zsxwing] A standard RPC interface and An Akka implementation
1 parent 0e2753f commit a8d53af
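For orientation before the diffs below, here is a minimal sketch of the shape of the new interface, reconstructed from the names in this commit message and the changed files. It is a sketch only: signatures are indicative, and details such as timeouts, implicits, and default implementations may differ from the committed code.

package org.apache.spark.rpc

import scala.concurrent.Future

import org.apache.spark.{SecurityManager, SparkConf}

// Sketch only: reconstructed from names appearing in this PR.

// Logical host:port of an RpcEnv.
case class RpcAddress(host: String, port: Int)

// Handed to receiveAndReply so an endpoint can answer (or fail) explicitly.
trait RpcCallContext {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
  def sender: RpcEndpointRef
}

// A message-processing unit with network lifecycle callbacks (the onXXX
// methods replace Akka's remoting events, per the squash log).
trait RpcEndpoint {
  val rpcEnv: RpcEnv
  def onStart(): Unit = {}
  def receive: PartialFunction[Any, Unit]
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
  def onConnected(remoteAddress: RpcAddress): Unit = {}
  def onDisconnected(remoteAddress: RpcAddress): Unit = {}
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {}
  def onStop(): Unit = {}
}

// A possibly-remote handle to an endpoint.
abstract class RpcEndpointRef {
  def send(message: Any): Unit          // one-way, fire-and-forget
  def askWithReply[T](message: Any): T  // blocking request/response
}

// Hosts endpoints and resolves references to local or remote ones.
abstract class RpcEnv {
  def address: RpcAddress
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
  def setupEndpointRefByUrl(url: String): RpcEndpointRef
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
  def uriOf(systemName: String, address: RpcAddress, endpointName: String): String
  def shutdown(): Unit
}

object RpcEnv {
  // Factory used throughout the diffs below; the squash log mentions an
  // RpcEnvFactory behind this, elided here.
  def create(name: String, host: String, port: Int, conf: SparkConf,
      securityManager: SecurityManager): RpcEnv = ???
}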

File tree

13 files changed: +1466 −86 lines

core/src/main/scala/org/apache/spark/SparkEnv.scala

28 additions & 14 deletions

@@ -34,12 +34,14 @@ import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
+import org.apache.spark.rpc.akka.AkkaRpcEnv
 import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
-import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorActor
+import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
 import org.apache.spark.storage._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}

 /**
  * :: DeveloperApi ::
@@ -54,7 +56,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 @DeveloperApi
 class SparkEnv (
     val executorId: String,
-    val actorSystem: ActorSystem,
+    private[spark] val rpcEnv: RpcEnv,
     val serializer: Serializer,
     val closureSerializer: Serializer,
     val cacheManager: CacheManager,
@@ -71,6 +73,9 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {

+  // TODO Remove actorSystem
+  val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
+
   private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

@@ -91,7 +96,8 @@
     blockManager.master.stop()
     metricsSystem.stop()
     outputCommitCoordinator.stop()
-    actorSystem.shutdown()
+    rpcEnv.shutdown()
+
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
     // down, but let's call it anyway in case it gets fixed in a later release
     // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
@@ -236,16 +242,15 @@ object SparkEnv extends Logging {
     val securityManager = new SecurityManager(conf)

     // Create the ActorSystem for Akka and get the port it binds to.
-    val (actorSystem, boundPort) = {
-      val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-      AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
-    }
+    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)
+    val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
     if (isDriver) {
-      conf.set("spark.driver.port", boundPort.toString)
+      conf.set("spark.driver.port", rpcEnv.address.port.toString)
     } else {
-      conf.set("spark.executor.port", boundPort.toString)
+      conf.set("spark.executor.port", rpcEnv.address.port.toString)
     }

     // Create an instance of the class with the given name, possibly initializing it with our conf
@@ -290,6 +295,15 @@ object SparkEnv extends Logging {
       }
     }

+    def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
+      if (isDriver) {
+        logInfo("Registering " + name)
+        rpcEnv.setupEndpoint(name, endpointCreator)
+      } else {
+        RpcUtils.makeDriverRef(name, conf, rpcEnv)
+      }
+    }
+
     val mapOutputTracker = if (isDriver) {
       new MapOutputTrackerMaster(conf)
     } else {
@@ -377,13 +391,13 @@ object SparkEnv extends Logging {
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
       new OutputCommitCoordinator(conf)
     }
-    val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
-      new OutputCommitCoordinatorActor(outputCommitCoordinator))
-    outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)
+    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
+      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
+    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

     val envInstance = new SparkEnv(
       executorId,
-      actorSystem,
+      rpcEnv,
       serializer,
       closureSerializer,
       cacheManager,
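The executor branch of registerOrLookupEndpoint depends on RpcUtils.makeDriverRef (moved from setupDriverEndpointRef, per the squash log). A plausible sketch of that lookup follows, assuming the driver's location is published under spark.driver.host and spark.driver.port as seen elsewhere in this diff; the exact URI-construction calls and the "sparkDriver" system name are assumptions, not the committed code:

import org.apache.spark.SparkConf
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}

object RpcUtils {
  // Sketch: resolve a reference to a named endpoint registered on the driver.
  def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
    val driverHost = conf.get("spark.driver.host", "localhost")
    val driverPort = conf.getInt("spark.driver.port", 7077)
    // uriOf appears in the squash log ("Change newURI to uriOf"); its exact
    // signature here is an assumption.
    rpcEnv.setupEndpointRefByUrl(
      rpcEnv.uriOf("sparkDriver", RpcAddress(driverHost, driverPort), name))
  }
}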

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

5 additions & 6 deletions

@@ -19,10 +19,9 @@ package org.apache.spark.deploy.worker

 import java.io.File

-import akka.actor._
-
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

 /**
  * Utility object for launching driver programs such that they share fate with the Worker process.
@@ -39,9 +38,9 @@ object DriverWrapper {
       */
     case workerUrl :: userJar :: mainClass :: extraArgs =>
       val conf = new SparkConf()
-      val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
+      val rpcEnv = RpcEnv.create("Driver",
         Utils.localHostName(), 0, conf, new SecurityManager(conf))
-      actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
+      rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

       val currentLoader = Thread.currentThread.getContextClassLoader
       val userJarUrl = new File(userJar).toURI().toURL()
@@ -58,7 +57,7 @@
       val mainMethod = clazz.getMethod("main", classOf[Array[String]])
       mainMethod.invoke(null, extraArgs.toArray[String])

-      actorSystem.shutdown()
+      rpcEnv.shutdown()

     case _ =>
       System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")

core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala

32 additions & 27 deletions

@@ -17,58 +17,63 @@

 package org.apache.spark.deploy.worker

-import akka.actor.{Actor, Address, AddressFromURIString}
-import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
-
 import org.apache.spark.Logging
 import org.apache.spark.deploy.DeployMessages.SendHeartbeat
-import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.rpc._

 /**
  * Actor which connects to a worker process and terminates the JVM if the connection is severed.
  * Provides fate sharing between a worker and its associated child processes.
  */
-private[spark] class WorkerWatcher(workerUrl: String)
-  extends Actor with ActorLogReceive with Logging {
-
-  override def preStart() {
-    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: String)
+  extends RpcEndpoint with Logging {

+  override def onStart() {
     logInfo(s"Connecting to worker $workerUrl")
-    val worker = context.actorSelection(workerUrl)
-    worker ! SendHeartbeat // need to send a message here to initiate connection
+    if (!isTesting) {
+      rpcEnv.asyncSetupEndpointRefByURI(workerUrl)
+    }
   }

   // Used to avoid shutting down JVM during tests
+  // In the normal case, exitNonZero will call `System.exit(-1)` to shutdown the JVM. In the unit
+  // test, the user should call `setTesting(true)` so that `exitNonZero` will set `isShutDown` to
+  // true rather than calling `System.exit`. The user can check `isShutDown` to know if
+  // `exitNonZero` is called.
   private[deploy] var isShutDown = false
   private[deploy] def setTesting(testing: Boolean) = isTesting = testing
   private var isTesting = false

   // Lets us filter events only from the worker's actor system
-  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
-  private def isWorker(address: Address) = address.hostPort == expectedHostPort
+  private val expectedAddress = RpcAddress.fromURIString(workerUrl)
+  private def isWorker(address: RpcAddress) = expectedAddress == address

   private def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)

-  override def receiveWithLogging: PartialFunction[Any, Unit] = {
-    case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
-      logInfo(s"Successfully connected to $workerUrl")
+  override def receive: PartialFunction[Any, Unit] = {
+    case e => logWarning(s"Received unexpected message: $e")
+  }

-    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
-        if isWorker(remoteAddress) =>
-      // These logs may not be seen if the worker (and associated pipe) has died
-      logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
-      logError(s"Error was: $cause")
-      exitNonZero()
+  override def onConnected(remoteAddress: RpcAddress): Unit = {
+    if (isWorker(remoteAddress)) {
+      logInfo(s"Successfully connected to $workerUrl")
+    }
+  }

-    case DisassociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
+  override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+    if (isWorker(remoteAddress)) {
       // This log message will never be seen
       logError(s"Lost connection to worker actor $workerUrl. Exiting.")
       exitNonZero()
+    }
+  }

-    case e: AssociationEvent =>
-      // pass through association events relating to other remote actor systems
-
-    case e => logWarning(s"Received unexpected actor system event: $e")
+  override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
+    if (isWorker(remoteAddress)) {
+      // These logs may not be seen if the worker (and associated pipe) has died
+      logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
+      logError(s"Error was: $cause")
+      exitNonZero()
+    }
+  }
 }
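The comment block added above documents the testing hook. A minimal sketch of how a unit test might exercise it follows; the URL, the RpcAddress it is assumed to resolve to, and the WorkerWatcherCheck wrapper are illustrative, and the project's actual test suite may differ:

package org.apache.spark.deploy.worker

import org.apache.spark.rpc.{RpcAddress, RpcEnv}

// Illustrative sketch: with setTesting(true), a disconnect from the watched
// worker flips isShutDown instead of terminating the JVM via System.exit(-1).
object WorkerWatcherCheck {
  def checkFateSharing(rpcEnv: RpcEnv): Unit = {
    val workerUrl = "akka.tcp://[email protected]:1234/user/Worker"
    val watcher = new WorkerWatcher(rpcEnv, workerUrl)
    watcher.setTesting(true)
    // Assumes RpcAddress.fromURIString(workerUrl) resolves to this host:port.
    watcher.onDisconnected(RpcAddress("1.2.3.4", 1234))
    assert(watcher.isShutDown)
  }
}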

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

1 addition & 1 deletion

@@ -169,7 +169,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
       name = "Executor")
     workerUrl.foreach { url =>
-      env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
     }
     env.actorSystem.awaitTermination()
   }
