Changes from all commits (47 commits)
63d2d8d  rpc temp checkin (rxin, Dec 18, 2014)
85bafc4  Merge branch 'master' into rpc (rxin, Dec 19, 2014)
c9f2cc9  First version that ran. (rxin, Dec 20, 2014)
bdc67b5  Minor update. (rxin, Dec 24, 2014)
1a195d5  Merge branch 'master' into rpc (rxin, Dec 24, 2014)
2f4b9d8  abstract class => trait (zsxwing, Dec 25, 2014)
0f7f032  Move Akka classes to org.apache.spark.rpc.akka (zsxwing, Dec 25, 2014)
2d2cba3  Update the APIs to support to get the `sender` RpcEndPointRef when se… (zsxwing, Dec 25, 2014)
1fc4a01  endPoint => endpoint (zsxwing, Dec 25, 2014)
0627986  Remove log api from RpcEndpoint trait (zsxwing, Dec 26, 2014)
ee988d3  Throw an exception if RpcEndpoint is not in RpcEnv (zsxwing, Dec 26, 2014)
2e99b4a  Add tests for HeartbeatReceiver (zsxwing, Dec 26, 2014)
73db9e5  Minor changes (zsxwing, Dec 26, 2014)
19053ca  Add test for ExecutorActor (zsxwing, Dec 26, 2014)
c8d8ac1  Merge branch 'master' into rpc (zsxwing, Dec 26, 2014)
a435eb2  Fault tolerance for RpcEndpoint (zsxwing, Dec 27, 2014)
12bc1c1  Merge branch 'master' into rpc (zsxwing, Dec 27, 2014)
2636664  Change CoarseGrainedExecutorBackend to a RpcEndpoint (zsxwing, Dec 29, 2014)
85cfb33  Change CoarseMesosSchedulerBackend to use RpcEndpoint (zsxwing, Dec 29, 2014)
21586d0  Merge branch 'master' into rpc (zsxwing, Dec 30, 2014)
8e561b4  Change MapOutputTrackerMasterActor to use RpcEndpoint (zsxwing, Dec 30, 2014)
a067228  Fix the code style (zsxwing, Dec 30, 2014)
b13fbd9  Change DAGScheduler to use RpcEndpoint (zsxwing, Dec 30, 2014)
0b17ccd  Merge branch 'master' into rpc (zsxwing, Dec 31, 2014)
595fb61  Merge branch 'master' into rpc (zsxwing, Jan 4, 2015)
811b6b8  Change BlockManager to use RpcEndpoint (zsxwing, Jan 4, 2015)
c1d3df8  Change WorkerWatcher to use RpcEndpoint (zsxwing, Jan 4, 2015)
20682d1  Change Master to use RpcEndpoint (zsxwing, Jan 4, 2015)
acb18fb  Merge branch 'master' into rpc (zsxwing, Jan 5, 2015)
7b43e39  Add RpcAddress and change AppClient to use RpcEndpoint (zsxwing, Jan 5, 2015)
6dff656  Change Worker to use RpcEndpoint (zsxwing, Jan 5, 2015)
3e90325  Some cleanup (zsxwing, Jan 5, 2015)
0c9106a  Merge branch 'master' into rpc (zsxwing, Jan 6, 2015)
9a348cf  Change YarnSchedulerBackend and LocalBackend to use RpcEndpoint (zsxwing, Jan 6, 2015)
e08d762  Tune the interface for network (zsxwing, Jan 6, 2015)
1e32c4f  Add NetworkRpcEndpoint for RpcEndpoint interested in network events (zsxwing, Jan 6, 2015)
9a9c1b1  Fix the code style (zsxwing, Jan 6, 2015)
a05cba5  Fix AppClient (zsxwing, Jan 6, 2015)
b80d8b1  Hide ActorSystem into AkkaRpcEnv (zsxwing, Jan 6, 2015)
afe3997  Make AkkaRpcEnv pluggable (zsxwing, Jan 6, 2015)
952d468  Add comments and minor interface changes (zsxwing, Jan 7, 2015)
d8687ba  Add a type parameter to `askTracker` (zsxwing, Jan 8, 2015)
d99296c  Merge branch 'master' into rpc (zsxwing, Jan 8, 2015)
6938093  Revert the network changes since they are not ready to review (zsxwing, Jan 8, 2015)
ef040bf  Fix ReceivedBlockHandlerSuite in streaming (zsxwing, Jan 9, 2015)
728a110  Merge branch 'master' into rpc (zsxwing, Jan 16, 2015)
c3359f0  Revert DAGScheduler (zsxwing, Jan 16, 2015)
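
Taken together, these commits replace direct use of Akka actors with a small pluggable RPC layer (RpcEnv, RpcEndpoint, RpcEndpointRef), with the Akka-backed implementation hidden in org.apache.spark.rpc.akka. As a mental model only, the following self-contained toy has the same shape as the API the diffs below use; it is synchronous and in-process, unlike the real AkkaRpcEnv, and none of it is the PR's actual code.

    import scala.collection.mutable

    // A reference to a registered endpoint: fire-and-forget send() plus a
    // blocking ask that states the expected reply type.
    trait RpcEndpointRef {
      def send(message: Any): Unit
      def askWithReply[T](message: Any): T
    }

    // An endpoint handles one message at a time and may reply via `sender`.
    trait RpcEndpoint {
      def receive(sender: RpcEndpointRef): PartialFunction[Any, Unit]
    }

    // Deliberately tiny, synchronous, in-process stand-in for an RpcEnv; a real
    // implementation would dispatch asynchronously and support remote refs.
    class ToyRpcEnv {
      // Registered endpoints by name; a real env would also support lookup.
      private val endpoints = mutable.Map.empty[String, RpcEndpoint]

      def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
        endpoints(name) = endpoint
        new LocalRef(endpoint)
      }

      // "Sender" used for one-way sends: any reply is simply dropped.
      private object NoReply extends RpcEndpointRef {
        def send(message: Any): Unit = ()
        def askWithReply[T](message: Any): T =
          throw new UnsupportedOperationException("no reply channel")
      }

      private class LocalRef(endpoint: RpcEndpoint) extends RpcEndpointRef {
        def send(message: Any): Unit = endpoint.receive(NoReply)(message)

        def askWithReply[T](message: Any): T = {
          var reply: Option[Any] = None
          // The "sender" handed to the endpoint captures whatever it sends back.
          val replyChannel = new RpcEndpointRef {
            def send(m: Any): Unit = reply = Some(m)
            def askWithReply[U](m: Any): U =
              throw new UnsupportedOperationException("nested ask not supported")
          }
          endpoint.receive(replyChannel)(message)
          reply.getOrElse(throw new IllegalStateException("endpoint did not reply"))
            .asInstanceOf[T]
        }
      }
    }

    // Usage: register an endpoint under a name and ask it, the same shape the
    // diffs below use for HeartbeatReceiver and MapOutputTrackerMasterActor.
    object ToyRpcDemo extends App {
      val env = new ToyRpcEnv
      val echo = env.setupEndpoint("echo", new RpcEndpoint {
        def receive(sender: RpcEndpointRef): PartialFunction[Any, Unit] = {
          case msg => sender.send(s"echo: $msg")
        }
      })
      println(echo.askWithReply[String]("ping"))  // prints "echo: ping"
    }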
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,11 +17,10 @@

package org.apache.spark

import akka.actor.Actor
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.rpc.{RpcEnv, RpcEndpointRef, RpcEndpoint}
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.util.ActorLogReceive
import org.apache.spark.storage.BlockManagerId

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -37,13 +36,13 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
/**
* Lives in the driver to receive heartbeats from executors..
*/
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
extends Actor with ActorLogReceive with Logging {
private[spark] class HeartbeatReceiver(override val rpcEnv: RpcEnv, scheduler: TaskScheduler)
extends RpcEndpoint {

override def receiveWithLogging = {
override def receive(sender: RpcEndpointRef) = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val response = HeartbeatResponse(
!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
sender ! response
sender.send(response)
}
}
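
A brief usage sketch for the rewritten endpoint, not part of the diff: callers hold the RpcEndpointRef returned by setupEndpoint and ask it with an expected reply type. The helper below is hypothetical; Heartbeat, HeartbeatResponse, and askWithReply are the names this PR uses.

    package org.apache.spark

    import org.apache.spark.rpc.RpcEndpointRef

    // Hypothetical helper, not part of the diff: send one Heartbeat through the
    // endpoint's ref and report whether the executor must re-register its
    // BlockManager (the messages above are private[spark], hence this package).
    private[spark] object HeartbeatClientSketch {
      def reportHeartbeat(receiver: RpcEndpointRef, heartbeat: Heartbeat): Boolean = {
        val response = receiver.askWithReply[HeartbeatResponse](heartbeat)
        response.reregisterBlockManager
      }
    }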
37 changes: 15 additions & 22 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,13 +21,10 @@ import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.concurrent.Await
import scala.collection.mutable.{HashSet, Map}
import scala.collection.JavaConversions._

import akka.actor._
import akka.pattern.ask

import org.apache.spark.rpc.{RpcEnv, RpcEndpointRef, RpcEndpoint}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
@@ -39,14 +36,14 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int)
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage

/** Actor class for MapOutputTrackerMaster */
private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {
private[spark] class MapOutputTrackerMasterActor(override val rpcEnv: RpcEnv,
tracker: MapOutputTrackerMaster, conf: SparkConf) extends RpcEndpoint with Logging {
val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

override def receiveWithLogging = {
override def receive(sender: RpcEndpointRef) = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = sender.path.address.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
logInfo(
"Asked to send map output locations for shuffle " + shuffleId + " to " + sender)
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.size
if (serializedSize > maxAkkaFrameSize) {
@@ -60,12 +57,12 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
logError(msg, exception)
throw exception
}
sender ! mapOutputStatuses
sender.send(mapOutputStatuses)

case StopMapOutputTracker =>
logInfo("MapOutputTrackerActor stopped!")
sender ! true
context.stop(self)
sender.send(true)
stop()
}
}

@@ -75,12 +72,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
* (driver and executor) use different HashMap to store its metadata.
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
private val timeout = AkkaUtils.askTimeout(conf)
private val retryAttempts = AkkaUtils.numRetries(conf)
private val retryIntervalMs = AkkaUtils.retryWaitMs(conf)

/** Set to the MapOutputTrackerActor living on the driver. */
var trackerActor: ActorRef = _
var trackerActor: RpcEndpointRef = _

/**
* This HashMap has different behavior for the driver and the executors.
@@ -108,9 +102,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* Send a message to the trackerActor and get its result within a default timeout, or
* throw a SparkException if this fails.
*/
protected def askTracker(message: Any): Any = {
protected def askTracker[T](message: Any): T = {
try {
AkkaUtils.askWithReply(message, trackerActor, retryAttempts, retryIntervalMs, timeout)
trackerActor.askWithReply(message)
} catch {
case e: Exception =>
logError("Error communicating with MapOutputTracker", e)
@@ -120,7 +114,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging

/** Send a one-way message to the trackerActor, to which we expect it to reply with true. */
protected def sendTracker(message: Any) {
val response = askTracker(message)
val response = askTracker[Boolean](message)
if (response != true) {
throw new SparkException(
"Error reply received from MapOutputTracker. Expecting true, got " + response.toString)
@@ -160,8 +154,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
logInfo("Doing the fetch; tracker actor = " + trackerActor)
// This try-finally prevents hangs due to timeouts:
try {
val fetchedBytes =
askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
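
Two points worth noting in this file: askTracker now takes the expected reply type, which confines the asInstanceOf cast to one place, and sendTracker keeps the old "endpoint replies true" contract. A hypothetical pair of helpers, written as if inside MapOutputTracker and using only the members shown above:

    // Hypothetical helpers, not part of the diff.
    private def fetchStatusBytes(shuffleId: Int): Array[Byte] =
      // The type parameter replaces the old asInstanceOf[Array[Byte]] at call sites.
      askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))

    private def shutdownTracker(): Unit = {
      sendTracker(StopMapOutputTracker)  // the endpoint replies `true`, then stops itself
      trackerActor = null
    }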
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary
import akka.actor.Props

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
@@ -323,8 +322,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Create and start the scheduler
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

private val heartbeatReceiver = env.rpcEnv.setupEndpoint("HeartbeatReceiver",
new HeartbeatReceiver(env.rpcEnv, taskScheduler))

@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)
@@ -413,9 +414,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
Some(Utils.getThreadDump())
} else {
val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
val endpointRef = env.rpcEnv.setupDriverEndpointRef("ExecutorActor")
Some(endpointRef.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump))
}
} catch {
case e: Exception =>
@@ -1214,7 +1214,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (dagSchedulerCopy != null) {
env.metricsSystem.report()
metadataCleaner.cancel()
env.actorSystem.stop(heartbeatReceiver)
env.rpcEnv.stop(heartbeatReceiver)
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
taskScheduler = null
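
SparkContext now owns the endpoint's lifecycle directly: it registers HeartbeatReceiver with rpcEnv.setupEndpoint at construction and removes it with rpcEnv.stop(ref) in stop(). A hypothetical helper (not from the PR) that captures the register/use/stop pattern using only the calls shown above:

    import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}

    // Hypothetical helper, not part of the diff: register an endpoint, run `body`
    // against its ref, and always unregister it afterwards.
    def withEndpoint[T](rpcEnv: RpcEnv, name: String, endpoint: RpcEndpoint)
        (body: RpcEndpointRef => T): T = {
      val ref = rpcEnv.setupEndpoint(name, endpoint)
      try body(ref) finally rpcEnv.stop(ref)
    }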
40 changes: 21 additions & 19 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Properties

import akka.actor._
import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
@@ -34,11 +33,13 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.rpc.akka.AkkaRpcEnv
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
@@ -53,7 +54,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
@DeveloperApi
class SparkEnv (
val executorId: String,
val actorSystem: ActorSystem,
val rpcEnv: RpcEnv,
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
@@ -69,6 +70,9 @@ class SparkEnv (
val shuffleMemoryManager: ShuffleMemoryManager,
val conf: SparkConf) extends Logging {

// TODO actorSystem is used by Streaming
val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

private[spark] var isStopped = false
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

@@ -86,7 +90,7 @@ class SparkEnv (
blockManager.stop()
blockManager.master.stop()
metricsSystem.stop()
actorSystem.shutdown()
rpcEnv.stopAll()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
@@ -212,16 +216,14 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)

// Create the ActorSystem for Akka and get the port it binds to.
val (actorSystem, boundPort) = {
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
}
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)

// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
if (isDriver) {
conf.set("spark.driver.port", boundPort.toString)
conf.set("spark.driver.port", rpcEnv.boundPort.toString)
} else {
conf.set("spark.executor.port", boundPort.toString)
conf.set("spark.executor.port", rpcEnv.boundPort.toString)
}

// Create an instance of the class with the given name, possibly initializing it with our conf
@@ -257,12 +259,12 @@ object SparkEnv extends Logging {
val closureSerializer = instantiateClassFromConf[Serializer](
"spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
def registerOrLookup(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
actorSystem.actorOf(Props(newActor), name = name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
AkkaUtils.makeDriverRef(name, conf, actorSystem)
rpcEnv.setupDriverEndpointRef(name)
}
}

@@ -274,9 +276,9 @@ object SparkEnv extends Logging {

// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
mapOutputTracker.trackerActor = registerOrLookup(
"MapOutputTracker",
new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
mapOutputTracker.trackerActor = registerOrLookup("MapOutputTracker",
new MapOutputTrackerMasterActor(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
@@ -298,10 +300,10 @@ object SparkEnv extends Logging {

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
new BlockManagerMasterActor(rpcEnv, isLocal, conf, listenerBus)), conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
numUsableCores)

@@ -348,7 +350,7 @@ object SparkEnv extends Logging {

new SparkEnv(
executorId,
actorSystem,
rpcEnv,
serializer,
closureSerializer,
cacheManager,
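
SparkEnv now stands up a single RpcEnv before anything else and reads the actually bound port back from it; registerOrLookup then decides, per named endpoint, whether to create it (driver) or only obtain a ref to the driver-side instance (executor). A hypothetical bootstrap sketch, not from the PR, using only the calls visible in this diff (RpcEnv.create and boundPort):

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.rpc.RpcEnv

    // Hypothetical bootstrap, not part of the diff. Passing port 0 requests an
    // ephemeral port; the port actually bound is read back via rpcEnv.boundPort,
    // as SparkEnv does above for spark.driver.port / spark.executor.port.
    def startRpcEnv(name: String, hostname: String, isDriver: Boolean,
        conf: SparkConf, securityManager: SecurityManager): RpcEnv = {
      val rpcEnv = RpcEnv.create(name, hostname, 0, conf, securityManager)
      val portKey = if (isDriver) "spark.driver.port" else "spark.executor.port"
      conf.set(portKey, rpcEnv.boundPort.toString)
      rpcEnv
    }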