
Commit 486607a

jinxing authored and Yun Ni committed
[SPARK-19450] Replace askWithRetry with askSync.
## What changes were proposed in this pull request?

`askSync` was added to `RpcEndpointRef` in SPARK-19347 (see apache#16690 (comment)), and `askWithRetry` is marked as deprecated. As mentioned in SPARK-18113 (apache#16503 (comment)):

> askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

Since `askWithRetry` is only used inside Spark and not in user logic, it makes sense to replace all of its call sites with `askSync`.

## How was this patch tested?

This PR doesn't change code logic; the existing unit tests cover it.

Author: jinxing <[email protected]>

Closes apache#16790 from jinxing64/SPARK-19450.
1 parent afdebf6 commit 486607a
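For reference, a minimal sketch of the relevant `RpcEndpointRef` surface as of this change (paraphrased from the Spark 2.x sources; the bodies, the sketch class name, and the exact deprecation string are illustrative, not authoritative):

```scala
import scala.concurrent.Future
import scala.reflect.ClassTag

import org.apache.spark.rpc.RpcTimeout

// Paraphrased sketch of org.apache.spark.rpc.RpcEndpointRef (Spark 2.x era).
abstract class RpcEndpointRefSketch {
  // Fire-and-forget one-way message.
  def send(message: Any): Unit

  // Asynchronous request/response.
  def ask[T: ClassTag](message: Any): Future[T]

  // Blocking request/response: sends the message once and waits for the
  // result within the default ask timeout. This is the replacement target.
  def askSync[T: ClassTag](message: Any): T

  // Overload with an explicit timeout (used by the Executor heartbeat below).
  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T

  // Deprecated Akka-era variant that retried on failure; retrying forces
  // messages to be idempotent, and its blocking wait can deadlock when
  // invoked from within an endpoint's own message loop.
  @deprecated("use 'askSync' instead", "2.2.0")
  def askWithRetry[T: ClassTag](message: Any): T
}
```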

File tree

24 files changed (+58, −119 lines)

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    */
   protected def askTracker[T: ClassTag](message: Any): T = {
     try {
-      trackerEndpoint.askWithRetry[T](message)
+      trackerEndpoint.askSync[T](message)
     } catch {
       case e: Exception =>
         logError("Error communicating with MapOutputTracker", e)

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -605,7 +605,7 @@ class SparkContext(config: SparkConf) extends Logging {
         Some(Utils.getThreadDump())
       } else {
         val endpointRef = env.blockManager.master.getExecutorEndpointRef(executorId).get
-        Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
+        Some(endpointRef.askSync[Array[ThreadStackTrace]](TriggerThreadDump))
       }
     } catch {
       case e: Exception =>

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ private class ClientEndpoint(
       Thread.sleep(5000)
       logInfo("... polling master for driver state")
       val statusResponse =
-        activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
+        activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
       if (statusResponse.found) {
         logInfo(s"State of $driverId is ${statusResponse.state.get}")
         // Worker node, if present

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
@@ -1045,7 +1045,7 @@ private[deploy] object Master extends Logging {
     val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
     val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
       new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
-    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
+    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
     (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
   }
 }

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   /** Executor details for a particular application */
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
-    val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
+    val state = master.askSync[MasterStateResponse](RequestMasterState)
     val app = state.activeApps.find(_.id == appId)
       .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
   private val master = parent.masterEndpointRef
 
   def getMasterState: MasterStateResponse = {
-    master.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.askSync[MasterStateResponse](RequestMasterState)
   }
 
   override def renderJson(request: HttpServletRequest): JValue = {

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 3 additions & 3 deletions
@@ -71,7 +71,7 @@ private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef,
   extends KillRequestServlet {
 
   protected def handleKill(submissionId: String): KillSubmissionResponse = {
-    val response = masterEndpoint.askWithRetry[DeployMessages.KillDriverResponse](
+    val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse](
       DeployMessages.RequestKillDriver(submissionId))
     val k = new KillSubmissionResponse
     k.serverSparkVersion = sparkVersion
@@ -89,7 +89,7 @@ private[rest] class StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRe
   extends StatusRequestServlet {
 
   protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
-    val response = masterEndpoint.askWithRetry[DeployMessages.DriverStatusResponse](
+    val response = masterEndpoint.askSync[DeployMessages.DriverStatusResponse](
       DeployMessages.RequestDriverStatus(submissionId))
     val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
     val d = new SubmissionStatusResponse
@@ -174,7 +174,7 @@ private[rest] class StandaloneSubmitRequestServlet(
     requestMessage match {
       case submitRequest: CreateSubmissionRequest =>
        val driverDescription = buildDriverDescription(submitRequest)
-        val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
+        val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
          DeployMessages.RequestSubmitDriver(driverDescription))
         val submitResponse = new CreateSubmissionResponse
         submitResponse.serverSparkVersion = sparkVersion

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala

Lines changed: 2 additions & 2 deletions
@@ -34,12 +34,12 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
   private val workerEndpoint = parent.worker.self
 
   override def renderJson(request: HttpServletRequest): JValue = {
-    val workerState = workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
+    val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
     JsonProtocol.writeWorkerState(workerState)
   }
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    val workerState = workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
+    val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
 
     val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
     val runningExecutors = workerState.executors

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -199,7 +199,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         new SecurityManager(executorConf),
         clientMode = true)
       val driver = fetcher.setupEndpointRefByURI(driverUrl)
-      val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
+      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
       val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
       fetcher.shutdown()
 
core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
@@ -677,7 +677,7 @@ private[spark] class Executor(
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
     try {
-      val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
+      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
           message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
