Skip to content

Commit 8bd1097

Browse files
committed
Fix docs and the code style
1 parent f459380 commit 8bd1097

File tree

2 files changed

+18
-14
lines changed

2 files changed

+18
-14
lines changed

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
4141
/**
4242
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
4343
* [[RpcEndpoint.self]].
44+
*
45+
* Note: This method won't return null. `IllegalArgumentException` will be thrown if calling this
46+
* on a non-existent endpoint.
4447
*/
4548
private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
4649

@@ -203,14 +206,16 @@ private[spark] trait RpcEndpoint {
203206
}
204207

205208
/**
206-
* Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]
209+
* Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
210+
* unmatched message, [[SparkException]] will be thrown and sent to `onError`.
207211
*/
208212
def receive: PartialFunction[Any, Unit] = {
209213
case _ => throw new SparkException(self + " does not implement 'receive'")
210214
}
211215

212216
/**
213-
* Process messages from [[RpcEndpointRef.sendWithReply]]
217+
* Process messages from [[RpcEndpointRef.sendWithReply]]. If receiving a unmatched message,
218+
* [[SparkException]] will be thrown and sent to `onError`.
214219
*/
215220
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
216221
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
@@ -314,7 +319,8 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
314319
/**
315320
* Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
316321
* timeout, or throw a SparkException if this fails even after the default number of retries.
317-
* Because this method retries, the message handling in the receiver side should be idempotent.
322+
* The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
323+
* method retries, the message handling in the receiver side should be idempotent.
318324
*
319325
* Note: this is a blocking action which may cost a lot of time, so don't call it in an message
320326
* loop of [[RpcEndpoint]].
@@ -328,8 +334,8 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
328334
/**
329335
* Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
330336
* specified timeout, throw a SparkException if this fails even after the specified number of
331-
* retries. Because this method retries, the message handling in the receiver side should be
332-
* idempotent.
337+
* retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
338+
* retries, the message handling in the receiver side should be idempotent.
333339
*
334340
* Note: this is a blocking action which may cost a lot of time, so don't call it in an message
335341
* loop of [[RpcEndpoint]].

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private[spark] class AkkaRpcEnv private[akka] (
9898
// So `actorRef` should be created after assigning `endpointRef`.
9999
lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
100100

101-
require(endpointRef != null)
101+
assert(endpointRef != null)
102102
registerEndpoint(endpoint, endpointRef)
103103

104104
override def preStart(): Unit = {
@@ -135,10 +135,8 @@ private[spark] class AkkaRpcEnv private[akka] (
135135
}
136136

137137
case AkkaFailure(e) =>
138-
try {
139-
endpoint.onError(e)
140-
} catch {
141-
case NonFatal(e) => logError(s"Ignore error: ${e.getMessage}", e)
138+
safelyCall(endpoint) {
139+
throw e
142140
}
143141

144142
case message: Any => {
@@ -164,7 +162,7 @@ private[spark] class AkkaRpcEnv private[akka] (
164162
private def processMessage(endpoint: RpcEndpoint, m: AkkaMessage, _sender: ActorRef): Unit = {
165163
val message = m.message
166164
val needReply = m.needReply
167-
val pf =
165+
val pf: PartialFunction[Any, Unit] =
168166
if (needReply) {
169167
endpoint.receiveAndReply(new RpcCallContext {
170168
override def sendFailure(e: Throwable): Unit = {
@@ -183,9 +181,9 @@ private[spark] class AkkaRpcEnv private[akka] (
183181
endpoint.receive
184182
}
185183
try {
186-
if (pf.isDefinedAt(message)) {
187-
pf.apply(message)
188-
}
184+
pf.applyOrElse[Any, Unit](message, { message =>
185+
throw new SparkException(s"Unmatched message $message from ${_sender}")
186+
})
189187
} catch {
190188
case NonFatal(e) =>
191189
if (needReply) {

0 commit comments

Comments
 (0)