Skip to content

Commit 5f87700

Browse files
committed
Move the logical of processing message to a private function
1 parent 3e56123 commit 5f87700

File tree

1 file changed

+39
-34
lines changed

1 file changed

+39
-34
lines changed

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -127,41 +127,10 @@ private[spark] class AkkaRpcEnv private (
127127
case e: AssociationEvent =>
128128
// TODO ignore?
129129

130-
case AkkaMessage(message: Any, reply: Boolean)=>
131-
logDebug("Received RPC message: " + AkkaMessage(message, reply))
130+
case m: AkkaMessage =>
131+
logDebug(s"Received RPC message: $m")
132132
safelyCall(endpoint) {
133-
val s = sender()
134-
val pf =
135-
if (reply) {
136-
endpoint.receiveAndReply(new RpcCallContext {
137-
override def sendFailure(e: Throwable): Unit = {
138-
s ! AkkaFailure(e)
139-
}
140-
141-
override def reply(response: Any): Unit = {
142-
s ! AkkaMessage(response, false)
143-
}
144-
145-
// Some RpcEndpoints need to know the sender's address
146-
override val sender: RpcEndpointRef =
147-
new AkkaRpcEndpointRef(defaultAddress, s, conf)
148-
})
149-
} else {
150-
endpoint.receive
151-
}
152-
try {
153-
if (pf.isDefinedAt(message)) {
154-
pf.apply(message)
155-
}
156-
} catch {
157-
case NonFatal(e) =>
158-
if (reply) {
159-
// If the sender asks a reply, we should send the error back to the sender
160-
s ! AkkaFailure(e)
161-
} else {
162-
throw e
163-
}
164-
}
133+
processMessage(endpoint, m, sender)
165134
}
166135
case message: Any => {
167136
logWarning(s"Unknown message: $message")
@@ -182,6 +151,42 @@ private[spark] class AkkaRpcEnv private (
182151
endpointRef
183152
}
184153

154+
private def processMessage(endpoint: RpcEndpoint, m: AkkaMessage, _sender: ActorRef): Unit = {
155+
val message = m.message
156+
val reply = m.reply
157+
val pf =
158+
if (reply) {
159+
endpoint.receiveAndReply(new RpcCallContext {
160+
override def sendFailure(e: Throwable): Unit = {
161+
_sender ! AkkaFailure(e)
162+
}
163+
164+
override def reply(response: Any): Unit = {
165+
_sender ! AkkaMessage(response, false)
166+
}
167+
168+
// Some RpcEndpoints need to know the sender's address
169+
override val sender: RpcEndpointRef =
170+
new AkkaRpcEndpointRef(defaultAddress, _sender, conf)
171+
})
172+
} else {
173+
endpoint.receive
174+
}
175+
try {
176+
if (pf.isDefinedAt(message)) {
177+
pf.apply(message)
178+
}
179+
} catch {
180+
case NonFatal(e) =>
181+
if (reply) {
182+
// If the sender asks a reply, we should send the error back to the sender
183+
_sender ! AkkaFailure(e)
184+
} else {
185+
throw e
186+
}
187+
}
188+
}
189+
185190
/**
186191
* Run `action` safely to avoid to crash the thread. If any non-fatal exception happens, it will
187192
* call `endpoint.onError`. If `endpoint.onError` throws any non-fatal exception, just log it.

0 commit comments

Comments
 (0)