Skip to content

Commit 3007c09

Browse files
committed
Move setupDriverEndpointRef to RpcUtils and rename to makeDriverRef
1 parent c425022 commit 3007c09

File tree

4 files changed

+38
-12
lines changed

4 files changed

+38
-12
lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
4141
import org.apache.spark.serializer.Serializer
4242
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
4343
import org.apache.spark.storage._
44-
import org.apache.spark.util.{AkkaUtils, Utils}
44+
import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
4545

4646
/**
4747
* :: DeveloperApi ::
@@ -300,7 +300,7 @@ object SparkEnv extends Logging {
300300
logInfo("Registering " + name)
301301
rpcEnv.setupEndpoint(name, endpointCreator)
302302
} else {
303-
rpcEnv.setupDriverEndpointRef(name)
303+
RpcUtils.makeDriverRef(name, conf, rpcEnv)
304304
}
305305
}
306306

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,6 @@ private[spark] trait RpcEnv {
5858
*/
5959
def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
6060

61-
/**
62-
* Retrieve a [[RpcEndpointRef]] which is located in the driver via its name.
63-
*/
64-
def setupDriverEndpointRef(name: String): RpcEndpointRef
65-
6661
/**
6762
* Retrieve the [[RpcEndpointRef]] represented by `url`.
6863
*/

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,8 @@ private[spark] class AkkaRpcEnv private (
210210
address.port.getOrElse(defaultAddress.port))
211211
}
212212

213-
override def setupDriverEndpointRef(name: String): RpcEndpointRef = {
214-
new AkkaRpcEndpointRef(defaultAddress, AkkaUtils.makeDriverRef(name, conf, actorSystem), conf)
215-
}
216-
217213
override def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
218-
val timeout = Duration.create(conf.getLong("spark.akka.lookupTimeout", 30), "seconds")
214+
val timeout = AkkaUtils.lookupTimeout(conf)
219215
val ref = Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
220216
// TODO defaultAddress is wrong
221217
new AkkaRpcEndpointRef(defaultAddress, ref, conf)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util
19+
20+
import org.apache.spark.{SparkEnv, SparkConf}
21+
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
22+
23+
object RpcUtils {
24+
25+
/**
26+
* Retrieve a [[RpcEndpointRef]] which is located in the driver via its name.
27+
*/
28+
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
29+
val driverActorSystemName = SparkEnv.driverActorSystemName
30+
val driverHost: String = conf.get("spark.driver.host", "localhost")
31+
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
32+
Utils.checkHost(driverHost, "Expected hostname")
33+
rpcEnv.setupEndpointRef(driverActorSystemName, RpcAddress(driverHost, driverPort), name)
34+
}
35+
}

0 commit comments

Comments
 (0)