Skip to content

Commit 5d23958

Browse files
Refactor some duplicated code, style, and comments
1 parent 7a881b3 commit 5d23958

File tree

6 files changed

+96
-72
lines changed

6 files changed

+96
-72
lines changed

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.deploy
1919

20+
import scala.collection.mutable.HashSet
2021
import scala.concurrent._
2122

2223
import akka.actor._
@@ -28,19 +29,21 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
2829
import org.apache.spark.deploy.DeployMessages._
2930
import org.apache.spark.deploy.master.{DriverState, Master}
3031
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
31-
import scala.collection.mutable.HashSet
3232

3333
/**
3434
* Proxy that relays messages to the driver.
35+
*
36+
* Retry is not currently supported if a submission fails. In HA mode, the client submits the
37+
* request to all masters and sees which one can handle it.
3538
*/
3639
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
3740
extends Actor with ActorLogReceive with Logging {
3841

39-
val mastersActor = driverArgs.masters.map { m =>
42+
private val masterActors = driverArgs.masters.map { m =>
4043
context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
4144
}
42-
val lostMasters = new HashSet[Address]
43-
var activeMasterActor: ActorSelection = null
45+
private val lostMasters = new HashSet[Address]
46+
private var activeMasterActor: ActorSelection = null
4447

4548
val timeout = AkkaUtils.askTimeout(conf)
4649

@@ -80,15 +83,17 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
8083
driverArgs.supervise,
8184
command)
8285

83-
for (masterActor <- mastersActor) {
84-
masterActor ! RequestSubmitDriver(driverDescription)
85-
}
86+
// This assumes only one Master is active at a time
87+
for (masterActor <- masterActors) {
88+
masterActor ! RequestSubmitDriver(driverDescription)
89+
}
8690

8791
case "kill" =>
8892
val driverId = driverArgs.driverId
89-
for (masterActor <- mastersActor) {
90-
masterActor ! RequestKillDriver(driverId)
91-
}
93+
// This assumes only one Master is active at a time
94+
for (masterActor <- masterActors) {
95+
masterActor ! RequestKillDriver(driverId)
96+
}
9297
}
9398
}
9499

@@ -121,7 +126,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
121126
System.exit(-1)
122127
}
123128
System.exit(0)
124-
}
129+
}
125130
}
126131

