Skip to content

Commit fe3df4c

Browse files
committed
Move registerEndpoint and use actorSystem.dispatcher in asyncSetupEndpointRefByURI
1 parent f6f3287 commit fe3df4c

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ private[spark] class AkkaRpcEnv private[akka] (
9999
lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
100100

101101
assert(endpointRef != null)
102-
registerEndpoint(endpoint, endpointRef)
103102

104103
override def preStart(): Unit = {
105104
// Listen for remote client network events
@@ -154,6 +153,7 @@ private[spark] class AkkaRpcEnv private[akka] (
154153

155154
}), name = name)
156155
endpointRef = new AkkaRpcEndpointRef(defaultAddress, actorRef, conf, initInConstructor = false)
156+
registerEndpoint(endpoint, endpointRef)
157157
// Now actorRef can be created safely
158158
endpointRef.init()
159159
endpointRef
@@ -219,7 +219,7 @@ private[spark] class AkkaRpcEnv private[akka] (
219219
}
220220

221221
override def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
222-
import scala.concurrent.ExecutionContext.Implicits.global
222+
import actorSystem.dispatcher
223223
actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout).
224224
map(new AkkaRpcEndpointRef(defaultAddress, _, conf))
225225
}

0 commit comments

Comments
 (0)