Skip to content

Commit 385b9c3

Browse files
committed
Fix the code style and add docs
1 parent 2cc3f78 commit 385b9c3

File tree

4 files changed

+17
-9
lines changed

4 files changed

+17
-9
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
5353
def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
5454

5555
override def receive = {
56-
case e => logWarning(s"Received unexpected actor system event: $e")
56+
case e => logWarning(s"Received unexpected message: $e")
5757
}
5858

5959
override def onConnected(remoteAddress: RpcAddress): Unit = {

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
293293
/**
294294
* Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
295295
* timeout, or throw a SparkException if this fails even after the default number of retries.
296+
* Because this method retries, the message handling in the receiver side should be idempotent.
296297
*
297298
* Note: this is a blocking action which may cost a lot of time, so don't call it in an message
298299
* loop of [[RpcEndpoint]].
@@ -306,7 +307,8 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
306307
/**
307308
* Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
308309
* specified timeout, throw a SparkException if this fails even after the specified number of
309-
* retries.
310+
* retries. Because this method retries, the message handling in the receiver side should be
311+
* idempotent.
310312
*
311313
* Note: this is a blocking action which may cost a lot of time, so don't call it in an message
312314
* loop of [[RpcEndpoint]].
@@ -350,12 +352,16 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
350352
/**
351353
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a `Future` to
352354
* receive the reply within a default timeout.
355+
*
356+
* This method only sends the message once and never retries.
353357
*/
354358
def sendWithReply[T: ClassTag](message: Any): Future[T] = sendWithReply(message, defaultTimeout)
355359

356360
/**
357361
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a `Future` to
358362
* receive the reply within the specified timeout.
363+
*
364+
* This method only sends the message once and never retries.
359365
*/
360366
def sendWithReply[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
361367

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,18 @@ private[spark] class AkkaRpcEnv private[akka] (
133133
safelyCall(endpoint) {
134134
processMessage(endpoint, m, sender)
135135
}
136+
136137
case AkkaFailure(e) =>
137138
try {
138139
endpoint.onError(e)
139140
} catch {
140141
case NonFatal(e) => logError(s"Ignore error: ${e.getMessage}", e)
141142
}
143+
142144
case message: Any => {
143145
logWarning(s"Unknown message: $message")
144146
}
147+
145148
}
146149

147150
override def postStop(): Unit = {
@@ -160,9 +163,9 @@ private[spark] class AkkaRpcEnv private[akka] (
160163

161164
private def processMessage(endpoint: RpcEndpoint, m: AkkaMessage, _sender: ActorRef): Unit = {
162165
val message = m.message
163-
val reply = m.reply
166+
val needReply = m.needReply
164167
val pf =
165-
if (reply) {
168+
if (needReply) {
166169
endpoint.receiveAndReply(new RpcCallContext {
167170
override def sendFailure(e: Throwable): Unit = {
168171
_sender ! AkkaFailure(e)
@@ -185,7 +188,7 @@ private[spark] class AkkaRpcEnv private[akka] (
185188
}
186189
} catch {
187190
case NonFatal(e) =>
188-
if (reply) {
191+
if (needReply) {
189192
// If the sender asks a reply, we should send the error back to the sender
190193
_sender ! AkkaFailure(e)
191194
} else {
@@ -241,7 +244,7 @@ private[spark] class AkkaRpcEnv private[akka] (
241244
actorSystem.awaitTermination()
242245
}
243246

244-
override def toString = s"${getClass.getSimpleName}($actorSystem)"
247+
override def toString: String = s"${getClass.getSimpleName}($actorSystem)"
245248
}
246249

247250
private[spark] class AkkaRpcEnvFactory extends RpcEnvFactory {
@@ -308,9 +311,9 @@ private[akka] class AkkaRpcEndpointRef(
308311
/**
309312
* A wrapper to `message` so that the receiver knows if the sender expects a reply.
310313
* @param message
311-
* @param reply if the sender expects a reply message
314+
* @param needReply if the sender expects a reply message
312315
*/
313-
private[akka] case class AkkaMessage(message: Any, reply: Boolean)
316+
private[akka] case class AkkaMessage(message: Any, needReply: Boolean)
314317

315318
/**
316319
* A reply with the failure error from the receiver to the sender

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
192192
}
193193
}
194194
val rpcEndpointRef = env.setupEndpoint("start-stop-test", endpoint)
195-
rpcEndpointRef.send("message")
196195
env.stop(rpcEndpointRef)
197196
stopLatch.await(10, TimeUnit.SECONDS)
198197
assert(List("start", "stop") === calledMethods)

0 commit comments

Comments
 (0)