Skip to content

Commit 155b987

Browse files
committed
Change newURI to uriOf and add some comments
1 parent 45b2317 commit 155b987

File tree

4 files changed

+15
-4
lines changed

4 files changed

+15
-4
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
3737
}
3838

3939
// Used to avoid shutting down JVM during tests
40+
// In the normal case, exitNonZero will call `System.exit(-1)` to shutdown the JVM. In the unit
41+
// test, the user should call `setTesting(true)` so that `exitNonZero` will set `isShutDown` to
42+
// true rather than calling `System.exit`. The user can check `isShutDown` to know if
43+
// `exitNonZero` is called.
4044
private[deploy] var isShutDown = false
4145
private[deploy] def setTesting(testing: Boolean) = isTesting = testing
4246
private var isTesting = false

core/src/main/scala/org/apache/spark/rpc/ActionScheduler.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,10 @@ private[spark] trait ActionScheduler {
6666
}
6767

6868
private[spark] trait Cancellable {
69-
// Should be reentrant
69+
/**
70+
* Cancel the corresponding work. The caller may call this method multiple times and call it in
71+
* any thread.
72+
*/
7073
def cancel(): Unit
7174
}
7275

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private[spark] trait RpcEnv {
8484
/**
8585
* Create a URI used to create a [[RpcEndpointRef]]
8686
*/
87-
def newURI(systemName: String, address: RpcAddress, endpointName: String): String
87+
def uriOf(systemName: String, address: RpcAddress, endpointName: String): String
8888
}
8989

9090
private[spark] case class RpcEnvConfig(

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ private[spark] class AkkaRpcEnv private (
172172
}
173173
}
174174

175+
/**
176+
* Run `action` safely to avoid to crash the thread. If any non-fatal exception happens, it will
177+
* call `endpoint.onError`. If `endpoint.onError` throws any non-fatal exception, just log it.
178+
*/
175179
private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
176180
try {
177181
action
@@ -204,10 +208,10 @@ private[spark] class AkkaRpcEnv private (
204208

205209
override def setupEndpointRef(
206210
systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = {
207-
setupEndpointRefByUrl(newURI(systemName, address, endpointName))
211+
setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
208212
}
209213

210-
override def newURI(systemName: String, address: RpcAddress, endpointName: String): String = {
214+
override def uriOf(systemName: String, address: RpcAddress, endpointName: String): String = {
211215
AkkaUtils.address(
212216
AkkaUtils.protocol(actorSystem), systemName, address.host, address.port, endpointName)
213217
}

0 commit comments

Comments
 (0)