Commits (33)

45b2317  A standard RPC interface and an Akka implementation (zsxwing, Feb 13, 2015)
155b987  Change newURI to uriOf and add some comments (zsxwing, Feb 26, 2015)
2a579f4  Remove RpcEnv.systemName (zsxwing, Feb 26, 2015)
04a106e  Remove NopCancellable and add a const NOP in object SettableCancellable (zsxwing, Feb 26, 2015)
7b9e0c9  Fix the indentation (zsxwing, Feb 27, 2015)
fe7d1ff  Add explicit reply in rpc (zsxwing, Mar 4, 2015)
3751c97  Rename RpcResponse to RpcCallContext (zsxwing, Mar 6, 2015)
28e6d0f  Add onXXX for network events and remove the companion objects of netw… (zsxwing, Mar 6, 2015)
ffc1280  Rename 'fail' to 'sendFailure' and other minor code style changes (zsxwing, Mar 6, 2015)
51e6667  Add 'sender' to RpcCallContext and rename the parameter of receiveAnd… (zsxwing, Mar 6, 2015)
7cdd95e  Add docs for RpcEnv (zsxwing, Mar 6, 2015)
4d34191  Remove scheduler from RpcEnv (zsxwing, Mar 6, 2015)
07f128f  Remove ActionScheduler.scala (zsxwing, Mar 6, 2015)
3e56123  Use lazy to eliminate CountDownLatch (zsxwing, Mar 6, 2015)
5f87700  Move the logic of processing message to a private function (zsxwing, Mar 6, 2015)
c425022  Fix the code style (zsxwing, Mar 10, 2015)
3007c09  Move setupDriverEndpointRef to RpcUtils and rename to makeDriverRef (zsxwing, Mar 10, 2015)
9288406  Document thread-safety for setupThreadSafeEndpoint (zsxwing, Mar 10, 2015)
7fc95e1  Implement askWithReply in RpcEndpointRef (zsxwing, Mar 11, 2015)
ec7c5b0  Fix docs (zsxwing, Mar 11, 2015)
e5df4ca  Handle AkkaFailure(e) in Actor (zsxwing, Mar 11, 2015)
08564ae  Add RpcEnvFactory to create RpcEnv (zsxwing, Mar 11, 2015)
e8dfec3  Remove 'sendWithReply(message: Any, sender: RpcEndpointRef): Unit' (zsxwing, Mar 12, 2015)
2cc3f78  Add an asynchronous version of setupEndpointRefByUrl (zsxwing, Mar 17, 2015)
385b9c3  Fix the code style and add docs (zsxwing, Mar 24, 2015)
78a1733  Merge remote-tracking branch 'origin/master' into rpc-part1 (zsxwing, Mar 24, 2015)
9ffa997  Fix MiMa tests (zsxwing, Mar 24, 2015)
15cfd7b  Merge branch 'master' into rpc-part1 (zsxwing, Mar 30, 2015)
b221398  Move send methods above ask methods (zsxwing, Mar 30, 2015)
f459380  Add RpcAddress.fromURI and rename urls to uris (zsxwing, Mar 30, 2015)
8bd1097  Fix docs and the code style (zsxwing, Mar 30, 2015)
f6f3287  Remove RpcEndpointRef.toURI (zsxwing, Mar 30, 2015)
fe3df4c  Move registerEndpoint and use actorSystem.dispatcher in asyncSetupEnd… (zsxwing, Mar 30, 2015)

40 changes: 27 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -34,8 +34,10 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.rpc.akka.AkkaRpcEnv
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
Contributor: super nit: order. swap with import above.

import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorActor
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
@@ -54,7 +56,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
@DeveloperApi
class SparkEnv (
val executorId: String,
val actorSystem: ActorSystem,
val rpcEnv: RpcEnv,
Contributor: just curious, why didn't changing the constructor's signature break MiMa checking?

Contributor: it's a developer api

Contributor: BTW, not really related to this change, but I believe that while SparkEnv might be a developer api, its constructor is not, right? So the constructor itself could potentially be made private[spark].

Contributor: Oh, and if code outside of Spark is not supposed to use rpcEnv, it also should be private[spark].

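For readers unfamiliar with Scala's qualified access modifiers, a minimal sketch of the reviewer's suggestion (illustrative names, not SparkEnv itself):

```scala
package org.apache.spark

// The class stays public (usable as a developer API), but its constructor and
// the rpcEnv-like field are only visible inside org.apache.spark.
class ExampleEnv private[spark] (private[spark] val port: Int) {
  def describe: String = s"ExampleEnv bound to port $port"
}

object ExampleEnvDemo {
  def main(args: Array[String]): Unit = {
    // Compiles because this object also lives in org.apache.spark; code in any
    // other package could not invoke the private[spark] constructor.
    val env = new ExampleEnv(7077)
    println(env.describe)
  }
}
```
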
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
Expand All @@ -71,6 +73,9 @@ class SparkEnv (
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {

// TODO Remove actorSystem
val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

private[spark] var isStopped = false
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

@@ -91,7 +96,8 @@ class SparkEnv (
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
actorSystem.shutdown()
rpcEnv.shutdown()

// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
@@ -236,16 +242,15 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)

// Create the ActorSystem for Akka and get the port it binds to.
val (actorSystem, boundPort) = {
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
}
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
Contributor: Seems like this is still needed because the MapOutputTracker stuff hasn't been ported. Is that intentional or were you planning to also port that API to use the new abstraction?

Member (author): MapOutputTracker stuff will be ported later. However, the systemName is still necessary. Please see my explanation about systemName below.

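A note on why the system name still matters while MapOutputTracker is unported: Akka actor URLs embed the actor-system name, so any component that builds such URLs needs it. A sketch of that URL shape (the helper is illustrative, not code from this PR):

```scala
// Illustrative only: shows the URL format an Akka-based lookup relies on.
object AkkaUrlSketch {
  def driverActorUrl(systemName: String, host: String, port: Int, actor: String): String =
    s"akka.tcp://$systemName@$host:$port/user/$actor"
}

// AkkaUrlSketch.driverActorUrl("sparkDriver", "driver-host", 7077, "MapOutputTracker")
// returns "akka.tcp://sparkDriver@driver-host:7077/user/MapOutputTracker"
```
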
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)
val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
if (isDriver) {
conf.set("spark.driver.port", boundPort.toString)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else {
conf.set("spark.executor.port", boundPort.toString)
conf.set("spark.executor.port", rpcEnv.address.port.toString)
}

// Create an instance of the class with the given name, possibly initializing it with our conf
@@ -290,6 +295,15 @@ object SparkEnv extends Logging {
}
}

def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
rpcEnv.setupDriverEndpointRef(name)
}
}
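
As a rough sketch, the executor-side branch amounts to building a ref to an endpoint the driver has already registered under the same name. The helper below is hypothetical; it assumes the Akka-backed RpcEnv's URL shape and the standard spark.driver.host/spark.driver.port settings:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}

object DriverRefSketch {
  // Hypothetical equivalent of setupDriverEndpointRef(name): resolve a ref to
  // the driver's endpoint from the driver host/port in the conf.
  def driverRef(rpcEnv: RpcEnv, conf: SparkConf, name: String): RpcEndpointRef = {
    val host = conf.get("spark.driver.host", "localhost")
    val port = conf.getInt("spark.driver.port", 7077)
    rpcEnv.setupEndpointRefByUrl(s"akka.tcp://sparkDriver@$host:$port/user/$name")
  }
}
```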

val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
@@ -377,13 +391,13 @@ object SparkEnv extends Logging {
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf)
}
val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
new OutputCommitCoordinatorActor(outputCommitCoordinator))
outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

val envInstance = new SparkEnv(
executorId,
actorSystem,
rpcEnv,
serializer,
closureSerializer,
cacheManager,
core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

@@ -21,6 +21,7 @@ import java.io.File

import akka.actor._

import org.apache.spark.rpc.RpcEnv
Contributor: nit: import order

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

@@ -32,9 +33,9 @@ object DriverWrapper {
args.toList match {
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
val rpcEnv = RpcEnv.create("Driver",
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

val currentLoader = Thread.currentThread.getContextClassLoader
val userJarUrl = new File(userJar).toURI().toURL()
@@ -51,7 +52,7 @@
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])

actorSystem.shutdown()
rpcEnv.shutdown()

case _ =>
System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
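The change above follows the create/use/shutdown lifecycle that replaces AkkaUtils.createActorSystem. A minimal standalone sketch, using the RpcEnv.create signature from this diff (the demo object itself is hypothetical):

```scala
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.{SecurityManager, SparkConf}

object RpcEnvLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Port 0 asks the transport to pick a free port; rpcEnv.address reports
    // the port actually bound, which is how SparkEnv fills spark.driver.port.
    val rpcEnv = RpcEnv.create("Demo", "localhost", 0, conf, new SecurityManager(conf))
    try {
      println(s"bound to ${rpcEnv.address}")
    } finally {
      rpcEnv.shutdown()
    }
  }
}
```
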
core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala

@@ -17,58 +17,66 @@

package org.apache.spark.deploy.worker

import akka.actor.{Actor, Address, AddressFromURIString}
import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}

import org.apache.spark.Logging
import org.apache.spark.deploy.DeployMessages.SendHeartbeat
import org.apache.spark.util.ActorLogReceive
import org.apache.spark.rpc._

/**
* Actor which connects to a worker process and terminates the JVM if the connection is severed.
* Provides fate sharing between a worker and its associated child processes.
*/
private[spark] class WorkerWatcher(workerUrl: String)
extends Actor with ActorLogReceive with Logging {

override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: String)
extends NetworkRpcEndpoint with Logging {

override def onStart() {
logInfo(s"Connecting to worker $workerUrl")
val worker = context.actorSelection(workerUrl)
worker ! SendHeartbeat // need to send a message here to initiate connection
if (!isTesting) {
val worker = rpcEnv.setupEndpointRefByUrl(workerUrl)
worker.send(SendHeartbeat) // need to send a message here to initiate connection
}
}

// Used to avoid shutting down JVM during tests
// In the normal case, exitNonZero will call `System.exit(-1)` to shutdown the JVM. In the unit
// test, the user should call `setTesting(true)` so that `exitNonZero` will set `isShutDown` to
// true rather than calling `System.exit`. The user can check `isShutDown` to know if
// `exitNonZero` is called.
private[deploy] var isShutDown = false
private[deploy] def setTesting(testing: Boolean) = isTesting = testing
private var isTesting = false
Contributor: can you document what this does?

Member (author): Added some docs to explain it.


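The docs above describe the test hook. A sketch of how a suite could exercise it, driving the lifecycle callback directly and checking isShutDown instead of a JVM exit (the object and addresses are illustrative; a real test would live under org.apache.spark.deploy.worker to see the private[deploy] members):

```scala
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.rpc.{RpcAddress, RpcEnv}

object WorkerWatcherTestSketch {
  def checkFateSharing(rpcEnv: RpcEnv): Unit = {
    val watcher = new WorkerWatcher(rpcEnv, "akka.tcp://sparkWorker@1.2.3.4:1234/user/Worker")
    watcher.setTesting(testing = true)
    // Simulate losing the connection to the watched worker.
    watcher.onDisconnected(RpcAddress("1.2.3.4", 1234))
    assert(watcher.isShutDown) // exitNonZero set the flag instead of exiting
  }
}
```
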
// Lets us filter events only from the worker's actor system
private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
private def isWorker(address: Address) = address.hostPort == expectedHostPort
private val expectedHostPort = new java.net.URI(workerUrl)
Contributor: Perhaps RpcAddress.fromUriString(workerUrl) and use of the == method?

Member (author): Updated to use RpcAddress.fromUriString

private def isWorker(address: RpcAddress) = {
expectedHostPort.getHost == address.host && expectedHostPort.getPort == address.port
}
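
A sketch of the helper the reviewer suggests, assuming RpcAddress is the plain host/port case class used elsewhere in this PR; fromUriString is the name from the thread, and the body below is illustrative rather than the PR's actual implementation:

```scala
import java.net.URI

case class RpcAddress(host: String, port: Int)

object RpcAddress {
  // Parse "akka.tcp://system@host:port/..." style URIs into a host/port pair.
  def fromUriString(uri: String): RpcAddress = {
    val u = new URI(uri)
    RpcAddress(u.getHost, u.getPort)
  }
}

// With an equality-friendly address type, isWorker collapses to:
//   private val expectedAddress = RpcAddress.fromUriString(workerUrl)
//   private def isWorker(address: RpcAddress) = address == expectedAddress
```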

def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)

override def receiveWithLogging = {
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
logInfo(s"Successfully connected to $workerUrl")
override def receive(sender: RpcEndpointRef) = {
case e => logWarning(s"Received unexpected actor system event: $e")
Contributor: nit: "actor system" is very akka-specific.

Member (author): Fixed

}

case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
if isWorker(remoteAddress) =>
// These logs may not be seen if the worker (and associated pipe) has died
logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
logError(s"Error was: $cause")
exitNonZero()
override def onConnected(remoteAddress: RpcAddress): Unit = {
if (isWorker(remoteAddress)) {
logInfo(s"Successfully connected to $workerUrl")
}
}

case DisassociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (isWorker(remoteAddress)) {
// This log message will never be seen
logError(s"Lost connection to worker actor $workerUrl. Exiting.")
exitNonZero()
}
}

case e: AssociationEvent =>
// pass through association events relating to other remote actor systems

case e => logWarning(s"Received unexpected actor system event: $e")
override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
if (isWorker(remoteAddress)) {
// These logs may not be seen if the worker (and associated pipe) has died
logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
logError(s"Error was: $cause")
exitNonZero()
}
}
}
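
Taken together, the class above shows the shape of the new API. Here is a hypothetical endpoint written against NetworkRpcEndpoint as it appears in this diff, where the onXXX callbacks take the place of Akka's remoting lifecycle events:

```scala
import org.apache.spark.rpc._

class ConnectionLogger(override val rpcEnv: RpcEnv) extends NetworkRpcEndpoint {

  // Ordinary messages arrive here, with a ref to the sender for replies.
  override def receive(sender: RpcEndpointRef): PartialFunction[Any, Unit] = {
    case msg => println(s"message from $sender: $msg")
  }

  override def onConnected(remoteAddress: RpcAddress): Unit =
    println(s"connected: $remoteAddress")

  override def onDisconnected(remoteAddress: RpcAddress): Unit =
    println(s"disconnected: $remoteAddress")

  override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit =
    println(s"network error from $remoteAddress: $cause")
}

// Registered the same way WorkerWatcher is in the CoarseGrainedExecutorBackend
// change below:
//   rpcEnv.setupEndpoint("ConnectionLogger", new ConnectionLogger(rpcEnv))
```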
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

@@ -169,7 +169,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
name = "Executor")
workerUrl.foreach { url =>
env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.actorSystem.awaitTermination()
}