Skip to content

Commit ffc1280

Browse files
committed
Rename 'fail' to 'sendFailure' and other minor code style changes
1 parent 28e6d0f commit ffc1280

File tree

4 files changed

+10
-10
lines changed

4 files changed

+10
-10
lines changed

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ private[spark] trait RpcEndpoint {
192192
* Process messages from [[RpcEndpointRef.sendWithReply]] or [[RpcCallContext.replyWithSender)]]
193193
*/
194194
def receiveAndReply(response: RpcCallContext): PartialFunction[Any, Unit] = {
195-
case _ => response.fail(new SparkException(self + " won't reply anything"))
195+
case _ => response.sendFailure(new SparkException(self + " won't reply anything"))
196196
}
197197

198198
/**
@@ -374,5 +374,5 @@ private[spark] trait RpcCallContext {
374374
/**
375375
* Report a failure to the sender.
376376
*/
377-
def fail(e: Throwable): Unit
377+
def sendFailure(e: Throwable): Unit
378378
}

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ package org.apache.spark.rpc.akka
2020
import java.net.URI
2121
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
2222

23-
import scala.concurrent.Await
23+
import scala.concurrent.{Await, Future}
2424
import scala.concurrent.duration._
25-
import scala.concurrent.Future
2625
import scala.language.postfixOps
2726
import scala.reflect.ClassTag
2827
import scala.util.control.NonFatal
@@ -139,7 +138,7 @@ private[spark] class AkkaRpcEnv private (
139138
val pf =
140139
if (reply) {
141140
endpoint.receiveAndReply(new RpcCallContext {
142-
override def fail(e: Throwable): Unit = {
141+
override def sendFailure(e: Throwable): Unit = {
143142
s ! AkkaFailure(e)
144143
}
145144

@@ -329,9 +328,10 @@ private[akka] class AkkaRpcEndpointRef(
329328
override def sendWithReply[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
330329
import scala.concurrent.ExecutionContext.Implicits.global
331330
actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
332-
case AkkaMessage(message, reply) =>
331+
case msg @ AkkaMessage(message, reply) =>
333332
if (reply) {
334-
Future.failed(new SparkException("The sender cannot reply"))
333+
logError(s"Receive $msg but the sender cannot reply")
334+
Future.failed(new SparkException(s"Receive $msg but the sender cannot reply"))
335335
} else {
336336
Future.successful(message)
337337
}

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
469469
override val rpcEnv = env
470470

471471
override def receiveAndReply(response: RpcCallContext) = {
472-
case m => response.fail(new SparkException("Oops"))
472+
case m => response.sendFailure(new SparkException("Oops"))
473473
}
474474
})
475475

@@ -487,7 +487,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
487487
override val rpcEnv = env
488488

489489
override def receiveAndReply(response: RpcCallContext) = {
490-
case msg: String => response.fail(new SparkException("Oops"))
490+
case msg: String => response.sendFailure(new SparkException("Oops"))
491491
}
492492
})
493493

core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
3939
AkkaRpcEnv(RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf)))
4040
try {
4141
val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_endpoint")
42-
assert(s"akka.tcp://local@localhost:12345/user/test_endpoint" ===
42+
assert("akka.tcp://local@localhost:12345/user/test_endpoint" ===
4343
newRef.asInstanceOf[AkkaRpcEndpointRef].actorRef.path.toString)
4444
} finally {
4545
newRpcEnv.shutdown()

0 commit comments

Comments
 (0)