Skip to content

Commit 3751c97

Browse files
committed
Rename RpcResponse to RpcCallContext
1 parent fe7d1ff commit 3751c97

File tree

4 files changed

+19
-19
lines changed

4 files changed

+19
-19
lines changed

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,17 +182,17 @@ private[spark] trait RpcEndpoint {
182182
}
183183

184184
/**
185-
* Process messages from [[RpcEndpointRef.send]] or [[RpcResponse.reply)]]
185+
* Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]
186186
*/
187187
def receive: PartialFunction[Any, Unit] = {
188188
case _ =>
189189
// network events will be passed here by default, so do nothing by default to avoid noise.
190190
}
191191

192192
/**
193-
* Process messages from [[RpcEndpointRef.sendWithReply]] or [[RpcResponse.replyWithSender)]]
193+
* Process messages from [[RpcEndpointRef.sendWithReply]] or [[RpcCallContext.replyWithSender)]]
194194
*/
195-
def receiveAndReply(response: RpcResponse): PartialFunction[Any, Unit] = {
195+
def receiveAndReply(response: RpcCallContext): PartialFunction[Any, Unit] = {
196196
case _ => response.fail(new SparkException(self + " won't reply anything"))
197197
}
198198

@@ -281,7 +281,7 @@ private[spark] trait RpcEndpointRef {
281281
* Fire-and-forget semantics.
282282
*
283283
* The receiver will reply to sender's [[RpcEndpoint.receive]] or [[RpcEndpoint.receiveAndReply]]
284-
* depending on which one of [[RpcResponse.reply]]s is called.
284+
* depending on which one of [[RpcCallContext.reply]]s is called.
285285
*/
286286
def sendWithReply(message: Any, sender: RpcEndpointRef): Unit
287287

@@ -351,7 +351,7 @@ private[spark] case class NetworkErrorEvent(address: RpcAddress, cause: Throwabl
351351
/**
352352
* A callback that [[RpcEndpoint]] can use it to send back a message or failure.
353353
*/
354-
private[spark] trait RpcResponse {
354+
private[spark] trait RpcCallContext {
355355

356356
/**
357357
* Reply a message to the sender. If the sender is [[RpcEndpoint]], its [[RpcEndpoint.receive]]

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ private[spark] class AkkaRpcEnv private (
151151
val s = sender()
152152
val pf =
153153
if (reply) {
154-
endpoint.receiveAndReply(new RpcResponse {
154+
endpoint.receiveAndReply(new RpcCallContext {
155155
override def fail(e: Throwable): Unit = {
156156
s ! AkkaFailure(e)
157157
}

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
2020
import scala.collection.mutable
2121

2222
import org.apache.spark._
23-
import org.apache.spark.rpc.{RpcResponse, RpcEndpointRef, RpcEnv, RpcEndpoint}
23+
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, RpcEndpoint}
2424

2525
private sealed trait OutputCommitCoordinationMessage extends Serializable
2626

@@ -156,7 +156,7 @@ private[spark] object OutputCommitCoordinator {
156156
override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
157157
extends RpcEndpoint with Logging {
158158

159-
override def receiveAndReply(response: RpcResponse) = {
159+
override def receiveAndReply(response: RpcCallContext) = {
160160
case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
161161
response.reply(
162162
outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
9393
val endpoint = new RpcEndpoint {
9494
override val rpcEnv = env
9595

96-
override def receiveAndReply(response: RpcResponse) = {
96+
override def receiveAndReply(response: RpcCallContext) = {
9797
case "Hello" => response.reply(self)
9898
case "Echo" => response.reply("Echo")
9999
}
@@ -109,7 +109,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
109109
val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
110110
override val rpcEnv = env
111111

112-
override def receiveAndReply(response: RpcResponse) = {
112+
override def receiveAndReply(response: RpcCallContext) = {
113113
case msg: String => {
114114
response.reply(msg)
115115
}
@@ -123,7 +123,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
123123
env.setupEndpoint("ask-remotely", new RpcEndpoint {
124124
override val rpcEnv = env
125125

126-
override def receiveAndReply(response: RpcResponse) = {
126+
override def receiveAndReply(response: RpcCallContext) = {
127127
case msg: String => {
128128
response.reply(msg)
129129
}
@@ -146,7 +146,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
146146
env.setupEndpoint("ask-timeout", new RpcEndpoint {
147147
override val rpcEnv = env
148148

149-
override def receiveAndReply(response: RpcResponse) = {
149+
override def receiveAndReply(response: RpcCallContext) = {
150150
case msg: String => {
151151
Thread.sleep(100)
152152
response.reply(msg)
@@ -175,17 +175,17 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
175175
val pongRef = env.setupEndpoint("pong", new RpcEndpoint {
176176
override val rpcEnv = env
177177

178-
override def receiveAndReply(response: RpcResponse) = {
178+
override def receiveAndReply(response: RpcCallContext) = {
179179
case Ping(id) => response.replyWithSender(Pong(id), self)
180180
}
181181
})
182182

183183
val pingRef = env.setupEndpoint("ping", new RpcEndpoint {
184184
override val rpcEnv = env
185185

186-
var requester: RpcResponse = _
186+
var requester: RpcCallContext = _
187187

188-
override def receiveAndReply(response: RpcResponse) = {
188+
override def receiveAndReply(response: RpcCallContext) = {
189189
case Start => {
190190
requester = response
191191
pongRef.sendWithReply(Ping(1), self)
@@ -430,7 +430,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
430430
val endpointRef = env.setupEndpoint("sendWithReply", new RpcEndpoint {
431431
override val rpcEnv = env
432432

433-
override def receiveAndReply(response: RpcResponse) = {
433+
override def receiveAndReply(response: RpcCallContext) = {
434434
case m => response.reply("ack")
435435
}
436436
})
@@ -446,7 +446,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
446446
env.setupEndpoint("sendWithReply-remotely", new RpcEndpoint {
447447
override val rpcEnv = env
448448

449-
override def receiveAndReply(response: RpcResponse) = {
449+
override def receiveAndReply(response: RpcCallContext) = {
450450
case m => response.reply("ack")
451451
}
452452
})
@@ -468,7 +468,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
468468
val endpointRef = env.setupEndpoint("sendWithReply-error", new RpcEndpoint {
469469
override val rpcEnv = env
470470

471-
override def receiveAndReply(response: RpcResponse) = {
471+
override def receiveAndReply(response: RpcCallContext) = {
472472
case m => response.fail(new SparkException("Oops"))
473473
}
474474
})
@@ -486,7 +486,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
486486
env.setupEndpoint("sendWithReply-remotely-error", new RpcEndpoint {
487487
override val rpcEnv = env
488488

489-
override def receiveAndReply(response: RpcResponse) = {
489+
override def receiveAndReply(response: RpcCallContext) = {
490490
case msg: String => response.fail(new SparkException("Oops"))
491491
}
492492
})

0 commit comments

Comments
 (0)