Skip to content

Conversation

@zsxwing
Copy link
Member

@zsxwing zsxwing commented Jan 9, 2015

This PR added a standard internal RPC interface for Spark and an Akka implementation based on @rxin's initial work. See the design document for more details. I replaced the most usages of Akka with the new RPC interface to prove it can cover all our requirements.

I sent this PR for discussing the new RPC interface. Please focus on

  1. core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
  2. core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

and ignore naming issues in other files (I avoid to change them for a clean diff)

Some problems need to discuss:

  1. We create ActorSystem without host and port. The Actor addresses in such ActorSystem don't contain host and port. The new RPC mechanism requires that a RpcEnv must bind to a host and port. Is it too strict?
  2. Because Sreaming uses ActorSystem in SparkEnv, the current PR cannot totally decouple Akka. So should Streaming create ActorSystme by itself, or use the new RPC interface?

TODO:

  1. Update the variable names from ***Actor to ***Endpoint
  2. Totally decouple Akka from Spark and make the Akka dependency optional.
  3. A test framework.

rxin and others added 30 commits December 18, 2014 15:58
Conflicts:
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala
Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@zsxwing
Copy link
Member Author

zsxwing commented Jan 9, 2015

cc @rxin @aarondav

@SparkQA
Copy link

SparkQA commented Jan 9, 2015

Test build #25314 has finished for PR 3974 at commit 6938093.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ClientActor(override val rpcEnv: RpcEnv) extends NetworkRpcEndpoint with Logging
    • class ExecutorActor(override val rpcEnv: RpcEnv, executorId: String) extends RpcEndpoint
    • trait RpcEnv
    • trait RpcEndpoint
    • trait NetworkRpcEndpoint extends RpcEndpoint
    • trait RpcEndpointRef
    • case class RpcAddress(host: String, port: Int)
    • case class RegisterExecutor(executorId: String, hostPort: String, cores: Int,
    • class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
    • class DriverActor(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    • class BlockManagerMasterActor(override val rpcEnv: RpcEnv, val isLocal: Boolean, conf: SparkConf,

@SparkQA
Copy link

SparkQA commented Jan 9, 2015

Test build #25317 has finished for PR 3974 at commit ef040bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ClientActor(override val rpcEnv: RpcEnv) extends NetworkRpcEndpoint with Logging
    • class ExecutorActor(override val rpcEnv: RpcEnv, executorId: String) extends RpcEndpoint
    • trait RpcEnv
    • trait RpcEndpoint
    • trait NetworkRpcEndpoint extends RpcEndpoint
    • trait RpcEndpointRef
    • case class RpcAddress(host: String, port: Int)
    • case class RegisterExecutor(executorId: String, hostPort: String, cores: Int,
    • class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
    • class DriverActor(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    • class BlockManagerMasterActor(override val rpcEnv: RpcEnv, val isLocal: Boolean, conf: SparkConf,

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
	core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
	core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
	core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@SparkQA
Copy link

SparkQA commented Jan 16, 2015

Test build #25653 has finished for PR 3974 at commit c3359f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ClientActor(override val rpcEnv: RpcEnv) extends NetworkRpcEndpoint with Logging
    • class ExecutorActor(override val rpcEnv: RpcEnv, executorId: String) extends RpcEndpoint
    • trait RpcEnv
    • trait RpcEndpoint
    • trait NetworkRpcEndpoint extends RpcEndpoint
    • trait RpcEndpointRef
    • case class RpcAddress(host: String, port: Int)
    • case class RegisterExecutor(executorId: String, hostPort: String, cores: Int,
    • class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
    • class DriverActor(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    • class BlockManagerMasterActor(override val rpcEnv: RpcEnv, val isLocal: Boolean, conf: SparkConf,

@zsxwing zsxwing closed this Feb 13, 2015
@zsxwing
Copy link
Member Author

zsxwing commented Feb 13, 2015

See #4588 instead

@zsxwing zsxwing deleted the rpc branch December 23, 2015 23:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants