@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -136,13 +137,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         runExecutorLauncher(securityMgr)
       }
     } catch {
-      case e: Throwable => {
+      case e: Exception =>
         // catch everything else if not specifically handled
         logError("Uncaught exception: ", e)
         finish(FinalApplicationStatus.FAILED,
           ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
           "Uncaught exception: " + e.getMessage())
-      }
     }
     exitCode
   }
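
A note on the hunk above: the catch-all is narrowed from `Throwable` to `Exception`. The snippet below is a standalone sketch, not part of the patch, and the `CatchScopeDemo` name is purely illustrative; it shows the behavioural difference: JVM `Error`s such as `OutOfMemoryError` are not subclasses of `Exception`, so they now propagate instead of being swallowed.

```scala
// Hypothetical, minimal sketch; not Spark code.
object CatchScopeDemo {
  def main(args: Array[String]): Unit = {
    try {
      throw new IllegalStateException("boom") // an Exception: handled below
    } catch {
      case e: Exception =>
        // A Throwable handler would also trap Errors (e.g. OutOfMemoryError);
        // catching only Exception lets those fatal conditions propagate.
        println("caught: " + e.getMessage)
    }
  }
}
```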
@@ -213,6 +213,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
+    setupSystemSecurityManager()
     userClassThread = startUserClass()
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has
@@ -258,7 +259,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     val t = new Thread {
       override def run() {
         var failureCount = 0
-        while (!finished && !Thread.currentThread().isInterrupted()) {
+        while (!finished) {
           try {
             if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
               finish(FinalApplicationStatus.FAILED,
@@ -328,7 +329,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     sparkContextRef.synchronized {
       var count = 0
       val waitTime = 10000L
-      val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+      val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
       while (sparkContextRef.get() == null && count < numTries && !finished) {
         logInfo("Waiting for spark context initialization ... " + count)
         count = count + 1
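
For context on the bounded wait shown in this hunk: the loop gives up after roughly `numTries * waitTime` (10 x 10 s = 100 s with the defaults visible above). The sketch below is a standalone, simplified illustration of that pattern, not Spark code; `BoundedWaitSketch`, the shortened timings, and the stand-in for the `waitTries` setting are all assumptions for the demo.

```scala
// Standalone sketch of the bounded-wait pattern; not Spark code.
// A worker thread publishes a value into an AtomicReference and notifies;
// the waiter polls until the value appears, a finished flag flips,
// or the retry budget (numTries x waitTime) runs out.
import java.util.concurrent.atomic.AtomicReference

object BoundedWaitSketch {
  def main(args: Array[String]): Unit = {
    val ref = new AtomicReference[String](null)
    var finished = false

    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(1500)                        // simulate slow initialization
        ref.synchronized { ref.set("ready"); ref.notifyAll() }
      }
    }).start()

    val waitTime = 1000L  // shortened from 10000L for the demo
    val numTries = 10     // mirrors the default of the waitTries setting
    var count = 0
    ref.synchronized {
      while (ref.get() == null && count < numTries && !finished) {
        println("Waiting for initialization ... " + count)
        count += 1
        ref.wait(waitTime)                        // wakes early on notifyAll()
      }
    }
    println(if (ref.get() != null) "initialized" else "gave up")
  }
}
```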
@@ -355,7 +356,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     // spark driver should already be up since it launched us, but we don't want to
     // wait forever, so wait 100 seconds max to match the cluster mode setting.
     // Leave this config unpublished for now.
-    val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.client.waitTries", 1000)
+    val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000)
 
     while (!driverUp && !finished && count < numTries) {
       try {
@@ -373,7 +374,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     }
 
     if (!driverUp) {
-      throw new Exception("Failed to connect to driver!")
+      throw new SparkException("Failed to connect to driver!")
     }
 
     sparkConf.set("spark.driver.host", driverHost)
@@ -403,55 +404,59 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       actor ! AddWebUIFilter(amFilter, params, proxyBase)
     }
   }
+
+  /**
+   * This system security manager applies to the entire process.
+   * It's main purpose is to handle the case if the user code does a System.exit.
+   * This allows us to catch that and properly set the YARN application status and
+   * cleanup if needed.
+   */
+  private def setupSystemSecurityManager() = {
+    try {
+      var stopped = false
+      System.setSecurityManager(new java.lang.SecurityManager() {
+        override def checkExit(paramInt: Int) {
+          if (!stopped) {
+            logInfo("In securityManager checkExit, exit code: " + paramInt)
+            if (paramInt == 0) {
+              finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+            } else {
+              finish(FinalApplicationStatus.FAILED,
+                paramInt,
+                "User class exited with non-zero exit code")
+            }
+            stopped = true
+          }
+        }
+        // required for the checkExit to work properly
+        override def checkPermission(perm: java.security.Permission): Unit = {
+        }
+      })
+    }
+    catch {
+      case e: SecurityException =>
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_SECURITY,
+          "Error in setSecurityManager")
+        logError("Error in setSecurityManager:", e)
+    }
+  }
 
   /**
-   * Start the user class, which contains the spark driver.
+   * Start the user class, which contains the spark driver, in a separate Thread.
    * If the main routine exits cleanly or exits with System.exit(0) we
    * assume it was successful, for all other cases we assume failure.
+   *
+   * Returns the user thread that was started.
    */
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     System.setProperty("spark.executor.instances", args.numExecutors.toString)
-    var stopped = false
     val mainMethod = Class.forName(args.userClass, false,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
 
     val userThread = new Thread {
       override def run() {
-
-        try {
-          // Note this security manager applies to the entire process, not
-          // just this thread. It's here to handle the case if the user code
-          // does System.exit
-          System.setSecurityManager(new java.lang.SecurityManager() {
-            override def checkExit(paramInt: Int) {
-              if (!stopped) {
-                logInfo("In securityManager checkExit, exit code: " + paramInt)
-                if (paramInt == 0) {
-                  finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
-                } else {
-                  finish(FinalApplicationStatus.FAILED,
-                    paramInt,
-                    "User class exited with non-zero exit code")
-                }
-                stopped = true
-              }
-            }
-
-            // required for the checkExit to work properly
-            override def checkPermission(perm: java.security.Permission): Unit = {
-            }
-          })
-        }
-        catch {
-          case e: SecurityException => {
-            finish(FinalApplicationStatus.FAILED,
-              ApplicationMaster.EXIT_SECURITY,
-              "Error in setSecurityManager")
-            logError("Error in setSecurityManager:", e)
-          }
-        }
-
         try {
           val mainArgs = new Array[String](args.userArgs.size)
           args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
@@ -463,14 +468,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
           e.getCause match {
             case _: InterruptedException =>
               // Reporter thread can interrupt to stop user class
-
-            case e: Throwable => {
+            case e: Throwable =>
               finish(FinalApplicationStatus.FAILED,
                 ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                 "User class threw exception: " + e.getMessage)
               // re-throw to get it logged
               throw e
-            }
           }
         }
       }
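
The scaladoc added in the hunk above explains the idea behind `setupSystemSecurityManager`: a process-wide `SecurityManager` whose `checkExit` hook observes a `System.exit` from user code, so the final YARN status can be reported before the JVM dies. Below is a self-contained sketch of just that mechanism, outside Spark; `CheckExitSketch` and its output are illustrative assumptions, and `java.lang.SecurityManager` is deprecated on recent JDKs.

```scala
// Illustrative sketch of the checkExit interception technique; not Spark code.
// (SecurityManager is deprecated on recent JDKs; this mirrors the JDK 7/8-era
// approach used by the patch.)
object CheckExitSketch {
  def main(args: Array[String]): Unit = {
    var stopped = false
    System.setSecurityManager(new SecurityManager() {
      override def checkExit(status: Int): Unit = {
        if (!stopped) {
          stopped = true
          // The real ApplicationMaster reports the final YARN status here
          // (SUCCEEDED for 0, FAILED otherwise) via finish(...).
          println(s"Intercepted System.exit($status)")
        }
      }
      // Permit everything else; only exit is of interest in this sketch.
      override def checkPermission(perm: java.security.Permission): Unit = {}
    })

    System.exit(42) // checkExit runs first, then the JVM still exits with 42
  }
}
```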
@@ -512,13 +515,13 @@ object ApplicationMaster extends Logging {
   val SHUTDOWN_HOOK_PRIORITY: Int = 30
 
   // exit codes for different causes, no reason behind the values
-  val EXIT_SUCCESS = 0
-  val EXIT_UNCAUGHT_EXCEPTION = 10
-  val EXIT_MAX_EXECUTOR_FAILURES = 11
-  val EXIT_REPORTER_FAILURE = 12
-  val EXIT_SC_NOT_INITED = 13
-  val EXIT_SECURITY = 14
-  val EXIT_EXCEPTION_USER_CLASS = 15
+  private val EXIT_SUCCESS = 0
+  private val EXIT_UNCAUGHT_EXCEPTION = 10
+  private val EXIT_MAX_EXECUTOR_FAILURES = 11
+  private val EXIT_REPORTER_FAILURE = 12
+  private val EXIT_SC_NOT_INITED = 13
+  private val EXIT_SECURITY = 14
+  private val EXIT_EXCEPTION_USER_CLASS = 15
 
   private var master: ApplicationMaster = _
 