File tree Expand file tree Collapse file tree 3 files changed +0
-53
lines changed
main/scala/org/apache/spark/rpc
test/scala/org/apache/spark/rpc Expand file tree Collapse file tree 3 files changed +0
-53
lines changed Original file line number Diff line number Diff line change @@ -326,15 +326,6 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
326326 */
327327 def send (message : Any ): Unit
328328
329- /**
330- * Send a message to the corresponding [[RpcEndpoint.receiveAndReply ]] asynchronously.
331- * Fire-and-forget semantics.
332- *
333- * The receiver will reply to sender's [[RpcEndpoint.receive ]] or [[RpcEndpoint.receiveAndReply ]]
334- * depending on which one of [[RpcCallContext.reply ]]s is called.
335- */
336- def sendWithReply (message : Any , sender : RpcEndpointRef ): Unit
337-
338329 /**
339330 * Send a message to the corresponding [[RpcEndpoint.receiveAndReply) ]] and return a `Future` to
340331 * receive the reply within a default timeout.
Original file line number Diff line number Diff line change @@ -291,17 +291,6 @@ private[akka] class AkkaRpcEndpointRef(
291291 actorRef ! AkkaMessage (message, false )
292292 }
293293
294- override def sendWithReply (message : Any , sender : RpcEndpointRef ): Unit = {
295- implicit val actorSender : ActorRef =
296- if (sender == null ) {
297- Actor .noSender
298- } else {
299- require(sender.isInstanceOf [AkkaRpcEndpointRef ])
300- sender.asInstanceOf [AkkaRpcEndpointRef ].actorRef
301- }
302- actorRef ! AkkaMessage (message, true )
303- }
304-
305294 override def sendWithReply [T : ClassTag ](message : Any , timeout : FiniteDuration ): Future [T ] = {
306295 import scala .concurrent .ExecutionContext .Implicits .global
307296 actorRef.ask(AkkaMessage (message, true ))(timeout).flatMap {
Original file line number Diff line number Diff line change @@ -171,39 +171,6 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
171171 }
172172 }
173173
174- test(" ping pong" ) {
175- val pongRef = env.setupEndpoint(" pong" , new RpcEndpoint {
176- override val rpcEnv = env
177-
178- override def receiveAndReply (context : RpcCallContext ) = {
179- case Ping (id) => context.sender.sendWithReply(Pong (id), self)
180- }
181- })
182-
183- val pingRef = env.setupEndpoint(" ping" , new RpcEndpoint {
184- override val rpcEnv = env
185-
186- var requester : RpcCallContext = _
187-
188- override def receiveAndReply (context : RpcCallContext ) = {
189- case Start => {
190- requester = context
191- pongRef.sendWithReply(Ping (1 ), self)
192- }
193- case p @ Pong (id) => {
194- if (id < 10 ) {
195- context.sender.sendWithReply(Ping (id + 1 ), self)
196- } else {
197- requester.reply(p)
198- }
199- }
200- }
201- })
202-
203- val reply = pingRef.askWithReply[Pong ](Start )
204- assert(Pong (10 ) === reply)
205- }
206-
207174 test(" onStart and onStop" ) {
208175 val stopLatch = new CountDownLatch (1 )
209176 val calledMethods = mutable.ArrayBuffer [String ]()
You can’t perform that action at this time.
0 commit comments