@@ -52,55 +52,82 @@ import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
5252 * implementation of this client can use that information to retry using the version specified
5353 * by the server.
5454 */
55- private [deploy] class StandaloneRestClient extends Logging {
55+ private [deploy] class StandaloneRestClient ( master : String ) extends Logging {
5656 import StandaloneRestClient ._
5757
58+ val masters : Array [String ] = master.stripPrefix(" spark://" ).split(" ," ).map(" spark://" + _)
59+
5860 /**
5961 * Submit an application specified by the parameters in the provided request.
6062 *
6163 * If the submission was successful, poll the status of the submission and report
6264 * it to the user. Otherwise, report the error message provided by the server.
6365 */
6466 private [rest] def createSubmission (
65- master : String ,
6667 request : CreateSubmissionRequest ): SubmitRestProtocolResponse = {
6768 logInfo(s " Submitting a request to launch an application in $master. " )
68- validateMaster(master)
69- val url = getSubmitUrl(master)
70- val response = postJson(url, request.toJson)
71- response match {
72- case s : CreateSubmissionResponse =>
73- reportSubmissionStatus(master, s)
74- handleRestResponse(s)
75- case unexpected =>
76- handleUnexpectedRestResponse(unexpected)
69+ var suc : Boolean = false
70+ var response : SubmitRestProtocolResponse = null
71+ for (m <- masters if ! suc) {
72+ validateMaster(m)
73+ val url = getSubmitUrl(m)
74+ response = postJson(url, request.toJson)
75+ response match {
76+ case s : CreateSubmissionResponse =>
77+ if (s.success) {
78+ reportSubmissionStatus(s)
79+ handleRestResponse(s)
80+ suc = true
81+ }
82+ case unexpected =>
83+ handleUnexpectedRestResponse(unexpected)
84+ }
7785 }
7886 response
7987 }
8088
8189 /** Request that the server kill the specified submission. */
82- def killSubmission (master : String , submissionId : String ): SubmitRestProtocolResponse = {
90+ def killSubmission (submissionId : String ): SubmitRestProtocolResponse = {
8391 logInfo(s " Submitting a request to kill submission $submissionId in $master. " )
84- validateMaster(master)
85- val response = post(getKillUrl(master, submissionId))
86- response match {
87- case k : KillSubmissionResponse => handleRestResponse(k)
88- case unexpected => handleUnexpectedRestResponse(unexpected)
92+ var suc : Boolean = false
93+ var response : SubmitRestProtocolResponse = null
94+ for (m <- masters if ! suc) {
95+ validateMaster(m)
96+ response = post(getKillUrl(m, submissionId))
97+ response match {
98+ case k : KillSubmissionResponse =>
99+ if (! k.message.contains(" Can only" )) {
100+ handleRestResponse(k)
101+ suc = true
102+ }
103+ case unexpected =>
104+ handleUnexpectedRestResponse(unexpected)
105+ }
89106 }
90107 response
91108 }
92109
93110 /** Request the status of a submission from the server. */
94111 def requestSubmissionStatus (
95- master : String ,
96112 submissionId : String ,
97113 quiet : Boolean = false ): SubmitRestProtocolResponse = {
98114 logInfo(s " Submitting a request for the status of submission $submissionId in $master. " )
99- validateMaster(master)
100- val response = get(getStatusUrl(master, submissionId))
101- response match {
102- case s : SubmissionStatusResponse => if (! quiet) { handleRestResponse(s) }
103- case unexpected => handleUnexpectedRestResponse(unexpected)
115+ var suc : Boolean = false
116+ var response : SubmitRestProtocolResponse = null
117+ for (m <- masters) {
118+ validateMaster(m)
119+ response = get(getStatusUrl(m, submissionId))
120+ response match {
121+ case s : SubmissionStatusResponse =>
122+ if (! s.message.contains(" Can only" )) {
123+ if (! quiet) {
124+ handleRestResponse(s)
125+ }
126+ suc = true
127+ }
128+ case unexpected =>
129+ handleUnexpectedRestResponse(unexpected)
130+ }
104131 }
105132 response
106133 }
@@ -228,30 +255,24 @@ private[deploy] class StandaloneRestClient extends Logging {
228255
229256 /** Report the status of a newly created submission. */
230257 private def reportSubmissionStatus (
231- master : String ,
232258 submitResponse : CreateSubmissionResponse ): Unit = {
233- if (submitResponse.success) {
234- val submissionId = submitResponse.submissionId
235- if (submissionId != null ) {
236- logInfo(s " Submission successfully created as $submissionId. Polling submission state... " )
237- pollSubmissionStatus(master, submissionId)
238- } else {
239- // should never happen
240- logError(" Application successfully submitted, but submission ID was not provided!" )
241- }
259+ val submissionId = submitResponse.submissionId
260+ if (submissionId != null ) {
261+ logInfo(s " Submission successfully created as $submissionId. Polling submission state... " )
262+ pollSubmissionStatus(submissionId)
242263 } else {
243- val failMessage = Option (submitResponse.message).map { " : " + _ }.getOrElse( " " )
244- logError(" Application submission failed " + failMessage )
264+ // should never happen
265+ logError(" Application successfully submitted, but submission ID was not provided! " )
245266 }
246267 }
247268
248269 /**
249270 * Poll the status of the specified submission and log it.
250271 * This retries up to a fixed number of times before giving up.
251272 */
252- private def pollSubmissionStatus (master : String , submissionId : String ): Unit = {
273+ private def pollSubmissionStatus (submissionId : String ): Unit = {
253274 (1 to REPORT_DRIVER_STATUS_MAX_TRIES ).foreach { _ =>
254- val response = requestSubmissionStatus(master, submissionId, quiet = true )
275+ val response = requestSubmissionStatus(submissionId, quiet = true )
255276 val statusResponse = response match {
256277 case s : SubmissionStatusResponse => s
257278 case _ => return // unexpected type, let upstream caller handle it
@@ -311,10 +332,10 @@ private[rest] object StandaloneRestClient {
311332 }
312333 val sparkProperties = conf.getAll.toMap
313334 val environmentVariables = env.filter { case (k, _) => k.startsWith(" SPARK_" ) }
314- val client = new StandaloneRestClient
335+ val client = new StandaloneRestClient (master)
315336 val submitRequest = client.constructSubmitRequest(
316337 appResource, mainClass, appArgs, sparkProperties, environmentVariables)
317- client.createSubmission(master, submitRequest)
338+ client.createSubmission(submitRequest)
318339 }
319340
320341 def main (args : Array [String ]): Unit = {
0 commit comments