Skip to content

Commit f459380

Browse files
committed
Add RpcAddress.fromURI and rename urls to uris
1 parent b221398 commit f459380

File tree

3 files changed

+20
-16
lines changed

3 files changed

+20
-16
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
3131
override def onStart() {
3232
logInfo(s"Connecting to worker $workerUrl")
3333
if (!isTesting) {
34-
rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
34+
rpcEnv.asyncSetupEndpointRefByURI(workerUrl)
3535
}
3636
}
3737

@@ -45,10 +45,8 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
4545
private var isTesting = false
4646

4747
// Lets us filter events only from the worker's actor system
48-
private val expectedHostPort = new java.net.URI(workerUrl)
49-
private def isWorker(address: RpcAddress) = {
50-
expectedHostPort.getHost == address.host && expectedHostPort.getPort == address.port
51-
}
48+
private val expectedAddress = RpcAddress.fromURIString(workerUrl)
49+
private def isWorker(address: RpcAddress) = expectedAddress == address
5250

5351
private def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
5452

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,15 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
7070
def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
7171

7272
/**
73-
* Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
73+
* Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously.
7474
*/
75-
def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
75+
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
7676

7777
/**
78-
* Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action.
78+
* Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
7979
*/
80-
def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
81-
Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
80+
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
81+
Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout)
8282
}
8383

8484
/**
@@ -87,7 +87,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
8787
*/
8888
def asyncSetupEndpointRef(
8989
systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = {
90-
asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
90+
asyncSetupEndpointRefByURI(uriOf(systemName, address, endpointName))
9191
}
9292

9393
/**
@@ -96,7 +96,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
9696
*/
9797
def setupEndpointRef(
9898
systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = {
99-
setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
99+
setupEndpointRefByURI(uriOf(systemName, address, endpointName))
100100
}
101101

102102
/**
@@ -381,12 +381,18 @@ private[spark] case class RpcAddress(host: String, port: Int) {
381381

382382
private[spark] object RpcAddress {
383383

384+
/**
385+
* Return the [[RpcAddress]] represented by `uri`.
386+
*/
387+
def fromURI(uri: URI): RpcAddress = {
388+
RpcAddress(uri.getHost, uri.getPort)
389+
}
390+
384391
/**
385392
* Return the [[RpcAddress]] represented by `uri`.
386393
*/
387394
def fromURIString(uri: String): RpcAddress = {
388-
val u = new java.net.URI(uri)
389-
RpcAddress(u.getHost, u.getPort)
395+
fromURI(new java.net.URI(uri))
390396
}
391397

392398
def fromSparkURL(sparkUrl: String): RpcAddress = {

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,9 @@ private[spark] class AkkaRpcEnv private[akka] (
220220
address.port.getOrElse(defaultAddress.port))
221221
}
222222

223-
override def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] = {
223+
override def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
224224
import scala.concurrent.ExecutionContext.Implicits.global
225-
actorSystem.actorSelection(url).resolveOne(defaultLookupTimeout).
225+
actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout).
226226
map(new AkkaRpcEndpointRef(defaultAddress, _, conf))
227227
}
228228

0 commit comments

Comments
 (0)