127132
override def receiveWithLogging: PartialFunction[Any, Unit] = {
@@ -131,7 +136,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
131136
if (success) {
132137
activeMasterActor = context.actorSelection(sender.path)
133138
pollAndReportStatus(driverId.get)
134-
} else if (!message.contains("Can only")) {
139+
} else if (!message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) {
135140
System.exit(-1)
136141
}
137142

@@ -141,16 +146,16 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
141146
if (success) {
142147
activeMasterActor = context.actorSelection(sender.path)
143148
pollAndReportStatus(driverId)
144-
} else if (!message.contains("Can only")) {
149+
} else if (!message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) {
145150
System.exit(-1)
146151
}
147152

148153
case DisassociatedEvent(_, remoteAddress, _) =>
149154
if (!lostMasters.contains(remoteAddress)) {
150155
println(s"Error connecting to master $remoteAddress.")
151156
lostMasters += remoteAddress
152-
if (lostMasters.size >= mastersActor.size) {
153-
println(s"No master is available, exiting.")
157+
if (lostMasters.size >= masterActors.size) {
158+
println("No master is available, exiting.")
154159
System.exit(-1)
155160
}
156161
}
@@ -160,8 +165,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
160165
println(s"Error connecting to master ($remoteAddress).")
161166
println(s"Cause was: $cause")
162167
lostMasters += remoteAddress
163-
if (lostMasters.size >= mastersActor.size) {
164-
println(s"No master is available, exiting.")
168+
if (lostMasters.size >= masterActors.size) {
169+
println("No master is available, exiting.")
165170
System.exit(-1)
166171
}
167172
}

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ import java.net.{URI, URISyntaxException}
2222
import scala.collection.mutable.ListBuffer
2323

2424
import org.apache.log4j.Level
25-
26-
import org.apache.spark.util.{IntParam, MemoryParam}
25+
import org.apache.spark.util.{IntParam, MemoryParam, Utils}
2726

2827
/**
2928
* Command-line parser for the driver client.
@@ -80,13 +79,13 @@ private[deploy] class ClientArguments(args: Array[String]) {
8079
}
8180

8281
jarUrl = _jarUrl
83-
masters = _master.stripPrefix("spark://").split(",").map("spark://" + _)
82+
masters = Utils.splitMasterAdress(_master)
8483
mainClass = _mainClass
8584
_driverOptions ++= tail
8685

8786
case "kill" :: _master :: _driverId :: tail =>
8887
cmd = "kill"
89-
masters = _master.stripPrefix("spark://").split(",").map("spark://" + _)
88+
masters = Utils.splitMasterAdress(_master)
9089
driverId = _driverId
9190

9291
case _ =>

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ private[master] class Master(
254254

255255
case RequestSubmitDriver(description) => {
256256
if (state != RecoveryState.ALIVE) {
257-
val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
257+
val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. " +
258+
"Can only accept driver submissions in ALIVE state."
258259
sender ! SubmitDriverResponse(false, None, msg)
259260
} else {
260261
logInfo("Driver submitted " + description.command.mainClass)
@@ -274,7 +275,7 @@ private[master] class Master(
274275

275276
case RequestKillDriver(driverId) => {
276277
if (state != RecoveryState.ALIVE) {
277-
val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
278+
val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. Can only kill drivers in ALIVE state."
278279
sender ! KillDriverResponse(driverId, success = false, msg)
279280
} else {
280281
logInfo("Asked to kill driver " + driverId)
@@ -306,7 +307,8 @@ private[master] class Master(
306307

307308
case RequestDriverStatus(driverId) => {
308309
if (state != RecoveryState.ALIVE) {
309-
val msg = s"Can only request driver status in ALIVE state. Current state: $state."
310+
val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. " +
311+
"Can only request driver status in ALIVE state."
310312
sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg)))
311313
} else {
312314
(drivers ++ completedDrivers).find(_.id == driverId) match {

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ import java.io.{DataOutputStream, FileNotFoundException}
2121
import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
2222
import javax.servlet.http.HttpServletResponse
2323

24+
import scala.collection.mutable
2425
import scala.io.Source
2526

2627
import com.fasterxml.jackson.core.JsonProcessingException
2728
import com.google.common.base.Charsets
2829

2930
import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
30-
import scala.collection.mutable
31+
import org.apache.spark.util.Utils
3132

3233
/**
3334
* A client that submits applications to the standalone Master using a REST protocol.
@@ -52,11 +53,14 @@ import scala.collection.mutable
5253
* is a mismatch, the server will respond with the highest protocol version it supports. A future
5354
* implementation of this client can use that information to retry using the version specified
5455
* by the server.
56+
*
57+
* Retry is not currently supported if a submission fails. In HA mode, the client submits the
58+
* request to all masters and sees which one can handle it.
5559
*/
5660
private[deploy] class StandaloneRestClient(master: String) extends Logging {
5761
import StandaloneRestClient._
5862

59-
val masters: Array[String] = master.stripPrefix("spark://").split(",").map("spark://" + _)
63+
private val masters: Array[String] = Utils.splitMasterAdress(master)
6064

6165
private val lostMasters = new mutable.HashSet[String]
6266

@@ -87,10 +91,15 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
8791
handleUnexpectedRestResponse(unexpected)
8892
}
8993
} catch {
90-
case e @ (_: SubmitRestConnectionException | _: ConnectException) =>
91-
if(handleSubmitRestConnectionException(m)) {
94+
case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) =>
95+
if(handleConnectionException(m)) {
9296
throw new SubmitRestConnectionException(
93-
"No master is available for createSubmission.", new Throwable(""))
97+
s"Unable to connect to server", unreachable)
98+
}
99+
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
100+
if(handleConnectionException(m)) {
101+
throw new SubmitRestProtocolException(
102+
"Malformed response received from server", malformed)
94103
}
95104
}
96105
}
@@ -109,18 +118,23 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
109118
response = post(url)
110119
response match {
111120
case k: KillSubmissionResponse =>
112-
if (!k.message.contains("Can only")) {
121+
if (!k.message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) {
113122
handleRestResponse(k)
114123
suc = true
115124
}
116125
case unexpected =>
117126
handleUnexpectedRestResponse(unexpected)
118127
}
119128
} catch {
120-
case e @ (_: SubmitRestConnectionException | _: ConnectException) =>
121-
if(handleSubmitRestConnectionException(m)) {
129+
case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) =>
130+
if(handleConnectionException(m)) {
122131
throw new SubmitRestConnectionException(
123-
"No master is available for killSubmission.", new Throwable(""))
132+
s"Unable to connect to server", unreachable)
133+
}
134+
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
135+
if(handleConnectionException(m)) {
136+
throw new SubmitRestProtocolException(
137+
"Malformed response received from server", malformed)
124138
}
125139
}
126140
}
@@ -149,10 +163,15 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
149163
handleUnexpectedRestResponse(unexpected)
150164
}
151165
} catch {
152-
case e @ (_: SubmitRestConnectionException | _: ConnectException) =>
153-
if(handleSubmitRestConnectionException(m)) {
166+
case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) =>
167+
if(handleConnectionException(m)) {
154168
throw new SubmitRestConnectionException(
155-
"No master is available for requestSubmissionStatus.", new Throwable(""))
169+
s"Unable to connect to server", unreachable)
170+
}
171+
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
172+
if(handleConnectionException(m)) {
173+
throw new SubmitRestProtocolException(
174+
"Malformed response received from server", malformed)
156175
}
157176
}
158177
}
@@ -213,39 +232,30 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
213232
* Exposed for testing.
214233
*/
215234
private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
216-
try {
217-
val dataStream =
218-
if (connection.getResponseCode == HttpServletResponse.SC_OK) {
219-
connection.getInputStream
220-
} else {
221-
connection.getErrorStream
222-
}
223-
// If the server threw an exception while writing a response, it will not have a body
224-
if (dataStream == null) {
225-
throw new SubmitRestProtocolException("Server returned empty body")
226-
}
227-
val responseJson = Source.fromInputStream(dataStream).mkString
228-
logDebug(s"Response from the server:\n$responseJson")
229-
val response = SubmitRestProtocolMessage.fromJson(responseJson)
230-
response.validate()
231-
response match {
232-
// If the response is an error, log the message
233-
case error: ErrorResponse =>
234-
logError(s"Server responded with error:\n${error.message}")
235-
error
236-
// Otherwise, simply return the response
237-
case response: SubmitRestProtocolResponse => response
238-
case unexpected =>
239-
throw new SubmitRestProtocolException(
240-
s"Message received from server was not a response:\n${unexpected.toJson}")
235+
val dataStream =
236+
if (connection.getResponseCode == HttpServletResponse.SC_OK) {
237+
connection.getInputStream
238+
} else {
239+
connection.getErrorStream
241240
}
242-
} catch {
243-
case unreachable @ (_: FileNotFoundException | _: SocketException) =>
244-
throw new SubmitRestConnectionException(
245-
s"Unable to connect to server ${connection.getURL}", unreachable)
246-
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
241+
// If the server threw an exception while writing a response, it will not have a body
242+
if (dataStream == null) {
243+
throw new SubmitRestProtocolException("Server returned empty body")
244+
}
245+
val responseJson = Source.fromInputStream(dataStream).mkString
246+
logDebug(s"Response from the server:\n$responseJson")
247+
val response = SubmitRestProtocolMessage.fromJson(responseJson)
248+
response.validate()
249+
response match {
250+
// If the response is an error, log the message
251+
case error: ErrorResponse =>
252+
logError(s"Server responded with error:\n${error.message}")
253+
error
254+
// Otherwise, simply return the response
255+
case response: SubmitRestProtocolResponse => response
256+
case unexpected =>
247257
throw new SubmitRestProtocolException(
248-
"Malformed response received from server", malformed)
258+
s"Message received from server was not a response:\n${unexpected.toJson}")
249259
}
250260
}
251261

