Skip to content

Commit 51e6667

Browse files
committed
Add 'sender' to RpcCallContext and rename the parameter of receiveAndReply to 'context'
1 parent ffc1280 commit 51e6667

File tree

4 files changed

+34
-42
lines changed

4 files changed

+34
-42
lines changed

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,8 @@ private[spark] trait RpcEndpoint {
191191
/**
192192
* Process messages from [[RpcEndpointRef.sendWithReply]] or [[RpcCallContext.replyWithSender)]]
193193
*/
194-
def receiveAndReply(response: RpcCallContext): PartialFunction[Any, Unit] = {
195-
case _ => response.sendFailure(new SparkException(self + " won't reply anything"))
194+
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
195+
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
196196
}
197197

198198
/**
@@ -360,19 +360,12 @@ private[spark] trait RpcCallContext {
360360
def reply(response: Any): Unit
361361

362362
/**
363-
* Reply a message to the corresponding [[RpcEndpoint.receiveAndReply]]. If you use this one to
364-
* reply, it means you expect the target [[RpcEndpoint]] should reply you something.
365-
*
366-
* TODO better method name?
367-
*
368-
* @param response the response message
369-
* @param sender who replies this message. The target [[RpcEndpoint]] will use `sender` to send
370-
* back something.
363+
* Report a failure to the sender.
371364
*/
372-
def replyWithSender(response: Any, sender: RpcEndpointRef): Unit
365+
def sendFailure(e: Throwable): Unit
373366

374367
/**
375-
* Report a failure to the sender.
368+
* The sender of this message.
376369
*/
377-
def sendFailure(e: Throwable): Unit
370+
def sender: RpcEndpointRef
378371
}

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,9 @@ private[spark] class AkkaRpcEnv private (
146146
s ! AkkaMessage(response, false)
147147
}
148148

149-
override def replyWithSender(response: Any, sender: RpcEndpointRef): Unit = {
150-
s.!(AkkaMessage(response, true))(
151-
sender.asInstanceOf[AkkaRpcEndpointRef].actorRef)
152-
}
149+
// Some RpcEndpoints need to know the sender's address
150+
override val sender: RpcEndpointRef =
151+
new AkkaRpcEndpointRef(defaultAddress, s, conf)
153152
})
154153
} else {
155154
endpoint.receive

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,13 @@ private[spark] object OutputCommitCoordinator {
156156
override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
157157
extends RpcEndpoint with Logging {
158158

159-
override def receiveAndReply(response: RpcCallContext) = {
159+
override def receiveAndReply(context: RpcCallContext) = {
160160
case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
161-
response.reply(
161+
context.reply(
162162
outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
163163
case StopCoordinator =>
164164
logInfo("OutputCommitCoordinator stopped!")
165-
response.reply(true)
165+
context.reply(true)
166166
stop()
167167
}
168168
}

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
9393
val endpoint = new RpcEndpoint {
9494
override val rpcEnv = env
9595

96-
override def receiveAndReply(response: RpcCallContext) = {
97-
case "Hello" => response.reply(self)
98-
case "Echo" => response.reply("Echo")
96+
override def receiveAndReply(context: RpcCallContext) = {
97+
case "Hello" => context.reply(self)
98+
case "Echo" => context.reply("Echo")
9999
}
100100
}
101101
val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
@@ -109,9 +109,9 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
109109
val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
110110
override val rpcEnv = env
111111

112-
override def receiveAndReply(response: RpcCallContext) = {
112+
override def receiveAndReply(context: RpcCallContext) = {
113113
case msg: String => {
114-
response.reply(msg)
114+
context.reply(msg)
115115
}
116116
}
117117
})
@@ -123,9 +123,9 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
123123
env.setupEndpoint("ask-remotely", new RpcEndpoint {
124124
override val rpcEnv = env
125125

126-
override def receiveAndReply(response: RpcCallContext) = {
126+
override def receiveAndReply(context: RpcCallContext) = {
127127
case msg: String => {
128-
response.reply(msg)
128+
context.reply(msg)
129129
}
130130
}
131131
})
@@ -146,10 +146,10 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
146146
env.setupEndpoint("ask-timeout", new RpcEndpoint {
147147
override val rpcEnv = env
148148

149-
override def receiveAndReply(response: RpcCallContext) = {
149+
override def receiveAndReply(context: RpcCallContext) = {
150150
case msg: String => {
151151
Thread.sleep(100)
152-
response.reply(msg)
152+
context.reply(msg)
153153
}
154154
}
155155
})
@@ -175,8 +175,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
175175
val pongRef = env.setupEndpoint("pong", new RpcEndpoint {
176176
override val rpcEnv = env
177177

178-
override def receiveAndReply(response: RpcCallContext) = {
179-
case Ping(id) => response.replyWithSender(Pong(id), self)
178+
override def receiveAndReply(context: RpcCallContext) = {
179+
case Ping(id) => context.sender.sendWithReply(Pong(id), self)
180180
}
181181
})
182182

@@ -185,14 +185,14 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
185185

186186
var requester: RpcCallContext = _
187187

188-
override def receiveAndReply(response: RpcCallContext) = {
188+
override def receiveAndReply(context: RpcCallContext) = {
189189
case Start => {
190-
requester = response
190+
requester = context
191191
pongRef.sendWithReply(Ping(1), self)
192192
}
193193
case p @ Pong(id) => {
194194
if (id < 10) {
195-
response.replyWithSender(Ping(id + 1), self)
195+
context.sender.sendWithReply(Ping(id + 1), self)
196196
} else {
197197
requester.reply(p)
198198
}
@@ -430,8 +430,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
430430
val endpointRef = env.setupEndpoint("sendWithReply", new RpcEndpoint {
431431
override val rpcEnv = env
432432

433-
override def receiveAndReply(response: RpcCallContext) = {
434-
case m => response.reply("ack")
433+
override def receiveAndReply(context: RpcCallContext) = {
434+
case m => context.reply("ack")
435435
}
436436
})
437437

@@ -446,8 +446,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
446446
env.setupEndpoint("sendWithReply-remotely", new RpcEndpoint {
447447
override val rpcEnv = env
448448

449-
override def receiveAndReply(response: RpcCallContext) = {
450-
case m => response.reply("ack")
449+
override def receiveAndReply(context: RpcCallContext) = {
450+
case m => context.reply("ack")
451451
}
452452
})
453453

@@ -468,8 +468,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
468468
val endpointRef = env.setupEndpoint("sendWithReply-error", new RpcEndpoint {
469469
override val rpcEnv = env
470470

471-
override def receiveAndReply(response: RpcCallContext) = {
472-
case m => response.sendFailure(new SparkException("Oops"))
471+
override def receiveAndReply(context: RpcCallContext) = {
472+
case m => context.sendFailure(new SparkException("Oops"))
473473
}
474474
})
475475

@@ -486,8 +486,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
486486
env.setupEndpoint("sendWithReply-remotely-error", new RpcEndpoint {
487487
override val rpcEnv = env
488488

489-
override def receiveAndReply(response: RpcCallContext) = {
490-
case msg: String => response.sendFailure(new SparkException("Oops"))
489+
override def receiveAndReply(context: RpcCallContext) = {
490+
case msg: String => context.sendFailure(new SparkException("Oops"))
491491
}
492492
})
493493

0 commit comments

Comments
 (0)