@@ -338,10 +348,13 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
338348
logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
339349
}
340350

341-
private def handleSubmitRestConnectionException(url: String): Boolean = {
342-
if (!lostMasters.contains(url)) {
343-
logWarning(s"Unable to connect to server ${url}.")
344-
lostMasters += url
351+
/**
352+
* When a connection exception is caught, record the master as lost and check whether all masters are lost.
353+
*/
354+
private def handleConnectionException(masterUrl: String): Boolean = {
355+
if (!lostMasters.contains(masterUrl)) {
356+
logWarning(s"Unable to connect to server ${masterUrl}.")
357+
lostMasters += masterUrl
345358
}
346359
lostMasters.size >= masters.size
347360
}

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
105105
if (masters != null) { // Two positional arguments were given
106106
printUsageAndExit(1)
107107
}
108-
masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
108+
masters = Utils.splitMasterAdress(value)
109109
parse(tail)
110110

111111
case Nil =>

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2058,6 +2058,11 @@ private[spark] object Utils extends Logging {
20582058
.getOrElse(UserGroupInformation.getCurrentUser().getUserName())
20592059
}
20602060

2061+
def splitMasterAdress(masterAddr: String): Array[String] = {
2062+
masterAddr.stripPrefix("spark://").split(",").map("spark://" + _)
2063+
}
2064+
2065+
val MASTER_NOT_ALIVE_STRING = "Current state is not alive: "
20612066
}
20622067

20632068
/**

0 commit comments

Comments
 (0)